source: trunk/libtransmission/peer-msgs.c @ 10302

Last change on this file since 10302 was 10302, checked in by charles, 12 years ago

(trunk libT) "don't cancel requests for blocks that we're downloading from slow peers" -- fixed in trunk for 1.92

  • Property svn:keywords set to Date Rev Author Id
File size: 71.3 KB
Line 
1/*
2 * This file Copyright (C) 2007-2010 Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 10302 2010-03-06 14:56:15Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdarg.h>
17#include <stdio.h>
18#include <stdlib.h>
19#include <string.h>
20
21#include <event.h>
22
23#include "transmission.h"
24#include "bencode.h"
25#include "completion.h"
26#include "crypto.h"
27#include "inout.h"
28#ifdef WIN32
29#include "net.h" /* for ECONN */
30#endif
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-msgs.h"
34#include "session.h"
35#include "stats.h"
36#include "torrent.h"
37#include "torrent-magnet.h"
38#include "tr-dht.h"
39#include "utils.h"
40#include "version.h"
41
42/**
43***
44**/
45
46enum
47{
48    BT_CHOKE                = 0,
49    BT_UNCHOKE              = 1,
50    BT_INTERESTED           = 2,
51    BT_NOT_INTERESTED       = 3,
52    BT_HAVE                 = 4,
53    BT_BITFIELD             = 5,
54    BT_REQUEST              = 6,
55    BT_PIECE                = 7,
56    BT_CANCEL               = 8,
57    BT_PORT                 = 9,
58
59    BT_FEXT_SUGGEST         = 13,
60    BT_FEXT_HAVE_ALL        = 14,
61    BT_FEXT_HAVE_NONE       = 15,
62    BT_FEXT_REJECT          = 16,
63    BT_FEXT_ALLOWED_FAST    = 17,
64
65    BT_LTEP                 = 20,
66
67    LTEP_HANDSHAKE          = 0,
68
69    UT_PEX_ID               = 1,
70    UT_METADATA_ID          = 3,
71
72    MAX_PEX_PEER_COUNT      = 50,
73
74    MIN_CHOKE_PERIOD_SEC    = 10,
75
76    /* idle seconds before we send a keepalive */
77    KEEPALIVE_INTERVAL_SECS = 100,
78
79    PEX_INTERVAL_SECS       = 90, /* sec between sendPex() calls */
80
81    REQQ                    = 512,
82
83    METADATA_REQQ           = 64,
84
85    MAX_BLOCK_SIZE          = ( 1024 * 16 ),
86
87    /* used in lowering the outMessages queue period */
88    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
89    HIGH_PRIORITY_INTERVAL_SECS = 2,
90    LOW_PRIORITY_INTERVAL_SECS = 10,
91
92    /* number of pieces to remove from the bitfield when
93     * lazy bitfields are turned on */
94    LAZY_PIECE_COUNT = 26,
95
96    /* number of pieces we'll allow in our fast set */
97    MAX_FAST_SET_SIZE = 3,
98
99    /* defined in BEP #9 */
100    METADATA_MSG_TYPE_REQUEST = 0,
101    METADATA_MSG_TYPE_DATA = 1,
102    METADATA_MSG_TYPE_REJECT = 2
103};
104
105enum
106{
107    AWAITING_BT_LENGTH,
108    AWAITING_BT_ID,
109    AWAITING_BT_MESSAGE,
110    AWAITING_BT_PIECE
111};
112
113/**
114***
115**/
116
117struct peer_request
118{
119    uint32_t    index;
120    uint32_t    offset;
121    uint32_t    length;
122};
123
124static uint32_t
125getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
126{
127    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
128    const uint64_t blockPos = tor->blockSize * b;
129    assert( blockPos >= piecePos );
130    return (uint32_t)( blockPos - piecePos );
131}
132
133static void
134blockToReq( const tr_torrent     * tor,
135            tr_block_index_t       block,
136            struct peer_request  * setme )
137{
138    assert( setme != NULL );
139
140    setme->index = tr_torBlockPiece( tor, block );
141    setme->offset = getBlockOffsetInPiece( tor, block );
142    setme->length = tr_torBlockCountBytes( tor, block );
143}
144
145/**
146***
147**/
148
149/* this is raw, unchanged data from the peer regarding
150 * the current message that it's sending us. */
151struct tr_incoming
152{
153    uint8_t                id;
154    uint32_t               length; /* includes the +1 for id length */
155    struct peer_request    blockReq; /* metadata for incoming blocks */
156    struct evbuffer *      block; /* piece data for incoming blocks */
157};
158
159/**
160 * Low-level communication state information about a connected peer.
161 *
162 * This structure remembers the low-level protocol states that we're
163 * in with this peer, such as active requests, pex messages, and so on.
164 * Its fields are all private to peer-msgs.c.
165 *
166 * Data not directly involved with sending & receiving messages is
167 * stored in tr_peer, where it can be accessed by both peermsgs and
168 * the peer manager.
169 *
170 * @see struct peer_atom
171 * @see tr_peer
172 */
173struct tr_peermsgs
174{
175    tr_bool         peerSupportsPex;
176    tr_bool         peerSupportsMetadataXfer;
177    tr_bool         clientSentLtepHandshake;
178    tr_bool         peerSentLtepHandshake;
179
180    /*tr_bool         haveFastSet;*/
181
182    int             desiredRequestCount;
183
184    int             prefetchCount;
185
186    /* how long the outMessages batch should be allowed to grow before
187     * it's flushed -- some messages (like requests >:) should be sent
188     * very quickly; others aren't as urgent. */
189    int8_t          outMessagesBatchPeriod;
190
191    uint8_t         state;
192    uint8_t         ut_pex_id;
193    uint8_t         ut_metadata_id;
194    uint16_t        pexCount;
195    uint16_t        pexCount6;
196
197#if 0
198    size_t                 fastsetSize;
199    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
200#endif
201
202    tr_peer *              peer;
203
204    tr_torrent *           torrent;
205
206    tr_publisher           publisher;
207
208    struct evbuffer *      outMessages; /* all the non-piece messages */
209
210    struct peer_request    peerAskedFor[REQQ];
211
212    int                    peerAskedForMetadata[METADATA_REQQ];
213    int                    peerAskedForMetadataCount;
214
215    tr_pex               * pex;
216    tr_pex               * pex6;
217
218    /*time_t                 clientSentPexAt;*/
219    time_t                 clientSentAnythingAt;
220
221    /* when we started batching the outMessages */
222    time_t                outMessagesBatchedAt;
223
224    struct tr_incoming    incoming;
225
226    /* if the peer supports the Extension Protocol in BEP 10 and
227       supplied a reqq argument, it's stored here.  otherwise the
228       value is zero and should be ignored. */
229    int64_t               reqq;
230
231    struct event          pexTimer;
232};
233
234/**
235***
236**/
237
238#if 0
239static tr_bitfield*
240getHave( const struct tr_peermsgs * msgs )
241{
242    if( msgs->peer->have == NULL )
243        msgs->peer->have = tr_bitfieldNew( msgs->torrent->info.pieceCount );
244    return msgs->peer->have;
245}
246#endif
247
248static inline tr_session*
249getSession( struct tr_peermsgs * msgs )
250{
251    return msgs->torrent->session;
252}
253
254/**
255***
256**/
257
258static void
259myDebug( const char * file, int line,
260         const struct tr_peermsgs * msgs,
261         const char * fmt, ... )
262{
263    FILE * fp = tr_getLog( );
264
265    if( fp )
266    {
267        va_list           args;
268        char              timestr[64];
269        struct evbuffer * buf = evbuffer_new( );
270        char *            base = tr_basename( file );
271
272        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
273                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
274                             tr_torrentName( msgs->torrent ),
275                             tr_peerIoGetAddrStr( msgs->peer->io ),
276                             msgs->peer->client );
277        va_start( args, fmt );
278        evbuffer_add_vprintf( buf, fmt, args );
279        va_end( args );
280        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
281        /* FIXME(libevent2) tr_getLog() should return an fd, then use evbuffer_write() here */
282        fwrite( EVBUFFER_DATA( buf ), 1, EVBUFFER_LENGTH( buf ), fp );
283
284        tr_free( base );
285        evbuffer_free( buf );
286    }
287}
288
289#define dbgmsg( msgs, ... ) \
290    do { \
291        if( tr_deepLoggingIsActive( ) ) \
292            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
293    } while( 0 )
294
295/**
296***
297**/
298
299static void
300pokeBatchPeriod( tr_peermsgs * msgs,
301                 int           interval )
302{
303    if( msgs->outMessagesBatchPeriod > interval )
304    {
305        msgs->outMessagesBatchPeriod = interval;
306        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
307    }
308}
309
310static inline void
311dbgOutMessageLen( tr_peermsgs * msgs )
312{
313    dbgmsg( msgs, "outMessage size is now %zu", EVBUFFER_LENGTH( msgs->outMessages ) );
314}
315
316static void
317protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
318{
319    tr_peerIo       * io  = msgs->peer->io;
320    struct evbuffer * out = msgs->outMessages;
321
322    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
323
324    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
325    tr_peerIoWriteUint8 ( io, out, BT_FEXT_REJECT );
326    tr_peerIoWriteUint32( io, out, req->index );
327    tr_peerIoWriteUint32( io, out, req->offset );
328    tr_peerIoWriteUint32( io, out, req->length );
329
330    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
331    dbgOutMessageLen( msgs );
332}
333
334static void
335protocolSendRequest( tr_peermsgs               * msgs,
336                     const struct peer_request * req )
337{
338    tr_peerIo       * io  = msgs->peer->io;
339    struct evbuffer * out = msgs->outMessages;
340
341    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
342    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
343    tr_peerIoWriteUint32( io, out, req->index );
344    tr_peerIoWriteUint32( io, out, req->offset );
345    tr_peerIoWriteUint32( io, out, req->length );
346
347    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
348    dbgOutMessageLen( msgs );
349    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
350}
351
352static void
353protocolSendCancel( tr_peermsgs               * msgs,
354                    const struct peer_request * req )
355{
356    tr_peerIo       * io  = msgs->peer->io;
357    struct evbuffer * out = msgs->outMessages;
358
359    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
360    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
361    tr_peerIoWriteUint32( io, out, req->index );
362    tr_peerIoWriteUint32( io, out, req->offset );
363    tr_peerIoWriteUint32( io, out, req->length );
364
365    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
366    dbgOutMessageLen( msgs );
367    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
368}
369
370static void
371protocolSendPort(tr_peermsgs *msgs, uint16_t port)
372{
373    tr_peerIo       * io  = msgs->peer->io;
374    struct evbuffer * out = msgs->outMessages;
375
376    dbgmsg( msgs, "sending Port %u", port);
377    tr_peerIoWriteUint32( io, out, 3 );
378    tr_peerIoWriteUint8 ( io, out, BT_PORT );
379    tr_peerIoWriteUint16( io, out, port);
380}
381
382static void
383protocolSendHave( tr_peermsgs * msgs,
384                  uint32_t      index )
385{
386    tr_peerIo       * io  = msgs->peer->io;
387    struct evbuffer * out = msgs->outMessages;
388
389    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
390    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
391    tr_peerIoWriteUint32( io, out, index );
392
393    dbgmsg( msgs, "sending Have %u", index );
394    dbgOutMessageLen( msgs );
395    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
396}
397
398#if 0
399static void
400protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
401{
402    tr_peerIo       * io  = msgs->peer->io;
403    struct evbuffer * out = msgs->outMessages;
404
405    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
406
407    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
408    tr_peerIoWriteUint8 ( io, out, BT_FEXT_ALLOWED_FAST );
409    tr_peerIoWriteUint32( io, out, pieceIndex );
410
411    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
412    dbgOutMessageLen( msgs );
413}
414#endif
415
416static void
417protocolSendChoke( tr_peermsgs * msgs,
418                   int           choke )
419{
420    tr_peerIo       * io  = msgs->peer->io;
421    struct evbuffer * out = msgs->outMessages;
422
423    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
424    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
425
426    dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
427    dbgOutMessageLen( msgs );
428    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
429}
430
431static void
432protocolSendHaveAll( tr_peermsgs * msgs )
433{
434    tr_peerIo       * io  = msgs->peer->io;
435    struct evbuffer * out = msgs->outMessages;
436
437    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
438
439    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
440    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_ALL );
441
442    dbgmsg( msgs, "sending HAVE_ALL..." );
443    dbgOutMessageLen( msgs );
444    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
445}
446
447static void
448protocolSendHaveNone( tr_peermsgs * msgs )
449{
450    tr_peerIo       * io  = msgs->peer->io;
451    struct evbuffer * out = msgs->outMessages;
452
453    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
454
455    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
456    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_NONE );
457
458    dbgmsg( msgs, "sending HAVE_NONE..." );
459    dbgOutMessageLen( msgs );
460    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
461}
462
463/**
464***  EVENTS
465**/
466
467static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0, 0, 0 };
468
469static void
470publish( tr_peermsgs * msgs, tr_peer_event * e )
471{
472    assert( msgs->peer );
473    assert( msgs->peer->msgs == msgs );
474
475    tr_publisherPublish( &msgs->publisher, msgs->peer, e );
476}
477
478static void
479fireError( tr_peermsgs * msgs, int err )
480{
481    tr_peer_event e = blankEvent;
482    e.eventType = TR_PEER_ERROR;
483    e.err = err;
484    publish( msgs, &e );
485}
486
487static void
488fireUploadOnly( tr_peermsgs * msgs, tr_bool uploadOnly )
489{
490    tr_peer_event e = blankEvent;
491    e.eventType = TR_PEER_UPLOAD_ONLY;
492    e.uploadOnly = uploadOnly;
493    publish( msgs, &e );
494}
495
496static void
497firePeerProgress( tr_peermsgs * msgs )
498{
499    tr_peer_event e = blankEvent;
500    e.eventType = TR_PEER_PEER_PROGRESS;
501    e.progress = msgs->peer->progress;
502    publish( msgs, &e );
503}
504
505static void
506fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
507{
508    tr_peer_event e = blankEvent;
509    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
510    e.pieceIndex = req->index;
511    e.offset = req->offset;
512    e.length = req->length;
513    publish( msgs, &e );
514}
515
516static void
517fireGotRej( tr_peermsgs * msgs, const struct peer_request * req )
518{
519    tr_peer_event e = blankEvent;
520    e.eventType = TR_PEER_CLIENT_GOT_REJ;
521    e.pieceIndex = req->index;
522    e.offset = req->offset;
523    e.length = req->length;
524    publish( msgs, &e );
525}
526
527static void
528fireGotChoke( tr_peermsgs * msgs )
529{
530    tr_peer_event e = blankEvent;
531    e.eventType = TR_PEER_CLIENT_GOT_CHOKE;
532    publish( msgs, &e );
533}
534
535static void
536fireClientGotData( tr_peermsgs * msgs,
537                   uint32_t      length,
538                   int           wasPieceData )
539{
540    tr_peer_event e = blankEvent;
541
542    e.length = length;
543    e.eventType = TR_PEER_CLIENT_GOT_DATA;
544    e.wasPieceData = wasPieceData;
545    publish( msgs, &e );
546}
547
548static void
549fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
550{
551    tr_peer_event e = blankEvent;
552    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
553    e.pieceIndex = pieceIndex;
554    publish( msgs, &e );
555}
556
557static void
558fireClientGotPort( tr_peermsgs * msgs, tr_port port )
559{
560    tr_peer_event e = blankEvent;
561    e.eventType = TR_PEER_CLIENT_GOT_PORT;
562    e.port = port;
563    publish( msgs, &e );
564}
565
566static void
567fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
568{
569    tr_peer_event e = blankEvent;
570    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
571    e.pieceIndex = pieceIndex;
572    publish( msgs, &e );
573}
574
575static void
576firePeerGotData( tr_peermsgs  * msgs,
577                 uint32_t       length,
578                 int            wasPieceData )
579{
580    tr_peer_event e = blankEvent;
581
582    e.length = length;
583    e.eventType = TR_PEER_PEER_GOT_DATA;
584    e.wasPieceData = wasPieceData;
585
586    publish( msgs, &e );
587}
588
589/**
590***  ALLOWED FAST SET
591***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
592**/
593
594size_t
595tr_generateAllowedSet( tr_piece_index_t * setmePieces,
596                       size_t             desiredSetSize,
597                       size_t             pieceCount,
598                       const uint8_t    * infohash,
599                       const tr_address * addr )
600{
601    size_t setSize = 0;
602
603    assert( setmePieces );
604    assert( desiredSetSize <= pieceCount );
605    assert( desiredSetSize );
606    assert( pieceCount );
607    assert( infohash );
608    assert( addr );
609
610    if( addr->type == TR_AF_INET )
611    {
612        uint8_t w[SHA_DIGEST_LENGTH + 4], *walk=w;
613        uint8_t x[SHA_DIGEST_LENGTH];
614
615        uint32_t ui32 = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 );   /* (1) */
616        memcpy( w, &ui32, sizeof( uint32_t ) );
617        walk += sizeof( uint32_t );
618        memcpy( walk, infohash, SHA_DIGEST_LENGTH );                 /* (2) */
619        walk += SHA_DIGEST_LENGTH;
620        tr_sha1( x, w, walk-w, NULL );                               /* (3) */
621        assert( sizeof( w ) == walk-w );
622
623        while( setSize<desiredSetSize )
624        {
625            int i;
626            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
627            {
628                size_t k;
629                uint32_t j = i * 4;                                  /* (5) */
630                uint32_t y = ntohl( *( uint32_t* )( x + j ) );       /* (6) */
631                uint32_t index = y % pieceCount;                     /* (7) */
632
633                for( k=0; k<setSize; ++k )                           /* (8) */
634                    if( setmePieces[k] == index )
635                        break;
636
637                if( k == setSize )
638                    setmePieces[setSize++] = index;                  /* (9) */
639            }
640
641            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
642        }
643    }
644
645    return setSize;
646}
647
648static void
649updateFastSet( tr_peermsgs * msgs UNUSED )
650{
651#if 0
652    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
653    const int peerIsNeedy = msgs->peer->progress < 0.10;
654
655    if( fext && peerIsNeedy && !msgs->haveFastSet )
656    {
657        size_t i;
658        const struct tr_address * addr = tr_peerIoGetAddress( msgs->peer->io, NULL );
659        const tr_info * inf = &msgs->torrent->info;
660        const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
661
662        /* build the fast set */
663        msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
664        msgs->haveFastSet = 1;
665
666        /* send it to the peer */
667        for( i=0; i<msgs->fastsetSize; ++i )
668            protocolSendAllowedFast( msgs, msgs->fastset[i] );
669    }
670#endif
671}
672
673/**
674***  INTEREST
675**/
676
677static tr_bool
678isPieceInteresting( const tr_peermsgs * msgs,
679                    tr_piece_index_t    piece )
680{
681    const tr_torrent * torrent = msgs->torrent;
682
683    return ( !torrent->info.pieces[piece].dnd )                  /* we want it */
684          && ( !tr_cpPieceIsComplete( &torrent->completion, piece ) ) /* !have */
685          && ( tr_bitsetHas( &msgs->peer->have, piece ) );      /* peer has it */
686}
687
688/* "interested" means we'll ask for piece data if they unchoke us */
689static tr_bool
690isPeerInteresting( const tr_peermsgs * msgs )
691{
692    tr_piece_index_t    i;
693    const tr_torrent *  torrent;
694    const tr_bitfield * bitfield;
695    const int           clientIsSeed = tr_torrentIsSeed( msgs->torrent );
696
697    if( clientIsSeed )
698        return FALSE;
699
700    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
701        return FALSE;
702
703    torrent = msgs->torrent;
704    bitfield = tr_cpPieceBitfield( &torrent->completion );
705
706    for( i = 0; i < torrent->info.pieceCount; ++i )
707        if( isPieceInteresting( msgs, i ) )
708            return TRUE;
709
710    return FALSE;
711}
712
713static void
714sendInterest( tr_peermsgs * msgs, tr_bool clientIsInterested )
715{
716    struct evbuffer * out = msgs->outMessages;
717
718    assert( msgs );
719    assert( tr_isBool( clientIsInterested ) );
720
721    msgs->peer->clientIsInterested = clientIsInterested;
722    dbgmsg( msgs, "Sending %s", clientIsInterested ? "Interested" : "Not Interested" );
723    tr_peerIoWriteUint32( msgs->peer->io, out, sizeof( uint8_t ) );
724    tr_peerIoWriteUint8 ( msgs->peer->io, out, clientIsInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
725
726    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
727    dbgOutMessageLen( msgs );
728}
729
730static void
731updateInterest( tr_peermsgs * msgs )
732{
733    const int i = isPeerInteresting( msgs );
734
735    if( i != msgs->peer->clientIsInterested )
736        sendInterest( msgs, i );
737}
738
739static tr_bool
740popNextMetadataRequest( tr_peermsgs * msgs, int * piece )
741{
742    if( msgs->peerAskedForMetadataCount == 0 )
743        return FALSE;
744
745    *piece = msgs->peerAskedForMetadata[0];
746
747    tr_removeElementFromArray( msgs->peerAskedForMetadata, 0, sizeof( int ),
748                               msgs->peerAskedForMetadataCount-- );
749
750    return TRUE;
751}
752
753static tr_bool
754popNextRequest( tr_peermsgs * msgs, struct peer_request * setme )
755{
756    if( msgs->peer->pendingReqsToClient == 0 )
757        return FALSE;
758
759    *setme = msgs->peerAskedFor[0];
760
761    tr_removeElementFromArray( msgs->peerAskedFor, 0, sizeof( struct peer_request ),
762                               msgs->peer->pendingReqsToClient-- );
763
764    return TRUE;
765}
766
767static void
768cancelAllRequestsToClient( tr_peermsgs * msgs )
769{
770    struct peer_request req;
771    const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->peer->io );
772
773    while( popNextRequest( msgs, &req ))
774        if( mustSendCancel )
775            protocolSendReject( msgs, &req );
776}
777
778void
779tr_peerMsgsSetChoke( tr_peermsgs * msgs,
780                     int           choke )
781{
782    const time_t now = tr_time( );
783    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
784
785    assert( msgs );
786    assert( msgs->peer );
787    assert( choke == 0 || choke == 1 );
788
789    if( msgs->peer->chokeChangedAt > fibrillationTime )
790    {
791        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
792    }
793    else if( msgs->peer->peerIsChoked != choke )
794    {
795        msgs->peer->peerIsChoked = choke;
796        if( choke )
797            cancelAllRequestsToClient( msgs );
798        protocolSendChoke( msgs, choke );
799        msgs->peer->chokeChangedAt = now;
800    }
801}
802
803/**
804***
805**/
806
807void
808tr_peerMsgsHave( tr_peermsgs * msgs,
809                 uint32_t      index )
810{
811    protocolSendHave( msgs, index );
812
813    /* since we have more pieces now, we might not be interested in this peer */
814    updateInterest( msgs );
815}
816
817/**
818***
819**/
820
821static tr_bool
822reqIsValid( const tr_peermsgs * peer,
823            uint32_t            index,
824            uint32_t            offset,
825            uint32_t            length )
826{
827    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
828}
829
830static tr_bool
831requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
832{
833    return reqIsValid( msgs, req->index, req->offset, req->length );
834}
835
836void
837tr_peerMsgsCancel( tr_peermsgs * msgs, tr_block_index_t block )
838{
839    struct peer_request req;
840    blockToReq( msgs->torrent, block, &req );
841    protocolSendCancel( msgs, &req );
842}
843
844/**
845***
846**/
847
848static void
849sendLtepHandshake( tr_peermsgs * msgs )
850{
851    tr_benc val, *m;
852    char * buf;
853    int len;
854    tr_bool allow_pex;
855    tr_bool allow_metadata_xfer;
856    struct evbuffer * out = msgs->outMessages;
857    const unsigned char * ipv6 = tr_globalIPv6();
858
859    if( msgs->clientSentLtepHandshake )
860        return;
861
862    dbgmsg( msgs, "sending an ltep handshake" );
863    msgs->clientSentLtepHandshake = 1;
864
865    /* decide if we want to advertise metadata xfer support (BEP 9) */
866    if( tr_torrentIsPrivate( msgs->torrent ) )
867        allow_metadata_xfer = 0;
868    else
869        allow_metadata_xfer = 1;
870
871    /* decide if we want to advertise pex support */
872    if( !tr_torrentAllowsPex( msgs->torrent ) )
873        allow_pex = 0;
874    else if( msgs->peerSentLtepHandshake )
875        allow_pex = msgs->peerSupportsPex ? 1 : 0;
876    else
877        allow_pex = 1;
878
879    tr_bencInitDict( &val, 8 );
880    tr_bencDictAddInt( &val, "e", getSession(msgs)->encryptionMode != TR_CLEAR_PREFERRED );
881    if( ipv6 != NULL )
882        tr_bencDictAddRaw( &val, "ipv6", ipv6, 16 );
883    if( allow_metadata_xfer && tr_torrentHasMetadata( msgs->torrent )
884                            && ( msgs->torrent->infoDictLength > 0 ) )
885        tr_bencDictAddInt( &val, "metadata_size", msgs->torrent->infoDictLength );
886    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( getSession(msgs) ) );
887    tr_bencDictAddInt( &val, "reqq", REQQ );
888    tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
889    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
890    m  = tr_bencDictAddDict( &val, "m", 2 );
891    if( allow_metadata_xfer )
892        tr_bencDictAddInt( m, "ut_metadata", UT_METADATA_ID );
893    if( allow_pex )
894        tr_bencDictAddInt( m, "ut_pex", UT_PEX_ID );
895
896    buf = tr_bencToStr( &val, TR_FMT_BENC, &len );
897
898    tr_peerIoWriteUint32( msgs->peer->io, out, 2 * sizeof( uint8_t ) + len );
899    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_LTEP );
900    tr_peerIoWriteUint8 ( msgs->peer->io, out, LTEP_HANDSHAKE );
901    tr_peerIoWriteBytes ( msgs->peer->io, out, buf, len );
902    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
903    dbgOutMessageLen( msgs );
904
905    /* cleanup */
906    tr_bencFree( &val );
907    tr_free( buf );
908}
909
910static void
911parseLtepHandshake( tr_peermsgs *     msgs,
912                    int               len,
913                    struct evbuffer * inbuf )
914{
915    int64_t   i;
916    tr_benc   val, * sub;
917    uint8_t * tmp = tr_new( uint8_t, len );
918    const uint8_t *addr;
919    size_t addr_len;
920    tr_pex pex;
921
922    memset( &pex, 0, sizeof( tr_pex ) );
923
924    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, len );
925    msgs->peerSentLtepHandshake = 1;
926
927    if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) )
928    {
929        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
930        tr_free( tmp );
931        return;
932    }
933
934    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
935
936    /* does the peer prefer encrypted connections? */
937    if( tr_bencDictFindInt( &val, "e", &i ) ) {
938        msgs->peer->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
939                                              : ENCRYPTION_PREFERENCE_NO;
940        if( i )
941            pex.flags |= ADDED_F_ENCRYPTION_FLAG;
942    }
943
944    /* check supported messages for utorrent pex */
945    msgs->peerSupportsPex = 0;
946    msgs->peerSupportsMetadataXfer = 0;
947
948    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
949        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
950            msgs->peerSupportsPex = i != 0;
951            msgs->ut_pex_id = (uint8_t) i;
952            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
953        }
954        if( tr_bencDictFindInt( sub, "ut_metadata", &i ) ) {
955            msgs->peerSupportsMetadataXfer = i != 0;
956            msgs->ut_metadata_id = (uint8_t) i;
957            dbgmsg( msgs, "msgs->ut_metadata_id is %d", (int)msgs->ut_metadata_id );
958        }
959    }
960
961    /* look for metainfo size (BEP 9) */
962    if( tr_bencDictFindInt( &val, "metadata_size", &i ) )
963        tr_torrentSetMetadataSizeHint( msgs->torrent, i );
964
965    /* look for upload_only (BEP 21) */
966    if( tr_bencDictFindInt( &val, "upload_only", &i ) ) {
967        fireUploadOnly( msgs, i!=0 );
968        if( i )
969            pex.flags |= ADDED_F_SEED_FLAG;
970    }
971
972    /* get peer's listening port */
973    if( tr_bencDictFindInt( &val, "p", &i ) ) {
974        pex.port = htons( (uint16_t)i );
975        fireClientGotPort( msgs, pex.port );
976        dbgmsg( msgs, "peer's port is now %d", (int)i );
977    }
978
979    if( tr_peerIoIsIncoming( msgs->peer->io )
980        && tr_bencDictFindRaw( &val, "ipv4", &addr, &addr_len )
981        && ( addr_len == 4 ) )
982    {
983        pex.addr.type = TR_AF_INET;
984        memcpy( &pex.addr.addr.addr4, addr, 4 );
985        tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex );
986    }
987
988    if( tr_peerIoIsIncoming( msgs->peer->io )
989        && tr_bencDictFindRaw( &val, "ipv6", &addr, &addr_len )
990        && ( addr_len == 16 ) )
991    {
992        pex.addr.type = TR_AF_INET6;
993        memcpy( &pex.addr.addr.addr6, addr, 16 );
994        tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex );
995    }
996
997    /* get peer's maximum request queue size */
998    if( tr_bencDictFindInt( &val, "reqq", &i ) )
999        msgs->reqq = i;
1000
1001    tr_bencFree( &val );
1002    tr_free( tmp );
1003}
1004
1005static void
1006parseUtMetadata( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1007{
1008    tr_benc dict;
1009    char * msg_end;
1010    char * benc_end;
1011    int64_t msg_type = -1;
1012    int64_t piece = -1;
1013    int64_t total_size = 0;
1014    uint8_t * tmp = tr_new( uint8_t, msglen );
1015
1016    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1017    msg_end = (char*)tmp + msglen;
1018
1019    if( !tr_bencLoad( tmp, msglen, &dict, &benc_end ) )
1020    {
1021        tr_bencDictFindInt( &dict, "msg_type", &msg_type );
1022        tr_bencDictFindInt( &dict, "piece", &piece );
1023        tr_bencDictFindInt( &dict, "total_size", &total_size );
1024        tr_bencFree( &dict );
1025    }
1026
1027    dbgmsg( msgs, "got ut_metadata msg: type %d, piece %d, total_size %d",
1028            (int)msg_type, (int)piece, (int)total_size );
1029
1030    if( msg_type == METADATA_MSG_TYPE_REJECT )
1031    {
1032        /* NOOP */
1033    }
1034
1035    if( ( msg_type == METADATA_MSG_TYPE_DATA )
1036        && ( !tr_torrentHasMetadata( msgs->torrent ) )
1037        && ( msg_end - benc_end <= METADATA_PIECE_SIZE )
1038        && ( piece * METADATA_PIECE_SIZE + (msg_end - benc_end) <= total_size ) )
1039    {
1040        const int pieceLen = msg_end - benc_end;
1041        tr_torrentSetMetadataPiece( msgs->torrent, piece, benc_end, pieceLen );
1042    }
1043
1044    if( msg_type == METADATA_MSG_TYPE_REQUEST )
1045    {
1046        if( ( piece >= 0 )
1047            && tr_torrentHasMetadata( msgs->torrent )
1048            && !tr_torrentIsPrivate( msgs->torrent )
1049            && ( msgs->peerAskedForMetadataCount < METADATA_REQQ ) )
1050        {
1051            msgs->peerAskedForMetadata[msgs->peerAskedForMetadataCount++] = piece;
1052        }
1053        else
1054        {
1055            tr_benc tmp;
1056            int payloadLen;
1057            char * payload;
1058            tr_peerIo  * io  = msgs->peer->io;
1059            struct evbuffer * out = msgs->outMessages;
1060
1061            /* build the rejection message */
1062            tr_bencInitDict( &tmp, 2 );
1063            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
1064            tr_bencDictAddInt( &tmp, "piece", piece );
1065            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1066            tr_bencFree( &tmp );
1067
1068            /* write it out as a LTEP message to our outMessages buffer */
1069            tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + payloadLen );
1070            tr_peerIoWriteUint8 ( io, out, BT_LTEP );
1071            tr_peerIoWriteUint8 ( io, out, msgs->ut_metadata_id );
1072            tr_peerIoWriteBytes ( io, out, payload, payloadLen );
1073            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1074            dbgOutMessageLen( msgs );
1075
1076            tr_free( payload );
1077        }
1078    }
1079
1080    tr_free( tmp );
1081}
1082
1083static void
1084parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1085{
1086    int loaded = 0;
1087    uint8_t * tmp = tr_new( uint8_t, msglen );
1088    tr_benc val;
1089    tr_torrent * tor = msgs->torrent;
1090    const uint8_t * added;
1091    size_t added_len;
1092
1093    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1094
1095    if( tr_torrentAllowsPex( tor )
1096      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) ) )
1097    {
1098        if( tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1099        {
1100            tr_pex * pex;
1101            size_t i, n;
1102            size_t added_f_len = 0;
1103            const uint8_t * added_f = NULL;
1104
1105            tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1106            pex = tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len, &n );
1107
1108            n = MIN( n, MAX_PEX_PEER_COUNT );
1109            for( i=0; i<n; ++i )
1110                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex + i );
1111
1112            tr_free( pex );
1113        }
1114
1115        if( tr_bencDictFindRaw( &val, "added6", &added, &added_len ) )
1116        {
1117            tr_pex * pex;
1118            size_t i, n;
1119            size_t added_f_len = 0;
1120            const uint8_t * added_f = NULL;
1121
1122            tr_bencDictFindRaw( &val, "added6.f", &added_f, &added_f_len );
1123            pex = tr_peerMgrCompact6ToPex( added, added_len, added_f, added_f_len, &n );
1124
1125            n = MIN( n, MAX_PEX_PEER_COUNT );
1126            for( i=0; i<n; ++i )
1127                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex + i );
1128
1129            tr_free( pex );
1130        }
1131    }
1132
1133    if( loaded )
1134        tr_bencFree( &val );
1135    tr_free( tmp );
1136}
1137
1138static void sendPex( tr_peermsgs * msgs );
1139
1140static void
1141parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer  * inbuf )
1142{
1143    uint8_t ltep_msgid;
1144
1145    tr_peerIoReadUint8( msgs->peer->io, inbuf, &ltep_msgid );
1146    msglen--;
1147
1148    if( ltep_msgid == LTEP_HANDSHAKE )
1149    {
1150        dbgmsg( msgs, "got ltep handshake" );
1151        parseLtepHandshake( msgs, msglen, inbuf );
1152        if( tr_peerIoSupportsLTEP( msgs->peer->io ) )
1153        {
1154            sendLtepHandshake( msgs );
1155            sendPex( msgs );
1156        }
1157    }
1158    else if( ltep_msgid == UT_PEX_ID )
1159    {
1160        dbgmsg( msgs, "got ut pex" );
1161        msgs->peerSupportsPex = 1;
1162        parseUtPex( msgs, msglen, inbuf );
1163    }
1164    else if( ltep_msgid == UT_METADATA_ID )
1165    {
1166        dbgmsg( msgs, "got ut metadata" );
1167        msgs->peerSupportsMetadataXfer = 1;
1168        parseUtMetadata( msgs, msglen, inbuf );
1169    }
1170    else
1171    {
1172        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1173        evbuffer_drain( inbuf, msglen );
1174    }
1175}
1176
1177static int
1178readBtLength( tr_peermsgs *     msgs,
1179              struct evbuffer * inbuf,
1180              size_t            inlen )
1181{
1182    uint32_t len;
1183
1184    if( inlen < sizeof( len ) )
1185        return READ_LATER;
1186
1187    tr_peerIoReadUint32( msgs->peer->io, inbuf, &len );
1188
1189    if( len == 0 ) /* peer sent us a keepalive message */
1190        dbgmsg( msgs, "got KeepAlive" );
1191    else
1192    {
1193        msgs->incoming.length = len;
1194        msgs->state = AWAITING_BT_ID;
1195    }
1196
1197    return READ_NOW;
1198}
1199
1200static int readBtMessage( tr_peermsgs     * msgs,
1201                          struct evbuffer * inbuf,
1202                          size_t            inlen );
1203
1204static int
1205readBtId( tr_peermsgs * msgs, struct evbuffer  * inbuf, size_t inlen )
1206{
1207    uint8_t id;
1208
1209    if( inlen < sizeof( uint8_t ) )
1210        return READ_LATER;
1211
1212    tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
1213    msgs->incoming.id = id;
1214    dbgmsg( msgs, "msgs->incoming.id is now %d; msgs->incoming.length is %zu", id, (size_t)msgs->incoming.length );
1215
1216    if( id == BT_PIECE )
1217    {
1218        msgs->state = AWAITING_BT_PIECE;
1219        return READ_NOW;
1220    }
1221    else if( msgs->incoming.length != 1 )
1222    {
1223        msgs->state = AWAITING_BT_MESSAGE;
1224        return READ_NOW;
1225    }
1226    else return readBtMessage( msgs, inbuf, inlen - 1 );
1227}
1228
1229static void
1230updatePeerProgress( tr_peermsgs * msgs )
1231{
1232    msgs->peer->progress = tr_bitsetPercent( &msgs->peer->have );
1233    dbgmsg( msgs, "peer progress is %f", msgs->peer->progress );
1234    updateFastSet( msgs );
1235    updateInterest( msgs );
1236    firePeerProgress( msgs );
1237}
1238
1239static void
1240peerMadeRequest( tr_peermsgs *               msgs,
1241                 const struct peer_request * req )
1242{
1243    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1244    const int reqIsValid = requestIsValid( msgs, req );
1245    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( &msgs->torrent->completion, req->index );
1246    const int peerIsChoked = msgs->peer->peerIsChoked;
1247
1248    int allow = FALSE;
1249
1250    if( !reqIsValid )
1251        dbgmsg( msgs, "rejecting an invalid request." );
1252    else if( !clientHasPiece )
1253        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1254    else if( peerIsChoked )
1255        dbgmsg( msgs, "rejecting request from choked peer" );
1256    else if( msgs->peer->pendingReqsToClient + 1 >= REQQ )
1257        dbgmsg( msgs, "rejecting request ... reqq is full" );
1258    else
1259        allow = TRUE;
1260
1261    if( allow )
1262        msgs->peerAskedFor[msgs->peer->pendingReqsToClient++] = *req;
1263    else if( fext )
1264        protocolSendReject( msgs, req );
1265}
1266
1267static tr_bool
1268messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1269{
1270    switch( id )
1271    {
1272        case BT_CHOKE:
1273        case BT_UNCHOKE:
1274        case BT_INTERESTED:
1275        case BT_NOT_INTERESTED:
1276        case BT_FEXT_HAVE_ALL:
1277        case BT_FEXT_HAVE_NONE:
1278            return len == 1;
1279
1280        case BT_HAVE:
1281        case BT_FEXT_SUGGEST:
1282        case BT_FEXT_ALLOWED_FAST:
1283            return len == 5;
1284
1285        case BT_BITFIELD:
1286            return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1287
1288        case BT_REQUEST:
1289        case BT_CANCEL:
1290        case BT_FEXT_REJECT:
1291            return len == 13;
1292
1293        case BT_PIECE:
1294            return len > 9 && len <= 16393;
1295
1296        case BT_PORT:
1297            return len == 3;
1298
1299        case BT_LTEP:
1300            return len >= 2;
1301
1302        default:
1303            return FALSE;
1304    }
1305}
1306
1307static int clientGotBlock( tr_peermsgs *               msgs,
1308                           const uint8_t *             block,
1309                           const struct peer_request * req );
1310
1311static int
1312readBtPiece( tr_peermsgs      * msgs,
1313             struct evbuffer  * inbuf,
1314             size_t             inlen,
1315             size_t           * setme_piece_bytes_read )
1316{
1317    struct peer_request * req = &msgs->incoming.blockReq;
1318
1319    assert( EVBUFFER_LENGTH( inbuf ) >= inlen );
1320    dbgmsg( msgs, "In readBtPiece" );
1321
1322    if( !req->length )
1323    {
1324        if( inlen < 8 )
1325            return READ_LATER;
1326
1327        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index );
1328        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset );
1329        req->length = msgs->incoming.length - 9;
1330        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1331        return READ_NOW;
1332    }
1333    else
1334    {
1335        int err;
1336
1337        /* read in another chunk of data */
1338        const size_t nLeft = req->length - EVBUFFER_LENGTH( msgs->incoming.block );
1339        size_t n = MIN( nLeft, inlen );
1340        size_t i = n;
1341        void * buf = tr_sessionGetBuffer( getSession( msgs ) );
1342        const size_t buflen = SESSION_BUFFER_SIZE;
1343
1344        while( i > 0 )
1345        {
1346            const size_t thisPass = MIN( i, buflen );
1347            tr_peerIoReadBytes( msgs->peer->io, inbuf, buf, thisPass );
1348            evbuffer_add( msgs->incoming.block, buf, thisPass );
1349            i -= thisPass;
1350        }
1351
1352        tr_sessionReleaseBuffer( getSession( msgs ) );
1353        buf = NULL;
1354
1355        fireClientGotData( msgs, n, TRUE );
1356        *setme_piece_bytes_read += n;
1357        dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1358               n, req->index, req->offset, req->length,
1359               (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) );
1360        if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length )
1361            return READ_LATER;
1362
1363        /* we've got the whole block ... process it */
1364        err = clientGotBlock( msgs, EVBUFFER_DATA( msgs->incoming.block ), req );
1365
1366        /* cleanup */
1367        evbuffer_free( msgs->incoming.block );
1368        msgs->incoming.block = evbuffer_new( );
1369        req->length = 0;
1370        msgs->state = AWAITING_BT_LENGTH;
1371        return err ? READ_ERR : READ_NOW;
1372    }
1373}
1374
1375static void updateDesiredRequestCount( tr_peermsgs * msgs, uint64_t now );
1376
1377static int
1378readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1379{
1380    uint32_t      ui32;
1381    uint32_t      msglen = msgs->incoming.length;
1382    const uint8_t id = msgs->incoming.id;
1383#ifndef NDEBUG
1384    const size_t  startBufLen = EVBUFFER_LENGTH( inbuf );
1385#endif
1386    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1387
1388    --msglen; /* id length */
1389
1390    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
1391
1392    if( inlen < msglen )
1393        return READ_LATER;
1394
1395    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1396    {
1397        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1398        fireError( msgs, EMSGSIZE );
1399        return READ_ERR;
1400    }
1401
1402    switch( id )
1403    {
1404        case BT_CHOKE:
1405            dbgmsg( msgs, "got Choke" );
1406            msgs->peer->clientIsChoked = 1;
1407            if( !fext )
1408                fireGotChoke( msgs );
1409            break;
1410
1411        case BT_UNCHOKE:
1412            dbgmsg( msgs, "got Unchoke" );
1413            msgs->peer->clientIsChoked = 0;
1414            updateDesiredRequestCount( msgs, tr_date( ) );
1415            break;
1416
1417        case BT_INTERESTED:
1418            dbgmsg( msgs, "got Interested" );
1419            msgs->peer->peerIsInterested = 1;
1420            break;
1421
1422        case BT_NOT_INTERESTED:
1423            dbgmsg( msgs, "got Not Interested" );
1424            msgs->peer->peerIsInterested = 0;
1425            break;
1426
1427        case BT_HAVE:
1428            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1429            dbgmsg( msgs, "got Have: %u", ui32 );
1430            if( tr_torrentHasMetadata( msgs->torrent )
1431                    && ( ui32 >= msgs->torrent->info.pieceCount ) )
1432            {
1433                fireError( msgs, ERANGE );
1434                return READ_ERR;
1435            }
1436            if( tr_bitsetAdd( &msgs->peer->have, ui32 ) )
1437            {
1438                fireError( msgs, ERANGE );
1439                return READ_ERR;
1440            }
1441            updatePeerProgress( msgs );
1442            break;
1443
1444        case BT_BITFIELD: {
1445            const size_t bitCount = tr_torrentHasMetadata( msgs->torrent )
1446                                  ? msgs->torrent->info.pieceCount
1447                                  : msglen * 8;
1448            dbgmsg( msgs, "got a bitfield" );
1449            tr_bitsetReserve( &msgs->peer->have, bitCount );
1450            tr_peerIoReadBytes( msgs->peer->io, inbuf,
1451                                msgs->peer->have.bitfield.bits, msglen );
1452            updatePeerProgress( msgs );
1453            break;
1454        }
1455
1456        case BT_REQUEST:
1457        {
1458            struct peer_request r;
1459            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1460            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1461            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1462            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1463            peerMadeRequest( msgs, &r );
1464            break;
1465        }
1466
1467        case BT_CANCEL:
1468        {
1469            int i;
1470            struct peer_request r;
1471            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1472            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1473            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1474            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1475
1476            for( i=0; i<msgs->peer->pendingReqsToClient; ++i ) {
1477                const struct peer_request * req = msgs->peerAskedFor + i;
1478                if( ( req->index == r.index ) && ( req->offset == r.offset ) && ( req->length == r.length ) )
1479                    break;
1480            }
1481
1482            if( i < msgs->peer->pendingReqsToClient )
1483                tr_removeElementFromArray( msgs->peerAskedFor, i, sizeof( struct peer_request ),
1484                                           msgs->peer->pendingReqsToClient-- );
1485            break;
1486        }
1487
1488        case BT_PIECE:
1489            assert( 0 ); /* handled elsewhere! */
1490            break;
1491
1492        case BT_PORT:
1493            dbgmsg( msgs, "Got a BT_PORT" );
1494            tr_peerIoReadUint16( msgs->peer->io, inbuf, &msgs->peer->dht_port );
1495            if( msgs->peer->dht_port > 0 )
1496                tr_dhtAddNode( getSession(msgs),
1497                               tr_peerAddress( msgs->peer ),
1498                               msgs->peer->dht_port, 0 );
1499            break;
1500
1501        case BT_FEXT_SUGGEST:
1502            dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1503            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1504            if( fext )
1505                fireClientGotSuggest( msgs, ui32 );
1506            else {
1507                fireError( msgs, EMSGSIZE );
1508                return READ_ERR;
1509            }
1510            break;
1511
1512        case BT_FEXT_ALLOWED_FAST:
1513            dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1514            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1515            if( fext )
1516                fireClientGotAllowedFast( msgs, ui32 );
1517            else {
1518                fireError( msgs, EMSGSIZE );
1519                return READ_ERR;
1520            }
1521            break;
1522
1523        case BT_FEXT_HAVE_ALL:
1524            dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1525            if( fext ) {
1526                tr_bitsetSetHaveAll( &msgs->peer->have );
1527                updatePeerProgress( msgs );
1528            } else {
1529                fireError( msgs, EMSGSIZE );
1530                return READ_ERR;
1531            }
1532            break;
1533
1534        case BT_FEXT_HAVE_NONE:
1535            dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1536            if( fext ) {
1537                tr_bitsetSetHaveNone( &msgs->peer->have );
1538                updatePeerProgress( msgs );
1539            } else {
1540                fireError( msgs, EMSGSIZE );
1541                return READ_ERR;
1542            }
1543            break;
1544
1545        case BT_FEXT_REJECT:
1546        {
1547            struct peer_request r;
1548            dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1549            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1550            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1551            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1552            if( fext )
1553                fireGotRej( msgs, &r );
1554            else {
1555                fireError( msgs, EMSGSIZE );
1556                return READ_ERR;
1557            }
1558            break;
1559        }
1560
1561        case BT_LTEP:
1562            dbgmsg( msgs, "Got a BT_LTEP" );
1563            parseLtep( msgs, msglen, inbuf );
1564            break;
1565
1566        default:
1567            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1568            tr_peerIoDrain( msgs->peer->io, inbuf, msglen );
1569            break;
1570    }
1571
1572    assert( msglen + 1 == msgs->incoming.length );
1573    assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen );
1574
1575    msgs->state = AWAITING_BT_LENGTH;
1576    return READ_NOW;
1577}
1578
1579static void
1580addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1581{
1582    if( !msgs->peer->blame )
1583         msgs->peer->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1584    tr_bitfieldAdd( msgs->peer->blame, index );
1585}
1586
1587/* returns 0 on success, or an errno on failure */
1588static int
1589clientGotBlock( tr_peermsgs *               msgs,
1590                const uint8_t *             data,
1591                const struct peer_request * req )
1592{
1593    int err;
1594    tr_torrent * tor = msgs->torrent;
1595    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1596
1597    assert( msgs );
1598    assert( req );
1599
1600    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) {
1601        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1602                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1603        return EMSGSIZE;
1604    }
1605
1606    /* save the block */
1607    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1608
1609    if( !tr_peerMgrDidPeerRequest( msgs->torrent, msgs->peer, block ) ) {
1610        dbgmsg( msgs, "we didn't ask for this message..." );
1611        return 0;
1612    }
1613
1614    /**
1615    ***  Save the block
1616    **/
1617
1618    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1619        return err;
1620
1621    addPeerToBlamefield( msgs, req->index );
1622    fireGotBlock( msgs, req );
1623    return 0;
1624}
1625
1626static int peerPulse( void * vmsgs );
1627
1628static void
1629didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1630{
1631    tr_peermsgs * msgs = vmsgs;
1632    firePeerGotData( msgs, bytesWritten, wasPieceData );
1633
1634    if ( tr_isPeerIo( io ) && io->userData )
1635        peerPulse( msgs );
1636}
1637
1638static ReadState
1639canRead( tr_peerIo * io, void * vmsgs, size_t * piece )
1640{
1641    ReadState         ret;
1642    tr_peermsgs *     msgs = vmsgs;
1643    struct evbuffer * in = tr_peerIoGetReadBuffer( io );
1644    const size_t      inlen = EVBUFFER_LENGTH( in );
1645
1646    dbgmsg( msgs, "canRead: inlen is %zu, msgs->state is %d", inlen, msgs->state );
1647
1648    if( !inlen )
1649    {
1650        ret = READ_LATER;
1651    }
1652    else if( msgs->state == AWAITING_BT_PIECE )
1653    {
1654        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1655    }
1656    else switch( msgs->state )
1657    {
1658        case AWAITING_BT_LENGTH:
1659            ret = readBtLength ( msgs, in, inlen ); break;
1660
1661        case AWAITING_BT_ID:
1662            ret = readBtId     ( msgs, in, inlen ); break;
1663
1664        case AWAITING_BT_MESSAGE:
1665            ret = readBtMessage( msgs, in, inlen ); break;
1666
1667        default:
1668            ret = READ_ERR;
1669            assert( 0 );
1670    }
1671
1672    dbgmsg( msgs, "canRead: ret is %d", (int)ret );
1673
1674    /* log the raw data that was read */
1675    if( ( ret != READ_ERR ) && ( EVBUFFER_LENGTH( in ) != inlen ) )
1676        fireClientGotData( msgs, inlen - EVBUFFER_LENGTH( in ), FALSE );
1677
1678    return ret;
1679}
1680
1681int
1682tr_peerMsgsIsReadingBlock( const tr_peermsgs * msgs, tr_block_index_t block )
1683{
1684    if( msgs->state != AWAITING_BT_PIECE )
1685        return FALSE;
1686
1687    return block == _tr_block( msgs->torrent,
1688                               msgs->incoming.blockReq.index,
1689                               msgs->incoming.blockReq.offset );
1690}
1691
1692/**
1693***
1694**/
1695
1696static void
1697updateDesiredRequestCount( tr_peermsgs * msgs, uint64_t now )
1698{
1699    const tr_torrent * const torrent = msgs->torrent;
1700
1701    if( tr_torrentIsSeed( msgs->torrent ) )
1702    {
1703        msgs->desiredRequestCount = 0;
1704    }
1705    else if( msgs->peer->clientIsChoked )
1706    {
1707        msgs->desiredRequestCount = 0;
1708    }
1709    else if( !msgs->peer->clientIsInterested )
1710    {
1711        msgs->desiredRequestCount = 0;
1712    }
1713    else
1714    {
1715        int irate;
1716        int estimatedBlocksInPeriod;
1717        double rate;
1718        const int floor = 4;
1719        const int seconds = REQUEST_BUF_SECS;
1720
1721        /* Get the rate limit we should use.
1722         * FIXME: this needs to consider all the other peers as well... */
1723        rate = tr_peerGetPieceSpeed( msgs->peer, now, TR_PEER_TO_CLIENT );
1724        if( tr_torrentUsesSpeedLimit( torrent, TR_PEER_TO_CLIENT ) )
1725            rate = MIN( rate, tr_torrentGetSpeedLimit( torrent, TR_PEER_TO_CLIENT ) );
1726
1727        /* honor the session limits, if enabled */
1728        if( tr_torrentUsesSessionLimits( torrent ) )
1729            if( tr_sessionGetActiveSpeedLimit( torrent->session, TR_PEER_TO_CLIENT, &irate ) )
1730                rate = MIN( rate, irate );
1731
1732        /* use this desired rate to figure out how
1733         * many requests we should send to this peer */
1734        estimatedBlocksInPeriod = ( rate * seconds * 1024 ) / torrent->blockSize;
1735        msgs->desiredRequestCount = MAX( floor, estimatedBlocksInPeriod );
1736
1737        /* honor the peer's maximum request count, if specified */
1738        if( msgs->reqq > 0 )
1739            if( msgs->desiredRequestCount > msgs->reqq )
1740                msgs->desiredRequestCount = msgs->reqq;
1741    }
1742}
1743
1744static void
1745updateMetadataRequests( tr_peermsgs * msgs, time_t now )
1746{
1747    int piece;
1748
1749    if( msgs->peerSupportsMetadataXfer
1750        && tr_torrentGetNextMetadataRequest( msgs->torrent, now, &piece ) )
1751    {
1752        tr_benc tmp;
1753        int payloadLen;
1754        char * payload;
1755        tr_peerIo  * io  = msgs->peer->io;
1756        struct evbuffer * out = msgs->outMessages;
1757
1758        /* build the data message */
1759        tr_bencInitDict( &tmp, 3 );
1760        tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REQUEST );
1761        tr_bencDictAddInt( &tmp, "piece", piece );
1762        payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1763        tr_bencFree( &tmp );
1764
1765        dbgmsg( msgs, "requesting metadata piece #%d", piece );
1766
1767        /* write it out as a LTEP message to our outMessages buffer */
1768        tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + payloadLen );
1769        tr_peerIoWriteUint8 ( io, out, BT_LTEP );
1770        tr_peerIoWriteUint8 ( io, out, msgs->ut_metadata_id );
1771        tr_peerIoWriteBytes ( io, out, payload, payloadLen );
1772        pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1773        dbgOutMessageLen( msgs );
1774
1775        tr_free( payload );
1776    }
1777}
1778
1779static void
1780updateBlockRequests( tr_peermsgs * msgs )
1781{
1782    if( tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT )
1783        && ( msgs->desiredRequestCount > 0 )
1784        && ( msgs->peer->pendingReqsToPeer <= ( msgs->desiredRequestCount * 0.66 ) ) )
1785    {
1786        int i;
1787        int n;
1788        const int numwant = msgs->desiredRequestCount - msgs->peer->pendingReqsToPeer;
1789        tr_block_index_t * blocks = tr_new( tr_block_index_t, numwant );
1790
1791        tr_peerMgrGetNextRequests( msgs->torrent, msgs->peer, numwant, blocks, &n );
1792
1793        for( i=0; i<n; ++i )
1794        {
1795            struct peer_request req;
1796            blockToReq( msgs->torrent, blocks[i], &req );
1797            protocolSendRequest( msgs, &req );
1798        }
1799
1800        tr_free( blocks );
1801    }
1802}
1803
1804static void
1805prefetchPieces( tr_peermsgs *msgs )
1806{
1807    int i;
1808
1809    /* Maintain 12 prefetched blocks per unchoked peer */
1810    for( i=msgs->prefetchCount; i<msgs->peer->pendingReqsToClient && i<12; ++i )
1811    {
1812        const struct peer_request * req = msgs->peerAskedFor + i;
1813        tr_ioPrefetch( msgs->torrent, req->index, req->offset, req->length );
1814        ++msgs->prefetchCount;
1815    }
1816}
1817
1818static size_t
1819fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1820{
1821    int piece;
1822    size_t bytesWritten = 0;
1823    struct peer_request req;
1824    const tr_bool haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1825    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1826
1827    /**
1828    ***  Protocol messages
1829    **/
1830
1831    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1832    {
1833        dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
1834        msgs->outMessagesBatchedAt = now;
1835    }
1836    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1837    {
1838        const size_t len = EVBUFFER_LENGTH( msgs->outMessages );
1839        /* flush the protocol messages */
1840        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len );
1841        tr_peerIoWriteBuf( msgs->peer->io, msgs->outMessages, FALSE );
1842        msgs->clientSentAnythingAt = now;
1843        msgs->outMessagesBatchedAt = 0;
1844        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1845        bytesWritten +=  len;
1846    }
1847
1848    /**
1849    ***  Metadata Pieces
1850    **/
1851
1852    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= METADATA_PIECE_SIZE )
1853        && popNextMetadataRequest( msgs, &piece ) )
1854    {
1855        char * data;
1856        int dataLen;
1857        tr_bool ok = FALSE;
1858
1859        data = tr_torrentGetMetadataPiece( msgs->torrent, piece, &dataLen );
1860        if( ( dataLen > 0 ) && ( data != NULL ) )
1861        {
1862            tr_benc tmp;
1863            int payloadLen;
1864            char * payload;
1865            tr_peerIo  * io  = msgs->peer->io;
1866            struct evbuffer * out = msgs->outMessages;
1867
1868            /* build the data message */
1869            tr_bencInitDict( &tmp, 3 );
1870            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_DATA );
1871            tr_bencDictAddInt( &tmp, "piece", piece );
1872            tr_bencDictAddInt( &tmp, "total_size", msgs->torrent->infoDictLength );
1873            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1874            tr_bencFree( &tmp );
1875
1876            /* write it out as a LTEP message to our outMessages buffer */
1877            tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + payloadLen + dataLen );
1878            tr_peerIoWriteUint8 ( io, out, BT_LTEP );
1879            tr_peerIoWriteUint8 ( io, out, msgs->ut_metadata_id );
1880            tr_peerIoWriteBytes ( io, out, payload, payloadLen );
1881            tr_peerIoWriteBytes ( io, out, data, dataLen );
1882            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1883            dbgOutMessageLen( msgs );
1884
1885            tr_free( payload );
1886            tr_free( data );
1887
1888            ok = TRUE;
1889        }
1890
1891        if( !ok ) /* send a rejection message */
1892        {
1893            tr_benc tmp;
1894            int payloadLen;
1895            char * payload;
1896            tr_peerIo  * io  = msgs->peer->io;
1897            struct evbuffer * out = msgs->outMessages;
1898
1899            /* build the rejection message */
1900            tr_bencInitDict( &tmp, 2 );
1901            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
1902            tr_bencDictAddInt( &tmp, "piece", piece );
1903            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1904            tr_bencFree( &tmp );
1905
1906            /* write it out as a LTEP message to our outMessages buffer */
1907            tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + payloadLen );
1908            tr_peerIoWriteUint8 ( io, out, BT_LTEP );
1909            tr_peerIoWriteUint8 ( io, out, msgs->ut_metadata_id );
1910            tr_peerIoWriteBytes ( io, out, payload, payloadLen );
1911            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1912            dbgOutMessageLen( msgs );
1913
1914            tr_free( payload );
1915        }
1916    }
1917
1918    /**
1919    ***  Data Blocks
1920    **/
1921
1922    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= msgs->torrent->blockSize )
1923        && popNextRequest( msgs, &req ) )
1924    {
1925        --msgs->prefetchCount;
1926
1927        if( requestIsValid( msgs, &req )
1928            && tr_cpPieceIsComplete( &msgs->torrent->completion, req.index ) )
1929        {
1930            /* FIXME(libevent2) use evbuffer_reserve_space() + evbuffer_commit_space() */
1931            int err;
1932            const uint32_t msglen = 4 + 1 + 4 + 4 + req.length;
1933            struct evbuffer * out;
1934            tr_peerIo * io = msgs->peer->io;
1935
1936            out = evbuffer_new( );
1937            evbuffer_expand( out, msglen );
1938
1939            tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1940            tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1941            tr_peerIoWriteUint32( io, out, req.index );
1942            tr_peerIoWriteUint32( io, out, req.offset );
1943
1944            err = tr_ioRead( msgs->torrent, req.index, req.offset, req.length, EVBUFFER_DATA(out)+EVBUFFER_LENGTH(out) );
1945            if( err )
1946            {
1947                if( fext )
1948                    protocolSendReject( msgs, &req );
1949            }
1950            else
1951            {
1952                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1953                EVBUFFER_LENGTH(out) += req.length;
1954                assert( EVBUFFER_LENGTH( out ) == msglen );
1955                tr_peerIoWriteBuf( io, out, TRUE );
1956                bytesWritten += EVBUFFER_LENGTH( out );
1957                msgs->clientSentAnythingAt = now;
1958            }
1959
1960            evbuffer_free( out );
1961
1962            if( err )
1963            {
1964                bytesWritten = 0;
1965                msgs = NULL;
1966            }
1967        }
1968        else if( fext ) /* peer needs a reject message */
1969        {
1970            protocolSendReject( msgs, &req );
1971        }
1972
1973        if( msgs != NULL )
1974            prefetchPieces( msgs );
1975    }
1976
1977    /**
1978    ***  Keepalive
1979    **/
1980
1981    if( ( msgs != NULL )
1982        && ( msgs->clientSentAnythingAt != 0 )
1983        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1984    {
1985        dbgmsg( msgs, "sending a keepalive message" );
1986        tr_peerIoWriteUint32( msgs->peer->io, msgs->outMessages, 0 );
1987        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1988    }
1989
1990    return bytesWritten;
1991}
1992
1993static int
1994peerPulse( void * vmsgs )
1995{
1996    tr_peermsgs * msgs = vmsgs;
1997    const time_t  now = tr_time( );
1998
1999    if ( tr_isPeerIo( msgs->peer->io ) ) {
2000        updateDesiredRequestCount( msgs, tr_date( ) );
2001        updateBlockRequests( msgs );
2002        updateMetadataRequests( msgs, now );
2003    }
2004
2005    for( ;; )
2006        if( fillOutputBuffer( msgs, now ) < 1 )
2007            break;
2008
2009    return TRUE; /* loop forever */
2010}
2011
2012void
2013tr_peerMsgsPulse( tr_peermsgs * msgs )
2014{
2015    if( msgs != NULL )
2016        peerPulse( msgs );
2017}
2018
2019static void
2020gotError( tr_peerIo  * io UNUSED,
2021          short        what,
2022          void       * vmsgs )
2023{
2024    if( what & EVBUFFER_TIMEOUT )
2025        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
2026    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
2027        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
2028               what, errno, tr_strerror( errno ) );
2029    fireError( vmsgs, ENOTCONN );
2030}
2031
2032static void
2033sendBitfield( tr_peermsgs * msgs )
2034{
2035    struct evbuffer * out = msgs->outMessages;
2036    tr_bitfield *     field;
2037    tr_piece_index_t  lazyPieces[LAZY_PIECE_COUNT];
2038    size_t            i;
2039    size_t            lazyCount = 0;
2040
2041    field = tr_bitfieldDup( tr_cpPieceBitfield( &msgs->torrent->completion ) );
2042
2043    if( tr_sessionIsLazyBitfieldEnabled( getSession( msgs ) ) )
2044    {
2045        /** Lazy bitfields aren't a high priority or secure, so I'm opting for
2046            speed over a truly random sample -- let's limit the pool size to
2047            the first 1000 pieces so large torrents don't bog things down */
2048        size_t poolSize;
2049        const size_t maxPoolSize = MIN( msgs->torrent->info.pieceCount, 1000 );
2050        tr_piece_index_t * pool = tr_new( tr_piece_index_t, maxPoolSize );
2051
2052        /* build the pool */
2053        for( i=poolSize=0; i<maxPoolSize; ++i )
2054            if( tr_bitfieldHas( field, i ) )
2055                pool[poolSize++] = i;
2056
2057        /* pull random piece indices from the pool */
2058        while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) )
2059        {
2060            const int pos = tr_cryptoWeakRandInt( poolSize );
2061            const tr_piece_index_t piece = pool[pos];
2062            tr_bitfieldRem( field, piece );
2063            lazyPieces[lazyCount++] = piece;
2064            pool[pos] = pool[--poolSize];
2065        }
2066
2067        /* cleanup */
2068        tr_free( pool );
2069    }
2070
2071    tr_peerIoWriteUint32( msgs->peer->io, out,
2072                          sizeof( uint8_t ) + field->byteCount );
2073    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_BITFIELD );
2074    /* FIXME(libevent2): use evbuffer_add_reference() */
2075    tr_peerIoWriteBytes ( msgs->peer->io, out, field->bits, field->byteCount );
2076    dbgmsg( msgs, "sending bitfield... outMessage size is now %zu",
2077            EVBUFFER_LENGTH( out ) );
2078    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
2079
2080    for( i = 0; i < lazyCount; ++i )
2081        protocolSendHave( msgs, lazyPieces[i] );
2082
2083    tr_bitfieldFree( field );
2084}
2085
2086static void
2087tellPeerWhatWeHave( tr_peermsgs * msgs )
2088{
2089    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
2090
2091    if( fext && ( tr_cpGetStatus( &msgs->torrent->completion ) == TR_SEED ) )
2092    {
2093        protocolSendHaveAll( msgs );
2094    }
2095    else if( fext && ( tr_cpHaveValid( &msgs->torrent->completion ) == 0 ) )
2096    {
2097        protocolSendHaveNone( msgs );
2098    }
2099    else
2100    {
2101        sendBitfield( msgs );
2102    }
2103}
2104
2105/**
2106***
2107**/
2108
2109/* some peers give us error messages if we send
2110   more than this many peers in a single pex message
2111   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
2112#define MAX_PEX_ADDED 50
2113#define MAX_PEX_DROPPED 50
2114
2115typedef struct
2116{
2117    tr_pex *  added;
2118    tr_pex *  dropped;
2119    tr_pex *  elements;
2120    int       addedCount;
2121    int       droppedCount;
2122    int       elementCount;
2123}
2124PexDiffs;
2125
2126static void
2127pexAddedCb( void * vpex,
2128            void * userData )
2129{
2130    PexDiffs * diffs = userData;
2131    tr_pex *   pex = vpex;
2132
2133    if( diffs->addedCount < MAX_PEX_ADDED )
2134    {
2135        diffs->added[diffs->addedCount++] = *pex;
2136        diffs->elements[diffs->elementCount++] = *pex;
2137    }
2138}
2139
2140static inline void
2141pexDroppedCb( void * vpex,
2142              void * userData )
2143{
2144    PexDiffs * diffs = userData;
2145    tr_pex *   pex = vpex;
2146
2147    if( diffs->droppedCount < MAX_PEX_DROPPED )
2148    {
2149        diffs->dropped[diffs->droppedCount++] = *pex;
2150    }
2151}
2152
2153static inline void
2154pexElementCb( void * vpex,
2155              void * userData )
2156{
2157    PexDiffs * diffs = userData;
2158    tr_pex * pex = vpex;
2159
2160    diffs->elements[diffs->elementCount++] = *pex;
2161}
2162
2163static void
2164sendPex( tr_peermsgs * msgs )
2165{
2166    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
2167    {
2168        PexDiffs diffs;
2169        PexDiffs diffs6;
2170        tr_pex * newPex = NULL;
2171        tr_pex * newPex6 = NULL;
2172        const int newCount = tr_peerMgrGetPeers( msgs->torrent, &newPex, TR_AF_INET, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
2173        const int newCount6 = tr_peerMgrGetPeers( msgs->torrent, &newPex6, TR_AF_INET6, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
2174
2175        /* build the diffs */
2176        diffs.added = tr_new( tr_pex, newCount );
2177        diffs.addedCount = 0;
2178        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
2179        diffs.droppedCount = 0;
2180        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
2181        diffs.elementCount = 0;
2182        tr_set_compare( msgs->pex, msgs->pexCount,
2183                        newPex, newCount,
2184                        tr_pexCompare, sizeof( tr_pex ),
2185                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
2186        diffs6.added = tr_new( tr_pex, newCount6 );
2187        diffs6.addedCount = 0;
2188        diffs6.dropped = tr_new( tr_pex, msgs->pexCount6 );
2189        diffs6.droppedCount = 0;
2190        diffs6.elements = tr_new( tr_pex, newCount6 + msgs->pexCount6 );
2191        diffs6.elementCount = 0;
2192        tr_set_compare( msgs->pex6, msgs->pexCount6,
2193                        newPex6, newCount6,
2194                        tr_pexCompare, sizeof( tr_pex ),
2195                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6 );
2196        dbgmsg(
2197            msgs,
2198            "pex: old peer count %d+%d, new peer count %d+%d, "
2199            "added %d+%d, removed %d+%d",
2200            msgs->pexCount, msgs->pexCount6, newCount, newCount6,
2201            diffs.addedCount, diffs6.addedCount,
2202            diffs.droppedCount, diffs6.droppedCount );
2203
2204        if( !diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
2205            !diffs6.droppedCount )
2206        {
2207            tr_free( diffs.elements );
2208            tr_free( diffs6.elements );
2209        }
2210        else
2211        {
2212            int  i;
2213            tr_benc val;
2214            char * benc;
2215            int bencLen;
2216            uint8_t * tmp, *walk;
2217            tr_peerIo       * io  = msgs->peer->io;
2218            struct evbuffer * out = msgs->outMessages;
2219
2220            /* update peer */
2221            tr_free( msgs->pex );
2222            msgs->pex = diffs.elements;
2223            msgs->pexCount = diffs.elementCount;
2224            tr_free( msgs->pex6 );
2225            msgs->pex6 = diffs6.elements;
2226            msgs->pexCount6 = diffs6.elementCount;
2227
2228            /* build the pex payload */
2229            tr_bencInitDict( &val, 3 ); /* ipv6 support: left as 3:
2230                                         * speed vs. likelihood? */
2231
2232            if( diffs.addedCount > 0)
2233            {
2234                /* "added" */
2235                tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
2236                for( i = 0; i < diffs.addedCount; ++i ) {
2237                    memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
2238                    memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
2239                }
2240                assert( ( walk - tmp ) == diffs.addedCount * 6 );
2241                tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
2242                tr_free( tmp );
2243
2244                /* "added.f" */
2245                tmp = walk = tr_new( uint8_t, diffs.addedCount );
2246                for( i = 0; i < diffs.addedCount; ++i )
2247                    *walk++ = diffs.added[i].flags;
2248                assert( ( walk - tmp ) == diffs.addedCount );
2249                tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2250                tr_free( tmp );
2251            }
2252
2253            if( diffs.droppedCount > 0 )
2254            {
2255                /* "dropped" */
2256                tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2257                for( i = 0; i < diffs.droppedCount; ++i ) {
2258                    memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
2259                    memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2260                }
2261                assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2262                tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2263                tr_free( tmp );
2264            }
2265
2266            if( diffs6.addedCount > 0 )
2267            {
2268                /* "added6" */
2269                tmp = walk = tr_new( uint8_t, diffs6.addedCount * 18 );
2270                for( i = 0; i < diffs6.addedCount; ++i ) {
2271                    memcpy( walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16 );
2272                    walk += 16;
2273                    memcpy( walk, &diffs6.added[i].port, 2 );
2274                    walk += 2;
2275                }
2276                assert( ( walk - tmp ) == diffs6.addedCount * 18 );
2277                tr_bencDictAddRaw( &val, "added6", tmp, walk - tmp );
2278                tr_free( tmp );
2279
2280                /* "added6.f" */
2281                tmp = walk = tr_new( uint8_t, diffs6.addedCount );
2282                for( i = 0; i < diffs6.addedCount; ++i )
2283                    *walk++ = diffs6.added[i].flags;
2284                assert( ( walk - tmp ) == diffs6.addedCount );
2285                tr_bencDictAddRaw( &val, "added6.f", tmp, walk - tmp );
2286                tr_free( tmp );
2287            }
2288
2289            if( diffs6.droppedCount > 0 )
2290            {
2291                /* "dropped6" */
2292                tmp = walk = tr_new( uint8_t, diffs6.droppedCount * 18 );
2293                for( i = 0; i < diffs6.droppedCount; ++i ) {
2294                    memcpy( walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16 );
2295                    walk += 16;
2296                    memcpy( walk, &diffs6.dropped[i].port, 2 );
2297                    walk += 2;
2298                }
2299                assert( ( walk - tmp ) == diffs6.droppedCount * 18);
2300                tr_bencDictAddRaw( &val, "dropped6", tmp, walk - tmp );
2301                tr_free( tmp );
2302            }
2303
2304            /* write the pex message */
2305            benc = tr_bencToStr( &val, TR_FMT_BENC, &bencLen );
2306            tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + bencLen );
2307            tr_peerIoWriteUint8 ( io, out, BT_LTEP );
2308            tr_peerIoWriteUint8 ( io, out, msgs->ut_pex_id );
2309            tr_peerIoWriteBytes ( io, out, benc, bencLen );
2310            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2311            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", EVBUFFER_LENGTH( out ) );
2312            dbgOutMessageLen( msgs );
2313
2314            tr_free( benc );
2315            tr_bencFree( &val );
2316        }
2317
2318        /* cleanup */
2319        tr_free( diffs.added );
2320        tr_free( diffs.dropped );
2321        tr_free( newPex );
2322        tr_free( diffs6.added );
2323        tr_free( diffs6.dropped );
2324        tr_free( newPex6 );
2325
2326        /*msgs->clientSentPexAt = tr_time( );*/
2327    }
2328}
2329
2330static void
2331pexPulse( int foo UNUSED, short bar UNUSED, void * vmsgs )
2332{
2333    struct tr_peermsgs * msgs = vmsgs;
2334
2335    sendPex( msgs );
2336
2337    tr_timerAdd( &msgs->pexTimer, PEX_INTERVAL_SECS, 0 );
2338}
2339
2340/**
2341***
2342**/
2343
2344tr_peermsgs*
2345tr_peerMsgsNew( struct tr_torrent * torrent,
2346                struct tr_peer    * peer,
2347                tr_delivery_func    func,
2348                void              * userData,
2349                tr_publisher_tag  * setme )
2350{
2351    tr_peermsgs * m;
2352
2353    assert( peer );
2354    assert( peer->io );
2355
2356    m = tr_new0( tr_peermsgs, 1 );
2357    m->publisher = TR_PUBLISHER_INIT;
2358    m->peer = peer;
2359    m->torrent = torrent;
2360    m->peer->clientIsChoked = 1;
2361    m->peer->peerIsChoked = 1;
2362    m->peer->clientIsInterested = 0;
2363    m->peer->peerIsInterested = 0;
2364    m->state = AWAITING_BT_LENGTH;
2365    m->outMessages = evbuffer_new( );
2366    m->outMessagesBatchedAt = 0;
2367    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2368    m->incoming.block = evbuffer_new( );
2369    evtimer_set( &m->pexTimer, pexPulse, m );
2370    tr_timerAdd( &m->pexTimer, PEX_INTERVAL_SECS, 0 );
2371    peer->msgs = m;
2372
2373    *setme = tr_publisherSubscribe( &m->publisher, func, userData );
2374
2375    if( tr_peerIoSupportsLTEP( peer->io ) )
2376        sendLtepHandshake( m );
2377
2378    if( tr_dhtEnabled( torrent->session ) && tr_peerIoSupportsDHT( peer->io ))
2379    {
2380        /* Only send PORT over IPv6 when the IPv6 DHT is running (BEP-32). */
2381        const struct tr_address *addr = tr_peerIoGetAddress( peer->io, NULL );
2382        if( addr->type == TR_AF_INET || tr_globalIPv6() ) {
2383            protocolSendPort( m, tr_dhtPort( torrent->session ) );
2384        }
2385    }
2386
2387    tellPeerWhatWeHave( m );
2388
2389    tr_peerIoSetIOFuncs( m->peer->io, canRead, didWrite, gotError, m );
2390    updateDesiredRequestCount( m, tr_date( ) );
2391
2392    return m;
2393}
2394
2395void
2396tr_peerMsgsFree( tr_peermsgs* msgs )
2397{
2398    if( msgs )
2399    {
2400        evtimer_del( &msgs->pexTimer );
2401        tr_publisherDestruct( &msgs->publisher );
2402
2403        evbuffer_free( msgs->incoming.block );
2404        evbuffer_free( msgs->outMessages );
2405        tr_free( msgs->pex6 );
2406        tr_free( msgs->pex );
2407
2408        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2409        tr_free( msgs );
2410    }
2411}
2412
2413void
2414tr_peerMsgsUnsubscribe( tr_peermsgs *    peer,
2415                        tr_publisher_tag tag )
2416{
2417    tr_publisherUnsubscribe( &peer->publisher, tag );
2418}
Note: See TracBrowser for help on using the repository browser.