source: trunk/libtransmission/peer-msgs.c @ 10022

Last change on this file since 10022 was 10022, checked in by charles, 12 years ago

(trunk libT) #2800 "crashing during operation" -- if a peer sends an out-of-bounds "have piece" message, drop the connection

  • Property svn:keywords set to Date Rev Author Id
File size: 71.0 KB
Line 
1/*
2 * This file Copyright (C) 2007-2010 Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 10022 2010-01-26 20:38:04Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdarg.h>
17#include <stdio.h>
18#include <stdlib.h>
19#include <string.h>
20
21#include <event.h>
22
23#include "transmission.h"
24#include "bencode.h"
25#include "completion.h"
26#include "crypto.h"
27#include "inout.h"
28#ifdef WIN32
29#include "net.h" /* for ECONN */
30#endif
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-msgs.h"
34#include "platform.h" /* MAX_STACK_ARRAY_SIZE */
35#include "session.h"
36#include "stats.h"
37#include "torrent.h"
38#include "torrent-magnet.h"
39#include "tr-dht.h"
40#include "utils.h"
41#include "version.h"
42
43/**
44***
45**/
46
47enum
48{
49    BT_CHOKE                = 0,
50    BT_UNCHOKE              = 1,
51    BT_INTERESTED           = 2,
52    BT_NOT_INTERESTED       = 3,
53    BT_HAVE                 = 4,
54    BT_BITFIELD             = 5,
55    BT_REQUEST              = 6,
56    BT_PIECE                = 7,
57    BT_CANCEL               = 8,
58    BT_PORT                 = 9,
59
60    BT_FEXT_SUGGEST         = 13,
61    BT_FEXT_HAVE_ALL        = 14,
62    BT_FEXT_HAVE_NONE       = 15,
63    BT_FEXT_REJECT          = 16,
64    BT_FEXT_ALLOWED_FAST    = 17,
65
66    BT_LTEP                 = 20,
67
68    LTEP_HANDSHAKE          = 0,
69
70    UT_PEX_ID               = 1,
71    UT_METADATA_ID          = 3,
72
73    MAX_PEX_PEER_COUNT      = 50,
74
75    MIN_CHOKE_PERIOD_SEC    = 10,
76
77    /* idle seconds before we send a keepalive */
78    KEEPALIVE_INTERVAL_SECS = 100,
79
80    PEX_INTERVAL_SECS       = 90, /* sec between sendPex() calls */
81
82    REQQ                    = 512,
83
84    METADATA_REQQ           = 64,
85
86    MAX_BLOCK_SIZE          = ( 1024 * 16 ),
87
88    /* used in lowering the outMessages queue period */
89    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
90    HIGH_PRIORITY_INTERVAL_SECS = 2,
91    LOW_PRIORITY_INTERVAL_SECS = 10,
92
93    /* number of pieces to remove from the bitfield when
94     * lazy bitfields are turned on */
95    LAZY_PIECE_COUNT = 26,
96
97    /* number of pieces we'll allow in our fast set */
98    MAX_FAST_SET_SIZE = 3,
99
100    /* defined in BEP #9 */
101    METADATA_MSG_TYPE_REQUEST = 0,
102    METADATA_MSG_TYPE_DATA = 1,
103    METADATA_MSG_TYPE_REJECT = 2
104};
105
106enum
107{
108    AWAITING_BT_LENGTH,
109    AWAITING_BT_ID,
110    AWAITING_BT_MESSAGE,
111    AWAITING_BT_PIECE
112};
113
114/**
115***
116**/
117
118struct peer_request
119{
120    uint32_t    index;
121    uint32_t    offset;
122    uint32_t    length;
123};
124
125static uint32_t
126getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
127{
128    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
129    const uint64_t blockPos = tor->blockSize * b;
130    assert( blockPos >= piecePos );
131    return (uint32_t)( blockPos - piecePos );
132}
133
134static void
135blockToReq( const tr_torrent     * tor,
136            tr_block_index_t       block,
137            struct peer_request  * setme )
138{
139    assert( setme != NULL );
140
141    setme->index = tr_torBlockPiece( tor, block );
142    setme->offset = getBlockOffsetInPiece( tor, block );
143    setme->length = tr_torBlockCountBytes( tor, block );
144}
145
146/**
147***
148**/
149
150/* this is raw, unchanged data from the peer regarding
151 * the current message that it's sending us. */
152struct tr_incoming
153{
154    uint8_t                id;
155    uint32_t               length; /* includes the +1 for id length */
156    struct peer_request    blockReq; /* metadata for incoming blocks */
157    struct evbuffer *      block; /* piece data for incoming blocks */
158};
159
160/**
161 * Low-level communication state information about a connected peer.
162 *
163 * This structure remembers the low-level protocol states that we're
164 * in with this peer, such as active requests, pex messages, and so on.
165 * Its fields are all private to peer-msgs.c.
166 *
167 * Data not directly involved with sending & receiving messages is
168 * stored in tr_peer, where it can be accessed by both peermsgs and
169 * the peer manager.
170 *
171 * @see struct peer_atom
172 * @see tr_peer
173 */
174struct tr_peermsgs
175{
176    tr_bool         peerSupportsPex;
177    tr_bool         peerSupportsMetadataXfer;
178    tr_bool         clientSentLtepHandshake;
179    tr_bool         peerSentLtepHandshake;
180
181    /*tr_bool         haveFastSet;*/
182
183    int             desiredRequestCount;
184
185    int             prefetchCount;
186
187    /* how long the outMessages batch should be allowed to grow before
188     * it's flushed -- some messages (like requests >:) should be sent
189     * very quickly; others aren't as urgent. */
190    int8_t          outMessagesBatchPeriod;
191
192    uint8_t         state;
193    uint8_t         ut_pex_id;
194    uint8_t         ut_metadata_id;
195    uint16_t        pexCount;
196    uint16_t        pexCount6;
197
198#if 0
199    size_t                 fastsetSize;
200    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
201#endif
202
203    tr_peer *              peer;
204
205    tr_torrent *           torrent;
206
207    tr_publisher           publisher;
208
209    struct evbuffer *      outMessages; /* all the non-piece messages */
210
211    struct peer_request    peerAskedFor[REQQ];
212
213    int                    peerAskedForMetadata[METADATA_REQQ];
214    int                    peerAskedForMetadataCount;
215
216    tr_pex               * pex;
217    tr_pex               * pex6;
218
219    /*time_t                 clientSentPexAt;*/
220    time_t                 clientSentAnythingAt;
221
222    /* when we started batching the outMessages */
223    time_t                outMessagesBatchedAt;
224
225    struct tr_incoming    incoming;
226
227    /* if the peer supports the Extension Protocol in BEP 10 and
228       supplied a reqq argument, it's stored here.  otherwise the
229       value is zero and should be ignored. */
230    int64_t               reqq;
231
232    struct event          pexTimer;
233};
234
235/**
236***
237**/
238
239#if 0
240static tr_bitfield*
241getHave( const struct tr_peermsgs * msgs )
242{
243    if( msgs->peer->have == NULL )
244        msgs->peer->have = tr_bitfieldNew( msgs->torrent->info.pieceCount );
245    return msgs->peer->have;
246}
247#endif
248
249static inline tr_session*
250getSession( struct tr_peermsgs * msgs )
251{
252    return msgs->torrent->session;
253}
254
255/**
256***
257**/
258
259static void
260myDebug( const char * file, int line,
261         const struct tr_peermsgs * msgs,
262         const char * fmt, ... )
263{
264    FILE * fp = tr_getLog( );
265
266    if( fp )
267    {
268        va_list           args;
269        char              timestr[64];
270        struct evbuffer * buf = evbuffer_new( );
271        char *            base = tr_basename( file );
272
273        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
274                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
275                             tr_torrentName( msgs->torrent ),
276                             tr_peerIoGetAddrStr( msgs->peer->io ),
277                             msgs->peer->client );
278        va_start( args, fmt );
279        evbuffer_add_vprintf( buf, fmt, args );
280        va_end( args );
281        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
282        /* FIXME(libevent2) tr_getLog() should return an fd, then use evbuffer_write() here */
283        fwrite( EVBUFFER_DATA( buf ), 1, EVBUFFER_LENGTH( buf ), fp );
284
285        tr_free( base );
286        evbuffer_free( buf );
287    }
288}
289
290#define dbgmsg( msgs, ... ) \
291    do { \
292        if( tr_deepLoggingIsActive( ) ) \
293            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
294    } while( 0 )
295
296/**
297***
298**/
299
300static void
301pokeBatchPeriod( tr_peermsgs * msgs,
302                 int           interval )
303{
304    if( msgs->outMessagesBatchPeriod > interval )
305    {
306        msgs->outMessagesBatchPeriod = interval;
307        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
308    }
309}
310
311static inline void
312dbgOutMessageLen( tr_peermsgs * msgs )
313{
314    dbgmsg( msgs, "outMessage size is now %zu", EVBUFFER_LENGTH( msgs->outMessages ) );
315}
316
317static void
318protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
319{
320    tr_peerIo       * io  = msgs->peer->io;
321    struct evbuffer * out = msgs->outMessages;
322
323    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
324
325    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
326    tr_peerIoWriteUint8 ( io, out, BT_FEXT_REJECT );
327    tr_peerIoWriteUint32( io, out, req->index );
328    tr_peerIoWriteUint32( io, out, req->offset );
329    tr_peerIoWriteUint32( io, out, req->length );
330
331    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
332    dbgOutMessageLen( msgs );
333}
334
335static void
336protocolSendRequest( tr_peermsgs               * msgs,
337                     const struct peer_request * req )
338{
339    tr_peerIo       * io  = msgs->peer->io;
340    struct evbuffer * out = msgs->outMessages;
341
342    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
343    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
344    tr_peerIoWriteUint32( io, out, req->index );
345    tr_peerIoWriteUint32( io, out, req->offset );
346    tr_peerIoWriteUint32( io, out, req->length );
347
348    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
349    dbgOutMessageLen( msgs );
350    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
351}
352
353static void
354protocolSendCancel( tr_peermsgs               * msgs,
355                    const struct peer_request * req )
356{
357    tr_peerIo       * io  = msgs->peer->io;
358    struct evbuffer * out = msgs->outMessages;
359
360    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
361    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
362    tr_peerIoWriteUint32( io, out, req->index );
363    tr_peerIoWriteUint32( io, out, req->offset );
364    tr_peerIoWriteUint32( io, out, req->length );
365
366    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
367    dbgOutMessageLen( msgs );
368    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
369}
370
371static void
372protocolSendPort(tr_peermsgs *msgs, uint16_t port)
373{
374    tr_peerIo       * io  = msgs->peer->io;
375    struct evbuffer * out = msgs->outMessages;
376
377    dbgmsg( msgs, "sending Port %u", port);
378    tr_peerIoWriteUint32( io, out, 3 );
379    tr_peerIoWriteUint8 ( io, out, BT_PORT );
380    tr_peerIoWriteUint16( io, out, port);
381}
382
383static void
384protocolSendHave( tr_peermsgs * msgs,
385                  uint32_t      index )
386{
387    tr_peerIo       * io  = msgs->peer->io;
388    struct evbuffer * out = msgs->outMessages;
389
390    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
391    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
392    tr_peerIoWriteUint32( io, out, index );
393
394    dbgmsg( msgs, "sending Have %u", index );
395    dbgOutMessageLen( msgs );
396    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
397}
398
399#if 0
400static void
401protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
402{
403    tr_peerIo       * io  = msgs->peer->io;
404    struct evbuffer * out = msgs->outMessages;
405
406    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
407
408    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
409    tr_peerIoWriteUint8 ( io, out, BT_FEXT_ALLOWED_FAST );
410    tr_peerIoWriteUint32( io, out, pieceIndex );
411
412    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
413    dbgOutMessageLen( msgs );
414}
415#endif
416
417static void
418protocolSendChoke( tr_peermsgs * msgs,
419                   int           choke )
420{
421    tr_peerIo       * io  = msgs->peer->io;
422    struct evbuffer * out = msgs->outMessages;
423
424    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
425    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
426
427    dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
428    dbgOutMessageLen( msgs );
429    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
430}
431
432static void
433protocolSendHaveAll( tr_peermsgs * msgs )
434{
435    tr_peerIo       * io  = msgs->peer->io;
436    struct evbuffer * out = msgs->outMessages;
437
438    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
439
440    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
441    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_ALL );
442
443    dbgmsg( msgs, "sending HAVE_ALL..." );
444    dbgOutMessageLen( msgs );
445    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
446}
447
448static void
449protocolSendHaveNone( tr_peermsgs * msgs )
450{
451    tr_peerIo       * io  = msgs->peer->io;
452    struct evbuffer * out = msgs->outMessages;
453
454    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
455
456    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
457    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_NONE );
458
459    dbgmsg( msgs, "sending HAVE_NONE..." );
460    dbgOutMessageLen( msgs );
461    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
462}
463
464/**
465***  EVENTS
466**/
467
468static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0, 0, 0 };
469
470static void
471publish( tr_peermsgs * msgs, tr_peer_event * e )
472{
473    assert( msgs->peer );
474    assert( msgs->peer->msgs == msgs );
475
476    tr_publisherPublish( &msgs->publisher, msgs->peer, e );
477}
478
479static void
480fireError( tr_peermsgs * msgs, int err )
481{
482    tr_peer_event e = blankEvent;
483    e.eventType = TR_PEER_ERROR;
484    e.err = err;
485    publish( msgs, &e );
486}
487
488static void
489fireUploadOnly( tr_peermsgs * msgs, tr_bool uploadOnly )
490{
491    tr_peer_event e = blankEvent;
492    e.eventType = TR_PEER_UPLOAD_ONLY;
493    e.uploadOnly = uploadOnly;
494    publish( msgs, &e );
495}
496
497static void
498firePeerProgress( tr_peermsgs * msgs )
499{
500    tr_peer_event e = blankEvent;
501    e.eventType = TR_PEER_PEER_PROGRESS;
502    e.progress = msgs->peer->progress;
503    publish( msgs, &e );
504}
505
506static void
507fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
508{
509    tr_peer_event e = blankEvent;
510    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
511    e.pieceIndex = req->index;
512    e.offset = req->offset;
513    e.length = req->length;
514    publish( msgs, &e );
515}
516
517static void
518fireGotRej( tr_peermsgs * msgs, const struct peer_request * req )
519{
520    tr_peer_event e = blankEvent;
521    e.eventType = TR_PEER_CLIENT_GOT_REJ;
522    e.pieceIndex = req->index;
523    e.offset = req->offset;
524    e.length = req->length;
525    publish( msgs, &e );
526}
527
528static void
529fireGotChoke( tr_peermsgs * msgs )
530{
531    tr_peer_event e = blankEvent;
532    e.eventType = TR_PEER_CLIENT_GOT_CHOKE;
533    publish( msgs, &e );
534}
535
536static void
537fireClientGotData( tr_peermsgs * msgs,
538                   uint32_t      length,
539                   int           wasPieceData )
540{
541    tr_peer_event e = blankEvent;
542
543    e.length = length;
544    e.eventType = TR_PEER_CLIENT_GOT_DATA;
545    e.wasPieceData = wasPieceData;
546    publish( msgs, &e );
547}
548
549static void
550fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
551{
552    tr_peer_event e = blankEvent;
553    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
554    e.pieceIndex = pieceIndex;
555    publish( msgs, &e );
556}
557
558static void
559fireClientGotPort( tr_peermsgs * msgs, tr_port port )
560{
561    tr_peer_event e = blankEvent;
562    e.eventType = TR_PEER_CLIENT_GOT_PORT;
563    e.port = port;
564    publish( msgs, &e );
565}
566
567static void
568fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
569{
570    tr_peer_event e = blankEvent;
571    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
572    e.pieceIndex = pieceIndex;
573    publish( msgs, &e );
574}
575
576static void
577firePeerGotData( tr_peermsgs  * msgs,
578                 uint32_t       length,
579                 int            wasPieceData )
580{
581    tr_peer_event e = blankEvent;
582
583    e.length = length;
584    e.eventType = TR_PEER_PEER_GOT_DATA;
585    e.wasPieceData = wasPieceData;
586
587    publish( msgs, &e );
588}
589
590/**
591***  ALLOWED FAST SET
592***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
593**/
594
595size_t
596tr_generateAllowedSet( tr_piece_index_t * setmePieces,
597                       size_t             desiredSetSize,
598                       size_t             pieceCount,
599                       const uint8_t    * infohash,
600                       const tr_address * addr )
601{
602    size_t setSize = 0;
603
604    assert( setmePieces );
605    assert( desiredSetSize <= pieceCount );
606    assert( desiredSetSize );
607    assert( pieceCount );
608    assert( infohash );
609    assert( addr );
610
611    if( addr->type == TR_AF_INET )
612    {
613        uint8_t w[SHA_DIGEST_LENGTH + 4], *walk=w;
614        uint8_t x[SHA_DIGEST_LENGTH];
615
616        uint32_t ui32 = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 );   /* (1) */
617        memcpy( w, &ui32, sizeof( uint32_t ) );
618        walk += sizeof( uint32_t );
619        memcpy( walk, infohash, SHA_DIGEST_LENGTH );                 /* (2) */
620        walk += SHA_DIGEST_LENGTH;
621        tr_sha1( x, w, walk-w, NULL );                               /* (3) */
622        assert( sizeof( w ) == walk-w );
623
624        while( setSize<desiredSetSize )
625        {
626            int i;
627            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
628            {
629                size_t k;
630                uint32_t j = i * 4;                                  /* (5) */
631                uint32_t y = ntohl( *( uint32_t* )( x + j ) );       /* (6) */
632                uint32_t index = y % pieceCount;                     /* (7) */
633
634                for( k=0; k<setSize; ++k )                           /* (8) */
635                    if( setmePieces[k] == index )
636                        break;
637
638                if( k == setSize )
639                    setmePieces[setSize++] = index;                  /* (9) */
640            }
641
642            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
643        }
644    }
645
646    return setSize;
647}
648
649static void
650updateFastSet( tr_peermsgs * msgs UNUSED )
651{
652#if 0
653    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
654    const int peerIsNeedy = msgs->peer->progress < 0.10;
655
656    if( fext && peerIsNeedy && !msgs->haveFastSet )
657    {
658        size_t i;
659        const struct tr_address * addr = tr_peerIoGetAddress( msgs->peer->io, NULL );
660        const tr_info * inf = &msgs->torrent->info;
661        const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
662
663        /* build the fast set */
664        msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
665        msgs->haveFastSet = 1;
666
667        /* send it to the peer */
668        for( i=0; i<msgs->fastsetSize; ++i )
669            protocolSendAllowedFast( msgs, msgs->fastset[i] );
670    }
671#endif
672}
673
674/**
675***  INTEREST
676**/
677
678static tr_bool
679isPieceInteresting( const tr_peermsgs * msgs,
680                    tr_piece_index_t    piece )
681{
682    const tr_torrent * torrent = msgs->torrent;
683
684    return ( !torrent->info.pieces[piece].dnd )                  /* we want it */
685          && ( !tr_cpPieceIsComplete( &torrent->completion, piece ) ) /* !have */
686          && ( tr_bitsetHas( &msgs->peer->have, piece ) );      /* peer has it */
687}
688
689/* "interested" means we'll ask for piece data if they unchoke us */
690static tr_bool
691isPeerInteresting( const tr_peermsgs * msgs )
692{
693    tr_piece_index_t    i;
694    const tr_torrent *  torrent;
695    const tr_bitfield * bitfield;
696    const int           clientIsSeed = tr_torrentIsSeed( msgs->torrent );
697
698    if( clientIsSeed )
699        return FALSE;
700
701    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
702        return FALSE;
703
704    torrent = msgs->torrent;
705    bitfield = tr_cpPieceBitfield( &torrent->completion );
706
707    for( i = 0; i < torrent->info.pieceCount; ++i )
708        if( isPieceInteresting( msgs, i ) )
709            return TRUE;
710
711    return FALSE;
712}
713
714static void
715sendInterest( tr_peermsgs * msgs,
716              int           weAreInterested )
717{
718    struct evbuffer * out = msgs->outMessages;
719
720    assert( msgs );
721    assert( weAreInterested == 0 || weAreInterested == 1 );
722
723    msgs->peer->clientIsInterested = weAreInterested;
724    dbgmsg( msgs, "Sending %s", weAreInterested ? "Interested" : "Not Interested" );
725    tr_peerIoWriteUint32( msgs->peer->io, out, sizeof( uint8_t ) );
726    tr_peerIoWriteUint8 ( msgs->peer->io, out, weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
727
728    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
729    dbgOutMessageLen( msgs );
730}
731
732static void
733updateInterest( tr_peermsgs * msgs )
734{
735    const int i = isPeerInteresting( msgs );
736
737    if( i != msgs->peer->clientIsInterested )
738        sendInterest( msgs, i );
739}
740
741static tr_bool
742popNextMetadataRequest( tr_peermsgs * msgs, int * piece )
743{
744    if( msgs->peerAskedForMetadataCount == 0 )
745        return FALSE;
746
747    *piece = msgs->peerAskedForMetadata[0];
748
749    tr_removeElementFromArray( msgs->peerAskedForMetadata, 0, sizeof( int ),
750                               msgs->peerAskedForMetadataCount-- );
751
752    return TRUE;
753}
754
755static tr_bool
756popNextRequest( tr_peermsgs * msgs, struct peer_request * setme )
757{
758    if( msgs->peer->pendingReqsToClient == 0 )
759        return FALSE;
760
761    *setme = msgs->peerAskedFor[0];
762
763    tr_removeElementFromArray( msgs->peerAskedFor, 0, sizeof( struct peer_request ),
764                               msgs->peer->pendingReqsToClient-- );
765
766    return TRUE;
767}
768
769static void
770cancelAllRequestsToClient( tr_peermsgs * msgs )
771{
772    struct peer_request req;
773    const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->peer->io );
774
775    while( popNextRequest( msgs, &req ))
776        if( mustSendCancel )
777            protocolSendReject( msgs, &req );
778}
779
780void
781tr_peerMsgsSetChoke( tr_peermsgs * msgs,
782                     int           choke )
783{
784    const time_t now = tr_time( );
785    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
786
787    assert( msgs );
788    assert( msgs->peer );
789    assert( choke == 0 || choke == 1 );
790
791    if( msgs->peer->chokeChangedAt > fibrillationTime )
792    {
793        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
794    }
795    else if( msgs->peer->peerIsChoked != choke )
796    {
797        msgs->peer->peerIsChoked = choke;
798        if( choke )
799            cancelAllRequestsToClient( msgs );
800        protocolSendChoke( msgs, choke );
801        msgs->peer->chokeChangedAt = now;
802    }
803}
804
805/**
806***
807**/
808
809void
810tr_peerMsgsHave( tr_peermsgs * msgs,
811                 uint32_t      index )
812{
813    protocolSendHave( msgs, index );
814
815    /* since we have more pieces now, we might not be interested in this peer */
816    updateInterest( msgs );
817}
818
819/**
820***
821**/
822
823static tr_bool
824reqIsValid( const tr_peermsgs * peer,
825            uint32_t            index,
826            uint32_t            offset,
827            uint32_t            length )
828{
829    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
830}
831
832static tr_bool
833requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
834{
835    return reqIsValid( msgs, req->index, req->offset, req->length );
836}
837
838void
839tr_peerMsgsCancel( tr_peermsgs * msgs, tr_block_index_t block )
840{
841    struct peer_request req;
842    blockToReq( msgs->torrent, block, &req );
843    protocolSendCancel( msgs, &req );
844}
845
846/**
847***
848**/
849
850static void
851sendLtepHandshake( tr_peermsgs * msgs )
852{
853    tr_benc val, *m;
854    char * buf;
855    int len;
856    tr_bool allow_pex;
857    tr_bool allow_metadata_xfer;
858    struct evbuffer * out = msgs->outMessages;
859    const unsigned char * ipv6 = tr_globalIPv6();
860
861    if( msgs->clientSentLtepHandshake )
862        return;
863
864    dbgmsg( msgs, "sending an ltep handshake" );
865    msgs->clientSentLtepHandshake = 1;
866
867    /* decide if we want to advertise metadata xfer support (BEP 9) */
868    if( tr_torrentIsPrivate( msgs->torrent ) )
869        allow_metadata_xfer = 0;
870    else
871        allow_metadata_xfer = 1;
872
873    /* decide if we want to advertise pex support */
874    if( !tr_torrentAllowsPex( msgs->torrent ) )
875        allow_pex = 0;
876    else if( msgs->peerSentLtepHandshake )
877        allow_pex = msgs->peerSupportsPex ? 1 : 0;
878    else
879        allow_pex = 1;
880
881    tr_bencInitDict( &val, 8 );
882    tr_bencDictAddInt( &val, "e", getSession(msgs)->encryptionMode != TR_CLEAR_PREFERRED );
883    if( ipv6 != NULL )
884        tr_bencDictAddRaw( &val, "ipv6", ipv6, 16 );
885    if( allow_metadata_xfer && tr_torrentHasMetadata( msgs->torrent )
886                            && ( msgs->torrent->infoDictLength > 0 ) )
887        tr_bencDictAddInt( &val, "metadata_size", msgs->torrent->infoDictLength );
888    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( getSession(msgs) ) );
889    tr_bencDictAddInt( &val, "reqq", REQQ );
890    tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
891    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
892    m  = tr_bencDictAddDict( &val, "m", 2 );
893    if( allow_metadata_xfer )
894        tr_bencDictAddInt( m, "ut_metadata", UT_METADATA_ID );
895    if( allow_pex )
896        tr_bencDictAddInt( m, "ut_pex", UT_PEX_ID );
897
898    buf = tr_bencToStr( &val, TR_FMT_BENC, &len );
899
900    tr_peerIoWriteUint32( msgs->peer->io, out, 2 * sizeof( uint8_t ) + len );
901    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_LTEP );
902    tr_peerIoWriteUint8 ( msgs->peer->io, out, LTEP_HANDSHAKE );
903    tr_peerIoWriteBytes ( msgs->peer->io, out, buf, len );
904    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
905    dbgOutMessageLen( msgs );
906
907    /* cleanup */
908    tr_bencFree( &val );
909    tr_free( buf );
910}
911
912static void
913parseLtepHandshake( tr_peermsgs *     msgs,
914                    int               len,
915                    struct evbuffer * inbuf )
916{
917    int64_t   i;
918    tr_benc   val, * sub;
919    uint8_t * tmp = tr_new( uint8_t, len );
920    const uint8_t *addr;
921    size_t addr_len;
922    tr_pex pex;
923
924    memset( &pex, 0, sizeof( tr_pex ) );
925
926    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, len );
927    msgs->peerSentLtepHandshake = 1;
928
929    if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) )
930    {
931        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
932        tr_free( tmp );
933        return;
934    }
935
936    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
937
938    /* does the peer prefer encrypted connections? */
939    if( tr_bencDictFindInt( &val, "e", &i ) ) {
940        msgs->peer->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
941                                              : ENCRYPTION_PREFERENCE_NO;
942        if( i )
943            pex.flags |= ADDED_F_ENCRYPTION_FLAG;
944    }
945
946    /* check supported messages for utorrent pex */
947    msgs->peerSupportsPex = 0;
948    msgs->peerSupportsMetadataXfer = 0;
949
950    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
951        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
952            msgs->peerSupportsPex = i != 0;
953            msgs->ut_pex_id = (uint8_t) i;
954            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
955        }
956        if( tr_bencDictFindInt( sub, "ut_metadata", &i ) ) {
957            msgs->peerSupportsMetadataXfer = i != 0;
958            msgs->ut_metadata_id = (uint8_t) i;
959            dbgmsg( msgs, "msgs->ut_metadata_id is %d", (int)msgs->ut_metadata_id );
960        }
961    }
962
963    /* look for metainfo size (BEP 9) */
964    if( tr_bencDictFindInt( &val, "metadata_size", &i ) )
965        tr_torrentSetMetadataSizeHint( msgs->torrent, i );
966
967    /* look for upload_only (BEP 21) */
968    if( tr_bencDictFindInt( &val, "upload_only", &i ) ) {
969        fireUploadOnly( msgs, i!=0 );
970        if( i )
971            pex.flags |= ADDED_F_SEED_FLAG;
972    }
973
974    /* get peer's listening port */
975    if( tr_bencDictFindInt( &val, "p", &i ) ) {
976        pex.port = htons( (uint16_t)i );
977        fireClientGotPort( msgs, pex.port );
978        dbgmsg( msgs, "peer's port is now %d", (int)i );
979    }
980
981    if( tr_peerIoIsIncoming( msgs->peer->io )
982        && tr_bencDictFindRaw( &val, "ipv4", &addr, &addr_len )
983        && ( addr_len == 4 ) )
984    {
985        pex.addr.type = TR_AF_INET;
986        memcpy( &pex.addr.addr.addr4, addr, 4 );
987        tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex );
988    }
989
990    if( tr_peerIoIsIncoming( msgs->peer->io )
991        && tr_bencDictFindRaw( &val, "ipv6", &addr, &addr_len )
992        && ( addr_len == 16 ) )
993    {
994        pex.addr.type = TR_AF_INET6;
995        memcpy( &pex.addr.addr.addr6, addr, 16 );
996        tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex );
997    }
998
999    /* get peer's maximum request queue size */
1000    if( tr_bencDictFindInt( &val, "reqq", &i ) )
1001        msgs->reqq = i;
1002
1003    tr_bencFree( &val );
1004    tr_free( tmp );
1005}
1006
1007static void
1008parseUtMetadata( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1009{
1010    tr_benc dict;
1011    char * msg_end;
1012    char * benc_end;
1013    int64_t msg_type = -1;
1014    int64_t piece = -1;
1015    int64_t total_size = 0;
1016    uint8_t * tmp = tr_new( uint8_t, msglen );
1017
1018    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1019    msg_end = (char*)tmp + msglen;
1020
1021    if( !tr_bencLoad( tmp, msglen, &dict, &benc_end ) )
1022    {
1023        tr_bencDictFindInt( &dict, "msg_type", &msg_type );
1024        tr_bencDictFindInt( &dict, "piece", &piece );
1025        tr_bencDictFindInt( &dict, "total_size", &total_size );
1026        tr_bencFree( &dict );
1027    }
1028
1029    dbgmsg( msgs, "got ut_metadata msg: type %d, piece %d, total_size %d",
1030            (int)msg_type, (int)piece, (int)total_size );
1031
1032    if( msg_type == METADATA_MSG_TYPE_REJECT )
1033    {
1034        /* NOOP */
1035    }
1036
1037    if( ( msg_type == METADATA_MSG_TYPE_DATA )
1038        && ( !tr_torrentHasMetadata( msgs->torrent ) )
1039        && ( msg_end - benc_end <= METADATA_PIECE_SIZE )
1040        && ( piece * METADATA_PIECE_SIZE + (msg_end - benc_end) <= total_size ) )
1041    {
1042        const int pieceLen = msg_end - benc_end;
1043        tr_torrentSetMetadataPiece( msgs->torrent, piece, benc_end, pieceLen );
1044    }
1045
1046    if( msg_type == METADATA_MSG_TYPE_REQUEST )
1047    {
1048        if( ( piece >= 0 )
1049            && tr_torrentHasMetadata( msgs->torrent )
1050            && !tr_torrentIsPrivate( msgs->torrent )
1051            && ( msgs->peerAskedForMetadataCount < METADATA_REQQ ) )
1052        {
1053            msgs->peerAskedForMetadata[msgs->peerAskedForMetadataCount++] = piece;
1054        }
1055        else
1056        {
1057            tr_benc tmp;
1058            int payloadLen;
1059            char * payload;
1060            tr_peerIo  * io  = msgs->peer->io;
1061            struct evbuffer * out = msgs->outMessages;
1062
1063            /* build the rejection message */
1064            tr_bencInitDict( &tmp, 2 );
1065            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
1066            tr_bencDictAddInt( &tmp, "piece", piece );
1067            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1068            tr_bencFree( &tmp );
1069
1070            /* write it out as a LTEP message to our outMessages buffer */
1071            tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + payloadLen );
1072            tr_peerIoWriteUint8 ( io, out, BT_LTEP );
1073            tr_peerIoWriteUint8 ( io, out, msgs->ut_metadata_id );
1074            tr_peerIoWriteBytes ( io, out, payload, payloadLen );
1075            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1076            dbgOutMessageLen( msgs );
1077
1078            tr_free( payload );
1079        }
1080    }
1081
1082    tr_free( tmp );
1083}
1084
1085static void
1086parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1087{
1088    int loaded = 0;
1089    uint8_t * tmp = tr_new( uint8_t, msglen );
1090    tr_benc val;
1091    tr_torrent * tor = msgs->torrent;
1092    const uint8_t * added;
1093    size_t added_len;
1094
1095    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1096
1097    if( tr_torrentAllowsPex( tor )
1098      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) ) )
1099    {
1100        if( tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1101        {
1102            tr_pex * pex;
1103            size_t i, n;
1104            size_t added_f_len = 0;
1105            const uint8_t * added_f = NULL;
1106
1107            tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1108            pex = tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len, &n );
1109
1110            n = MIN( n, MAX_PEX_PEER_COUNT );
1111            for( i=0; i<n; ++i )
1112                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex + i );
1113
1114            tr_free( pex );
1115        }
1116
1117        if( tr_bencDictFindRaw( &val, "added6", &added, &added_len ) )
1118        {
1119            tr_pex * pex;
1120            size_t i, n;
1121            size_t added_f_len = 0;
1122            const uint8_t * added_f = NULL;
1123
1124            tr_bencDictFindRaw( &val, "added6.f", &added_f, &added_f_len );
1125            pex = tr_peerMgrCompact6ToPex( added, added_len, added_f, added_f_len, &n );
1126
1127            n = MIN( n, MAX_PEX_PEER_COUNT );
1128            for( i=0; i<n; ++i )
1129                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex + i );
1130
1131            tr_free( pex );
1132        }
1133    }
1134
1135    if( loaded )
1136        tr_bencFree( &val );
1137    tr_free( tmp );
1138}
1139
1140static void sendPex( tr_peermsgs * msgs );
1141
1142static void
1143parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer  * inbuf )
1144{
1145    uint8_t ltep_msgid;
1146
1147    tr_peerIoReadUint8( msgs->peer->io, inbuf, &ltep_msgid );
1148    msglen--;
1149
1150    if( ltep_msgid == LTEP_HANDSHAKE )
1151    {
1152        dbgmsg( msgs, "got ltep handshake" );
1153        parseLtepHandshake( msgs, msglen, inbuf );
1154        if( tr_peerIoSupportsLTEP( msgs->peer->io ) )
1155        {
1156            sendLtepHandshake( msgs );
1157            sendPex( msgs );
1158        }
1159    }
1160    else if( ltep_msgid == UT_PEX_ID )
1161    {
1162        dbgmsg( msgs, "got ut pex" );
1163        msgs->peerSupportsPex = 1;
1164        parseUtPex( msgs, msglen, inbuf );
1165    }
1166    else if( ltep_msgid == UT_METADATA_ID )
1167    {
1168        dbgmsg( msgs, "got ut metadata" );
1169        msgs->peerSupportsMetadataXfer = 1;
1170        parseUtMetadata( msgs, msglen, inbuf );
1171    }
1172    else
1173    {
1174        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1175        evbuffer_drain( inbuf, msglen );
1176    }
1177}
1178
1179static int
1180readBtLength( tr_peermsgs *     msgs,
1181              struct evbuffer * inbuf,
1182              size_t            inlen )
1183{
1184    uint32_t len;
1185
1186    if( inlen < sizeof( len ) )
1187        return READ_LATER;
1188
1189    tr_peerIoReadUint32( msgs->peer->io, inbuf, &len );
1190
1191    if( len == 0 ) /* peer sent us a keepalive message */
1192        dbgmsg( msgs, "got KeepAlive" );
1193    else
1194    {
1195        msgs->incoming.length = len;
1196        msgs->state = AWAITING_BT_ID;
1197    }
1198
1199    return READ_NOW;
1200}
1201
1202static int readBtMessage( tr_peermsgs     * msgs,
1203                          struct evbuffer * inbuf,
1204                          size_t            inlen );
1205
1206static int
1207readBtId( tr_peermsgs * msgs, struct evbuffer  * inbuf, size_t inlen )
1208{
1209    uint8_t id;
1210
1211    if( inlen < sizeof( uint8_t ) )
1212        return READ_LATER;
1213
1214    tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
1215    msgs->incoming.id = id;
1216    dbgmsg( msgs, "msgs->incoming.id is now %d; msgs->incoming.length is %zu", id, (size_t)msgs->incoming.length );
1217
1218    if( id == BT_PIECE )
1219    {
1220        msgs->state = AWAITING_BT_PIECE;
1221        return READ_NOW;
1222    }
1223    else if( msgs->incoming.length != 1 )
1224    {
1225        msgs->state = AWAITING_BT_MESSAGE;
1226        return READ_NOW;
1227    }
1228    else return readBtMessage( msgs, inbuf, inlen - 1 );
1229}
1230
1231static void
1232updatePeerProgress( tr_peermsgs * msgs )
1233{
1234    msgs->peer->progress = tr_bitsetPercent( &msgs->peer->have );
1235    dbgmsg( msgs, "peer progress is %f", msgs->peer->progress );
1236    updateFastSet( msgs );
1237    updateInterest( msgs );
1238    firePeerProgress( msgs );
1239}
1240
1241static void
1242peerMadeRequest( tr_peermsgs *               msgs,
1243                 const struct peer_request * req )
1244{
1245    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1246    const int reqIsValid = requestIsValid( msgs, req );
1247    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( &msgs->torrent->completion, req->index );
1248    const int peerIsChoked = msgs->peer->peerIsChoked;
1249
1250    int allow = FALSE;
1251
1252    if( !reqIsValid )
1253        dbgmsg( msgs, "rejecting an invalid request." );
1254    else if( !clientHasPiece )
1255        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1256    else if( peerIsChoked )
1257        dbgmsg( msgs, "rejecting request from choked peer" );
1258    else if( msgs->peer->pendingReqsToClient + 1 >= REQQ )
1259        dbgmsg( msgs, "rejecting request ... reqq is full" );
1260    else
1261        allow = TRUE;
1262
1263    if( allow )
1264        msgs->peerAskedFor[msgs->peer->pendingReqsToClient++] = *req;
1265    else if( fext )
1266        protocolSendReject( msgs, req );
1267}
1268
1269static tr_bool
1270messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1271{
1272    switch( id )
1273    {
1274        case BT_CHOKE:
1275        case BT_UNCHOKE:
1276        case BT_INTERESTED:
1277        case BT_NOT_INTERESTED:
1278        case BT_FEXT_HAVE_ALL:
1279        case BT_FEXT_HAVE_NONE:
1280            return len == 1;
1281
1282        case BT_HAVE:
1283        case BT_FEXT_SUGGEST:
1284        case BT_FEXT_ALLOWED_FAST:
1285            return len == 5;
1286
1287        case BT_BITFIELD:
1288            return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1289
1290        case BT_REQUEST:
1291        case BT_CANCEL:
1292        case BT_FEXT_REJECT:
1293            return len == 13;
1294
1295        case BT_PIECE:
1296            return len > 9 && len <= 16393;
1297
1298        case BT_PORT:
1299            return len == 3;
1300
1301        case BT_LTEP:
1302            return len >= 2;
1303
1304        default:
1305            return FALSE;
1306    }
1307}
1308
1309static int clientGotBlock( tr_peermsgs *               msgs,
1310                           const uint8_t *             block,
1311                           const struct peer_request * req );
1312
1313static int
1314readBtPiece( tr_peermsgs      * msgs,
1315             struct evbuffer  * inbuf,
1316             size_t             inlen,
1317             size_t           * setme_piece_bytes_read )
1318{
1319    struct peer_request * req = &msgs->incoming.blockReq;
1320
1321    assert( EVBUFFER_LENGTH( inbuf ) >= inlen );
1322    dbgmsg( msgs, "In readBtPiece" );
1323
1324    if( !req->length )
1325    {
1326        if( inlen < 8 )
1327            return READ_LATER;
1328
1329        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index );
1330        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset );
1331        req->length = msgs->incoming.length - 9;
1332        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1333        return READ_NOW;
1334    }
1335    else
1336    {
1337        int err;
1338
1339        /* read in another chunk of data */
1340        const size_t nLeft = req->length - EVBUFFER_LENGTH( msgs->incoming.block );
1341        size_t n = MIN( nLeft, inlen );
1342        size_t i = n;
1343
1344        while( i > 0 )
1345        {
1346            uint8_t buf[MAX_STACK_ARRAY_SIZE];
1347            const size_t thisPass = MIN( i, sizeof( buf ) );
1348            tr_peerIoReadBytes( msgs->peer->io, inbuf, buf, thisPass );
1349            evbuffer_add( msgs->incoming.block, buf, thisPass );
1350            i -= thisPass;
1351        }
1352
1353        fireClientGotData( msgs, n, TRUE );
1354        *setme_piece_bytes_read += n;
1355        dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1356               n, req->index, req->offset, req->length,
1357               (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) );
1358        if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length )
1359            return READ_LATER;
1360
1361        /* we've got the whole block ... process it */
1362        err = clientGotBlock( msgs, EVBUFFER_DATA( msgs->incoming.block ), req );
1363
1364        /* cleanup */
1365        evbuffer_free( msgs->incoming.block );
1366        msgs->incoming.block = evbuffer_new( );
1367        req->length = 0;
1368        msgs->state = AWAITING_BT_LENGTH;
1369        return err ? READ_ERR : READ_NOW;
1370    }
1371}
1372
1373static void updateDesiredRequestCount( tr_peermsgs * msgs, uint64_t now );
1374
1375static int
1376readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1377{
1378    uint32_t      ui32;
1379    uint32_t      msglen = msgs->incoming.length;
1380    const uint8_t id = msgs->incoming.id;
1381#ifndef NDEBUG
1382    const size_t  startBufLen = EVBUFFER_LENGTH( inbuf );
1383#endif
1384    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1385
1386    --msglen; /* id length */
1387
1388    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
1389
1390    if( inlen < msglen )
1391        return READ_LATER;
1392
1393    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1394    {
1395        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1396        fireError( msgs, EMSGSIZE );
1397        return READ_ERR;
1398    }
1399
1400    switch( id )
1401    {
1402        case BT_CHOKE:
1403            dbgmsg( msgs, "got Choke" );
1404            msgs->peer->clientIsChoked = 1;
1405            if( !fext )
1406                fireGotChoke( msgs );
1407            break;
1408
1409        case BT_UNCHOKE:
1410            dbgmsg( msgs, "got Unchoke" );
1411            msgs->peer->clientIsChoked = 0;
1412            updateDesiredRequestCount( msgs, tr_date( ) );
1413            break;
1414
1415        case BT_INTERESTED:
1416            dbgmsg( msgs, "got Interested" );
1417            msgs->peer->peerIsInterested = 1;
1418            break;
1419
1420        case BT_NOT_INTERESTED:
1421            dbgmsg( msgs, "got Not Interested" );
1422            msgs->peer->peerIsInterested = 0;
1423            break;
1424
1425        case BT_HAVE:
1426            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1427            dbgmsg( msgs, "got Have: %u", ui32 );
1428            if( tr_torrentHasMetadata( msgs->torrent )
1429                    && ( ui32 >= msgs->torrent->info.pieceCount ) )
1430            {
1431                fireError( msgs, ERANGE );
1432                return READ_ERR;
1433            }
1434            if( tr_bitsetAdd( &msgs->peer->have, ui32 ) )
1435            {
1436                fireError( msgs, ERANGE );
1437                return READ_ERR;
1438            }
1439            updatePeerProgress( msgs );
1440            break;
1441
1442        case BT_BITFIELD: {
1443            const size_t bitCount = tr_torrentHasMetadata( msgs->torrent )
1444                                  ? msgs->torrent->info.pieceCount
1445                                  : msglen * 8;
1446            dbgmsg( msgs, "got a bitfield" );
1447            tr_bitsetReserve( &msgs->peer->have, bitCount );
1448            tr_peerIoReadBytes( msgs->peer->io, inbuf,
1449                                msgs->peer->have.bitfield.bits, msglen );
1450            updatePeerProgress( msgs );
1451            break;
1452        }
1453
1454        case BT_REQUEST:
1455        {
1456            struct peer_request r;
1457            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1458            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1459            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1460            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1461            peerMadeRequest( msgs, &r );
1462            break;
1463        }
1464
1465        case BT_CANCEL:
1466        {
1467            int i;
1468            struct peer_request r;
1469            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1470            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1471            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1472            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1473
1474            for( i=0; i<msgs->peer->pendingReqsToClient; ++i ) {
1475                const struct peer_request * req = msgs->peerAskedFor + i;
1476                if( ( req->index == r.index ) && ( req->offset == r.offset ) && ( req->length == r.length ) )
1477                    break;
1478            }
1479
1480            if( i < msgs->peer->pendingReqsToClient )
1481                tr_removeElementFromArray( msgs->peerAskedFor, i, sizeof( struct peer_request ),
1482                                           msgs->peer->pendingReqsToClient-- );
1483            break;
1484        }
1485
1486        case BT_PIECE:
1487            assert( 0 ); /* handled elsewhere! */
1488            break;
1489
1490        case BT_PORT:
1491            dbgmsg( msgs, "Got a BT_PORT" );
1492            tr_peerIoReadUint16( msgs->peer->io, inbuf, &msgs->peer->dht_port );
1493            if( msgs->peer->dht_port > 0 )
1494                tr_dhtAddNode( getSession(msgs),
1495                               tr_peerAddress( msgs->peer ),
1496                               msgs->peer->dht_port, 0 );
1497            break;
1498
1499        case BT_FEXT_SUGGEST:
1500            dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1501            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1502            if( fext )
1503                fireClientGotSuggest( msgs, ui32 );
1504            else {
1505                fireError( msgs, EMSGSIZE );
1506                return READ_ERR;
1507            }
1508            break;
1509
1510        case BT_FEXT_ALLOWED_FAST:
1511            dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1512            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1513            if( fext )
1514                fireClientGotAllowedFast( msgs, ui32 );
1515            else {
1516                fireError( msgs, EMSGSIZE );
1517                return READ_ERR;
1518            }
1519            break;
1520
1521        case BT_FEXT_HAVE_ALL:
1522            dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1523            if( fext ) {
1524                tr_bitsetSetHaveAll( &msgs->peer->have );
1525                updatePeerProgress( msgs );
1526            } else {
1527                fireError( msgs, EMSGSIZE );
1528                return READ_ERR;
1529            }
1530            break;
1531
1532        case BT_FEXT_HAVE_NONE:
1533            dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1534            if( fext ) {
1535                tr_bitsetSetHaveNone( &msgs->peer->have );
1536                updatePeerProgress( msgs );
1537            } else {
1538                fireError( msgs, EMSGSIZE );
1539                return READ_ERR;
1540            }
1541            break;
1542
1543        case BT_FEXT_REJECT:
1544        {
1545            struct peer_request r;
1546            dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1547            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1548            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1549            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1550            if( fext )
1551                fireGotRej( msgs, &r );
1552            else {
1553                fireError( msgs, EMSGSIZE );
1554                return READ_ERR;
1555            }
1556            break;
1557        }
1558
1559        case BT_LTEP:
1560            dbgmsg( msgs, "Got a BT_LTEP" );
1561            parseLtep( msgs, msglen, inbuf );
1562            break;
1563
1564        default:
1565            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1566            tr_peerIoDrain( msgs->peer->io, inbuf, msglen );
1567            break;
1568    }
1569
1570    assert( msglen + 1 == msgs->incoming.length );
1571    assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen );
1572
1573    msgs->state = AWAITING_BT_LENGTH;
1574    return READ_NOW;
1575}
1576
1577static inline void
1578decrementDownloadedCount( tr_peermsgs * msgs, uint32_t byteCount )
1579{
1580    tr_torrent * tor = msgs->torrent;
1581
1582    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1583}
1584
1585static inline void
1586clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1587{
1588    decrementDownloadedCount( msgs, req->length );
1589}
1590
1591static void
1592addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1593{
1594    if( !msgs->peer->blame )
1595         msgs->peer->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1596    tr_bitfieldAdd( msgs->peer->blame, index );
1597}
1598
1599/* returns 0 on success, or an errno on failure */
1600static int
1601clientGotBlock( tr_peermsgs *               msgs,
1602                const uint8_t *             data,
1603                const struct peer_request * req )
1604{
1605    int err;
1606    tr_torrent * tor = msgs->torrent;
1607    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1608
1609    assert( msgs );
1610    assert( req );
1611
1612    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) {
1613        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1614                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1615        return EMSGSIZE;
1616    }
1617
1618    /* save the block */
1619    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1620
1621    if( !tr_peerMgrDidPeerRequest( msgs->torrent, msgs->peer, block ) ) {
1622        dbgmsg( msgs, "we didn't ask for this message..." );
1623        return 0;
1624    }
1625
1626    /**
1627    ***  Save the block
1628    **/
1629
1630    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1631        return err;
1632
1633    addPeerToBlamefield( msgs, req->index );
1634    fireGotBlock( msgs, req );
1635    return 0;
1636}
1637
1638static int peerPulse( void * vmsgs );
1639
1640static void
1641didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1642{
1643    tr_peermsgs * msgs = vmsgs;
1644    firePeerGotData( msgs, bytesWritten, wasPieceData );
1645
1646    if ( tr_isPeerIo( io ) && io->userData )
1647        peerPulse( msgs );
1648}
1649
1650static ReadState
1651canRead( tr_peerIo * io, void * vmsgs, size_t * piece )
1652{
1653    ReadState         ret;
1654    tr_peermsgs *     msgs = vmsgs;
1655    struct evbuffer * in = tr_peerIoGetReadBuffer( io );
1656    const size_t      inlen = EVBUFFER_LENGTH( in );
1657
1658    dbgmsg( msgs, "canRead: inlen is %zu, msgs->state is %d", inlen, msgs->state );
1659
1660    if( !inlen )
1661    {
1662        ret = READ_LATER;
1663    }
1664    else if( msgs->state == AWAITING_BT_PIECE )
1665    {
1666        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1667    }
1668    else switch( msgs->state )
1669    {
1670        case AWAITING_BT_LENGTH:
1671            ret = readBtLength ( msgs, in, inlen ); break;
1672
1673        case AWAITING_BT_ID:
1674            ret = readBtId     ( msgs, in, inlen ); break;
1675
1676        case AWAITING_BT_MESSAGE:
1677            ret = readBtMessage( msgs, in, inlen ); break;
1678
1679        default:
1680            ret = READ_ERR;
1681            assert( 0 );
1682    }
1683
1684    dbgmsg( msgs, "canRead: ret is %d", (int)ret );
1685
1686    /* log the raw data that was read */
1687    if( ( ret != READ_ERR ) && ( EVBUFFER_LENGTH( in ) != inlen ) )
1688        fireClientGotData( msgs, inlen - EVBUFFER_LENGTH( in ), FALSE );
1689
1690    return ret;
1691}
1692
1693/**
1694***
1695**/
1696
1697static void
1698updateDesiredRequestCount( tr_peermsgs * msgs, uint64_t now )
1699{
1700    const tr_torrent * const torrent = msgs->torrent;
1701
1702    if( tr_torrentIsSeed( msgs->torrent ) )
1703    {
1704        msgs->desiredRequestCount = 0;
1705    }
1706    else if( msgs->peer->clientIsChoked )
1707    {
1708        msgs->desiredRequestCount = 0;
1709    }
1710    else
1711    {
1712        int irate;
1713        int estimatedBlocksInPeriod;
1714        double rate;
1715        const int floor = 4;
1716        const int seconds = REQUEST_BUF_SECS;
1717
1718        /* Get the rate limit we should use.
1719         * FIXME: this needs to consider all the other peers as well... */
1720        rate = tr_peerGetPieceSpeed( msgs->peer, now, TR_PEER_TO_CLIENT );
1721        if( tr_torrentUsesSpeedLimit( torrent, TR_PEER_TO_CLIENT ) )
1722            rate = MIN( rate, tr_torrentGetSpeedLimit( torrent, TR_PEER_TO_CLIENT ) );
1723
1724        /* honor the session limits, if enabled */
1725        if( tr_torrentUsesSessionLimits( torrent ) )
1726            if( tr_sessionGetActiveSpeedLimit( torrent->session, TR_PEER_TO_CLIENT, &irate ) )
1727                rate = MIN( rate, irate );
1728
1729        /* use this desired rate to figure out how
1730         * many requests we should send to this peer */
1731        estimatedBlocksInPeriod = ( rate * seconds * 1024 ) / torrent->blockSize;
1732        msgs->desiredRequestCount = MAX( floor, estimatedBlocksInPeriod );
1733
1734        /* honor the peer's maximum request count, if specified */
1735        if( msgs->reqq > 0 )
1736            if( msgs->desiredRequestCount > msgs->reqq )
1737                msgs->desiredRequestCount = msgs->reqq;
1738    }
1739}
1740
1741static void
1742updateMetadataRequests( tr_peermsgs * msgs, time_t now )
1743{
1744    int piece;
1745
1746    if( msgs->peerSupportsMetadataXfer
1747        && tr_torrentGetNextMetadataRequest( msgs->torrent, now, &piece ) )
1748    {
1749        tr_benc tmp;
1750        int payloadLen;
1751        char * payload;
1752        tr_peerIo  * io  = msgs->peer->io;
1753        struct evbuffer * out = msgs->outMessages;
1754
1755        /* build the data message */
1756        tr_bencInitDict( &tmp, 3 );
1757        tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REQUEST );
1758        tr_bencDictAddInt( &tmp, "piece", piece );
1759        payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1760        tr_bencFree( &tmp );
1761
1762        dbgmsg( msgs, "requesting metadata piece #%d", piece );
1763
1764        /* write it out as a LTEP message to our outMessages buffer */
1765        tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + payloadLen );
1766        tr_peerIoWriteUint8 ( io, out, BT_LTEP );
1767        tr_peerIoWriteUint8 ( io, out, msgs->ut_metadata_id );
1768        tr_peerIoWriteBytes ( io, out, payload, payloadLen );
1769        pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1770        dbgOutMessageLen( msgs );
1771
1772        tr_free( payload );
1773    }
1774}
1775
1776static void
1777updateBlockRequests( tr_peermsgs * msgs )
1778{
1779    if( ( msgs->desiredRequestCount > 0 ) &&
1780        ( msgs->peer->pendingReqsToPeer <= ( msgs->desiredRequestCount * 0.66 ) ) )
1781    {
1782        int i;
1783        int n;
1784        const int numwant = msgs->desiredRequestCount - msgs->peer->pendingReqsToPeer;
1785        tr_block_index_t * blocks = tr_new( tr_block_index_t, numwant );
1786
1787        tr_peerMgrGetNextRequests( msgs->torrent, msgs->peer, numwant, blocks, &n );
1788
1789        for( i=0; i<n; ++i )
1790        {
1791            struct peer_request req;
1792            blockToReq( msgs->torrent, blocks[i], &req );
1793            protocolSendRequest( msgs, &req );
1794        }
1795
1796        tr_free( blocks );
1797    }
1798}
1799
1800static void
1801prefetchPieces( tr_peermsgs *msgs )
1802{
1803    int i;
1804
1805    /* Maintain 12 prefetched blocks per unchoked peer */
1806    for( i=msgs->prefetchCount; i<msgs->peer->pendingReqsToClient && i<12; ++i )
1807    {
1808        const struct peer_request * req = msgs->peerAskedFor + i;
1809        tr_ioPrefetch( msgs->torrent, req->index, req->offset, req->length );
1810        ++msgs->prefetchCount;
1811    }
1812}
1813
1814static size_t
1815fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1816{
1817    int piece;
1818    size_t bytesWritten = 0;
1819    struct peer_request req;
1820    const tr_bool haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1821    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1822
1823    /**
1824    ***  Protocol messages
1825    **/
1826
1827    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1828    {
1829        dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
1830        msgs->outMessagesBatchedAt = now;
1831    }
1832    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1833    {
1834        const size_t len = EVBUFFER_LENGTH( msgs->outMessages );
1835        /* flush the protocol messages */
1836        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len );
1837        tr_peerIoWriteBuf( msgs->peer->io, msgs->outMessages, FALSE );
1838        msgs->clientSentAnythingAt = now;
1839        msgs->outMessagesBatchedAt = 0;
1840        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1841        bytesWritten +=  len;
1842    }
1843
1844    /**
1845    ***  Metadata Pieces
1846    **/
1847
1848    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= METADATA_PIECE_SIZE )
1849        && popNextMetadataRequest( msgs, &piece ) )
1850    {
1851        char * data;
1852        int dataLen;
1853        tr_bool ok = FALSE;
1854
1855        data = tr_torrentGetMetadataPiece( msgs->torrent, piece, &dataLen );
1856        if( ( dataLen > 0 ) && ( data != NULL ) )
1857        {
1858            tr_benc tmp;
1859            int payloadLen;
1860            char * payload;
1861            tr_peerIo  * io  = msgs->peer->io;
1862            struct evbuffer * out = msgs->outMessages;
1863
1864            /* build the data message */
1865            tr_bencInitDict( &tmp, 3 );
1866            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_DATA );
1867            tr_bencDictAddInt( &tmp, "piece", piece );
1868            tr_bencDictAddInt( &tmp, "total_size", msgs->torrent->infoDictLength );
1869            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1870            tr_bencFree( &tmp );
1871
1872            /* write it out as a LTEP message to our outMessages buffer */
1873            tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + payloadLen + dataLen );
1874            tr_peerIoWriteUint8 ( io, out, BT_LTEP );
1875            tr_peerIoWriteUint8 ( io, out, msgs->ut_metadata_id );
1876            tr_peerIoWriteBytes ( io, out, payload, payloadLen );
1877            tr_peerIoWriteBytes ( io, out, data, dataLen );
1878            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1879            dbgOutMessageLen( msgs );
1880
1881            tr_free( payload );
1882            tr_free( data );
1883
1884            ok = TRUE;
1885        }
1886
1887        if( !ok ) /* send a rejection message */
1888        {
1889            tr_benc tmp;
1890            int payloadLen;
1891            char * payload;
1892            tr_peerIo  * io  = msgs->peer->io;
1893            struct evbuffer * out = msgs->outMessages;
1894
1895            /* build the rejection message */
1896            tr_bencInitDict( &tmp, 2 );
1897            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
1898            tr_bencDictAddInt( &tmp, "piece", piece );
1899            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1900            tr_bencFree( &tmp );
1901
1902            /* write it out as a LTEP message to our outMessages buffer */
1903            tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + payloadLen );
1904            tr_peerIoWriteUint8 ( io, out, BT_LTEP );
1905            tr_peerIoWriteUint8 ( io, out, msgs->ut_metadata_id );
1906            tr_peerIoWriteBytes ( io, out, payload, payloadLen );
1907            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1908            dbgOutMessageLen( msgs );
1909
1910            tr_free( payload );
1911        }
1912    }
1913
1914    /**
1915    ***  Data Blocks
1916    **/
1917
1918    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= msgs->torrent->blockSize )
1919        && popNextRequest( msgs, &req ) )
1920    {
1921        --msgs->prefetchCount;
1922
1923        if( requestIsValid( msgs, &req )
1924            && tr_cpPieceIsComplete( &msgs->torrent->completion, req.index ) )
1925        {
1926            /* FIXME(libevent2) use evbuffer_reserve_space() + evbuffer_commit_space() */
1927            int err;
1928            const uint32_t msglen = 4 + 1 + 4 + 4 + req.length;
1929            struct evbuffer * out;
1930            tr_peerIo * io = msgs->peer->io;
1931
1932            out = evbuffer_new( );
1933            evbuffer_expand( out, msglen );
1934
1935            tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1936            tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1937            tr_peerIoWriteUint32( io, out, req.index );
1938            tr_peerIoWriteUint32( io, out, req.offset );
1939
1940            err = tr_ioRead( msgs->torrent, req.index, req.offset, req.length, EVBUFFER_DATA(out)+EVBUFFER_LENGTH(out) );
1941            if( err )
1942            {
1943                if( fext )
1944                    protocolSendReject( msgs, &req );
1945            }
1946            else
1947            {
1948                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1949                EVBUFFER_LENGTH(out) += req.length;
1950                assert( EVBUFFER_LENGTH( out ) == msglen );
1951                tr_peerIoWriteBuf( io, out, TRUE );
1952                bytesWritten += EVBUFFER_LENGTH( out );
1953                msgs->clientSentAnythingAt = now;
1954            }
1955
1956            evbuffer_free( out );
1957
1958            if( err )
1959            {
1960                bytesWritten = 0;
1961                msgs = NULL;
1962            }
1963        }
1964        else if( fext ) /* peer needs a reject message */
1965        {
1966            protocolSendReject( msgs, &req );
1967        }
1968
1969        if( msgs != NULL )
1970            prefetchPieces( msgs );
1971    }
1972
1973    /**
1974    ***  Keepalive
1975    **/
1976
1977    if( ( msgs != NULL )
1978        && ( msgs->clientSentAnythingAt != 0 )
1979        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1980    {
1981        dbgmsg( msgs, "sending a keepalive message" );
1982        tr_peerIoWriteUint32( msgs->peer->io, msgs->outMessages, 0 );
1983        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1984    }
1985
1986    return bytesWritten;
1987}
1988
1989static int
1990peerPulse( void * vmsgs )
1991{
1992    tr_peermsgs * msgs = vmsgs;
1993    const time_t  now = tr_time( );
1994
1995    if ( tr_isPeerIo( msgs->peer->io ) ) {
1996        updateDesiredRequestCount( msgs, tr_date( ) );
1997        updateBlockRequests( msgs );
1998        updateMetadataRequests( msgs, now );
1999    }
2000
2001    for( ;; )
2002        if( fillOutputBuffer( msgs, now ) < 1 )
2003            break;
2004
2005    return TRUE; /* loop forever */
2006}
2007
2008void
2009tr_peerMsgsPulse( tr_peermsgs * msgs )
2010{
2011    if( msgs != NULL )
2012        peerPulse( msgs );
2013}
2014
2015static void
2016gotError( tr_peerIo  * io UNUSED,
2017          short        what,
2018          void       * vmsgs )
2019{
2020    if( what & EVBUFFER_TIMEOUT )
2021        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
2022    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
2023        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
2024               what, errno, tr_strerror( errno ) );
2025    fireError( vmsgs, ENOTCONN );
2026}
2027
2028static void
2029sendBitfield( tr_peermsgs * msgs )
2030{
2031    struct evbuffer * out = msgs->outMessages;
2032    tr_bitfield *     field;
2033    tr_piece_index_t  lazyPieces[LAZY_PIECE_COUNT];
2034    size_t            i;
2035    size_t            lazyCount = 0;
2036
2037    field = tr_bitfieldDup( tr_cpPieceBitfield( &msgs->torrent->completion ) );
2038
2039    if( tr_sessionIsLazyBitfieldEnabled( getSession( msgs ) ) )
2040    {
2041        /** Lazy bitfields aren't a high priority or secure, so I'm opting for
2042            speed over a truly random sample -- let's limit the pool size to
2043            the first 1000 pieces so large torrents don't bog things down */
2044        size_t poolSize;
2045        const size_t maxPoolSize = MIN( msgs->torrent->info.pieceCount, 1000 );
2046        tr_piece_index_t * pool = tr_new( tr_piece_index_t, maxPoolSize );
2047
2048        /* build the pool */
2049        for( i=poolSize=0; i<maxPoolSize; ++i )
2050            if( tr_bitfieldHas( field, i ) )
2051                pool[poolSize++] = i;
2052
2053        /* pull random piece indices from the pool */
2054        while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) )
2055        {
2056            const int pos = tr_cryptoWeakRandInt( poolSize );
2057            const tr_piece_index_t piece = pool[pos];
2058            tr_bitfieldRem( field, piece );
2059            lazyPieces[lazyCount++] = piece;
2060            pool[pos] = pool[--poolSize];
2061        }
2062
2063        /* cleanup */
2064        tr_free( pool );
2065    }
2066
2067    tr_peerIoWriteUint32( msgs->peer->io, out,
2068                          sizeof( uint8_t ) + field->byteCount );
2069    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_BITFIELD );
2070    /* FIXME(libevent2): use evbuffer_add_reference() */
2071    tr_peerIoWriteBytes ( msgs->peer->io, out, field->bits, field->byteCount );
2072    dbgmsg( msgs, "sending bitfield... outMessage size is now %zu",
2073            EVBUFFER_LENGTH( out ) );
2074    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
2075
2076    for( i = 0; i < lazyCount; ++i )
2077        protocolSendHave( msgs, lazyPieces[i] );
2078
2079    tr_bitfieldFree( field );
2080}
2081
2082static void
2083tellPeerWhatWeHave( tr_peermsgs * msgs )
2084{
2085    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
2086
2087    if( fext && ( tr_cpGetStatus( &msgs->torrent->completion ) == TR_SEED ) )
2088    {
2089        protocolSendHaveAll( msgs );
2090    }
2091    else if( fext && ( tr_cpHaveValid( &msgs->torrent->completion ) == 0 ) )
2092    {
2093        protocolSendHaveNone( msgs );
2094    }
2095    else
2096    {
2097        sendBitfield( msgs );
2098    }
2099}
2100
2101/**
2102***
2103**/
2104
2105/* some peers give us error messages if we send
2106   more than this many peers in a single pex message
2107   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
2108#define MAX_PEX_ADDED 50
2109#define MAX_PEX_DROPPED 50
2110
2111typedef struct
2112{
2113    tr_pex *  added;
2114    tr_pex *  dropped;
2115    tr_pex *  elements;
2116    int       addedCount;
2117    int       droppedCount;
2118    int       elementCount;
2119}
2120PexDiffs;
2121
2122static void
2123pexAddedCb( void * vpex,
2124            void * userData )
2125{
2126    PexDiffs * diffs = userData;
2127    tr_pex *   pex = vpex;
2128
2129    if( diffs->addedCount < MAX_PEX_ADDED )
2130    {
2131        diffs->added[diffs->addedCount++] = *pex;
2132        diffs->elements[diffs->elementCount++] = *pex;
2133    }
2134}
2135
2136static inline void
2137pexDroppedCb( void * vpex,
2138              void * userData )
2139{
2140    PexDiffs * diffs = userData;
2141    tr_pex *   pex = vpex;
2142
2143    if( diffs->droppedCount < MAX_PEX_DROPPED )
2144    {
2145        diffs->dropped[diffs->droppedCount++] = *pex;
2146    }
2147}
2148
2149static inline void
2150pexElementCb( void * vpex,
2151              void * userData )
2152{
2153    PexDiffs * diffs = userData;
2154    tr_pex * pex = vpex;
2155
2156    diffs->elements[diffs->elementCount++] = *pex;
2157}
2158
2159static void
2160sendPex( tr_peermsgs * msgs )
2161{
2162    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
2163    {
2164        PexDiffs diffs;
2165        PexDiffs diffs6;
2166        tr_pex * newPex = NULL;
2167        tr_pex * newPex6 = NULL;
2168        const int newCount = tr_peerMgrGetPeers( msgs->torrent, &newPex, TR_AF_INET, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
2169        const int newCount6 = tr_peerMgrGetPeers( msgs->torrent, &newPex6, TR_AF_INET6, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
2170
2171        /* build the diffs */
2172        diffs.added = tr_new( tr_pex, newCount );
2173        diffs.addedCount = 0;
2174        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
2175        diffs.droppedCount = 0;
2176        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
2177        diffs.elementCount = 0;
2178        tr_set_compare( msgs->pex, msgs->pexCount,
2179                        newPex, newCount,
2180                        tr_pexCompare, sizeof( tr_pex ),
2181                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
2182        diffs6.added = tr_new( tr_pex, newCount6 );
2183        diffs6.addedCount = 0;
2184        diffs6.dropped = tr_new( tr_pex, msgs->pexCount6 );
2185        diffs6.droppedCount = 0;
2186        diffs6.elements = tr_new( tr_pex, newCount6 + msgs->pexCount6 );
2187        diffs6.elementCount = 0;
2188        tr_set_compare( msgs->pex6, msgs->pexCount6,
2189                        newPex6, newCount6,
2190                        tr_pexCompare, sizeof( tr_pex ),
2191                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6 );
2192        dbgmsg(
2193            msgs,
2194            "pex: old peer count %d+%d, new peer count %d+%d, "
2195            "added %d+%d, removed %d+%d",
2196            msgs->pexCount, msgs->pexCount6, newCount, newCount6,
2197            diffs.addedCount, diffs6.addedCount,
2198            diffs.droppedCount, diffs6.droppedCount );
2199
2200        if( !diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
2201            !diffs6.droppedCount )
2202        {
2203            tr_free( diffs.elements );
2204            tr_free( diffs6.elements );
2205        }
2206        else
2207        {
2208            int  i;
2209            tr_benc val;
2210            char * benc;
2211            int bencLen;
2212            uint8_t * tmp, *walk;
2213            tr_peerIo       * io  = msgs->peer->io;
2214            struct evbuffer * out = msgs->outMessages;
2215
2216            /* update peer */
2217            tr_free( msgs->pex );
2218            msgs->pex = diffs.elements;
2219            msgs->pexCount = diffs.elementCount;
2220            tr_free( msgs->pex6 );
2221            msgs->pex6 = diffs6.elements;
2222            msgs->pexCount6 = diffs6.elementCount;
2223
2224            /* build the pex payload */
2225            tr_bencInitDict( &val, 3 ); /* ipv6 support: left as 3:
2226                                         * speed vs. likelihood? */
2227
2228            if( diffs.addedCount > 0)
2229            {
2230                /* "added" */
2231                tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
2232                for( i = 0; i < diffs.addedCount; ++i ) {
2233                    memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
2234                    memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
2235                }
2236                assert( ( walk - tmp ) == diffs.addedCount * 6 );
2237                tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
2238                tr_free( tmp );
2239
2240                /* "added.f" */
2241                tmp = walk = tr_new( uint8_t, diffs.addedCount );
2242                for( i = 0; i < diffs.addedCount; ++i )
2243                    *walk++ = diffs.added[i].flags;
2244                assert( ( walk - tmp ) == diffs.addedCount );
2245                tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2246                tr_free( tmp );
2247            }
2248
2249            if( diffs.droppedCount > 0 )
2250            {
2251                /* "dropped" */
2252                tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2253                for( i = 0; i < diffs.droppedCount; ++i ) {
2254                    memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
2255                    memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2256                }
2257                assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2258                tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2259                tr_free( tmp );
2260            }
2261
2262            if( diffs6.addedCount > 0 )
2263            {
2264                /* "added6" */
2265                tmp = walk = tr_new( uint8_t, diffs6.addedCount * 18 );
2266                for( i = 0; i < diffs6.addedCount; ++i ) {
2267                    memcpy( walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16 );
2268                    walk += 16;
2269                    memcpy( walk, &diffs6.added[i].port, 2 );
2270                    walk += 2;
2271                }
2272                assert( ( walk - tmp ) == diffs6.addedCount * 18 );
2273                tr_bencDictAddRaw( &val, "added6", tmp, walk - tmp );
2274                tr_free( tmp );
2275
2276                /* "added6.f" */
2277                tmp = walk = tr_new( uint8_t, diffs6.addedCount );
2278                for( i = 0; i < diffs6.addedCount; ++i )
2279                    *walk++ = diffs6.added[i].flags;
2280                assert( ( walk - tmp ) == diffs6.addedCount );
2281                tr_bencDictAddRaw( &val, "added6.f", tmp, walk - tmp );
2282                tr_free( tmp );
2283            }
2284
2285            if( diffs6.droppedCount > 0 )
2286            {
2287                /* "dropped6" */
2288                tmp = walk = tr_new( uint8_t, diffs6.droppedCount * 18 );
2289                for( i = 0; i < diffs6.droppedCount; ++i ) {
2290                    memcpy( walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16 );
2291                    walk += 16;
2292                    memcpy( walk, &diffs6.dropped[i].port, 2 );
2293                    walk += 2;
2294                }
2295                assert( ( walk - tmp ) == diffs6.droppedCount * 18);
2296                tr_bencDictAddRaw( &val, "dropped6", tmp, walk - tmp );
2297                tr_free( tmp );
2298            }
2299
2300            /* write the pex message */
2301            benc = tr_bencToStr( &val, TR_FMT_BENC, &bencLen );
2302            tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + bencLen );
2303            tr_peerIoWriteUint8 ( io, out, BT_LTEP );
2304            tr_peerIoWriteUint8 ( io, out, msgs->ut_pex_id );
2305            tr_peerIoWriteBytes ( io, out, benc, bencLen );
2306            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2307            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", EVBUFFER_LENGTH( out ) );
2308            dbgOutMessageLen( msgs );
2309
2310            tr_free( benc );
2311            tr_bencFree( &val );
2312        }
2313
2314        /* cleanup */
2315        tr_free( diffs.added );
2316        tr_free( diffs.dropped );
2317        tr_free( newPex );
2318        tr_free( diffs6.added );
2319        tr_free( diffs6.dropped );
2320        tr_free( newPex6 );
2321
2322        /*msgs->clientSentPexAt = tr_time( );*/
2323    }
2324}
2325
2326static void
2327pexPulse( int foo UNUSED, short bar UNUSED, void * vmsgs )
2328{
2329    struct tr_peermsgs * msgs = vmsgs;
2330
2331    sendPex( msgs );
2332
2333    tr_timerAdd( &msgs->pexTimer, PEX_INTERVAL_SECS, 0 );
2334}
2335
2336/**
2337***
2338**/
2339
2340tr_peermsgs*
2341tr_peerMsgsNew( struct tr_torrent * torrent,
2342                struct tr_peer    * peer,
2343                tr_delivery_func    func,
2344                void              * userData,
2345                tr_publisher_tag  * setme )
2346{
2347    tr_peermsgs * m;
2348
2349    assert( peer );
2350    assert( peer->io );
2351
2352    m = tr_new0( tr_peermsgs, 1 );
2353    m->publisher = TR_PUBLISHER_INIT;
2354    m->peer = peer;
2355    m->torrent = torrent;
2356    m->peer->clientIsChoked = 1;
2357    m->peer->peerIsChoked = 1;
2358    m->peer->clientIsInterested = 0;
2359    m->peer->peerIsInterested = 0;
2360    m->state = AWAITING_BT_LENGTH;
2361    m->outMessages = evbuffer_new( );
2362    m->outMessagesBatchedAt = 0;
2363    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2364    m->incoming.block = evbuffer_new( );
2365    evtimer_set( &m->pexTimer, pexPulse, m );
2366    tr_timerAdd( &m->pexTimer, PEX_INTERVAL_SECS, 0 );
2367    peer->msgs = m;
2368
2369    *setme = tr_publisherSubscribe( &m->publisher, func, userData );
2370
2371    if( tr_peerIoSupportsLTEP( peer->io ) )
2372        sendLtepHandshake( m );
2373
2374    if(tr_peerIoSupportsDHT(peer->io)) {
2375        /* Only send PORT over IPv6 when the IPv6 DHT is running (BEP-32). */
2376        const struct tr_address *addr = tr_peerIoGetAddress( peer->io, NULL );
2377        if( addr->type == TR_AF_INET || tr_globalIPv6() ) {
2378            protocolSendPort( m, tr_dhtPort( torrent->session ) );
2379        }
2380    }
2381
2382    tellPeerWhatWeHave( m );
2383
2384    tr_peerIoSetIOFuncs( m->peer->io, canRead, didWrite, gotError, m );
2385    updateDesiredRequestCount( m, tr_date( ) );
2386
2387    return m;
2388}
2389
2390void
2391tr_peerMsgsFree( tr_peermsgs* msgs )
2392{
2393    if( msgs )
2394    {
2395        evtimer_del( &msgs->pexTimer );
2396        tr_publisherDestruct( &msgs->publisher );
2397
2398        evbuffer_free( msgs->incoming.block );
2399        evbuffer_free( msgs->outMessages );
2400        tr_free( msgs->pex6 );
2401        tr_free( msgs->pex );
2402
2403        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2404        tr_free( msgs );
2405    }
2406}
2407
2408void
2409tr_peerMsgsUnsubscribe( tr_peermsgs *    peer,
2410                        tr_publisher_tag tag )
2411{
2412    tr_publisherUnsubscribe( &peer->publisher, tag );
2413}
Note: See TracBrowser for help on using the repository browser.