source: trunk/libtransmission/peer-msgs.c @ 10325

Last change on this file since 10325 was 10325, checked in by charles, 12 years ago

(trunk libT) #1242 "don't accept duplicate blocks during endgame" -- fixed in trunk for 1.92

  • Property svn:keywords set to Date Rev Author Id
File size: 71.4 KB
Line 
1/*
2 * This file Copyright (C) 2007-2010 Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 10325 2010-03-07 17:38:34Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdarg.h>
17#include <stdio.h>
18#include <stdlib.h>
19#include <string.h>
20
21#include <event.h>
22
23#include "transmission.h"
24#include "bencode.h"
25#include "completion.h"
26#include "crypto.h"
27#include "inout.h"
28#ifdef WIN32
29#include "net.h" /* for ECONN */
30#endif
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-msgs.h"
34#include "session.h"
35#include "stats.h"
36#include "torrent.h"
37#include "torrent-magnet.h"
38#include "tr-dht.h"
39#include "utils.h"
40#include "version.h"
41
42/**
43***
44**/
45
46enum
47{
48    BT_CHOKE                = 0,
49    BT_UNCHOKE              = 1,
50    BT_INTERESTED           = 2,
51    BT_NOT_INTERESTED       = 3,
52    BT_HAVE                 = 4,
53    BT_BITFIELD             = 5,
54    BT_REQUEST              = 6,
55    BT_PIECE                = 7,
56    BT_CANCEL               = 8,
57    BT_PORT                 = 9,
58
59    BT_FEXT_SUGGEST         = 13,
60    BT_FEXT_HAVE_ALL        = 14,
61    BT_FEXT_HAVE_NONE       = 15,
62    BT_FEXT_REJECT          = 16,
63    BT_FEXT_ALLOWED_FAST    = 17,
64
65    BT_LTEP                 = 20,
66
67    LTEP_HANDSHAKE          = 0,
68
69    UT_PEX_ID               = 1,
70    UT_METADATA_ID          = 3,
71
72    MAX_PEX_PEER_COUNT      = 50,
73
74    MIN_CHOKE_PERIOD_SEC    = 10,
75
76    /* idle seconds before we send a keepalive */
77    KEEPALIVE_INTERVAL_SECS = 100,
78
79    PEX_INTERVAL_SECS       = 90, /* sec between sendPex() calls */
80
81    REQQ                    = 512,
82
83    METADATA_REQQ           = 64,
84
85    MAX_BLOCK_SIZE          = ( 1024 * 16 ),
86
87    /* used in lowering the outMessages queue period */
88    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
89    HIGH_PRIORITY_INTERVAL_SECS = 2,
90    LOW_PRIORITY_INTERVAL_SECS = 10,
91
92    /* number of pieces to remove from the bitfield when
93     * lazy bitfields are turned on */
94    LAZY_PIECE_COUNT = 26,
95
96    /* number of pieces we'll allow in our fast set */
97    MAX_FAST_SET_SIZE = 3,
98
99    /* defined in BEP #9 */
100    METADATA_MSG_TYPE_REQUEST = 0,
101    METADATA_MSG_TYPE_DATA = 1,
102    METADATA_MSG_TYPE_REJECT = 2
103};
104
105enum
106{
107    AWAITING_BT_LENGTH,
108    AWAITING_BT_ID,
109    AWAITING_BT_MESSAGE,
110    AWAITING_BT_PIECE
111};
112
113/**
114***
115**/
116
117struct peer_request
118{
119    uint32_t    index;
120    uint32_t    offset;
121    uint32_t    length;
122};
123
124static uint32_t
125getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
126{
127    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
128    const uint64_t blockPos = tor->blockSize * b;
129    assert( blockPos >= piecePos );
130    return (uint32_t)( blockPos - piecePos );
131}
132
133static void
134blockToReq( const tr_torrent     * tor,
135            tr_block_index_t       block,
136            struct peer_request  * setme )
137{
138    assert( setme != NULL );
139
140    setme->index = tr_torBlockPiece( tor, block );
141    setme->offset = getBlockOffsetInPiece( tor, block );
142    setme->length = tr_torBlockCountBytes( tor, block );
143}
144
145/**
146***
147**/
148
149/* this is raw, unchanged data from the peer regarding
150 * the current message that it's sending us. */
151struct tr_incoming
152{
153    uint8_t                id;
154    uint32_t               length; /* includes the +1 for id length */
155    struct peer_request    blockReq; /* metadata for incoming blocks */
156    struct evbuffer *      block; /* piece data for incoming blocks */
157};
158
159/**
160 * Low-level communication state information about a connected peer.
161 *
162 * This structure remembers the low-level protocol states that we're
163 * in with this peer, such as active requests, pex messages, and so on.
164 * Its fields are all private to peer-msgs.c.
165 *
166 * Data not directly involved with sending & receiving messages is
167 * stored in tr_peer, where it can be accessed by both peermsgs and
168 * the peer manager.
169 *
170 * @see struct peer_atom
171 * @see tr_peer
172 */
173struct tr_peermsgs
174{
175    tr_bool         peerSupportsPex;
176    tr_bool         peerSupportsMetadataXfer;
177    tr_bool         clientSentLtepHandshake;
178    tr_bool         peerSentLtepHandshake;
179
180    /*tr_bool         haveFastSet;*/
181
182    int             desiredRequestCount;
183
184    int             prefetchCount;
185
186    /* how long the outMessages batch should be allowed to grow before
187     * it's flushed -- some messages (like requests >:) should be sent
188     * very quickly; others aren't as urgent. */
189    int8_t          outMessagesBatchPeriod;
190
191    uint8_t         state;
192    uint8_t         ut_pex_id;
193    uint8_t         ut_metadata_id;
194    uint16_t        pexCount;
195    uint16_t        pexCount6;
196
197#if 0
198    size_t                 fastsetSize;
199    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
200#endif
201
202    tr_peer *              peer;
203
204    tr_torrent *           torrent;
205
206    tr_publisher           publisher;
207
208    struct evbuffer *      outMessages; /* all the non-piece messages */
209
210    struct peer_request    peerAskedFor[REQQ];
211
212    int                    peerAskedForMetadata[METADATA_REQQ];
213    int                    peerAskedForMetadataCount;
214
215    tr_pex               * pex;
216    tr_pex               * pex6;
217
218    /*time_t                 clientSentPexAt;*/
219    time_t                 clientSentAnythingAt;
220
221    /* when we started batching the outMessages */
222    time_t                outMessagesBatchedAt;
223
224    struct tr_incoming    incoming;
225
226    /* if the peer supports the Extension Protocol in BEP 10 and
227       supplied a reqq argument, it's stored here.  otherwise the
228       value is zero and should be ignored. */
229    int64_t               reqq;
230
231    struct event          pexTimer;
232};
233
234/**
235***
236**/
237
238#if 0
239static tr_bitfield*
240getHave( const struct tr_peermsgs * msgs )
241{
242    if( msgs->peer->have == NULL )
243        msgs->peer->have = tr_bitfieldNew( msgs->torrent->info.pieceCount );
244    return msgs->peer->have;
245}
246#endif
247
248static inline tr_session*
249getSession( struct tr_peermsgs * msgs )
250{
251    return msgs->torrent->session;
252}
253
254/**
255***
256**/
257
258static void
259myDebug( const char * file, int line,
260         const struct tr_peermsgs * msgs,
261         const char * fmt, ... )
262{
263    FILE * fp = tr_getLog( );
264
265    if( fp )
266    {
267        va_list           args;
268        char              timestr[64];
269        struct evbuffer * buf = evbuffer_new( );
270        char *            base = tr_basename( file );
271
272        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
273                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
274                             tr_torrentName( msgs->torrent ),
275                             tr_peerIoGetAddrStr( msgs->peer->io ),
276                             msgs->peer->client );
277        va_start( args, fmt );
278        evbuffer_add_vprintf( buf, fmt, args );
279        va_end( args );
280        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
281        /* FIXME(libevent2) tr_getLog() should return an fd, then use evbuffer_write() here */
282        fwrite( EVBUFFER_DATA( buf ), 1, EVBUFFER_LENGTH( buf ), fp );
283
284        tr_free( base );
285        evbuffer_free( buf );
286    }
287}
288
289#define dbgmsg( msgs, ... ) \
290    do { \
291        if( tr_deepLoggingIsActive( ) ) \
292            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
293    } while( 0 )
294
295/**
296***
297**/
298
299static void
300pokeBatchPeriod( tr_peermsgs * msgs,
301                 int           interval )
302{
303    if( msgs->outMessagesBatchPeriod > interval )
304    {
305        msgs->outMessagesBatchPeriod = interval;
306        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
307    }
308}
309
310static inline void
311dbgOutMessageLen( tr_peermsgs * msgs )
312{
313    dbgmsg( msgs, "outMessage size is now %zu", EVBUFFER_LENGTH( msgs->outMessages ) );
314}
315
316static void
317protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
318{
319    tr_peerIo       * io  = msgs->peer->io;
320    struct evbuffer * out = msgs->outMessages;
321
322    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
323
324    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
325    tr_peerIoWriteUint8 ( io, out, BT_FEXT_REJECT );
326    tr_peerIoWriteUint32( io, out, req->index );
327    tr_peerIoWriteUint32( io, out, req->offset );
328    tr_peerIoWriteUint32( io, out, req->length );
329
330    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
331    dbgOutMessageLen( msgs );
332}
333
334static void
335protocolSendRequest( tr_peermsgs               * msgs,
336                     const struct peer_request * req )
337{
338    tr_peerIo       * io  = msgs->peer->io;
339    struct evbuffer * out = msgs->outMessages;
340
341    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
342    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
343    tr_peerIoWriteUint32( io, out, req->index );
344    tr_peerIoWriteUint32( io, out, req->offset );
345    tr_peerIoWriteUint32( io, out, req->length );
346
347    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
348    dbgOutMessageLen( msgs );
349    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
350}
351
352static void
353protocolSendCancel( tr_peermsgs               * msgs,
354                    const struct peer_request * req )
355{
356    tr_peerIo       * io  = msgs->peer->io;
357    struct evbuffer * out = msgs->outMessages;
358
359    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
360    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
361    tr_peerIoWriteUint32( io, out, req->index );
362    tr_peerIoWriteUint32( io, out, req->offset );
363    tr_peerIoWriteUint32( io, out, req->length );
364
365    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
366    dbgOutMessageLen( msgs );
367    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
368}
369
370static void
371protocolSendPort(tr_peermsgs *msgs, uint16_t port)
372{
373    tr_peerIo       * io  = msgs->peer->io;
374    struct evbuffer * out = msgs->outMessages;
375
376    dbgmsg( msgs, "sending Port %u", port);
377    tr_peerIoWriteUint32( io, out, 3 );
378    tr_peerIoWriteUint8 ( io, out, BT_PORT );
379    tr_peerIoWriteUint16( io, out, port);
380}
381
382static void
383protocolSendHave( tr_peermsgs * msgs,
384                  uint32_t      index )
385{
386    tr_peerIo       * io  = msgs->peer->io;
387    struct evbuffer * out = msgs->outMessages;
388
389    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
390    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
391    tr_peerIoWriteUint32( io, out, index );
392
393    dbgmsg( msgs, "sending Have %u", index );
394    dbgOutMessageLen( msgs );
395    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
396}
397
398#if 0
399static void
400protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
401{
402    tr_peerIo       * io  = msgs->peer->io;
403    struct evbuffer * out = msgs->outMessages;
404
405    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
406
407    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
408    tr_peerIoWriteUint8 ( io, out, BT_FEXT_ALLOWED_FAST );
409    tr_peerIoWriteUint32( io, out, pieceIndex );
410
411    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
412    dbgOutMessageLen( msgs );
413}
414#endif
415
416static void
417protocolSendChoke( tr_peermsgs * msgs,
418                   int           choke )
419{
420    tr_peerIo       * io  = msgs->peer->io;
421    struct evbuffer * out = msgs->outMessages;
422
423    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
424    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
425
426    dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
427    dbgOutMessageLen( msgs );
428    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
429}
430
431static void
432protocolSendHaveAll( tr_peermsgs * msgs )
433{
434    tr_peerIo       * io  = msgs->peer->io;
435    struct evbuffer * out = msgs->outMessages;
436
437    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
438
439    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
440    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_ALL );
441
442    dbgmsg( msgs, "sending HAVE_ALL..." );
443    dbgOutMessageLen( msgs );
444    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
445}
446
447static void
448protocolSendHaveNone( tr_peermsgs * msgs )
449{
450    tr_peerIo       * io  = msgs->peer->io;
451    struct evbuffer * out = msgs->outMessages;
452
453    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
454
455    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
456    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_NONE );
457
458    dbgmsg( msgs, "sending HAVE_NONE..." );
459    dbgOutMessageLen( msgs );
460    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
461}
462
463/**
464***  EVENTS
465**/
466
467static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0, 0, 0 };
468
469static void
470publish( tr_peermsgs * msgs, tr_peer_event * e )
471{
472    assert( msgs->peer );
473    assert( msgs->peer->msgs == msgs );
474
475    tr_publisherPublish( &msgs->publisher, msgs->peer, e );
476}
477
478static void
479fireError( tr_peermsgs * msgs, int err )
480{
481    tr_peer_event e = blankEvent;
482    e.eventType = TR_PEER_ERROR;
483    e.err = err;
484    publish( msgs, &e );
485}
486
487static void
488fireUploadOnly( tr_peermsgs * msgs, tr_bool uploadOnly )
489{
490    tr_peer_event e = blankEvent;
491    e.eventType = TR_PEER_UPLOAD_ONLY;
492    e.uploadOnly = uploadOnly;
493    publish( msgs, &e );
494}
495
496static void
497firePeerProgress( tr_peermsgs * msgs )
498{
499    tr_peer_event e = blankEvent;
500    e.eventType = TR_PEER_PEER_PROGRESS;
501    e.progress = msgs->peer->progress;
502    publish( msgs, &e );
503}
504
505static void
506fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
507{
508    tr_peer_event e = blankEvent;
509    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
510    e.pieceIndex = req->index;
511    e.offset = req->offset;
512    e.length = req->length;
513    publish( msgs, &e );
514}
515
516static void
517fireGotRej( tr_peermsgs * msgs, const struct peer_request * req )
518{
519    tr_peer_event e = blankEvent;
520    e.eventType = TR_PEER_CLIENT_GOT_REJ;
521    e.pieceIndex = req->index;
522    e.offset = req->offset;
523    e.length = req->length;
524    publish( msgs, &e );
525}
526
527static void
528fireGotChoke( tr_peermsgs * msgs )
529{
530    tr_peer_event e = blankEvent;
531    e.eventType = TR_PEER_CLIENT_GOT_CHOKE;
532    publish( msgs, &e );
533}
534
535static void
536fireClientGotData( tr_peermsgs * msgs,
537                   uint32_t      length,
538                   int           wasPieceData )
539{
540    tr_peer_event e = blankEvent;
541
542    e.length = length;
543    e.eventType = TR_PEER_CLIENT_GOT_DATA;
544    e.wasPieceData = wasPieceData;
545    publish( msgs, &e );
546}
547
548static void
549fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
550{
551    tr_peer_event e = blankEvent;
552    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
553    e.pieceIndex = pieceIndex;
554    publish( msgs, &e );
555}
556
557static void
558fireClientGotPort( tr_peermsgs * msgs, tr_port port )
559{
560    tr_peer_event e = blankEvent;
561    e.eventType = TR_PEER_CLIENT_GOT_PORT;
562    e.port = port;
563    publish( msgs, &e );
564}
565
566static void
567fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
568{
569    tr_peer_event e = blankEvent;
570    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
571    e.pieceIndex = pieceIndex;
572    publish( msgs, &e );
573}
574
575static void
576firePeerGotData( tr_peermsgs  * msgs,
577                 uint32_t       length,
578                 int            wasPieceData )
579{
580    tr_peer_event e = blankEvent;
581
582    e.length = length;
583    e.eventType = TR_PEER_PEER_GOT_DATA;
584    e.wasPieceData = wasPieceData;
585
586    publish( msgs, &e );
587}
588
589/**
590***  ALLOWED FAST SET
591***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
592**/
593
594size_t
595tr_generateAllowedSet( tr_piece_index_t * setmePieces,
596                       size_t             desiredSetSize,
597                       size_t             pieceCount,
598                       const uint8_t    * infohash,
599                       const tr_address * addr )
600{
601    size_t setSize = 0;
602
603    assert( setmePieces );
604    assert( desiredSetSize <= pieceCount );
605    assert( desiredSetSize );
606    assert( pieceCount );
607    assert( infohash );
608    assert( addr );
609
610    if( addr->type == TR_AF_INET )
611    {
612        uint8_t w[SHA_DIGEST_LENGTH + 4], *walk=w;
613        uint8_t x[SHA_DIGEST_LENGTH];
614
615        uint32_t ui32 = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 );   /* (1) */
616        memcpy( w, &ui32, sizeof( uint32_t ) );
617        walk += sizeof( uint32_t );
618        memcpy( walk, infohash, SHA_DIGEST_LENGTH );                 /* (2) */
619        walk += SHA_DIGEST_LENGTH;
620        tr_sha1( x, w, walk-w, NULL );                               /* (3) */
621        assert( sizeof( w ) == walk-w );
622
623        while( setSize<desiredSetSize )
624        {
625            int i;
626            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
627            {
628                size_t k;
629                uint32_t j = i * 4;                                  /* (5) */
630                uint32_t y = ntohl( *( uint32_t* )( x + j ) );       /* (6) */
631                uint32_t index = y % pieceCount;                     /* (7) */
632
633                for( k=0; k<setSize; ++k )                           /* (8) */
634                    if( setmePieces[k] == index )
635                        break;
636
637                if( k == setSize )
638                    setmePieces[setSize++] = index;                  /* (9) */
639            }
640
641            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
642        }
643    }
644
645    return setSize;
646}
647
648static void
649updateFastSet( tr_peermsgs * msgs UNUSED )
650{
651#if 0
652    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
653    const int peerIsNeedy = msgs->peer->progress < 0.10;
654
655    if( fext && peerIsNeedy && !msgs->haveFastSet )
656    {
657        size_t i;
658        const struct tr_address * addr = tr_peerIoGetAddress( msgs->peer->io, NULL );
659        const tr_info * inf = &msgs->torrent->info;
660        const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
661
662        /* build the fast set */
663        msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
664        msgs->haveFastSet = 1;
665
666        /* send it to the peer */
667        for( i=0; i<msgs->fastsetSize; ++i )
668            protocolSendAllowedFast( msgs, msgs->fastset[i] );
669    }
670#endif
671}
672
673/**
674***  INTEREST
675**/
676
677static tr_bool
678isPieceInteresting( const tr_peermsgs * msgs,
679                    tr_piece_index_t    piece )
680{
681    const tr_torrent * torrent = msgs->torrent;
682
683    return ( !torrent->info.pieces[piece].dnd )                  /* we want it */
684          && ( !tr_cpPieceIsComplete( &torrent->completion, piece ) ) /* !have */
685          && ( tr_bitsetHas( &msgs->peer->have, piece ) );      /* peer has it */
686}
687
688/* "interested" means we'll ask for piece data if they unchoke us */
689static tr_bool
690isPeerInteresting( const tr_peermsgs * msgs )
691{
692    tr_piece_index_t    i;
693    const tr_torrent *  torrent;
694    const tr_bitfield * bitfield;
695    const int           clientIsSeed = tr_torrentIsSeed( msgs->torrent );
696
697    if( clientIsSeed )
698        return FALSE;
699
700    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
701        return FALSE;
702
703    torrent = msgs->torrent;
704    bitfield = tr_cpPieceBitfield( &torrent->completion );
705
706    for( i = 0; i < torrent->info.pieceCount; ++i )
707        if( isPieceInteresting( msgs, i ) )
708            return TRUE;
709
710    return FALSE;
711}
712
713static void
714sendInterest( tr_peermsgs * msgs, tr_bool clientIsInterested )
715{
716    struct evbuffer * out = msgs->outMessages;
717
718    assert( msgs );
719    assert( tr_isBool( clientIsInterested ) );
720
721    msgs->peer->clientIsInterested = clientIsInterested;
722    dbgmsg( msgs, "Sending %s", clientIsInterested ? "Interested" : "Not Interested" );
723    tr_peerIoWriteUint32( msgs->peer->io, out, sizeof( uint8_t ) );
724    tr_peerIoWriteUint8 ( msgs->peer->io, out, clientIsInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
725
726    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
727    dbgOutMessageLen( msgs );
728}
729
730static void
731updateInterest( tr_peermsgs * msgs )
732{
733    const int i = isPeerInteresting( msgs );
734
735    if( i != msgs->peer->clientIsInterested )
736        sendInterest( msgs, i );
737}
738
739static tr_bool
740popNextMetadataRequest( tr_peermsgs * msgs, int * piece )
741{
742    if( msgs->peerAskedForMetadataCount == 0 )
743        return FALSE;
744
745    *piece = msgs->peerAskedForMetadata[0];
746
747    tr_removeElementFromArray( msgs->peerAskedForMetadata, 0, sizeof( int ),
748                               msgs->peerAskedForMetadataCount-- );
749
750    return TRUE;
751}
752
753static tr_bool
754popNextRequest( tr_peermsgs * msgs, struct peer_request * setme )
755{
756    if( msgs->peer->pendingReqsToClient == 0 )
757        return FALSE;
758
759    *setme = msgs->peerAskedFor[0];
760
761    tr_removeElementFromArray( msgs->peerAskedFor, 0, sizeof( struct peer_request ),
762                               msgs->peer->pendingReqsToClient-- );
763
764    return TRUE;
765}
766
767static void
768cancelAllRequestsToClient( tr_peermsgs * msgs )
769{
770    struct peer_request req;
771    const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->peer->io );
772
773    while( popNextRequest( msgs, &req ))
774        if( mustSendCancel )
775            protocolSendReject( msgs, &req );
776}
777
778void
779tr_peerMsgsSetChoke( tr_peermsgs * msgs,
780                     int           choke )
781{
782    const time_t now = tr_time( );
783    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
784
785    assert( msgs );
786    assert( msgs->peer );
787    assert( choke == 0 || choke == 1 );
788
789    if( msgs->peer->chokeChangedAt > fibrillationTime )
790    {
791        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
792    }
793    else if( msgs->peer->peerIsChoked != choke )
794    {
795        msgs->peer->peerIsChoked = choke;
796        if( choke )
797            cancelAllRequestsToClient( msgs );
798        protocolSendChoke( msgs, choke );
799        msgs->peer->chokeChangedAt = now;
800    }
801}
802
803/**
804***
805**/
806
807void
808tr_peerMsgsHave( tr_peermsgs * msgs,
809                 uint32_t      index )
810{
811    protocolSendHave( msgs, index );
812
813    /* since we have more pieces now, we might not be interested in this peer */
814    updateInterest( msgs );
815}
816
817/**
818***
819**/
820
821static tr_bool
822reqIsValid( const tr_peermsgs * peer,
823            uint32_t            index,
824            uint32_t            offset,
825            uint32_t            length )
826{
827    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
828}
829
830static tr_bool
831requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
832{
833    return reqIsValid( msgs, req->index, req->offset, req->length );
834}
835
836void
837tr_peerMsgsCancel( tr_peermsgs * msgs, tr_block_index_t block )
838{
839    struct peer_request req;
840    blockToReq( msgs->torrent, block, &req );
841    protocolSendCancel( msgs, &req );
842}
843
844/**
845***
846**/
847
848static void
849sendLtepHandshake( tr_peermsgs * msgs )
850{
851    tr_benc val, *m;
852    char * buf;
853    int len;
854    tr_bool allow_pex;
855    tr_bool allow_metadata_xfer;
856    struct evbuffer * out = msgs->outMessages;
857    const unsigned char * ipv6 = tr_globalIPv6();
858
859    if( msgs->clientSentLtepHandshake )
860        return;
861
862    dbgmsg( msgs, "sending an ltep handshake" );
863    msgs->clientSentLtepHandshake = 1;
864
865    /* decide if we want to advertise metadata xfer support (BEP 9) */
866    if( tr_torrentIsPrivate( msgs->torrent ) )
867        allow_metadata_xfer = 0;
868    else
869        allow_metadata_xfer = 1;
870
871    /* decide if we want to advertise pex support */
872    if( !tr_torrentAllowsPex( msgs->torrent ) )
873        allow_pex = 0;
874    else if( msgs->peerSentLtepHandshake )
875        allow_pex = msgs->peerSupportsPex ? 1 : 0;
876    else
877        allow_pex = 1;
878
879    tr_bencInitDict( &val, 8 );
880    tr_bencDictAddInt( &val, "e", getSession(msgs)->encryptionMode != TR_CLEAR_PREFERRED );
881    if( ipv6 != NULL )
882        tr_bencDictAddRaw( &val, "ipv6", ipv6, 16 );
883    if( allow_metadata_xfer && tr_torrentHasMetadata( msgs->torrent )
884                            && ( msgs->torrent->infoDictLength > 0 ) )
885        tr_bencDictAddInt( &val, "metadata_size", msgs->torrent->infoDictLength );
886    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( getSession(msgs) ) );
887    tr_bencDictAddInt( &val, "reqq", REQQ );
888    tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
889    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
890    m  = tr_bencDictAddDict( &val, "m", 2 );
891    if( allow_metadata_xfer )
892        tr_bencDictAddInt( m, "ut_metadata", UT_METADATA_ID );
893    if( allow_pex )
894        tr_bencDictAddInt( m, "ut_pex", UT_PEX_ID );
895
896    buf = tr_bencToStr( &val, TR_FMT_BENC, &len );
897
898    tr_peerIoWriteUint32( msgs->peer->io, out, 2 * sizeof( uint8_t ) + len );
899    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_LTEP );
900    tr_peerIoWriteUint8 ( msgs->peer->io, out, LTEP_HANDSHAKE );
901    tr_peerIoWriteBytes ( msgs->peer->io, out, buf, len );
902    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
903    dbgOutMessageLen( msgs );
904
905    /* cleanup */
906    tr_bencFree( &val );
907    tr_free( buf );
908}
909
910static void
911parseLtepHandshake( tr_peermsgs *     msgs,
912                    int               len,
913                    struct evbuffer * inbuf )
914{
915    int64_t   i;
916    tr_benc   val, * sub;
917    uint8_t * tmp = tr_new( uint8_t, len );
918    const uint8_t *addr;
919    size_t addr_len;
920    tr_pex pex;
921
922    memset( &pex, 0, sizeof( tr_pex ) );
923
924    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, len );
925    msgs->peerSentLtepHandshake = 1;
926
927    if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) )
928    {
929        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
930        tr_free( tmp );
931        return;
932    }
933
934    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
935
936    /* does the peer prefer encrypted connections? */
937    if( tr_bencDictFindInt( &val, "e", &i ) ) {
938        msgs->peer->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
939                                              : ENCRYPTION_PREFERENCE_NO;
940        if( i )
941            pex.flags |= ADDED_F_ENCRYPTION_FLAG;
942    }
943
944    /* check supported messages for utorrent pex */
945    msgs->peerSupportsPex = 0;
946    msgs->peerSupportsMetadataXfer = 0;
947
948    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
949        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
950            msgs->peerSupportsPex = i != 0;
951            msgs->ut_pex_id = (uint8_t) i;
952            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
953        }
954        if( tr_bencDictFindInt( sub, "ut_metadata", &i ) ) {
955            msgs->peerSupportsMetadataXfer = i != 0;
956            msgs->ut_metadata_id = (uint8_t) i;
957            dbgmsg( msgs, "msgs->ut_metadata_id is %d", (int)msgs->ut_metadata_id );
958        }
959    }
960
961    /* look for metainfo size (BEP 9) */
962    if( tr_bencDictFindInt( &val, "metadata_size", &i ) )
963        tr_torrentSetMetadataSizeHint( msgs->torrent, i );
964
965    /* look for upload_only (BEP 21) */
966    if( tr_bencDictFindInt( &val, "upload_only", &i ) ) {
967        fireUploadOnly( msgs, i!=0 );
968        if( i )
969            pex.flags |= ADDED_F_SEED_FLAG;
970    }
971
972    /* get peer's listening port */
973    if( tr_bencDictFindInt( &val, "p", &i ) ) {
974        pex.port = htons( (uint16_t)i );
975        fireClientGotPort( msgs, pex.port );
976        dbgmsg( msgs, "peer's port is now %d", (int)i );
977    }
978
979    if( tr_peerIoIsIncoming( msgs->peer->io )
980        && tr_bencDictFindRaw( &val, "ipv4", &addr, &addr_len )
981        && ( addr_len == 4 ) )
982    {
983        pex.addr.type = TR_AF_INET;
984        memcpy( &pex.addr.addr.addr4, addr, 4 );
985        tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex );
986    }
987
988    if( tr_peerIoIsIncoming( msgs->peer->io )
989        && tr_bencDictFindRaw( &val, "ipv6", &addr, &addr_len )
990        && ( addr_len == 16 ) )
991    {
992        pex.addr.type = TR_AF_INET6;
993        memcpy( &pex.addr.addr.addr6, addr, 16 );
994        tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex );
995    }
996
997    /* get peer's maximum request queue size */
998    if( tr_bencDictFindInt( &val, "reqq", &i ) )
999        msgs->reqq = i;
1000
1001    tr_bencFree( &val );
1002    tr_free( tmp );
1003}
1004
1005static void
1006parseUtMetadata( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1007{
1008    tr_benc dict;
1009    char * msg_end;
1010    char * benc_end;
1011    int64_t msg_type = -1;
1012    int64_t piece = -1;
1013    int64_t total_size = 0;
1014    uint8_t * tmp = tr_new( uint8_t, msglen );
1015
1016    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1017    msg_end = (char*)tmp + msglen;
1018
1019    if( !tr_bencLoad( tmp, msglen, &dict, &benc_end ) )
1020    {
1021        tr_bencDictFindInt( &dict, "msg_type", &msg_type );
1022        tr_bencDictFindInt( &dict, "piece", &piece );
1023        tr_bencDictFindInt( &dict, "total_size", &total_size );
1024        tr_bencFree( &dict );
1025    }
1026
1027    dbgmsg( msgs, "got ut_metadata msg: type %d, piece %d, total_size %d",
1028            (int)msg_type, (int)piece, (int)total_size );
1029
1030    if( msg_type == METADATA_MSG_TYPE_REJECT )
1031    {
1032        /* NOOP */
1033    }
1034
1035    if( ( msg_type == METADATA_MSG_TYPE_DATA )
1036        && ( !tr_torrentHasMetadata( msgs->torrent ) )
1037        && ( msg_end - benc_end <= METADATA_PIECE_SIZE )
1038        && ( piece * METADATA_PIECE_SIZE + (msg_end - benc_end) <= total_size ) )
1039    {
1040        const int pieceLen = msg_end - benc_end;
1041        tr_torrentSetMetadataPiece( msgs->torrent, piece, benc_end, pieceLen );
1042    }
1043
1044    if( msg_type == METADATA_MSG_TYPE_REQUEST )
1045    {
1046        if( ( piece >= 0 )
1047            && tr_torrentHasMetadata( msgs->torrent )
1048            && !tr_torrentIsPrivate( msgs->torrent )
1049            && ( msgs->peerAskedForMetadataCount < METADATA_REQQ ) )
1050        {
1051            msgs->peerAskedForMetadata[msgs->peerAskedForMetadataCount++] = piece;
1052        }
1053        else
1054        {
1055            tr_benc tmp;
1056            int payloadLen;
1057            char * payload;
1058            tr_peerIo  * io  = msgs->peer->io;
1059            struct evbuffer * out = msgs->outMessages;
1060
1061            /* build the rejection message */
1062            tr_bencInitDict( &tmp, 2 );
1063            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
1064            tr_bencDictAddInt( &tmp, "piece", piece );
1065            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1066            tr_bencFree( &tmp );
1067
1068            /* write it out as a LTEP message to our outMessages buffer */
1069            tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + payloadLen );
1070            tr_peerIoWriteUint8 ( io, out, BT_LTEP );
1071            tr_peerIoWriteUint8 ( io, out, msgs->ut_metadata_id );
1072            tr_peerIoWriteBytes ( io, out, payload, payloadLen );
1073            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1074            dbgOutMessageLen( msgs );
1075
1076            tr_free( payload );
1077        }
1078    }
1079
1080    tr_free( tmp );
1081}
1082
1083static void
1084parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1085{
1086    int loaded = 0;
1087    uint8_t * tmp = tr_new( uint8_t, msglen );
1088    tr_benc val;
1089    tr_torrent * tor = msgs->torrent;
1090    const uint8_t * added;
1091    size_t added_len;
1092
1093    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1094
1095    if( tr_torrentAllowsPex( tor )
1096      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) ) )
1097    {
1098        if( tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1099        {
1100            tr_pex * pex;
1101            size_t i, n;
1102            size_t added_f_len = 0;
1103            const uint8_t * added_f = NULL;
1104
1105            tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1106            pex = tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len, &n );
1107
1108            n = MIN( n, MAX_PEX_PEER_COUNT );
1109            for( i=0; i<n; ++i )
1110                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex + i );
1111
1112            tr_free( pex );
1113        }
1114
1115        if( tr_bencDictFindRaw( &val, "added6", &added, &added_len ) )
1116        {
1117            tr_pex * pex;
1118            size_t i, n;
1119            size_t added_f_len = 0;
1120            const uint8_t * added_f = NULL;
1121
1122            tr_bencDictFindRaw( &val, "added6.f", &added_f, &added_f_len );
1123            pex = tr_peerMgrCompact6ToPex( added, added_len, added_f, added_f_len, &n );
1124
1125            n = MIN( n, MAX_PEX_PEER_COUNT );
1126            for( i=0; i<n; ++i )
1127                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex + i );
1128
1129            tr_free( pex );
1130        }
1131    }
1132
1133    if( loaded )
1134        tr_bencFree( &val );
1135    tr_free( tmp );
1136}
1137
1138static void sendPex( tr_peermsgs * msgs );
1139
1140static void
1141parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer  * inbuf )
1142{
1143    uint8_t ltep_msgid;
1144
1145    tr_peerIoReadUint8( msgs->peer->io, inbuf, &ltep_msgid );
1146    msglen--;
1147
1148    if( ltep_msgid == LTEP_HANDSHAKE )
1149    {
1150        dbgmsg( msgs, "got ltep handshake" );
1151        parseLtepHandshake( msgs, msglen, inbuf );
1152        if( tr_peerIoSupportsLTEP( msgs->peer->io ) )
1153        {
1154            sendLtepHandshake( msgs );
1155            sendPex( msgs );
1156        }
1157    }
1158    else if( ltep_msgid == UT_PEX_ID )
1159    {
1160        dbgmsg( msgs, "got ut pex" );
1161        msgs->peerSupportsPex = 1;
1162        parseUtPex( msgs, msglen, inbuf );
1163    }
1164    else if( ltep_msgid == UT_METADATA_ID )
1165    {
1166        dbgmsg( msgs, "got ut metadata" );
1167        msgs->peerSupportsMetadataXfer = 1;
1168        parseUtMetadata( msgs, msglen, inbuf );
1169    }
1170    else
1171    {
1172        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1173        evbuffer_drain( inbuf, msglen );
1174    }
1175}
1176
1177static int
1178readBtLength( tr_peermsgs *     msgs,
1179              struct evbuffer * inbuf,
1180              size_t            inlen )
1181{
1182    uint32_t len;
1183
1184    if( inlen < sizeof( len ) )
1185        return READ_LATER;
1186
1187    tr_peerIoReadUint32( msgs->peer->io, inbuf, &len );
1188
1189    if( len == 0 ) /* peer sent us a keepalive message */
1190        dbgmsg( msgs, "got KeepAlive" );
1191    else
1192    {
1193        msgs->incoming.length = len;
1194        msgs->state = AWAITING_BT_ID;
1195    }
1196
1197    return READ_NOW;
1198}
1199
1200static int readBtMessage( tr_peermsgs     * msgs,
1201                          struct evbuffer * inbuf,
1202                          size_t            inlen );
1203
1204static int
1205readBtId( tr_peermsgs * msgs, struct evbuffer  * inbuf, size_t inlen )
1206{
1207    uint8_t id;
1208
1209    if( inlen < sizeof( uint8_t ) )
1210        return READ_LATER;
1211
1212    tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
1213    msgs->incoming.id = id;
1214    dbgmsg( msgs, "msgs->incoming.id is now %d; msgs->incoming.length is %zu", id, (size_t)msgs->incoming.length );
1215
1216    if( id == BT_PIECE )
1217    {
1218        msgs->state = AWAITING_BT_PIECE;
1219        return READ_NOW;
1220    }
1221    else if( msgs->incoming.length != 1 )
1222    {
1223        msgs->state = AWAITING_BT_MESSAGE;
1224        return READ_NOW;
1225    }
1226    else return readBtMessage( msgs, inbuf, inlen - 1 );
1227}
1228
1229static void
1230updatePeerProgress( tr_peermsgs * msgs )
1231{
1232    msgs->peer->progress = tr_bitsetPercent( &msgs->peer->have );
1233    dbgmsg( msgs, "peer progress is %f", msgs->peer->progress );
1234    updateFastSet( msgs );
1235    updateInterest( msgs );
1236    firePeerProgress( msgs );
1237}
1238
1239static void
1240peerMadeRequest( tr_peermsgs *               msgs,
1241                 const struct peer_request * req )
1242{
1243    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1244    const int reqIsValid = requestIsValid( msgs, req );
1245    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( &msgs->torrent->completion, req->index );
1246    const int peerIsChoked = msgs->peer->peerIsChoked;
1247
1248    int allow = FALSE;
1249
1250    if( !reqIsValid )
1251        dbgmsg( msgs, "rejecting an invalid request." );
1252    else if( !clientHasPiece )
1253        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1254    else if( peerIsChoked )
1255        dbgmsg( msgs, "rejecting request from choked peer" );
1256    else if( msgs->peer->pendingReqsToClient + 1 >= REQQ )
1257        dbgmsg( msgs, "rejecting request ... reqq is full" );
1258    else
1259        allow = TRUE;
1260
1261    if( allow )
1262        msgs->peerAskedFor[msgs->peer->pendingReqsToClient++] = *req;
1263    else if( fext )
1264        protocolSendReject( msgs, req );
1265}
1266
1267static tr_bool
1268messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1269{
1270    switch( id )
1271    {
1272        case BT_CHOKE:
1273        case BT_UNCHOKE:
1274        case BT_INTERESTED:
1275        case BT_NOT_INTERESTED:
1276        case BT_FEXT_HAVE_ALL:
1277        case BT_FEXT_HAVE_NONE:
1278            return len == 1;
1279
1280        case BT_HAVE:
1281        case BT_FEXT_SUGGEST:
1282        case BT_FEXT_ALLOWED_FAST:
1283            return len == 5;
1284
1285        case BT_BITFIELD:
1286            return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1287
1288        case BT_REQUEST:
1289        case BT_CANCEL:
1290        case BT_FEXT_REJECT:
1291            return len == 13;
1292
1293        case BT_PIECE:
1294            return len > 9 && len <= 16393;
1295
1296        case BT_PORT:
1297            return len == 3;
1298
1299        case BT_LTEP:
1300            return len >= 2;
1301
1302        default:
1303            return FALSE;
1304    }
1305}
1306
1307static int clientGotBlock( tr_peermsgs *               msgs,
1308                           const uint8_t *             block,
1309                           const struct peer_request * req );
1310
1311static int
1312readBtPiece( tr_peermsgs      * msgs,
1313             struct evbuffer  * inbuf,
1314             size_t             inlen,
1315             size_t           * setme_piece_bytes_read )
1316{
1317    struct peer_request * req = &msgs->incoming.blockReq;
1318
1319    assert( EVBUFFER_LENGTH( inbuf ) >= inlen );
1320    dbgmsg( msgs, "In readBtPiece" );
1321
1322    if( !req->length )
1323    {
1324        if( inlen < 8 )
1325            return READ_LATER;
1326
1327        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index );
1328        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset );
1329        req->length = msgs->incoming.length - 9;
1330        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1331        return READ_NOW;
1332    }
1333    else
1334    {
1335        int err;
1336
1337        /* read in another chunk of data */
1338        const size_t nLeft = req->length - EVBUFFER_LENGTH( msgs->incoming.block );
1339        size_t n = MIN( nLeft, inlen );
1340        size_t i = n;
1341        void * buf = tr_sessionGetBuffer( getSession( msgs ) );
1342        const size_t buflen = SESSION_BUFFER_SIZE;
1343
1344        while( i > 0 )
1345        {
1346            const size_t thisPass = MIN( i, buflen );
1347            tr_peerIoReadBytes( msgs->peer->io, inbuf, buf, thisPass );
1348            evbuffer_add( msgs->incoming.block, buf, thisPass );
1349            i -= thisPass;
1350        }
1351
1352        tr_sessionReleaseBuffer( getSession( msgs ) );
1353        buf = NULL;
1354
1355        fireClientGotData( msgs, n, TRUE );
1356        *setme_piece_bytes_read += n;
1357        dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1358               n, req->index, req->offset, req->length,
1359               (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) );
1360        if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length )
1361            return READ_LATER;
1362
1363        /* we've got the whole block ... process it */
1364        err = clientGotBlock( msgs, EVBUFFER_DATA( msgs->incoming.block ), req );
1365
1366        /* cleanup */
1367        evbuffer_free( msgs->incoming.block );
1368        msgs->incoming.block = evbuffer_new( );
1369        req->length = 0;
1370        msgs->state = AWAITING_BT_LENGTH;
1371        return err ? READ_ERR : READ_NOW;
1372    }
1373}
1374
1375static void updateDesiredRequestCount( tr_peermsgs * msgs, uint64_t now );
1376
1377static int
1378readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1379{
1380    uint32_t      ui32;
1381    uint32_t      msglen = msgs->incoming.length;
1382    const uint8_t id = msgs->incoming.id;
1383#ifndef NDEBUG
1384    const size_t  startBufLen = EVBUFFER_LENGTH( inbuf );
1385#endif
1386    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1387
1388    --msglen; /* id length */
1389
1390    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
1391
1392    if( inlen < msglen )
1393        return READ_LATER;
1394
1395    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1396    {
1397        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1398        fireError( msgs, EMSGSIZE );
1399        return READ_ERR;
1400    }
1401
1402    switch( id )
1403    {
1404        case BT_CHOKE:
1405            dbgmsg( msgs, "got Choke" );
1406            msgs->peer->clientIsChoked = 1;
1407            if( !fext )
1408                fireGotChoke( msgs );
1409            break;
1410
1411        case BT_UNCHOKE:
1412            dbgmsg( msgs, "got Unchoke" );
1413            msgs->peer->clientIsChoked = 0;
1414            updateDesiredRequestCount( msgs, tr_date( ) );
1415            break;
1416
1417        case BT_INTERESTED:
1418            dbgmsg( msgs, "got Interested" );
1419            msgs->peer->peerIsInterested = 1;
1420            break;
1421
1422        case BT_NOT_INTERESTED:
1423            dbgmsg( msgs, "got Not Interested" );
1424            msgs->peer->peerIsInterested = 0;
1425            break;
1426
1427        case BT_HAVE:
1428            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1429            dbgmsg( msgs, "got Have: %u", ui32 );
1430            if( tr_torrentHasMetadata( msgs->torrent )
1431                    && ( ui32 >= msgs->torrent->info.pieceCount ) )
1432            {
1433                fireError( msgs, ERANGE );
1434                return READ_ERR;
1435            }
1436            if( tr_bitsetAdd( &msgs->peer->have, ui32 ) )
1437            {
1438                fireError( msgs, ERANGE );
1439                return READ_ERR;
1440            }
1441            updatePeerProgress( msgs );
1442            break;
1443
1444        case BT_BITFIELD: {
1445            const size_t bitCount = tr_torrentHasMetadata( msgs->torrent )
1446                                  ? msgs->torrent->info.pieceCount
1447                                  : msglen * 8;
1448            dbgmsg( msgs, "got a bitfield" );
1449            tr_bitsetReserve( &msgs->peer->have, bitCount );
1450            tr_peerIoReadBytes( msgs->peer->io, inbuf,
1451                                msgs->peer->have.bitfield.bits, msglen );
1452            updatePeerProgress( msgs );
1453            break;
1454        }
1455
1456        case BT_REQUEST:
1457        {
1458            struct peer_request r;
1459            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1460            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1461            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1462            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1463            peerMadeRequest( msgs, &r );
1464            break;
1465        }
1466
1467        case BT_CANCEL:
1468        {
1469            int i;
1470            struct peer_request r;
1471            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1472            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1473            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1474            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1475
1476            for( i=0; i<msgs->peer->pendingReqsToClient; ++i ) {
1477                const struct peer_request * req = msgs->peerAskedFor + i;
1478                if( ( req->index == r.index ) && ( req->offset == r.offset ) && ( req->length == r.length ) )
1479                    break;
1480            }
1481
1482            if( i < msgs->peer->pendingReqsToClient )
1483                tr_removeElementFromArray( msgs->peerAskedFor, i, sizeof( struct peer_request ),
1484                                           msgs->peer->pendingReqsToClient-- );
1485            break;
1486        }
1487
1488        case BT_PIECE:
1489            assert( 0 ); /* handled elsewhere! */
1490            break;
1491
1492        case BT_PORT:
1493            dbgmsg( msgs, "Got a BT_PORT" );
1494            tr_peerIoReadUint16( msgs->peer->io, inbuf, &msgs->peer->dht_port );
1495            if( msgs->peer->dht_port > 0 )
1496                tr_dhtAddNode( getSession(msgs),
1497                               tr_peerAddress( msgs->peer ),
1498                               msgs->peer->dht_port, 0 );
1499            break;
1500
1501        case BT_FEXT_SUGGEST:
1502            dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1503            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1504            if( fext )
1505                fireClientGotSuggest( msgs, ui32 );
1506            else {
1507                fireError( msgs, EMSGSIZE );
1508                return READ_ERR;
1509            }
1510            break;
1511
1512        case BT_FEXT_ALLOWED_FAST:
1513            dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1514            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1515            if( fext )
1516                fireClientGotAllowedFast( msgs, ui32 );
1517            else {
1518                fireError( msgs, EMSGSIZE );
1519                return READ_ERR;
1520            }
1521            break;
1522
1523        case BT_FEXT_HAVE_ALL:
1524            dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1525            if( fext ) {
1526                tr_bitsetSetHaveAll( &msgs->peer->have );
1527                updatePeerProgress( msgs );
1528            } else {
1529                fireError( msgs, EMSGSIZE );
1530                return READ_ERR;
1531            }
1532            break;
1533
1534        case BT_FEXT_HAVE_NONE:
1535            dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1536            if( fext ) {
1537                tr_bitsetSetHaveNone( &msgs->peer->have );
1538                updatePeerProgress( msgs );
1539            } else {
1540                fireError( msgs, EMSGSIZE );
1541                return READ_ERR;
1542            }
1543            break;
1544
1545        case BT_FEXT_REJECT:
1546        {
1547            struct peer_request r;
1548            dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1549            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1550            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1551            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1552            if( fext )
1553                fireGotRej( msgs, &r );
1554            else {
1555                fireError( msgs, EMSGSIZE );
1556                return READ_ERR;
1557            }
1558            break;
1559        }
1560
1561        case BT_LTEP:
1562            dbgmsg( msgs, "Got a BT_LTEP" );
1563            parseLtep( msgs, msglen, inbuf );
1564            break;
1565
1566        default:
1567            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1568            tr_peerIoDrain( msgs->peer->io, inbuf, msglen );
1569            break;
1570    }
1571
1572    assert( msglen + 1 == msgs->incoming.length );
1573    assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen );
1574
1575    msgs->state = AWAITING_BT_LENGTH;
1576    return READ_NOW;
1577}
1578
1579static void
1580addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1581{
1582    if( !msgs->peer->blame )
1583         msgs->peer->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1584    tr_bitfieldAdd( msgs->peer->blame, index );
1585}
1586
1587/* returns 0 on success, or an errno on failure */
1588static int
1589clientGotBlock( tr_peermsgs *               msgs,
1590                const uint8_t *             data,
1591                const struct peer_request * req )
1592{
1593    int err;
1594    tr_torrent * tor = msgs->torrent;
1595    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1596
1597    assert( msgs );
1598    assert( req );
1599
1600    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) {
1601        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1602                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1603        return EMSGSIZE;
1604    }
1605
1606    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1607
1608    if( !tr_peerMgrDidPeerRequest( msgs->torrent, msgs->peer, block ) ) {
1609        dbgmsg( msgs, "we didn't ask for this message..." );
1610        return 0;
1611    }
1612    if( tr_cpPieceIsComplete( &msgs->torrent->completion, req->index ) ) {
1613        dbgmsg( msgs, "we did ask for this message, but the piece is already complete..." );
1614        return 0;
1615    }
1616
1617    /**
1618    ***  Save the block
1619    **/
1620
1621    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1622        return err;
1623
1624    addPeerToBlamefield( msgs, req->index );
1625    fireGotBlock( msgs, req );
1626    return 0;
1627}
1628
1629static int peerPulse( void * vmsgs );
1630
1631static void
1632didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1633{
1634    tr_peermsgs * msgs = vmsgs;
1635    firePeerGotData( msgs, bytesWritten, wasPieceData );
1636
1637    if ( tr_isPeerIo( io ) && io->userData )
1638        peerPulse( msgs );
1639}
1640
1641static ReadState
1642canRead( tr_peerIo * io, void * vmsgs, size_t * piece )
1643{
1644    ReadState         ret;
1645    tr_peermsgs *     msgs = vmsgs;
1646    struct evbuffer * in = tr_peerIoGetReadBuffer( io );
1647    const size_t      inlen = EVBUFFER_LENGTH( in );
1648
1649    dbgmsg( msgs, "canRead: inlen is %zu, msgs->state is %d", inlen, msgs->state );
1650
1651    if( !inlen )
1652    {
1653        ret = READ_LATER;
1654    }
1655    else if( msgs->state == AWAITING_BT_PIECE )
1656    {
1657        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1658    }
1659    else switch( msgs->state )
1660    {
1661        case AWAITING_BT_LENGTH:
1662            ret = readBtLength ( msgs, in, inlen ); break;
1663
1664        case AWAITING_BT_ID:
1665            ret = readBtId     ( msgs, in, inlen ); break;
1666
1667        case AWAITING_BT_MESSAGE:
1668            ret = readBtMessage( msgs, in, inlen ); break;
1669
1670        default:
1671            ret = READ_ERR;
1672            assert( 0 );
1673    }
1674
1675    dbgmsg( msgs, "canRead: ret is %d", (int)ret );
1676
1677    /* log the raw data that was read */
1678    if( ( ret != READ_ERR ) && ( EVBUFFER_LENGTH( in ) != inlen ) )
1679        fireClientGotData( msgs, inlen - EVBUFFER_LENGTH( in ), FALSE );
1680
1681    return ret;
1682}
1683
1684int
1685tr_peerMsgsIsReadingBlock( const tr_peermsgs * msgs, tr_block_index_t block )
1686{
1687    if( msgs->state != AWAITING_BT_PIECE )
1688        return FALSE;
1689
1690    return block == _tr_block( msgs->torrent,
1691                               msgs->incoming.blockReq.index,
1692                               msgs->incoming.blockReq.offset );
1693}
1694
1695/**
1696***
1697**/
1698
1699static void
1700updateDesiredRequestCount( tr_peermsgs * msgs, uint64_t now )
1701{
1702    const tr_torrent * const torrent = msgs->torrent;
1703
1704    if( tr_torrentIsSeed( msgs->torrent ) )
1705    {
1706        msgs->desiredRequestCount = 0;
1707    }
1708    else if( msgs->peer->clientIsChoked )
1709    {
1710        msgs->desiredRequestCount = 0;
1711    }
1712    else if( !msgs->peer->clientIsInterested )
1713    {
1714        msgs->desiredRequestCount = 0;
1715    }
1716    else
1717    {
1718        int irate;
1719        int estimatedBlocksInPeriod;
1720        double rate;
1721        const int floor = 4;
1722        const int seconds = REQUEST_BUF_SECS;
1723
1724        /* Get the rate limit we should use.
1725         * FIXME: this needs to consider all the other peers as well... */
1726        rate = tr_peerGetPieceSpeed( msgs->peer, now, TR_PEER_TO_CLIENT );
1727        if( tr_torrentUsesSpeedLimit( torrent, TR_PEER_TO_CLIENT ) )
1728            rate = MIN( rate, tr_torrentGetSpeedLimit( torrent, TR_PEER_TO_CLIENT ) );
1729
1730        /* honor the session limits, if enabled */
1731        if( tr_torrentUsesSessionLimits( torrent ) )
1732            if( tr_sessionGetActiveSpeedLimit( torrent->session, TR_PEER_TO_CLIENT, &irate ) )
1733                rate = MIN( rate, irate );
1734
1735        /* use this desired rate to figure out how
1736         * many requests we should send to this peer */
1737        estimatedBlocksInPeriod = ( rate * seconds * 1024 ) / torrent->blockSize;
1738        msgs->desiredRequestCount = MAX( floor, estimatedBlocksInPeriod );
1739
1740        /* honor the peer's maximum request count, if specified */
1741        if( msgs->reqq > 0 )
1742            if( msgs->desiredRequestCount > msgs->reqq )
1743                msgs->desiredRequestCount = msgs->reqq;
1744    }
1745}
1746
1747static void
1748updateMetadataRequests( tr_peermsgs * msgs, time_t now )
1749{
1750    int piece;
1751
1752    if( msgs->peerSupportsMetadataXfer
1753        && tr_torrentGetNextMetadataRequest( msgs->torrent, now, &piece ) )
1754    {
1755        tr_benc tmp;
1756        int payloadLen;
1757        char * payload;
1758        tr_peerIo  * io  = msgs->peer->io;
1759        struct evbuffer * out = msgs->outMessages;
1760
1761        /* build the data message */
1762        tr_bencInitDict( &tmp, 3 );
1763        tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REQUEST );
1764        tr_bencDictAddInt( &tmp, "piece", piece );
1765        payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1766        tr_bencFree( &tmp );
1767
1768        dbgmsg( msgs, "requesting metadata piece #%d", piece );
1769
1770        /* write it out as a LTEP message to our outMessages buffer */
1771        tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + payloadLen );
1772        tr_peerIoWriteUint8 ( io, out, BT_LTEP );
1773        tr_peerIoWriteUint8 ( io, out, msgs->ut_metadata_id );
1774        tr_peerIoWriteBytes ( io, out, payload, payloadLen );
1775        pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1776        dbgOutMessageLen( msgs );
1777
1778        tr_free( payload );
1779    }
1780}
1781
1782static void
1783updateBlockRequests( tr_peermsgs * msgs )
1784{
1785    if( tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT )
1786        && ( msgs->desiredRequestCount > 0 )
1787        && ( msgs->peer->pendingReqsToPeer <= ( msgs->desiredRequestCount * 0.66 ) ) )
1788    {
1789        int i;
1790        int n;
1791        const int numwant = msgs->desiredRequestCount - msgs->peer->pendingReqsToPeer;
1792        tr_block_index_t * blocks = tr_new( tr_block_index_t, numwant );
1793
1794        tr_peerMgrGetNextRequests( msgs->torrent, msgs->peer, numwant, blocks, &n );
1795
1796        for( i=0; i<n; ++i )
1797        {
1798            struct peer_request req;
1799            blockToReq( msgs->torrent, blocks[i], &req );
1800            protocolSendRequest( msgs, &req );
1801        }
1802
1803        tr_free( blocks );
1804    }
1805}
1806
1807static void
1808prefetchPieces( tr_peermsgs *msgs )
1809{
1810    int i;
1811
1812    /* Maintain 12 prefetched blocks per unchoked peer */
1813    for( i=msgs->prefetchCount; i<msgs->peer->pendingReqsToClient && i<12; ++i )
1814    {
1815        const struct peer_request * req = msgs->peerAskedFor + i;
1816        tr_ioPrefetch( msgs->torrent, req->index, req->offset, req->length );
1817        ++msgs->prefetchCount;
1818    }
1819}
1820
1821static size_t
1822fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1823{
1824    int piece;
1825    size_t bytesWritten = 0;
1826    struct peer_request req;
1827    const tr_bool haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1828    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1829
1830    /**
1831    ***  Protocol messages
1832    **/
1833
1834    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1835    {
1836        dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
1837        msgs->outMessagesBatchedAt = now;
1838    }
1839    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1840    {
1841        const size_t len = EVBUFFER_LENGTH( msgs->outMessages );
1842        /* flush the protocol messages */
1843        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len );
1844        tr_peerIoWriteBuf( msgs->peer->io, msgs->outMessages, FALSE );
1845        msgs->clientSentAnythingAt = now;
1846        msgs->outMessagesBatchedAt = 0;
1847        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1848        bytesWritten +=  len;
1849    }
1850
1851    /**
1852    ***  Metadata Pieces
1853    **/
1854
1855    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= METADATA_PIECE_SIZE )
1856        && popNextMetadataRequest( msgs, &piece ) )
1857    {
1858        char * data;
1859        int dataLen;
1860        tr_bool ok = FALSE;
1861
1862        data = tr_torrentGetMetadataPiece( msgs->torrent, piece, &dataLen );
1863        if( ( dataLen > 0 ) && ( data != NULL ) )
1864        {
1865            tr_benc tmp;
1866            int payloadLen;
1867            char * payload;
1868            tr_peerIo  * io  = msgs->peer->io;
1869            struct evbuffer * out = msgs->outMessages;
1870
1871            /* build the data message */
1872            tr_bencInitDict( &tmp, 3 );
1873            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_DATA );
1874            tr_bencDictAddInt( &tmp, "piece", piece );
1875            tr_bencDictAddInt( &tmp, "total_size", msgs->torrent->infoDictLength );
1876            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1877            tr_bencFree( &tmp );
1878
1879            /* write it out as a LTEP message to our outMessages buffer */
1880            tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + payloadLen + dataLen );
1881            tr_peerIoWriteUint8 ( io, out, BT_LTEP );
1882            tr_peerIoWriteUint8 ( io, out, msgs->ut_metadata_id );
1883            tr_peerIoWriteBytes ( io, out, payload, payloadLen );
1884            tr_peerIoWriteBytes ( io, out, data, dataLen );
1885            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1886            dbgOutMessageLen( msgs );
1887
1888            tr_free( payload );
1889            tr_free( data );
1890
1891            ok = TRUE;
1892        }
1893
1894        if( !ok ) /* send a rejection message */
1895        {
1896            tr_benc tmp;
1897            int payloadLen;
1898            char * payload;
1899            tr_peerIo  * io  = msgs->peer->io;
1900            struct evbuffer * out = msgs->outMessages;
1901
1902            /* build the rejection message */
1903            tr_bencInitDict( &tmp, 2 );
1904            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
1905            tr_bencDictAddInt( &tmp, "piece", piece );
1906            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1907            tr_bencFree( &tmp );
1908
1909            /* write it out as a LTEP message to our outMessages buffer */
1910            tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + payloadLen );
1911            tr_peerIoWriteUint8 ( io, out, BT_LTEP );
1912            tr_peerIoWriteUint8 ( io, out, msgs->ut_metadata_id );
1913            tr_peerIoWriteBytes ( io, out, payload, payloadLen );
1914            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1915            dbgOutMessageLen( msgs );
1916
1917            tr_free( payload );
1918        }
1919    }
1920
1921    /**
1922    ***  Data Blocks
1923    **/
1924
1925    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= msgs->torrent->blockSize )
1926        && popNextRequest( msgs, &req ) )
1927    {
1928        --msgs->prefetchCount;
1929
1930        if( requestIsValid( msgs, &req )
1931            && tr_cpPieceIsComplete( &msgs->torrent->completion, req.index ) )
1932        {
1933            /* FIXME(libevent2) use evbuffer_reserve_space() + evbuffer_commit_space() */
1934            int err;
1935            const uint32_t msglen = 4 + 1 + 4 + 4 + req.length;
1936            struct evbuffer * out;
1937            tr_peerIo * io = msgs->peer->io;
1938
1939            out = evbuffer_new( );
1940            evbuffer_expand( out, msglen );
1941
1942            tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1943            tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1944            tr_peerIoWriteUint32( io, out, req.index );
1945            tr_peerIoWriteUint32( io, out, req.offset );
1946
1947            err = tr_ioRead( msgs->torrent, req.index, req.offset, req.length, EVBUFFER_DATA(out)+EVBUFFER_LENGTH(out) );
1948            if( err )
1949            {
1950                if( fext )
1951                    protocolSendReject( msgs, &req );
1952            }
1953            else
1954            {
1955                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1956                EVBUFFER_LENGTH(out) += req.length;
1957                assert( EVBUFFER_LENGTH( out ) == msglen );
1958                tr_peerIoWriteBuf( io, out, TRUE );
1959                bytesWritten += EVBUFFER_LENGTH( out );
1960                msgs->clientSentAnythingAt = now;
1961            }
1962
1963            evbuffer_free( out );
1964
1965            if( err )
1966            {
1967                bytesWritten = 0;
1968                msgs = NULL;
1969            }
1970        }
1971        else if( fext ) /* peer needs a reject message */
1972        {
1973            protocolSendReject( msgs, &req );
1974        }
1975
1976        if( msgs != NULL )
1977            prefetchPieces( msgs );
1978    }
1979
1980    /**
1981    ***  Keepalive
1982    **/
1983
1984    if( ( msgs != NULL )
1985        && ( msgs->clientSentAnythingAt != 0 )
1986        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1987    {
1988        dbgmsg( msgs, "sending a keepalive message" );
1989        tr_peerIoWriteUint32( msgs->peer->io, msgs->outMessages, 0 );
1990        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1991    }
1992
1993    return bytesWritten;
1994}
1995
1996static int
1997peerPulse( void * vmsgs )
1998{
1999    tr_peermsgs * msgs = vmsgs;
2000    const time_t  now = tr_time( );
2001
2002    if ( tr_isPeerIo( msgs->peer->io ) ) {
2003        updateDesiredRequestCount( msgs, tr_date( ) );
2004        updateBlockRequests( msgs );
2005        updateMetadataRequests( msgs, now );
2006    }
2007
2008    for( ;; )
2009        if( fillOutputBuffer( msgs, now ) < 1 )
2010            break;
2011
2012    return TRUE; /* loop forever */
2013}
2014
2015void
2016tr_peerMsgsPulse( tr_peermsgs * msgs )
2017{
2018    if( msgs != NULL )
2019        peerPulse( msgs );
2020}
2021
2022static void
2023gotError( tr_peerIo  * io UNUSED,
2024          short        what,
2025          void       * vmsgs )
2026{
2027    if( what & EVBUFFER_TIMEOUT )
2028        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
2029    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
2030        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
2031               what, errno, tr_strerror( errno ) );
2032    fireError( vmsgs, ENOTCONN );
2033}
2034
2035static void
2036sendBitfield( tr_peermsgs * msgs )
2037{
2038    struct evbuffer * out = msgs->outMessages;
2039    tr_bitfield *     field;
2040    tr_piece_index_t  lazyPieces[LAZY_PIECE_COUNT];
2041    size_t            i;
2042    size_t            lazyCount = 0;
2043
2044    field = tr_bitfieldDup( tr_cpPieceBitfield( &msgs->torrent->completion ) );
2045
2046    if( tr_sessionIsLazyBitfieldEnabled( getSession( msgs ) ) )
2047    {
2048        /** Lazy bitfields aren't a high priority or secure, so I'm opting for
2049            speed over a truly random sample -- let's limit the pool size to
2050            the first 1000 pieces so large torrents don't bog things down */
2051        size_t poolSize;
2052        const size_t maxPoolSize = MIN( msgs->torrent->info.pieceCount, 1000 );
2053        tr_piece_index_t * pool = tr_new( tr_piece_index_t, maxPoolSize );
2054
2055        /* build the pool */
2056        for( i=poolSize=0; i<maxPoolSize; ++i )
2057            if( tr_bitfieldHas( field, i ) )
2058                pool[poolSize++] = i;
2059
2060        /* pull random piece indices from the pool */
2061        while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) )
2062        {
2063            const int pos = tr_cryptoWeakRandInt( poolSize );
2064            const tr_piece_index_t piece = pool[pos];
2065            tr_bitfieldRem( field, piece );
2066            lazyPieces[lazyCount++] = piece;
2067            pool[pos] = pool[--poolSize];
2068        }
2069
2070        /* cleanup */
2071        tr_free( pool );
2072    }
2073
2074    tr_peerIoWriteUint32( msgs->peer->io, out,
2075                          sizeof( uint8_t ) + field->byteCount );
2076    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_BITFIELD );
2077    /* FIXME(libevent2): use evbuffer_add_reference() */
2078    tr_peerIoWriteBytes ( msgs->peer->io, out, field->bits, field->byteCount );
2079    dbgmsg( msgs, "sending bitfield... outMessage size is now %zu",
2080            EVBUFFER_LENGTH( out ) );
2081    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
2082
2083    for( i = 0; i < lazyCount; ++i )
2084        protocolSendHave( msgs, lazyPieces[i] );
2085
2086    tr_bitfieldFree( field );
2087}
2088
2089static void
2090tellPeerWhatWeHave( tr_peermsgs * msgs )
2091{
2092    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
2093
2094    if( fext && ( tr_cpGetStatus( &msgs->torrent->completion ) == TR_SEED ) )
2095    {
2096        protocolSendHaveAll( msgs );
2097    }
2098    else if( fext && ( tr_cpHaveValid( &msgs->torrent->completion ) == 0 ) )
2099    {
2100        protocolSendHaveNone( msgs );
2101    }
2102    else
2103    {
2104        sendBitfield( msgs );
2105    }
2106}
2107
2108/**
2109***
2110**/
2111
2112/* some peers give us error messages if we send
2113   more than this many peers in a single pex message
2114   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
2115#define MAX_PEX_ADDED 50
2116#define MAX_PEX_DROPPED 50
2117
2118typedef struct
2119{
2120    tr_pex *  added;
2121    tr_pex *  dropped;
2122    tr_pex *  elements;
2123    int       addedCount;
2124    int       droppedCount;
2125    int       elementCount;
2126}
2127PexDiffs;
2128
2129static void
2130pexAddedCb( void * vpex,
2131            void * userData )
2132{
2133    PexDiffs * diffs = userData;
2134    tr_pex *   pex = vpex;
2135
2136    if( diffs->addedCount < MAX_PEX_ADDED )
2137    {
2138        diffs->added[diffs->addedCount++] = *pex;
2139        diffs->elements[diffs->elementCount++] = *pex;
2140    }
2141}
2142
2143static inline void
2144pexDroppedCb( void * vpex,
2145              void * userData )
2146{
2147    PexDiffs * diffs = userData;
2148    tr_pex *   pex = vpex;
2149
2150    if( diffs->droppedCount < MAX_PEX_DROPPED )
2151    {
2152        diffs->dropped[diffs->droppedCount++] = *pex;
2153    }
2154}
2155
2156static inline void
2157pexElementCb( void * vpex,
2158              void * userData )
2159{
2160    PexDiffs * diffs = userData;
2161    tr_pex * pex = vpex;
2162
2163    diffs->elements[diffs->elementCount++] = *pex;
2164}
2165
2166static void
2167sendPex( tr_peermsgs * msgs )
2168{
2169    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
2170    {
2171        PexDiffs diffs;
2172        PexDiffs diffs6;
2173        tr_pex * newPex = NULL;
2174        tr_pex * newPex6 = NULL;
2175        const int newCount = tr_peerMgrGetPeers( msgs->torrent, &newPex, TR_AF_INET, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
2176        const int newCount6 = tr_peerMgrGetPeers( msgs->torrent, &newPex6, TR_AF_INET6, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
2177
2178        /* build the diffs */
2179        diffs.added = tr_new( tr_pex, newCount );
2180        diffs.addedCount = 0;
2181        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
2182        diffs.droppedCount = 0;
2183        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
2184        diffs.elementCount = 0;
2185        tr_set_compare( msgs->pex, msgs->pexCount,
2186                        newPex, newCount,
2187                        tr_pexCompare, sizeof( tr_pex ),
2188                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
2189        diffs6.added = tr_new( tr_pex, newCount6 );
2190        diffs6.addedCount = 0;
2191        diffs6.dropped = tr_new( tr_pex, msgs->pexCount6 );
2192        diffs6.droppedCount = 0;
2193        diffs6.elements = tr_new( tr_pex, newCount6 + msgs->pexCount6 );
2194        diffs6.elementCount = 0;
2195        tr_set_compare( msgs->pex6, msgs->pexCount6,
2196                        newPex6, newCount6,
2197                        tr_pexCompare, sizeof( tr_pex ),
2198                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6 );
2199        dbgmsg(
2200            msgs,
2201            "pex: old peer count %d+%d, new peer count %d+%d, "
2202            "added %d+%d, removed %d+%d",
2203            msgs->pexCount, msgs->pexCount6, newCount, newCount6,
2204            diffs.addedCount, diffs6.addedCount,
2205            diffs.droppedCount, diffs6.droppedCount );
2206
2207        if( !diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
2208            !diffs6.droppedCount )
2209        {
2210            tr_free( diffs.elements );
2211            tr_free( diffs6.elements );
2212        }
2213        else
2214        {
2215            int  i;
2216            tr_benc val;
2217            char * benc;
2218            int bencLen;
2219            uint8_t * tmp, *walk;
2220            tr_peerIo       * io  = msgs->peer->io;
2221            struct evbuffer * out = msgs->outMessages;
2222
2223            /* update peer */
2224            tr_free( msgs->pex );
2225            msgs->pex = diffs.elements;
2226            msgs->pexCount = diffs.elementCount;
2227            tr_free( msgs->pex6 );
2228            msgs->pex6 = diffs6.elements;
2229            msgs->pexCount6 = diffs6.elementCount;
2230
2231            /* build the pex payload */
2232            tr_bencInitDict( &val, 3 ); /* ipv6 support: left as 3:
2233                                         * speed vs. likelihood? */
2234
2235            if( diffs.addedCount > 0)
2236            {
2237                /* "added" */
2238                tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
2239                for( i = 0; i < diffs.addedCount; ++i ) {
2240                    memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
2241                    memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
2242                }
2243                assert( ( walk - tmp ) == diffs.addedCount * 6 );
2244                tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
2245                tr_free( tmp );
2246
2247                /* "added.f" */
2248                tmp = walk = tr_new( uint8_t, diffs.addedCount );
2249                for( i = 0; i < diffs.addedCount; ++i )
2250                    *walk++ = diffs.added[i].flags;
2251                assert( ( walk - tmp ) == diffs.addedCount );
2252                tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2253                tr_free( tmp );
2254            }
2255
2256            if( diffs.droppedCount > 0 )
2257            {
2258                /* "dropped" */
2259                tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2260                for( i = 0; i < diffs.droppedCount; ++i ) {
2261                    memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
2262                    memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2263                }
2264                assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2265                tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2266                tr_free( tmp );
2267            }
2268
2269            if( diffs6.addedCount > 0 )
2270            {
2271                /* "added6" */
2272                tmp = walk = tr_new( uint8_t, diffs6.addedCount * 18 );
2273                for( i = 0; i < diffs6.addedCount; ++i ) {
2274                    memcpy( walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16 );
2275                    walk += 16;
2276                    memcpy( walk, &diffs6.added[i].port, 2 );
2277                    walk += 2;
2278                }
2279                assert( ( walk - tmp ) == diffs6.addedCount * 18 );
2280                tr_bencDictAddRaw( &val, "added6", tmp, walk - tmp );
2281                tr_free( tmp );
2282
2283                /* "added6.f" */
2284                tmp = walk = tr_new( uint8_t, diffs6.addedCount );
2285                for( i = 0; i < diffs6.addedCount; ++i )
2286                    *walk++ = diffs6.added[i].flags;
2287                assert( ( walk - tmp ) == diffs6.addedCount );
2288                tr_bencDictAddRaw( &val, "added6.f", tmp, walk - tmp );
2289                tr_free( tmp );
2290            }
2291
2292            if( diffs6.droppedCount > 0 )
2293            {
2294                /* "dropped6" */
2295                tmp = walk = tr_new( uint8_t, diffs6.droppedCount * 18 );
2296                for( i = 0; i < diffs6.droppedCount; ++i ) {
2297                    memcpy( walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16 );
2298                    walk += 16;
2299                    memcpy( walk, &diffs6.dropped[i].port, 2 );
2300                    walk += 2;
2301                }
2302                assert( ( walk - tmp ) == diffs6.droppedCount * 18);
2303                tr_bencDictAddRaw( &val, "dropped6", tmp, walk - tmp );
2304                tr_free( tmp );
2305            }
2306
2307            /* write the pex message */
2308            benc = tr_bencToStr( &val, TR_FMT_BENC, &bencLen );
2309            tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + bencLen );
2310            tr_peerIoWriteUint8 ( io, out, BT_LTEP );
2311            tr_peerIoWriteUint8 ( io, out, msgs->ut_pex_id );
2312            tr_peerIoWriteBytes ( io, out, benc, bencLen );
2313            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2314            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", EVBUFFER_LENGTH( out ) );
2315            dbgOutMessageLen( msgs );
2316
2317            tr_free( benc );
2318            tr_bencFree( &val );
2319        }
2320
2321        /* cleanup */
2322        tr_free( diffs.added );
2323        tr_free( diffs.dropped );
2324        tr_free( newPex );
2325        tr_free( diffs6.added );
2326        tr_free( diffs6.dropped );
2327        tr_free( newPex6 );
2328
2329        /*msgs->clientSentPexAt = tr_time( );*/
2330    }
2331}
2332
2333static void
2334pexPulse( int foo UNUSED, short bar UNUSED, void * vmsgs )
2335{
2336    struct tr_peermsgs * msgs = vmsgs;
2337
2338    sendPex( msgs );
2339
2340    tr_timerAdd( &msgs->pexTimer, PEX_INTERVAL_SECS, 0 );
2341}
2342
2343/**
2344***
2345**/
2346
2347tr_peermsgs*
2348tr_peerMsgsNew( struct tr_torrent * torrent,
2349                struct tr_peer    * peer,
2350                tr_delivery_func    func,
2351                void              * userData,
2352                tr_publisher_tag  * setme )
2353{
2354    tr_peermsgs * m;
2355
2356    assert( peer );
2357    assert( peer->io );
2358
2359    m = tr_new0( tr_peermsgs, 1 );
2360    m->publisher = TR_PUBLISHER_INIT;
2361    m->peer = peer;
2362    m->torrent = torrent;
2363    m->peer->clientIsChoked = 1;
2364    m->peer->peerIsChoked = 1;
2365    m->peer->clientIsInterested = 0;
2366    m->peer->peerIsInterested = 0;
2367    m->state = AWAITING_BT_LENGTH;
2368    m->outMessages = evbuffer_new( );
2369    m->outMessagesBatchedAt = 0;
2370    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2371    m->incoming.block = evbuffer_new( );
2372    evtimer_set( &m->pexTimer, pexPulse, m );
2373    tr_timerAdd( &m->pexTimer, PEX_INTERVAL_SECS, 0 );
2374    peer->msgs = m;
2375
2376    *setme = tr_publisherSubscribe( &m->publisher, func, userData );
2377
2378    if( tr_peerIoSupportsLTEP( peer->io ) )
2379        sendLtepHandshake( m );
2380
2381    if( tr_dhtEnabled( torrent->session ) && tr_peerIoSupportsDHT( peer->io ))
2382    {
2383        /* Only send PORT over IPv6 when the IPv6 DHT is running (BEP-32). */
2384        const struct tr_address *addr = tr_peerIoGetAddress( peer->io, NULL );
2385        if( addr->type == TR_AF_INET || tr_globalIPv6() ) {
2386            protocolSendPort( m, tr_dhtPort( torrent->session ) );
2387        }
2388    }
2389
2390    tellPeerWhatWeHave( m );
2391
2392    tr_peerIoSetIOFuncs( m->peer->io, canRead, didWrite, gotError, m );
2393    updateDesiredRequestCount( m, tr_date( ) );
2394
2395    return m;
2396}
2397
2398void
2399tr_peerMsgsFree( tr_peermsgs* msgs )
2400{
2401    if( msgs )
2402    {
2403        evtimer_del( &msgs->pexTimer );
2404        tr_publisherDestruct( &msgs->publisher );
2405
2406        evbuffer_free( msgs->incoming.block );
2407        evbuffer_free( msgs->outMessages );
2408        tr_free( msgs->pex6 );
2409        tr_free( msgs->pex );
2410
2411        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2412        tr_free( msgs );
2413    }
2414}
2415
2416void
2417tr_peerMsgsUnsubscribe( tr_peermsgs *    peer,
2418                        tr_publisher_tag tag )
2419{
2420    tr_publisherUnsubscribe( &peer->publisher, tag );
2421}
Note: See TracBrowser for help on using the repository browser.