source: trunk/libtransmission/peer-msgs.c @ 11509

Last change on this file since 11509 was 11509, checked in by charles, 11 years ago

(trunk libT) fix typo in error message reported by Rolcol

  • Property svn:keywords set to Date Rev Author Id
File size: 71.3 KB
Line 
1/*
2 * This file Copyright (C) 2007-2010 Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 11509 2010-12-10 03:28:27Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdarg.h>
17#include <stdio.h>
18#include <stdlib.h>
19#include <string.h>
20
21#include <event.h>
22
23#include "transmission.h"
24#include "bencode.h"
25#include "cache.h"
26#include "completion.h"
27#include "crypto.h"
28#ifdef WIN32
29#include "net.h" /* for ECONN */
30#endif
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-msgs.h"
34#include "session.h"
35#include "stats.h"
36#include "torrent.h"
37#include "torrent-magnet.h"
38#include "tr-dht.h"
39#include "utils.h"
40#include "version.h"
41
42/**
43***
44**/
45
46enum
47{
48    BT_CHOKE                = 0,
49    BT_UNCHOKE              = 1,
50    BT_INTERESTED           = 2,
51    BT_NOT_INTERESTED       = 3,
52    BT_HAVE                 = 4,
53    BT_BITFIELD             = 5,
54    BT_REQUEST              = 6,
55    BT_PIECE                = 7,
56    BT_CANCEL               = 8,
57    BT_PORT                 = 9,
58
59    BT_FEXT_SUGGEST         = 13,
60    BT_FEXT_HAVE_ALL        = 14,
61    BT_FEXT_HAVE_NONE       = 15,
62    BT_FEXT_REJECT          = 16,
63    BT_FEXT_ALLOWED_FAST    = 17,
64
65    BT_LTEP                 = 20,
66
67    LTEP_HANDSHAKE          = 0,
68
69    UT_PEX_ID               = 1,
70    UT_METADATA_ID          = 3,
71
72    MAX_PEX_PEER_COUNT      = 50,
73
74    MIN_CHOKE_PERIOD_SEC    = 10,
75
76    /* idle seconds before we send a keepalive */
77    KEEPALIVE_INTERVAL_SECS = 100,
78
79    PEX_INTERVAL_SECS       = 90, /* sec between sendPex() calls */
80
81    REQQ                    = 512,
82
83    METADATA_REQQ           = 64,
84
85    /* used in lowering the outMessages queue period */
86    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
87    HIGH_PRIORITY_INTERVAL_SECS = 2,
88    LOW_PRIORITY_INTERVAL_SECS = 10,
89
90    /* number of pieces to remove from the bitfield when
91     * lazy bitfields are turned on */
92    LAZY_PIECE_COUNT = 26,
93
94    /* number of pieces we'll allow in our fast set */
95    MAX_FAST_SET_SIZE = 3,
96
97    /* defined in BEP #9 */
98    METADATA_MSG_TYPE_REQUEST = 0,
99    METADATA_MSG_TYPE_DATA = 1,
100    METADATA_MSG_TYPE_REJECT = 2
101};
102
103enum
104{
105    AWAITING_BT_LENGTH,
106    AWAITING_BT_ID,
107    AWAITING_BT_MESSAGE,
108    AWAITING_BT_PIECE
109};
110
111/**
112***
113**/
114
115struct peer_request
116{
117    uint32_t    index;
118    uint32_t    offset;
119    uint32_t    length;
120};
121
122static uint32_t
123getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
124{
125    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
126    const uint64_t blockPos = tor->blockSize * b;
127    assert( blockPos >= piecePos );
128    return (uint32_t)( blockPos - piecePos );
129}
130
131static void
132blockToReq( const tr_torrent     * tor,
133            tr_block_index_t       block,
134            struct peer_request  * setme )
135{
136    assert( setme != NULL );
137
138    setme->index = tr_torBlockPiece( tor, block );
139    setme->offset = getBlockOffsetInPiece( tor, block );
140    setme->length = tr_torBlockCountBytes( tor, block );
141}
142
143/**
144***
145**/
146
147/* this is raw, unchanged data from the peer regarding
148 * the current message that it's sending us. */
149struct tr_incoming
150{
151    uint8_t                id;
152    uint32_t               length; /* includes the +1 for id length */
153    struct peer_request    blockReq; /* metadata for incoming blocks */
154    struct evbuffer *      block; /* piece data for incoming blocks */
155};
156
157/**
158 * Low-level communication state information about a connected peer.
159 *
160 * This structure remembers the low-level protocol states that we're
161 * in with this peer, such as active requests, pex messages, and so on.
162 * Its fields are all private to peer-msgs.c.
163 *
164 * Data not directly involved with sending & receiving messages is
165 * stored in tr_peer, where it can be accessed by both peermsgs and
166 * the peer manager.
167 *
168 * @see struct peer_atom
169 * @see tr_peer
170 */
171struct tr_peermsgs
172{
173    tr_bool         peerSupportsPex;
174    tr_bool         peerSupportsMetadataXfer;
175    tr_bool         clientSentLtepHandshake;
176    tr_bool         peerSentLtepHandshake;
177
178    /*tr_bool         haveFastSet;*/
179
180    int             desiredRequestCount;
181
182    int             prefetchCount;
183
184    /* how long the outMessages batch should be allowed to grow before
185     * it's flushed -- some messages (like requests >:) should be sent
186     * very quickly; others aren't as urgent. */
187    int8_t          outMessagesBatchPeriod;
188
189    uint8_t         state;
190    uint8_t         ut_pex_id;
191    uint8_t         ut_metadata_id;
192    uint16_t        pexCount;
193    uint16_t        pexCount6;
194
195#if 0
196    size_t                 fastsetSize;
197    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
198#endif
199
200    tr_peer *              peer;
201
202    tr_torrent *           torrent;
203
204    tr_peer_callback      * callback;
205    void                  * callbackData;
206
207    struct evbuffer *      outMessages; /* all the non-piece messages */
208
209    struct peer_request    peerAskedFor[REQQ];
210
211    int                    peerAskedForMetadata[METADATA_REQQ];
212    int                    peerAskedForMetadataCount;
213
214    tr_pex               * pex;
215    tr_pex               * pex6;
216
217    /*time_t                 clientSentPexAt;*/
218    time_t                 clientSentAnythingAt;
219
220    /* when we started batching the outMessages */
221    time_t                outMessagesBatchedAt;
222
223    struct tr_incoming    incoming;
224
225    /* if the peer supports the Extension Protocol in BEP 10 and
226       supplied a reqq argument, it's stored here.  otherwise the
227       value is zero and should be ignored. */
228    int64_t               reqq;
229
230    struct event          pexTimer;
231};
232
233/**
234***
235**/
236
237#if 0
238static tr_bitfield*
239getHave( const struct tr_peermsgs * msgs )
240{
241    if( msgs->peer->have == NULL )
242        msgs->peer->have = tr_bitfieldNew( msgs->torrent->info.pieceCount );
243    return msgs->peer->have;
244}
245#endif
246
247static inline tr_session*
248getSession( struct tr_peermsgs * msgs )
249{
250    return msgs->torrent->session;
251}
252
253/**
254***
255**/
256
257static void
258myDebug( const char * file, int line,
259         const struct tr_peermsgs * msgs,
260         const char * fmt, ... )
261{
262    FILE * fp = tr_getLog( );
263
264    if( fp )
265    {
266        va_list           args;
267        char              timestr[64];
268        struct evbuffer * buf = evbuffer_new( );
269        char *            base = tr_basename( file );
270
271        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
272                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
273                             tr_torrentName( msgs->torrent ),
274                             tr_peerIoGetAddrStr( msgs->peer->io ),
275                             msgs->peer->client );
276        va_start( args, fmt );
277        evbuffer_add_vprintf( buf, fmt, args );
278        va_end( args );
279        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
280        /* FIXME(libevent2) tr_getLog() should return an fd, then use evbuffer_write() here */
281        fwrite( EVBUFFER_DATA( buf ), 1, EVBUFFER_LENGTH( buf ), fp );
282
283        tr_free( base );
284        evbuffer_free( buf );
285    }
286}
287
288#define dbgmsg( msgs, ... ) \
289    do { \
290        if( tr_deepLoggingIsActive( ) ) \
291            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
292    } while( 0 )
293
294/**
295***
296**/
297
298static void
299pokeBatchPeriod( tr_peermsgs * msgs,
300                 int           interval )
301{
302    if( msgs->outMessagesBatchPeriod > interval )
303    {
304        msgs->outMessagesBatchPeriod = interval;
305        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
306    }
307}
308
309static void
310dbgOutMessageLen( tr_peermsgs * msgs )
311{
312    dbgmsg( msgs, "outMessage size is now %zu", EVBUFFER_LENGTH( msgs->outMessages ) );
313}
314
315static void
316protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
317{
318    tr_peerIo       * io  = msgs->peer->io;
319    struct evbuffer * out = msgs->outMessages;
320
321    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
322
323    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
324    tr_peerIoWriteUint8 ( io, out, BT_FEXT_REJECT );
325    tr_peerIoWriteUint32( io, out, req->index );
326    tr_peerIoWriteUint32( io, out, req->offset );
327    tr_peerIoWriteUint32( io, out, req->length );
328
329    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
330    dbgOutMessageLen( msgs );
331}
332
333static void
334protocolSendRequest( tr_peermsgs               * msgs,
335                     const struct peer_request * req )
336{
337    tr_peerIo       * io  = msgs->peer->io;
338    struct evbuffer * out = msgs->outMessages;
339
340    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
341    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
342    tr_peerIoWriteUint32( io, out, req->index );
343    tr_peerIoWriteUint32( io, out, req->offset );
344    tr_peerIoWriteUint32( io, out, req->length );
345
346    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
347    dbgOutMessageLen( msgs );
348    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
349}
350
351static void
352protocolSendCancel( tr_peermsgs               * msgs,
353                    const struct peer_request * req )
354{
355    tr_peerIo       * io  = msgs->peer->io;
356    struct evbuffer * out = msgs->outMessages;
357
358    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
359    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
360    tr_peerIoWriteUint32( io, out, req->index );
361    tr_peerIoWriteUint32( io, out, req->offset );
362    tr_peerIoWriteUint32( io, out, req->length );
363
364    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
365    dbgOutMessageLen( msgs );
366    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
367}
368
369static void
370protocolSendPort(tr_peermsgs *msgs, uint16_t port)
371{
372    tr_peerIo       * io  = msgs->peer->io;
373    struct evbuffer * out = msgs->outMessages;
374
375    dbgmsg( msgs, "sending Port %u", port);
376    tr_peerIoWriteUint32( io, out, 3 );
377    tr_peerIoWriteUint8 ( io, out, BT_PORT );
378    tr_peerIoWriteUint16( io, out, port);
379}
380
381static void
382protocolSendHave( tr_peermsgs * msgs,
383                  uint32_t      index )
384{
385    tr_peerIo       * io  = msgs->peer->io;
386    struct evbuffer * out = msgs->outMessages;
387
388    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
389    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
390    tr_peerIoWriteUint32( io, out, index );
391
392    dbgmsg( msgs, "sending Have %u", index );
393    dbgOutMessageLen( msgs );
394    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
395}
396
397#if 0
398static void
399protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
400{
401    tr_peerIo       * io  = msgs->peer->io;
402    struct evbuffer * out = msgs->outMessages;
403
404    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
405
406    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
407    tr_peerIoWriteUint8 ( io, out, BT_FEXT_ALLOWED_FAST );
408    tr_peerIoWriteUint32( io, out, pieceIndex );
409
410    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
411    dbgOutMessageLen( msgs );
412}
413#endif
414
415static void
416protocolSendChoke( tr_peermsgs * msgs,
417                   int           choke )
418{
419    tr_peerIo       * io  = msgs->peer->io;
420    struct evbuffer * out = msgs->outMessages;
421
422    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
423    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
424
425    dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
426    dbgOutMessageLen( msgs );
427    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
428}
429
430static void
431protocolSendHaveAll( tr_peermsgs * msgs )
432{
433    tr_peerIo       * io  = msgs->peer->io;
434    struct evbuffer * out = msgs->outMessages;
435
436    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
437
438    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
439    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_ALL );
440
441    dbgmsg( msgs, "sending HAVE_ALL..." );
442    dbgOutMessageLen( msgs );
443    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
444}
445
446static void
447protocolSendHaveNone( tr_peermsgs * msgs )
448{
449    tr_peerIo       * io  = msgs->peer->io;
450    struct evbuffer * out = msgs->outMessages;
451
452    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
453
454    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
455    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_NONE );
456
457    dbgmsg( msgs, "sending HAVE_NONE..." );
458    dbgOutMessageLen( msgs );
459    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
460}
461
462/**
463***  EVENTS
464**/
465
466static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0, 0 };
467
468static void
469publish( tr_peermsgs * msgs, tr_peer_event * e )
470{
471    assert( msgs->peer );
472    assert( msgs->peer->msgs == msgs );
473
474    if( msgs->callback != NULL )
475        msgs->callback( msgs->peer, e, msgs->callbackData );
476}
477
478static void
479fireError( tr_peermsgs * msgs, int err )
480{
481    tr_peer_event e = blankEvent;
482    e.eventType = TR_PEER_ERROR;
483    e.err = err;
484    publish( msgs, &e );
485}
486
487static void
488firePeerProgress( tr_peermsgs * msgs )
489{
490    tr_peer_event e = blankEvent;
491    e.eventType = TR_PEER_PEER_PROGRESS;
492    e.progress = msgs->peer->progress;
493    publish( msgs, &e );
494}
495
496static void
497fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
498{
499    tr_peer_event e = blankEvent;
500    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
501    e.pieceIndex = req->index;
502    e.offset = req->offset;
503    e.length = req->length;
504    publish( msgs, &e );
505}
506
507static void
508fireGotRej( tr_peermsgs * msgs, const struct peer_request * req )
509{
510    tr_peer_event e = blankEvent;
511    e.eventType = TR_PEER_CLIENT_GOT_REJ;
512    e.pieceIndex = req->index;
513    e.offset = req->offset;
514    e.length = req->length;
515    publish( msgs, &e );
516}
517
518static void
519fireGotChoke( tr_peermsgs * msgs )
520{
521    tr_peer_event e = blankEvent;
522    e.eventType = TR_PEER_CLIENT_GOT_CHOKE;
523    publish( msgs, &e );
524}
525
526static void
527fireClientGotData( tr_peermsgs * msgs,
528                   uint32_t      length,
529                   int           wasPieceData )
530{
531    tr_peer_event e = blankEvent;
532
533    e.length = length;
534    e.eventType = TR_PEER_CLIENT_GOT_DATA;
535    e.wasPieceData = wasPieceData;
536    publish( msgs, &e );
537}
538
539static void
540fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
541{
542    tr_peer_event e = blankEvent;
543    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
544    e.pieceIndex = pieceIndex;
545    publish( msgs, &e );
546}
547
548static void
549fireClientGotPort( tr_peermsgs * msgs, tr_port port )
550{
551    tr_peer_event e = blankEvent;
552    e.eventType = TR_PEER_CLIENT_GOT_PORT;
553    e.port = port;
554    publish( msgs, &e );
555}
556
557static void
558fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
559{
560    tr_peer_event e = blankEvent;
561    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
562    e.pieceIndex = pieceIndex;
563    publish( msgs, &e );
564}
565
566static void
567firePeerGotData( tr_peermsgs  * msgs,
568                 uint32_t       length,
569                 int            wasPieceData )
570{
571    tr_peer_event e = blankEvent;
572
573    e.length = length;
574    e.eventType = TR_PEER_PEER_GOT_DATA;
575    e.wasPieceData = wasPieceData;
576
577    publish( msgs, &e );
578}
579
580/**
581***  ALLOWED FAST SET
582***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
583**/
584
585size_t
586tr_generateAllowedSet( tr_piece_index_t * setmePieces,
587                       size_t             desiredSetSize,
588                       size_t             pieceCount,
589                       const uint8_t    * infohash,
590                       const tr_address * addr )
591{
592    size_t setSize = 0;
593
594    assert( setmePieces );
595    assert( desiredSetSize <= pieceCount );
596    assert( desiredSetSize );
597    assert( pieceCount );
598    assert( infohash );
599    assert( addr );
600
601    if( addr->type == TR_AF_INET )
602    {
603        uint8_t w[SHA_DIGEST_LENGTH + 4], *walk=w;
604        uint8_t x[SHA_DIGEST_LENGTH];
605
606        uint32_t ui32 = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 );   /* (1) */
607        memcpy( w, &ui32, sizeof( uint32_t ) );
608        walk += sizeof( uint32_t );
609        memcpy( walk, infohash, SHA_DIGEST_LENGTH );                 /* (2) */
610        walk += SHA_DIGEST_LENGTH;
611        tr_sha1( x, w, walk-w, NULL );                               /* (3) */
612        assert( sizeof( w ) == walk-w );
613
614        while( setSize<desiredSetSize )
615        {
616            int i;
617            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
618            {
619                size_t k;
620                uint32_t j = i * 4;                                  /* (5) */
621                uint32_t y = ntohl( *( uint32_t* )( x + j ) );       /* (6) */
622                uint32_t index = y % pieceCount;                     /* (7) */
623
624                for( k=0; k<setSize; ++k )                           /* (8) */
625                    if( setmePieces[k] == index )
626                        break;
627
628                if( k == setSize )
629                    setmePieces[setSize++] = index;                  /* (9) */
630            }
631
632            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
633        }
634    }
635
636    return setSize;
637}
638
639static void
640updateFastSet( tr_peermsgs * msgs UNUSED )
641{
642#if 0
643    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
644    const int peerIsNeedy = msgs->peer->progress < 0.10;
645
646    if( fext && peerIsNeedy && !msgs->haveFastSet )
647    {
648        size_t i;
649        const struct tr_address * addr = tr_peerIoGetAddress( msgs->peer->io, NULL );
650        const tr_info * inf = &msgs->torrent->info;
651        const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
652
653        /* build the fast set */
654        msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
655        msgs->haveFastSet = 1;
656
657        /* send it to the peer */
658        for( i=0; i<msgs->fastsetSize; ++i )
659            protocolSendAllowedFast( msgs, msgs->fastset[i] );
660    }
661#endif
662}
663
664/**
665***  INTEREST
666**/
667
668static void
669sendInterest( tr_peermsgs * msgs, tr_bool clientIsInterested )
670{
671    struct evbuffer * out = msgs->outMessages;
672
673    assert( msgs );
674    assert( tr_isBool( clientIsInterested ) );
675
676    msgs->peer->clientIsInterested = clientIsInterested;
677    dbgmsg( msgs, "Sending %s", clientIsInterested ? "Interested" : "Not Interested" );
678    tr_peerIoWriteUint32( msgs->peer->io, out, sizeof( uint8_t ) );
679    tr_peerIoWriteUint8 ( msgs->peer->io, out, clientIsInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
680
681    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
682    dbgOutMessageLen( msgs );
683}
684
685static void
686updateInterest( tr_peermsgs * msgs UNUSED )
687{
688    /* FIXME -- might need to poke the mgr on startup */
689}
690
691void
692tr_peerMsgsSetInterested( tr_peermsgs * msgs, int isInterested )
693{
694    assert( tr_isBool( isInterested ) );
695
696    if( isInterested != msgs->peer->clientIsInterested )
697        sendInterest( msgs, isInterested );
698}
699
700static tr_bool
701popNextMetadataRequest( tr_peermsgs * msgs, int * piece )
702{
703    if( msgs->peerAskedForMetadataCount == 0 )
704        return FALSE;
705
706    *piece = msgs->peerAskedForMetadata[0];
707
708    tr_removeElementFromArray( msgs->peerAskedForMetadata, 0, sizeof( int ),
709                               msgs->peerAskedForMetadataCount-- );
710
711    return TRUE;
712}
713
714static tr_bool
715popNextRequest( tr_peermsgs * msgs, struct peer_request * setme )
716{
717    if( msgs->peer->pendingReqsToClient == 0 )
718        return FALSE;
719
720    *setme = msgs->peerAskedFor[0];
721
722    tr_removeElementFromArray( msgs->peerAskedFor, 0, sizeof( struct peer_request ),
723                               msgs->peer->pendingReqsToClient-- );
724
725    return TRUE;
726}
727
728static void
729cancelAllRequestsToClient( tr_peermsgs * msgs )
730{
731    struct peer_request req;
732    const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->peer->io );
733
734    while( popNextRequest( msgs, &req ))
735        if( mustSendCancel )
736            protocolSendReject( msgs, &req );
737}
738
739void
740tr_peerMsgsSetChoke( tr_peermsgs * msgs,
741                     int           choke )
742{
743    const time_t now = tr_time( );
744    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
745
746    assert( msgs );
747    assert( msgs->peer );
748    assert( choke == 0 || choke == 1 );
749
750    if( msgs->peer->chokeChangedAt > fibrillationTime )
751    {
752        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
753    }
754    else if( msgs->peer->peerIsChoked != choke )
755    {
756        msgs->peer->peerIsChoked = choke;
757        if( choke )
758            cancelAllRequestsToClient( msgs );
759        protocolSendChoke( msgs, choke );
760        msgs->peer->chokeChangedAt = now;
761    }
762}
763
764/**
765***
766**/
767
768void
769tr_peerMsgsHave( tr_peermsgs * msgs,
770                 uint32_t      index )
771{
772    protocolSendHave( msgs, index );
773
774    /* since we have more pieces now, we might not be interested in this peer */
775    updateInterest( msgs );
776}
777
778/**
779***
780**/
781
782static tr_bool
783reqIsValid( const tr_peermsgs * peer,
784            uint32_t            index,
785            uint32_t            offset,
786            uint32_t            length )
787{
788    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
789}
790
791static tr_bool
792requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
793{
794    return reqIsValid( msgs, req->index, req->offset, req->length );
795}
796
797void
798tr_peerMsgsCancel( tr_peermsgs * msgs, tr_block_index_t block )
799{
800    struct peer_request req;
801/*fprintf( stderr, "SENDING CANCEL MESSAGE FOR BLOCK %zu\n\t\tFROM PEER %p ------------------------------------\n", (size_t)block, msgs->peer );*/
802    blockToReq( msgs->torrent, block, &req );
803    protocolSendCancel( msgs, &req );
804}
805
806/**
807***
808**/
809
810static void
811sendLtepHandshake( tr_peermsgs * msgs )
812{
813    tr_benc val, *m;
814    char * buf;
815    int len;
816    tr_bool allow_pex;
817    tr_bool allow_metadata_xfer;
818    struct evbuffer * out = msgs->outMessages;
819    const unsigned char * ipv6 = tr_globalIPv6();
820
821    if( msgs->clientSentLtepHandshake )
822        return;
823
824    dbgmsg( msgs, "sending an ltep handshake" );
825    msgs->clientSentLtepHandshake = 1;
826
827    /* decide if we want to advertise metadata xfer support (BEP 9) */
828    if( tr_torrentIsPrivate( msgs->torrent ) )
829        allow_metadata_xfer = 0;
830    else
831        allow_metadata_xfer = 1;
832
833    /* decide if we want to advertise pex support */
834    if( !tr_torrentAllowsPex( msgs->torrent ) )
835        allow_pex = 0;
836    else if( msgs->peerSentLtepHandshake )
837        allow_pex = msgs->peerSupportsPex ? 1 : 0;
838    else
839        allow_pex = 1;
840
841    tr_bencInitDict( &val, 8 );
842    tr_bencDictAddInt( &val, "e", getSession(msgs)->encryptionMode != TR_CLEAR_PREFERRED );
843    if( ipv6 != NULL )
844        tr_bencDictAddRaw( &val, "ipv6", ipv6, 16 );
845    if( allow_metadata_xfer && tr_torrentHasMetadata( msgs->torrent )
846                            && ( msgs->torrent->infoDictLength > 0 ) )
847        tr_bencDictAddInt( &val, "metadata_size", msgs->torrent->infoDictLength );
848    tr_bencDictAddInt( &val, "p", tr_sessionGetPublicPeerPort( getSession(msgs) ) );
849    tr_bencDictAddInt( &val, "reqq", REQQ );
850    tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
851    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
852    m  = tr_bencDictAddDict( &val, "m", 2 );
853    if( allow_metadata_xfer )
854        tr_bencDictAddInt( m, "ut_metadata", UT_METADATA_ID );
855    if( allow_pex )
856        tr_bencDictAddInt( m, "ut_pex", UT_PEX_ID );
857
858    buf = tr_bencToStr( &val, TR_FMT_BENC, &len );
859
860    tr_peerIoWriteUint32( msgs->peer->io, out, 2 * sizeof( uint8_t ) + len );
861    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_LTEP );
862    tr_peerIoWriteUint8 ( msgs->peer->io, out, LTEP_HANDSHAKE );
863    tr_peerIoWriteBytes ( msgs->peer->io, out, buf, len );
864    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
865    dbgOutMessageLen( msgs );
866
867    /* cleanup */
868    tr_bencFree( &val );
869    tr_free( buf );
870}
871
872static void
873parseLtepHandshake( tr_peermsgs *     msgs,
874                    int               len,
875                    struct evbuffer * inbuf )
876{
877    int64_t   i;
878    tr_benc   val, * sub;
879    uint8_t * tmp = tr_new( uint8_t, len );
880    const uint8_t *addr;
881    size_t addr_len;
882    tr_pex pex;
883    int8_t seedProbability = -1;
884
885    memset( &pex, 0, sizeof( tr_pex ) );
886
887    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, len );
888    msgs->peerSentLtepHandshake = 1;
889
890    if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) )
891    {
892        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
893        tr_free( tmp );
894        return;
895    }
896
897    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
898
899    /* does the peer prefer encrypted connections? */
900    if( tr_bencDictFindInt( &val, "e", &i ) ) {
901        msgs->peer->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
902                                              : ENCRYPTION_PREFERENCE_NO;
903        if( i )
904            pex.flags |= ADDED_F_ENCRYPTION_FLAG;
905    }
906
907    /* check supported messages for utorrent pex */
908    msgs->peerSupportsPex = 0;
909    msgs->peerSupportsMetadataXfer = 0;
910
911    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
912        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
913            msgs->peerSupportsPex = i != 0;
914            msgs->ut_pex_id = (uint8_t) i;
915            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
916        }
917        if( tr_bencDictFindInt( sub, "ut_metadata", &i ) ) {
918            msgs->peerSupportsMetadataXfer = i != 0;
919            msgs->ut_metadata_id = (uint8_t) i;
920            dbgmsg( msgs, "msgs->ut_metadata_id is %d", (int)msgs->ut_metadata_id );
921        }
922    }
923
924    /* look for metainfo size (BEP 9) */
925    if( tr_bencDictFindInt( &val, "metadata_size", &i ) )
926        tr_torrentSetMetadataSizeHint( msgs->torrent, i );
927
928    /* look for upload_only (BEP 21) */
929    if( tr_bencDictFindInt( &val, "upload_only", &i ) )
930        seedProbability = i==0 ? 0 : 100;
931
932    /* get peer's listening port */
933    if( tr_bencDictFindInt( &val, "p", &i ) ) {
934        pex.port = htons( (uint16_t)i );
935        fireClientGotPort( msgs, pex.port );
936        dbgmsg( msgs, "peer's port is now %d", (int)i );
937    }
938
939    if( tr_peerIoIsIncoming( msgs->peer->io )
940        && tr_bencDictFindRaw( &val, "ipv4", &addr, &addr_len )
941        && ( addr_len == 4 ) )
942    {
943        pex.addr.type = TR_AF_INET;
944        memcpy( &pex.addr.addr.addr4, addr, 4 );
945        tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability );
946    }
947
948    if( tr_peerIoIsIncoming( msgs->peer->io )
949        && tr_bencDictFindRaw( &val, "ipv6", &addr, &addr_len )
950        && ( addr_len == 16 ) )
951    {
952        pex.addr.type = TR_AF_INET6;
953        memcpy( &pex.addr.addr.addr6, addr, 16 );
954        tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability );
955    }
956
957    /* get peer's maximum request queue size */
958    if( tr_bencDictFindInt( &val, "reqq", &i ) )
959        msgs->reqq = i;
960
961    tr_bencFree( &val );
962    tr_free( tmp );
963}
964
965static void
966parseUtMetadata( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
967{
968    tr_benc dict;
969    char * msg_end;
970    char * benc_end;
971    int64_t msg_type = -1;
972    int64_t piece = -1;
973    int64_t total_size = 0;
974    uint8_t * tmp = tr_new( uint8_t, msglen );
975
976    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
977    msg_end = (char*)tmp + msglen;
978
979    if( !tr_bencLoad( tmp, msglen, &dict, &benc_end ) )
980    {
981        tr_bencDictFindInt( &dict, "msg_type", &msg_type );
982        tr_bencDictFindInt( &dict, "piece", &piece );
983        tr_bencDictFindInt( &dict, "total_size", &total_size );
984        tr_bencFree( &dict );
985    }
986
987    dbgmsg( msgs, "got ut_metadata msg: type %d, piece %d, total_size %d",
988            (int)msg_type, (int)piece, (int)total_size );
989
990    if( msg_type == METADATA_MSG_TYPE_REJECT )
991    {
992        /* NOOP */
993    }
994
995    if( ( msg_type == METADATA_MSG_TYPE_DATA )
996        && ( !tr_torrentHasMetadata( msgs->torrent ) )
997        && ( msg_end - benc_end <= METADATA_PIECE_SIZE )
998        && ( piece * METADATA_PIECE_SIZE + (msg_end - benc_end) <= total_size ) )
999    {
1000        const int pieceLen = msg_end - benc_end;
1001        tr_torrentSetMetadataPiece( msgs->torrent, piece, benc_end, pieceLen );
1002    }
1003
1004    if( msg_type == METADATA_MSG_TYPE_REQUEST )
1005    {
1006        if( ( piece >= 0 )
1007            && tr_torrentHasMetadata( msgs->torrent )
1008            && !tr_torrentIsPrivate( msgs->torrent )
1009            && ( msgs->peerAskedForMetadataCount < METADATA_REQQ ) )
1010        {
1011            msgs->peerAskedForMetadata[msgs->peerAskedForMetadataCount++] = piece;
1012        }
1013        else
1014        {
1015            tr_benc tmp;
1016            int payloadLen;
1017            char * payload;
1018            tr_peerIo  * io  = msgs->peer->io;
1019            struct evbuffer * out = msgs->outMessages;
1020
1021            /* build the rejection message */
1022            tr_bencInitDict( &tmp, 2 );
1023            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
1024            tr_bencDictAddInt( &tmp, "piece", piece );
1025            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1026            tr_bencFree( &tmp );
1027
1028            /* write it out as a LTEP message to our outMessages buffer */
1029            tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + payloadLen );
1030            tr_peerIoWriteUint8 ( io, out, BT_LTEP );
1031            tr_peerIoWriteUint8 ( io, out, msgs->ut_metadata_id );
1032            tr_peerIoWriteBytes ( io, out, payload, payloadLen );
1033            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1034            dbgOutMessageLen( msgs );
1035
1036            tr_free( payload );
1037        }
1038    }
1039
1040    tr_free( tmp );
1041}
1042
1043static void
1044parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1045{
1046    int loaded = 0;
1047    uint8_t * tmp = tr_new( uint8_t, msglen );
1048    tr_benc val;
1049    tr_torrent * tor = msgs->torrent;
1050    const uint8_t * added;
1051    size_t added_len;
1052
1053    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1054
1055    if( tr_torrentAllowsPex( tor )
1056      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) ) )
1057    {
1058        if( tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1059        {
1060            tr_pex * pex;
1061            size_t i, n;
1062            size_t added_f_len = 0;
1063            const uint8_t * added_f = NULL;
1064
1065            tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1066            pex = tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len, &n );
1067
1068            n = MIN( n, MAX_PEX_PEER_COUNT );
1069            for( i=0; i<n; ++i )
1070            {
1071                int seedProbability = -1;
1072                if( i < added_f_len ) seedProbability = ( added_f[i] & ADDED_F_SEED_FLAG ) ? 100 : 0;
1073                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex+i, seedProbability );
1074            }
1075
1076            tr_free( pex );
1077        }
1078
1079        if( tr_bencDictFindRaw( &val, "added6", &added, &added_len ) )
1080        {
1081            tr_pex * pex;
1082            size_t i, n;
1083            size_t added_f_len = 0;
1084            const uint8_t * added_f = NULL;
1085
1086            tr_bencDictFindRaw( &val, "added6.f", &added_f, &added_f_len );
1087            pex = tr_peerMgrCompact6ToPex( added, added_len, added_f, added_f_len, &n );
1088
1089            n = MIN( n, MAX_PEX_PEER_COUNT );
1090            for( i=0; i<n; ++i )
1091            {
1092                int seedProbability = -1;
1093                if( i < added_f_len ) seedProbability = ( added_f[i] & ADDED_F_SEED_FLAG ) ? 100 : 0;
1094                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex+i, seedProbability );
1095            }
1096
1097            tr_free( pex );
1098        }
1099    }
1100
1101    if( loaded )
1102        tr_bencFree( &val );
1103    tr_free( tmp );
1104}
1105
1106static void sendPex( tr_peermsgs * msgs );
1107
1108static void
1109parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer  * inbuf )
1110{
1111    uint8_t ltep_msgid;
1112
1113    tr_peerIoReadUint8( msgs->peer->io, inbuf, &ltep_msgid );
1114    msglen--;
1115
1116    if( ltep_msgid == LTEP_HANDSHAKE )
1117    {
1118        dbgmsg( msgs, "got ltep handshake" );
1119        parseLtepHandshake( msgs, msglen, inbuf );
1120        if( tr_peerIoSupportsLTEP( msgs->peer->io ) )
1121        {
1122            sendLtepHandshake( msgs );
1123            sendPex( msgs );
1124        }
1125    }
1126    else if( ltep_msgid == UT_PEX_ID )
1127    {
1128        dbgmsg( msgs, "got ut pex" );
1129        msgs->peerSupportsPex = 1;
1130        parseUtPex( msgs, msglen, inbuf );
1131    }
1132    else if( ltep_msgid == UT_METADATA_ID )
1133    {
1134        dbgmsg( msgs, "got ut metadata" );
1135        msgs->peerSupportsMetadataXfer = 1;
1136        parseUtMetadata( msgs, msglen, inbuf );
1137    }
1138    else
1139    {
1140        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1141        evbuffer_drain( inbuf, msglen );
1142    }
1143}
1144
1145static int
1146readBtLength( tr_peermsgs *     msgs,
1147              struct evbuffer * inbuf,
1148              size_t            inlen )
1149{
1150    uint32_t len;
1151
1152    if( inlen < sizeof( len ) )
1153        return READ_LATER;
1154
1155    tr_peerIoReadUint32( msgs->peer->io, inbuf, &len );
1156
1157    if( len == 0 ) /* peer sent us a keepalive message */
1158        dbgmsg( msgs, "got KeepAlive" );
1159    else
1160    {
1161        msgs->incoming.length = len;
1162        msgs->state = AWAITING_BT_ID;
1163    }
1164
1165    return READ_NOW;
1166}
1167
1168static int readBtMessage( tr_peermsgs     * msgs,
1169                          struct evbuffer * inbuf,
1170                          size_t            inlen );
1171
1172static int
1173readBtId( tr_peermsgs * msgs, struct evbuffer  * inbuf, size_t inlen )
1174{
1175    uint8_t id;
1176
1177    if( inlen < sizeof( uint8_t ) )
1178        return READ_LATER;
1179
1180    tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
1181    msgs->incoming.id = id;
1182    dbgmsg( msgs, "msgs->incoming.id is now %d; msgs->incoming.length is %zu", id, (size_t)msgs->incoming.length );
1183
1184    if( id == BT_PIECE )
1185    {
1186        msgs->state = AWAITING_BT_PIECE;
1187        return READ_NOW;
1188    }
1189    else if( msgs->incoming.length != 1 )
1190    {
1191        msgs->state = AWAITING_BT_MESSAGE;
1192        return READ_NOW;
1193    }
1194    else return readBtMessage( msgs, inbuf, inlen - 1 );
1195}
1196
1197static void
1198updatePeerProgress( tr_peermsgs * msgs )
1199{
1200    msgs->peer->progress = tr_bitsetPercent( &msgs->peer->have );
1201    dbgmsg( msgs, "peer progress is %f", msgs->peer->progress );
1202    updateFastSet( msgs );
1203    updateInterest( msgs );
1204    firePeerProgress( msgs );
1205}
1206
1207static void
1208prefetchPieces( tr_peermsgs *msgs )
1209{
1210    int i;
1211
1212    /* Maintain 12 prefetched blocks per unchoked peer */
1213    for( i=msgs->prefetchCount; i<msgs->peer->pendingReqsToClient && i<12; ++i )
1214    {
1215        const struct peer_request * req = msgs->peerAskedFor + i;
1216        if( requestIsValid( msgs, req ) )
1217        {
1218            tr_cachePrefetchBlock( getSession(msgs)->cache, msgs->torrent, req->index, req->offset, req->length );
1219            ++msgs->prefetchCount;
1220        }
1221    }
1222}
1223
1224static void
1225peerMadeRequest( tr_peermsgs *               msgs,
1226                 const struct peer_request * req )
1227{
1228    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1229    const int reqIsValid = requestIsValid( msgs, req );
1230    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( &msgs->torrent->completion, req->index );
1231    const int peerIsChoked = msgs->peer->peerIsChoked;
1232
1233    int allow = FALSE;
1234
1235    if( !reqIsValid )
1236        dbgmsg( msgs, "rejecting an invalid request." );
1237    else if( !clientHasPiece )
1238        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1239    else if( peerIsChoked )
1240        dbgmsg( msgs, "rejecting request from choked peer" );
1241    else if( msgs->peer->pendingReqsToClient + 1 >= REQQ )
1242        dbgmsg( msgs, "rejecting request ... reqq is full" );
1243    else
1244        allow = TRUE;
1245
1246    if( allow ) {
1247        msgs->peerAskedFor[msgs->peer->pendingReqsToClient++] = *req;
1248        prefetchPieces( msgs );
1249    } else if( fext ) {
1250        protocolSendReject( msgs, req );
1251    }
1252}
1253
1254static tr_bool
1255messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1256{
1257    switch( id )
1258    {
1259        case BT_CHOKE:
1260        case BT_UNCHOKE:
1261        case BT_INTERESTED:
1262        case BT_NOT_INTERESTED:
1263        case BT_FEXT_HAVE_ALL:
1264        case BT_FEXT_HAVE_NONE:
1265            return len == 1;
1266
1267        case BT_HAVE:
1268        case BT_FEXT_SUGGEST:
1269        case BT_FEXT_ALLOWED_FAST:
1270            return len == 5;
1271
1272        case BT_BITFIELD:
1273            return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1274
1275        case BT_REQUEST:
1276        case BT_CANCEL:
1277        case BT_FEXT_REJECT:
1278            return len == 13;
1279
1280        case BT_PIECE:
1281            return len > 9 && len <= 16393;
1282
1283        case BT_PORT:
1284            return len == 3;
1285
1286        case BT_LTEP:
1287            return len >= 2;
1288
1289        default:
1290            return FALSE;
1291    }
1292}
1293
1294static int clientGotBlock( tr_peermsgs *               msgs,
1295                           const uint8_t *             block,
1296                           const struct peer_request * req );
1297
1298static int
1299readBtPiece( tr_peermsgs      * msgs,
1300             struct evbuffer  * inbuf,
1301             size_t             inlen,
1302             size_t           * setme_piece_bytes_read )
1303{
1304    struct peer_request * req = &msgs->incoming.blockReq;
1305
1306    assert( EVBUFFER_LENGTH( inbuf ) >= inlen );
1307    dbgmsg( msgs, "In readBtPiece" );
1308
1309    if( !req->length )
1310    {
1311        if( inlen < 8 )
1312            return READ_LATER;
1313
1314        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index );
1315        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset );
1316        req->length = msgs->incoming.length - 9;
1317        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1318        return READ_NOW;
1319    }
1320    else
1321    {
1322        int err;
1323
1324        /* read in another chunk of data */
1325        const size_t nLeft = req->length - EVBUFFER_LENGTH( msgs->incoming.block );
1326        size_t n = MIN( nLeft, inlen );
1327        size_t i = n;
1328        void * buf = tr_sessionGetBuffer( getSession( msgs ) );
1329        const size_t buflen = SESSION_BUFFER_SIZE;
1330
1331        while( i > 0 )
1332        {
1333            const size_t thisPass = MIN( i, buflen );
1334            tr_peerIoReadBytes( msgs->peer->io, inbuf, buf, thisPass );
1335            evbuffer_add( msgs->incoming.block, buf, thisPass );
1336            i -= thisPass;
1337        }
1338
1339        tr_sessionReleaseBuffer( getSession( msgs ) );
1340        buf = NULL;
1341
1342        fireClientGotData( msgs, n, TRUE );
1343        *setme_piece_bytes_read += n;
1344        dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1345               n, req->index, req->offset, req->length,
1346               (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) );
1347        if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length )
1348            return READ_LATER;
1349
1350        /* we've got the whole block ... process it */
1351        err = clientGotBlock( msgs, EVBUFFER_DATA( msgs->incoming.block ), req );
1352
1353        /* cleanup */
1354        evbuffer_free( msgs->incoming.block );
1355        msgs->incoming.block = evbuffer_new( );
1356        req->length = 0;
1357        msgs->state = AWAITING_BT_LENGTH;
1358        return err ? READ_ERR : READ_NOW;
1359    }
1360}
1361
1362static void updateDesiredRequestCount( tr_peermsgs * msgs );
1363
1364static int
1365readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1366{
1367    uint32_t      ui32;
1368    uint32_t      msglen = msgs->incoming.length;
1369    const uint8_t id = msgs->incoming.id;
1370#ifndef NDEBUG
1371    const size_t  startBufLen = EVBUFFER_LENGTH( inbuf );
1372#endif
1373    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1374
1375    --msglen; /* id length */
1376
1377    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
1378
1379    if( inlen < msglen )
1380        return READ_LATER;
1381
1382    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1383    {
1384        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1385        fireError( msgs, EMSGSIZE );
1386        return READ_ERR;
1387    }
1388
1389    switch( id )
1390    {
1391        case BT_CHOKE:
1392            dbgmsg( msgs, "got Choke" );
1393            msgs->peer->clientIsChoked = 1;
1394            if( !fext )
1395                fireGotChoke( msgs );
1396            break;
1397
1398        case BT_UNCHOKE:
1399            dbgmsg( msgs, "got Unchoke" );
1400            msgs->peer->clientIsChoked = 0;
1401            updateDesiredRequestCount( msgs );
1402            break;
1403
1404        case BT_INTERESTED:
1405            dbgmsg( msgs, "got Interested" );
1406            msgs->peer->peerIsInterested = 1;
1407            break;
1408
1409        case BT_NOT_INTERESTED:
1410            dbgmsg( msgs, "got Not Interested" );
1411            msgs->peer->peerIsInterested = 0;
1412            break;
1413
1414        case BT_HAVE:
1415            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1416            dbgmsg( msgs, "got Have: %u", ui32 );
1417            if( tr_torrentHasMetadata( msgs->torrent )
1418                    && ( ui32 >= msgs->torrent->info.pieceCount ) )
1419            {
1420                fireError( msgs, ERANGE );
1421                return READ_ERR;
1422            }
1423            if( tr_bitsetAdd( &msgs->peer->have, ui32 ) )
1424            {
1425                fireError( msgs, ERANGE );
1426                return READ_ERR;
1427            }
1428            updatePeerProgress( msgs );
1429            break;
1430
1431        case BT_BITFIELD: {
1432            const size_t bitCount = tr_torrentHasMetadata( msgs->torrent )
1433                                  ? msgs->torrent->info.pieceCount
1434                                  : msglen * 8;
1435            dbgmsg( msgs, "got a bitfield" );
1436            tr_bitsetReserve( &msgs->peer->have, bitCount );
1437            tr_peerIoReadBytes( msgs->peer->io, inbuf,
1438                                msgs->peer->have.bitfield.bits, msglen );
1439            updatePeerProgress( msgs );
1440            break;
1441        }
1442
1443        case BT_REQUEST:
1444        {
1445            struct peer_request r;
1446            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1447            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1448            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1449            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1450            peerMadeRequest( msgs, &r );
1451            break;
1452        }
1453
1454        case BT_CANCEL:
1455        {
1456            int i;
1457            struct peer_request r;
1458            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1459            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1460            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1461            tr_historyAdd( msgs->peer->cancelsSentToClient, tr_time( ), 1 );
1462            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1463
1464            for( i=0; i<msgs->peer->pendingReqsToClient; ++i ) {
1465                const struct peer_request * req = msgs->peerAskedFor + i;
1466                if( ( req->index == r.index ) && ( req->offset == r.offset ) && ( req->length == r.length ) )
1467                    break;
1468            }
1469
1470            if( i < msgs->peer->pendingReqsToClient )
1471                tr_removeElementFromArray( msgs->peerAskedFor, i, sizeof( struct peer_request ),
1472                                           msgs->peer->pendingReqsToClient-- );
1473            break;
1474        }
1475
1476        case BT_PIECE:
1477            assert( 0 ); /* handled elsewhere! */
1478            break;
1479
1480        case BT_PORT:
1481            dbgmsg( msgs, "Got a BT_PORT" );
1482            tr_peerIoReadUint16( msgs->peer->io, inbuf, &msgs->peer->dht_port );
1483            if( msgs->peer->dht_port > 0 )
1484                tr_dhtAddNode( getSession(msgs),
1485                               tr_peerAddress( msgs->peer ),
1486                               msgs->peer->dht_port, 0 );
1487            break;
1488
1489        case BT_FEXT_SUGGEST:
1490            dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1491            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1492            if( fext )
1493                fireClientGotSuggest( msgs, ui32 );
1494            else {
1495                fireError( msgs, EMSGSIZE );
1496                return READ_ERR;
1497            }
1498            break;
1499
1500        case BT_FEXT_ALLOWED_FAST:
1501            dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1502            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1503            if( fext )
1504                fireClientGotAllowedFast( msgs, ui32 );
1505            else {
1506                fireError( msgs, EMSGSIZE );
1507                return READ_ERR;
1508            }
1509            break;
1510
1511        case BT_FEXT_HAVE_ALL:
1512            dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1513            if( fext ) {
1514                tr_bitsetSetHaveAll( &msgs->peer->have );
1515                updatePeerProgress( msgs );
1516            } else {
1517                fireError( msgs, EMSGSIZE );
1518                return READ_ERR;
1519            }
1520            break;
1521
1522        case BT_FEXT_HAVE_NONE:
1523            dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1524            if( fext ) {
1525                tr_bitsetSetHaveNone( &msgs->peer->have );
1526                updatePeerProgress( msgs );
1527            } else {
1528                fireError( msgs, EMSGSIZE );
1529                return READ_ERR;
1530            }
1531            break;
1532
1533        case BT_FEXT_REJECT:
1534        {
1535            struct peer_request r;
1536            dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1537            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1538            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1539            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1540            if( fext )
1541                fireGotRej( msgs, &r );
1542            else {
1543                fireError( msgs, EMSGSIZE );
1544                return READ_ERR;
1545            }
1546            break;
1547        }
1548
1549        case BT_LTEP:
1550            dbgmsg( msgs, "Got a BT_LTEP" );
1551            parseLtep( msgs, msglen, inbuf );
1552            break;
1553
1554        default:
1555            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1556            tr_peerIoDrain( msgs->peer->io, inbuf, msglen );
1557            break;
1558    }
1559
1560    assert( msglen + 1 == msgs->incoming.length );
1561    assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen );
1562
1563    msgs->state = AWAITING_BT_LENGTH;
1564    return READ_NOW;
1565}
1566
1567static void
1568addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1569{
1570    if( !msgs->peer->blame )
1571         msgs->peer->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1572    tr_bitfieldAdd( msgs->peer->blame, index );
1573}
1574
1575/* returns 0 on success, or an errno on failure */
1576static int
1577clientGotBlock( tr_peermsgs *               msgs,
1578                const uint8_t *             data,
1579                const struct peer_request * req )
1580{
1581    int err;
1582    tr_torrent * tor = msgs->torrent;
1583    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1584
1585    assert( msgs );
1586    assert( req );
1587
1588    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) {
1589        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1590                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1591        return EMSGSIZE;
1592    }
1593
1594    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1595
1596    if( !tr_peerMgrDidPeerRequest( msgs->torrent, msgs->peer, block ) ) {
1597        dbgmsg( msgs, "we didn't ask for this message..." );
1598        return 0;
1599    }
1600    if( tr_cpPieceIsComplete( &msgs->torrent->completion, req->index ) ) {
1601        dbgmsg( msgs, "we did ask for this message, but the piece is already complete..." );
1602        return 0;
1603    }
1604
1605    /**
1606    ***  Save the block
1607    **/
1608
1609    if(( err = tr_cacheWriteBlock( getSession(msgs)->cache, tor, req->index, req->offset, req->length, data )))
1610        return err;
1611
1612    addPeerToBlamefield( msgs, req->index );
1613    fireGotBlock( msgs, req );
1614    return 0;
1615}
1616
1617static int peerPulse( void * vmsgs );
1618
1619static void
1620didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1621{
1622    tr_peermsgs * msgs = vmsgs;
1623    firePeerGotData( msgs, bytesWritten, wasPieceData );
1624
1625    if ( tr_isPeerIo( io ) && io->userData )
1626        peerPulse( msgs );
1627}
1628
1629static ReadState
1630canRead( tr_peerIo * io, void * vmsgs, size_t * piece )
1631{
1632    ReadState         ret;
1633    tr_peermsgs *     msgs = vmsgs;
1634    struct evbuffer * in = tr_peerIoGetReadBuffer( io );
1635    const size_t      inlen = EVBUFFER_LENGTH( in );
1636
1637    dbgmsg( msgs, "canRead: inlen is %zu, msgs->state is %d", inlen, msgs->state );
1638
1639    if( !inlen )
1640    {
1641        ret = READ_LATER;
1642    }
1643    else if( msgs->state == AWAITING_BT_PIECE )
1644    {
1645        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1646    }
1647    else switch( msgs->state )
1648    {
1649        case AWAITING_BT_LENGTH:
1650            ret = readBtLength ( msgs, in, inlen ); break;
1651
1652        case AWAITING_BT_ID:
1653            ret = readBtId     ( msgs, in, inlen ); break;
1654
1655        case AWAITING_BT_MESSAGE:
1656            ret = readBtMessage( msgs, in, inlen ); break;
1657
1658        default:
1659            ret = READ_ERR;
1660            assert( 0 );
1661    }
1662
1663    dbgmsg( msgs, "canRead: ret is %d", (int)ret );
1664
1665    /* log the raw data that was read */
1666    if( ( ret != READ_ERR ) && ( EVBUFFER_LENGTH( in ) != inlen ) )
1667        fireClientGotData( msgs, inlen - EVBUFFER_LENGTH( in ), FALSE );
1668
1669    return ret;
1670}
1671
1672int
1673tr_peerMsgsIsReadingBlock( const tr_peermsgs * msgs, tr_block_index_t block )
1674{
1675    if( msgs->state != AWAITING_BT_PIECE )
1676        return FALSE;
1677
1678    return block == _tr_block( msgs->torrent,
1679                               msgs->incoming.blockReq.index,
1680                               msgs->incoming.blockReq.offset );
1681}
1682
1683/**
1684***
1685**/
1686
1687static void
1688updateDesiredRequestCount( tr_peermsgs * msgs )
1689{
1690    const tr_torrent * const torrent = msgs->torrent;
1691
1692    if( tr_torrentIsSeed( msgs->torrent ) )
1693    {
1694        msgs->desiredRequestCount = 0;
1695    }
1696    else if( msgs->peer->clientIsChoked )
1697    {
1698        msgs->desiredRequestCount = 0;
1699    }
1700    else if( !msgs->peer->clientIsInterested )
1701    {
1702        msgs->desiredRequestCount = 0;
1703    }
1704    else
1705    {
1706        int estimatedBlocksInPeriod;
1707        int rate_Bps;
1708        int irate_Bps;
1709        const int floor = 4;
1710        const int seconds = REQUEST_BUF_SECS;
1711        const uint64_t now = tr_time_msec( );
1712
1713        /* Get the rate limit we should use.
1714         * FIXME: this needs to consider all the other peers as well... */
1715        rate_Bps = tr_peerGetPieceSpeed_Bps( msgs->peer, now, TR_PEER_TO_CLIENT );
1716        if( tr_torrentUsesSpeedLimit( torrent, TR_PEER_TO_CLIENT ) )
1717            rate_Bps = MIN( rate_Bps, tr_torrentGetSpeedLimit_Bps( torrent, TR_PEER_TO_CLIENT ) );
1718
1719        /* honor the session limits, if enabled */
1720        if( tr_torrentUsesSessionLimits( torrent ) )
1721            if( tr_sessionGetActiveSpeedLimit_Bps( torrent->session, TR_PEER_TO_CLIENT, &irate_Bps ) )
1722                rate_Bps = MIN( rate_Bps, irate_Bps );
1723
1724        /* use this desired rate to figure out how
1725         * many requests we should send to this peer */
1726        estimatedBlocksInPeriod = ( rate_Bps * seconds ) / torrent->blockSize;
1727        msgs->desiredRequestCount = MAX( floor, estimatedBlocksInPeriod );
1728
1729        /* honor the peer's maximum request count, if specified */
1730        if( msgs->reqq > 0 )
1731            if( msgs->desiredRequestCount > msgs->reqq )
1732                msgs->desiredRequestCount = msgs->reqq;
1733    }
1734}
1735
1736static void
1737updateMetadataRequests( tr_peermsgs * msgs, time_t now )
1738{
1739    int piece;
1740
1741    if( msgs->peerSupportsMetadataXfer
1742        && tr_torrentGetNextMetadataRequest( msgs->torrent, now, &piece ) )
1743    {
1744        tr_benc tmp;
1745        int payloadLen;
1746        char * payload;
1747        tr_peerIo  * io  = msgs->peer->io;
1748        struct evbuffer * out = msgs->outMessages;
1749
1750        /* build the data message */
1751        tr_bencInitDict( &tmp, 3 );
1752        tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REQUEST );
1753        tr_bencDictAddInt( &tmp, "piece", piece );
1754        payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1755        tr_bencFree( &tmp );
1756
1757        dbgmsg( msgs, "requesting metadata piece #%d", piece );
1758
1759        /* write it out as a LTEP message to our outMessages buffer */
1760        tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + payloadLen );
1761        tr_peerIoWriteUint8 ( io, out, BT_LTEP );
1762        tr_peerIoWriteUint8 ( io, out, msgs->ut_metadata_id );
1763        tr_peerIoWriteBytes ( io, out, payload, payloadLen );
1764        pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1765        dbgOutMessageLen( msgs );
1766
1767        tr_free( payload );
1768    }
1769}
1770
1771static void
1772updateBlockRequests( tr_peermsgs * msgs )
1773{
1774    if( tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT )
1775        && ( msgs->desiredRequestCount > 0 )
1776        && ( msgs->peer->pendingReqsToPeer <= ( msgs->desiredRequestCount * 0.66 ) ) )
1777    {
1778        int i;
1779        int n;
1780        const int numwant = msgs->desiredRequestCount - msgs->peer->pendingReqsToPeer;
1781        tr_block_index_t * blocks = tr_new( tr_block_index_t, numwant );
1782
1783        tr_peerMgrGetNextRequests( msgs->torrent, msgs->peer, numwant, blocks, &n );
1784
1785        for( i=0; i<n; ++i )
1786        {
1787            struct peer_request req;
1788            blockToReq( msgs->torrent, blocks[i], &req );
1789            protocolSendRequest( msgs, &req );
1790        }
1791
1792        tr_free( blocks );
1793    }
1794}
1795
1796static size_t
1797fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1798{
1799    int piece;
1800    size_t bytesWritten = 0;
1801    struct peer_request req;
1802    const tr_bool haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1803    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1804
1805    /**
1806    ***  Protocol messages
1807    **/
1808
1809    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1810    {
1811        dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
1812        msgs->outMessagesBatchedAt = now;
1813    }
1814    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1815    {
1816        const size_t len = EVBUFFER_LENGTH( msgs->outMessages );
1817        /* flush the protocol messages */
1818        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len );
1819        tr_peerIoWriteBuf( msgs->peer->io, msgs->outMessages, FALSE );
1820        msgs->clientSentAnythingAt = now;
1821        msgs->outMessagesBatchedAt = 0;
1822        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1823        bytesWritten +=  len;
1824    }
1825
1826    /**
1827    ***  Metadata Pieces
1828    **/
1829
1830    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= METADATA_PIECE_SIZE )
1831        && popNextMetadataRequest( msgs, &piece ) )
1832    {
1833        char * data;
1834        int dataLen;
1835        tr_bool ok = FALSE;
1836
1837        data = tr_torrentGetMetadataPiece( msgs->torrent, piece, &dataLen );
1838        if( ( dataLen > 0 ) && ( data != NULL ) )
1839        {
1840            tr_benc tmp;
1841            int payloadLen;
1842            char * payload;
1843            tr_peerIo  * io  = msgs->peer->io;
1844            struct evbuffer * out = msgs->outMessages;
1845
1846            /* build the data message */
1847            tr_bencInitDict( &tmp, 3 );
1848            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_DATA );
1849            tr_bencDictAddInt( &tmp, "piece", piece );
1850            tr_bencDictAddInt( &tmp, "total_size", msgs->torrent->infoDictLength );
1851            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1852            tr_bencFree( &tmp );
1853
1854            /* write it out as a LTEP message to our outMessages buffer */
1855            tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + payloadLen + dataLen );
1856            tr_peerIoWriteUint8 ( io, out, BT_LTEP );
1857            tr_peerIoWriteUint8 ( io, out, msgs->ut_metadata_id );
1858            tr_peerIoWriteBytes ( io, out, payload, payloadLen );
1859            tr_peerIoWriteBytes ( io, out, data, dataLen );
1860            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1861            dbgOutMessageLen( msgs );
1862
1863            tr_free( payload );
1864            tr_free( data );
1865
1866            ok = TRUE;
1867        }
1868
1869        if( !ok ) /* send a rejection message */
1870        {
1871            tr_benc tmp;
1872            int payloadLen;
1873            char * payload;
1874            tr_peerIo  * io  = msgs->peer->io;
1875            struct evbuffer * out = msgs->outMessages;
1876
1877            /* build the rejection message */
1878            tr_bencInitDict( &tmp, 2 );
1879            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
1880            tr_bencDictAddInt( &tmp, "piece", piece );
1881            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1882            tr_bencFree( &tmp );
1883
1884            /* write it out as a LTEP message to our outMessages buffer */
1885            tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + payloadLen );
1886            tr_peerIoWriteUint8 ( io, out, BT_LTEP );
1887            tr_peerIoWriteUint8 ( io, out, msgs->ut_metadata_id );
1888            tr_peerIoWriteBytes ( io, out, payload, payloadLen );
1889            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1890            dbgOutMessageLen( msgs );
1891
1892            tr_free( payload );
1893        }
1894    }
1895
1896    /**
1897    ***  Data Blocks
1898    **/
1899
1900    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= msgs->torrent->blockSize )
1901        && popNextRequest( msgs, &req ) )
1902    {
1903        --msgs->prefetchCount;
1904
1905        if( requestIsValid( msgs, &req )
1906            && tr_cpPieceIsComplete( &msgs->torrent->completion, req.index ) )
1907        {
1908            /* FIXME(libevent2) use evbuffer_reserve_space() + evbuffer_commit_space() */
1909            int err;
1910            const uint32_t msglen = 4 + 1 + 4 + 4 + req.length;
1911            struct evbuffer * out;
1912            tr_peerIo * io = msgs->peer->io;
1913
1914            out = evbuffer_new( );
1915            evbuffer_expand( out, msglen );
1916
1917            tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1918            tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1919            tr_peerIoWriteUint32( io, out, req.index );
1920            tr_peerIoWriteUint32( io, out, req.offset );
1921
1922            err = tr_cacheReadBlock( getSession(msgs)->cache, msgs->torrent, req.index, req.offset, req.length, EVBUFFER_DATA(out)+EVBUFFER_LENGTH(out) );
1923
1924            /* check the piece if it needs checking... */
1925            if( !err && tr_torrentPieceNeedsCheck( msgs->torrent, req.index ) )
1926                if(( err = !tr_torrentCheckPiece( msgs->torrent, req.index )))
1927                    tr_torrentSetLocalError( msgs->torrent, _( "Piece #%zu is corrupt!  Please Verify Local Data." ), (size_t)req.index );
1928
1929            if( err )
1930            {
1931                if( fext )
1932                    protocolSendReject( msgs, &req );
1933            }
1934            else
1935            {
1936                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1937                EVBUFFER_LENGTH(out) += req.length;
1938                assert( EVBUFFER_LENGTH( out ) == msglen );
1939                tr_peerIoWriteBuf( io, out, TRUE );
1940                bytesWritten += EVBUFFER_LENGTH( out );
1941                msgs->clientSentAnythingAt = now;
1942                tr_historyAdd( msgs->peer->blocksSentToPeer, tr_time( ), 1 );
1943            }
1944
1945            evbuffer_free( out );
1946
1947            if( err )
1948            {
1949                bytesWritten = 0;
1950                msgs = NULL;
1951            }
1952        }
1953        else if( fext ) /* peer needs a reject message */
1954        {
1955            protocolSendReject( msgs, &req );
1956        }
1957
1958        if( msgs != NULL )
1959            prefetchPieces( msgs );
1960    }
1961
1962    /**
1963    ***  Keepalive
1964    **/
1965
1966    if( ( msgs != NULL )
1967        && ( msgs->clientSentAnythingAt != 0 )
1968        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1969    {
1970        dbgmsg( msgs, "sending a keepalive message" );
1971        tr_peerIoWriteUint32( msgs->peer->io, msgs->outMessages, 0 );
1972        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1973    }
1974
1975    return bytesWritten;
1976}
1977
1978static int
1979peerPulse( void * vmsgs )
1980{
1981    tr_peermsgs * msgs = vmsgs;
1982    const time_t  now = tr_time( );
1983
1984    if ( tr_isPeerIo( msgs->peer->io ) ) {
1985        updateDesiredRequestCount( msgs );
1986        updateBlockRequests( msgs );
1987        updateMetadataRequests( msgs, now );
1988    }
1989
1990    for( ;; )
1991        if( fillOutputBuffer( msgs, now ) < 1 )
1992            break;
1993
1994    return TRUE; /* loop forever */
1995}
1996
1997void
1998tr_peerMsgsPulse( tr_peermsgs * msgs )
1999{
2000    if( msgs != NULL )
2001        peerPulse( msgs );
2002}
2003
2004static void
2005gotError( tr_peerIo  * io UNUSED,
2006          short        what,
2007          void       * vmsgs )
2008{
2009    if( what & EVBUFFER_TIMEOUT )
2010        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
2011    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
2012        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
2013               what, errno, tr_strerror( errno ) );
2014    fireError( vmsgs, ENOTCONN );
2015}
2016
2017static void
2018sendBitfield( tr_peermsgs * msgs )
2019{
2020    struct evbuffer * out = msgs->outMessages;
2021    tr_bitfield *     field;
2022    tr_piece_index_t  lazyPieces[LAZY_PIECE_COUNT];
2023    size_t            i;
2024    size_t            lazyCount = 0;
2025
2026    field = tr_bitfieldDup( tr_cpPieceBitfield( &msgs->torrent->completion ) );
2027
2028    if( tr_sessionIsLazyBitfieldEnabled( getSession( msgs ) ) )
2029    {
2030        /** Lazy bitfields aren't a high priority or secure, so I'm opting for
2031            speed over a truly random sample -- let's limit the pool size to
2032            the first 1000 pieces so large torrents don't bog things down */
2033        size_t poolSize;
2034        const size_t maxPoolSize = MIN( msgs->torrent->info.pieceCount, 1000 );
2035        tr_piece_index_t * pool = tr_new( tr_piece_index_t, maxPoolSize );
2036
2037        /* build the pool */
2038        for( i=poolSize=0; i<maxPoolSize; ++i )
2039            if( tr_bitfieldHas( field, i ) )
2040                pool[poolSize++] = i;
2041
2042        /* pull random piece indices from the pool */
2043        while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) )
2044        {
2045            const int pos = tr_cryptoWeakRandInt( poolSize );
2046            const tr_piece_index_t piece = pool[pos];
2047            tr_bitfieldRem( field, piece );
2048            lazyPieces[lazyCount++] = piece;
2049            pool[pos] = pool[--poolSize];
2050        }
2051
2052        /* cleanup */
2053        tr_free( pool );
2054    }
2055
2056    tr_peerIoWriteUint32( msgs->peer->io, out,
2057                          sizeof( uint8_t ) + field->byteCount );
2058    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_BITFIELD );
2059    /* FIXME(libevent2): use evbuffer_add_reference() */
2060    tr_peerIoWriteBytes ( msgs->peer->io, out, field->bits, field->byteCount );
2061    dbgmsg( msgs, "sending bitfield... outMessage size is now %zu",
2062            EVBUFFER_LENGTH( out ) );
2063    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
2064
2065    for( i = 0; i < lazyCount; ++i )
2066        protocolSendHave( msgs, lazyPieces[i] );
2067
2068    tr_bitfieldFree( field );
2069}
2070
2071static void
2072tellPeerWhatWeHave( tr_peermsgs * msgs )
2073{
2074    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
2075
2076    if( fext && ( tr_cpGetStatus( &msgs->torrent->completion ) == TR_SEED ) )
2077    {
2078        protocolSendHaveAll( msgs );
2079    }
2080    else if( fext && ( tr_cpHaveValid( &msgs->torrent->completion ) == 0 ) )
2081    {
2082        protocolSendHaveNone( msgs );
2083    }
2084    else
2085    {
2086        sendBitfield( msgs );
2087    }
2088}
2089
2090/**
2091***
2092**/
2093
2094/* some peers give us error messages if we send
2095   more than this many peers in a single pex message
2096   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
2097#define MAX_PEX_ADDED 50
2098#define MAX_PEX_DROPPED 50
2099
2100typedef struct
2101{
2102    tr_pex *  added;
2103    tr_pex *  dropped;
2104    tr_pex *  elements;
2105    int       addedCount;
2106    int       droppedCount;
2107    int       elementCount;
2108}
2109PexDiffs;
2110
2111static void
2112pexAddedCb( void * vpex,
2113            void * userData )
2114{
2115    PexDiffs * diffs = userData;
2116    tr_pex *   pex = vpex;
2117
2118    if( diffs->addedCount < MAX_PEX_ADDED )
2119    {
2120        diffs->added[diffs->addedCount++] = *pex;
2121        diffs->elements[diffs->elementCount++] = *pex;
2122    }
2123}
2124
2125static inline void
2126pexDroppedCb( void * vpex,
2127              void * userData )
2128{
2129    PexDiffs * diffs = userData;
2130    tr_pex *   pex = vpex;
2131
2132    if( diffs->droppedCount < MAX_PEX_DROPPED )
2133    {
2134        diffs->dropped[diffs->droppedCount++] = *pex;
2135    }
2136}
2137
2138static inline void
2139pexElementCb( void * vpex,
2140              void * userData )
2141{
2142    PexDiffs * diffs = userData;
2143    tr_pex * pex = vpex;
2144
2145    diffs->elements[diffs->elementCount++] = *pex;
2146}
2147
2148static void
2149sendPex( tr_peermsgs * msgs )
2150{
2151    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
2152    {
2153        PexDiffs diffs;
2154        PexDiffs diffs6;
2155        tr_pex * newPex = NULL;
2156        tr_pex * newPex6 = NULL;
2157        const int newCount = tr_peerMgrGetPeers( msgs->torrent, &newPex, TR_AF_INET, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
2158        const int newCount6 = tr_peerMgrGetPeers( msgs->torrent, &newPex6, TR_AF_INET6, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
2159
2160        /* build the diffs */
2161        diffs.added = tr_new( tr_pex, newCount );
2162        diffs.addedCount = 0;
2163        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
2164        diffs.droppedCount = 0;
2165        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
2166        diffs.elementCount = 0;
2167        tr_set_compare( msgs->pex, msgs->pexCount,
2168                        newPex, newCount,
2169                        tr_pexCompare, sizeof( tr_pex ),
2170                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
2171        diffs6.added = tr_new( tr_pex, newCount6 );
2172        diffs6.addedCount = 0;
2173        diffs6.dropped = tr_new( tr_pex, msgs->pexCount6 );
2174        diffs6.droppedCount = 0;
2175        diffs6.elements = tr_new( tr_pex, newCount6 + msgs->pexCount6 );
2176        diffs6.elementCount = 0;
2177        tr_set_compare( msgs->pex6, msgs->pexCount6,
2178                        newPex6, newCount6,
2179                        tr_pexCompare, sizeof( tr_pex ),
2180                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6 );
2181        dbgmsg(
2182            msgs,
2183            "pex: old peer count %d+%d, new peer count %d+%d, "
2184            "added %d+%d, removed %d+%d",
2185            msgs->pexCount, msgs->pexCount6, newCount, newCount6,
2186            diffs.addedCount, diffs6.addedCount,
2187            diffs.droppedCount, diffs6.droppedCount );
2188
2189        if( !diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
2190            !diffs6.droppedCount )
2191        {
2192            tr_free( diffs.elements );
2193            tr_free( diffs6.elements );
2194        }
2195        else
2196        {
2197            int  i;
2198            tr_benc val;
2199            char * benc;
2200            int bencLen;
2201            uint8_t * tmp, *walk;
2202            tr_peerIo       * io  = msgs->peer->io;
2203            struct evbuffer * out = msgs->outMessages;
2204
2205            /* update peer */
2206            tr_free( msgs->pex );
2207            msgs->pex = diffs.elements;
2208            msgs->pexCount = diffs.elementCount;
2209            tr_free( msgs->pex6 );
2210            msgs->pex6 = diffs6.elements;
2211            msgs->pexCount6 = diffs6.elementCount;
2212
2213            /* build the pex payload */
2214            tr_bencInitDict( &val, 3 ); /* ipv6 support: left as 3:
2215                                         * speed vs. likelihood? */
2216
2217            if( diffs.addedCount > 0)
2218            {
2219                /* "added" */
2220                tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
2221                for( i = 0; i < diffs.addedCount; ++i ) {
2222                    memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
2223                    memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
2224                }
2225                assert( ( walk - tmp ) == diffs.addedCount * 6 );
2226                tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
2227                tr_free( tmp );
2228
2229                /* "added.f" */
2230                tmp = walk = tr_new( uint8_t, diffs.addedCount );
2231                for( i = 0; i < diffs.addedCount; ++i )
2232                    *walk++ = diffs.added[i].flags;
2233                assert( ( walk - tmp ) == diffs.addedCount );
2234                tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2235                tr_free( tmp );
2236            }
2237
2238            if( diffs.droppedCount > 0 )
2239            {
2240                /* "dropped" */
2241                tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2242                for( i = 0; i < diffs.droppedCount; ++i ) {
2243                    memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
2244                    memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2245                }
2246                assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2247                tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2248                tr_free( tmp );
2249            }
2250
2251            if( diffs6.addedCount > 0 )
2252            {
2253                /* "added6" */
2254                tmp = walk = tr_new( uint8_t, diffs6.addedCount * 18 );
2255                for( i = 0; i < diffs6.addedCount; ++i ) {
2256                    memcpy( walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16 );
2257                    walk += 16;
2258                    memcpy( walk, &diffs6.added[i].port, 2 );
2259                    walk += 2;
2260                }
2261                assert( ( walk - tmp ) == diffs6.addedCount * 18 );
2262                tr_bencDictAddRaw( &val, "added6", tmp, walk - tmp );
2263                tr_free( tmp );
2264
2265                /* "added6.f" */
2266                tmp = walk = tr_new( uint8_t, diffs6.addedCount );
2267                for( i = 0; i < diffs6.addedCount; ++i )
2268                    *walk++ = diffs6.added[i].flags;
2269                assert( ( walk - tmp ) == diffs6.addedCount );
2270                tr_bencDictAddRaw( &val, "added6.f", tmp, walk - tmp );
2271                tr_free( tmp );
2272            }
2273
2274            if( diffs6.droppedCount > 0 )
2275            {
2276                /* "dropped6" */
2277                tmp = walk = tr_new( uint8_t, diffs6.droppedCount * 18 );
2278                for( i = 0; i < diffs6.droppedCount; ++i ) {
2279                    memcpy( walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16 );
2280                    walk += 16;
2281                    memcpy( walk, &diffs6.dropped[i].port, 2 );
2282                    walk += 2;
2283                }
2284                assert( ( walk - tmp ) == diffs6.droppedCount * 18);
2285                tr_bencDictAddRaw( &val, "dropped6", tmp, walk - tmp );
2286                tr_free( tmp );
2287            }
2288
2289            /* write the pex message */
2290            benc = tr_bencToStr( &val, TR_FMT_BENC, &bencLen );
2291            tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + bencLen );
2292            tr_peerIoWriteUint8 ( io, out, BT_LTEP );
2293            tr_peerIoWriteUint8 ( io, out, msgs->ut_pex_id );
2294            tr_peerIoWriteBytes ( io, out, benc, bencLen );
2295            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2296            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", EVBUFFER_LENGTH( out ) );
2297            dbgOutMessageLen( msgs );
2298
2299            tr_free( benc );
2300            tr_bencFree( &val );
2301        }
2302
2303        /* cleanup */
2304        tr_free( diffs.added );
2305        tr_free( diffs.dropped );
2306        tr_free( newPex );
2307        tr_free( diffs6.added );
2308        tr_free( diffs6.dropped );
2309        tr_free( newPex6 );
2310
2311        /*msgs->clientSentPexAt = tr_time( );*/
2312    }
2313}
2314
2315static void
2316pexPulse( int foo UNUSED, short bar UNUSED, void * vmsgs )
2317{
2318    struct tr_peermsgs * msgs = vmsgs;
2319
2320    sendPex( msgs );
2321
2322    tr_timerAdd( &msgs->pexTimer, PEX_INTERVAL_SECS, 0 );
2323}
2324
2325/**
2326***
2327**/
2328
2329tr_peermsgs*
2330tr_peerMsgsNew( struct tr_torrent    * torrent,
2331                struct tr_peer       * peer,
2332                tr_peer_callback     * callback,
2333                void                 * callbackData )
2334{
2335    tr_peermsgs * m;
2336
2337    assert( peer );
2338    assert( peer->io );
2339
2340    m = tr_new0( tr_peermsgs, 1 );
2341    m->callback = callback;
2342    m->callbackData = callbackData;
2343    m->peer = peer;
2344    m->torrent = torrent;
2345    m->peer->clientIsChoked = 1;
2346    m->peer->peerIsChoked = 1;
2347    m->peer->clientIsInterested = 0;
2348    m->peer->peerIsInterested = 0;
2349    m->state = AWAITING_BT_LENGTH;
2350    m->outMessages = evbuffer_new( );
2351    m->outMessagesBatchedAt = 0;
2352    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2353    m->incoming.block = evbuffer_new( );
2354    evtimer_set( &m->pexTimer, pexPulse, m );
2355    tr_timerAdd( &m->pexTimer, PEX_INTERVAL_SECS, 0 );
2356    peer->msgs = m;
2357
2358    if( tr_peerIoSupportsLTEP( peer->io ) )
2359        sendLtepHandshake( m );
2360
2361    tellPeerWhatWeHave( m );
2362
2363    if( tr_dhtEnabled( torrent->session ) && tr_peerIoSupportsDHT( peer->io ))
2364    {
2365        /* Only send PORT over IPv6 when the IPv6 DHT is running (BEP-32). */
2366        const struct tr_address *addr = tr_peerIoGetAddress( peer->io, NULL );
2367        if( addr->type == TR_AF_INET || tr_globalIPv6() ) {
2368            protocolSendPort( m, tr_dhtPort( torrent->session ) );
2369        }
2370    }
2371
2372    tr_peerIoSetIOFuncs( m->peer->io, canRead, didWrite, gotError, m );
2373    updateDesiredRequestCount( m );
2374
2375    return m;
2376}
2377
2378void
2379tr_peerMsgsFree( tr_peermsgs* msgs )
2380{
2381    if( msgs )
2382    {
2383        evtimer_del( &msgs->pexTimer );
2384
2385        evbuffer_free( msgs->incoming.block );
2386        evbuffer_free( msgs->outMessages );
2387        tr_free( msgs->pex6 );
2388        tr_free( msgs->pex );
2389
2390        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2391        tr_free( msgs );
2392    }
2393}
Note: See TracBrowser for help on using the repository browser.