source: trunk/libtransmission/peer-msgs.c @ 10798

Last change on this file since 10798 was 10798, checked in by charles, 12 years ago

(trunk) #1521 "memory cache to reduce disk IO" -- commit block-cache-rc1.diff to trunk for the nightlies.

  • Property svn:keywords set to Date Rev Author Id
File size: 71.1 KB
Line 
1/*
2 * This file Copyright (C) 2007-2010 Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 10798 2010-06-19 14:25:11Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdarg.h>
17#include <stdio.h>
18#include <stdlib.h>
19#include <string.h>
20
21#include <event.h>
22
23#include "transmission.h"
24#include "bencode.h"
25#include "cache.h"
26#include "completion.h"
27#include "crypto.h"
28#ifdef WIN32
29#include "net.h" /* for ECONN */
30#endif
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-msgs.h"
34#include "session.h"
35#include "stats.h"
36#include "torrent.h"
37#include "torrent-magnet.h"
38#include "tr-dht.h"
39#include "utils.h"
40#include "version.h"
41
42/**
43***
44**/
45
46enum
47{
48    BT_CHOKE                = 0,
49    BT_UNCHOKE              = 1,
50    BT_INTERESTED           = 2,
51    BT_NOT_INTERESTED       = 3,
52    BT_HAVE                 = 4,
53    BT_BITFIELD             = 5,
54    BT_REQUEST              = 6,
55    BT_PIECE                = 7,
56    BT_CANCEL               = 8,
57    BT_PORT                 = 9,
58
59    BT_FEXT_SUGGEST         = 13,
60    BT_FEXT_HAVE_ALL        = 14,
61    BT_FEXT_HAVE_NONE       = 15,
62    BT_FEXT_REJECT          = 16,
63    BT_FEXT_ALLOWED_FAST    = 17,
64
65    BT_LTEP                 = 20,
66
67    LTEP_HANDSHAKE          = 0,
68
69    UT_PEX_ID               = 1,
70    UT_METADATA_ID          = 3,
71
72    MAX_PEX_PEER_COUNT      = 50,
73
74    MIN_CHOKE_PERIOD_SEC    = 10,
75
76    /* idle seconds before we send a keepalive */
77    KEEPALIVE_INTERVAL_SECS = 100,
78
79    PEX_INTERVAL_SECS       = 90, /* sec between sendPex() calls */
80
81    REQQ                    = 512,
82
83    METADATA_REQQ           = 64,
84
85    /* used in lowering the outMessages queue period */
86    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
87    HIGH_PRIORITY_INTERVAL_SECS = 2,
88    LOW_PRIORITY_INTERVAL_SECS = 10,
89
90    /* number of pieces to remove from the bitfield when
91     * lazy bitfields are turned on */
92    LAZY_PIECE_COUNT = 26,
93
94    /* number of pieces we'll allow in our fast set */
95    MAX_FAST_SET_SIZE = 3,
96
97    /* defined in BEP #9 */
98    METADATA_MSG_TYPE_REQUEST = 0,
99    METADATA_MSG_TYPE_DATA = 1,
100    METADATA_MSG_TYPE_REJECT = 2
101};
102
103enum
104{
105    AWAITING_BT_LENGTH,
106    AWAITING_BT_ID,
107    AWAITING_BT_MESSAGE,
108    AWAITING_BT_PIECE
109};
110
111/**
112***
113**/
114
115struct peer_request
116{
117    uint32_t    index;
118    uint32_t    offset;
119    uint32_t    length;
120};
121
122static uint32_t
123getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
124{
125    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
126    const uint64_t blockPos = tor->blockSize * b;
127    assert( blockPos >= piecePos );
128    return (uint32_t)( blockPos - piecePos );
129}
130
131static void
132blockToReq( const tr_torrent     * tor,
133            tr_block_index_t       block,
134            struct peer_request  * setme )
135{
136    assert( setme != NULL );
137
138    setme->index = tr_torBlockPiece( tor, block );
139    setme->offset = getBlockOffsetInPiece( tor, block );
140    setme->length = tr_torBlockCountBytes( tor, block );
141}
142
143/**
144***
145**/
146
147/* this is raw, unchanged data from the peer regarding
148 * the current message that it's sending us. */
149struct tr_incoming
150{
151    uint8_t                id;
152    uint32_t               length; /* includes the +1 for id length */
153    struct peer_request    blockReq; /* metadata for incoming blocks */
154    struct evbuffer *      block; /* piece data for incoming blocks */
155};
156
157/**
158 * Low-level communication state information about a connected peer.
159 *
160 * This structure remembers the low-level protocol states that we're
161 * in with this peer, such as active requests, pex messages, and so on.
162 * Its fields are all private to peer-msgs.c.
163 *
164 * Data not directly involved with sending & receiving messages is
165 * stored in tr_peer, where it can be accessed by both peermsgs and
166 * the peer manager.
167 *
168 * @see struct peer_atom
169 * @see tr_peer
170 */
171struct tr_peermsgs
172{
173    tr_bool         peerSupportsPex;
174    tr_bool         peerSupportsMetadataXfer;
175    tr_bool         clientSentLtepHandshake;
176    tr_bool         peerSentLtepHandshake;
177
178    /*tr_bool         haveFastSet;*/
179
180    int             desiredRequestCount;
181
182    int             prefetchCount;
183
184    /* how long the outMessages batch should be allowed to grow before
185     * it's flushed -- some messages (like requests >:) should be sent
186     * very quickly; others aren't as urgent. */
187    int8_t          outMessagesBatchPeriod;
188
189    uint8_t         state;
190    uint8_t         ut_pex_id;
191    uint8_t         ut_metadata_id;
192    uint16_t        pexCount;
193    uint16_t        pexCount6;
194
195#if 0
196    size_t                 fastsetSize;
197    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
198#endif
199
200    tr_peer *              peer;
201
202    tr_torrent *           torrent;
203
204    tr_publisher           publisher;
205
206    struct evbuffer *      outMessages; /* all the non-piece messages */
207
208    struct peer_request    peerAskedFor[REQQ];
209
210    int                    peerAskedForMetadata[METADATA_REQQ];
211    int                    peerAskedForMetadataCount;
212
213    tr_pex               * pex;
214    tr_pex               * pex6;
215
216    /*time_t                 clientSentPexAt;*/
217    time_t                 clientSentAnythingAt;
218
219    /* when we started batching the outMessages */
220    time_t                outMessagesBatchedAt;
221
222    struct tr_incoming    incoming;
223
224    /* if the peer supports the Extension Protocol in BEP 10 and
225       supplied a reqq argument, it's stored here.  otherwise the
226       value is zero and should be ignored. */
227    int64_t               reqq;
228
229    struct event          pexTimer;
230};
231
232/**
233***
234**/
235
236#if 0
237static tr_bitfield*
238getHave( const struct tr_peermsgs * msgs )
239{
240    if( msgs->peer->have == NULL )
241        msgs->peer->have = tr_bitfieldNew( msgs->torrent->info.pieceCount );
242    return msgs->peer->have;
243}
244#endif
245
246static inline tr_session*
247getSession( struct tr_peermsgs * msgs )
248{
249    return msgs->torrent->session;
250}
251
252/**
253***
254**/
255
256static void
257myDebug( const char * file, int line,
258         const struct tr_peermsgs * msgs,
259         const char * fmt, ... )
260{
261    FILE * fp = tr_getLog( );
262
263    if( fp )
264    {
265        va_list           args;
266        char              timestr[64];
267        struct evbuffer * buf = evbuffer_new( );
268        char *            base = tr_basename( file );
269
270        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
271                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
272                             tr_torrentName( msgs->torrent ),
273                             tr_peerIoGetAddrStr( msgs->peer->io ),
274                             msgs->peer->client );
275        va_start( args, fmt );
276        evbuffer_add_vprintf( buf, fmt, args );
277        va_end( args );
278        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
279        /* FIXME(libevent2) tr_getLog() should return an fd, then use evbuffer_write() here */
280        fwrite( EVBUFFER_DATA( buf ), 1, EVBUFFER_LENGTH( buf ), fp );
281
282        tr_free( base );
283        evbuffer_free( buf );
284    }
285}
286
287#define dbgmsg( msgs, ... ) \
288    do { \
289        if( tr_deepLoggingIsActive( ) ) \
290            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
291    } while( 0 )
292
293/**
294***
295**/
296
297static void
298pokeBatchPeriod( tr_peermsgs * msgs,
299                 int           interval )
300{
301    if( msgs->outMessagesBatchPeriod > interval )
302    {
303        msgs->outMessagesBatchPeriod = interval;
304        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
305    }
306}
307
308static inline void
309dbgOutMessageLen( tr_peermsgs * msgs )
310{
311    dbgmsg( msgs, "outMessage size is now %zu", EVBUFFER_LENGTH( msgs->outMessages ) );
312}
313
314static void
315protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
316{
317    tr_peerIo       * io  = msgs->peer->io;
318    struct evbuffer * out = msgs->outMessages;
319
320    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
321
322    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
323    tr_peerIoWriteUint8 ( io, out, BT_FEXT_REJECT );
324    tr_peerIoWriteUint32( io, out, req->index );
325    tr_peerIoWriteUint32( io, out, req->offset );
326    tr_peerIoWriteUint32( io, out, req->length );
327
328    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
329    dbgOutMessageLen( msgs );
330}
331
332static void
333protocolSendRequest( tr_peermsgs               * msgs,
334                     const struct peer_request * req )
335{
336    tr_peerIo       * io  = msgs->peer->io;
337    struct evbuffer * out = msgs->outMessages;
338
339    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
340    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
341    tr_peerIoWriteUint32( io, out, req->index );
342    tr_peerIoWriteUint32( io, out, req->offset );
343    tr_peerIoWriteUint32( io, out, req->length );
344
345    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
346    dbgOutMessageLen( msgs );
347    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
348}
349
350static void
351protocolSendCancel( tr_peermsgs               * msgs,
352                    const struct peer_request * req )
353{
354    tr_peerIo       * io  = msgs->peer->io;
355    struct evbuffer * out = msgs->outMessages;
356
357    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
358    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
359    tr_peerIoWriteUint32( io, out, req->index );
360    tr_peerIoWriteUint32( io, out, req->offset );
361    tr_peerIoWriteUint32( io, out, req->length );
362
363    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
364    dbgOutMessageLen( msgs );
365    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
366}
367
368static void
369protocolSendPort(tr_peermsgs *msgs, uint16_t port)
370{
371    tr_peerIo       * io  = msgs->peer->io;
372    struct evbuffer * out = msgs->outMessages;
373
374    dbgmsg( msgs, "sending Port %u", port);
375    tr_peerIoWriteUint32( io, out, 3 );
376    tr_peerIoWriteUint8 ( io, out, BT_PORT );
377    tr_peerIoWriteUint16( io, out, port);
378}
379
380static void
381protocolSendHave( tr_peermsgs * msgs,
382                  uint32_t      index )
383{
384    tr_peerIo       * io  = msgs->peer->io;
385    struct evbuffer * out = msgs->outMessages;
386
387    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
388    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
389    tr_peerIoWriteUint32( io, out, index );
390
391    dbgmsg( msgs, "sending Have %u", index );
392    dbgOutMessageLen( msgs );
393    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
394}
395
396#if 0
397static void
398protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
399{
400    tr_peerIo       * io  = msgs->peer->io;
401    struct evbuffer * out = msgs->outMessages;
402
403    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
404
405    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
406    tr_peerIoWriteUint8 ( io, out, BT_FEXT_ALLOWED_FAST );
407    tr_peerIoWriteUint32( io, out, pieceIndex );
408
409    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
410    dbgOutMessageLen( msgs );
411}
412#endif
413
414static void
415protocolSendChoke( tr_peermsgs * msgs,
416                   int           choke )
417{
418    tr_peerIo       * io  = msgs->peer->io;
419    struct evbuffer * out = msgs->outMessages;
420
421    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
422    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
423
424    dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
425    dbgOutMessageLen( msgs );
426    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
427}
428
429static void
430protocolSendHaveAll( tr_peermsgs * msgs )
431{
432    tr_peerIo       * io  = msgs->peer->io;
433    struct evbuffer * out = msgs->outMessages;
434
435    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
436
437    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
438    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_ALL );
439
440    dbgmsg( msgs, "sending HAVE_ALL..." );
441    dbgOutMessageLen( msgs );
442    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
443}
444
445static void
446protocolSendHaveNone( tr_peermsgs * msgs )
447{
448    tr_peerIo       * io  = msgs->peer->io;
449    struct evbuffer * out = msgs->outMessages;
450
451    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
452
453    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
454    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_NONE );
455
456    dbgmsg( msgs, "sending HAVE_NONE..." );
457    dbgOutMessageLen( msgs );
458    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
459}
460
461/**
462***  EVENTS
463**/
464
465static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0, 0 };
466
467static void
468publish( tr_peermsgs * msgs, tr_peer_event * e )
469{
470    assert( msgs->peer );
471    assert( msgs->peer->msgs == msgs );
472
473    tr_publisherPublish( &msgs->publisher, msgs->peer, e );
474}
475
476static void
477fireError( tr_peermsgs * msgs, int err )
478{
479    tr_peer_event e = blankEvent;
480    e.eventType = TR_PEER_ERROR;
481    e.err = err;
482    publish( msgs, &e );
483}
484
485static void
486firePeerProgress( tr_peermsgs * msgs )
487{
488    tr_peer_event e = blankEvent;
489    e.eventType = TR_PEER_PEER_PROGRESS;
490    e.progress = msgs->peer->progress;
491    publish( msgs, &e );
492}
493
494static void
495fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
496{
497    tr_peer_event e = blankEvent;
498    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
499    e.pieceIndex = req->index;
500    e.offset = req->offset;
501    e.length = req->length;
502    publish( msgs, &e );
503}
504
505static void
506fireGotRej( tr_peermsgs * msgs, const struct peer_request * req )
507{
508    tr_peer_event e = blankEvent;
509    e.eventType = TR_PEER_CLIENT_GOT_REJ;
510    e.pieceIndex = req->index;
511    e.offset = req->offset;
512    e.length = req->length;
513    publish( msgs, &e );
514}
515
516static void
517fireGotChoke( tr_peermsgs * msgs )
518{
519    tr_peer_event e = blankEvent;
520    e.eventType = TR_PEER_CLIENT_GOT_CHOKE;
521    publish( msgs, &e );
522}
523
524static void
525fireClientGotData( tr_peermsgs * msgs,
526                   uint32_t      length,
527                   int           wasPieceData )
528{
529    tr_peer_event e = blankEvent;
530
531    e.length = length;
532    e.eventType = TR_PEER_CLIENT_GOT_DATA;
533    e.wasPieceData = wasPieceData;
534    publish( msgs, &e );
535}
536
537static void
538fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
539{
540    tr_peer_event e = blankEvent;
541    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
542    e.pieceIndex = pieceIndex;
543    publish( msgs, &e );
544}
545
546static void
547fireClientGotPort( tr_peermsgs * msgs, tr_port port )
548{
549    tr_peer_event e = blankEvent;
550    e.eventType = TR_PEER_CLIENT_GOT_PORT;
551    e.port = port;
552    publish( msgs, &e );
553}
554
555static void
556fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
557{
558    tr_peer_event e = blankEvent;
559    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
560    e.pieceIndex = pieceIndex;
561    publish( msgs, &e );
562}
563
564static void
565firePeerGotData( tr_peermsgs  * msgs,
566                 uint32_t       length,
567                 int            wasPieceData )
568{
569    tr_peer_event e = blankEvent;
570
571    e.length = length;
572    e.eventType = TR_PEER_PEER_GOT_DATA;
573    e.wasPieceData = wasPieceData;
574
575    publish( msgs, &e );
576}
577
578/**
579***  ALLOWED FAST SET
580***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
581**/
582
583size_t
584tr_generateAllowedSet( tr_piece_index_t * setmePieces,
585                       size_t             desiredSetSize,
586                       size_t             pieceCount,
587                       const uint8_t    * infohash,
588                       const tr_address * addr )
589{
590    size_t setSize = 0;
591
592    assert( setmePieces );
593    assert( desiredSetSize <= pieceCount );
594    assert( desiredSetSize );
595    assert( pieceCount );
596    assert( infohash );
597    assert( addr );
598
599    if( addr->type == TR_AF_INET )
600    {
601        uint8_t w[SHA_DIGEST_LENGTH + 4], *walk=w;
602        uint8_t x[SHA_DIGEST_LENGTH];
603
604        uint32_t ui32 = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 );   /* (1) */
605        memcpy( w, &ui32, sizeof( uint32_t ) );
606        walk += sizeof( uint32_t );
607        memcpy( walk, infohash, SHA_DIGEST_LENGTH );                 /* (2) */
608        walk += SHA_DIGEST_LENGTH;
609        tr_sha1( x, w, walk-w, NULL );                               /* (3) */
610        assert( sizeof( w ) == walk-w );
611
612        while( setSize<desiredSetSize )
613        {
614            int i;
615            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
616            {
617                size_t k;
618                uint32_t j = i * 4;                                  /* (5) */
619                uint32_t y = ntohl( *( uint32_t* )( x + j ) );       /* (6) */
620                uint32_t index = y % pieceCount;                     /* (7) */
621
622                for( k=0; k<setSize; ++k )                           /* (8) */
623                    if( setmePieces[k] == index )
624                        break;
625
626                if( k == setSize )
627                    setmePieces[setSize++] = index;                  /* (9) */
628            }
629
630            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
631        }
632    }
633
634    return setSize;
635}
636
637static void
638updateFastSet( tr_peermsgs * msgs UNUSED )
639{
640#if 0
641    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
642    const int peerIsNeedy = msgs->peer->progress < 0.10;
643
644    if( fext && peerIsNeedy && !msgs->haveFastSet )
645    {
646        size_t i;
647        const struct tr_address * addr = tr_peerIoGetAddress( msgs->peer->io, NULL );
648        const tr_info * inf = &msgs->torrent->info;
649        const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
650
651        /* build the fast set */
652        msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
653        msgs->haveFastSet = 1;
654
655        /* send it to the peer */
656        for( i=0; i<msgs->fastsetSize; ++i )
657            protocolSendAllowedFast( msgs, msgs->fastset[i] );
658    }
659#endif
660}
661
662/**
663***  INTEREST
664**/
665
666static void
667sendInterest( tr_peermsgs * msgs, tr_bool clientIsInterested )
668{
669    struct evbuffer * out = msgs->outMessages;
670
671    assert( msgs );
672    assert( tr_isBool( clientIsInterested ) );
673
674    msgs->peer->clientIsInterested = clientIsInterested;
675    dbgmsg( msgs, "Sending %s", clientIsInterested ? "Interested" : "Not Interested" );
676    tr_peerIoWriteUint32( msgs->peer->io, out, sizeof( uint8_t ) );
677    tr_peerIoWriteUint8 ( msgs->peer->io, out, clientIsInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
678
679    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
680    dbgOutMessageLen( msgs );
681}
682
683static void
684updateInterest( tr_peermsgs * msgs UNUSED )
685{
686    /* FIXME -- might need to poke the mgr on startup */
687}
688
689void
690tr_peerMsgsSetInterested( tr_peermsgs * msgs, int isInterested )
691{
692    assert( tr_isBool( isInterested ) );
693
694    if( isInterested != msgs->peer->clientIsInterested )
695        sendInterest( msgs, isInterested );
696}
697
698static tr_bool
699popNextMetadataRequest( tr_peermsgs * msgs, int * piece )
700{
701    if( msgs->peerAskedForMetadataCount == 0 )
702        return FALSE;
703
704    *piece = msgs->peerAskedForMetadata[0];
705
706    tr_removeElementFromArray( msgs->peerAskedForMetadata, 0, sizeof( int ),
707                               msgs->peerAskedForMetadataCount-- );
708
709    return TRUE;
710}
711
712static tr_bool
713popNextRequest( tr_peermsgs * msgs, struct peer_request * setme )
714{
715    if( msgs->peer->pendingReqsToClient == 0 )
716        return FALSE;
717
718    *setme = msgs->peerAskedFor[0];
719
720    tr_removeElementFromArray( msgs->peerAskedFor, 0, sizeof( struct peer_request ),
721                               msgs->peer->pendingReqsToClient-- );
722
723    return TRUE;
724}
725
726static void
727cancelAllRequestsToClient( tr_peermsgs * msgs )
728{
729    struct peer_request req;
730    const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->peer->io );
731
732    while( popNextRequest( msgs, &req ))
733        if( mustSendCancel )
734            protocolSendReject( msgs, &req );
735}
736
737void
738tr_peerMsgsSetChoke( tr_peermsgs * msgs,
739                     int           choke )
740{
741    const time_t now = tr_time( );
742    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
743
744    assert( msgs );
745    assert( msgs->peer );
746    assert( choke == 0 || choke == 1 );
747
748    if( msgs->peer->chokeChangedAt > fibrillationTime )
749    {
750        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
751    }
752    else if( msgs->peer->peerIsChoked != choke )
753    {
754        msgs->peer->peerIsChoked = choke;
755        if( choke )
756            cancelAllRequestsToClient( msgs );
757        protocolSendChoke( msgs, choke );
758        msgs->peer->chokeChangedAt = now;
759    }
760}
761
762/**
763***
764**/
765
766void
767tr_peerMsgsHave( tr_peermsgs * msgs,
768                 uint32_t      index )
769{
770    protocolSendHave( msgs, index );
771
772    /* since we have more pieces now, we might not be interested in this peer */
773    updateInterest( msgs );
774}
775
776/**
777***
778**/
779
780static tr_bool
781reqIsValid( const tr_peermsgs * peer,
782            uint32_t            index,
783            uint32_t            offset,
784            uint32_t            length )
785{
786    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
787}
788
789static tr_bool
790requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
791{
792    return reqIsValid( msgs, req->index, req->offset, req->length );
793}
794
795void
796tr_peerMsgsCancel( tr_peermsgs * msgs, tr_block_index_t block )
797{
798    struct peer_request req;
799/*fprintf( stderr, "SENDING CANCEL MESSAGE FOR BLOCK %zu\n\t\tFROM PEER %p ------------------------------------\n", (size_t)block, msgs->peer );*/
800    blockToReq( msgs->torrent, block, &req );
801    protocolSendCancel( msgs, &req );
802}
803
804/**
805***
806**/
807
808static void
809sendLtepHandshake( tr_peermsgs * msgs )
810{
811    tr_benc val, *m;
812    char * buf;
813    int len;
814    tr_bool allow_pex;
815    tr_bool allow_metadata_xfer;
816    struct evbuffer * out = msgs->outMessages;
817    const unsigned char * ipv6 = tr_globalIPv6();
818
819    if( msgs->clientSentLtepHandshake )
820        return;
821
822    dbgmsg( msgs, "sending an ltep handshake" );
823    msgs->clientSentLtepHandshake = 1;
824
825    /* decide if we want to advertise metadata xfer support (BEP 9) */
826    if( tr_torrentIsPrivate( msgs->torrent ) )
827        allow_metadata_xfer = 0;
828    else
829        allow_metadata_xfer = 1;
830
831    /* decide if we want to advertise pex support */
832    if( !tr_torrentAllowsPex( msgs->torrent ) )
833        allow_pex = 0;
834    else if( msgs->peerSentLtepHandshake )
835        allow_pex = msgs->peerSupportsPex ? 1 : 0;
836    else
837        allow_pex = 1;
838
839    tr_bencInitDict( &val, 8 );
840    tr_bencDictAddInt( &val, "e", getSession(msgs)->encryptionMode != TR_CLEAR_PREFERRED );
841    if( ipv6 != NULL )
842        tr_bencDictAddRaw( &val, "ipv6", ipv6, 16 );
843    if( allow_metadata_xfer && tr_torrentHasMetadata( msgs->torrent )
844                            && ( msgs->torrent->infoDictLength > 0 ) )
845        tr_bencDictAddInt( &val, "metadata_size", msgs->torrent->infoDictLength );
846    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( getSession(msgs) ) );
847    tr_bencDictAddInt( &val, "reqq", REQQ );
848    tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
849    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
850    m  = tr_bencDictAddDict( &val, "m", 2 );
851    if( allow_metadata_xfer )
852        tr_bencDictAddInt( m, "ut_metadata", UT_METADATA_ID );
853    if( allow_pex )
854        tr_bencDictAddInt( m, "ut_pex", UT_PEX_ID );
855
856    buf = tr_bencToStr( &val, TR_FMT_BENC, &len );
857
858    tr_peerIoWriteUint32( msgs->peer->io, out, 2 * sizeof( uint8_t ) + len );
859    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_LTEP );
860    tr_peerIoWriteUint8 ( msgs->peer->io, out, LTEP_HANDSHAKE );
861    tr_peerIoWriteBytes ( msgs->peer->io, out, buf, len );
862    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
863    dbgOutMessageLen( msgs );
864
865    /* cleanup */
866    tr_bencFree( &val );
867    tr_free( buf );
868}
869
870static void
871parseLtepHandshake( tr_peermsgs *     msgs,
872                    int               len,
873                    struct evbuffer * inbuf )
874{
875    int64_t   i;
876    tr_benc   val, * sub;
877    uint8_t * tmp = tr_new( uint8_t, len );
878    const uint8_t *addr;
879    size_t addr_len;
880    tr_pex pex;
881    int8_t seedProbability = -1;
882
883    memset( &pex, 0, sizeof( tr_pex ) );
884
885    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, len );
886    msgs->peerSentLtepHandshake = 1;
887
888    if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) )
889    {
890        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
891        tr_free( tmp );
892        return;
893    }
894
895    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
896
897    /* does the peer prefer encrypted connections? */
898    if( tr_bencDictFindInt( &val, "e", &i ) ) {
899        msgs->peer->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
900                                              : ENCRYPTION_PREFERENCE_NO;
901        if( i )
902            pex.flags |= ADDED_F_ENCRYPTION_FLAG;
903    }
904
905    /* check supported messages for utorrent pex */
906    msgs->peerSupportsPex = 0;
907    msgs->peerSupportsMetadataXfer = 0;
908
909    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
910        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
911            msgs->peerSupportsPex = i != 0;
912            msgs->ut_pex_id = (uint8_t) i;
913            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
914        }
915        if( tr_bencDictFindInt( sub, "ut_metadata", &i ) ) {
916            msgs->peerSupportsMetadataXfer = i != 0;
917            msgs->ut_metadata_id = (uint8_t) i;
918            dbgmsg( msgs, "msgs->ut_metadata_id is %d", (int)msgs->ut_metadata_id );
919        }
920    }
921
922    /* look for metainfo size (BEP 9) */
923    if( tr_bencDictFindInt( &val, "metadata_size", &i ) )
924        tr_torrentSetMetadataSizeHint( msgs->torrent, i );
925
926    /* look for upload_only (BEP 21) */
927    if( tr_bencDictFindInt( &val, "upload_only", &i ) )
928        seedProbability = i==0 ? 0 : 100;
929
930    /* get peer's listening port */
931    if( tr_bencDictFindInt( &val, "p", &i ) ) {
932        pex.port = htons( (uint16_t)i );
933        fireClientGotPort( msgs, pex.port );
934        dbgmsg( msgs, "peer's port is now %d", (int)i );
935    }
936
937    if( tr_peerIoIsIncoming( msgs->peer->io )
938        && tr_bencDictFindRaw( &val, "ipv4", &addr, &addr_len )
939        && ( addr_len == 4 ) )
940    {
941        pex.addr.type = TR_AF_INET;
942        memcpy( &pex.addr.addr.addr4, addr, 4 );
943        tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability );
944    }
945
946    if( tr_peerIoIsIncoming( msgs->peer->io )
947        && tr_bencDictFindRaw( &val, "ipv6", &addr, &addr_len )
948        && ( addr_len == 16 ) )
949    {
950        pex.addr.type = TR_AF_INET6;
951        memcpy( &pex.addr.addr.addr6, addr, 16 );
952        tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability );
953    }
954
955    /* get peer's maximum request queue size */
956    if( tr_bencDictFindInt( &val, "reqq", &i ) )
957        msgs->reqq = i;
958
959    tr_bencFree( &val );
960    tr_free( tmp );
961}
962
963static void
964parseUtMetadata( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
965{
966    tr_benc dict;
967    char * msg_end;
968    char * benc_end;
969    int64_t msg_type = -1;
970    int64_t piece = -1;
971    int64_t total_size = 0;
972    uint8_t * tmp = tr_new( uint8_t, msglen );
973
974    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
975    msg_end = (char*)tmp + msglen;
976
977    if( !tr_bencLoad( tmp, msglen, &dict, &benc_end ) )
978    {
979        tr_bencDictFindInt( &dict, "msg_type", &msg_type );
980        tr_bencDictFindInt( &dict, "piece", &piece );
981        tr_bencDictFindInt( &dict, "total_size", &total_size );
982        tr_bencFree( &dict );
983    }
984
985    dbgmsg( msgs, "got ut_metadata msg: type %d, piece %d, total_size %d",
986            (int)msg_type, (int)piece, (int)total_size );
987
988    if( msg_type == METADATA_MSG_TYPE_REJECT )
989    {
990        /* NOOP */
991    }
992
993    if( ( msg_type == METADATA_MSG_TYPE_DATA )
994        && ( !tr_torrentHasMetadata( msgs->torrent ) )
995        && ( msg_end - benc_end <= METADATA_PIECE_SIZE )
996        && ( piece * METADATA_PIECE_SIZE + (msg_end - benc_end) <= total_size ) )
997    {
998        const int pieceLen = msg_end - benc_end;
999        tr_torrentSetMetadataPiece( msgs->torrent, piece, benc_end, pieceLen );
1000    }
1001
1002    if( msg_type == METADATA_MSG_TYPE_REQUEST )
1003    {
1004        if( ( piece >= 0 )
1005            && tr_torrentHasMetadata( msgs->torrent )
1006            && !tr_torrentIsPrivate( msgs->torrent )
1007            && ( msgs->peerAskedForMetadataCount < METADATA_REQQ ) )
1008        {
1009            msgs->peerAskedForMetadata[msgs->peerAskedForMetadataCount++] = piece;
1010        }
1011        else
1012        {
1013            tr_benc tmp;
1014            int payloadLen;
1015            char * payload;
1016            tr_peerIo  * io  = msgs->peer->io;
1017            struct evbuffer * out = msgs->outMessages;
1018
1019            /* build the rejection message */
1020            tr_bencInitDict( &tmp, 2 );
1021            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
1022            tr_bencDictAddInt( &tmp, "piece", piece );
1023            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1024            tr_bencFree( &tmp );
1025
1026            /* write it out as a LTEP message to our outMessages buffer */
1027            tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + payloadLen );
1028            tr_peerIoWriteUint8 ( io, out, BT_LTEP );
1029            tr_peerIoWriteUint8 ( io, out, msgs->ut_metadata_id );
1030            tr_peerIoWriteBytes ( io, out, payload, payloadLen );
1031            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1032            dbgOutMessageLen( msgs );
1033
1034            tr_free( payload );
1035        }
1036    }
1037
1038    tr_free( tmp );
1039}
1040
1041static void
1042parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1043{
1044    int loaded = 0;
1045    uint8_t * tmp = tr_new( uint8_t, msglen );
1046    tr_benc val;
1047    tr_torrent * tor = msgs->torrent;
1048    const uint8_t * added;
1049    size_t added_len;
1050
1051    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1052
1053    if( tr_torrentAllowsPex( tor )
1054      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) ) )
1055    {
1056        if( tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1057        {
1058            tr_pex * pex;
1059            size_t i, n;
1060            size_t added_f_len = 0;
1061            const uint8_t * added_f = NULL;
1062
1063            tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1064            pex = tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len, &n );
1065
1066            n = MIN( n, MAX_PEX_PEER_COUNT );
1067            for( i=0; i<n; ++i )
1068            {
1069                int seedProbability = -1;
1070                if( i < added_f_len ) seedProbability = ( added_f[i] & ADDED_F_SEED_FLAG ) ? 100 : 0;
1071                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex+i, seedProbability );
1072            }
1073
1074            tr_free( pex );
1075        }
1076
1077        if( tr_bencDictFindRaw( &val, "added6", &added, &added_len ) )
1078        {
1079            tr_pex * pex;
1080            size_t i, n;
1081            size_t added_f_len = 0;
1082            const uint8_t * added_f = NULL;
1083
1084            tr_bencDictFindRaw( &val, "added6.f", &added_f, &added_f_len );
1085            pex = tr_peerMgrCompact6ToPex( added, added_len, added_f, added_f_len, &n );
1086
1087            n = MIN( n, MAX_PEX_PEER_COUNT );
1088            for( i=0; i<n; ++i )
1089            {
1090                int seedProbability = -1;
1091                if( i < added_f_len ) seedProbability = ( added_f[i] & ADDED_F_SEED_FLAG ) ? 100 : 0;
1092                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex+i, seedProbability );
1093            }
1094
1095            tr_free( pex );
1096        }
1097    }
1098
1099    if( loaded )
1100        tr_bencFree( &val );
1101    tr_free( tmp );
1102}
1103
1104static void sendPex( tr_peermsgs * msgs );
1105
1106static void
1107parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer  * inbuf )
1108{
1109    uint8_t ltep_msgid;
1110
1111    tr_peerIoReadUint8( msgs->peer->io, inbuf, &ltep_msgid );
1112    msglen--;
1113
1114    if( ltep_msgid == LTEP_HANDSHAKE )
1115    {
1116        dbgmsg( msgs, "got ltep handshake" );
1117        parseLtepHandshake( msgs, msglen, inbuf );
1118        if( tr_peerIoSupportsLTEP( msgs->peer->io ) )
1119        {
1120            sendLtepHandshake( msgs );
1121            sendPex( msgs );
1122        }
1123    }
1124    else if( ltep_msgid == UT_PEX_ID )
1125    {
1126        dbgmsg( msgs, "got ut pex" );
1127        msgs->peerSupportsPex = 1;
1128        parseUtPex( msgs, msglen, inbuf );
1129    }
1130    else if( ltep_msgid == UT_METADATA_ID )
1131    {
1132        dbgmsg( msgs, "got ut metadata" );
1133        msgs->peerSupportsMetadataXfer = 1;
1134        parseUtMetadata( msgs, msglen, inbuf );
1135    }
1136    else
1137    {
1138        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1139        evbuffer_drain( inbuf, msglen );
1140    }
1141}
1142
1143static int
1144readBtLength( tr_peermsgs *     msgs,
1145              struct evbuffer * inbuf,
1146              size_t            inlen )
1147{
1148    uint32_t len;
1149
1150    if( inlen < sizeof( len ) )
1151        return READ_LATER;
1152
1153    tr_peerIoReadUint32( msgs->peer->io, inbuf, &len );
1154
1155    if( len == 0 ) /* peer sent us a keepalive message */
1156        dbgmsg( msgs, "got KeepAlive" );
1157    else
1158    {
1159        msgs->incoming.length = len;
1160        msgs->state = AWAITING_BT_ID;
1161    }
1162
1163    return READ_NOW;
1164}
1165
1166static int readBtMessage( tr_peermsgs     * msgs,
1167                          struct evbuffer * inbuf,
1168                          size_t            inlen );
1169
1170static int
1171readBtId( tr_peermsgs * msgs, struct evbuffer  * inbuf, size_t inlen )
1172{
1173    uint8_t id;
1174
1175    if( inlen < sizeof( uint8_t ) )
1176        return READ_LATER;
1177
1178    tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
1179    msgs->incoming.id = id;
1180    dbgmsg( msgs, "msgs->incoming.id is now %d; msgs->incoming.length is %zu", id, (size_t)msgs->incoming.length );
1181
1182    if( id == BT_PIECE )
1183    {
1184        msgs->state = AWAITING_BT_PIECE;
1185        return READ_NOW;
1186    }
1187    else if( msgs->incoming.length != 1 )
1188    {
1189        msgs->state = AWAITING_BT_MESSAGE;
1190        return READ_NOW;
1191    }
1192    else return readBtMessage( msgs, inbuf, inlen - 1 );
1193}
1194
1195static void
1196updatePeerProgress( tr_peermsgs * msgs )
1197{
1198    msgs->peer->progress = tr_bitsetPercent( &msgs->peer->have );
1199    dbgmsg( msgs, "peer progress is %f", msgs->peer->progress );
1200    updateFastSet( msgs );
1201    updateInterest( msgs );
1202    firePeerProgress( msgs );
1203}
1204
1205static void
1206prefetchPieces( tr_peermsgs *msgs )
1207{
1208    int i;
1209
1210    /* Maintain 12 prefetched blocks per unchoked peer */
1211    for( i=msgs->prefetchCount; i<msgs->peer->pendingReqsToClient && i<12; ++i )
1212    {
1213        const struct peer_request * req = msgs->peerAskedFor + i;
1214        tr_cachePrefetchBlock( getSession(msgs)->cache, msgs->torrent, req->index, req->offset, req->length );
1215        ++msgs->prefetchCount;
1216    }
1217}
1218
1219static void
1220peerMadeRequest( tr_peermsgs *               msgs,
1221                 const struct peer_request * req )
1222{
1223    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1224    const int reqIsValid = requestIsValid( msgs, req );
1225    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( &msgs->torrent->completion, req->index );
1226    const int peerIsChoked = msgs->peer->peerIsChoked;
1227
1228    int allow = FALSE;
1229
1230    if( !reqIsValid )
1231        dbgmsg( msgs, "rejecting an invalid request." );
1232    else if( !clientHasPiece )
1233        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1234    else if( peerIsChoked )
1235        dbgmsg( msgs, "rejecting request from choked peer" );
1236    else if( msgs->peer->pendingReqsToClient + 1 >= REQQ )
1237        dbgmsg( msgs, "rejecting request ... reqq is full" );
1238    else
1239        allow = TRUE;
1240
1241    if( allow ) {
1242        msgs->peerAskedFor[msgs->peer->pendingReqsToClient++] = *req;
1243        prefetchPieces( msgs );
1244    } else if( fext ) {
1245        protocolSendReject( msgs, req );
1246    }
1247}
1248
1249static tr_bool
1250messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1251{
1252    switch( id )
1253    {
1254        case BT_CHOKE:
1255        case BT_UNCHOKE:
1256        case BT_INTERESTED:
1257        case BT_NOT_INTERESTED:
1258        case BT_FEXT_HAVE_ALL:
1259        case BT_FEXT_HAVE_NONE:
1260            return len == 1;
1261
1262        case BT_HAVE:
1263        case BT_FEXT_SUGGEST:
1264        case BT_FEXT_ALLOWED_FAST:
1265            return len == 5;
1266
1267        case BT_BITFIELD:
1268            return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1269
1270        case BT_REQUEST:
1271        case BT_CANCEL:
1272        case BT_FEXT_REJECT:
1273            return len == 13;
1274
1275        case BT_PIECE:
1276            return len > 9 && len <= 16393;
1277
1278        case BT_PORT:
1279            return len == 3;
1280
1281        case BT_LTEP:
1282            return len >= 2;
1283
1284        default:
1285            return FALSE;
1286    }
1287}
1288
1289static int clientGotBlock( tr_peermsgs *               msgs,
1290                           const uint8_t *             block,
1291                           const struct peer_request * req );
1292
1293static int
1294readBtPiece( tr_peermsgs      * msgs,
1295             struct evbuffer  * inbuf,
1296             size_t             inlen,
1297             size_t           * setme_piece_bytes_read )
1298{
1299    struct peer_request * req = &msgs->incoming.blockReq;
1300
1301    assert( EVBUFFER_LENGTH( inbuf ) >= inlen );
1302    dbgmsg( msgs, "In readBtPiece" );
1303
1304    if( !req->length )
1305    {
1306        if( inlen < 8 )
1307            return READ_LATER;
1308
1309        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index );
1310        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset );
1311        req->length = msgs->incoming.length - 9;
1312        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1313        return READ_NOW;
1314    }
1315    else
1316    {
1317        int err;
1318
1319        /* read in another chunk of data */
1320        const size_t nLeft = req->length - EVBUFFER_LENGTH( msgs->incoming.block );
1321        size_t n = MIN( nLeft, inlen );
1322        size_t i = n;
1323        void * buf = tr_sessionGetBuffer( getSession( msgs ) );
1324        const size_t buflen = SESSION_BUFFER_SIZE;
1325
1326        while( i > 0 )
1327        {
1328            const size_t thisPass = MIN( i, buflen );
1329            tr_peerIoReadBytes( msgs->peer->io, inbuf, buf, thisPass );
1330            evbuffer_add( msgs->incoming.block, buf, thisPass );
1331            i -= thisPass;
1332        }
1333
1334        tr_sessionReleaseBuffer( getSession( msgs ) );
1335        buf = NULL;
1336
1337        fireClientGotData( msgs, n, TRUE );
1338        *setme_piece_bytes_read += n;
1339        dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1340               n, req->index, req->offset, req->length,
1341               (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) );
1342        if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length )
1343            return READ_LATER;
1344
1345        /* we've got the whole block ... process it */
1346        err = clientGotBlock( msgs, EVBUFFER_DATA( msgs->incoming.block ), req );
1347
1348        /* cleanup */
1349        evbuffer_free( msgs->incoming.block );
1350        msgs->incoming.block = evbuffer_new( );
1351        req->length = 0;
1352        msgs->state = AWAITING_BT_LENGTH;
1353        return err ? READ_ERR : READ_NOW;
1354    }
1355}
1356
1357static void updateDesiredRequestCount( tr_peermsgs * msgs, uint64_t now );
1358
1359static int
1360readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1361{
1362    uint32_t      ui32;
1363    uint32_t      msglen = msgs->incoming.length;
1364    const uint8_t id = msgs->incoming.id;
1365#ifndef NDEBUG
1366    const size_t  startBufLen = EVBUFFER_LENGTH( inbuf );
1367#endif
1368    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1369
1370    --msglen; /* id length */
1371
1372    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
1373
1374    if( inlen < msglen )
1375        return READ_LATER;
1376
1377    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1378    {
1379        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1380        fireError( msgs, EMSGSIZE );
1381        return READ_ERR;
1382    }
1383
1384    switch( id )
1385    {
1386        case BT_CHOKE:
1387            dbgmsg( msgs, "got Choke" );
1388            msgs->peer->clientIsChoked = 1;
1389            if( !fext )
1390                fireGotChoke( msgs );
1391            break;
1392
1393        case BT_UNCHOKE:
1394            dbgmsg( msgs, "got Unchoke" );
1395            msgs->peer->clientIsChoked = 0;
1396            updateDesiredRequestCount( msgs, tr_date( ) );
1397            break;
1398
1399        case BT_INTERESTED:
1400            dbgmsg( msgs, "got Interested" );
1401            msgs->peer->peerIsInterested = 1;
1402            break;
1403
1404        case BT_NOT_INTERESTED:
1405            dbgmsg( msgs, "got Not Interested" );
1406            msgs->peer->peerIsInterested = 0;
1407            break;
1408
1409        case BT_HAVE:
1410            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1411            dbgmsg( msgs, "got Have: %u", ui32 );
1412            if( tr_torrentHasMetadata( msgs->torrent )
1413                    && ( ui32 >= msgs->torrent->info.pieceCount ) )
1414            {
1415                fireError( msgs, ERANGE );
1416                return READ_ERR;
1417            }
1418            if( tr_bitsetAdd( &msgs->peer->have, ui32 ) )
1419            {
1420                fireError( msgs, ERANGE );
1421                return READ_ERR;
1422            }
1423            updatePeerProgress( msgs );
1424            break;
1425
1426        case BT_BITFIELD: {
1427            const size_t bitCount = tr_torrentHasMetadata( msgs->torrent )
1428                                  ? msgs->torrent->info.pieceCount
1429                                  : msglen * 8;
1430            dbgmsg( msgs, "got a bitfield" );
1431            tr_bitsetReserve( &msgs->peer->have, bitCount );
1432            tr_peerIoReadBytes( msgs->peer->io, inbuf,
1433                                msgs->peer->have.bitfield.bits, msglen );
1434            updatePeerProgress( msgs );
1435            break;
1436        }
1437
1438        case BT_REQUEST:
1439        {
1440            struct peer_request r;
1441            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1442            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1443            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1444            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1445            peerMadeRequest( msgs, &r );
1446            break;
1447        }
1448
1449        case BT_CANCEL:
1450        {
1451            int i;
1452            struct peer_request r;
1453            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1454            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1455            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1456            tr_historyAdd( msgs->peer->cancelsSentToClient, tr_date( ), 1 );
1457            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1458
1459            for( i=0; i<msgs->peer->pendingReqsToClient; ++i ) {
1460                const struct peer_request * req = msgs->peerAskedFor + i;
1461                if( ( req->index == r.index ) && ( req->offset == r.offset ) && ( req->length == r.length ) )
1462                    break;
1463            }
1464
1465            if( i < msgs->peer->pendingReqsToClient )
1466                tr_removeElementFromArray( msgs->peerAskedFor, i, sizeof( struct peer_request ),
1467                                           msgs->peer->pendingReqsToClient-- );
1468            break;
1469        }
1470
1471        case BT_PIECE:
1472            assert( 0 ); /* handled elsewhere! */
1473            break;
1474
1475        case BT_PORT:
1476            dbgmsg( msgs, "Got a BT_PORT" );
1477            tr_peerIoReadUint16( msgs->peer->io, inbuf, &msgs->peer->dht_port );
1478            if( msgs->peer->dht_port > 0 )
1479                tr_dhtAddNode( getSession(msgs),
1480                               tr_peerAddress( msgs->peer ),
1481                               msgs->peer->dht_port, 0 );
1482            break;
1483
1484        case BT_FEXT_SUGGEST:
1485            dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1486            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1487            if( fext )
1488                fireClientGotSuggest( msgs, ui32 );
1489            else {
1490                fireError( msgs, EMSGSIZE );
1491                return READ_ERR;
1492            }
1493            break;
1494
1495        case BT_FEXT_ALLOWED_FAST:
1496            dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1497            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1498            if( fext )
1499                fireClientGotAllowedFast( msgs, ui32 );
1500            else {
1501                fireError( msgs, EMSGSIZE );
1502                return READ_ERR;
1503            }
1504            break;
1505
1506        case BT_FEXT_HAVE_ALL:
1507            dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1508            if( fext ) {
1509                tr_bitsetSetHaveAll( &msgs->peer->have );
1510                updatePeerProgress( msgs );
1511            } else {
1512                fireError( msgs, EMSGSIZE );
1513                return READ_ERR;
1514            }
1515            break;
1516
1517        case BT_FEXT_HAVE_NONE:
1518            dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1519            if( fext ) {
1520                tr_bitsetSetHaveNone( &msgs->peer->have );
1521                updatePeerProgress( msgs );
1522            } else {
1523                fireError( msgs, EMSGSIZE );
1524                return READ_ERR;
1525            }
1526            break;
1527
1528        case BT_FEXT_REJECT:
1529        {
1530            struct peer_request r;
1531            dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1532            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1533            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1534            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1535            if( fext )
1536                fireGotRej( msgs, &r );
1537            else {
1538                fireError( msgs, EMSGSIZE );
1539                return READ_ERR;
1540            }
1541            break;
1542        }
1543
1544        case BT_LTEP:
1545            dbgmsg( msgs, "Got a BT_LTEP" );
1546            parseLtep( msgs, msglen, inbuf );
1547            break;
1548
1549        default:
1550            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1551            tr_peerIoDrain( msgs->peer->io, inbuf, msglen );
1552            break;
1553    }
1554
1555    assert( msglen + 1 == msgs->incoming.length );
1556    assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen );
1557
1558    msgs->state = AWAITING_BT_LENGTH;
1559    return READ_NOW;
1560}
1561
1562static void
1563addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1564{
1565    if( !msgs->peer->blame )
1566         msgs->peer->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1567    tr_bitfieldAdd( msgs->peer->blame, index );
1568}
1569
1570/* returns 0 on success, or an errno on failure */
1571static int
1572clientGotBlock( tr_peermsgs *               msgs,
1573                const uint8_t *             data,
1574                const struct peer_request * req )
1575{
1576    int err;
1577    tr_torrent * tor = msgs->torrent;
1578    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1579
1580    assert( msgs );
1581    assert( req );
1582
1583    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) {
1584        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1585                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1586        return EMSGSIZE;
1587    }
1588
1589    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1590
1591    if( !tr_peerMgrDidPeerRequest( msgs->torrent, msgs->peer, block ) ) {
1592        dbgmsg( msgs, "we didn't ask for this message..." );
1593        return 0;
1594    }
1595    if( tr_cpPieceIsComplete( &msgs->torrent->completion, req->index ) ) {
1596        dbgmsg( msgs, "we did ask for this message, but the piece is already complete..." );
1597        return 0;
1598    }
1599
1600    /**
1601    ***  Save the block
1602    **/
1603
1604    if(( err = tr_cacheWriteBlock( getSession(msgs)->cache, tor, req->index, req->offset, req->length, data )))
1605        return err;
1606
1607    addPeerToBlamefield( msgs, req->index );
1608    fireGotBlock( msgs, req );
1609    return 0;
1610}
1611
1612static int peerPulse( void * vmsgs );
1613
1614static void
1615didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1616{
1617    tr_peermsgs * msgs = vmsgs;
1618    firePeerGotData( msgs, bytesWritten, wasPieceData );
1619
1620    if ( tr_isPeerIo( io ) && io->userData )
1621        peerPulse( msgs );
1622}
1623
1624static ReadState
1625canRead( tr_peerIo * io, void * vmsgs, size_t * piece )
1626{
1627    ReadState         ret;
1628    tr_peermsgs *     msgs = vmsgs;
1629    struct evbuffer * in = tr_peerIoGetReadBuffer( io );
1630    const size_t      inlen = EVBUFFER_LENGTH( in );
1631
1632    dbgmsg( msgs, "canRead: inlen is %zu, msgs->state is %d", inlen, msgs->state );
1633
1634    if( !inlen )
1635    {
1636        ret = READ_LATER;
1637    }
1638    else if( msgs->state == AWAITING_BT_PIECE )
1639    {
1640        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1641    }
1642    else switch( msgs->state )
1643    {
1644        case AWAITING_BT_LENGTH:
1645            ret = readBtLength ( msgs, in, inlen ); break;
1646
1647        case AWAITING_BT_ID:
1648            ret = readBtId     ( msgs, in, inlen ); break;
1649
1650        case AWAITING_BT_MESSAGE:
1651            ret = readBtMessage( msgs, in, inlen ); break;
1652
1653        default:
1654            ret = READ_ERR;
1655            assert( 0 );
1656    }
1657
1658    dbgmsg( msgs, "canRead: ret is %d", (int)ret );
1659
1660    /* log the raw data that was read */
1661    if( ( ret != READ_ERR ) && ( EVBUFFER_LENGTH( in ) != inlen ) )
1662        fireClientGotData( msgs, inlen - EVBUFFER_LENGTH( in ), FALSE );
1663
1664    return ret;
1665}
1666
1667int
1668tr_peerMsgsIsReadingBlock( const tr_peermsgs * msgs, tr_block_index_t block )
1669{
1670    if( msgs->state != AWAITING_BT_PIECE )
1671        return FALSE;
1672
1673    return block == _tr_block( msgs->torrent,
1674                               msgs->incoming.blockReq.index,
1675                               msgs->incoming.blockReq.offset );
1676}
1677
1678/**
1679***
1680**/
1681
1682static void
1683updateDesiredRequestCount( tr_peermsgs * msgs, uint64_t now )
1684{
1685    const tr_torrent * const torrent = msgs->torrent;
1686
1687    if( tr_torrentIsSeed( msgs->torrent ) )
1688    {
1689        msgs->desiredRequestCount = 0;
1690    }
1691    else if( msgs->peer->clientIsChoked )
1692    {
1693        msgs->desiredRequestCount = 0;
1694    }
1695    else if( !msgs->peer->clientIsInterested )
1696    {
1697        msgs->desiredRequestCount = 0;
1698    }
1699    else
1700    {
1701        int irate;
1702        int estimatedBlocksInPeriod;
1703        double rate;
1704        const int floor = 4;
1705        const int seconds = REQUEST_BUF_SECS;
1706
1707        /* Get the rate limit we should use.
1708         * FIXME: this needs to consider all the other peers as well... */
1709        rate = tr_peerGetPieceSpeed( msgs->peer, now, TR_PEER_TO_CLIENT );
1710        if( tr_torrentUsesSpeedLimit( torrent, TR_PEER_TO_CLIENT ) )
1711            rate = MIN( rate, tr_torrentGetSpeedLimit( torrent, TR_PEER_TO_CLIENT ) );
1712
1713        /* honor the session limits, if enabled */
1714        if( tr_torrentUsesSessionLimits( torrent ) )
1715            if( tr_sessionGetActiveSpeedLimit( torrent->session, TR_PEER_TO_CLIENT, &irate ) )
1716                rate = MIN( rate, irate );
1717
1718        /* use this desired rate to figure out how
1719         * many requests we should send to this peer */
1720        estimatedBlocksInPeriod = ( rate * seconds * 1024 ) / torrent->blockSize;
1721        msgs->desiredRequestCount = MAX( floor, estimatedBlocksInPeriod );
1722
1723        /* honor the peer's maximum request count, if specified */
1724        if( msgs->reqq > 0 )
1725            if( msgs->desiredRequestCount > msgs->reqq )
1726                msgs->desiredRequestCount = msgs->reqq;
1727    }
1728}
1729
1730static void
1731updateMetadataRequests( tr_peermsgs * msgs, time_t now )
1732{
1733    int piece;
1734
1735    if( msgs->peerSupportsMetadataXfer
1736        && tr_torrentGetNextMetadataRequest( msgs->torrent, now, &piece ) )
1737    {
1738        tr_benc tmp;
1739        int payloadLen;
1740        char * payload;
1741        tr_peerIo  * io  = msgs->peer->io;
1742        struct evbuffer * out = msgs->outMessages;
1743
1744        /* build the data message */
1745        tr_bencInitDict( &tmp, 3 );
1746        tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REQUEST );
1747        tr_bencDictAddInt( &tmp, "piece", piece );
1748        payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1749        tr_bencFree( &tmp );
1750
1751        dbgmsg( msgs, "requesting metadata piece #%d", piece );
1752
1753        /* write it out as a LTEP message to our outMessages buffer */
1754        tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + payloadLen );
1755        tr_peerIoWriteUint8 ( io, out, BT_LTEP );
1756        tr_peerIoWriteUint8 ( io, out, msgs->ut_metadata_id );
1757        tr_peerIoWriteBytes ( io, out, payload, payloadLen );
1758        pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1759        dbgOutMessageLen( msgs );
1760
1761        tr_free( payload );
1762    }
1763}
1764
1765static void
1766updateBlockRequests( tr_peermsgs * msgs )
1767{
1768    if( tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT )
1769        && ( msgs->desiredRequestCount > 0 )
1770        && ( msgs->peer->pendingReqsToPeer <= ( msgs->desiredRequestCount * 0.66 ) ) )
1771    {
1772        int i;
1773        int n;
1774        const int numwant = msgs->desiredRequestCount - msgs->peer->pendingReqsToPeer;
1775        tr_block_index_t * blocks = tr_new( tr_block_index_t, numwant );
1776
1777        tr_peerMgrGetNextRequests( msgs->torrent, msgs->peer, numwant, blocks, &n );
1778
1779        for( i=0; i<n; ++i )
1780        {
1781            struct peer_request req;
1782            blockToReq( msgs->torrent, blocks[i], &req );
1783            protocolSendRequest( msgs, &req );
1784        }
1785
1786        tr_free( blocks );
1787    }
1788}
1789
1790static size_t
1791fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1792{
1793    int piece;
1794    size_t bytesWritten = 0;
1795    struct peer_request req;
1796    const tr_bool haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1797    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1798
1799    /**
1800    ***  Protocol messages
1801    **/
1802
1803    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1804    {
1805        dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
1806        msgs->outMessagesBatchedAt = now;
1807    }
1808    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1809    {
1810        const size_t len = EVBUFFER_LENGTH( msgs->outMessages );
1811        /* flush the protocol messages */
1812        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len );
1813        tr_peerIoWriteBuf( msgs->peer->io, msgs->outMessages, FALSE );
1814        msgs->clientSentAnythingAt = now;
1815        msgs->outMessagesBatchedAt = 0;
1816        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1817        bytesWritten +=  len;
1818    }
1819
1820    /**
1821    ***  Metadata Pieces
1822    **/
1823
1824    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= METADATA_PIECE_SIZE )
1825        && popNextMetadataRequest( msgs, &piece ) )
1826    {
1827        char * data;
1828        int dataLen;
1829        tr_bool ok = FALSE;
1830
1831        data = tr_torrentGetMetadataPiece( msgs->torrent, piece, &dataLen );
1832        if( ( dataLen > 0 ) && ( data != NULL ) )
1833        {
1834            tr_benc tmp;
1835            int payloadLen;
1836            char * payload;
1837            tr_peerIo  * io  = msgs->peer->io;
1838            struct evbuffer * out = msgs->outMessages;
1839
1840            /* build the data message */
1841            tr_bencInitDict( &tmp, 3 );
1842            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_DATA );
1843            tr_bencDictAddInt( &tmp, "piece", piece );
1844            tr_bencDictAddInt( &tmp, "total_size", msgs->torrent->infoDictLength );
1845            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1846            tr_bencFree( &tmp );
1847
1848            /* write it out as a LTEP message to our outMessages buffer */
1849            tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + payloadLen + dataLen );
1850            tr_peerIoWriteUint8 ( io, out, BT_LTEP );
1851            tr_peerIoWriteUint8 ( io, out, msgs->ut_metadata_id );
1852            tr_peerIoWriteBytes ( io, out, payload, payloadLen );
1853            tr_peerIoWriteBytes ( io, out, data, dataLen );
1854            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1855            dbgOutMessageLen( msgs );
1856
1857            tr_free( payload );
1858            tr_free( data );
1859
1860            ok = TRUE;
1861        }
1862
1863        if( !ok ) /* send a rejection message */
1864        {
1865            tr_benc tmp;
1866            int payloadLen;
1867            char * payload;
1868            tr_peerIo  * io  = msgs->peer->io;
1869            struct evbuffer * out = msgs->outMessages;
1870
1871            /* build the rejection message */
1872            tr_bencInitDict( &tmp, 2 );
1873            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
1874            tr_bencDictAddInt( &tmp, "piece", piece );
1875            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1876            tr_bencFree( &tmp );
1877
1878            /* write it out as a LTEP message to our outMessages buffer */
1879            tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + payloadLen );
1880            tr_peerIoWriteUint8 ( io, out, BT_LTEP );
1881            tr_peerIoWriteUint8 ( io, out, msgs->ut_metadata_id );
1882            tr_peerIoWriteBytes ( io, out, payload, payloadLen );
1883            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1884            dbgOutMessageLen( msgs );
1885
1886            tr_free( payload );
1887        }
1888    }
1889
1890    /**
1891    ***  Data Blocks
1892    **/
1893
1894    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= msgs->torrent->blockSize )
1895        && popNextRequest( msgs, &req ) )
1896    {
1897        --msgs->prefetchCount;
1898
1899        if( requestIsValid( msgs, &req )
1900            && tr_cpPieceIsComplete( &msgs->torrent->completion, req.index ) )
1901        {
1902            /* FIXME(libevent2) use evbuffer_reserve_space() + evbuffer_commit_space() */
1903            int err;
1904            const uint32_t msglen = 4 + 1 + 4 + 4 + req.length;
1905            struct evbuffer * out;
1906            tr_peerIo * io = msgs->peer->io;
1907
1908            out = evbuffer_new( );
1909            evbuffer_expand( out, msglen );
1910
1911            tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1912            tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1913            tr_peerIoWriteUint32( io, out, req.index );
1914            tr_peerIoWriteUint32( io, out, req.offset );
1915
1916            err = tr_cacheReadBlock( getSession(msgs)->cache, msgs->torrent, req.index, req.offset, req.length, EVBUFFER_DATA(out)+EVBUFFER_LENGTH(out) );
1917            if( err )
1918            {
1919                if( fext )
1920                    protocolSendReject( msgs, &req );
1921            }
1922            else
1923            {
1924                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1925                EVBUFFER_LENGTH(out) += req.length;
1926                assert( EVBUFFER_LENGTH( out ) == msglen );
1927                tr_peerIoWriteBuf( io, out, TRUE );
1928                bytesWritten += EVBUFFER_LENGTH( out );
1929                msgs->clientSentAnythingAt = now;
1930                tr_historyAdd( msgs->peer->blocksSentToPeer, tr_date( ), 1 );
1931            }
1932
1933            evbuffer_free( out );
1934
1935            if( err )
1936            {
1937                bytesWritten = 0;
1938                msgs = NULL;
1939            }
1940        }
1941        else if( fext ) /* peer needs a reject message */
1942        {
1943            protocolSendReject( msgs, &req );
1944        }
1945
1946        if( msgs != NULL )
1947            prefetchPieces( msgs );
1948    }
1949
1950    /**
1951    ***  Keepalive
1952    **/
1953
1954    if( ( msgs != NULL )
1955        && ( msgs->clientSentAnythingAt != 0 )
1956        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1957    {
1958        dbgmsg( msgs, "sending a keepalive message" );
1959        tr_peerIoWriteUint32( msgs->peer->io, msgs->outMessages, 0 );
1960        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1961    }
1962
1963    return bytesWritten;
1964}
1965
1966static int
1967peerPulse( void * vmsgs )
1968{
1969    tr_peermsgs * msgs = vmsgs;
1970    const time_t  now = tr_time( );
1971
1972    if ( tr_isPeerIo( msgs->peer->io ) ) {
1973        updateDesiredRequestCount( msgs, tr_date( ) );
1974        updateBlockRequests( msgs );
1975        updateMetadataRequests( msgs, now );
1976    }
1977
1978    for( ;; )
1979        if( fillOutputBuffer( msgs, now ) < 1 )
1980            break;
1981
1982    return TRUE; /* loop forever */
1983}
1984
1985void
1986tr_peerMsgsPulse( tr_peermsgs * msgs )
1987{
1988    if( msgs != NULL )
1989        peerPulse( msgs );
1990}
1991
1992static void
1993gotError( tr_peerIo  * io UNUSED,
1994          short        what,
1995          void       * vmsgs )
1996{
1997    if( what & EVBUFFER_TIMEOUT )
1998        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1999    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
2000        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
2001               what, errno, tr_strerror( errno ) );
2002    fireError( vmsgs, ENOTCONN );
2003}
2004
2005static void
2006sendBitfield( tr_peermsgs * msgs )
2007{
2008    struct evbuffer * out = msgs->outMessages;
2009    tr_bitfield *     field;
2010    tr_piece_index_t  lazyPieces[LAZY_PIECE_COUNT];
2011    size_t            i;
2012    size_t            lazyCount = 0;
2013
2014    field = tr_bitfieldDup( tr_cpPieceBitfield( &msgs->torrent->completion ) );
2015
2016    if( tr_sessionIsLazyBitfieldEnabled( getSession( msgs ) ) )
2017    {
2018        /** Lazy bitfields aren't a high priority or secure, so I'm opting for
2019            speed over a truly random sample -- let's limit the pool size to
2020            the first 1000 pieces so large torrents don't bog things down */
2021        size_t poolSize;
2022        const size_t maxPoolSize = MIN( msgs->torrent->info.pieceCount, 1000 );
2023        tr_piece_index_t * pool = tr_new( tr_piece_index_t, maxPoolSize );
2024
2025        /* build the pool */
2026        for( i=poolSize=0; i<maxPoolSize; ++i )
2027            if( tr_bitfieldHas( field, i ) )
2028                pool[poolSize++] = i;
2029
2030        /* pull random piece indices from the pool */
2031        while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) )
2032        {
2033            const int pos = tr_cryptoWeakRandInt( poolSize );
2034            const tr_piece_index_t piece = pool[pos];
2035            tr_bitfieldRem( field, piece );
2036            lazyPieces[lazyCount++] = piece;
2037            pool[pos] = pool[--poolSize];
2038        }
2039
2040        /* cleanup */
2041        tr_free( pool );
2042    }
2043
2044    tr_peerIoWriteUint32( msgs->peer->io, out,
2045                          sizeof( uint8_t ) + field->byteCount );
2046    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_BITFIELD );
2047    /* FIXME(libevent2): use evbuffer_add_reference() */
2048    tr_peerIoWriteBytes ( msgs->peer->io, out, field->bits, field->byteCount );
2049    dbgmsg( msgs, "sending bitfield... outMessage size is now %zu",
2050            EVBUFFER_LENGTH( out ) );
2051    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
2052
2053    for( i = 0; i < lazyCount; ++i )
2054        protocolSendHave( msgs, lazyPieces[i] );
2055
2056    tr_bitfieldFree( field );
2057}
2058
2059static void
2060tellPeerWhatWeHave( tr_peermsgs * msgs )
2061{
2062    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
2063
2064    if( fext && ( tr_cpGetStatus( &msgs->torrent->completion ) == TR_SEED ) )
2065    {
2066        protocolSendHaveAll( msgs );
2067    }
2068    else if( fext && ( tr_cpHaveValid( &msgs->torrent->completion ) == 0 ) )
2069    {
2070        protocolSendHaveNone( msgs );
2071    }
2072    else
2073    {
2074        sendBitfield( msgs );
2075    }
2076}
2077
2078/**
2079***
2080**/
2081
2082/* some peers give us error messages if we send
2083   more than this many peers in a single pex message
2084   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
2085#define MAX_PEX_ADDED 50
2086#define MAX_PEX_DROPPED 50
2087
2088typedef struct
2089{
2090    tr_pex *  added;
2091    tr_pex *  dropped;
2092    tr_pex *  elements;
2093    int       addedCount;
2094    int       droppedCount;
2095    int       elementCount;
2096}
2097PexDiffs;
2098
2099static void
2100pexAddedCb( void * vpex,
2101            void * userData )
2102{
2103    PexDiffs * diffs = userData;
2104    tr_pex *   pex = vpex;
2105
2106    if( diffs->addedCount < MAX_PEX_ADDED )
2107    {
2108        diffs->added[diffs->addedCount++] = *pex;
2109        diffs->elements[diffs->elementCount++] = *pex;
2110    }
2111}
2112
2113static inline void
2114pexDroppedCb( void * vpex,
2115              void * userData )
2116{
2117    PexDiffs * diffs = userData;
2118    tr_pex *   pex = vpex;
2119
2120    if( diffs->droppedCount < MAX_PEX_DROPPED )
2121    {
2122        diffs->dropped[diffs->droppedCount++] = *pex;
2123    }
2124}
2125
2126static inline void
2127pexElementCb( void * vpex,
2128              void * userData )
2129{
2130    PexDiffs * diffs = userData;
2131    tr_pex * pex = vpex;
2132
2133    diffs->elements[diffs->elementCount++] = *pex;
2134}
2135
2136static void
2137sendPex( tr_peermsgs * msgs )
2138{
2139    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
2140    {
2141        PexDiffs diffs;
2142        PexDiffs diffs6;
2143        tr_pex * newPex = NULL;
2144        tr_pex * newPex6 = NULL;
2145        const int newCount = tr_peerMgrGetPeers( msgs->torrent, &newPex, TR_AF_INET, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
2146        const int newCount6 = tr_peerMgrGetPeers( msgs->torrent, &newPex6, TR_AF_INET6, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
2147
2148        /* build the diffs */
2149        diffs.added = tr_new( tr_pex, newCount );
2150        diffs.addedCount = 0;
2151        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
2152        diffs.droppedCount = 0;
2153        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
2154        diffs.elementCount = 0;
2155        tr_set_compare( msgs->pex, msgs->pexCount,
2156                        newPex, newCount,
2157                        tr_pexCompare, sizeof( tr_pex ),
2158                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
2159        diffs6.added = tr_new( tr_pex, newCount6 );
2160        diffs6.addedCount = 0;
2161        diffs6.dropped = tr_new( tr_pex, msgs->pexCount6 );
2162        diffs6.droppedCount = 0;
2163        diffs6.elements = tr_new( tr_pex, newCount6 + msgs->pexCount6 );
2164        diffs6.elementCount = 0;
2165        tr_set_compare( msgs->pex6, msgs->pexCount6,
2166                        newPex6, newCount6,
2167                        tr_pexCompare, sizeof( tr_pex ),
2168                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6 );
2169        dbgmsg(
2170            msgs,
2171            "pex: old peer count %d+%d, new peer count %d+%d, "
2172            "added %d+%d, removed %d+%d",
2173            msgs->pexCount, msgs->pexCount6, newCount, newCount6,
2174            diffs.addedCount, diffs6.addedCount,
2175            diffs.droppedCount, diffs6.droppedCount );
2176
2177        if( !diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
2178            !diffs6.droppedCount )
2179        {
2180            tr_free( diffs.elements );
2181            tr_free( diffs6.elements );
2182        }
2183        else
2184        {
2185            int  i;
2186            tr_benc val;
2187            char * benc;
2188            int bencLen;
2189            uint8_t * tmp, *walk;
2190            tr_peerIo       * io  = msgs->peer->io;
2191            struct evbuffer * out = msgs->outMessages;
2192
2193            /* update peer */
2194            tr_free( msgs->pex );
2195            msgs->pex = diffs.elements;
2196            msgs->pexCount = diffs.elementCount;
2197            tr_free( msgs->pex6 );
2198            msgs->pex6 = diffs6.elements;
2199            msgs->pexCount6 = diffs6.elementCount;
2200
2201            /* build the pex payload */
2202            tr_bencInitDict( &val, 3 ); /* ipv6 support: left as 3:
2203                                         * speed vs. likelihood? */
2204
2205            if( diffs.addedCount > 0)
2206            {
2207                /* "added" */
2208                tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
2209                for( i = 0; i < diffs.addedCount; ++i ) {
2210                    memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
2211                    memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
2212                }
2213                assert( ( walk - tmp ) == diffs.addedCount * 6 );
2214                tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
2215                tr_free( tmp );
2216
2217                /* "added.f" */
2218                tmp = walk = tr_new( uint8_t, diffs.addedCount );
2219                for( i = 0; i < diffs.addedCount; ++i )
2220                    *walk++ = diffs.added[i].flags;
2221                assert( ( walk - tmp ) == diffs.addedCount );
2222                tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2223                tr_free( tmp );
2224            }
2225
2226            if( diffs.droppedCount > 0 )
2227            {
2228                /* "dropped" */
2229                tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2230                for( i = 0; i < diffs.droppedCount; ++i ) {
2231                    memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
2232                    memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2233                }
2234                assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2235                tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2236                tr_free( tmp );
2237            }
2238
2239            if( diffs6.addedCount > 0 )
2240            {
2241                /* "added6" */
2242                tmp = walk = tr_new( uint8_t, diffs6.addedCount * 18 );
2243                for( i = 0; i < diffs6.addedCount; ++i ) {
2244                    memcpy( walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16 );
2245                    walk += 16;
2246                    memcpy( walk, &diffs6.added[i].port, 2 );
2247                    walk += 2;
2248                }
2249                assert( ( walk - tmp ) == diffs6.addedCount * 18 );
2250                tr_bencDictAddRaw( &val, "added6", tmp, walk - tmp );
2251                tr_free( tmp );
2252
2253                /* "added6.f" */
2254                tmp = walk = tr_new( uint8_t, diffs6.addedCount );
2255                for( i = 0; i < diffs6.addedCount; ++i )
2256                    *walk++ = diffs6.added[i].flags;
2257                assert( ( walk - tmp ) == diffs6.addedCount );
2258                tr_bencDictAddRaw( &val, "added6.f", tmp, walk - tmp );
2259                tr_free( tmp );
2260            }
2261
2262            if( diffs6.droppedCount > 0 )
2263            {
2264                /* "dropped6" */
2265                tmp = walk = tr_new( uint8_t, diffs6.droppedCount * 18 );
2266                for( i = 0; i < diffs6.droppedCount; ++i ) {
2267                    memcpy( walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16 );
2268                    walk += 16;
2269                    memcpy( walk, &diffs6.dropped[i].port, 2 );
2270                    walk += 2;
2271                }
2272                assert( ( walk - tmp ) == diffs6.droppedCount * 18);
2273                tr_bencDictAddRaw( &val, "dropped6", tmp, walk - tmp );
2274                tr_free( tmp );
2275            }
2276
2277            /* write the pex message */
2278            benc = tr_bencToStr( &val, TR_FMT_BENC, &bencLen );
2279            tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + bencLen );
2280            tr_peerIoWriteUint8 ( io, out, BT_LTEP );
2281            tr_peerIoWriteUint8 ( io, out, msgs->ut_pex_id );
2282            tr_peerIoWriteBytes ( io, out, benc, bencLen );
2283            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2284            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", EVBUFFER_LENGTH( out ) );
2285            dbgOutMessageLen( msgs );
2286
2287            tr_free( benc );
2288            tr_bencFree( &val );
2289        }
2290
2291        /* cleanup */
2292        tr_free( diffs.added );
2293        tr_free( diffs.dropped );
2294        tr_free( newPex );
2295        tr_free( diffs6.added );
2296        tr_free( diffs6.dropped );
2297        tr_free( newPex6 );
2298
2299        /*msgs->clientSentPexAt = tr_time( );*/
2300    }
2301}
2302
2303static void
2304pexPulse( int foo UNUSED, short bar UNUSED, void * vmsgs )
2305{
2306    struct tr_peermsgs * msgs = vmsgs;
2307
2308    sendPex( msgs );
2309
2310    tr_timerAdd( &msgs->pexTimer, PEX_INTERVAL_SECS, 0 );
2311}
2312
2313/**
2314***
2315**/
2316
2317tr_peermsgs*
2318tr_peerMsgsNew( struct tr_torrent * torrent,
2319                struct tr_peer    * peer,
2320                tr_delivery_func    func,
2321                void              * userData,
2322                tr_publisher_tag  * setme )
2323{
2324    tr_peermsgs * m;
2325
2326    assert( peer );
2327    assert( peer->io );
2328
2329    m = tr_new0( tr_peermsgs, 1 );
2330    m->publisher = TR_PUBLISHER_INIT;
2331    m->peer = peer;
2332    m->torrent = torrent;
2333    m->peer->clientIsChoked = 1;
2334    m->peer->peerIsChoked = 1;
2335    m->peer->clientIsInterested = 0;
2336    m->peer->peerIsInterested = 0;
2337    m->state = AWAITING_BT_LENGTH;
2338    m->outMessages = evbuffer_new( );
2339    m->outMessagesBatchedAt = 0;
2340    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2341    m->incoming.block = evbuffer_new( );
2342    evtimer_set( &m->pexTimer, pexPulse, m );
2343    tr_timerAdd( &m->pexTimer, PEX_INTERVAL_SECS, 0 );
2344    peer->msgs = m;
2345
2346    *setme = tr_publisherSubscribe( &m->publisher, func, userData );
2347
2348    if( tr_peerIoSupportsLTEP( peer->io ) )
2349        sendLtepHandshake( m );
2350
2351    tellPeerWhatWeHave( m );
2352
2353    if( tr_dhtEnabled( torrent->session ) && tr_peerIoSupportsDHT( peer->io ))
2354    {
2355        /* Only send PORT over IPv6 when the IPv6 DHT is running (BEP-32). */
2356        const struct tr_address *addr = tr_peerIoGetAddress( peer->io, NULL );
2357        if( addr->type == TR_AF_INET || tr_globalIPv6() ) {
2358            protocolSendPort( m, tr_dhtPort( torrent->session ) );
2359        }
2360    }
2361
2362    tr_peerIoSetIOFuncs( m->peer->io, canRead, didWrite, gotError, m );
2363    updateDesiredRequestCount( m, tr_date( ) );
2364
2365    return m;
2366}
2367
2368void
2369tr_peerMsgsFree( tr_peermsgs* msgs )
2370{
2371    if( msgs )
2372    {
2373        evtimer_del( &msgs->pexTimer );
2374        tr_publisherDestruct( &msgs->publisher );
2375
2376        evbuffer_free( msgs->incoming.block );
2377        evbuffer_free( msgs->outMessages );
2378        tr_free( msgs->pex6 );
2379        tr_free( msgs->pex );
2380
2381        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2382        tr_free( msgs );
2383    }
2384}
2385
2386void
2387tr_peerMsgsUnsubscribe( tr_peermsgs *    peer,
2388                        tr_publisher_tag tag )
2389{
2390    tr_publisherUnsubscribe( &peer->publisher, tag );
2391}
Note: See TracBrowser for help on using the repository browser.