source: trunk/libtransmission/peer-msgs.c @ 9659

Last change on this file since 9659 was 9659, checked in by charles, 13 years ago

(trunk libT) #2636: "wrong peer progress update" -- possible fix as described in comment:2

  • Property svn:keywords set to Date Rev Author Id
File size: 71.4 KB
Line 
1/*
2 * This file Copyright (C) 2007-2009 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 9659 2009-12-03 03:11:22Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19
20#include <event.h>
21
22#include "transmission.h"
23#include "bencode.h"
24#include "completion.h"
25#include "crypto.h"
26#include "inout.h"
27#ifdef WIN32
28#include "net.h" /* for ECONN */
29#endif
30#include "peer-io.h"
31#include "peer-mgr.h"
32#include "peer-msgs.h"
33#include "platform.h" /* MAX_STACK_ARRAY_SIZE */
34#include "ratecontrol.h"
35#include "session.h"
36#include "stats.h"
37#include "torrent.h"
38#include "torrent-magnet.h"
39#include "tr-dht.h"
40#include "utils.h"
41#include "version.h"
42
43/**
44***
45**/
46
47enum
48{
49    BT_CHOKE                = 0,
50    BT_UNCHOKE              = 1,
51    BT_INTERESTED           = 2,
52    BT_NOT_INTERESTED       = 3,
53    BT_HAVE                 = 4,
54    BT_BITFIELD             = 5,
55    BT_REQUEST              = 6,
56    BT_PIECE                = 7,
57    BT_CANCEL               = 8,
58    BT_PORT                 = 9,
59
60    BT_FEXT_SUGGEST         = 13,
61    BT_FEXT_HAVE_ALL        = 14,
62    BT_FEXT_HAVE_NONE       = 15,
63    BT_FEXT_REJECT          = 16,
64    BT_FEXT_ALLOWED_FAST    = 17,
65
66    BT_LTEP                 = 20,
67
68    LTEP_HANDSHAKE          = 0,
69
70    UT_PEX_ID               = 1,
71    UT_METADATA_ID          = 3,
72
73    MAX_PEX_PEER_COUNT      = 50,
74
75    MIN_CHOKE_PERIOD_SEC    = 10,
76
77    /* idle seconds before we send a keepalive */
78    KEEPALIVE_INTERVAL_SECS = 100,
79
80    PEX_INTERVAL_SECS       = 90, /* sec between sendPex() calls */
81
82    REQQ                    = 512,
83
84    METADATA_REQQ           = 64,
85
86    MAX_BLOCK_SIZE          = ( 1024 * 16 ),
87
88    /* used in lowering the outMessages queue period */
89    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
90    HIGH_PRIORITY_INTERVAL_SECS = 2,
91    LOW_PRIORITY_INTERVAL_SECS = 10,
92
93    /* number of pieces to remove from the bitfield when
94     * lazy bitfields are turned on */
95    LAZY_PIECE_COUNT = 26,
96
97    /* number of pieces we'll allow in our fast set */
98    MAX_FAST_SET_SIZE = 3,
99
100    /* defined in BEP #9 */
101    METADATA_MSG_TYPE_REQUEST = 0,
102    METADATA_MSG_TYPE_DATA = 1,
103    METADATA_MSG_TYPE_REJECT = 2
104};
105
106enum
107{
108    AWAITING_BT_LENGTH,
109    AWAITING_BT_ID,
110    AWAITING_BT_MESSAGE,
111    AWAITING_BT_PIECE
112};
113
114/**
115***
116**/
117
118struct peer_request
119{
120    uint32_t    index;
121    uint32_t    offset;
122    uint32_t    length;
123};
124
125static uint32_t
126getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
127{
128    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
129    const uint64_t blockPos = tor->blockSize * b;
130    assert( blockPos >= piecePos );
131    return (uint32_t)( blockPos - piecePos );
132}
133
134static void
135blockToReq( const tr_torrent     * tor,
136            tr_block_index_t       block,
137            struct peer_request  * setme )
138{
139    assert( setme != NULL );
140
141    setme->index = tr_torBlockPiece( tor, block );
142    setme->offset = getBlockOffsetInPiece( tor, block );
143    setme->length = tr_torBlockCountBytes( tor, block );
144}
145
146/**
147***
148**/
149
150/* this is raw, unchanged data from the peer regarding
151 * the current message that it's sending us. */
152struct tr_incoming
153{
154    uint8_t                id;
155    uint32_t               length; /* includes the +1 for id length */
156    struct peer_request    blockReq; /* metadata for incoming blocks */
157    struct evbuffer *      block; /* piece data for incoming blocks */
158};
159
160/**
161 * Low-level communication state information about a connected peer.
162 *
163 * This structure remembers the low-level protocol states that we're
164 * in with this peer, such as active requests, pex messages, and so on.
165 * Its fields are all private to peer-msgs.c.
166 *
167 * Data not directly involved with sending & receiving messages is
168 * stored in tr_peer, where it can be accessed by both peermsgs and
169 * the peer manager.
170 *
171 * @see struct peer_atom
172 * @see tr_peer
173 */
174struct tr_peermsgs
175{
176    tr_bool         peerSupportsPex;
177    tr_bool         peerSupportsMetadataXfer;
178    tr_bool         clientSentLtepHandshake;
179    tr_bool         peerSentLtepHandshake;
180
181    /*tr_bool         haveFastSet;*/
182
183    int             activeRequestCount;
184    int             desiredRequestCount;
185
186    int             prefetchCount;
187
188    /* how long the outMessages batch should be allowed to grow before
189     * it's flushed -- some messages (like requests >:) should be sent
190     * very quickly; others aren't as urgent. */
191    int8_t          outMessagesBatchPeriod;
192
193    uint8_t         state;
194    uint8_t         ut_pex_id;
195    uint8_t         ut_metadata_id;
196    uint16_t        pexCount;
197    uint16_t        pexCount6;
198
199#if 0
200    size_t                 fastsetSize;
201    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
202#endif
203
204    tr_peer *              peer;
205
206    tr_torrent *           torrent;
207
208    tr_publisher           publisher;
209
210    struct evbuffer *      outMessages; /* all the non-piece messages */
211
212    struct peer_request    peerAskedFor[REQQ];
213    int                    peerAskedForCount;
214
215    int                    peerAskedForMetadata[METADATA_REQQ];
216    int                    peerAskedForMetadataCount;
217
218    tr_pex               * pex;
219    tr_pex               * pex6;
220
221    /*time_t                 clientSentPexAt;*/
222    time_t                 clientSentAnythingAt;
223
224    /* when we started batching the outMessages */
225    time_t                outMessagesBatchedAt;
226
227    struct tr_incoming    incoming;
228
229    /* if the peer supports the Extension Protocol in BEP 10 and
230       supplied a reqq argument, it's stored here.  otherwise the
231       value is zero and should be ignored. */
232    int64_t               reqq;
233
234    struct event          pexTimer;
235};
236
237/**
238***
239**/
240
241#if 0
242static tr_bitfield*
243getHave( const struct tr_peermsgs * msgs )
244{
245    if( msgs->peer->have == NULL )
246        msgs->peer->have = tr_bitfieldNew( msgs->torrent->info.pieceCount );
247    return msgs->peer->have;
248}
249#endif
250
251static TR_INLINE tr_session*
252getSession( struct tr_peermsgs * msgs )
253{
254    return msgs->torrent->session;
255}
256
257/**
258***
259**/
260
261static void
262myDebug( const char * file, int line,
263         const struct tr_peermsgs * msgs,
264         const char * fmt, ... )
265{
266    FILE * fp = tr_getLog( );
267
268    if( fp )
269    {
270        va_list           args;
271        char              timestr[64];
272        struct evbuffer * buf = evbuffer_new( );
273        char *            base = tr_basename( file );
274
275        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
276                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
277                             tr_torrentName( msgs->torrent ),
278                             tr_peerIoGetAddrStr( msgs->peer->io ),
279                             msgs->peer->client );
280        va_start( args, fmt );
281        evbuffer_add_vprintf( buf, fmt, args );
282        va_end( args );
283        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
284        /* FIXME(libevent2) tr_getLog() should return an fd, then use evbuffer_write() here */
285        fwrite( EVBUFFER_DATA( buf ), 1, EVBUFFER_LENGTH( buf ), fp );
286
287        tr_free( base );
288        evbuffer_free( buf );
289    }
290}
291
292#define dbgmsg( msgs, ... ) \
293    do { \
294        if( tr_deepLoggingIsActive( ) ) \
295            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
296    } while( 0 )
297
298/**
299***
300**/
301
302static void
303pokeBatchPeriod( tr_peermsgs * msgs,
304                 int           interval )
305{
306    if( msgs->outMessagesBatchPeriod > interval )
307    {
308        msgs->outMessagesBatchPeriod = interval;
309        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
310    }
311}
312
313static TR_INLINE void
314dbgOutMessageLen( tr_peermsgs * msgs )
315{
316    dbgmsg( msgs, "outMessage size is now %zu", EVBUFFER_LENGTH( msgs->outMessages ) );
317}
318
319static void
320protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
321{
322    tr_peerIo       * io  = msgs->peer->io;
323    struct evbuffer * out = msgs->outMessages;
324
325    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
326
327    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
328    tr_peerIoWriteUint8 ( io, out, BT_FEXT_REJECT );
329    tr_peerIoWriteUint32( io, out, req->index );
330    tr_peerIoWriteUint32( io, out, req->offset );
331    tr_peerIoWriteUint32( io, out, req->length );
332
333    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
334    dbgOutMessageLen( msgs );
335}
336
337static void
338protocolSendRequest( tr_peermsgs               * msgs,
339                     const struct peer_request * req )
340{
341    tr_peerIo       * io  = msgs->peer->io;
342    struct evbuffer * out = msgs->outMessages;
343
344    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
345    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
346    tr_peerIoWriteUint32( io, out, req->index );
347    tr_peerIoWriteUint32( io, out, req->offset );
348    tr_peerIoWriteUint32( io, out, req->length );
349
350    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
351    dbgOutMessageLen( msgs );
352    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
353}
354
355static void
356protocolSendCancel( tr_peermsgs               * msgs,
357                    const struct peer_request * req )
358{
359    tr_peerIo       * io  = msgs->peer->io;
360    struct evbuffer * out = msgs->outMessages;
361
362    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
363    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
364    tr_peerIoWriteUint32( io, out, req->index );
365    tr_peerIoWriteUint32( io, out, req->offset );
366    tr_peerIoWriteUint32( io, out, req->length );
367
368    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
369    dbgOutMessageLen( msgs );
370    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
371}
372
373static void
374protocolSendPort(tr_peermsgs *msgs, uint16_t port)
375{
376    tr_peerIo       * io  = msgs->peer->io;
377    struct evbuffer * out = msgs->outMessages;
378
379    dbgmsg( msgs, "sending Port %u", port);
380    tr_peerIoWriteUint32( io, out, 3 );
381    tr_peerIoWriteUint8 ( io, out, BT_PORT );
382    tr_peerIoWriteUint16( io, out, port);
383}
384
385static void
386protocolSendHave( tr_peermsgs * msgs,
387                  uint32_t      index )
388{
389    tr_peerIo       * io  = msgs->peer->io;
390    struct evbuffer * out = msgs->outMessages;
391
392    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
393    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
394    tr_peerIoWriteUint32( io, out, index );
395
396    dbgmsg( msgs, "sending Have %u", index );
397    dbgOutMessageLen( msgs );
398    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
399}
400
401#if 0
402static void
403protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
404{
405    tr_peerIo       * io  = msgs->peer->io;
406    struct evbuffer * out = msgs->outMessages;
407
408    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
409
410    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
411    tr_peerIoWriteUint8 ( io, out, BT_FEXT_ALLOWED_FAST );
412    tr_peerIoWriteUint32( io, out, pieceIndex );
413
414    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
415    dbgOutMessageLen( msgs );
416}
417#endif
418
419static void
420protocolSendChoke( tr_peermsgs * msgs,
421                   int           choke )
422{
423    tr_peerIo       * io  = msgs->peer->io;
424    struct evbuffer * out = msgs->outMessages;
425
426    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
427    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
428
429    dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
430    dbgOutMessageLen( msgs );
431    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
432}
433
434static void
435protocolSendHaveAll( tr_peermsgs * msgs )
436{
437    tr_peerIo       * io  = msgs->peer->io;
438    struct evbuffer * out = msgs->outMessages;
439
440    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
441
442    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
443    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_ALL );
444
445    dbgmsg( msgs, "sending HAVE_ALL..." );
446    dbgOutMessageLen( msgs );
447    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
448}
449
450static void
451protocolSendHaveNone( tr_peermsgs * msgs )
452{
453    tr_peerIo       * io  = msgs->peer->io;
454    struct evbuffer * out = msgs->outMessages;
455
456    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
457
458    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
459    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_NONE );
460
461    dbgmsg( msgs, "sending HAVE_NONE..." );
462    dbgOutMessageLen( msgs );
463    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
464}
465
466/**
467***  EVENTS
468**/
469
470static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0, 0, 0 };
471
472static void
473publish( tr_peermsgs * msgs, tr_peer_event * e )
474{
475    assert( msgs->peer );
476    assert( msgs->peer->msgs == msgs );
477
478    tr_publisherPublish( &msgs->publisher, msgs->peer, e );
479}
480
481static void
482fireError( tr_peermsgs * msgs, int err )
483{
484    tr_peer_event e = blankEvent;
485    e.eventType = TR_PEER_ERROR;
486    e.err = err;
487    publish( msgs, &e );
488}
489
490static void
491fireUploadOnly( tr_peermsgs * msgs, tr_bool uploadOnly )
492{
493    tr_peer_event e = blankEvent;
494    e.eventType = TR_PEER_UPLOAD_ONLY;
495    e.uploadOnly = uploadOnly;
496    publish( msgs, &e );
497}
498
499static void
500firePeerProgress( tr_peermsgs * msgs )
501{
502    tr_peer_event e = blankEvent;
503    e.eventType = TR_PEER_PEER_PROGRESS;
504    e.progress = msgs->peer->progress;
505    publish( msgs, &e );
506}
507
508static void
509fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
510{
511    tr_peer_event e = blankEvent;
512    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
513    e.pieceIndex = req->index;
514    e.offset = req->offset;
515    e.length = req->length;
516    publish( msgs, &e );
517}
518
519static void
520fireGotRej( tr_peermsgs * msgs, const struct peer_request * req )
521{
522    tr_peer_event e = blankEvent;
523    e.eventType = TR_PEER_CLIENT_GOT_REJ;
524    e.pieceIndex = req->index;
525    e.offset = req->offset;
526    e.length = req->length;
527    publish( msgs, &e );
528}
529
530static void
531fireGotChoke( tr_peermsgs * msgs )
532{
533    tr_peer_event e = blankEvent;
534    e.eventType = TR_PEER_CLIENT_GOT_CHOKE;
535    publish( msgs, &e );
536}
537
538static void
539fireClientGotData( tr_peermsgs * msgs,
540                   uint32_t      length,
541                   int           wasPieceData )
542{
543    tr_peer_event e = blankEvent;
544
545    e.length = length;
546    e.eventType = TR_PEER_CLIENT_GOT_DATA;
547    e.wasPieceData = wasPieceData;
548    publish( msgs, &e );
549}
550
551static void
552fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
553{
554    tr_peer_event e = blankEvent;
555    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
556    e.pieceIndex = pieceIndex;
557    publish( msgs, &e );
558}
559
560static void
561fireClientGotPort( tr_peermsgs * msgs, tr_port port )
562{
563    tr_peer_event e = blankEvent;
564    e.eventType = TR_PEER_CLIENT_GOT_PORT;
565    e.port = port;
566    publish( msgs, &e );
567}
568
569static void
570fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
571{
572    tr_peer_event e = blankEvent;
573    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
574    e.pieceIndex = pieceIndex;
575    publish( msgs, &e );
576}
577
578static void
579firePeerGotData( tr_peermsgs  * msgs,
580                 uint32_t       length,
581                 int            wasPieceData )
582{
583    tr_peer_event e = blankEvent;
584
585    e.length = length;
586    e.eventType = TR_PEER_PEER_GOT_DATA;
587    e.wasPieceData = wasPieceData;
588
589    publish( msgs, &e );
590}
591
592/**
593***  ALLOWED FAST SET
594***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
595**/
596
597size_t
598tr_generateAllowedSet( tr_piece_index_t * setmePieces,
599                       size_t             desiredSetSize,
600                       size_t             pieceCount,
601                       const uint8_t    * infohash,
602                       const tr_address * addr )
603{
604    size_t setSize = 0;
605
606    assert( setmePieces );
607    assert( desiredSetSize <= pieceCount );
608    assert( desiredSetSize );
609    assert( pieceCount );
610    assert( infohash );
611    assert( addr );
612
613    if( addr->type == TR_AF_INET )
614    {
615        uint8_t w[SHA_DIGEST_LENGTH + 4], *walk=w;
616        uint8_t x[SHA_DIGEST_LENGTH];
617
618        uint32_t ui32 = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 );   /* (1) */
619        memcpy( w, &ui32, sizeof( uint32_t ) );
620        walk += sizeof( uint32_t );
621        memcpy( walk, infohash, SHA_DIGEST_LENGTH );                 /* (2) */
622        walk += SHA_DIGEST_LENGTH;
623        tr_sha1( x, w, walk-w, NULL );                               /* (3) */
624        assert( sizeof( w ) == walk-w );
625
626        while( setSize<desiredSetSize )
627        {
628            int i;
629            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
630            {
631                size_t k;
632                uint32_t j = i * 4;                                  /* (5) */
633                uint32_t y = ntohl( *( uint32_t* )( x + j ) );       /* (6) */
634                uint32_t index = y % pieceCount;                     /* (7) */
635
636                for( k=0; k<setSize; ++k )                           /* (8) */
637                    if( setmePieces[k] == index )
638                        break;
639
640                if( k == setSize )
641                    setmePieces[setSize++] = index;                  /* (9) */
642            }
643
644            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
645        }
646    }
647
648    return setSize;
649}
650
651static void
652updateFastSet( tr_peermsgs * msgs UNUSED )
653{
654#if 0
655    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
656    const int peerIsNeedy = msgs->peer->progress < 0.10;
657
658    if( fext && peerIsNeedy && !msgs->haveFastSet )
659    {
660        size_t i;
661        const struct tr_address * addr = tr_peerIoGetAddress( msgs->peer->io, NULL );
662        const tr_info * inf = &msgs->torrent->info;
663        const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
664
665        /* build the fast set */
666        msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
667        msgs->haveFastSet = 1;
668
669        /* send it to the peer */
670        for( i=0; i<msgs->fastsetSize; ++i )
671            protocolSendAllowedFast( msgs, msgs->fastset[i] );
672    }
673#endif
674}
675
676/**
677***  INTEREST
678**/
679
680static tr_bool
681isPieceInteresting( const tr_peermsgs * msgs,
682                    tr_piece_index_t    piece )
683{
684    const tr_torrent * torrent = msgs->torrent;
685
686    return ( !torrent->info.pieces[piece].dnd )                  /* we want it */
687          && ( !tr_cpPieceIsComplete( &torrent->completion, piece ) ) /* !have */
688          && ( tr_bitsetHas( &msgs->peer->have, piece ) );      /* peer has it */
689}
690
691/* "interested" means we'll ask for piece data if they unchoke us */
692static tr_bool
693isPeerInteresting( const tr_peermsgs * msgs )
694{
695    tr_piece_index_t    i;
696    const tr_torrent *  torrent;
697    const tr_bitfield * bitfield;
698    const int           clientIsSeed = tr_torrentIsSeed( msgs->torrent );
699
700    if( clientIsSeed )
701        return FALSE;
702
703    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
704        return FALSE;
705
706    torrent = msgs->torrent;
707    bitfield = tr_cpPieceBitfield( &torrent->completion );
708
709    for( i = 0; i < torrent->info.pieceCount; ++i )
710        if( isPieceInteresting( msgs, i ) )
711            return TRUE;
712
713    return FALSE;
714}
715
716static void
717sendInterest( tr_peermsgs * msgs,
718              int           weAreInterested )
719{
720    struct evbuffer * out = msgs->outMessages;
721
722    assert( msgs );
723    assert( weAreInterested == 0 || weAreInterested == 1 );
724
725    msgs->peer->clientIsInterested = weAreInterested;
726    dbgmsg( msgs, "Sending %s", weAreInterested ? "Interested" : "Not Interested" );
727    tr_peerIoWriteUint32( msgs->peer->io, out, sizeof( uint8_t ) );
728    tr_peerIoWriteUint8 ( msgs->peer->io, out, weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
729
730    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
731    dbgOutMessageLen( msgs );
732}
733
734static void
735updateInterest( tr_peermsgs * msgs )
736{
737    const int i = isPeerInteresting( msgs );
738
739    if( i != msgs->peer->clientIsInterested )
740        sendInterest( msgs, i );
741}
742
743static tr_bool
744popNextMetadataRequest( tr_peermsgs * msgs, int * piece )
745{
746    if( msgs->peerAskedForMetadataCount == 0 )
747        return FALSE;
748
749    *piece = msgs->peerAskedForMetadata[0];
750
751    tr_removeElementFromArray( msgs->peerAskedForMetadata, 0, sizeof( int ),
752                               msgs->peerAskedForMetadataCount-- );
753
754    return TRUE;
755}
756
757static tr_bool
758popNextRequest( tr_peermsgs * msgs, struct peer_request * setme )
759{
760    if( msgs->peerAskedForCount == 0 )
761        return FALSE;
762
763    *setme = msgs->peerAskedFor[0];
764
765    tr_removeElementFromArray( msgs->peerAskedFor, 0, sizeof( struct peer_request ),
766                               msgs->peerAskedForCount-- );
767
768    return TRUE;
769}
770
771static void
772cancelAllRequestsToClient( tr_peermsgs * msgs )
773{
774    struct peer_request req;
775    const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->peer->io );
776
777    while( popNextRequest( msgs, &req ))
778        if( mustSendCancel )
779            protocolSendReject( msgs, &req );
780}
781
782void
783tr_peerMsgsSetChoke( tr_peermsgs * msgs,
784                     int           choke )
785{
786    const time_t now = tr_time( );
787    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
788
789    assert( msgs );
790    assert( msgs->peer );
791    assert( choke == 0 || choke == 1 );
792
793    if( msgs->peer->chokeChangedAt > fibrillationTime )
794    {
795        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
796    }
797    else if( msgs->peer->peerIsChoked != choke )
798    {
799        msgs->peer->peerIsChoked = choke;
800        if( choke )
801            cancelAllRequestsToClient( msgs );
802        protocolSendChoke( msgs, choke );
803        msgs->peer->chokeChangedAt = now;
804    }
805}
806
807/**
808***
809**/
810
811void
812tr_peerMsgsHave( tr_peermsgs * msgs,
813                 uint32_t      index )
814{
815    protocolSendHave( msgs, index );
816
817    /* since we have more pieces now, we might not be interested in this peer */
818    updateInterest( msgs );
819}
820
821/**
822***
823**/
824
825static tr_bool
826reqIsValid( const tr_peermsgs * peer,
827            uint32_t            index,
828            uint32_t            offset,
829            uint32_t            length )
830{
831    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
832}
833
834static tr_bool
835requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
836{
837    return reqIsValid( msgs, req->index, req->offset, req->length );
838}
839
840
841void
842tr_peerMsgsCancel( tr_peermsgs * msgs, tr_block_index_t block )
843{
844    struct peer_request req;
845    blockToReq( msgs->torrent, block, &req );
846    protocolSendCancel( msgs, &req );
847}
848
849/**
850***
851**/
852
853static void
854sendLtepHandshake( tr_peermsgs * msgs )
855{
856    tr_benc val, *m;
857    char * buf;
858    int len;
859    tr_bool allow_pex;
860    tr_bool allow_metadata_xfer;
861    struct evbuffer * out = msgs->outMessages;
862    const unsigned char * ipv6 = tr_globalIPv6();
863
864    if( msgs->clientSentLtepHandshake )
865        return;
866
867    dbgmsg( msgs, "sending an ltep handshake" );
868    msgs->clientSentLtepHandshake = 1;
869
870    /* decide if we want to advertise metadata xfer support (BEP 9) */
871    if( tr_torrentIsPrivate( msgs->torrent ) )
872        allow_metadata_xfer = 0;
873    else
874        allow_metadata_xfer = 1;
875
876    /* decide if we want to advertise pex support */
877    if( !tr_torrentAllowsPex( msgs->torrent ) )
878        allow_pex = 0;
879    else if( msgs->peerSentLtepHandshake )
880        allow_pex = msgs->peerSupportsPex ? 1 : 0;
881    else
882        allow_pex = 1;
883
884    tr_bencInitDict( &val, 8 );
885    tr_bencDictAddInt( &val, "e", getSession(msgs)->encryptionMode != TR_CLEAR_PREFERRED );
886    if( ipv6 != NULL )
887        tr_bencDictAddRaw( &val, "ipv6", ipv6, 16 );
888    if( allow_metadata_xfer && tr_torrentHasMetadata( msgs->torrent )
889                            && ( msgs->torrent->infoDictLength > 0 ) )
890        tr_bencDictAddInt( &val, "metadata_size", msgs->torrent->infoDictLength );
891    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( getSession(msgs) ) );
892    tr_bencDictAddInt( &val, "reqq", REQQ );
893    tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
894    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
895    m  = tr_bencDictAddDict( &val, "m", 2 );
896    if( allow_metadata_xfer )
897        tr_bencDictAddInt( m, "ut_metadata", UT_METADATA_ID );
898    if( allow_pex )
899        tr_bencDictAddInt( m, "ut_pex", UT_PEX_ID );
900
901    buf = tr_bencToStr( &val, TR_FMT_BENC, &len );
902
903    tr_peerIoWriteUint32( msgs->peer->io, out, 2 * sizeof( uint8_t ) + len );
904    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_LTEP );
905    tr_peerIoWriteUint8 ( msgs->peer->io, out, LTEP_HANDSHAKE );
906    tr_peerIoWriteBytes ( msgs->peer->io, out, buf, len );
907    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
908    dbgOutMessageLen( msgs );
909
910    /* cleanup */
911    tr_bencFree( &val );
912    tr_free( buf );
913}
914
915static void
916parseLtepHandshake( tr_peermsgs *     msgs,
917                    int               len,
918                    struct evbuffer * inbuf )
919{
920    int64_t   i;
921    tr_benc   val, * sub;
922    uint8_t * tmp = tr_new( uint8_t, len );
923    const uint8_t *addr;
924    size_t addr_len;
925    tr_pex pex;
926
927    memset( &pex, 0, sizeof( tr_pex ) );
928
929    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, len );
930    msgs->peerSentLtepHandshake = 1;
931
932    if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) )
933    {
934        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
935        tr_free( tmp );
936        return;
937    }
938
939    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
940
941    /* does the peer prefer encrypted connections? */
942    if( tr_bencDictFindInt( &val, "e", &i ) ) {
943        msgs->peer->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
944                                              : ENCRYPTION_PREFERENCE_NO;
945        if( i )
946            pex.flags |= ADDED_F_ENCRYPTION_FLAG;
947    }
948
949    /* check supported messages for utorrent pex */
950    msgs->peerSupportsPex = 0;
951    msgs->peerSupportsMetadataXfer = 0;
952
953    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
954        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
955            msgs->peerSupportsPex = i != 0;
956            msgs->ut_pex_id = (uint8_t) i;
957            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
958        }
959        if( tr_bencDictFindInt( sub, "ut_metadata", &i ) ) {
960            msgs->peerSupportsMetadataXfer = i != 0;
961            msgs->ut_metadata_id = (uint8_t) i;
962            dbgmsg( msgs, "msgs->ut_metadata_id is %d", (int)msgs->ut_metadata_id );
963        }
964    }
965
966    /* look for metainfo size (BEP 9) */
967    if( tr_bencDictFindInt( &val, "metadata_size", &i ) )
968        tr_torrentSetMetadataSizeHint( msgs->torrent, i );
969
970    /* look for upload_only (BEP 21) */
971    if( tr_bencDictFindInt( &val, "upload_only", &i ) ) {
972        fireUploadOnly( msgs, i!=0 );
973        if( i )
974            pex.flags |= ADDED_F_SEED_FLAG;
975    }
976
977    /* get peer's listening port */
978    if( tr_bencDictFindInt( &val, "p", &i ) ) {
979        fireClientGotPort( msgs, (tr_port)i );
980        pex.port = htons( (uint16_t)i );
981        dbgmsg( msgs, "peer's port is now %d", (int)i );
982    }
983
984    if( tr_bencDictFindRaw( &val, "ipv4", &addr, &addr_len) && addr_len == 4 ) {
985        pex.addr.type = TR_AF_INET;
986        memcpy( &pex.addr.addr.addr4, addr, 4 );
987        tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex );
988    }
989
990    if( tr_bencDictFindRaw( &val, "ipv6", &addr, &addr_len) && addr_len == 16 ) {
991        pex.addr.type = TR_AF_INET6;
992        memcpy( &pex.addr.addr.addr6, addr, 16 );
993        tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex );
994    }
995
996    /* get peer's maximum request queue size */
997    if( tr_bencDictFindInt( &val, "reqq", &i ) )
998        msgs->reqq = i;
999
1000    tr_bencFree( &val );
1001    tr_free( tmp );
1002}
1003
1004static void
1005parseUtMetadata( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1006{
1007    tr_benc dict;
1008    char * msg_end;
1009    char * benc_end;
1010    int64_t msg_type = -1;
1011    int64_t piece = -1;
1012    int64_t total_size = 0;
1013    uint8_t * tmp = tr_new( uint8_t, msglen );
1014
1015    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1016    msg_end = (char*)tmp + msglen;
1017
1018    if( !tr_bencLoad( tmp, msglen, &dict, &benc_end ) )
1019    {
1020        tr_bencDictFindInt( &dict, "msg_type", &msg_type );
1021        tr_bencDictFindInt( &dict, "piece", &piece );
1022        tr_bencDictFindInt( &dict, "total_size", &total_size );
1023        tr_bencFree( &dict );
1024    }
1025
1026    dbgmsg( msgs, "got ut_metadata msg: type %d, piece %d, total_size %d",
1027            (int)msg_type, (int)piece, (int)total_size );
1028
1029    if( msg_type == METADATA_MSG_TYPE_REJECT )
1030    {
1031        /* NOOP */
1032    }
1033
1034    if( ( msg_type == METADATA_MSG_TYPE_DATA )
1035        && ( !tr_torrentHasMetadata( msgs->torrent ) )
1036        && ( msg_end - benc_end <= METADATA_PIECE_SIZE )
1037        && ( piece * METADATA_PIECE_SIZE + (msg_end - benc_end) <= total_size ) )
1038    {
1039        const int pieceLen = msg_end - benc_end;
1040        tr_torrentSetMetadataPiece( msgs->torrent, piece, benc_end, pieceLen );
1041    }
1042
1043    if( msg_type == METADATA_MSG_TYPE_REQUEST )
1044    {
1045        if( ( piece >= 0 )
1046            && tr_torrentHasMetadata( msgs->torrent )
1047            && !tr_torrentIsPrivate( msgs->torrent )
1048            && ( msgs->peerAskedForMetadataCount < METADATA_REQQ ) )
1049        {
1050            msgs->peerAskedForMetadata[msgs->peerAskedForMetadataCount++] = piece;
1051        }
1052        else
1053        {
1054            tr_benc tmp;
1055            int payloadLen;
1056            char * payload;
1057            tr_peerIo  * io  = msgs->peer->io;
1058            struct evbuffer * out = msgs->outMessages;
1059
1060            /* build the rejection message */
1061            tr_bencInitDict( &tmp, 2 );
1062            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
1063            tr_bencDictAddInt( &tmp, "piece", piece );
1064            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1065            tr_bencFree( &tmp );
1066
1067            /* write it out as a LTEP message to our outMessages buffer */
1068            tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + payloadLen );
1069            tr_peerIoWriteUint8 ( io, out, BT_LTEP );
1070            tr_peerIoWriteUint8 ( io, out, msgs->ut_metadata_id );
1071            tr_peerIoWriteBytes ( io, out, payload, payloadLen );
1072            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1073            dbgOutMessageLen( msgs );
1074
1075            tr_free( payload );
1076        }
1077    }
1078
1079    tr_free( tmp );
1080}
1081
1082static void
1083parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1084{
1085    int loaded = 0;
1086    uint8_t * tmp = tr_new( uint8_t, msglen );
1087    tr_benc val;
1088    tr_torrent * tor = msgs->torrent;
1089    const uint8_t * added;
1090    size_t added_len;
1091
1092    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1093
1094    if( tr_torrentAllowsPex( tor )
1095      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) ) )
1096    {
1097        if( tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1098        {
1099            tr_pex * pex;
1100            size_t i, n;
1101            size_t added_f_len = 0;
1102            const uint8_t * added_f = NULL;
1103
1104            tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1105            pex = tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len, &n );
1106
1107            n = MIN( n, MAX_PEX_PEER_COUNT );
1108            for( i=0; i<n; ++i )
1109                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex + i );
1110
1111            tr_free( pex );
1112        }
1113
1114        if( tr_bencDictFindRaw( &val, "added6", &added, &added_len ) )
1115        {
1116            tr_pex * pex;
1117            size_t i, n;
1118            size_t added_f_len = 0;
1119            const uint8_t * added_f = NULL;
1120
1121            tr_bencDictFindRaw( &val, "added6.f", &added_f, &added_f_len );
1122            pex = tr_peerMgrCompact6ToPex( added, added_len, added_f, added_f_len, &n );
1123
1124            n = MIN( n, MAX_PEX_PEER_COUNT );
1125            for( i=0; i<n; ++i )
1126                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex + i );
1127
1128            tr_free( pex );
1129        }
1130    }
1131
1132    if( loaded )
1133        tr_bencFree( &val );
1134    tr_free( tmp );
1135}
1136
1137static void sendPex( tr_peermsgs * msgs );
1138
1139static void
1140parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer  * inbuf )
1141{
1142    uint8_t ltep_msgid;
1143
1144    tr_peerIoReadUint8( msgs->peer->io, inbuf, &ltep_msgid );
1145    msglen--;
1146
1147    if( ltep_msgid == LTEP_HANDSHAKE )
1148    {
1149        dbgmsg( msgs, "got ltep handshake" );
1150        parseLtepHandshake( msgs, msglen, inbuf );
1151        if( tr_peerIoSupportsLTEP( msgs->peer->io ) )
1152        {
1153            sendLtepHandshake( msgs );
1154            sendPex( msgs );
1155        }
1156    }
1157    else if( ltep_msgid == UT_PEX_ID )
1158    {
1159        dbgmsg( msgs, "got ut pex" );
1160        msgs->peerSupportsPex = 1;
1161        parseUtPex( msgs, msglen, inbuf );
1162    }
1163    else if( ltep_msgid == UT_METADATA_ID )
1164    {
1165        dbgmsg( msgs, "got ut metadata" );
1166        msgs->peerSupportsMetadataXfer = 1;
1167        parseUtMetadata( msgs, msglen, inbuf );
1168    }
1169    else
1170    {
1171        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1172        evbuffer_drain( inbuf, msglen );
1173    }
1174}
1175
1176static int
1177readBtLength( tr_peermsgs *     msgs,
1178              struct evbuffer * inbuf,
1179              size_t            inlen )
1180{
1181    uint32_t len;
1182
1183    if( inlen < sizeof( len ) )
1184        return READ_LATER;
1185
1186    tr_peerIoReadUint32( msgs->peer->io, inbuf, &len );
1187
1188    if( len == 0 ) /* peer sent us a keepalive message */
1189        dbgmsg( msgs, "got KeepAlive" );
1190    else
1191    {
1192        msgs->incoming.length = len;
1193        msgs->state = AWAITING_BT_ID;
1194    }
1195
1196    return READ_NOW;
1197}
1198
1199static int readBtMessage( tr_peermsgs     * msgs,
1200                          struct evbuffer * inbuf,
1201                          size_t            inlen );
1202
1203static int
1204readBtId( tr_peermsgs * msgs, struct evbuffer  * inbuf, size_t inlen )
1205{
1206    uint8_t id;
1207
1208    if( inlen < sizeof( uint8_t ) )
1209        return READ_LATER;
1210
1211    tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
1212    msgs->incoming.id = id;
1213    dbgmsg( msgs, "msgs->incoming.id is now %d; msgs->incoming.length is %zu", id, (size_t)msgs->incoming.length );
1214
1215    if( id == BT_PIECE )
1216    {
1217        msgs->state = AWAITING_BT_PIECE;
1218        return READ_NOW;
1219    }
1220    else if( msgs->incoming.length != 1 )
1221    {
1222        msgs->state = AWAITING_BT_MESSAGE;
1223        return READ_NOW;
1224    }
1225    else return readBtMessage( msgs, inbuf, inlen - 1 );
1226}
1227
1228static void
1229updatePeerProgress( tr_peermsgs * msgs )
1230{
1231    msgs->peer->progress = tr_bitsetPercent( &msgs->peer->have );
1232    dbgmsg( msgs, "peer progress is %f", msgs->peer->progress );
1233    updateFastSet( msgs );
1234    updateInterest( msgs );
1235    firePeerProgress( msgs );
1236}
1237
1238static void
1239peerMadeRequest( tr_peermsgs *               msgs,
1240                 const struct peer_request * req )
1241{
1242    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1243    const int reqIsValid = requestIsValid( msgs, req );
1244    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( &msgs->torrent->completion, req->index );
1245    const int peerIsChoked = msgs->peer->peerIsChoked;
1246
1247    int allow = FALSE;
1248
1249    if( !reqIsValid )
1250        dbgmsg( msgs, "rejecting an invalid request." );
1251    else if( !clientHasPiece )
1252        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1253    else if( peerIsChoked )
1254        dbgmsg( msgs, "rejecting request from choked peer" );
1255    else if( msgs->peerAskedForCount + 1 >= REQQ )
1256        dbgmsg( msgs, "rejecting request ... reqq is full" );
1257    else
1258        allow = TRUE;
1259
1260    if( allow )
1261        msgs->peerAskedFor[msgs->peerAskedForCount++] = *req;
1262    else if( fext )
1263        protocolSendReject( msgs, req );
1264}
1265
1266static tr_bool
1267messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1268{
1269    switch( id )
1270    {
1271        case BT_CHOKE:
1272        case BT_UNCHOKE:
1273        case BT_INTERESTED:
1274        case BT_NOT_INTERESTED:
1275        case BT_FEXT_HAVE_ALL:
1276        case BT_FEXT_HAVE_NONE:
1277            return len == 1;
1278
1279        case BT_HAVE:
1280        case BT_FEXT_SUGGEST:
1281        case BT_FEXT_ALLOWED_FAST:
1282            return len == 5;
1283
1284        case BT_BITFIELD:
1285            return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1286
1287        case BT_REQUEST:
1288        case BT_CANCEL:
1289        case BT_FEXT_REJECT:
1290            return len == 13;
1291
1292        case BT_PIECE:
1293            return len > 9 && len <= 16393;
1294
1295        case BT_PORT:
1296            return len == 3;
1297
1298        case BT_LTEP:
1299            return len >= 2;
1300
1301        default:
1302            return FALSE;
1303    }
1304}
1305
1306static int clientGotBlock( tr_peermsgs *               msgs,
1307                           const uint8_t *             block,
1308                           const struct peer_request * req );
1309
1310static int
1311readBtPiece( tr_peermsgs      * msgs,
1312             struct evbuffer  * inbuf,
1313             size_t             inlen,
1314             size_t           * setme_piece_bytes_read )
1315{
1316    struct peer_request * req = &msgs->incoming.blockReq;
1317
1318    assert( EVBUFFER_LENGTH( inbuf ) >= inlen );
1319    dbgmsg( msgs, "In readBtPiece" );
1320
1321    if( !req->length )
1322    {
1323        if( inlen < 8 )
1324            return READ_LATER;
1325
1326        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index );
1327        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset );
1328        req->length = msgs->incoming.length - 9;
1329        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1330        return READ_NOW;
1331    }
1332    else
1333    {
1334        int err;
1335
1336        /* read in another chunk of data */
1337        const size_t nLeft = req->length - EVBUFFER_LENGTH( msgs->incoming.block );
1338        size_t n = MIN( nLeft, inlen );
1339        size_t i = n;
1340
1341        while( i > 0 )
1342        {
1343            uint8_t buf[MAX_STACK_ARRAY_SIZE];
1344            const size_t thisPass = MIN( i, sizeof( buf ) );
1345            tr_peerIoReadBytes( msgs->peer->io, inbuf, buf, thisPass );
1346            evbuffer_add( msgs->incoming.block, buf, thisPass );
1347            i -= thisPass;
1348        }
1349
1350        fireClientGotData( msgs, n, TRUE );
1351        *setme_piece_bytes_read += n;
1352        dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1353               n, req->index, req->offset, req->length,
1354               (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) );
1355        if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length )
1356            return READ_LATER;
1357
1358        /* we've got the whole block ... process it */
1359        err = clientGotBlock( msgs, EVBUFFER_DATA( msgs->incoming.block ), req );
1360
1361        /* cleanup */
1362        evbuffer_free( msgs->incoming.block );
1363        msgs->incoming.block = evbuffer_new( );
1364        req->length = 0;
1365        msgs->state = AWAITING_BT_LENGTH;
1366        return err ? READ_ERR : READ_NOW;
1367    }
1368}
1369
1370static void updateDesiredRequestCount( tr_peermsgs * msgs, uint64_t now );
1371
1372static void
1373decrementActiveRequestCount( tr_peermsgs * msgs )
1374{
1375    if( msgs->activeRequestCount > 0 )
1376        msgs->activeRequestCount--;
1377}
1378
1379static int
1380readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1381{
1382    uint32_t      ui32;
1383    uint32_t      msglen = msgs->incoming.length;
1384    const uint8_t id = msgs->incoming.id;
1385    const size_t  startBufLen = EVBUFFER_LENGTH( inbuf );
1386    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1387
1388    --msglen; /* id length */
1389
1390    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
1391
1392    if( inlen < msglen )
1393        return READ_LATER;
1394
1395    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1396    {
1397        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1398        fireError( msgs, EMSGSIZE );
1399        return READ_ERR;
1400    }
1401
1402    switch( id )
1403    {
1404        case BT_CHOKE:
1405            dbgmsg( msgs, "got Choke" );
1406            msgs->peer->clientIsChoked = 1;
1407            if( !fext )
1408                fireGotChoke( msgs );
1409            break;
1410
1411        case BT_UNCHOKE:
1412            dbgmsg( msgs, "got Unchoke" );
1413            msgs->peer->clientIsChoked = 0;
1414            updateDesiredRequestCount( msgs, tr_date( ) );
1415            break;
1416
1417        case BT_INTERESTED:
1418            dbgmsg( msgs, "got Interested" );
1419            msgs->peer->peerIsInterested = 1;
1420            break;
1421
1422        case BT_NOT_INTERESTED:
1423            dbgmsg( msgs, "got Not Interested" );
1424            msgs->peer->peerIsInterested = 0;
1425            break;
1426
1427        case BT_HAVE:
1428            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1429            dbgmsg( msgs, "got Have: %u", ui32 );
1430            if( tr_bitsetAdd( &msgs->peer->have, ui32 ) ) {
1431                fireError( msgs, ERANGE );
1432                return READ_ERR;
1433            }
1434            updatePeerProgress( msgs );
1435            break;
1436
1437        case BT_BITFIELD: {
1438            const size_t bitCount = tr_torrentHasMetadata( msgs->torrent )
1439                                  ? msgs->torrent->info.pieceCount
1440                                  : msglen * 8;
1441            dbgmsg( msgs, "got a bitfield" );
1442            tr_bitsetReserve( &msgs->peer->have, bitCount );
1443            tr_peerIoReadBytes( msgs->peer->io, inbuf,
1444                                msgs->peer->have.bitfield.bits, msglen );
1445            updatePeerProgress( msgs );
1446            break;
1447        }
1448
1449        case BT_REQUEST:
1450        {
1451            struct peer_request r;
1452            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1453            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1454            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1455            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1456            peerMadeRequest( msgs, &r );
1457            break;
1458        }
1459
1460        case BT_CANCEL:
1461        {
1462            int i;
1463            struct peer_request r;
1464            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1465            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1466            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1467            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1468
1469            for( i=0; i<msgs->peerAskedForCount; ++i ) {
1470                const struct peer_request * req = msgs->peerAskedFor + i;
1471                if( ( req->index == r.index ) && ( req->offset == r.offset ) && ( req->length == r.length ) )
1472                    break;
1473            }
1474
1475            if( i < msgs->peerAskedForCount )
1476                tr_removeElementFromArray( msgs->peerAskedFor, i, sizeof( struct peer_request ),
1477                                           msgs->peerAskedForCount-- );
1478            break;
1479        }
1480
1481        case BT_PIECE:
1482            assert( 0 ); /* handled elsewhere! */
1483            break;
1484
1485        case BT_PORT:
1486            dbgmsg( msgs, "Got a BT_PORT" );
1487            tr_peerIoReadUint16( msgs->peer->io, inbuf, &msgs->peer->dht_port );
1488            if( msgs->peer->dht_port > 0 )
1489                tr_dhtAddNode( getSession(msgs),
1490                               tr_peerAddress( msgs->peer ),
1491                               msgs->peer->dht_port, 0 );
1492            break;
1493
1494        case BT_FEXT_SUGGEST:
1495            dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1496            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1497            if( fext )
1498                fireClientGotSuggest( msgs, ui32 );
1499            else {
1500                fireError( msgs, EMSGSIZE );
1501                return READ_ERR;
1502            }
1503            break;
1504
1505        case BT_FEXT_ALLOWED_FAST:
1506            dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1507            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1508            if( fext )
1509                fireClientGotAllowedFast( msgs, ui32 );
1510            else {
1511                fireError( msgs, EMSGSIZE );
1512                return READ_ERR;
1513            }
1514            break;
1515
1516        case BT_FEXT_HAVE_ALL:
1517            dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1518            if( fext ) {
1519                tr_bitsetSetHaveAll( &msgs->peer->have );
1520                updatePeerProgress( msgs );
1521            } else {
1522                fireError( msgs, EMSGSIZE );
1523                return READ_ERR;
1524            }
1525            break;
1526
1527        case BT_FEXT_HAVE_NONE:
1528            dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1529            if( fext ) {
1530                tr_bitsetSetHaveNone( &msgs->peer->have );
1531                updatePeerProgress( msgs );
1532            } else {
1533                fireError( msgs, EMSGSIZE );
1534                return READ_ERR;
1535            }
1536            break;
1537
1538        case BT_FEXT_REJECT:
1539        {
1540            struct peer_request r;
1541            dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1542            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1543            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1544            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1545            if( fext ) {
1546                decrementActiveRequestCount( msgs );
1547                fireGotRej( msgs, &r );
1548            } else {
1549                fireError( msgs, EMSGSIZE );
1550                return READ_ERR;
1551            }
1552            break;
1553        }
1554
1555        case BT_LTEP:
1556            dbgmsg( msgs, "Got a BT_LTEP" );
1557            parseLtep( msgs, msglen, inbuf );
1558            break;
1559
1560        default:
1561            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1562            tr_peerIoDrain( msgs->peer->io, inbuf, msglen );
1563            break;
1564    }
1565
1566    assert( msglen + 1 == msgs->incoming.length );
1567    assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen );
1568
1569    msgs->state = AWAITING_BT_LENGTH;
1570    return READ_NOW;
1571}
1572
1573static TR_INLINE void
1574decrementDownloadedCount( tr_peermsgs * msgs, uint32_t byteCount )
1575{
1576    tr_torrent * tor = msgs->torrent;
1577
1578    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1579}
1580
1581static TR_INLINE void
1582clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1583{
1584    decrementDownloadedCount( msgs, req->length );
1585}
1586
1587static void
1588addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1589{
1590    if( !msgs->peer->blame )
1591         msgs->peer->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1592    tr_bitfieldAdd( msgs->peer->blame, index );
1593}
1594
1595/* returns 0 on success, or an errno on failure */
1596static int
1597clientGotBlock( tr_peermsgs *               msgs,
1598                const uint8_t *             data,
1599                const struct peer_request * req )
1600{
1601    int err;
1602    tr_torrent * tor = msgs->torrent;
1603    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1604
1605    assert( msgs );
1606    assert( req );
1607
1608    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) {
1609        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1610                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1611        return EMSGSIZE;
1612    }
1613
1614    /* save the block */
1615    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1616
1617    if( !tr_peerMgrDidPeerRequest( msgs->torrent, msgs->peer, block ) ) {
1618        dbgmsg( msgs, "we didn't ask for this message..." );
1619        return 0;
1620    }
1621
1622    /**
1623    ***  Save the block
1624    **/
1625
1626    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1627        return err;
1628
1629    addPeerToBlamefield( msgs, req->index );
1630    decrementActiveRequestCount( msgs );
1631    fireGotBlock( msgs, req );
1632    return 0;
1633}
1634
1635static int peerPulse( void * vmsgs );
1636
1637static void
1638didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1639{
1640    tr_peermsgs * msgs = vmsgs;
1641    firePeerGotData( msgs, bytesWritten, wasPieceData );
1642
1643    if ( tr_isPeerIo( io ) && io->userData )
1644        peerPulse( msgs );
1645}
1646
1647static ReadState
1648canRead( tr_peerIo * io, void * vmsgs, size_t * piece )
1649{
1650    ReadState         ret;
1651    tr_peermsgs *     msgs = vmsgs;
1652    struct evbuffer * in = tr_peerIoGetReadBuffer( io );
1653    const size_t      inlen = EVBUFFER_LENGTH( in );
1654
1655    dbgmsg( msgs, "canRead: inlen is %zu, msgs->state is %d", inlen, msgs->state );
1656
1657    if( !inlen )
1658    {
1659        ret = READ_LATER;
1660    }
1661    else if( msgs->state == AWAITING_BT_PIECE )
1662    {
1663        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1664    }
1665    else switch( msgs->state )
1666    {
1667        case AWAITING_BT_LENGTH:
1668            ret = readBtLength ( msgs, in, inlen ); break;
1669
1670        case AWAITING_BT_ID:
1671            ret = readBtId     ( msgs, in, inlen ); break;
1672
1673        case AWAITING_BT_MESSAGE:
1674            ret = readBtMessage( msgs, in, inlen ); break;
1675
1676        default:
1677            ret = READ_ERR;
1678            assert( 0 );
1679    }
1680
1681    dbgmsg( msgs, "canRead: ret is %d", (int)ret );
1682
1683    /* log the raw data that was read */
1684    if( ( ret != READ_ERR ) && ( EVBUFFER_LENGTH( in ) != inlen ) )
1685        fireClientGotData( msgs, inlen - EVBUFFER_LENGTH( in ), FALSE );
1686
1687    return ret;
1688}
1689
1690/**
1691***
1692**/
1693
1694static void
1695updateDesiredRequestCount( tr_peermsgs * msgs, uint64_t now )
1696{
1697    const tr_torrent * const torrent = msgs->torrent;
1698
1699    if( tr_torrentIsSeed( msgs->torrent ) )
1700    {
1701        msgs->desiredRequestCount = 0;
1702    }
1703    else if( msgs->peer->clientIsChoked )
1704    {
1705        msgs->desiredRequestCount = 0;
1706    }
1707    else
1708    {
1709        int irate;
1710        int estimatedBlocksInPeriod;
1711        double rate;
1712        const int floor = 16;
1713        const int seconds = REQUEST_BUF_SECS;
1714
1715        /* Get the rate limit we should use.
1716         * FIXME: this needs to consider all the other peers as well... */
1717        rate = tr_peerGetPieceSpeed( msgs->peer, now, TR_PEER_TO_CLIENT );
1718        if( tr_torrentUsesSpeedLimit( torrent, TR_PEER_TO_CLIENT ) )
1719            rate = MIN( rate, tr_torrentGetSpeedLimit( torrent, TR_PEER_TO_CLIENT ) );
1720
1721        /* honor the session limits, if enabled */
1722        if( tr_torrentUsesSessionLimits( torrent ) )
1723            if( tr_sessionGetActiveSpeedLimit( torrent->session, TR_PEER_TO_CLIENT, &irate ) )
1724                rate = MIN( rate, irate );
1725
1726        /* use this desired rate to figure out how
1727         * many requests we should send to this peer */
1728        estimatedBlocksInPeriod = ( rate * seconds * 1024 ) / torrent->blockSize;
1729        msgs->desiredRequestCount = MAX( floor, estimatedBlocksInPeriod );
1730
1731        /* honor the peer's maximum request count, if specified */
1732        if( msgs->reqq > 0 )
1733            if( msgs->desiredRequestCount > msgs->reqq )
1734                msgs->desiredRequestCount = msgs->reqq;
1735    }
1736}
1737
1738static void
1739updateMetadataRequests( tr_peermsgs * msgs, time_t now )
1740{
1741    int piece;
1742
1743    if( msgs->peerSupportsMetadataXfer
1744        && tr_torrentGetNextMetadataRequest( msgs->torrent, now, &piece ) )
1745    {
1746        tr_benc tmp;
1747        int payloadLen;
1748        char * payload;
1749        tr_peerIo  * io  = msgs->peer->io;
1750        struct evbuffer * out = msgs->outMessages;
1751
1752        /* build the data message */
1753        tr_bencInitDict( &tmp, 3 );
1754        tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REQUEST );
1755        tr_bencDictAddInt( &tmp, "piece", piece );
1756        payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1757        tr_bencFree( &tmp );
1758
1759        dbgmsg( msgs, "requesting metadata piece #%d", piece );
1760
1761        /* write it out as a LTEP message to our outMessages buffer */
1762        tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + payloadLen );
1763        tr_peerIoWriteUint8 ( io, out, BT_LTEP );
1764        tr_peerIoWriteUint8 ( io, out, msgs->ut_metadata_id );
1765        tr_peerIoWriteBytes ( io, out, payload, payloadLen );
1766        pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1767        dbgOutMessageLen( msgs );
1768
1769        tr_free( payload );
1770    }
1771}
1772
1773static void
1774updateBlockRequests( tr_peermsgs * msgs )
1775{
1776    const int MIN_BATCH_SIZE = 4;
1777    const int numwant = msgs->desiredRequestCount - msgs->activeRequestCount;
1778
1779    /* make sure we have enough block requests queued up */
1780    if( numwant >= MIN_BATCH_SIZE )
1781    {
1782        int i;
1783        int n;
1784        tr_block_index_t * blocks = tr_new( tr_block_index_t, numwant );
1785
1786        tr_peerMgrGetNextRequests( msgs->torrent, msgs->peer, numwant, blocks, &n );
1787
1788        for( i=0; i<n; ++i )
1789        {
1790            struct peer_request req;
1791            blockToReq( msgs->torrent, blocks[i], &req );
1792            protocolSendRequest( msgs, &req );
1793        }
1794
1795        msgs->activeRequestCount += n;
1796
1797        tr_free( blocks );
1798    }
1799}
1800
1801static void
1802prefetchPieces( tr_peermsgs *msgs )
1803{
1804    int i;
1805    uint64_t next = 0;
1806
1807    /* Maintain at least 8 prefetched blocks per unchoked peer, but allow
1808       up to 4 extra blocks if that would cause sequential writes. */
1809    for( i=msgs->prefetchCount; i<msgs->peerAskedForCount; ++i )
1810    {
1811        const struct peer_request * req = msgs->peerAskedFor + i;
1812        const uint64_t begin = tr_pieceOffset( msgs->torrent, req->index, req->offset, 0 );
1813        const uint64_t end = begin + req->length;
1814        const tr_bool isSequential = next == begin;
1815
1816        if( ( i >= 12 ) || ( !isSequential && ( i >= 8 ) ) )
1817            break;
1818
1819        tr_ioPrefetch( msgs->torrent, req->index, req->offset, req->length );
1820        ++msgs->prefetchCount;
1821
1822        next = end;
1823    }
1824}
1825
1826static size_t
1827fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1828{
1829    int piece;
1830    size_t bytesWritten = 0;
1831    struct peer_request req;
1832    const tr_bool haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1833    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1834
1835    /**
1836    ***  Protocol messages
1837    **/
1838
1839    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1840    {
1841        dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
1842        msgs->outMessagesBatchedAt = now;
1843    }
1844    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1845    {
1846        const size_t len = EVBUFFER_LENGTH( msgs->outMessages );
1847        /* flush the protocol messages */
1848        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len );
1849        tr_peerIoWriteBuf( msgs->peer->io, msgs->outMessages, FALSE );
1850        msgs->clientSentAnythingAt = now;
1851        msgs->outMessagesBatchedAt = 0;
1852        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1853        bytesWritten +=  len;
1854    }
1855
1856    /**
1857    ***  Metadata Pieces
1858    **/
1859
1860    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= METADATA_PIECE_SIZE )
1861        && popNextMetadataRequest( msgs, &piece ) )
1862    {
1863        char * data;
1864        int dataLen;
1865        tr_bool ok = FALSE;
1866
1867        data = tr_torrentGetMetadataPiece( msgs->torrent, piece, &dataLen );
1868        if( ( dataLen > 0 ) && ( data != NULL ) )
1869        {
1870            tr_benc tmp;
1871            int payloadLen;
1872            char * payload;
1873            tr_peerIo  * io  = msgs->peer->io;
1874            struct evbuffer * out = msgs->outMessages;
1875
1876            /* build the data message */
1877            tr_bencInitDict( &tmp, 3 );
1878            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_DATA );
1879            tr_bencDictAddInt( &tmp, "piece", piece );
1880            tr_bencDictAddInt( &tmp, "total_size", msgs->torrent->infoDictLength );
1881            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1882            tr_bencFree( &tmp );
1883
1884            /* write it out as a LTEP message to our outMessages buffer */
1885            tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + payloadLen + dataLen );
1886            tr_peerIoWriteUint8 ( io, out, BT_LTEP );
1887            tr_peerIoWriteUint8 ( io, out, msgs->ut_metadata_id );
1888            tr_peerIoWriteBytes ( io, out, payload, payloadLen );
1889            tr_peerIoWriteBytes ( io, out, data, dataLen );
1890            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1891            dbgOutMessageLen( msgs );
1892
1893            tr_free( payload );
1894            tr_free( data );
1895
1896            ok = TRUE;
1897        }
1898
1899        if( !ok ) /* send a rejection message */
1900        {
1901            tr_benc tmp;
1902            int payloadLen;
1903            char * payload;
1904            tr_peerIo  * io  = msgs->peer->io;
1905            struct evbuffer * out = msgs->outMessages;
1906
1907            /* build the rejection message */
1908            tr_bencInitDict( &tmp, 2 );
1909            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
1910            tr_bencDictAddInt( &tmp, "piece", piece );
1911            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1912            tr_bencFree( &tmp );
1913
1914            /* write it out as a LTEP message to our outMessages buffer */
1915            tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + payloadLen );
1916            tr_peerIoWriteUint8 ( io, out, BT_LTEP );
1917            tr_peerIoWriteUint8 ( io, out, msgs->ut_metadata_id );
1918            tr_peerIoWriteBytes ( io, out, payload, payloadLen );
1919            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1920            dbgOutMessageLen( msgs );
1921
1922            tr_free( payload );
1923        }
1924    }
1925
1926    /**
1927    ***  Data Blocks
1928    **/
1929
1930    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= msgs->torrent->blockSize )
1931        && popNextRequest( msgs, &req ) )
1932    {
1933        --msgs->prefetchCount;
1934
1935        if( requestIsValid( msgs, &req )
1936            && tr_cpPieceIsComplete( &msgs->torrent->completion, req.index ) )
1937        {
1938            /* FIXME(libevent2) use evbuffer_reserve_space() + evbuffer_commit_space() */
1939            int err;
1940            const uint32_t msglen = 4 + 1 + 4 + 4 + req.length;
1941            struct evbuffer * out;
1942            tr_peerIo * io = msgs->peer->io;
1943
1944            out = evbuffer_new( );
1945            evbuffer_expand( out, msglen );
1946
1947            tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1948            tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1949            tr_peerIoWriteUint32( io, out, req.index );
1950            tr_peerIoWriteUint32( io, out, req.offset );
1951
1952            err = tr_ioRead( msgs->torrent, req.index, req.offset, req.length, EVBUFFER_DATA(out)+EVBUFFER_LENGTH(out) );
1953            if( err )
1954            {
1955                if( fext )
1956                    protocolSendReject( msgs, &req );
1957            }
1958            else
1959            {
1960                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1961                EVBUFFER_LENGTH(out) += req.length;
1962                assert( EVBUFFER_LENGTH( out ) == msglen );
1963                tr_peerIoWriteBuf( io, out, TRUE );
1964                bytesWritten += EVBUFFER_LENGTH( out );
1965                msgs->clientSentAnythingAt = now;
1966            }
1967
1968            evbuffer_free( out );
1969
1970            if( err )
1971            {
1972                bytesWritten = 0;
1973                msgs = NULL;
1974            }
1975        }
1976        else if( fext ) /* peer needs a reject message */
1977        {
1978            protocolSendReject( msgs, &req );
1979        }
1980
1981        if( msgs != NULL )
1982            prefetchPieces( msgs );
1983    }
1984
1985    /**
1986    ***  Keepalive
1987    **/
1988
1989    if( ( msgs != NULL )
1990        && ( msgs->clientSentAnythingAt != 0 )
1991        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1992    {
1993        dbgmsg( msgs, "sending a keepalive message" );
1994        tr_peerIoWriteUint32( msgs->peer->io, msgs->outMessages, 0 );
1995        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1996    }
1997
1998    return bytesWritten;
1999}
2000
2001static int
2002peerPulse( void * vmsgs )
2003{
2004    tr_peermsgs * msgs = vmsgs;
2005    const time_t  now = tr_time( );
2006
2007    if ( tr_isPeerIo( msgs->peer->io ) ) {
2008        updateDesiredRequestCount( msgs, now );
2009        updateBlockRequests( msgs );
2010        updateMetadataRequests( msgs, now );
2011    }
2012
2013    for( ;; )
2014        if( fillOutputBuffer( msgs, now ) < 1 )
2015            break;
2016
2017    return TRUE; /* loop forever */
2018}
2019
2020void
2021tr_peerMsgsPulse( tr_peermsgs * msgs )
2022{
2023    if( msgs != NULL )
2024        peerPulse( msgs );
2025}
2026
2027static void
2028gotError( tr_peerIo  * io UNUSED,
2029          short        what,
2030          void       * vmsgs )
2031{
2032    if( what & EVBUFFER_TIMEOUT )
2033        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
2034    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
2035        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
2036               what, errno, tr_strerror( errno ) );
2037    fireError( vmsgs, ENOTCONN );
2038}
2039
2040static void
2041sendBitfield( tr_peermsgs * msgs )
2042{
2043    struct evbuffer * out = msgs->outMessages;
2044    tr_bitfield *     field;
2045    tr_piece_index_t  lazyPieces[LAZY_PIECE_COUNT];
2046    size_t            i;
2047    size_t            lazyCount = 0;
2048
2049    field = tr_bitfieldDup( tr_cpPieceBitfield( &msgs->torrent->completion ) );
2050
2051    if( tr_sessionIsLazyBitfieldEnabled( getSession( msgs ) ) )
2052    {
2053        /** Lazy bitfields aren't a high priority or secure, so I'm opting for
2054            speed over a truly random sample -- let's limit the pool size to
2055            the first 1000 pieces so large torrents don't bog things down */
2056        size_t poolSize;
2057        const size_t maxPoolSize = MIN( msgs->torrent->info.pieceCount, 1000 );
2058        tr_piece_index_t * pool = tr_new( tr_piece_index_t, maxPoolSize );
2059
2060        /* build the pool */
2061        for( i=poolSize=0; i<maxPoolSize; ++i )
2062            if( tr_bitfieldHas( field, i ) )
2063                pool[poolSize++] = i;
2064
2065        /* pull random piece indices from the pool */
2066        while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) )
2067        {
2068            const int pos = tr_cryptoWeakRandInt( poolSize );
2069            const tr_piece_index_t piece = pool[pos];
2070            tr_bitfieldRem( field, piece );
2071            lazyPieces[lazyCount++] = piece;
2072            pool[pos] = pool[--poolSize];
2073        }
2074
2075        /* cleanup */
2076        tr_free( pool );
2077    }
2078
2079    tr_peerIoWriteUint32( msgs->peer->io, out,
2080                          sizeof( uint8_t ) + field->byteCount );
2081    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_BITFIELD );
2082    /* FIXME(libevent2): use evbuffer_add_reference() */
2083    tr_peerIoWriteBytes ( msgs->peer->io, out, field->bits, field->byteCount );
2084    dbgmsg( msgs, "sending bitfield... outMessage size is now %zu",
2085            EVBUFFER_LENGTH( out ) );
2086    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
2087
2088    for( i = 0; i < lazyCount; ++i )
2089        protocolSendHave( msgs, lazyPieces[i] );
2090
2091    tr_bitfieldFree( field );
2092}
2093
2094static void
2095tellPeerWhatWeHave( tr_peermsgs * msgs )
2096{
2097    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
2098
2099    if( fext && ( tr_cpGetStatus( &msgs->torrent->completion ) == TR_SEED ) )
2100    {
2101        protocolSendHaveAll( msgs );
2102    }
2103    else if( fext && ( tr_cpHaveValid( &msgs->torrent->completion ) == 0 ) )
2104    {
2105        protocolSendHaveNone( msgs );
2106    }
2107    else
2108    {
2109        sendBitfield( msgs );
2110    }
2111}
2112
2113/**
2114***
2115**/
2116
2117/* some peers give us error messages if we send
2118   more than this many peers in a single pex message
2119   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
2120#define MAX_PEX_ADDED 50
2121#define MAX_PEX_DROPPED 50
2122
2123typedef struct
2124{
2125    tr_pex *  added;
2126    tr_pex *  dropped;
2127    tr_pex *  elements;
2128    int       addedCount;
2129    int       droppedCount;
2130    int       elementCount;
2131}
2132PexDiffs;
2133
2134static void
2135pexAddedCb( void * vpex,
2136            void * userData )
2137{
2138    PexDiffs * diffs = userData;
2139    tr_pex *   pex = vpex;
2140
2141    if( diffs->addedCount < MAX_PEX_ADDED )
2142    {
2143        diffs->added[diffs->addedCount++] = *pex;
2144        diffs->elements[diffs->elementCount++] = *pex;
2145    }
2146}
2147
2148static TR_INLINE void
2149pexDroppedCb( void * vpex,
2150              void * userData )
2151{
2152    PexDiffs * diffs = userData;
2153    tr_pex *   pex = vpex;
2154
2155    if( diffs->droppedCount < MAX_PEX_DROPPED )
2156    {
2157        diffs->dropped[diffs->droppedCount++] = *pex;
2158    }
2159}
2160
2161static TR_INLINE void
2162pexElementCb( void * vpex,
2163              void * userData )
2164{
2165    PexDiffs * diffs = userData;
2166    tr_pex * pex = vpex;
2167
2168    diffs->elements[diffs->elementCount++] = *pex;
2169}
2170
2171static void
2172sendPex( tr_peermsgs * msgs )
2173{
2174    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
2175    {
2176        PexDiffs diffs;
2177        PexDiffs diffs6;
2178        tr_pex * newPex = NULL;
2179        tr_pex * newPex6 = NULL;
2180        const int newCount = tr_peerMgrGetPeers( msgs->torrent, &newPex, TR_AF_INET, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
2181        const int newCount6 = tr_peerMgrGetPeers( msgs->torrent, &newPex6, TR_AF_INET6, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
2182
2183        /* build the diffs */
2184        diffs.added = tr_new( tr_pex, newCount );
2185        diffs.addedCount = 0;
2186        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
2187        diffs.droppedCount = 0;
2188        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
2189        diffs.elementCount = 0;
2190        tr_set_compare( msgs->pex, msgs->pexCount,
2191                        newPex, newCount,
2192                        tr_pexCompare, sizeof( tr_pex ),
2193                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
2194        diffs6.added = tr_new( tr_pex, newCount6 );
2195        diffs6.addedCount = 0;
2196        diffs6.dropped = tr_new( tr_pex, msgs->pexCount6 );
2197        diffs6.droppedCount = 0;
2198        diffs6.elements = tr_new( tr_pex, newCount6 + msgs->pexCount6 );
2199        diffs6.elementCount = 0;
2200        tr_set_compare( msgs->pex6, msgs->pexCount6,
2201                        newPex6, newCount6,
2202                        tr_pexCompare, sizeof( tr_pex ),
2203                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6 );
2204        dbgmsg(
2205            msgs,
2206            "pex: old peer count %d+%d, new peer count %d+%d, "
2207            "added %d+%d, removed %d+%d",
2208            msgs->pexCount, msgs->pexCount6, newCount, newCount6,
2209            diffs.addedCount, diffs6.addedCount,
2210            diffs.droppedCount, diffs6.droppedCount );
2211
2212        if( !diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
2213            !diffs6.droppedCount )
2214        {
2215            tr_free( diffs.elements );
2216            tr_free( diffs6.elements );
2217        }
2218        else
2219        {
2220            int  i;
2221            tr_benc val;
2222            char * benc;
2223            int bencLen;
2224            uint8_t * tmp, *walk;
2225            tr_peerIo       * io  = msgs->peer->io;
2226            struct evbuffer * out = msgs->outMessages;
2227
2228            /* update peer */
2229            tr_free( msgs->pex );
2230            msgs->pex = diffs.elements;
2231            msgs->pexCount = diffs.elementCount;
2232            tr_free( msgs->pex6 );
2233            msgs->pex6 = diffs6.elements;
2234            msgs->pexCount6 = diffs6.elementCount;
2235
2236            /* build the pex payload */
2237            tr_bencInitDict( &val, 3 ); /* ipv6 support: left as 3:
2238                                         * speed vs. likelihood? */
2239
2240            if( diffs.addedCount > 0)
2241            {
2242                /* "added" */
2243                tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
2244                for( i = 0; i < diffs.addedCount; ++i ) {
2245                    memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
2246                    memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
2247                }
2248                assert( ( walk - tmp ) == diffs.addedCount * 6 );
2249                tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
2250                tr_free( tmp );
2251
2252                /* "added.f" */
2253                tmp = walk = tr_new( uint8_t, diffs.addedCount );
2254                for( i = 0; i < diffs.addedCount; ++i )
2255                    *walk++ = diffs.added[i].flags;
2256                assert( ( walk - tmp ) == diffs.addedCount );
2257                tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2258                tr_free( tmp );
2259            }
2260
2261            if( diffs.droppedCount > 0 )
2262            {
2263                /* "dropped" */
2264                tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2265                for( i = 0; i < diffs.droppedCount; ++i ) {
2266                    memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
2267                    memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2268                }
2269                assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2270                tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2271                tr_free( tmp );
2272            }
2273
2274            if( diffs6.addedCount > 0 )
2275            {
2276                /* "added6" */
2277                tmp = walk = tr_new( uint8_t, diffs6.addedCount * 18 );
2278                for( i = 0; i < diffs6.addedCount; ++i ) {
2279                    memcpy( walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16 );
2280                    walk += 16;
2281                    memcpy( walk, &diffs6.added[i].port, 2 );
2282                    walk += 2;
2283                }
2284                assert( ( walk - tmp ) == diffs6.addedCount * 18 );
2285                tr_bencDictAddRaw( &val, "added6", tmp, walk - tmp );
2286                tr_free( tmp );
2287
2288                /* "added6.f" */
2289                tmp = walk = tr_new( uint8_t, diffs6.addedCount );
2290                for( i = 0; i < diffs6.addedCount; ++i )
2291                    *walk++ = diffs6.added[i].flags;
2292                assert( ( walk - tmp ) == diffs6.addedCount );
2293                tr_bencDictAddRaw( &val, "added6.f", tmp, walk - tmp );
2294                tr_free( tmp );
2295            }
2296
2297            if( diffs6.droppedCount > 0 )
2298            {
2299                /* "dropped6" */
2300                tmp = walk = tr_new( uint8_t, diffs6.droppedCount * 18 );
2301                for( i = 0; i < diffs6.droppedCount; ++i ) {
2302                    memcpy( walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16 );
2303                    walk += 16;
2304                    memcpy( walk, &diffs6.dropped[i].port, 2 );
2305                    walk += 2;
2306                }
2307                assert( ( walk - tmp ) == diffs6.droppedCount * 18);
2308                tr_bencDictAddRaw( &val, "dropped6", tmp, walk - tmp );
2309                tr_free( tmp );
2310            }
2311
2312            /* write the pex message */
2313            benc = tr_bencToStr( &val, TR_FMT_BENC, &bencLen );
2314            tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + bencLen );
2315            tr_peerIoWriteUint8 ( io, out, BT_LTEP );
2316            tr_peerIoWriteUint8 ( io, out, msgs->ut_pex_id );
2317            tr_peerIoWriteBytes ( io, out, benc, bencLen );
2318            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2319            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", EVBUFFER_LENGTH( out ) );
2320            dbgOutMessageLen( msgs );
2321
2322            tr_free( benc );
2323            tr_bencFree( &val );
2324        }
2325
2326        /* cleanup */
2327        tr_free( diffs.added );
2328        tr_free( diffs.dropped );
2329        tr_free( newPex );
2330        tr_free( diffs6.added );
2331        tr_free( diffs6.dropped );
2332        tr_free( newPex6 );
2333
2334        /*msgs->clientSentPexAt = tr_time( );*/
2335    }
2336}
2337
2338static void
2339pexPulse( int foo UNUSED, short bar UNUSED, void * vmsgs )
2340{
2341    struct tr_peermsgs * msgs = vmsgs;
2342
2343    sendPex( msgs );
2344
2345    tr_timerAdd( &msgs->pexTimer, PEX_INTERVAL_SECS, 0 );
2346}
2347
2348/**
2349***
2350**/
2351
2352tr_peermsgs*
2353tr_peerMsgsNew( struct tr_torrent * torrent,
2354                struct tr_peer    * peer,
2355                tr_delivery_func    func,
2356                void              * userData,
2357                tr_publisher_tag  * setme )
2358{
2359    tr_peermsgs * m;
2360
2361    assert( peer );
2362    assert( peer->io );
2363
2364    m = tr_new0( tr_peermsgs, 1 );
2365    m->publisher = TR_PUBLISHER_INIT;
2366    m->peer = peer;
2367    m->torrent = torrent;
2368    m->peer->clientIsChoked = 1;
2369    m->peer->peerIsChoked = 1;
2370    m->peer->clientIsInterested = 0;
2371    m->peer->peerIsInterested = 0;
2372    m->state = AWAITING_BT_LENGTH;
2373    m->outMessages = evbuffer_new( );
2374    m->outMessagesBatchedAt = 0;
2375    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2376    m->incoming.block = evbuffer_new( );
2377    m->peerAskedForCount = 0;
2378    evtimer_set( &m->pexTimer, pexPulse, m );
2379    tr_timerAdd( &m->pexTimer, PEX_INTERVAL_SECS, 0 );
2380    peer->msgs = m;
2381
2382    *setme = tr_publisherSubscribe( &m->publisher, func, userData );
2383
2384    if( tr_peerIoSupportsLTEP( peer->io ) )
2385        sendLtepHandshake( m );
2386
2387    if(tr_peerIoSupportsDHT(peer->io)) {
2388        /* Only send PORT over IPv6 when the IPv6 DHT is running (BEP-32). */
2389        const struct tr_address *addr = tr_peerIoGetAddress( peer->io, NULL );
2390        if( addr->type == TR_AF_INET || tr_globalIPv6() ) {
2391            protocolSendPort( m, tr_dhtPort( torrent->session ) );
2392        }
2393    }
2394
2395    tellPeerWhatWeHave( m );
2396
2397    tr_peerIoSetIOFuncs( m->peer->io, canRead, didWrite, gotError, m );
2398    updateDesiredRequestCount( m, tr_date( ) );
2399
2400    return m;
2401}
2402
2403void
2404tr_peerMsgsFree( tr_peermsgs* msgs )
2405{
2406    if( msgs )
2407    {
2408        evtimer_del( &msgs->pexTimer );
2409        tr_publisherDestruct( &msgs->publisher );
2410
2411        evbuffer_free( msgs->incoming.block );
2412        evbuffer_free( msgs->outMessages );
2413        tr_free( msgs->pex6 );
2414        tr_free( msgs->pex );
2415
2416        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2417        tr_free( msgs );
2418    }
2419}
2420
2421void
2422tr_peerMsgsUnsubscribe( tr_peermsgs *    peer,
2423                        tr_publisher_tag tag )
2424{
2425    tr_publisherUnsubscribe( &peer->publisher, tag );
2426}
Note: See TracBrowser for help on using the repository browser.