source: trunk/libtransmission/peer-msgs.c @ 11599

Last change on this file since 11599 was 11599, checked in by charles, 11 years ago

(trunk) Join the 21st century and use only 1 space at the end sentences. This commit is nearly as important as the semi-annual ones that remove trailing spaces from the ends of lines of code... :)

  • Property svn:keywords set to Date Rev Author Id
File size: 70.5 KB
Line 
1/*
2 * This file Copyright (C) 2007-2010 Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 11599 2010-12-27 19:18:17Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdarg.h>
17#include <stdio.h>
18#include <stdlib.h>
19#include <string.h>
20
21#include <event2/bufferevent.h>
22#include <event2/event.h>
23
24#include "transmission.h"
25#include "bencode.h"
26#include "cache.h"
27#include "completion.h"
28#include "crypto.h"
29#ifdef WIN32
30#include "net.h" /* for ECONN */
31#endif
32#include "peer-io.h"
33#include "peer-mgr.h"
34#include "peer-msgs.h"
35#include "session.h"
36#include "stats.h"
37#include "torrent.h"
38#include "torrent-magnet.h"
39#include "tr-dht.h"
40#include "utils.h"
41#include "version.h"
42
43/**
44***
45**/
46
47enum
48{
49    BT_CHOKE                = 0,
50    BT_UNCHOKE              = 1,
51    BT_INTERESTED           = 2,
52    BT_NOT_INTERESTED       = 3,
53    BT_HAVE                 = 4,
54    BT_BITFIELD             = 5,
55    BT_REQUEST              = 6,
56    BT_PIECE                = 7,
57    BT_CANCEL               = 8,
58    BT_PORT                 = 9,
59
60    BT_FEXT_SUGGEST         = 13,
61    BT_FEXT_HAVE_ALL        = 14,
62    BT_FEXT_HAVE_NONE       = 15,
63    BT_FEXT_REJECT          = 16,
64    BT_FEXT_ALLOWED_FAST    = 17,
65
66    BT_LTEP                 = 20,
67
68    LTEP_HANDSHAKE          = 0,
69
70    UT_PEX_ID               = 1,
71    UT_METADATA_ID          = 3,
72
73    MAX_PEX_PEER_COUNT      = 50,
74
75    MIN_CHOKE_PERIOD_SEC    = 10,
76
77    /* idle seconds before we send a keepalive */
78    KEEPALIVE_INTERVAL_SECS = 100,
79
80    PEX_INTERVAL_SECS       = 90, /* sec between sendPex() calls */
81
82    REQQ                    = 512,
83
84    METADATA_REQQ           = 64,
85
86    /* used in lowering the outMessages queue period */
87    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
88    HIGH_PRIORITY_INTERVAL_SECS = 2,
89    LOW_PRIORITY_INTERVAL_SECS = 10,
90
91    /* number of pieces to remove from the bitfield when
92     * lazy bitfields are turned on */
93    LAZY_PIECE_COUNT = 26,
94
95    /* number of pieces we'll allow in our fast set */
96    MAX_FAST_SET_SIZE = 3,
97
98    /* defined in BEP #9 */
99    METADATA_MSG_TYPE_REQUEST = 0,
100    METADATA_MSG_TYPE_DATA = 1,
101    METADATA_MSG_TYPE_REJECT = 2
102};
103
104enum
105{
106    AWAITING_BT_LENGTH,
107    AWAITING_BT_ID,
108    AWAITING_BT_MESSAGE,
109    AWAITING_BT_PIECE
110};
111
112/**
113***
114**/
115
116struct peer_request
117{
118    uint32_t    index;
119    uint32_t    offset;
120    uint32_t    length;
121};
122
123static uint32_t
124getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
125{
126    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
127    const uint64_t blockPos = tor->blockSize * b;
128    assert( blockPos >= piecePos );
129    return (uint32_t)( blockPos - piecePos );
130}
131
132static void
133blockToReq( const tr_torrent     * tor,
134            tr_block_index_t       block,
135            struct peer_request  * setme )
136{
137    assert( setme != NULL );
138
139    setme->index = tr_torBlockPiece( tor, block );
140    setme->offset = getBlockOffsetInPiece( tor, block );
141    setme->length = tr_torBlockCountBytes( tor, block );
142}
143
144/**
145***
146**/
147
148/* this is raw, unchanged data from the peer regarding
149 * the current message that it's sending us. */
150struct tr_incoming
151{
152    uint8_t                id;
153    uint32_t               length; /* includes the +1 for id length */
154    struct peer_request    blockReq; /* metadata for incoming blocks */
155    struct evbuffer *      block; /* piece data for incoming blocks */
156};
157
158/**
159 * Low-level communication state information about a connected peer.
160 *
161 * This structure remembers the low-level protocol states that we're
162 * in with this peer, such as active requests, pex messages, and so on.
163 * Its fields are all private to peer-msgs.c.
164 *
165 * Data not directly involved with sending & receiving messages is
166 * stored in tr_peer, where it can be accessed by both peermsgs and
167 * the peer manager.
168 *
169 * @see struct peer_atom
170 * @see tr_peer
171 */
172struct tr_peermsgs
173{
174    tr_bool         peerSupportsPex;
175    tr_bool         peerSupportsMetadataXfer;
176    tr_bool         clientSentLtepHandshake;
177    tr_bool         peerSentLtepHandshake;
178
179    /*tr_bool         haveFastSet;*/
180
181    int             desiredRequestCount;
182
183    int             prefetchCount;
184
185    /* how long the outMessages batch should be allowed to grow before
186     * it's flushed -- some messages (like requests >:) should be sent
187     * very quickly; others aren't as urgent. */
188    int8_t          outMessagesBatchPeriod;
189
190    uint8_t         state;
191    uint8_t         ut_pex_id;
192    uint8_t         ut_metadata_id;
193    uint16_t        pexCount;
194    uint16_t        pexCount6;
195
196#if 0
197    size_t                 fastsetSize;
198    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
199#endif
200
201    tr_peer *              peer;
202
203    tr_torrent *           torrent;
204
205    tr_peer_callback      * callback;
206    void                  * callbackData;
207
208    struct evbuffer *      outMessages; /* all the non-piece messages */
209
210    struct peer_request    peerAskedFor[REQQ];
211
212    int                    peerAskedForMetadata[METADATA_REQQ];
213    int                    peerAskedForMetadataCount;
214
215    tr_pex               * pex;
216    tr_pex               * pex6;
217
218    /*time_t                 clientSentPexAt;*/
219    time_t                 clientSentAnythingAt;
220
221    /* when we started batching the outMessages */
222    time_t                outMessagesBatchedAt;
223
224    struct tr_incoming    incoming;
225
226    /* if the peer supports the Extension Protocol in BEP 10 and
227       supplied a reqq argument, it's stored here. Otherwise, the
228       value is zero and should be ignored. */
229    int64_t               reqq;
230
231    struct event        * pexTimer;
232};
233
234/**
235***
236**/
237
238#if 0
239static tr_bitfield*
240getHave( const struct tr_peermsgs * msgs )
241{
242    if( msgs->peer->have == NULL )
243        msgs->peer->have = tr_bitfieldNew( msgs->torrent->info.pieceCount );
244    return msgs->peer->have;
245}
246#endif
247
248static inline tr_session*
249getSession( struct tr_peermsgs * msgs )
250{
251    return msgs->torrent->session;
252}
253
254/**
255***
256**/
257
258static void
259myDebug( const char * file, int line,
260         const struct tr_peermsgs * msgs,
261         const char * fmt, ... )
262{
263    FILE * fp = tr_getLog( );
264
265    if( fp )
266    {
267        va_list           args;
268        char              timestr[64];
269        struct evbuffer * buf = evbuffer_new( );
270        char *            base = tr_basename( file );
271
272        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
273                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
274                             tr_torrentName( msgs->torrent ),
275                             tr_peerIoGetAddrStr( msgs->peer->io ),
276                             msgs->peer->client );
277        va_start( args, fmt );
278        evbuffer_add_vprintf( buf, fmt, args );
279        va_end( args );
280        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
281        fwrite( evbuffer_pullup( buf, -1 ), 1, evbuffer_get_length( buf ), fp );
282
283        tr_free( base );
284        evbuffer_free( buf );
285    }
286}
287
288#define dbgmsg( msgs, ... ) \
289    do { \
290        if( tr_deepLoggingIsActive( ) ) \
291            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
292    } while( 0 )
293
294/**
295***
296**/
297
298static void
299pokeBatchPeriod( tr_peermsgs * msgs,
300                 int           interval )
301{
302    if( msgs->outMessagesBatchPeriod > interval )
303    {
304        msgs->outMessagesBatchPeriod = interval;
305        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
306    }
307}
308
309static void
310dbgOutMessageLen( tr_peermsgs * msgs )
311{
312    dbgmsg( msgs, "outMessage size is now %zu", evbuffer_get_length( msgs->outMessages ) );
313}
314
315static void
316protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
317{
318    struct evbuffer * out = msgs->outMessages;
319
320    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
321
322    evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
323    evbuffer_add_uint8 ( out, BT_FEXT_REJECT );
324    evbuffer_add_uint32( out, req->index );
325    evbuffer_add_uint32( out, req->offset );
326    evbuffer_add_uint32( out, req->length );
327
328    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
329    dbgOutMessageLen( msgs );
330}
331
332static void
333protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
334{
335    struct evbuffer * out = msgs->outMessages;
336
337    evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
338    evbuffer_add_uint8 ( out, BT_REQUEST );
339    evbuffer_add_uint32( out, req->index );
340    evbuffer_add_uint32( out, req->offset );
341    evbuffer_add_uint32( out, req->length );
342
343    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
344    dbgOutMessageLen( msgs );
345    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
346}
347
348static void
349protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
350{
351    struct evbuffer * out = msgs->outMessages;
352
353    evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
354    evbuffer_add_uint8 ( out, BT_CANCEL );
355    evbuffer_add_uint32( out, req->index );
356    evbuffer_add_uint32( out, req->offset );
357    evbuffer_add_uint32( out, req->length );
358
359    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
360    dbgOutMessageLen( msgs );
361    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
362}
363
364static void
365protocolSendPort(tr_peermsgs *msgs, uint16_t port)
366{
367    struct evbuffer * out = msgs->outMessages;
368
369    dbgmsg( msgs, "sending Port %u", port);
370    evbuffer_add_uint32( out, 3 );
371    evbuffer_add_uint8 ( out, BT_PORT );
372    evbuffer_add_uint16( out, port);
373}
374
375static void
376protocolSendHave( tr_peermsgs * msgs, uint32_t index )
377{
378    struct evbuffer * out = msgs->outMessages;
379
380    evbuffer_add_uint32( out, sizeof(uint8_t) + sizeof(uint32_t) );
381    evbuffer_add_uint8 ( out, BT_HAVE );
382    evbuffer_add_uint32( out, index );
383
384    dbgmsg( msgs, "sending Have %u", index );
385    dbgOutMessageLen( msgs );
386    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
387}
388
389#if 0
390static void
391protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
392{
393    tr_peerIo       * io  = msgs->peer->io;
394    struct evbuffer * out = msgs->outMessages;
395
396    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
397
398    evbuffer_add_uint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
399    evbuffer_add_uint8 ( io, out, BT_FEXT_ALLOWED_FAST );
400    evbuffer_add_uint32( io, out, pieceIndex );
401
402    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
403    dbgOutMessageLen( msgs );
404}
405#endif
406
407static void
408protocolSendChoke( tr_peermsgs * msgs, int choke )
409{
410    struct evbuffer * out = msgs->outMessages;
411
412    evbuffer_add_uint32( out, sizeof( uint8_t ) );
413    evbuffer_add_uint8 ( out, choke ? BT_CHOKE : BT_UNCHOKE );
414
415    dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
416    dbgOutMessageLen( msgs );
417    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
418}
419
420static void
421protocolSendHaveAll( tr_peermsgs * msgs )
422{
423    struct evbuffer * out = msgs->outMessages;
424
425    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
426
427    evbuffer_add_uint32( out, sizeof( uint8_t ) );
428    evbuffer_add_uint8 ( out, BT_FEXT_HAVE_ALL );
429
430    dbgmsg( msgs, "sending HAVE_ALL..." );
431    dbgOutMessageLen( msgs );
432    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
433}
434
435static void
436protocolSendHaveNone( tr_peermsgs * msgs )
437{
438    struct evbuffer * out = msgs->outMessages;
439
440    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
441
442    evbuffer_add_uint32( out, sizeof( uint8_t ) );
443    evbuffer_add_uint8 ( out, BT_FEXT_HAVE_NONE );
444
445    dbgmsg( msgs, "sending HAVE_NONE..." );
446    dbgOutMessageLen( msgs );
447    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
448}
449
450/**
451***  EVENTS
452**/
453
454static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0, 0 };
455
456static void
457publish( tr_peermsgs * msgs, tr_peer_event * e )
458{
459    assert( msgs->peer );
460    assert( msgs->peer->msgs == msgs );
461
462    if( msgs->callback != NULL )
463        msgs->callback( msgs->peer, e, msgs->callbackData );
464}
465
466static void
467fireError( tr_peermsgs * msgs, int err )
468{
469    tr_peer_event e = blankEvent;
470    e.eventType = TR_PEER_ERROR;
471    e.err = err;
472    publish( msgs, &e );
473}
474
475static void
476firePeerProgress( tr_peermsgs * msgs )
477{
478    tr_peer_event e = blankEvent;
479    e.eventType = TR_PEER_PEER_PROGRESS;
480    e.progress = msgs->peer->progress;
481    publish( msgs, &e );
482}
483
484static void
485fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
486{
487    tr_peer_event e = blankEvent;
488    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
489    e.pieceIndex = req->index;
490    e.offset = req->offset;
491    e.length = req->length;
492    publish( msgs, &e );
493}
494
495static void
496fireGotRej( tr_peermsgs * msgs, const struct peer_request * req )
497{
498    tr_peer_event e = blankEvent;
499    e.eventType = TR_PEER_CLIENT_GOT_REJ;
500    e.pieceIndex = req->index;
501    e.offset = req->offset;
502    e.length = req->length;
503    publish( msgs, &e );
504}
505
506static void
507fireGotChoke( tr_peermsgs * msgs )
508{
509    tr_peer_event e = blankEvent;
510    e.eventType = TR_PEER_CLIENT_GOT_CHOKE;
511    publish( msgs, &e );
512}
513
514static void
515fireClientGotData( tr_peermsgs * msgs,
516                   uint32_t      length,
517                   int           wasPieceData )
518{
519    tr_peer_event e = blankEvent;
520
521    e.length = length;
522    e.eventType = TR_PEER_CLIENT_GOT_DATA;
523    e.wasPieceData = wasPieceData;
524    publish( msgs, &e );
525}
526
527static void
528fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
529{
530    tr_peer_event e = blankEvent;
531    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
532    e.pieceIndex = pieceIndex;
533    publish( msgs, &e );
534}
535
536static void
537fireClientGotPort( tr_peermsgs * msgs, tr_port port )
538{
539    tr_peer_event e = blankEvent;
540    e.eventType = TR_PEER_CLIENT_GOT_PORT;
541    e.port = port;
542    publish( msgs, &e );
543}
544
545static void
546fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
547{
548    tr_peer_event e = blankEvent;
549    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
550    e.pieceIndex = pieceIndex;
551    publish( msgs, &e );
552}
553
554static void
555firePeerGotData( tr_peermsgs  * msgs,
556                 uint32_t       length,
557                 int            wasPieceData )
558{
559    tr_peer_event e = blankEvent;
560
561    e.length = length;
562    e.eventType = TR_PEER_PEER_GOT_DATA;
563    e.wasPieceData = wasPieceData;
564
565    publish( msgs, &e );
566}
567
568/**
569***  ALLOWED FAST SET
570***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
571**/
572
573size_t
574tr_generateAllowedSet( tr_piece_index_t * setmePieces,
575                       size_t             desiredSetSize,
576                       size_t             pieceCount,
577                       const uint8_t    * infohash,
578                       const tr_address * addr )
579{
580    size_t setSize = 0;
581
582    assert( setmePieces );
583    assert( desiredSetSize <= pieceCount );
584    assert( desiredSetSize );
585    assert( pieceCount );
586    assert( infohash );
587    assert( addr );
588
589    if( addr->type == TR_AF_INET )
590    {
591        uint8_t w[SHA_DIGEST_LENGTH + 4], *walk=w;
592        uint8_t x[SHA_DIGEST_LENGTH];
593
594        uint32_t ui32 = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 );   /* (1) */
595        memcpy( w, &ui32, sizeof( uint32_t ) );
596        walk += sizeof( uint32_t );
597        memcpy( walk, infohash, SHA_DIGEST_LENGTH );                 /* (2) */
598        walk += SHA_DIGEST_LENGTH;
599        tr_sha1( x, w, walk-w, NULL );                               /* (3) */
600        assert( sizeof( w ) == walk-w );
601
602        while( setSize<desiredSetSize )
603        {
604            int i;
605            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
606            {
607                size_t k;
608                uint32_t j = i * 4;                                  /* (5) */
609                uint32_t y = ntohl( *( uint32_t* )( x + j ) );       /* (6) */
610                uint32_t index = y % pieceCount;                     /* (7) */
611
612                for( k=0; k<setSize; ++k )                           /* (8) */
613                    if( setmePieces[k] == index )
614                        break;
615
616                if( k == setSize )
617                    setmePieces[setSize++] = index;                  /* (9) */
618            }
619
620            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
621        }
622    }
623
624    return setSize;
625}
626
627static void
628updateFastSet( tr_peermsgs * msgs UNUSED )
629{
630#if 0
631    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
632    const int peerIsNeedy = msgs->peer->progress < 0.10;
633
634    if( fext && peerIsNeedy && !msgs->haveFastSet )
635    {
636        size_t i;
637        const struct tr_address * addr = tr_peerIoGetAddress( msgs->peer->io, NULL );
638        const tr_info * inf = &msgs->torrent->info;
639        const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
640
641        /* build the fast set */
642        msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
643        msgs->haveFastSet = 1;
644
645        /* send it to the peer */
646        for( i=0; i<msgs->fastsetSize; ++i )
647            protocolSendAllowedFast( msgs, msgs->fastset[i] );
648    }
649#endif
650}
651
652/**
653***  INTEREST
654**/
655
656static void
657sendInterest( tr_peermsgs * msgs, tr_bool clientIsInterested )
658{
659    struct evbuffer * out = msgs->outMessages;
660
661    assert( msgs );
662    assert( tr_isBool( clientIsInterested ) );
663
664    msgs->peer->clientIsInterested = clientIsInterested;
665    dbgmsg( msgs, "Sending %s", clientIsInterested ? "Interested" : "Not Interested" );
666    evbuffer_add_uint32( out, sizeof( uint8_t ) );
667    evbuffer_add_uint8 ( out, clientIsInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
668
669    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
670    dbgOutMessageLen( msgs );
671}
672
673static void
674updateInterest( tr_peermsgs * msgs UNUSED )
675{
676    /* FIXME -- might need to poke the mgr on startup */
677}
678
679void
680tr_peerMsgsSetInterested( tr_peermsgs * msgs, int isInterested )
681{
682    assert( tr_isBool( isInterested ) );
683
684    if( isInterested != msgs->peer->clientIsInterested )
685        sendInterest( msgs, isInterested );
686}
687
688static tr_bool
689popNextMetadataRequest( tr_peermsgs * msgs, int * piece )
690{
691    if( msgs->peerAskedForMetadataCount == 0 )
692        return FALSE;
693
694    *piece = msgs->peerAskedForMetadata[0];
695
696    tr_removeElementFromArray( msgs->peerAskedForMetadata, 0, sizeof( int ),
697                               msgs->peerAskedForMetadataCount-- );
698
699    return TRUE;
700}
701
702static tr_bool
703popNextRequest( tr_peermsgs * msgs, struct peer_request * setme )
704{
705    if( msgs->peer->pendingReqsToClient == 0 )
706        return FALSE;
707
708    *setme = msgs->peerAskedFor[0];
709
710    tr_removeElementFromArray( msgs->peerAskedFor, 0, sizeof( struct peer_request ),
711                               msgs->peer->pendingReqsToClient-- );
712
713    return TRUE;
714}
715
716static void
717cancelAllRequestsToClient( tr_peermsgs * msgs )
718{
719    struct peer_request req;
720    const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->peer->io );
721
722    while( popNextRequest( msgs, &req ))
723        if( mustSendCancel )
724            protocolSendReject( msgs, &req );
725}
726
727void
728tr_peerMsgsSetChoke( tr_peermsgs * msgs,
729                     int           choke )
730{
731    const time_t now = tr_time( );
732    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
733
734    assert( msgs );
735    assert( msgs->peer );
736    assert( choke == 0 || choke == 1 );
737
738    if( msgs->peer->chokeChangedAt > fibrillationTime )
739    {
740        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
741    }
742    else if( msgs->peer->peerIsChoked != choke )
743    {
744        msgs->peer->peerIsChoked = choke;
745        if( choke )
746            cancelAllRequestsToClient( msgs );
747        protocolSendChoke( msgs, choke );
748        msgs->peer->chokeChangedAt = now;
749    }
750}
751
752/**
753***
754**/
755
756void
757tr_peerMsgsHave( tr_peermsgs * msgs,
758                 uint32_t      index )
759{
760    protocolSendHave( msgs, index );
761
762    /* since we have more pieces now, we might not be interested in this peer */
763    updateInterest( msgs );
764}
765
766/**
767***
768**/
769
770static tr_bool
771reqIsValid( const tr_peermsgs * peer,
772            uint32_t            index,
773            uint32_t            offset,
774            uint32_t            length )
775{
776    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
777}
778
779static tr_bool
780requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
781{
782    return reqIsValid( msgs, req->index, req->offset, req->length );
783}
784
785void
786tr_peerMsgsCancel( tr_peermsgs * msgs, tr_block_index_t block )
787{
788    struct peer_request req;
789/*fprintf( stderr, "SENDING CANCEL MESSAGE FOR BLOCK %zu\n\t\tFROM PEER %p ------------------------------------\n", (size_t)block, msgs->peer );*/
790    blockToReq( msgs->torrent, block, &req );
791    protocolSendCancel( msgs, &req );
792}
793
794/**
795***
796**/
797
798static void
799sendLtepHandshake( tr_peermsgs * msgs )
800{
801    tr_benc val, *m;
802    char * buf;
803    int len;
804    tr_bool allow_pex;
805    tr_bool allow_metadata_xfer;
806    struct evbuffer * out = msgs->outMessages;
807    const unsigned char * ipv6 = tr_globalIPv6();
808
809    if( msgs->clientSentLtepHandshake )
810        return;
811
812    dbgmsg( msgs, "sending an ltep handshake" );
813    msgs->clientSentLtepHandshake = 1;
814
815    /* decide if we want to advertise metadata xfer support (BEP 9) */
816    if( tr_torrentIsPrivate( msgs->torrent ) )
817        allow_metadata_xfer = 0;
818    else
819        allow_metadata_xfer = 1;
820
821    /* decide if we want to advertise pex support */
822    if( !tr_torrentAllowsPex( msgs->torrent ) )
823        allow_pex = 0;
824    else if( msgs->peerSentLtepHandshake )
825        allow_pex = msgs->peerSupportsPex ? 1 : 0;
826    else
827        allow_pex = 1;
828
829    tr_bencInitDict( &val, 8 );
830    tr_bencDictAddInt( &val, "e", getSession(msgs)->encryptionMode != TR_CLEAR_PREFERRED );
831    if( ipv6 != NULL )
832        tr_bencDictAddRaw( &val, "ipv6", ipv6, 16 );
833    if( allow_metadata_xfer && tr_torrentHasMetadata( msgs->torrent )
834                            && ( msgs->torrent->infoDictLength > 0 ) )
835        tr_bencDictAddInt( &val, "metadata_size", msgs->torrent->infoDictLength );
836    tr_bencDictAddInt( &val, "p", tr_sessionGetPublicPeerPort( getSession(msgs) ) );
837    tr_bencDictAddInt( &val, "reqq", REQQ );
838    tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
839    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
840    m  = tr_bencDictAddDict( &val, "m", 2 );
841    if( allow_metadata_xfer )
842        tr_bencDictAddInt( m, "ut_metadata", UT_METADATA_ID );
843    if( allow_pex )
844        tr_bencDictAddInt( m, "ut_pex", UT_PEX_ID );
845
846    buf = tr_bencToStr( &val, TR_FMT_BENC, &len );
847
848    evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + len );
849    evbuffer_add_uint8 ( out, BT_LTEP );
850    evbuffer_add_uint8 ( out, LTEP_HANDSHAKE );
851    evbuffer_add       ( out, buf, len );
852    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
853    dbgOutMessageLen( msgs );
854
855    /* cleanup */
856    tr_bencFree( &val );
857    tr_free( buf );
858}
859
860static void
861parseLtepHandshake( tr_peermsgs *     msgs,
862                    int               len,
863                    struct evbuffer * inbuf )
864{
865    int64_t   i;
866    tr_benc   val, * sub;
867    uint8_t * tmp = tr_new( uint8_t, len );
868    const uint8_t *addr;
869    size_t addr_len;
870    tr_pex pex;
871    int8_t seedProbability = -1;
872
873    memset( &pex, 0, sizeof( tr_pex ) );
874
875    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, len );
876    msgs->peerSentLtepHandshake = 1;
877
878    if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) )
879    {
880        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
881        tr_free( tmp );
882        return;
883    }
884
885    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
886
887    /* does the peer prefer encrypted connections? */
888    if( tr_bencDictFindInt( &val, "e", &i ) ) {
889        msgs->peer->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
890                                              : ENCRYPTION_PREFERENCE_NO;
891        if( i )
892            pex.flags |= ADDED_F_ENCRYPTION_FLAG;
893    }
894
895    /* check supported messages for utorrent pex */
896    msgs->peerSupportsPex = 0;
897    msgs->peerSupportsMetadataXfer = 0;
898
899    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
900        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
901            msgs->peerSupportsPex = i != 0;
902            msgs->ut_pex_id = (uint8_t) i;
903            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
904        }
905        if( tr_bencDictFindInt( sub, "ut_metadata", &i ) ) {
906            msgs->peerSupportsMetadataXfer = i != 0;
907            msgs->ut_metadata_id = (uint8_t) i;
908            dbgmsg( msgs, "msgs->ut_metadata_id is %d", (int)msgs->ut_metadata_id );
909        }
910    }
911
912    /* look for metainfo size (BEP 9) */
913    if( tr_bencDictFindInt( &val, "metadata_size", &i ) )
914        tr_torrentSetMetadataSizeHint( msgs->torrent, i );
915
916    /* look for upload_only (BEP 21) */
917    if( tr_bencDictFindInt( &val, "upload_only", &i ) )
918        seedProbability = i==0 ? 0 : 100;
919
920    /* get peer's listening port */
921    if( tr_bencDictFindInt( &val, "p", &i ) ) {
922        pex.port = htons( (uint16_t)i );
923        fireClientGotPort( msgs, pex.port );
924        dbgmsg( msgs, "peer's port is now %d", (int)i );
925    }
926
927    if( tr_peerIoIsIncoming( msgs->peer->io )
928        && tr_bencDictFindRaw( &val, "ipv4", &addr, &addr_len )
929        && ( addr_len == 4 ) )
930    {
931        pex.addr.type = TR_AF_INET;
932        memcpy( &pex.addr.addr.addr4, addr, 4 );
933        tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability );
934    }
935
936    if( tr_peerIoIsIncoming( msgs->peer->io )
937        && tr_bencDictFindRaw( &val, "ipv6", &addr, &addr_len )
938        && ( addr_len == 16 ) )
939    {
940        pex.addr.type = TR_AF_INET6;
941        memcpy( &pex.addr.addr.addr6, addr, 16 );
942        tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability );
943    }
944
945    /* get peer's maximum request queue size */
946    if( tr_bencDictFindInt( &val, "reqq", &i ) )
947        msgs->reqq = i;
948
949    tr_bencFree( &val );
950    tr_free( tmp );
951}
952
953static void
954parseUtMetadata( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
955{
956    tr_benc dict;
957    char * msg_end;
958    char * benc_end;
959    int64_t msg_type = -1;
960    int64_t piece = -1;
961    int64_t total_size = 0;
962    uint8_t * tmp = tr_new( uint8_t, msglen );
963
964    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
965    msg_end = (char*)tmp + msglen;
966
967    if( !tr_bencLoad( tmp, msglen, &dict, &benc_end ) )
968    {
969        tr_bencDictFindInt( &dict, "msg_type", &msg_type );
970        tr_bencDictFindInt( &dict, "piece", &piece );
971        tr_bencDictFindInt( &dict, "total_size", &total_size );
972        tr_bencFree( &dict );
973    }
974
975    dbgmsg( msgs, "got ut_metadata msg: type %d, piece %d, total_size %d",
976            (int)msg_type, (int)piece, (int)total_size );
977
978    if( msg_type == METADATA_MSG_TYPE_REJECT )
979    {
980        /* NOOP */
981    }
982
983    if( ( msg_type == METADATA_MSG_TYPE_DATA )
984        && ( !tr_torrentHasMetadata( msgs->torrent ) )
985        && ( msg_end - benc_end <= METADATA_PIECE_SIZE )
986        && ( piece * METADATA_PIECE_SIZE + (msg_end - benc_end) <= total_size ) )
987    {
988        const int pieceLen = msg_end - benc_end;
989        tr_torrentSetMetadataPiece( msgs->torrent, piece, benc_end, pieceLen );
990    }
991
992    if( msg_type == METADATA_MSG_TYPE_REQUEST )
993    {
994        if( ( piece >= 0 )
995            && tr_torrentHasMetadata( msgs->torrent )
996            && !tr_torrentIsPrivate( msgs->torrent )
997            && ( msgs->peerAskedForMetadataCount < METADATA_REQQ ) )
998        {
999            msgs->peerAskedForMetadata[msgs->peerAskedForMetadataCount++] = piece;
1000        }
1001        else
1002        {
1003            tr_benc tmp;
1004            int payloadLen;
1005            char * payload;
1006            struct evbuffer * out = msgs->outMessages;
1007
1008            /* build the rejection message */
1009            tr_bencInitDict( &tmp, 2 );
1010            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
1011            tr_bencDictAddInt( &tmp, "piece", piece );
1012            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1013            tr_bencFree( &tmp );
1014
1015            /* write it out as a LTEP message to our outMessages buffer */
1016            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen );
1017            evbuffer_add_uint8 ( out, BT_LTEP );
1018            evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1019            evbuffer_add       ( out, payload, payloadLen );
1020            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1021            dbgOutMessageLen( msgs );
1022
1023            tr_free( payload );
1024        }
1025    }
1026
1027    tr_free( tmp );
1028}
1029
1030static void
1031parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1032{
1033    int loaded = 0;
1034    uint8_t * tmp = tr_new( uint8_t, msglen );
1035    tr_benc val;
1036    tr_torrent * tor = msgs->torrent;
1037    const uint8_t * added;
1038    size_t added_len;
1039
1040    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1041
1042    if( tr_torrentAllowsPex( tor )
1043      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) ) )
1044    {
1045        if( tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1046        {
1047            tr_pex * pex;
1048            size_t i, n;
1049            size_t added_f_len = 0;
1050            const uint8_t * added_f = NULL;
1051
1052            tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1053            pex = tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len, &n );
1054
1055            n = MIN( n, MAX_PEX_PEER_COUNT );
1056            for( i=0; i<n; ++i )
1057            {
1058                int seedProbability = -1;
1059                if( i < added_f_len ) seedProbability = ( added_f[i] & ADDED_F_SEED_FLAG ) ? 100 : 0;
1060                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex+i, seedProbability );
1061            }
1062
1063            tr_free( pex );
1064        }
1065
1066        if( tr_bencDictFindRaw( &val, "added6", &added, &added_len ) )
1067        {
1068            tr_pex * pex;
1069            size_t i, n;
1070            size_t added_f_len = 0;
1071            const uint8_t * added_f = NULL;
1072
1073            tr_bencDictFindRaw( &val, "added6.f", &added_f, &added_f_len );
1074            pex = tr_peerMgrCompact6ToPex( added, added_len, added_f, added_f_len, &n );
1075
1076            n = MIN( n, MAX_PEX_PEER_COUNT );
1077            for( i=0; i<n; ++i )
1078            {
1079                int seedProbability = -1;
1080                if( i < added_f_len ) seedProbability = ( added_f[i] & ADDED_F_SEED_FLAG ) ? 100 : 0;
1081                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex+i, seedProbability );
1082            }
1083
1084            tr_free( pex );
1085        }
1086    }
1087
1088    if( loaded )
1089        tr_bencFree( &val );
1090    tr_free( tmp );
1091}
1092
1093static void sendPex( tr_peermsgs * msgs );
1094
1095static void
1096parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer  * inbuf )
1097{
1098    uint8_t ltep_msgid;
1099
1100    tr_peerIoReadUint8( msgs->peer->io, inbuf, &ltep_msgid );
1101    msglen--;
1102
1103    if( ltep_msgid == LTEP_HANDSHAKE )
1104    {
1105        dbgmsg( msgs, "got ltep handshake" );
1106        parseLtepHandshake( msgs, msglen, inbuf );
1107        if( tr_peerIoSupportsLTEP( msgs->peer->io ) )
1108        {
1109            sendLtepHandshake( msgs );
1110            sendPex( msgs );
1111        }
1112    }
1113    else if( ltep_msgid == UT_PEX_ID )
1114    {
1115        dbgmsg( msgs, "got ut pex" );
1116        msgs->peerSupportsPex = 1;
1117        parseUtPex( msgs, msglen, inbuf );
1118    }
1119    else if( ltep_msgid == UT_METADATA_ID )
1120    {
1121        dbgmsg( msgs, "got ut metadata" );
1122        msgs->peerSupportsMetadataXfer = 1;
1123        parseUtMetadata( msgs, msglen, inbuf );
1124    }
1125    else
1126    {
1127        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1128        evbuffer_drain( inbuf, msglen );
1129    }
1130}
1131
1132static int
1133readBtLength( tr_peermsgs *     msgs,
1134              struct evbuffer * inbuf,
1135              size_t            inlen )
1136{
1137    uint32_t len;
1138
1139    if( inlen < sizeof( len ) )
1140        return READ_LATER;
1141
1142    tr_peerIoReadUint32( msgs->peer->io, inbuf, &len );
1143
1144    if( len == 0 ) /* peer sent us a keepalive message */
1145        dbgmsg( msgs, "got KeepAlive" );
1146    else
1147    {
1148        msgs->incoming.length = len;
1149        msgs->state = AWAITING_BT_ID;
1150    }
1151
1152    return READ_NOW;
1153}
1154
1155static int readBtMessage( tr_peermsgs     * msgs,
1156                          struct evbuffer * inbuf,
1157                          size_t            inlen );
1158
1159static int
1160readBtId( tr_peermsgs * msgs, struct evbuffer  * inbuf, size_t inlen )
1161{
1162    uint8_t id;
1163
1164    if( inlen < sizeof( uint8_t ) )
1165        return READ_LATER;
1166
1167    tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
1168    msgs->incoming.id = id;
1169    dbgmsg( msgs, "msgs->incoming.id is now %d; msgs->incoming.length is %zu", id, (size_t)msgs->incoming.length );
1170
1171    if( id == BT_PIECE )
1172    {
1173        msgs->state = AWAITING_BT_PIECE;
1174        return READ_NOW;
1175    }
1176    else if( msgs->incoming.length != 1 )
1177    {
1178        msgs->state = AWAITING_BT_MESSAGE;
1179        return READ_NOW;
1180    }
1181    else return readBtMessage( msgs, inbuf, inlen - 1 );
1182}
1183
1184static void
1185updatePeerProgress( tr_peermsgs * msgs )
1186{
1187    msgs->peer->progress = tr_bitsetPercent( &msgs->peer->have );
1188    dbgmsg( msgs, "peer progress is %f", msgs->peer->progress );
1189    updateFastSet( msgs );
1190    updateInterest( msgs );
1191    firePeerProgress( msgs );
1192}
1193
1194static void
1195prefetchPieces( tr_peermsgs *msgs )
1196{
1197    int i;
1198
1199    /* Maintain 12 prefetched blocks per unchoked peer */
1200    for( i=msgs->prefetchCount; i<msgs->peer->pendingReqsToClient && i<12; ++i )
1201    {
1202        const struct peer_request * req = msgs->peerAskedFor + i;
1203        if( requestIsValid( msgs, req ) )
1204        {
1205            tr_cachePrefetchBlock( getSession(msgs)->cache, msgs->torrent, req->index, req->offset, req->length );
1206            ++msgs->prefetchCount;
1207        }
1208    }
1209}
1210
1211static void
1212peerMadeRequest( tr_peermsgs *               msgs,
1213                 const struct peer_request * req )
1214{
1215    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1216    const int reqIsValid = requestIsValid( msgs, req );
1217    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( &msgs->torrent->completion, req->index );
1218    const int peerIsChoked = msgs->peer->peerIsChoked;
1219
1220    int allow = FALSE;
1221
1222    if( !reqIsValid )
1223        dbgmsg( msgs, "rejecting an invalid request." );
1224    else if( !clientHasPiece )
1225        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1226    else if( peerIsChoked )
1227        dbgmsg( msgs, "rejecting request from choked peer" );
1228    else if( msgs->peer->pendingReqsToClient + 1 >= REQQ )
1229        dbgmsg( msgs, "rejecting request ... reqq is full" );
1230    else
1231        allow = TRUE;
1232
1233    if( allow ) {
1234        msgs->peerAskedFor[msgs->peer->pendingReqsToClient++] = *req;
1235        prefetchPieces( msgs );
1236    } else if( fext ) {
1237        protocolSendReject( msgs, req );
1238    }
1239}
1240
1241static tr_bool
1242messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1243{
1244    switch( id )
1245    {
1246        case BT_CHOKE:
1247        case BT_UNCHOKE:
1248        case BT_INTERESTED:
1249        case BT_NOT_INTERESTED:
1250        case BT_FEXT_HAVE_ALL:
1251        case BT_FEXT_HAVE_NONE:
1252            return len == 1;
1253
1254        case BT_HAVE:
1255        case BT_FEXT_SUGGEST:
1256        case BT_FEXT_ALLOWED_FAST:
1257            return len == 5;
1258
1259        case BT_BITFIELD:
1260            return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1261
1262        case BT_REQUEST:
1263        case BT_CANCEL:
1264        case BT_FEXT_REJECT:
1265            return len == 13;
1266
1267        case BT_PIECE:
1268            return len > 9 && len <= 16393;
1269
1270        case BT_PORT:
1271            return len == 3;
1272
1273        case BT_LTEP:
1274            return len >= 2;
1275
1276        default:
1277            return FALSE;
1278    }
1279}
1280
1281static int clientGotBlock( tr_peermsgs *               msgs,
1282                           const uint8_t *             block,
1283                           const struct peer_request * req );
1284
1285static int
1286readBtPiece( tr_peermsgs      * msgs,
1287             struct evbuffer  * inbuf,
1288             size_t             inlen,
1289             size_t           * setme_piece_bytes_read )
1290{
1291    struct peer_request * req = &msgs->incoming.blockReq;
1292
1293    assert( evbuffer_get_length( inbuf ) >= inlen );
1294    dbgmsg( msgs, "In readBtPiece" );
1295
1296    if( !req->length )
1297    {
1298        if( inlen < 8 )
1299            return READ_LATER;
1300
1301        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index );
1302        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset );
1303        req->length = msgs->incoming.length - 9;
1304        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1305        return READ_NOW;
1306    }
1307    else
1308    {
1309        int err;
1310
1311        /* read in another chunk of data */
1312        const size_t nLeft = req->length - evbuffer_get_length( msgs->incoming.block );
1313        size_t n = MIN( nLeft, inlen );
1314        size_t i = n;
1315        void * buf = tr_sessionGetBuffer( getSession( msgs ) );
1316        const size_t buflen = SESSION_BUFFER_SIZE;
1317
1318        while( i > 0 )
1319        {
1320            const size_t thisPass = MIN( i, buflen );
1321            tr_peerIoReadBytes( msgs->peer->io, inbuf, buf, thisPass );
1322            evbuffer_add( msgs->incoming.block, buf, thisPass );
1323            i -= thisPass;
1324        }
1325
1326        tr_sessionReleaseBuffer( getSession( msgs ) );
1327        buf = NULL;
1328
1329        fireClientGotData( msgs, n, TRUE );
1330        *setme_piece_bytes_read += n;
1331        dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1332               n, req->index, req->offset, req->length,
1333               (int)( req->length - evbuffer_get_length( msgs->incoming.block ) ) );
1334        if( evbuffer_get_length( msgs->incoming.block ) < req->length )
1335            return READ_LATER;
1336
1337        /* we've got the whole block ... process it */
1338        err = clientGotBlock( msgs, evbuffer_pullup( msgs->incoming.block, -1 ), req );
1339
1340        /* cleanup */
1341        evbuffer_free( msgs->incoming.block );
1342        msgs->incoming.block = evbuffer_new( );
1343        req->length = 0;
1344        msgs->state = AWAITING_BT_LENGTH;
1345        return err ? READ_ERR : READ_NOW;
1346    }
1347}
1348
1349static void updateDesiredRequestCount( tr_peermsgs * msgs );
1350
1351static int
1352readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1353{
1354    uint32_t      ui32;
1355    uint32_t      msglen = msgs->incoming.length;
1356    const uint8_t id = msgs->incoming.id;
1357#ifndef NDEBUG
1358    const size_t  startBufLen = evbuffer_get_length( inbuf );
1359#endif
1360    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1361
1362    --msglen; /* id length */
1363
1364    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
1365
1366    if( inlen < msglen )
1367        return READ_LATER;
1368
1369    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1370    {
1371        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1372        fireError( msgs, EMSGSIZE );
1373        return READ_ERR;
1374    }
1375
1376    switch( id )
1377    {
1378        case BT_CHOKE:
1379            dbgmsg( msgs, "got Choke" );
1380            msgs->peer->clientIsChoked = 1;
1381            if( !fext )
1382                fireGotChoke( msgs );
1383            break;
1384
1385        case BT_UNCHOKE:
1386            dbgmsg( msgs, "got Unchoke" );
1387            msgs->peer->clientIsChoked = 0;
1388            updateDesiredRequestCount( msgs );
1389            break;
1390
1391        case BT_INTERESTED:
1392            dbgmsg( msgs, "got Interested" );
1393            msgs->peer->peerIsInterested = 1;
1394            break;
1395
1396        case BT_NOT_INTERESTED:
1397            dbgmsg( msgs, "got Not Interested" );
1398            msgs->peer->peerIsInterested = 0;
1399            break;
1400
1401        case BT_HAVE:
1402            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1403            dbgmsg( msgs, "got Have: %u", ui32 );
1404            if( tr_torrentHasMetadata( msgs->torrent )
1405                    && ( ui32 >= msgs->torrent->info.pieceCount ) )
1406            {
1407                fireError( msgs, ERANGE );
1408                return READ_ERR;
1409            }
1410            if( tr_bitsetAdd( &msgs->peer->have, ui32 ) )
1411            {
1412                fireError( msgs, ERANGE );
1413                return READ_ERR;
1414            }
1415            updatePeerProgress( msgs );
1416            break;
1417
1418        case BT_BITFIELD: {
1419            const size_t bitCount = tr_torrentHasMetadata( msgs->torrent )
1420                                  ? msgs->torrent->info.pieceCount
1421                                  : msglen * 8;
1422            dbgmsg( msgs, "got a bitfield" );
1423            tr_bitsetReserve( &msgs->peer->have, bitCount );
1424            tr_peerIoReadBytes( msgs->peer->io, inbuf,
1425                                msgs->peer->have.bitfield.bits, msglen );
1426            updatePeerProgress( msgs );
1427            break;
1428        }
1429
1430        case BT_REQUEST:
1431        {
1432            struct peer_request r;
1433            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1434            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1435            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1436            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1437            peerMadeRequest( msgs, &r );
1438            break;
1439        }
1440
1441        case BT_CANCEL:
1442        {
1443            int i;
1444            struct peer_request r;
1445            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1446            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1447            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1448            tr_historyAdd( msgs->peer->cancelsSentToClient, tr_time( ), 1 );
1449            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1450
1451            for( i=0; i<msgs->peer->pendingReqsToClient; ++i ) {
1452                const struct peer_request * req = msgs->peerAskedFor + i;
1453                if( ( req->index == r.index ) && ( req->offset == r.offset ) && ( req->length == r.length ) )
1454                    break;
1455            }
1456
1457            if( i < msgs->peer->pendingReqsToClient )
1458                tr_removeElementFromArray( msgs->peerAskedFor, i, sizeof( struct peer_request ),
1459                                           msgs->peer->pendingReqsToClient-- );
1460            break;
1461        }
1462
1463        case BT_PIECE:
1464            assert( 0 ); /* handled elsewhere! */
1465            break;
1466
1467        case BT_PORT:
1468            dbgmsg( msgs, "Got a BT_PORT" );
1469            tr_peerIoReadUint16( msgs->peer->io, inbuf, &msgs->peer->dht_port );
1470            if( msgs->peer->dht_port > 0 )
1471                tr_dhtAddNode( getSession(msgs),
1472                               tr_peerAddress( msgs->peer ),
1473                               msgs->peer->dht_port, 0 );
1474            break;
1475
1476        case BT_FEXT_SUGGEST:
1477            dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1478            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1479            if( fext )
1480                fireClientGotSuggest( msgs, ui32 );
1481            else {
1482                fireError( msgs, EMSGSIZE );
1483                return READ_ERR;
1484            }
1485            break;
1486
1487        case BT_FEXT_ALLOWED_FAST:
1488            dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1489            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1490            if( fext )
1491                fireClientGotAllowedFast( msgs, ui32 );
1492            else {
1493                fireError( msgs, EMSGSIZE );
1494                return READ_ERR;
1495            }
1496            break;
1497
1498        case BT_FEXT_HAVE_ALL:
1499            dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1500            if( fext ) {
1501                tr_bitsetSetHaveAll( &msgs->peer->have );
1502                updatePeerProgress( msgs );
1503            } else {
1504                fireError( msgs, EMSGSIZE );
1505                return READ_ERR;
1506            }
1507            break;
1508
1509        case BT_FEXT_HAVE_NONE:
1510            dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1511            if( fext ) {
1512                tr_bitsetSetHaveNone( &msgs->peer->have );
1513                updatePeerProgress( msgs );
1514            } else {
1515                fireError( msgs, EMSGSIZE );
1516                return READ_ERR;
1517            }
1518            break;
1519
1520        case BT_FEXT_REJECT:
1521        {
1522            struct peer_request r;
1523            dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1524            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1525            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1526            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1527            if( fext )
1528                fireGotRej( msgs, &r );
1529            else {
1530                fireError( msgs, EMSGSIZE );
1531                return READ_ERR;
1532            }
1533            break;
1534        }
1535
1536        case BT_LTEP:
1537            dbgmsg( msgs, "Got a BT_LTEP" );
1538            parseLtep( msgs, msglen, inbuf );
1539            break;
1540
1541        default:
1542            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1543            tr_peerIoDrain( msgs->peer->io, inbuf, msglen );
1544            break;
1545    }
1546
1547    assert( msglen + 1 == msgs->incoming.length );
1548    assert( evbuffer_get_length( inbuf ) == startBufLen - msglen );
1549
1550    msgs->state = AWAITING_BT_LENGTH;
1551    return READ_NOW;
1552}
1553
1554static void
1555addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1556{
1557    if( !msgs->peer->blame )
1558         msgs->peer->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1559    tr_bitfieldAdd( msgs->peer->blame, index );
1560}
1561
1562/* returns 0 on success, or an errno on failure */
1563static int
1564clientGotBlock( tr_peermsgs *               msgs,
1565                const uint8_t *             data,
1566                const struct peer_request * req )
1567{
1568    int err;
1569    tr_torrent * tor = msgs->torrent;
1570    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1571
1572    assert( msgs );
1573    assert( req );
1574
1575    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) {
1576        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1577                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1578        return EMSGSIZE;
1579    }
1580
1581    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1582
1583    if( !tr_peerMgrDidPeerRequest( msgs->torrent, msgs->peer, block ) ) {
1584        dbgmsg( msgs, "we didn't ask for this message..." );
1585        return 0;
1586    }
1587    if( tr_cpPieceIsComplete( &msgs->torrent->completion, req->index ) ) {
1588        dbgmsg( msgs, "we did ask for this message, but the piece is already complete..." );
1589        return 0;
1590    }
1591
1592    /**
1593    ***  Save the block
1594    **/
1595
1596    if(( err = tr_cacheWriteBlock( getSession(msgs)->cache, tor, req->index, req->offset, req->length, data )))
1597        return err;
1598
1599    addPeerToBlamefield( msgs, req->index );
1600    fireGotBlock( msgs, req );
1601    return 0;
1602}
1603
1604static int peerPulse( void * vmsgs );
1605
1606static void
1607didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1608{
1609    tr_peermsgs * msgs = vmsgs;
1610    firePeerGotData( msgs, bytesWritten, wasPieceData );
1611
1612    if ( tr_isPeerIo( io ) && io->userData )
1613        peerPulse( msgs );
1614}
1615
1616static ReadState
1617canRead( tr_peerIo * io, void * vmsgs, size_t * piece )
1618{
1619    ReadState         ret;
1620    tr_peermsgs *     msgs = vmsgs;
1621    struct evbuffer * in = tr_peerIoGetReadBuffer( io );
1622    const size_t      inlen = evbuffer_get_length( in );
1623
1624    dbgmsg( msgs, "canRead: inlen is %zu, msgs->state is %d", inlen, msgs->state );
1625
1626    if( !inlen )
1627    {
1628        ret = READ_LATER;
1629    }
1630    else if( msgs->state == AWAITING_BT_PIECE )
1631    {
1632        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1633    }
1634    else switch( msgs->state )
1635    {
1636        case AWAITING_BT_LENGTH:
1637            ret = readBtLength ( msgs, in, inlen ); break;
1638
1639        case AWAITING_BT_ID:
1640            ret = readBtId     ( msgs, in, inlen ); break;
1641
1642        case AWAITING_BT_MESSAGE:
1643            ret = readBtMessage( msgs, in, inlen ); break;
1644
1645        default:
1646            ret = READ_ERR;
1647            assert( 0 );
1648    }
1649
1650    dbgmsg( msgs, "canRead: ret is %d", (int)ret );
1651
1652    /* log the raw data that was read */
1653    if( ( ret != READ_ERR ) && ( evbuffer_get_length( in ) != inlen ) )
1654        fireClientGotData( msgs, inlen - evbuffer_get_length( in ), FALSE );
1655
1656    return ret;
1657}
1658
1659int
1660tr_peerMsgsIsReadingBlock( const tr_peermsgs * msgs, tr_block_index_t block )
1661{
1662    if( msgs->state != AWAITING_BT_PIECE )
1663        return FALSE;
1664
1665    return block == _tr_block( msgs->torrent,
1666                               msgs->incoming.blockReq.index,
1667                               msgs->incoming.blockReq.offset );
1668}
1669
1670/**
1671***
1672**/
1673
1674static void
1675updateDesiredRequestCount( tr_peermsgs * msgs )
1676{
1677    const tr_torrent * const torrent = msgs->torrent;
1678
1679    if( tr_torrentIsSeed( msgs->torrent ) )
1680    {
1681        msgs->desiredRequestCount = 0;
1682    }
1683    else if( msgs->peer->clientIsChoked )
1684    {
1685        msgs->desiredRequestCount = 0;
1686    }
1687    else if( !msgs->peer->clientIsInterested )
1688    {
1689        msgs->desiredRequestCount = 0;
1690    }
1691    else
1692    {
1693        int estimatedBlocksInPeriod;
1694        int rate_Bps;
1695        int irate_Bps;
1696        const int floor = 4;
1697        const int seconds = REQUEST_BUF_SECS;
1698        const uint64_t now = tr_time_msec( );
1699
1700        /* Get the rate limit we should use.
1701         * FIXME: this needs to consider all the other peers as well... */
1702        rate_Bps = tr_peerGetPieceSpeed_Bps( msgs->peer, now, TR_PEER_TO_CLIENT );
1703        if( tr_torrentUsesSpeedLimit( torrent, TR_PEER_TO_CLIENT ) )
1704            rate_Bps = MIN( rate_Bps, tr_torrentGetSpeedLimit_Bps( torrent, TR_PEER_TO_CLIENT ) );
1705
1706        /* honor the session limits, if enabled */
1707        if( tr_torrentUsesSessionLimits( torrent ) )
1708            if( tr_sessionGetActiveSpeedLimit_Bps( torrent->session, TR_PEER_TO_CLIENT, &irate_Bps ) )
1709                rate_Bps = MIN( rate_Bps, irate_Bps );
1710
1711        /* use this desired rate to figure out how
1712         * many requests we should send to this peer */
1713        estimatedBlocksInPeriod = ( rate_Bps * seconds ) / torrent->blockSize;
1714        msgs->desiredRequestCount = MAX( floor, estimatedBlocksInPeriod );
1715
1716        /* honor the peer's maximum request count, if specified */
1717        if( msgs->reqq > 0 )
1718            if( msgs->desiredRequestCount > msgs->reqq )
1719                msgs->desiredRequestCount = msgs->reqq;
1720    }
1721}
1722
1723static void
1724updateMetadataRequests( tr_peermsgs * msgs, time_t now )
1725{
1726    int piece;
1727
1728    if( msgs->peerSupportsMetadataXfer
1729        && tr_torrentGetNextMetadataRequest( msgs->torrent, now, &piece ) )
1730    {
1731        tr_benc tmp;
1732        int payloadLen;
1733        char * payload;
1734        struct evbuffer * out = msgs->outMessages;
1735
1736        /* build the data message */
1737        tr_bencInitDict( &tmp, 3 );
1738        tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REQUEST );
1739        tr_bencDictAddInt( &tmp, "piece", piece );
1740        payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1741        tr_bencFree( &tmp );
1742
1743        dbgmsg( msgs, "requesting metadata piece #%d", piece );
1744
1745        /* write it out as a LTEP message to our outMessages buffer */
1746        evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen );
1747        evbuffer_add_uint8 ( out, BT_LTEP );
1748        evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1749        evbuffer_add       ( out, payload, payloadLen );
1750        pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1751        dbgOutMessageLen( msgs );
1752
1753        tr_free( payload );
1754    }
1755}
1756
1757static void
1758updateBlockRequests( tr_peermsgs * msgs )
1759{
1760    if( tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT )
1761        && ( msgs->desiredRequestCount > 0 )
1762        && ( msgs->peer->pendingReqsToPeer <= ( msgs->desiredRequestCount * 0.66 ) ) )
1763    {
1764        int i;
1765        int n;
1766        const int numwant = msgs->desiredRequestCount - msgs->peer->pendingReqsToPeer;
1767        tr_block_index_t * blocks = tr_new( tr_block_index_t, numwant );
1768
1769        tr_peerMgrGetNextRequests( msgs->torrent, msgs->peer, numwant, blocks, &n );
1770
1771        for( i=0; i<n; ++i )
1772        {
1773            struct peer_request req;
1774            blockToReq( msgs->torrent, blocks[i], &req );
1775            protocolSendRequest( msgs, &req );
1776        }
1777
1778        tr_free( blocks );
1779    }
1780}
1781
1782static size_t
1783fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1784{
1785    int piece;
1786    size_t bytesWritten = 0;
1787    struct peer_request req;
1788    const tr_bool haveMessages = evbuffer_get_length( msgs->outMessages ) != 0;
1789    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1790
1791    /**
1792    ***  Protocol messages
1793    **/
1794
1795    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1796    {
1797        dbgmsg( msgs, "started an outMessages batch (length is %zu)", evbuffer_get_length( msgs->outMessages ) );
1798        msgs->outMessagesBatchedAt = now;
1799    }
1800    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1801    {
1802        const size_t len = evbuffer_get_length( msgs->outMessages );
1803        /* flush the protocol messages */
1804        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len );
1805        tr_peerIoWriteBuf( msgs->peer->io, msgs->outMessages, FALSE );
1806        msgs->clientSentAnythingAt = now;
1807        msgs->outMessagesBatchedAt = 0;
1808        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1809        bytesWritten +=  len;
1810    }
1811
1812    /**
1813    ***  Metadata Pieces
1814    **/
1815
1816    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= METADATA_PIECE_SIZE )
1817        && popNextMetadataRequest( msgs, &piece ) )
1818    {
1819        char * data;
1820        int dataLen;
1821        tr_bool ok = FALSE;
1822
1823        data = tr_torrentGetMetadataPiece( msgs->torrent, piece, &dataLen );
1824        if( ( dataLen > 0 ) && ( data != NULL ) )
1825        {
1826            tr_benc tmp;
1827            int payloadLen;
1828            char * payload;
1829            struct evbuffer * out = msgs->outMessages;
1830
1831            /* build the data message */
1832            tr_bencInitDict( &tmp, 3 );
1833            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_DATA );
1834            tr_bencDictAddInt( &tmp, "piece", piece );
1835            tr_bencDictAddInt( &tmp, "total_size", msgs->torrent->infoDictLength );
1836            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1837            tr_bencFree( &tmp );
1838
1839            /* write it out as a LTEP message to our outMessages buffer */
1840            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen + dataLen );
1841            evbuffer_add_uint8 ( out, BT_LTEP );
1842            evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1843            evbuffer_add       ( out, payload, payloadLen );
1844            evbuffer_add       ( out, data, dataLen );
1845            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1846            dbgOutMessageLen( msgs );
1847
1848            tr_free( payload );
1849            tr_free( data );
1850
1851            ok = TRUE;
1852        }
1853
1854        if( !ok ) /* send a rejection message */
1855        {
1856            tr_benc tmp;
1857            int payloadLen;
1858            char * payload;
1859            struct evbuffer * out = msgs->outMessages;
1860
1861            /* build the rejection message */
1862            tr_bencInitDict( &tmp, 2 );
1863            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
1864            tr_bencDictAddInt( &tmp, "piece", piece );
1865            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1866            tr_bencFree( &tmp );
1867
1868            /* write it out as a LTEP message to our outMessages buffer */
1869            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen );
1870            evbuffer_add_uint8 ( out, BT_LTEP );
1871            evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1872            evbuffer_add       ( out, payload, payloadLen );
1873            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1874            dbgOutMessageLen( msgs );
1875
1876            tr_free( payload );
1877        }
1878    }
1879
1880    /**
1881    ***  Data Blocks
1882    **/
1883
1884    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= msgs->torrent->blockSize )
1885        && popNextRequest( msgs, &req ) )
1886    {
1887        --msgs->prefetchCount;
1888
1889        if( requestIsValid( msgs, &req )
1890            && tr_cpPieceIsComplete( &msgs->torrent->completion, req.index ) )
1891        {
1892            int err;
1893            const uint32_t msglen = 4 + 1 + 4 + 4 + req.length;
1894            struct evbuffer * out;
1895            struct evbuffer_iovec iovec[1];
1896
1897            out = evbuffer_new( );
1898            evbuffer_expand( out, msglen );
1899
1900            evbuffer_add_uint32( out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1901            evbuffer_add_uint8 ( out, BT_PIECE );
1902            evbuffer_add_uint32( out, req.index );
1903            evbuffer_add_uint32( out, req.offset );
1904
1905            evbuffer_reserve_space( out, req.length, iovec, 1 );
1906            err = tr_cacheReadBlock( getSession(msgs)->cache, msgs->torrent, req.index, req.offset, req.length, iovec[0].iov_base );
1907            iovec[0].iov_len = req.length;
1908            evbuffer_commit_space( out, iovec, 1 );
1909
1910            /* if we couldn't load a piece we thought we could load... */
1911            if( err )
1912                tr_torrentSetLocalError( msgs->torrent, _( "Couldn't read piece #%zu from disk! Please Verify Local Data." ), (size_t)req.index );
1913
1914            /* check the piece if it needs checking... */
1915            if( !err && tr_torrentPieceNeedsCheck( msgs->torrent, req.index ) )
1916                if(( err = !tr_torrentCheckPiece( msgs->torrent, req.index )))
1917                    tr_torrentSetLocalError( msgs->torrent, _( "Piece #%zu is corrupt! Please Verify Local Data." ), (size_t)req.index );
1918
1919            if( err )
1920            {
1921                if( fext )
1922                    protocolSendReject( msgs, &req );
1923            }
1924            else
1925            {
1926                const size_t n = evbuffer_get_length( out );
1927                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1928                assert( n == msglen );
1929                tr_peerIoWriteBuf( msgs->peer->io, out, TRUE );
1930                bytesWritten += n;
1931                msgs->clientSentAnythingAt = now;
1932                tr_historyAdd( msgs->peer->blocksSentToPeer, tr_time( ), 1 );
1933            }
1934
1935            evbuffer_free( out );
1936
1937            if( err )
1938            {
1939                bytesWritten = 0;
1940                msgs = NULL;
1941            }
1942        }
1943        else if( fext ) /* peer needs a reject message */
1944        {
1945            protocolSendReject( msgs, &req );
1946        }
1947
1948        if( msgs != NULL )
1949            prefetchPieces( msgs );
1950    }
1951
1952    /**
1953    ***  Keepalive
1954    **/
1955
1956    if( ( msgs != NULL )
1957        && ( msgs->clientSentAnythingAt != 0 )
1958        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1959    {
1960        dbgmsg( msgs, "sending a keepalive message" );
1961        evbuffer_add_uint32( msgs->outMessages, 0 );
1962        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1963    }
1964
1965    return bytesWritten;
1966}
1967
1968static int
1969peerPulse( void * vmsgs )
1970{
1971    tr_peermsgs * msgs = vmsgs;
1972    const time_t  now = tr_time( );
1973
1974    if ( tr_isPeerIo( msgs->peer->io ) ) {
1975        updateDesiredRequestCount( msgs );
1976        updateBlockRequests( msgs );
1977        updateMetadataRequests( msgs, now );
1978    }
1979
1980    for( ;; )
1981        if( fillOutputBuffer( msgs, now ) < 1 )
1982            break;
1983
1984    return TRUE; /* loop forever */
1985}
1986
1987void
1988tr_peerMsgsPulse( tr_peermsgs * msgs )
1989{
1990    if( msgs != NULL )
1991        peerPulse( msgs );
1992}
1993
1994static void
1995gotError( tr_peerIo  * io UNUSED,
1996          short        what,
1997          void       * vmsgs )
1998{
1999    if( what & BEV_EVENT_TIMEOUT )
2000        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
2001    if( what & ( BEV_EVENT_EOF | BEV_EVENT_ERROR ) )
2002        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
2003               what, errno, tr_strerror( errno ) );
2004    fireError( vmsgs, ENOTCONN );
2005}
2006
2007static void
2008sendBitfield( tr_peermsgs * msgs )
2009{
2010    struct evbuffer * out = msgs->outMessages;
2011    tr_bitfield *     field;
2012    tr_piece_index_t  lazyPieces[LAZY_PIECE_COUNT];
2013    size_t            i;
2014    size_t            lazyCount = 0;
2015
2016    field = tr_bitfieldDup( tr_cpPieceBitfield( &msgs->torrent->completion ) );
2017
2018    if( tr_sessionIsLazyBitfieldEnabled( getSession( msgs ) ) )
2019    {
2020        /** Lazy bitfields aren't a high priority or secure, so I'm opting for
2021            speed over a truly random sample -- let's limit the pool size to
2022            the first 1000 pieces so large torrents don't bog things down */
2023        size_t poolSize;
2024        const size_t maxPoolSize = MIN( msgs->torrent->info.pieceCount, 1000 );
2025        tr_piece_index_t * pool = tr_new( tr_piece_index_t, maxPoolSize );
2026
2027        /* build the pool */
2028        for( i=poolSize=0; i<maxPoolSize; ++i )
2029            if( tr_bitfieldHas( field, i ) )
2030                pool[poolSize++] = i;
2031
2032        /* pull random piece indices from the pool */
2033        while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) )
2034        {
2035            const int pos = tr_cryptoWeakRandInt( poolSize );
2036            const tr_piece_index_t piece = pool[pos];
2037            tr_bitfieldRem( field, piece );
2038            lazyPieces[lazyCount++] = piece;
2039            pool[pos] = pool[--poolSize];
2040        }
2041
2042        /* cleanup */
2043        tr_free( pool );
2044    }
2045
2046    evbuffer_add_uint32( out, sizeof( uint8_t ) + field->byteCount );
2047    evbuffer_add_uint8 ( out, BT_BITFIELD );
2048    evbuffer_add       ( out, field->bits, field->byteCount );
2049    dbgmsg( msgs, "sending bitfield... outMessage size is now %zu",
2050            evbuffer_get_length( out ) );
2051    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
2052
2053    for( i = 0; i < lazyCount; ++i )
2054        protocolSendHave( msgs, lazyPieces[i] );
2055
2056    tr_bitfieldFree( field );
2057}
2058
2059static void
2060tellPeerWhatWeHave( tr_peermsgs * msgs )
2061{
2062    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
2063
2064    if( fext && ( tr_cpGetStatus( &msgs->torrent->completion ) == TR_SEED ) )
2065    {
2066        protocolSendHaveAll( msgs );
2067    }
2068    else if( fext && ( tr_cpHaveValid( &msgs->torrent->completion ) == 0 ) )
2069    {
2070        protocolSendHaveNone( msgs );
2071    }
2072    else
2073    {
2074        sendBitfield( msgs );
2075    }
2076}
2077
2078/**
2079***
2080**/
2081
2082/* some peers give us error messages if we send
2083   more than this many peers in a single pex message
2084   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
2085#define MAX_PEX_ADDED 50
2086#define MAX_PEX_DROPPED 50
2087
2088typedef struct
2089{
2090    tr_pex *  added;
2091    tr_pex *  dropped;
2092    tr_pex *  elements;
2093    int       addedCount;
2094    int       droppedCount;
2095    int       elementCount;
2096}
2097PexDiffs;
2098
2099static void
2100pexAddedCb( void * vpex,
2101            void * userData )
2102{
2103    PexDiffs * diffs = userData;
2104    tr_pex *   pex = vpex;
2105
2106    if( diffs->addedCount < MAX_PEX_ADDED )
2107    {
2108        diffs->added[diffs->addedCount++] = *pex;
2109        diffs->elements[diffs->elementCount++] = *pex;
2110    }
2111}
2112
2113static inline void
2114pexDroppedCb( void * vpex,
2115              void * userData )
2116{
2117    PexDiffs * diffs = userData;
2118    tr_pex *   pex = vpex;
2119
2120    if( diffs->droppedCount < MAX_PEX_DROPPED )
2121    {
2122        diffs->dropped[diffs->droppedCount++] = *pex;
2123    }
2124}
2125
2126static inline void
2127pexElementCb( void * vpex,
2128              void * userData )
2129{
2130    PexDiffs * diffs = userData;
2131    tr_pex * pex = vpex;
2132
2133    diffs->elements[diffs->elementCount++] = *pex;
2134}
2135
2136static void
2137sendPex( tr_peermsgs * msgs )
2138{
2139    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
2140    {
2141        PexDiffs diffs;
2142        PexDiffs diffs6;
2143        tr_pex * newPex = NULL;
2144        tr_pex * newPex6 = NULL;
2145        const int newCount = tr_peerMgrGetPeers( msgs->torrent, &newPex, TR_AF_INET, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
2146        const int newCount6 = tr_peerMgrGetPeers( msgs->torrent, &newPex6, TR_AF_INET6, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
2147
2148        /* build the diffs */
2149        diffs.added = tr_new( tr_pex, newCount );
2150        diffs.addedCount = 0;
2151        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
2152        diffs.droppedCount = 0;
2153        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
2154        diffs.elementCount = 0;
2155        tr_set_compare( msgs->pex, msgs->pexCount,
2156                        newPex, newCount,
2157                        tr_pexCompare, sizeof( tr_pex ),
2158                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
2159        diffs6.added = tr_new( tr_pex, newCount6 );
2160        diffs6.addedCount = 0;
2161        diffs6.dropped = tr_new( tr_pex, msgs->pexCount6 );
2162        diffs6.droppedCount = 0;
2163        diffs6.elements = tr_new( tr_pex, newCount6 + msgs->pexCount6 );
2164        diffs6.elementCount = 0;
2165        tr_set_compare( msgs->pex6, msgs->pexCount6,
2166                        newPex6, newCount6,
2167                        tr_pexCompare, sizeof( tr_pex ),
2168                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6 );
2169        dbgmsg(
2170            msgs,
2171            "pex: old peer count %d+%d, new peer count %d+%d, "
2172            "added %d+%d, removed %d+%d",
2173            msgs->pexCount, msgs->pexCount6, newCount, newCount6,
2174            diffs.addedCount, diffs6.addedCount,
2175            diffs.droppedCount, diffs6.droppedCount );
2176
2177        if( !diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
2178            !diffs6.droppedCount )
2179        {
2180            tr_free( diffs.elements );
2181            tr_free( diffs6.elements );
2182        }
2183        else
2184        {
2185            int  i;
2186            tr_benc val;
2187            char * benc;
2188            int bencLen;
2189            uint8_t * tmp, *walk;
2190            struct evbuffer * out = msgs->outMessages;
2191
2192            /* update peer */
2193            tr_free( msgs->pex );
2194            msgs->pex = diffs.elements;
2195            msgs->pexCount = diffs.elementCount;
2196            tr_free( msgs->pex6 );
2197            msgs->pex6 = diffs6.elements;
2198            msgs->pexCount6 = diffs6.elementCount;
2199
2200            /* build the pex payload */
2201            tr_bencInitDict( &val, 3 ); /* ipv6 support: left as 3:
2202                                         * speed vs. likelihood? */
2203
2204            if( diffs.addedCount > 0)
2205            {
2206                /* "added" */
2207                tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
2208                for( i = 0; i < diffs.addedCount; ++i ) {
2209                    memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
2210                    memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
2211                }
2212                assert( ( walk - tmp ) == diffs.addedCount * 6 );
2213                tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
2214                tr_free( tmp );
2215
2216                /* "added.f" */
2217                tmp = walk = tr_new( uint8_t, diffs.addedCount );
2218                for( i = 0; i < diffs.addedCount; ++i )
2219                    *walk++ = diffs.added[i].flags;
2220                assert( ( walk - tmp ) == diffs.addedCount );
2221                tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2222                tr_free( tmp );
2223            }
2224
2225            if( diffs.droppedCount > 0 )
2226            {
2227                /* "dropped" */
2228                tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2229                for( i = 0; i < diffs.droppedCount; ++i ) {
2230                    memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
2231                    memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2232                }
2233                assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2234                tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2235                tr_free( tmp );
2236            }
2237
2238            if( diffs6.addedCount > 0 )
2239            {
2240                /* "added6" */
2241                tmp = walk = tr_new( uint8_t, diffs6.addedCount * 18 );
2242                for( i = 0; i < diffs6.addedCount; ++i ) {
2243                    memcpy( walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16 );
2244                    walk += 16;
2245                    memcpy( walk, &diffs6.added[i].port, 2 );
2246                    walk += 2;
2247                }
2248                assert( ( walk - tmp ) == diffs6.addedCount * 18 );
2249                tr_bencDictAddRaw( &val, "added6", tmp, walk - tmp );
2250                tr_free( tmp );
2251
2252                /* "added6.f" */
2253                tmp = walk = tr_new( uint8_t, diffs6.addedCount );
2254                for( i = 0; i < diffs6.addedCount; ++i )
2255                    *walk++ = diffs6.added[i].flags;
2256                assert( ( walk - tmp ) == diffs6.addedCount );
2257                tr_bencDictAddRaw( &val, "added6.f", tmp, walk - tmp );
2258                tr_free( tmp );
2259            }
2260
2261            if( diffs6.droppedCount > 0 )
2262            {
2263                /* "dropped6" */
2264                tmp = walk = tr_new( uint8_t, diffs6.droppedCount * 18 );
2265                for( i = 0; i < diffs6.droppedCount; ++i ) {
2266                    memcpy( walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16 );
2267                    walk += 16;
2268                    memcpy( walk, &diffs6.dropped[i].port, 2 );
2269                    walk += 2;
2270                }
2271                assert( ( walk - tmp ) == diffs6.droppedCount * 18);
2272                tr_bencDictAddRaw( &val, "dropped6", tmp, walk - tmp );
2273                tr_free( tmp );
2274            }
2275
2276            /* write the pex message */
2277            benc = tr_bencToStr( &val, TR_FMT_BENC, &bencLen );
2278            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + bencLen );
2279            evbuffer_add_uint8 ( out, BT_LTEP );
2280            evbuffer_add_uint8 ( out, msgs->ut_pex_id );
2281            evbuffer_add       ( out, benc, bencLen );
2282            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2283            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", evbuffer_get_length( out ) );
2284            dbgOutMessageLen( msgs );
2285
2286            tr_free( benc );
2287            tr_bencFree( &val );
2288        }
2289
2290        /* cleanup */
2291        tr_free( diffs.added );
2292        tr_free( diffs.dropped );
2293        tr_free( newPex );
2294        tr_free( diffs6.added );
2295        tr_free( diffs6.dropped );
2296        tr_free( newPex6 );
2297
2298        /*msgs->clientSentPexAt = tr_time( );*/
2299    }
2300}
2301
2302static void
2303pexPulse( int foo UNUSED, short bar UNUSED, void * vmsgs )
2304{
2305    struct tr_peermsgs * msgs = vmsgs;
2306
2307    sendPex( msgs );
2308
2309    tr_timerAdd( msgs->pexTimer, PEX_INTERVAL_SECS, 0 );
2310}
2311
2312/**
2313***
2314**/
2315
2316tr_peermsgs*
2317tr_peerMsgsNew( struct tr_torrent    * torrent,
2318                struct tr_peer       * peer,
2319                tr_peer_callback     * callback,
2320                void                 * callbackData )
2321{
2322    tr_peermsgs * m;
2323
2324    assert( peer );
2325    assert( peer->io );
2326
2327    m = tr_new0( tr_peermsgs, 1 );
2328    m->callback = callback;
2329    m->callbackData = callbackData;
2330    m->peer = peer;
2331    m->torrent = torrent;
2332    m->peer->clientIsChoked = 1;
2333    m->peer->peerIsChoked = 1;
2334    m->peer->clientIsInterested = 0;
2335    m->peer->peerIsInterested = 0;
2336    m->state = AWAITING_BT_LENGTH;
2337    m->outMessages = evbuffer_new( );
2338    m->outMessagesBatchedAt = 0;
2339    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2340    m->incoming.block = evbuffer_new( );
2341    m->pexTimer = evtimer_new( torrent->session->event_base, pexPulse, m );
2342    peer->msgs = m;
2343    tr_timerAdd( m->pexTimer, PEX_INTERVAL_SECS, 0 );
2344
2345    if( tr_peerIoSupportsLTEP( peer->io ) )
2346        sendLtepHandshake( m );
2347
2348    tellPeerWhatWeHave( m );
2349
2350    if( tr_dhtEnabled( torrent->session ) && tr_peerIoSupportsDHT( peer->io ))
2351    {
2352        /* Only send PORT over IPv6 when the IPv6 DHT is running (BEP-32). */
2353        const struct tr_address *addr = tr_peerIoGetAddress( peer->io, NULL );
2354        if( addr->type == TR_AF_INET || tr_globalIPv6() ) {
2355            protocolSendPort( m, tr_dhtPort( torrent->session ) );
2356        }
2357    }
2358
2359    tr_peerIoSetIOFuncs( m->peer->io, canRead, didWrite, gotError, m );
2360    updateDesiredRequestCount( m );
2361
2362    return m;
2363}
2364
2365void
2366tr_peerMsgsFree( tr_peermsgs* msgs )
2367{
2368    if( msgs )
2369    {
2370        event_free( msgs->pexTimer );
2371
2372        evbuffer_free( msgs->incoming.block );
2373        evbuffer_free( msgs->outMessages );
2374        tr_free( msgs->pex6 );
2375        tr_free( msgs->pex );
2376
2377        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2378        tr_free( msgs );
2379    }
2380}
Note: See TracBrowser for help on using the repository browser.