source: trunk/libtransmission/peer-msgs.c @ 9779

Last change on this file since 9779 was 9779, checked in by charles, 13 years ago

(trunk libT) #2671 "downloading from too many peers" -- address a smaller aspect of this ticket, by reducing the minimum number of blocks we request from a peer, as noted by jch's comment @ http://trac.transmissionbt.com/ticket/2671#comment:2

  • Property svn:keywords set to Date Rev Author Id
File size: 71.1 KB
Line 
1/*
2 * This file Copyright (C) 2007-2009 Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 9779 2009-12-15 18:11:20Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19
20#include <event.h>
21
22#include "transmission.h"
23#include "bencode.h"
24#include "completion.h"
25#include "crypto.h"
26#include "inout.h"
27#ifdef WIN32
28#include "net.h" /* for ECONN */
29#endif
30#include "peer-io.h"
31#include "peer-mgr.h"
32#include "peer-msgs.h"
33#include "platform.h" /* MAX_STACK_ARRAY_SIZE */
34#include "session.h"
35#include "stats.h"
36#include "torrent.h"
37#include "torrent-magnet.h"
38#include "tr-dht.h"
39#include "utils.h"
40#include "version.h"
41
42/**
43***
44**/
45
46enum
47{
48    BT_CHOKE                = 0,
49    BT_UNCHOKE              = 1,
50    BT_INTERESTED           = 2,
51    BT_NOT_INTERESTED       = 3,
52    BT_HAVE                 = 4,
53    BT_BITFIELD             = 5,
54    BT_REQUEST              = 6,
55    BT_PIECE                = 7,
56    BT_CANCEL               = 8,
57    BT_PORT                 = 9,
58
59    BT_FEXT_SUGGEST         = 13,
60    BT_FEXT_HAVE_ALL        = 14,
61    BT_FEXT_HAVE_NONE       = 15,
62    BT_FEXT_REJECT          = 16,
63    BT_FEXT_ALLOWED_FAST    = 17,
64
65    BT_LTEP                 = 20,
66
67    LTEP_HANDSHAKE          = 0,
68
69    UT_PEX_ID               = 1,
70    UT_METADATA_ID          = 3,
71
72    MAX_PEX_PEER_COUNT      = 50,
73
74    MIN_CHOKE_PERIOD_SEC    = 10,
75
76    /* idle seconds before we send a keepalive */
77    KEEPALIVE_INTERVAL_SECS = 100,
78
79    PEX_INTERVAL_SECS       = 90, /* sec between sendPex() calls */
80
81    REQQ                    = 512,
82
83    METADATA_REQQ           = 64,
84
85    MAX_BLOCK_SIZE          = ( 1024 * 16 ),
86
87    /* used in lowering the outMessages queue period */
88    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
89    HIGH_PRIORITY_INTERVAL_SECS = 2,
90    LOW_PRIORITY_INTERVAL_SECS = 10,
91
92    /* number of pieces to remove from the bitfield when
93     * lazy bitfields are turned on */
94    LAZY_PIECE_COUNT = 26,
95
96    /* number of pieces we'll allow in our fast set */
97    MAX_FAST_SET_SIZE = 3,
98
99    /* defined in BEP #9 */
100    METADATA_MSG_TYPE_REQUEST = 0,
101    METADATA_MSG_TYPE_DATA = 1,
102    METADATA_MSG_TYPE_REJECT = 2
103};
104
105enum
106{
107    AWAITING_BT_LENGTH,
108    AWAITING_BT_ID,
109    AWAITING_BT_MESSAGE,
110    AWAITING_BT_PIECE
111};
112
113/**
114***
115**/
116
117struct peer_request
118{
119    uint32_t    index;
120    uint32_t    offset;
121    uint32_t    length;
122};
123
124static uint32_t
125getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
126{
127    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
128    const uint64_t blockPos = tor->blockSize * b;
129    assert( blockPos >= piecePos );
130    return (uint32_t)( blockPos - piecePos );
131}
132
133static void
134blockToReq( const tr_torrent     * tor,
135            tr_block_index_t       block,
136            struct peer_request  * setme )
137{
138    assert( setme != NULL );
139
140    setme->index = tr_torBlockPiece( tor, block );
141    setme->offset = getBlockOffsetInPiece( tor, block );
142    setme->length = tr_torBlockCountBytes( tor, block );
143}
144
145/**
146***
147**/
148
149/* this is raw, unchanged data from the peer regarding
150 * the current message that it's sending us. */
151struct tr_incoming
152{
153    uint8_t                id;
154    uint32_t               length; /* includes the +1 for id length */
155    struct peer_request    blockReq; /* metadata for incoming blocks */
156    struct evbuffer *      block; /* piece data for incoming blocks */
157};
158
159/**
160 * Low-level communication state information about a connected peer.
161 *
162 * This structure remembers the low-level protocol states that we're
163 * in with this peer, such as active requests, pex messages, and so on.
164 * Its fields are all private to peer-msgs.c.
165 *
166 * Data not directly involved with sending & receiving messages is
167 * stored in tr_peer, where it can be accessed by both peermsgs and
168 * the peer manager.
169 *
170 * @see struct peer_atom
171 * @see tr_peer
172 */
173struct tr_peermsgs
174{
175    tr_bool         peerSupportsPex;
176    tr_bool         peerSupportsMetadataXfer;
177    tr_bool         clientSentLtepHandshake;
178    tr_bool         peerSentLtepHandshake;
179
180    /*tr_bool         haveFastSet;*/
181
182    int             activeRequestCount;
183    int             desiredRequestCount;
184
185    int             prefetchCount;
186
187    /* how long the outMessages batch should be allowed to grow before
188     * it's flushed -- some messages (like requests >:) should be sent
189     * very quickly; others aren't as urgent. */
190    int8_t          outMessagesBatchPeriod;
191
192    uint8_t         state;
193    uint8_t         ut_pex_id;
194    uint8_t         ut_metadata_id;
195    uint16_t        pexCount;
196    uint16_t        pexCount6;
197
198#if 0
199    size_t                 fastsetSize;
200    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
201#endif
202
203    tr_peer *              peer;
204
205    tr_torrent *           torrent;
206
207    tr_publisher           publisher;
208
209    struct evbuffer *      outMessages; /* all the non-piece messages */
210
211    struct peer_request    peerAskedFor[REQQ];
212    int                    peerAskedForCount;
213
214    int                    peerAskedForMetadata[METADATA_REQQ];
215    int                    peerAskedForMetadataCount;
216
217    tr_pex               * pex;
218    tr_pex               * pex6;
219
220    /*time_t                 clientSentPexAt;*/
221    time_t                 clientSentAnythingAt;
222
223    /* when we started batching the outMessages */
224    time_t                outMessagesBatchedAt;
225
226    struct tr_incoming    incoming;
227
228    /* if the peer supports the Extension Protocol in BEP 10 and
229       supplied a reqq argument, it's stored here.  otherwise the
230       value is zero and should be ignored. */
231    int64_t               reqq;
232
233    struct event          pexTimer;
234};
235
236/**
237***
238**/
239
240#if 0
241static tr_bitfield*
242getHave( const struct tr_peermsgs * msgs )
243{
244    if( msgs->peer->have == NULL )
245        msgs->peer->have = tr_bitfieldNew( msgs->torrent->info.pieceCount );
246    return msgs->peer->have;
247}
248#endif
249
250static TR_INLINE tr_session*
251getSession( struct tr_peermsgs * msgs )
252{
253    return msgs->torrent->session;
254}
255
256/**
257***
258**/
259
260static void
261myDebug( const char * file, int line,
262         const struct tr_peermsgs * msgs,
263         const char * fmt, ... )
264{
265    FILE * fp = tr_getLog( );
266
267    if( fp )
268    {
269        va_list           args;
270        char              timestr[64];
271        struct evbuffer * buf = evbuffer_new( );
272        char *            base = tr_basename( file );
273
274        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
275                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
276                             tr_torrentName( msgs->torrent ),
277                             tr_peerIoGetAddrStr( msgs->peer->io ),
278                             msgs->peer->client );
279        va_start( args, fmt );
280        evbuffer_add_vprintf( buf, fmt, args );
281        va_end( args );
282        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
283        /* FIXME(libevent2) tr_getLog() should return an fd, then use evbuffer_write() here */
284        fwrite( EVBUFFER_DATA( buf ), 1, EVBUFFER_LENGTH( buf ), fp );
285
286        tr_free( base );
287        evbuffer_free( buf );
288    }
289}
290
291#define dbgmsg( msgs, ... ) \
292    do { \
293        if( tr_deepLoggingIsActive( ) ) \
294            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
295    } while( 0 )
296
297/**
298***
299**/
300
301static void
302pokeBatchPeriod( tr_peermsgs * msgs,
303                 int           interval )
304{
305    if( msgs->outMessagesBatchPeriod > interval )
306    {
307        msgs->outMessagesBatchPeriod = interval;
308        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
309    }
310}
311
312static TR_INLINE void
313dbgOutMessageLen( tr_peermsgs * msgs )
314{
315    dbgmsg( msgs, "outMessage size is now %zu", EVBUFFER_LENGTH( msgs->outMessages ) );
316}
317
318static void
319protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
320{
321    tr_peerIo       * io  = msgs->peer->io;
322    struct evbuffer * out = msgs->outMessages;
323
324    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
325
326    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
327    tr_peerIoWriteUint8 ( io, out, BT_FEXT_REJECT );
328    tr_peerIoWriteUint32( io, out, req->index );
329    tr_peerIoWriteUint32( io, out, req->offset );
330    tr_peerIoWriteUint32( io, out, req->length );
331
332    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
333    dbgOutMessageLen( msgs );
334}
335
336static void
337protocolSendRequest( tr_peermsgs               * msgs,
338                     const struct peer_request * req )
339{
340    tr_peerIo       * io  = msgs->peer->io;
341    struct evbuffer * out = msgs->outMessages;
342
343    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
344    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
345    tr_peerIoWriteUint32( io, out, req->index );
346    tr_peerIoWriteUint32( io, out, req->offset );
347    tr_peerIoWriteUint32( io, out, req->length );
348
349    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
350    dbgOutMessageLen( msgs );
351    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
352}
353
354static void
355protocolSendCancel( tr_peermsgs               * msgs,
356                    const struct peer_request * req )
357{
358    tr_peerIo       * io  = msgs->peer->io;
359    struct evbuffer * out = msgs->outMessages;
360
361    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
362    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
363    tr_peerIoWriteUint32( io, out, req->index );
364    tr_peerIoWriteUint32( io, out, req->offset );
365    tr_peerIoWriteUint32( io, out, req->length );
366
367    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
368    dbgOutMessageLen( msgs );
369    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
370}
371
372static void
373protocolSendPort(tr_peermsgs *msgs, uint16_t port)
374{
375    tr_peerIo       * io  = msgs->peer->io;
376    struct evbuffer * out = msgs->outMessages;
377
378    dbgmsg( msgs, "sending Port %u", port);
379    tr_peerIoWriteUint32( io, out, 3 );
380    tr_peerIoWriteUint8 ( io, out, BT_PORT );
381    tr_peerIoWriteUint16( io, out, port);
382}
383
384static void
385protocolSendHave( tr_peermsgs * msgs,
386                  uint32_t      index )
387{
388    tr_peerIo       * io  = msgs->peer->io;
389    struct evbuffer * out = msgs->outMessages;
390
391    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
392    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
393    tr_peerIoWriteUint32( io, out, index );
394
395    dbgmsg( msgs, "sending Have %u", index );
396    dbgOutMessageLen( msgs );
397    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
398}
399
400#if 0
401static void
402protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
403{
404    tr_peerIo       * io  = msgs->peer->io;
405    struct evbuffer * out = msgs->outMessages;
406
407    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
408
409    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
410    tr_peerIoWriteUint8 ( io, out, BT_FEXT_ALLOWED_FAST );
411    tr_peerIoWriteUint32( io, out, pieceIndex );
412
413    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
414    dbgOutMessageLen( msgs );
415}
416#endif
417
418static void
419protocolSendChoke( tr_peermsgs * msgs,
420                   int           choke )
421{
422    tr_peerIo       * io  = msgs->peer->io;
423    struct evbuffer * out = msgs->outMessages;
424
425    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
426    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
427
428    dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
429    dbgOutMessageLen( msgs );
430    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
431}
432
433static void
434protocolSendHaveAll( tr_peermsgs * msgs )
435{
436    tr_peerIo       * io  = msgs->peer->io;
437    struct evbuffer * out = msgs->outMessages;
438
439    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
440
441    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
442    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_ALL );
443
444    dbgmsg( msgs, "sending HAVE_ALL..." );
445    dbgOutMessageLen( msgs );
446    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
447}
448
449static void
450protocolSendHaveNone( tr_peermsgs * msgs )
451{
452    tr_peerIo       * io  = msgs->peer->io;
453    struct evbuffer * out = msgs->outMessages;
454
455    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
456
457    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
458    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_NONE );
459
460    dbgmsg( msgs, "sending HAVE_NONE..." );
461    dbgOutMessageLen( msgs );
462    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
463}
464
465/**
466***  EVENTS
467**/
468
469static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0, 0, 0 };
470
471static void
472publish( tr_peermsgs * msgs, tr_peer_event * e )
473{
474    assert( msgs->peer );
475    assert( msgs->peer->msgs == msgs );
476
477    tr_publisherPublish( &msgs->publisher, msgs->peer, e );
478}
479
480static void
481fireError( tr_peermsgs * msgs, int err )
482{
483    tr_peer_event e = blankEvent;
484    e.eventType = TR_PEER_ERROR;
485    e.err = err;
486    publish( msgs, &e );
487}
488
489static void
490fireUploadOnly( tr_peermsgs * msgs, tr_bool uploadOnly )
491{
492    tr_peer_event e = blankEvent;
493    e.eventType = TR_PEER_UPLOAD_ONLY;
494    e.uploadOnly = uploadOnly;
495    publish( msgs, &e );
496}
497
498static void
499firePeerProgress( tr_peermsgs * msgs )
500{
501    tr_peer_event e = blankEvent;
502    e.eventType = TR_PEER_PEER_PROGRESS;
503    e.progress = msgs->peer->progress;
504    publish( msgs, &e );
505}
506
507static void
508fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
509{
510    tr_peer_event e = blankEvent;
511    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
512    e.pieceIndex = req->index;
513    e.offset = req->offset;
514    e.length = req->length;
515    publish( msgs, &e );
516}
517
518static void
519fireGotRej( tr_peermsgs * msgs, const struct peer_request * req )
520{
521    tr_peer_event e = blankEvent;
522    e.eventType = TR_PEER_CLIENT_GOT_REJ;
523    e.pieceIndex = req->index;
524    e.offset = req->offset;
525    e.length = req->length;
526    publish( msgs, &e );
527}
528
529static void
530fireGotChoke( tr_peermsgs * msgs )
531{
532    tr_peer_event e = blankEvent;
533    e.eventType = TR_PEER_CLIENT_GOT_CHOKE;
534    publish( msgs, &e );
535}
536
537static void
538fireClientGotData( tr_peermsgs * msgs,
539                   uint32_t      length,
540                   int           wasPieceData )
541{
542    tr_peer_event e = blankEvent;
543
544    e.length = length;
545    e.eventType = TR_PEER_CLIENT_GOT_DATA;
546    e.wasPieceData = wasPieceData;
547    publish( msgs, &e );
548}
549
550static void
551fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
552{
553    tr_peer_event e = blankEvent;
554    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
555    e.pieceIndex = pieceIndex;
556    publish( msgs, &e );
557}
558
559static void
560fireClientGotPort( tr_peermsgs * msgs, tr_port port )
561{
562    tr_peer_event e = blankEvent;
563    e.eventType = TR_PEER_CLIENT_GOT_PORT;
564    e.port = port;
565    publish( msgs, &e );
566}
567
568static void
569fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
570{
571    tr_peer_event e = blankEvent;
572    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
573    e.pieceIndex = pieceIndex;
574    publish( msgs, &e );
575}
576
577static void
578firePeerGotData( tr_peermsgs  * msgs,
579                 uint32_t       length,
580                 int            wasPieceData )
581{
582    tr_peer_event e = blankEvent;
583
584    e.length = length;
585    e.eventType = TR_PEER_PEER_GOT_DATA;
586    e.wasPieceData = wasPieceData;
587
588    publish( msgs, &e );
589}
590
591/**
592***  ALLOWED FAST SET
593***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
594**/
595
596size_t
597tr_generateAllowedSet( tr_piece_index_t * setmePieces,
598                       size_t             desiredSetSize,
599                       size_t             pieceCount,
600                       const uint8_t    * infohash,
601                       const tr_address * addr )
602{
603    size_t setSize = 0;
604
605    assert( setmePieces );
606    assert( desiredSetSize <= pieceCount );
607    assert( desiredSetSize );
608    assert( pieceCount );
609    assert( infohash );
610    assert( addr );
611
612    if( addr->type == TR_AF_INET )
613    {
614        uint8_t w[SHA_DIGEST_LENGTH + 4], *walk=w;
615        uint8_t x[SHA_DIGEST_LENGTH];
616
617        uint32_t ui32 = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 );   /* (1) */
618        memcpy( w, &ui32, sizeof( uint32_t ) );
619        walk += sizeof( uint32_t );
620        memcpy( walk, infohash, SHA_DIGEST_LENGTH );                 /* (2) */
621        walk += SHA_DIGEST_LENGTH;
622        tr_sha1( x, w, walk-w, NULL );                               /* (3) */
623        assert( sizeof( w ) == walk-w );
624
625        while( setSize<desiredSetSize )
626        {
627            int i;
628            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
629            {
630                size_t k;
631                uint32_t j = i * 4;                                  /* (5) */
632                uint32_t y = ntohl( *( uint32_t* )( x + j ) );       /* (6) */
633                uint32_t index = y % pieceCount;                     /* (7) */
634
635                for( k=0; k<setSize; ++k )                           /* (8) */
636                    if( setmePieces[k] == index )
637                        break;
638
639                if( k == setSize )
640                    setmePieces[setSize++] = index;                  /* (9) */
641            }
642
643            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
644        }
645    }
646
647    return setSize;
648}
649
650static void
651updateFastSet( tr_peermsgs * msgs UNUSED )
652{
653#if 0
654    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
655    const int peerIsNeedy = msgs->peer->progress < 0.10;
656
657    if( fext && peerIsNeedy && !msgs->haveFastSet )
658    {
659        size_t i;
660        const struct tr_address * addr = tr_peerIoGetAddress( msgs->peer->io, NULL );
661        const tr_info * inf = &msgs->torrent->info;
662        const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
663
664        /* build the fast set */
665        msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
666        msgs->haveFastSet = 1;
667
668        /* send it to the peer */
669        for( i=0; i<msgs->fastsetSize; ++i )
670            protocolSendAllowedFast( msgs, msgs->fastset[i] );
671    }
672#endif
673}
674
675/**
676***  INTEREST
677**/
678
679static tr_bool
680isPieceInteresting( const tr_peermsgs * msgs,
681                    tr_piece_index_t    piece )
682{
683    const tr_torrent * torrent = msgs->torrent;
684
685    return ( !torrent->info.pieces[piece].dnd )                  /* we want it */
686          && ( !tr_cpPieceIsComplete( &torrent->completion, piece ) ) /* !have */
687          && ( tr_bitsetHas( &msgs->peer->have, piece ) );      /* peer has it */
688}
689
690/* "interested" means we'll ask for piece data if they unchoke us */
691static tr_bool
692isPeerInteresting( const tr_peermsgs * msgs )
693{
694    tr_piece_index_t    i;
695    const tr_torrent *  torrent;
696    const tr_bitfield * bitfield;
697    const int           clientIsSeed = tr_torrentIsSeed( msgs->torrent );
698
699    if( clientIsSeed )
700        return FALSE;
701
702    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
703        return FALSE;
704
705    torrent = msgs->torrent;
706    bitfield = tr_cpPieceBitfield( &torrent->completion );
707
708    for( i = 0; i < torrent->info.pieceCount; ++i )
709        if( isPieceInteresting( msgs, i ) )
710            return TRUE;
711
712    return FALSE;
713}
714
715static void
716sendInterest( tr_peermsgs * msgs,
717              int           weAreInterested )
718{
719    struct evbuffer * out = msgs->outMessages;
720
721    assert( msgs );
722    assert( weAreInterested == 0 || weAreInterested == 1 );
723
724    msgs->peer->clientIsInterested = weAreInterested;
725    dbgmsg( msgs, "Sending %s", weAreInterested ? "Interested" : "Not Interested" );
726    tr_peerIoWriteUint32( msgs->peer->io, out, sizeof( uint8_t ) );
727    tr_peerIoWriteUint8 ( msgs->peer->io, out, weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
728
729    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
730    dbgOutMessageLen( msgs );
731}
732
733static void
734updateInterest( tr_peermsgs * msgs )
735{
736    const int i = isPeerInteresting( msgs );
737
738    if( i != msgs->peer->clientIsInterested )
739        sendInterest( msgs, i );
740}
741
742static tr_bool
743popNextMetadataRequest( tr_peermsgs * msgs, int * piece )
744{
745    if( msgs->peerAskedForMetadataCount == 0 )
746        return FALSE;
747
748    *piece = msgs->peerAskedForMetadata[0];
749
750    tr_removeElementFromArray( msgs->peerAskedForMetadata, 0, sizeof( int ),
751                               msgs->peerAskedForMetadataCount-- );
752
753    return TRUE;
754}
755
756static tr_bool
757popNextRequest( tr_peermsgs * msgs, struct peer_request * setme )
758{
759    if( msgs->peerAskedForCount == 0 )
760        return FALSE;
761
762    *setme = msgs->peerAskedFor[0];
763
764    tr_removeElementFromArray( msgs->peerAskedFor, 0, sizeof( struct peer_request ),
765                               msgs->peerAskedForCount-- );
766
767    return TRUE;
768}
769
770static void
771cancelAllRequestsToClient( tr_peermsgs * msgs )
772{
773    struct peer_request req;
774    const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->peer->io );
775
776    while( popNextRequest( msgs, &req ))
777        if( mustSendCancel )
778            protocolSendReject( msgs, &req );
779}
780
781void
782tr_peerMsgsSetChoke( tr_peermsgs * msgs,
783                     int           choke )
784{
785    const time_t now = tr_time( );
786    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
787
788    assert( msgs );
789    assert( msgs->peer );
790    assert( choke == 0 || choke == 1 );
791
792    if( msgs->peer->chokeChangedAt > fibrillationTime )
793    {
794        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
795    }
796    else if( msgs->peer->peerIsChoked != choke )
797    {
798        msgs->peer->peerIsChoked = choke;
799        if( choke )
800            cancelAllRequestsToClient( msgs );
801        protocolSendChoke( msgs, choke );
802        msgs->peer->chokeChangedAt = now;
803    }
804}
805
806/**
807***
808**/
809
810void
811tr_peerMsgsHave( tr_peermsgs * msgs,
812                 uint32_t      index )
813{
814    protocolSendHave( msgs, index );
815
816    /* since we have more pieces now, we might not be interested in this peer */
817    updateInterest( msgs );
818}
819
820/**
821***
822**/
823
824static tr_bool
825reqIsValid( const tr_peermsgs * peer,
826            uint32_t            index,
827            uint32_t            offset,
828            uint32_t            length )
829{
830    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
831}
832
833static tr_bool
834requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
835{
836    return reqIsValid( msgs, req->index, req->offset, req->length );
837}
838
839
840void
841tr_peerMsgsCancel( tr_peermsgs * msgs, tr_block_index_t block )
842{
843    struct peer_request req;
844    blockToReq( msgs->torrent, block, &req );
845    protocolSendCancel( msgs, &req );
846}
847
848/**
849***
850**/
851
852static void
853sendLtepHandshake( tr_peermsgs * msgs )
854{
855    tr_benc val, *m;
856    char * buf;
857    int len;
858    tr_bool allow_pex;
859    tr_bool allow_metadata_xfer;
860    struct evbuffer * out = msgs->outMessages;
861    const unsigned char * ipv6 = tr_globalIPv6();
862
863    if( msgs->clientSentLtepHandshake )
864        return;
865
866    dbgmsg( msgs, "sending an ltep handshake" );
867    msgs->clientSentLtepHandshake = 1;
868
869    /* decide if we want to advertise metadata xfer support (BEP 9) */
870    if( tr_torrentIsPrivate( msgs->torrent ) )
871        allow_metadata_xfer = 0;
872    else
873        allow_metadata_xfer = 1;
874
875    /* decide if we want to advertise pex support */
876    if( !tr_torrentAllowsPex( msgs->torrent ) )
877        allow_pex = 0;
878    else if( msgs->peerSentLtepHandshake )
879        allow_pex = msgs->peerSupportsPex ? 1 : 0;
880    else
881        allow_pex = 1;
882
883    tr_bencInitDict( &val, 8 );
884    tr_bencDictAddInt( &val, "e", getSession(msgs)->encryptionMode != TR_CLEAR_PREFERRED );
885    if( ipv6 != NULL )
886        tr_bencDictAddRaw( &val, "ipv6", ipv6, 16 );
887    if( allow_metadata_xfer && tr_torrentHasMetadata( msgs->torrent )
888                            && ( msgs->torrent->infoDictLength > 0 ) )
889        tr_bencDictAddInt( &val, "metadata_size", msgs->torrent->infoDictLength );
890    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( getSession(msgs) ) );
891    tr_bencDictAddInt( &val, "reqq", REQQ );
892    tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
893    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
894    m  = tr_bencDictAddDict( &val, "m", 2 );
895    if( allow_metadata_xfer )
896        tr_bencDictAddInt( m, "ut_metadata", UT_METADATA_ID );
897    if( allow_pex )
898        tr_bencDictAddInt( m, "ut_pex", UT_PEX_ID );
899
900    buf = tr_bencToStr( &val, TR_FMT_BENC, &len );
901
902    tr_peerIoWriteUint32( msgs->peer->io, out, 2 * sizeof( uint8_t ) + len );
903    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_LTEP );
904    tr_peerIoWriteUint8 ( msgs->peer->io, out, LTEP_HANDSHAKE );
905    tr_peerIoWriteBytes ( msgs->peer->io, out, buf, len );
906    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
907    dbgOutMessageLen( msgs );
908
909    /* cleanup */
910    tr_bencFree( &val );
911    tr_free( buf );
912}
913
914static void
915parseLtepHandshake( tr_peermsgs *     msgs,
916                    int               len,
917                    struct evbuffer * inbuf )
918{
919    int64_t   i;
920    tr_benc   val, * sub;
921    uint8_t * tmp = tr_new( uint8_t, len );
922    const uint8_t *addr;
923    size_t addr_len;
924    tr_pex pex;
925
926    memset( &pex, 0, sizeof( tr_pex ) );
927
928    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, len );
929    msgs->peerSentLtepHandshake = 1;
930
931    if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) )
932    {
933        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
934        tr_free( tmp );
935        return;
936    }
937
938    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
939
940    /* does the peer prefer encrypted connections? */
941    if( tr_bencDictFindInt( &val, "e", &i ) ) {
942        msgs->peer->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
943                                              : ENCRYPTION_PREFERENCE_NO;
944        if( i )
945            pex.flags |= ADDED_F_ENCRYPTION_FLAG;
946    }
947
948    /* check supported messages for utorrent pex */
949    msgs->peerSupportsPex = 0;
950    msgs->peerSupportsMetadataXfer = 0;
951
952    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
953        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
954            msgs->peerSupportsPex = i != 0;
955            msgs->ut_pex_id = (uint8_t) i;
956            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
957        }
958        if( tr_bencDictFindInt( sub, "ut_metadata", &i ) ) {
959            msgs->peerSupportsMetadataXfer = i != 0;
960            msgs->ut_metadata_id = (uint8_t) i;
961            dbgmsg( msgs, "msgs->ut_metadata_id is %d", (int)msgs->ut_metadata_id );
962        }
963    }
964
965    /* look for metainfo size (BEP 9) */
966    if( tr_bencDictFindInt( &val, "metadata_size", &i ) )
967        tr_torrentSetMetadataSizeHint( msgs->torrent, i );
968
969    /* look for upload_only (BEP 21) */
970    if( tr_bencDictFindInt( &val, "upload_only", &i ) ) {
971        fireUploadOnly( msgs, i!=0 );
972        if( i )
973            pex.flags |= ADDED_F_SEED_FLAG;
974    }
975
976    /* get peer's listening port */
977    if( tr_bencDictFindInt( &val, "p", &i ) ) {
978        fireClientGotPort( msgs, (tr_port)i );
979        pex.port = htons( (uint16_t)i );
980        dbgmsg( msgs, "peer's port is now %d", (int)i );
981    }
982
983    if( tr_peerIoIsIncoming( msgs->peer->io )
984        && tr_bencDictFindRaw( &val, "ipv4", &addr, &addr_len )
985        && ( addr_len == 4 ) )
986    {
987        pex.addr.type = TR_AF_INET;
988        memcpy( &pex.addr.addr.addr4, addr, 4 );
989        tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex );
990    }
991
992    if( tr_peerIoIsIncoming( msgs->peer->io )
993        && tr_bencDictFindRaw( &val, "ipv6", &addr, &addr_len )
994        && ( addr_len == 16 ) )
995    {
996        pex.addr.type = TR_AF_INET6;
997        memcpy( &pex.addr.addr.addr6, addr, 16 );
998        tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex );
999    }
1000
1001    /* get peer's maximum request queue size */
1002    if( tr_bencDictFindInt( &val, "reqq", &i ) )
1003        msgs->reqq = i;
1004
1005    tr_bencFree( &val );
1006    tr_free( tmp );
1007}
1008
1009static void
1010parseUtMetadata( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1011{
1012    tr_benc dict;
1013    char * msg_end;
1014    char * benc_end;
1015    int64_t msg_type = -1;
1016    int64_t piece = -1;
1017    int64_t total_size = 0;
1018    uint8_t * tmp = tr_new( uint8_t, msglen );
1019
1020    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1021    msg_end = (char*)tmp + msglen;
1022
1023    if( !tr_bencLoad( tmp, msglen, &dict, &benc_end ) )
1024    {
1025        tr_bencDictFindInt( &dict, "msg_type", &msg_type );
1026        tr_bencDictFindInt( &dict, "piece", &piece );
1027        tr_bencDictFindInt( &dict, "total_size", &total_size );
1028        tr_bencFree( &dict );
1029    }
1030
1031    dbgmsg( msgs, "got ut_metadata msg: type %d, piece %d, total_size %d",
1032            (int)msg_type, (int)piece, (int)total_size );
1033
1034    if( msg_type == METADATA_MSG_TYPE_REJECT )
1035    {
1036        /* NOOP */
1037    }
1038
1039    if( ( msg_type == METADATA_MSG_TYPE_DATA )
1040        && ( !tr_torrentHasMetadata( msgs->torrent ) )
1041        && ( msg_end - benc_end <= METADATA_PIECE_SIZE )
1042        && ( piece * METADATA_PIECE_SIZE + (msg_end - benc_end) <= total_size ) )
1043    {
1044        const int pieceLen = msg_end - benc_end;
1045        tr_torrentSetMetadataPiece( msgs->torrent, piece, benc_end, pieceLen );
1046    }
1047
1048    if( msg_type == METADATA_MSG_TYPE_REQUEST )
1049    {
1050        if( ( piece >= 0 )
1051            && tr_torrentHasMetadata( msgs->torrent )
1052            && !tr_torrentIsPrivate( msgs->torrent )
1053            && ( msgs->peerAskedForMetadataCount < METADATA_REQQ ) )
1054        {
1055            msgs->peerAskedForMetadata[msgs->peerAskedForMetadataCount++] = piece;
1056        }
1057        else
1058        {
1059            tr_benc tmp;
1060            int payloadLen;
1061            char * payload;
1062            tr_peerIo  * io  = msgs->peer->io;
1063            struct evbuffer * out = msgs->outMessages;
1064
1065            /* build the rejection message */
1066            tr_bencInitDict( &tmp, 2 );
1067            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
1068            tr_bencDictAddInt( &tmp, "piece", piece );
1069            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1070            tr_bencFree( &tmp );
1071
1072            /* write it out as a LTEP message to our outMessages buffer */
1073            tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + payloadLen );
1074            tr_peerIoWriteUint8 ( io, out, BT_LTEP );
1075            tr_peerIoWriteUint8 ( io, out, msgs->ut_metadata_id );
1076            tr_peerIoWriteBytes ( io, out, payload, payloadLen );
1077            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1078            dbgOutMessageLen( msgs );
1079
1080            tr_free( payload );
1081        }
1082    }
1083
1084    tr_free( tmp );
1085}
1086
1087static void
1088parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1089{
1090    int loaded = 0;
1091    uint8_t * tmp = tr_new( uint8_t, msglen );
1092    tr_benc val;
1093    tr_torrent * tor = msgs->torrent;
1094    const uint8_t * added;
1095    size_t added_len;
1096
1097    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1098
1099    if( tr_torrentAllowsPex( tor )
1100      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) ) )
1101    {
1102        if( tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1103        {
1104            tr_pex * pex;
1105            size_t i, n;
1106            size_t added_f_len = 0;
1107            const uint8_t * added_f = NULL;
1108
1109            tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1110            pex = tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len, &n );
1111
1112            n = MIN( n, MAX_PEX_PEER_COUNT );
1113            for( i=0; i<n; ++i )
1114                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex + i );
1115
1116            tr_free( pex );
1117        }
1118
1119        if( tr_bencDictFindRaw( &val, "added6", &added, &added_len ) )
1120        {
1121            tr_pex * pex;
1122            size_t i, n;
1123            size_t added_f_len = 0;
1124            const uint8_t * added_f = NULL;
1125
1126            tr_bencDictFindRaw( &val, "added6.f", &added_f, &added_f_len );
1127            pex = tr_peerMgrCompact6ToPex( added, added_len, added_f, added_f_len, &n );
1128
1129            n = MIN( n, MAX_PEX_PEER_COUNT );
1130            for( i=0; i<n; ++i )
1131                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex + i );
1132
1133            tr_free( pex );
1134        }
1135    }
1136
1137    if( loaded )
1138        tr_bencFree( &val );
1139    tr_free( tmp );
1140}
1141
1142static void sendPex( tr_peermsgs * msgs );
1143
1144static void
1145parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer  * inbuf )
1146{
1147    uint8_t ltep_msgid;
1148
1149    tr_peerIoReadUint8( msgs->peer->io, inbuf, &ltep_msgid );
1150    msglen--;
1151
1152    if( ltep_msgid == LTEP_HANDSHAKE )
1153    {
1154        dbgmsg( msgs, "got ltep handshake" );
1155        parseLtepHandshake( msgs, msglen, inbuf );
1156        if( tr_peerIoSupportsLTEP( msgs->peer->io ) )
1157        {
1158            sendLtepHandshake( msgs );
1159            sendPex( msgs );
1160        }
1161    }
1162    else if( ltep_msgid == UT_PEX_ID )
1163    {
1164        dbgmsg( msgs, "got ut pex" );
1165        msgs->peerSupportsPex = 1;
1166        parseUtPex( msgs, msglen, inbuf );
1167    }
1168    else if( ltep_msgid == UT_METADATA_ID )
1169    {
1170        dbgmsg( msgs, "got ut metadata" );
1171        msgs->peerSupportsMetadataXfer = 1;
1172        parseUtMetadata( msgs, msglen, inbuf );
1173    }
1174    else
1175    {
1176        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1177        evbuffer_drain( inbuf, msglen );
1178    }
1179}
1180
1181static int
1182readBtLength( tr_peermsgs *     msgs,
1183              struct evbuffer * inbuf,
1184              size_t            inlen )
1185{
1186    uint32_t len;
1187
1188    if( inlen < sizeof( len ) )
1189        return READ_LATER;
1190
1191    tr_peerIoReadUint32( msgs->peer->io, inbuf, &len );
1192
1193    if( len == 0 ) /* peer sent us a keepalive message */
1194        dbgmsg( msgs, "got KeepAlive" );
1195    else
1196    {
1197        msgs->incoming.length = len;
1198        msgs->state = AWAITING_BT_ID;
1199    }
1200
1201    return READ_NOW;
1202}
1203
1204static int readBtMessage( tr_peermsgs     * msgs,
1205                          struct evbuffer * inbuf,
1206                          size_t            inlen );
1207
1208static int
1209readBtId( tr_peermsgs * msgs, struct evbuffer  * inbuf, size_t inlen )
1210{
1211    uint8_t id;
1212
1213    if( inlen < sizeof( uint8_t ) )
1214        return READ_LATER;
1215
1216    tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
1217    msgs->incoming.id = id;
1218    dbgmsg( msgs, "msgs->incoming.id is now %d; msgs->incoming.length is %zu", id, (size_t)msgs->incoming.length );
1219
1220    if( id == BT_PIECE )
1221    {
1222        msgs->state = AWAITING_BT_PIECE;
1223        return READ_NOW;
1224    }
1225    else if( msgs->incoming.length != 1 )
1226    {
1227        msgs->state = AWAITING_BT_MESSAGE;
1228        return READ_NOW;
1229    }
1230    else return readBtMessage( msgs, inbuf, inlen - 1 );
1231}
1232
1233static void
1234updatePeerProgress( tr_peermsgs * msgs )
1235{
1236    msgs->peer->progress = tr_bitsetPercent( &msgs->peer->have );
1237    dbgmsg( msgs, "peer progress is %f", msgs->peer->progress );
1238    updateFastSet( msgs );
1239    updateInterest( msgs );
1240    firePeerProgress( msgs );
1241}
1242
1243static void
1244peerMadeRequest( tr_peermsgs *               msgs,
1245                 const struct peer_request * req )
1246{
1247    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1248    const int reqIsValid = requestIsValid( msgs, req );
1249    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( &msgs->torrent->completion, req->index );
1250    const int peerIsChoked = msgs->peer->peerIsChoked;
1251
1252    int allow = FALSE;
1253
1254    if( !reqIsValid )
1255        dbgmsg( msgs, "rejecting an invalid request." );
1256    else if( !clientHasPiece )
1257        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1258    else if( peerIsChoked )
1259        dbgmsg( msgs, "rejecting request from choked peer" );
1260    else if( msgs->peerAskedForCount + 1 >= REQQ )
1261        dbgmsg( msgs, "rejecting request ... reqq is full" );
1262    else
1263        allow = TRUE;
1264
1265    if( allow )
1266        msgs->peerAskedFor[msgs->peerAskedForCount++] = *req;
1267    else if( fext )
1268        protocolSendReject( msgs, req );
1269}
1270
1271static tr_bool
1272messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1273{
1274    switch( id )
1275    {
1276        case BT_CHOKE:
1277        case BT_UNCHOKE:
1278        case BT_INTERESTED:
1279        case BT_NOT_INTERESTED:
1280        case BT_FEXT_HAVE_ALL:
1281        case BT_FEXT_HAVE_NONE:
1282            return len == 1;
1283
1284        case BT_HAVE:
1285        case BT_FEXT_SUGGEST:
1286        case BT_FEXT_ALLOWED_FAST:
1287            return len == 5;
1288
1289        case BT_BITFIELD:
1290            return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1291
1292        case BT_REQUEST:
1293        case BT_CANCEL:
1294        case BT_FEXT_REJECT:
1295            return len == 13;
1296
1297        case BT_PIECE:
1298            return len > 9 && len <= 16393;
1299
1300        case BT_PORT:
1301            return len == 3;
1302
1303        case BT_LTEP:
1304            return len >= 2;
1305
1306        default:
1307            return FALSE;
1308    }
1309}
1310
1311static int clientGotBlock( tr_peermsgs *               msgs,
1312                           const uint8_t *             block,
1313                           const struct peer_request * req );
1314
1315static int
1316readBtPiece( tr_peermsgs      * msgs,
1317             struct evbuffer  * inbuf,
1318             size_t             inlen,
1319             size_t           * setme_piece_bytes_read )
1320{
1321    struct peer_request * req = &msgs->incoming.blockReq;
1322
1323    assert( EVBUFFER_LENGTH( inbuf ) >= inlen );
1324    dbgmsg( msgs, "In readBtPiece" );
1325
1326    if( !req->length )
1327    {
1328        if( inlen < 8 )
1329            return READ_LATER;
1330
1331        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index );
1332        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset );
1333        req->length = msgs->incoming.length - 9;
1334        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1335        return READ_NOW;
1336    }
1337    else
1338    {
1339        int err;
1340
1341        /* read in another chunk of data */
1342        const size_t nLeft = req->length - EVBUFFER_LENGTH( msgs->incoming.block );
1343        size_t n = MIN( nLeft, inlen );
1344        size_t i = n;
1345
1346        while( i > 0 )
1347        {
1348            uint8_t buf[MAX_STACK_ARRAY_SIZE];
1349            const size_t thisPass = MIN( i, sizeof( buf ) );
1350            tr_peerIoReadBytes( msgs->peer->io, inbuf, buf, thisPass );
1351            evbuffer_add( msgs->incoming.block, buf, thisPass );
1352            i -= thisPass;
1353        }
1354
1355        fireClientGotData( msgs, n, TRUE );
1356        *setme_piece_bytes_read += n;
1357        dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1358               n, req->index, req->offset, req->length,
1359               (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) );
1360        if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length )
1361            return READ_LATER;
1362
1363        /* we've got the whole block ... process it */
1364        err = clientGotBlock( msgs, EVBUFFER_DATA( msgs->incoming.block ), req );
1365
1366        /* cleanup */
1367        evbuffer_free( msgs->incoming.block );
1368        msgs->incoming.block = evbuffer_new( );
1369        req->length = 0;
1370        msgs->state = AWAITING_BT_LENGTH;
1371        return err ? READ_ERR : READ_NOW;
1372    }
1373}
1374
1375static void updateDesiredRequestCount( tr_peermsgs * msgs, uint64_t now );
1376
1377static void
1378decrementActiveRequestCount( tr_peermsgs * msgs )
1379{
1380    if( msgs->activeRequestCount > 0 )
1381        msgs->activeRequestCount--;
1382}
1383
1384static int
1385readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1386{
1387    uint32_t      ui32;
1388    uint32_t      msglen = msgs->incoming.length;
1389    const uint8_t id = msgs->incoming.id;
1390    const size_t  startBufLen = EVBUFFER_LENGTH( inbuf );
1391    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1392
1393    --msglen; /* id length */
1394
1395    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
1396
1397    if( inlen < msglen )
1398        return READ_LATER;
1399
1400    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1401    {
1402        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1403        fireError( msgs, EMSGSIZE );
1404        return READ_ERR;
1405    }
1406
1407    switch( id )
1408    {
1409        case BT_CHOKE:
1410            dbgmsg( msgs, "got Choke" );
1411            msgs->peer->clientIsChoked = 1;
1412            if( !fext )
1413                fireGotChoke( msgs );
1414            break;
1415
1416        case BT_UNCHOKE:
1417            dbgmsg( msgs, "got Unchoke" );
1418            msgs->peer->clientIsChoked = 0;
1419            updateDesiredRequestCount( msgs, tr_date( ) );
1420            break;
1421
1422        case BT_INTERESTED:
1423            dbgmsg( msgs, "got Interested" );
1424            msgs->peer->peerIsInterested = 1;
1425            break;
1426
1427        case BT_NOT_INTERESTED:
1428            dbgmsg( msgs, "got Not Interested" );
1429            msgs->peer->peerIsInterested = 0;
1430            break;
1431
1432        case BT_HAVE:
1433            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1434            dbgmsg( msgs, "got Have: %u", ui32 );
1435            if( tr_bitsetAdd( &msgs->peer->have, ui32 ) ) {
1436                fireError( msgs, ERANGE );
1437                return READ_ERR;
1438            }
1439            updatePeerProgress( msgs );
1440            break;
1441
1442        case BT_BITFIELD: {
1443            const size_t bitCount = tr_torrentHasMetadata( msgs->torrent )
1444                                  ? msgs->torrent->info.pieceCount
1445                                  : msglen * 8;
1446            dbgmsg( msgs, "got a bitfield" );
1447            tr_bitsetReserve( &msgs->peer->have, bitCount );
1448            tr_peerIoReadBytes( msgs->peer->io, inbuf,
1449                                msgs->peer->have.bitfield.bits, msglen );
1450            updatePeerProgress( msgs );
1451            break;
1452        }
1453
1454        case BT_REQUEST:
1455        {
1456            struct peer_request r;
1457            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1458            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1459            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1460            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1461            peerMadeRequest( msgs, &r );
1462            break;
1463        }
1464
1465        case BT_CANCEL:
1466        {
1467            int i;
1468            struct peer_request r;
1469            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1470            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1471            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1472            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1473
1474            for( i=0; i<msgs->peerAskedForCount; ++i ) {
1475                const struct peer_request * req = msgs->peerAskedFor + i;
1476                if( ( req->index == r.index ) && ( req->offset == r.offset ) && ( req->length == r.length ) )
1477                    break;
1478            }
1479
1480            if( i < msgs->peerAskedForCount )
1481                tr_removeElementFromArray( msgs->peerAskedFor, i, sizeof( struct peer_request ),
1482                                           msgs->peerAskedForCount-- );
1483            break;
1484        }
1485
1486        case BT_PIECE:
1487            assert( 0 ); /* handled elsewhere! */
1488            break;
1489
1490        case BT_PORT:
1491            dbgmsg( msgs, "Got a BT_PORT" );
1492            tr_peerIoReadUint16( msgs->peer->io, inbuf, &msgs->peer->dht_port );
1493            if( msgs->peer->dht_port > 0 )
1494                tr_dhtAddNode( getSession(msgs),
1495                               tr_peerAddress( msgs->peer ),
1496                               msgs->peer->dht_port, 0 );
1497            break;
1498
1499        case BT_FEXT_SUGGEST:
1500            dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1501            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1502            if( fext )
1503                fireClientGotSuggest( msgs, ui32 );
1504            else {
1505                fireError( msgs, EMSGSIZE );
1506                return READ_ERR;
1507            }
1508            break;
1509
1510        case BT_FEXT_ALLOWED_FAST:
1511            dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1512            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1513            if( fext )
1514                fireClientGotAllowedFast( msgs, ui32 );
1515            else {
1516                fireError( msgs, EMSGSIZE );
1517                return READ_ERR;
1518            }
1519            break;
1520
1521        case BT_FEXT_HAVE_ALL:
1522            dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1523            if( fext ) {
1524                tr_bitsetSetHaveAll( &msgs->peer->have );
1525                updatePeerProgress( msgs );
1526            } else {
1527                fireError( msgs, EMSGSIZE );
1528                return READ_ERR;
1529            }
1530            break;
1531
1532        case BT_FEXT_HAVE_NONE:
1533            dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1534            if( fext ) {
1535                tr_bitsetSetHaveNone( &msgs->peer->have );
1536                updatePeerProgress( msgs );
1537            } else {
1538                fireError( msgs, EMSGSIZE );
1539                return READ_ERR;
1540            }
1541            break;
1542
1543        case BT_FEXT_REJECT:
1544        {
1545            struct peer_request r;
1546            dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1547            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1548            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1549            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1550            if( fext ) {
1551                decrementActiveRequestCount( msgs );
1552                fireGotRej( msgs, &r );
1553            } else {
1554                fireError( msgs, EMSGSIZE );
1555                return READ_ERR;
1556            }
1557            break;
1558        }
1559
1560        case BT_LTEP:
1561            dbgmsg( msgs, "Got a BT_LTEP" );
1562            parseLtep( msgs, msglen, inbuf );
1563            break;
1564
1565        default:
1566            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1567            tr_peerIoDrain( msgs->peer->io, inbuf, msglen );
1568            break;
1569    }
1570
1571    assert( msglen + 1 == msgs->incoming.length );
1572    assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen );
1573
1574    msgs->state = AWAITING_BT_LENGTH;
1575    return READ_NOW;
1576}
1577
1578static TR_INLINE void
1579decrementDownloadedCount( tr_peermsgs * msgs, uint32_t byteCount )
1580{
1581    tr_torrent * tor = msgs->torrent;
1582
1583    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1584}
1585
1586static TR_INLINE void
1587clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1588{
1589    decrementDownloadedCount( msgs, req->length );
1590}
1591
1592static void
1593addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1594{
1595    if( !msgs->peer->blame )
1596         msgs->peer->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1597    tr_bitfieldAdd( msgs->peer->blame, index );
1598}
1599
1600/* returns 0 on success, or an errno on failure */
1601static int
1602clientGotBlock( tr_peermsgs *               msgs,
1603                const uint8_t *             data,
1604                const struct peer_request * req )
1605{
1606    int err;
1607    tr_torrent * tor = msgs->torrent;
1608    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1609
1610    assert( msgs );
1611    assert( req );
1612
1613    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) {
1614        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1615                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1616        return EMSGSIZE;
1617    }
1618
1619    /* save the block */
1620    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1621
1622    if( !tr_peerMgrDidPeerRequest( msgs->torrent, msgs->peer, block ) ) {
1623        dbgmsg( msgs, "we didn't ask for this message..." );
1624        return 0;
1625    }
1626
1627    /**
1628    ***  Save the block
1629    **/
1630
1631    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1632        return err;
1633
1634    addPeerToBlamefield( msgs, req->index );
1635    decrementActiveRequestCount( msgs );
1636    fireGotBlock( msgs, req );
1637    return 0;
1638}
1639
1640static int peerPulse( void * vmsgs );
1641
1642static void
1643didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1644{
1645    tr_peermsgs * msgs = vmsgs;
1646    firePeerGotData( msgs, bytesWritten, wasPieceData );
1647
1648    if ( tr_isPeerIo( io ) && io->userData )
1649        peerPulse( msgs );
1650}
1651
1652static ReadState
1653canRead( tr_peerIo * io, void * vmsgs, size_t * piece )
1654{
1655    ReadState         ret;
1656    tr_peermsgs *     msgs = vmsgs;
1657    struct evbuffer * in = tr_peerIoGetReadBuffer( io );
1658    const size_t      inlen = EVBUFFER_LENGTH( in );
1659
1660    dbgmsg( msgs, "canRead: inlen is %zu, msgs->state is %d", inlen, msgs->state );
1661
1662    if( !inlen )
1663    {
1664        ret = READ_LATER;
1665    }
1666    else if( msgs->state == AWAITING_BT_PIECE )
1667    {
1668        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1669    }
1670    else switch( msgs->state )
1671    {
1672        case AWAITING_BT_LENGTH:
1673            ret = readBtLength ( msgs, in, inlen ); break;
1674
1675        case AWAITING_BT_ID:
1676            ret = readBtId     ( msgs, in, inlen ); break;
1677
1678        case AWAITING_BT_MESSAGE:
1679            ret = readBtMessage( msgs, in, inlen ); break;
1680
1681        default:
1682            ret = READ_ERR;
1683            assert( 0 );
1684    }
1685
1686    dbgmsg( msgs, "canRead: ret is %d", (int)ret );
1687
1688    /* log the raw data that was read */
1689    if( ( ret != READ_ERR ) && ( EVBUFFER_LENGTH( in ) != inlen ) )
1690        fireClientGotData( msgs, inlen - EVBUFFER_LENGTH( in ), FALSE );
1691
1692    return ret;
1693}
1694
1695/**
1696***
1697**/
1698
1699static void
1700updateDesiredRequestCount( tr_peermsgs * msgs, uint64_t now )
1701{
1702    const tr_torrent * const torrent = msgs->torrent;
1703
1704    if( tr_torrentIsSeed( msgs->torrent ) )
1705    {
1706        msgs->desiredRequestCount = 0;
1707    }
1708    else if( msgs->peer->clientIsChoked )
1709    {
1710        msgs->desiredRequestCount = 0;
1711    }
1712    else
1713    {
1714        int irate;
1715        int estimatedBlocksInPeriod;
1716        double rate;
1717        const int floor = 4;
1718        const int seconds = REQUEST_BUF_SECS;
1719
1720        /* Get the rate limit we should use.
1721         * FIXME: this needs to consider all the other peers as well... */
1722        rate = tr_peerGetPieceSpeed( msgs->peer, now, TR_PEER_TO_CLIENT );
1723        if( tr_torrentUsesSpeedLimit( torrent, TR_PEER_TO_CLIENT ) )
1724            rate = MIN( rate, tr_torrentGetSpeedLimit( torrent, TR_PEER_TO_CLIENT ) );
1725
1726        /* honor the session limits, if enabled */
1727        if( tr_torrentUsesSessionLimits( torrent ) )
1728            if( tr_sessionGetActiveSpeedLimit( torrent->session, TR_PEER_TO_CLIENT, &irate ) )
1729                rate = MIN( rate, irate );
1730
1731        /* use this desired rate to figure out how
1732         * many requests we should send to this peer */
1733        estimatedBlocksInPeriod = ( rate * seconds * 1024 ) / torrent->blockSize;
1734        msgs->desiredRequestCount = MAX( floor, estimatedBlocksInPeriod );
1735
1736        /* honor the peer's maximum request count, if specified */
1737        if( msgs->reqq > 0 )
1738            if( msgs->desiredRequestCount > msgs->reqq )
1739                msgs->desiredRequestCount = msgs->reqq;
1740    }
1741}
1742
1743static void
1744updateMetadataRequests( tr_peermsgs * msgs, time_t now )
1745{
1746    int piece;
1747
1748    if( msgs->peerSupportsMetadataXfer
1749        && tr_torrentGetNextMetadataRequest( msgs->torrent, now, &piece ) )
1750    {
1751        tr_benc tmp;
1752        int payloadLen;
1753        char * payload;
1754        tr_peerIo  * io  = msgs->peer->io;
1755        struct evbuffer * out = msgs->outMessages;
1756
1757        /* build the data message */
1758        tr_bencInitDict( &tmp, 3 );
1759        tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REQUEST );
1760        tr_bencDictAddInt( &tmp, "piece", piece );
1761        payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1762        tr_bencFree( &tmp );
1763
1764        dbgmsg( msgs, "requesting metadata piece #%d", piece );
1765
1766        /* write it out as a LTEP message to our outMessages buffer */
1767        tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + payloadLen );
1768        tr_peerIoWriteUint8 ( io, out, BT_LTEP );
1769        tr_peerIoWriteUint8 ( io, out, msgs->ut_metadata_id );
1770        tr_peerIoWriteBytes ( io, out, payload, payloadLen );
1771        pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1772        dbgOutMessageLen( msgs );
1773
1774        tr_free( payload );
1775    }
1776}
1777
1778static void
1779updateBlockRequests( tr_peermsgs * msgs )
1780{
1781    const int MIN_BATCH_SIZE = 4;
1782    const int numwant = msgs->desiredRequestCount - msgs->activeRequestCount;
1783
1784    /* make sure we have enough block requests queued up */
1785    if( numwant >= MIN_BATCH_SIZE )
1786    {
1787        int i;
1788        int n;
1789        tr_block_index_t * blocks = tr_new( tr_block_index_t, numwant );
1790
1791        tr_peerMgrGetNextRequests( msgs->torrent, msgs->peer, numwant, blocks, &n );
1792
1793        for( i=0; i<n; ++i )
1794        {
1795            struct peer_request req;
1796            blockToReq( msgs->torrent, blocks[i], &req );
1797            protocolSendRequest( msgs, &req );
1798        }
1799
1800        msgs->activeRequestCount += n;
1801
1802        tr_free( blocks );
1803    }
1804}
1805
1806static void
1807prefetchPieces( tr_peermsgs *msgs )
1808{
1809    int i;
1810
1811    /* Maintain 12 prefetched blocks per unchoked peer */
1812    for( i=msgs->prefetchCount; i<msgs->peerAskedForCount && i<12; ++i )
1813    {
1814        const struct peer_request * req = msgs->peerAskedFor + i;
1815        tr_ioPrefetch( msgs->torrent, req->index, req->offset, req->length );
1816        ++msgs->prefetchCount;
1817    }
1818}
1819
1820static size_t
1821fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1822{
1823    int piece;
1824    size_t bytesWritten = 0;
1825    struct peer_request req;
1826    const tr_bool haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1827    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1828
1829    /**
1830    ***  Protocol messages
1831    **/
1832
1833    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1834    {
1835        dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
1836        msgs->outMessagesBatchedAt = now;
1837    }
1838    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1839    {
1840        const size_t len = EVBUFFER_LENGTH( msgs->outMessages );
1841        /* flush the protocol messages */
1842        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len );
1843        tr_peerIoWriteBuf( msgs->peer->io, msgs->outMessages, FALSE );
1844        msgs->clientSentAnythingAt = now;
1845        msgs->outMessagesBatchedAt = 0;
1846        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1847        bytesWritten +=  len;
1848    }
1849
1850    /**
1851    ***  Metadata Pieces
1852    **/
1853
1854    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= METADATA_PIECE_SIZE )
1855        && popNextMetadataRequest( msgs, &piece ) )
1856    {
1857        char * data;
1858        int dataLen;
1859        tr_bool ok = FALSE;
1860
1861        data = tr_torrentGetMetadataPiece( msgs->torrent, piece, &dataLen );
1862        if( ( dataLen > 0 ) && ( data != NULL ) )
1863        {
1864            tr_benc tmp;
1865            int payloadLen;
1866            char * payload;
1867            tr_peerIo  * io  = msgs->peer->io;
1868            struct evbuffer * out = msgs->outMessages;
1869
1870            /* build the data message */
1871            tr_bencInitDict( &tmp, 3 );
1872            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_DATA );
1873            tr_bencDictAddInt( &tmp, "piece", piece );
1874            tr_bencDictAddInt( &tmp, "total_size", msgs->torrent->infoDictLength );
1875            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1876            tr_bencFree( &tmp );
1877
1878            /* write it out as a LTEP message to our outMessages buffer */
1879            tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + payloadLen + dataLen );
1880            tr_peerIoWriteUint8 ( io, out, BT_LTEP );
1881            tr_peerIoWriteUint8 ( io, out, msgs->ut_metadata_id );
1882            tr_peerIoWriteBytes ( io, out, payload, payloadLen );
1883            tr_peerIoWriteBytes ( io, out, data, dataLen );
1884            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1885            dbgOutMessageLen( msgs );
1886
1887            tr_free( payload );
1888            tr_free( data );
1889
1890            ok = TRUE;
1891        }
1892
1893        if( !ok ) /* send a rejection message */
1894        {
1895            tr_benc tmp;
1896            int payloadLen;
1897            char * payload;
1898            tr_peerIo  * io  = msgs->peer->io;
1899            struct evbuffer * out = msgs->outMessages;
1900
1901            /* build the rejection message */
1902            tr_bencInitDict( &tmp, 2 );
1903            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
1904            tr_bencDictAddInt( &tmp, "piece", piece );
1905            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1906            tr_bencFree( &tmp );
1907
1908            /* write it out as a LTEP message to our outMessages buffer */
1909            tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + payloadLen );
1910            tr_peerIoWriteUint8 ( io, out, BT_LTEP );
1911            tr_peerIoWriteUint8 ( io, out, msgs->ut_metadata_id );
1912            tr_peerIoWriteBytes ( io, out, payload, payloadLen );
1913            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1914            dbgOutMessageLen( msgs );
1915
1916            tr_free( payload );
1917        }
1918    }
1919
1920    /**
1921    ***  Data Blocks
1922    **/
1923
1924    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= msgs->torrent->blockSize )
1925        && popNextRequest( msgs, &req ) )
1926    {
1927        --msgs->prefetchCount;
1928
1929        if( requestIsValid( msgs, &req )
1930            && tr_cpPieceIsComplete( &msgs->torrent->completion, req.index ) )
1931        {
1932            /* FIXME(libevent2) use evbuffer_reserve_space() + evbuffer_commit_space() */
1933            int err;
1934            const uint32_t msglen = 4 + 1 + 4 + 4 + req.length;
1935            struct evbuffer * out;
1936            tr_peerIo * io = msgs->peer->io;
1937
1938            out = evbuffer_new( );
1939            evbuffer_expand( out, msglen );
1940
1941            tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1942            tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1943            tr_peerIoWriteUint32( io, out, req.index );
1944            tr_peerIoWriteUint32( io, out, req.offset );
1945
1946            err = tr_ioRead( msgs->torrent, req.index, req.offset, req.length, EVBUFFER_DATA(out)+EVBUFFER_LENGTH(out) );
1947            if( err )
1948            {
1949                if( fext )
1950                    protocolSendReject( msgs, &req );
1951            }
1952            else
1953            {
1954                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1955                EVBUFFER_LENGTH(out) += req.length;
1956                assert( EVBUFFER_LENGTH( out ) == msglen );
1957                tr_peerIoWriteBuf( io, out, TRUE );
1958                bytesWritten += EVBUFFER_LENGTH( out );
1959                msgs->clientSentAnythingAt = now;
1960            }
1961
1962            evbuffer_free( out );
1963
1964            if( err )
1965            {
1966                bytesWritten = 0;
1967                msgs = NULL;
1968            }
1969        }
1970        else if( fext ) /* peer needs a reject message */
1971        {
1972            protocolSendReject( msgs, &req );
1973        }
1974
1975        if( msgs != NULL )
1976            prefetchPieces( msgs );
1977    }
1978
1979    /**
1980    ***  Keepalive
1981    **/
1982
1983    if( ( msgs != NULL )
1984        && ( msgs->clientSentAnythingAt != 0 )
1985        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1986    {
1987        dbgmsg( msgs, "sending a keepalive message" );
1988        tr_peerIoWriteUint32( msgs->peer->io, msgs->outMessages, 0 );
1989        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1990    }
1991
1992    return bytesWritten;
1993}
1994
1995static int
1996peerPulse( void * vmsgs )
1997{
1998    tr_peermsgs * msgs = vmsgs;
1999    const time_t  now = tr_time( );
2000
2001    if ( tr_isPeerIo( msgs->peer->io ) ) {
2002        updateDesiredRequestCount( msgs, now );
2003        updateBlockRequests( msgs );
2004        updateMetadataRequests( msgs, now );
2005    }
2006
2007    for( ;; )
2008        if( fillOutputBuffer( msgs, now ) < 1 )
2009            break;
2010
2011    return TRUE; /* loop forever */
2012}
2013
2014void
2015tr_peerMsgsPulse( tr_peermsgs * msgs )
2016{
2017    if( msgs != NULL )
2018        peerPulse( msgs );
2019}
2020
2021static void
2022gotError( tr_peerIo  * io UNUSED,
2023          short        what,
2024          void       * vmsgs )
2025{
2026    if( what & EVBUFFER_TIMEOUT )
2027        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
2028    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
2029        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
2030               what, errno, tr_strerror( errno ) );
2031    fireError( vmsgs, ENOTCONN );
2032}
2033
2034static void
2035sendBitfield( tr_peermsgs * msgs )
2036{
2037    struct evbuffer * out = msgs->outMessages;
2038    tr_bitfield *     field;
2039    tr_piece_index_t  lazyPieces[LAZY_PIECE_COUNT];
2040    size_t            i;
2041    size_t            lazyCount = 0;
2042
2043    field = tr_bitfieldDup( tr_cpPieceBitfield( &msgs->torrent->completion ) );
2044
2045    if( tr_sessionIsLazyBitfieldEnabled( getSession( msgs ) ) )
2046    {
2047        /** Lazy bitfields aren't a high priority or secure, so I'm opting for
2048            speed over a truly random sample -- let's limit the pool size to
2049            the first 1000 pieces so large torrents don't bog things down */
2050        size_t poolSize;
2051        const size_t maxPoolSize = MIN( msgs->torrent->info.pieceCount, 1000 );
2052        tr_piece_index_t * pool = tr_new( tr_piece_index_t, maxPoolSize );
2053
2054        /* build the pool */
2055        for( i=poolSize=0; i<maxPoolSize; ++i )
2056            if( tr_bitfieldHas( field, i ) )
2057                pool[poolSize++] = i;
2058
2059        /* pull random piece indices from the pool */
2060        while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) )
2061        {
2062            const int pos = tr_cryptoWeakRandInt( poolSize );
2063            const tr_piece_index_t piece = pool[pos];
2064            tr_bitfieldRem( field, piece );
2065            lazyPieces[lazyCount++] = piece;
2066            pool[pos] = pool[--poolSize];
2067        }
2068
2069        /* cleanup */
2070        tr_free( pool );
2071    }
2072
2073    tr_peerIoWriteUint32( msgs->peer->io, out,
2074                          sizeof( uint8_t ) + field->byteCount );
2075    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_BITFIELD );
2076    /* FIXME(libevent2): use evbuffer_add_reference() */
2077    tr_peerIoWriteBytes ( msgs->peer->io, out, field->bits, field->byteCount );
2078    dbgmsg( msgs, "sending bitfield... outMessage size is now %zu",
2079            EVBUFFER_LENGTH( out ) );
2080    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
2081
2082    for( i = 0; i < lazyCount; ++i )
2083        protocolSendHave( msgs, lazyPieces[i] );
2084
2085    tr_bitfieldFree( field );
2086}
2087
2088static void
2089tellPeerWhatWeHave( tr_peermsgs * msgs )
2090{
2091    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
2092
2093    if( fext && ( tr_cpGetStatus( &msgs->torrent->completion ) == TR_SEED ) )
2094    {
2095        protocolSendHaveAll( msgs );
2096    }
2097    else if( fext && ( tr_cpHaveValid( &msgs->torrent->completion ) == 0 ) )
2098    {
2099        protocolSendHaveNone( msgs );
2100    }
2101    else
2102    {
2103        sendBitfield( msgs );
2104    }
2105}
2106
2107/**
2108***
2109**/
2110
2111/* some peers give us error messages if we send
2112   more than this many peers in a single pex message
2113   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
2114#define MAX_PEX_ADDED 50
2115#define MAX_PEX_DROPPED 50
2116
2117typedef struct
2118{
2119    tr_pex *  added;
2120    tr_pex *  dropped;
2121    tr_pex *  elements;
2122    int       addedCount;
2123    int       droppedCount;
2124    int       elementCount;
2125}
2126PexDiffs;
2127
2128static void
2129pexAddedCb( void * vpex,
2130            void * userData )
2131{
2132    PexDiffs * diffs = userData;
2133    tr_pex *   pex = vpex;
2134
2135    if( diffs->addedCount < MAX_PEX_ADDED )
2136    {
2137        diffs->added[diffs->addedCount++] = *pex;
2138        diffs->elements[diffs->elementCount++] = *pex;
2139    }
2140}
2141
2142static TR_INLINE void
2143pexDroppedCb( void * vpex,
2144              void * userData )
2145{
2146    PexDiffs * diffs = userData;
2147    tr_pex *   pex = vpex;
2148
2149    if( diffs->droppedCount < MAX_PEX_DROPPED )
2150    {
2151        diffs->dropped[diffs->droppedCount++] = *pex;
2152    }
2153}
2154
2155static TR_INLINE void
2156pexElementCb( void * vpex,
2157              void * userData )
2158{
2159    PexDiffs * diffs = userData;
2160    tr_pex * pex = vpex;
2161
2162    diffs->elements[diffs->elementCount++] = *pex;
2163}
2164
2165static void
2166sendPex( tr_peermsgs * msgs )
2167{
2168    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
2169    {
2170        PexDiffs diffs;
2171        PexDiffs diffs6;
2172        tr_pex * newPex = NULL;
2173        tr_pex * newPex6 = NULL;
2174        const int newCount = tr_peerMgrGetPeers( msgs->torrent, &newPex, TR_AF_INET, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
2175        const int newCount6 = tr_peerMgrGetPeers( msgs->torrent, &newPex6, TR_AF_INET6, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
2176
2177        /* build the diffs */
2178        diffs.added = tr_new( tr_pex, newCount );
2179        diffs.addedCount = 0;
2180        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
2181        diffs.droppedCount = 0;
2182        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
2183        diffs.elementCount = 0;
2184        tr_set_compare( msgs->pex, msgs->pexCount,
2185                        newPex, newCount,
2186                        tr_pexCompare, sizeof( tr_pex ),
2187                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
2188        diffs6.added = tr_new( tr_pex, newCount6 );
2189        diffs6.addedCount = 0;
2190        diffs6.dropped = tr_new( tr_pex, msgs->pexCount6 );
2191        diffs6.droppedCount = 0;
2192        diffs6.elements = tr_new( tr_pex, newCount6 + msgs->pexCount6 );
2193        diffs6.elementCount = 0;
2194        tr_set_compare( msgs->pex6, msgs->pexCount6,
2195                        newPex6, newCount6,
2196                        tr_pexCompare, sizeof( tr_pex ),
2197                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6 );
2198        dbgmsg(
2199            msgs,
2200            "pex: old peer count %d+%d, new peer count %d+%d, "
2201            "added %d+%d, removed %d+%d",
2202            msgs->pexCount, msgs->pexCount6, newCount, newCount6,
2203            diffs.addedCount, diffs6.addedCount,
2204            diffs.droppedCount, diffs6.droppedCount );
2205
2206        if( !diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
2207            !diffs6.droppedCount )
2208        {
2209            tr_free( diffs.elements );
2210            tr_free( diffs6.elements );
2211        }
2212        else
2213        {
2214            int  i;
2215            tr_benc val;
2216            char * benc;
2217            int bencLen;
2218            uint8_t * tmp, *walk;
2219            tr_peerIo       * io  = msgs->peer->io;
2220            struct evbuffer * out = msgs->outMessages;
2221
2222            /* update peer */
2223            tr_free( msgs->pex );
2224            msgs->pex = diffs.elements;
2225            msgs->pexCount = diffs.elementCount;
2226            tr_free( msgs->pex6 );
2227            msgs->pex6 = diffs6.elements;
2228            msgs->pexCount6 = diffs6.elementCount;
2229
2230            /* build the pex payload */
2231            tr_bencInitDict( &val, 3 ); /* ipv6 support: left as 3:
2232                                         * speed vs. likelihood? */
2233
2234            if( diffs.addedCount > 0)
2235            {
2236                /* "added" */
2237                tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
2238                for( i = 0; i < diffs.addedCount; ++i ) {
2239                    memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
2240                    memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
2241                }
2242                assert( ( walk - tmp ) == diffs.addedCount * 6 );
2243                tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
2244                tr_free( tmp );
2245
2246                /* "added.f" */
2247                tmp = walk = tr_new( uint8_t, diffs.addedCount );
2248                for( i = 0; i < diffs.addedCount; ++i )
2249                    *walk++ = diffs.added[i].flags;
2250                assert( ( walk - tmp ) == diffs.addedCount );
2251                tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2252                tr_free( tmp );
2253            }
2254
2255            if( diffs.droppedCount > 0 )
2256            {
2257                /* "dropped" */
2258                tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2259                for( i = 0; i < diffs.droppedCount; ++i ) {
2260                    memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
2261                    memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2262                }
2263                assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2264                tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2265                tr_free( tmp );
2266            }
2267
2268            if( diffs6.addedCount > 0 )
2269            {
2270                /* "added6" */
2271                tmp = walk = tr_new( uint8_t, diffs6.addedCount * 18 );
2272                for( i = 0; i < diffs6.addedCount; ++i ) {
2273                    memcpy( walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16 );
2274                    walk += 16;
2275                    memcpy( walk, &diffs6.added[i].port, 2 );
2276                    walk += 2;
2277                }
2278                assert( ( walk - tmp ) == diffs6.addedCount * 18 );
2279                tr_bencDictAddRaw( &val, "added6", tmp, walk - tmp );
2280                tr_free( tmp );
2281
2282                /* "added6.f" */
2283                tmp = walk = tr_new( uint8_t, diffs6.addedCount );
2284                for( i = 0; i < diffs6.addedCount; ++i )
2285                    *walk++ = diffs6.added[i].flags;
2286                assert( ( walk - tmp ) == diffs6.addedCount );
2287                tr_bencDictAddRaw( &val, "added6.f", tmp, walk - tmp );
2288                tr_free( tmp );
2289            }
2290
2291            if( diffs6.droppedCount > 0 )
2292            {
2293                /* "dropped6" */
2294                tmp = walk = tr_new( uint8_t, diffs6.droppedCount * 18 );
2295                for( i = 0; i < diffs6.droppedCount; ++i ) {
2296                    memcpy( walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16 );
2297                    walk += 16;
2298                    memcpy( walk, &diffs6.dropped[i].port, 2 );
2299                    walk += 2;
2300                }
2301                assert( ( walk - tmp ) == diffs6.droppedCount * 18);
2302                tr_bencDictAddRaw( &val, "dropped6", tmp, walk - tmp );
2303                tr_free( tmp );
2304            }
2305
2306            /* write the pex message */
2307            benc = tr_bencToStr( &val, TR_FMT_BENC, &bencLen );
2308            tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + bencLen );
2309            tr_peerIoWriteUint8 ( io, out, BT_LTEP );
2310            tr_peerIoWriteUint8 ( io, out, msgs->ut_pex_id );
2311            tr_peerIoWriteBytes ( io, out, benc, bencLen );
2312            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2313            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", EVBUFFER_LENGTH( out ) );
2314            dbgOutMessageLen( msgs );
2315
2316            tr_free( benc );
2317            tr_bencFree( &val );
2318        }
2319
2320        /* cleanup */
2321        tr_free( diffs.added );
2322        tr_free( diffs.dropped );
2323        tr_free( newPex );
2324        tr_free( diffs6.added );
2325        tr_free( diffs6.dropped );
2326        tr_free( newPex6 );
2327
2328        /*msgs->clientSentPexAt = tr_time( );*/
2329    }
2330}
2331
2332static void
2333pexPulse( int foo UNUSED, short bar UNUSED, void * vmsgs )
2334{
2335    struct tr_peermsgs * msgs = vmsgs;
2336
2337    sendPex( msgs );
2338
2339    tr_timerAdd( &msgs->pexTimer, PEX_INTERVAL_SECS, 0 );
2340}
2341
2342/**
2343***
2344**/
2345
2346tr_peermsgs*
2347tr_peerMsgsNew( struct tr_torrent * torrent,
2348                struct tr_peer    * peer,
2349                tr_delivery_func    func,
2350                void              * userData,
2351                tr_publisher_tag  * setme )
2352{
2353    tr_peermsgs * m;
2354
2355    assert( peer );
2356    assert( peer->io );
2357
2358    m = tr_new0( tr_peermsgs, 1 );
2359    m->publisher = TR_PUBLISHER_INIT;
2360    m->peer = peer;
2361    m->torrent = torrent;
2362    m->peer->clientIsChoked = 1;
2363    m->peer->peerIsChoked = 1;
2364    m->peer->clientIsInterested = 0;
2365    m->peer->peerIsInterested = 0;
2366    m->state = AWAITING_BT_LENGTH;
2367    m->outMessages = evbuffer_new( );
2368    m->outMessagesBatchedAt = 0;
2369    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2370    m->incoming.block = evbuffer_new( );
2371    m->peerAskedForCount = 0;
2372    evtimer_set( &m->pexTimer, pexPulse, m );
2373    tr_timerAdd( &m->pexTimer, PEX_INTERVAL_SECS, 0 );
2374    peer->msgs = m;
2375
2376    *setme = tr_publisherSubscribe( &m->publisher, func, userData );
2377
2378    if( tr_peerIoSupportsLTEP( peer->io ) )
2379        sendLtepHandshake( m );
2380
2381    if(tr_peerIoSupportsDHT(peer->io)) {
2382        /* Only send PORT over IPv6 when the IPv6 DHT is running (BEP-32). */
2383        const struct tr_address *addr = tr_peerIoGetAddress( peer->io, NULL );
2384        if( addr->type == TR_AF_INET || tr_globalIPv6() ) {
2385            protocolSendPort( m, tr_dhtPort( torrent->session ) );
2386        }
2387    }
2388
2389    tellPeerWhatWeHave( m );
2390
2391    tr_peerIoSetIOFuncs( m->peer->io, canRead, didWrite, gotError, m );
2392    updateDesiredRequestCount( m, tr_date( ) );
2393
2394    return m;
2395}
2396
2397void
2398tr_peerMsgsFree( tr_peermsgs* msgs )
2399{
2400    if( msgs )
2401    {
2402        evtimer_del( &msgs->pexTimer );
2403        tr_publisherDestruct( &msgs->publisher );
2404
2405        evbuffer_free( msgs->incoming.block );
2406        evbuffer_free( msgs->outMessages );
2407        tr_free( msgs->pex6 );
2408        tr_free( msgs->pex );
2409
2410        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2411        tr_free( msgs );
2412    }
2413}
2414
2415void
2416tr_peerMsgsUnsubscribe( tr_peermsgs *    peer,
2417                        tr_publisher_tag tag )
2418{
2419    tr_publisherUnsubscribe( &peer->publisher, tag );
2420}
Note: See TracBrowser for help on using the repository browser.