source: trunk/libtransmission/peer-msgs.c @ 11953

Last change on this file since 11953 was 11953, checked in by jch, 11 years ago

Clear utp_failed flag upon seeing an announcement for ut_holepunch.

  • Property svn:keywords set to Date Rev Author Id
File size: 72.0 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 11953 2011-02-18 00:43:39Z jch $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdarg.h>
17#include <stdio.h>
18#include <stdlib.h>
19#include <string.h>
20
21#include <event2/bufferevent.h>
22#include <event2/event.h>
23
24#include "transmission.h"
25#include "bencode.h"
26#include "cache.h"
27#include "completion.h"
28#include "crypto.h"
29#ifdef WIN32
30#include "net.h" /* for ECONN */
31#endif
32#include "peer-io.h"
33#include "peer-mgr.h"
34#include "peer-msgs.h"
35#include "session.h"
36#include "stats.h"
37#include "torrent.h"
38#include "torrent-magnet.h"
39#include "tr-dht.h"
40#include "utils.h"
41#include "version.h"
42
43/**
44***
45**/
46
47enum
48{
49    BT_CHOKE                = 0,
50    BT_UNCHOKE              = 1,
51    BT_INTERESTED           = 2,
52    BT_NOT_INTERESTED       = 3,
53    BT_HAVE                 = 4,
54    BT_BITFIELD             = 5,
55    BT_REQUEST              = 6,
56    BT_PIECE                = 7,
57    BT_CANCEL               = 8,
58    BT_PORT                 = 9,
59
60    BT_FEXT_SUGGEST         = 13,
61    BT_FEXT_HAVE_ALL        = 14,
62    BT_FEXT_HAVE_NONE       = 15,
63    BT_FEXT_REJECT          = 16,
64    BT_FEXT_ALLOWED_FAST    = 17,
65
66    BT_LTEP                 = 20,
67
68    LTEP_HANDSHAKE          = 0,
69
70    UT_PEX_ID               = 1,
71    UT_METADATA_ID          = 3,
72
73    MAX_PEX_PEER_COUNT      = 50,
74
75    MIN_CHOKE_PERIOD_SEC    = 10,
76
77    /* idle seconds before we send a keepalive */
78    KEEPALIVE_INTERVAL_SECS = 100,
79
80    PEX_INTERVAL_SECS       = 90, /* sec between sendPex() calls */
81
82    REQQ                    = 512,
83
84    METADATA_REQQ           = 64,
85
86    /* used in lowering the outMessages queue period */
87    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
88    HIGH_PRIORITY_INTERVAL_SECS = 2,
89    LOW_PRIORITY_INTERVAL_SECS = 10,
90
91    /* number of pieces to remove from the bitfield when
92     * lazy bitfields are turned on */
93    LAZY_PIECE_COUNT = 26,
94
95    /* number of pieces we'll allow in our fast set */
96    MAX_FAST_SET_SIZE = 3,
97
98    /* defined in BEP #9 */
99    METADATA_MSG_TYPE_REQUEST = 0,
100    METADATA_MSG_TYPE_DATA = 1,
101    METADATA_MSG_TYPE_REJECT = 2
102};
103
104enum
105{
106    AWAITING_BT_LENGTH,
107    AWAITING_BT_ID,
108    AWAITING_BT_MESSAGE,
109    AWAITING_BT_PIECE
110};
111
112/**
113***
114**/
115
116struct peer_request
117{
118    uint32_t    index;
119    uint32_t    offset;
120    uint32_t    length;
121};
122
123static uint32_t
124getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
125{
126    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
127    const uint64_t blockPos = tor->blockSize * b;
128    assert( blockPos >= piecePos );
129    return (uint32_t)( blockPos - piecePos );
130}
131
132static void
133blockToReq( const tr_torrent     * tor,
134            tr_block_index_t       block,
135            struct peer_request  * setme )
136{
137    assert( setme != NULL );
138
139    setme->index = tr_torBlockPiece( tor, block );
140    setme->offset = getBlockOffsetInPiece( tor, block );
141    setme->length = tr_torBlockCountBytes( tor, block );
142}
143
144/**
145***
146**/
147
148/* this is raw, unchanged data from the peer regarding
149 * the current message that it's sending us. */
150struct tr_incoming
151{
152    uint8_t                id;
153    uint32_t               length; /* includes the +1 for id length */
154    struct peer_request    blockReq; /* metadata for incoming blocks */
155    struct evbuffer *      block; /* piece data for incoming blocks */
156};
157
158/**
159 * Low-level communication state information about a connected peer.
160 *
161 * This structure remembers the low-level protocol states that we're
162 * in with this peer, such as active requests, pex messages, and so on.
163 * Its fields are all private to peer-msgs.c.
164 *
165 * Data not directly involved with sending & receiving messages is
166 * stored in tr_peer, where it can be accessed by both peermsgs and
167 * the peer manager.
168 *
169 * @see struct peer_atom
170 * @see tr_peer
171 */
172struct tr_peermsgs
173{
174    tr_bool         peerSupportsPex;
175    tr_bool         peerSupportsMetadataXfer;
176    tr_bool         clientSentLtepHandshake;
177    tr_bool         peerSentLtepHandshake;
178
179    /*tr_bool         haveFastSet;*/
180
181    int             desiredRequestCount;
182
183    int             prefetchCount;
184
185    /* how long the outMessages batch should be allowed to grow before
186     * it's flushed -- some messages (like requests >:) should be sent
187     * very quickly; others aren't as urgent. */
188    int8_t          outMessagesBatchPeriod;
189
190    uint8_t         state;
191    uint8_t         ut_pex_id;
192    uint8_t         ut_metadata_id;
193    uint16_t        pexCount;
194    uint16_t        pexCount6;
195
196#if 0
197    size_t                 fastsetSize;
198    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
199#endif
200
201    tr_peer *              peer;
202
203    tr_torrent *           torrent;
204
205    tr_peer_callback      * callback;
206    void                  * callbackData;
207
208    struct evbuffer *      outMessages; /* all the non-piece messages */
209
210    struct peer_request    peerAskedFor[REQQ];
211
212    int                    peerAskedForMetadata[METADATA_REQQ];
213    int                    peerAskedForMetadataCount;
214
215    tr_pex               * pex;
216    tr_pex               * pex6;
217
218    /*time_t                 clientSentPexAt;*/
219    time_t                 clientSentAnythingAt;
220
221    /* when we started batching the outMessages */
222    time_t                outMessagesBatchedAt;
223
224    struct tr_incoming    incoming;
225
226    /* if the peer supports the Extension Protocol in BEP 10 and
227       supplied a reqq argument, it's stored here. Otherwise, the
228       value is zero and should be ignored. */
229    int64_t               reqq;
230
231    struct event        * pexTimer;
232};
233
234/**
235***
236**/
237
238#if 0
239static tr_bitfield*
240getHave( const struct tr_peermsgs * msgs )
241{
242    if( msgs->peer->have == NULL )
243        msgs->peer->have = tr_bitfieldNew( msgs->torrent->info.pieceCount );
244    return msgs->peer->have;
245}
246#endif
247
248static inline tr_session*
249getSession( struct tr_peermsgs * msgs )
250{
251    return msgs->torrent->session;
252}
253
254/**
255***
256**/
257
258static void
259myDebug( const char * file, int line,
260         const struct tr_peermsgs * msgs,
261         const char * fmt, ... )
262{
263    FILE * fp = tr_getLog( );
264
265    if( fp )
266    {
267        va_list           args;
268        char              timestr[64];
269        struct evbuffer * buf = evbuffer_new( );
270        char *            base = tr_basename( file );
271
272        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
273                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
274                             tr_torrentName( msgs->torrent ),
275                             tr_peerIoGetAddrStr( msgs->peer->io ),
276                             msgs->peer->client );
277        va_start( args, fmt );
278        evbuffer_add_vprintf( buf, fmt, args );
279        va_end( args );
280        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
281        fputs( (const char*)evbuffer_pullup( buf, -1 ), fp );
282
283        tr_free( base );
284        evbuffer_free( buf );
285    }
286}
287
288#define dbgmsg( msgs, ... ) \
289    do { \
290        if( tr_deepLoggingIsActive( ) ) \
291            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
292    } while( 0 )
293
294/**
295***
296**/
297
298static void
299pokeBatchPeriod( tr_peermsgs * msgs,
300                 int           interval )
301{
302    if( msgs->outMessagesBatchPeriod > interval )
303    {
304        msgs->outMessagesBatchPeriod = interval;
305        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
306    }
307}
308
309static void
310dbgOutMessageLen( tr_peermsgs * msgs )
311{
312    dbgmsg( msgs, "outMessage size is now %zu", evbuffer_get_length( msgs->outMessages ) );
313}
314
315static void
316protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
317{
318    struct evbuffer * out = msgs->outMessages;
319
320    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
321
322    evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
323    evbuffer_add_uint8 ( out, BT_FEXT_REJECT );
324    evbuffer_add_uint32( out, req->index );
325    evbuffer_add_uint32( out, req->offset );
326    evbuffer_add_uint32( out, req->length );
327
328    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
329    dbgOutMessageLen( msgs );
330}
331
332static void
333protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
334{
335    struct evbuffer * out = msgs->outMessages;
336
337    evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
338    evbuffer_add_uint8 ( out, BT_REQUEST );
339    evbuffer_add_uint32( out, req->index );
340    evbuffer_add_uint32( out, req->offset );
341    evbuffer_add_uint32( out, req->length );
342
343    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
344    dbgOutMessageLen( msgs );
345    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
346}
347
348static void
349protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
350{
351    struct evbuffer * out = msgs->outMessages;
352
353    evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
354    evbuffer_add_uint8 ( out, BT_CANCEL );
355    evbuffer_add_uint32( out, req->index );
356    evbuffer_add_uint32( out, req->offset );
357    evbuffer_add_uint32( out, req->length );
358
359    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
360    dbgOutMessageLen( msgs );
361    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
362}
363
364static void
365protocolSendPort(tr_peermsgs *msgs, uint16_t port)
366{
367    struct evbuffer * out = msgs->outMessages;
368
369    dbgmsg( msgs, "sending Port %u", port);
370    evbuffer_add_uint32( out, 3 );
371    evbuffer_add_uint8 ( out, BT_PORT );
372    evbuffer_add_uint16( out, port);
373}
374
375static void
376protocolSendHave( tr_peermsgs * msgs, uint32_t index )
377{
378    struct evbuffer * out = msgs->outMessages;
379
380    evbuffer_add_uint32( out, sizeof(uint8_t) + sizeof(uint32_t) );
381    evbuffer_add_uint8 ( out, BT_HAVE );
382    evbuffer_add_uint32( out, index );
383
384    dbgmsg( msgs, "sending Have %u", index );
385    dbgOutMessageLen( msgs );
386    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
387}
388
389#if 0
390static void
391protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
392{
393    tr_peerIo       * io  = msgs->peer->io;
394    struct evbuffer * out = msgs->outMessages;
395
396    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
397
398    evbuffer_add_uint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
399    evbuffer_add_uint8 ( io, out, BT_FEXT_ALLOWED_FAST );
400    evbuffer_add_uint32( io, out, pieceIndex );
401
402    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
403    dbgOutMessageLen( msgs );
404}
405#endif
406
407static void
408protocolSendChoke( tr_peermsgs * msgs, int choke )
409{
410    struct evbuffer * out = msgs->outMessages;
411
412    evbuffer_add_uint32( out, sizeof( uint8_t ) );
413    evbuffer_add_uint8 ( out, choke ? BT_CHOKE : BT_UNCHOKE );
414
415    dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
416    dbgOutMessageLen( msgs );
417    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
418}
419
420static void
421protocolSendHaveAll( tr_peermsgs * msgs )
422{
423    struct evbuffer * out = msgs->outMessages;
424
425    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
426
427    evbuffer_add_uint32( out, sizeof( uint8_t ) );
428    evbuffer_add_uint8 ( out, BT_FEXT_HAVE_ALL );
429
430    dbgmsg( msgs, "sending HAVE_ALL..." );
431    dbgOutMessageLen( msgs );
432    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
433}
434
435static void
436protocolSendHaveNone( tr_peermsgs * msgs )
437{
438    struct evbuffer * out = msgs->outMessages;
439
440    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
441
442    evbuffer_add_uint32( out, sizeof( uint8_t ) );
443    evbuffer_add_uint8 ( out, BT_FEXT_HAVE_NONE );
444
445    dbgmsg( msgs, "sending HAVE_NONE..." );
446    dbgOutMessageLen( msgs );
447    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
448}
449
450/**
451***  EVENTS
452**/
453
454static void
455publish( tr_peermsgs * msgs, tr_peer_event * e )
456{
457    assert( msgs->peer );
458    assert( msgs->peer->msgs == msgs );
459
460    if( msgs->callback != NULL )
461        msgs->callback( msgs->peer, e, msgs->callbackData );
462}
463
464static void
465fireError( tr_peermsgs * msgs, int err )
466{
467    tr_peer_event e = TR_PEER_EVENT_INIT;
468    e.eventType = TR_PEER_ERROR;
469    e.err = err;
470    publish( msgs, &e );
471}
472
473static void
474firePeerProgress( tr_peermsgs * msgs )
475{
476    tr_peer_event e = TR_PEER_EVENT_INIT;
477    e.eventType = TR_PEER_PEER_PROGRESS;
478    e.progress = msgs->peer->progress;
479    publish( msgs, &e );
480}
481
482static void
483fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
484{
485    tr_peer_event e = TR_PEER_EVENT_INIT;
486    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
487    e.pieceIndex = req->index;
488    e.offset = req->offset;
489    e.length = req->length;
490    publish( msgs, &e );
491}
492
493static void
494fireGotRej( tr_peermsgs * msgs, const struct peer_request * req )
495{
496    tr_peer_event e = TR_PEER_EVENT_INIT;
497    e.eventType = TR_PEER_CLIENT_GOT_REJ;
498    e.pieceIndex = req->index;
499    e.offset = req->offset;
500    e.length = req->length;
501    publish( msgs, &e );
502}
503
504static void
505fireGotChoke( tr_peermsgs * msgs )
506{
507    tr_peer_event e = TR_PEER_EVENT_INIT;
508    e.eventType = TR_PEER_CLIENT_GOT_CHOKE;
509    publish( msgs, &e );
510}
511
512static void
513fireClientGotHaveAll( tr_peermsgs * msgs )
514{
515    tr_peer_event e = TR_PEER_EVENT_INIT;
516    e.eventType = TR_PEER_CLIENT_GOT_HAVE_ALL;
517    publish( msgs, &e );
518}
519
520static void
521fireClientGotHaveNone( tr_peermsgs * msgs )
522{
523    tr_peer_event e = TR_PEER_EVENT_INIT;
524    e.eventType = TR_PEER_CLIENT_GOT_HAVE_NONE;
525    publish( msgs, &e );
526}
527
528static void
529fireClientGotData( tr_peermsgs * msgs,
530                   uint32_t      length,
531                   int           wasPieceData )
532{
533    tr_peer_event e = TR_PEER_EVENT_INIT;
534
535    e.length = length;
536    e.eventType = TR_PEER_CLIENT_GOT_DATA;
537    e.wasPieceData = wasPieceData;
538    publish( msgs, &e );
539}
540
541static void
542fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
543{
544    tr_peer_event e = TR_PEER_EVENT_INIT;
545    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
546    e.pieceIndex = pieceIndex;
547    publish( msgs, &e );
548}
549
550static void
551fireClientGotPort( tr_peermsgs * msgs, tr_port port )
552{
553    tr_peer_event e = TR_PEER_EVENT_INIT;
554    e.eventType = TR_PEER_CLIENT_GOT_PORT;
555    e.port = port;
556    publish( msgs, &e );
557}
558
559static void
560fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
561{
562    tr_peer_event e = TR_PEER_EVENT_INIT;
563    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
564    e.pieceIndex = pieceIndex;
565    publish( msgs, &e );
566}
567
568static void
569fireClientGotBitfield( tr_peermsgs * msgs, tr_bitfield * bitfield )
570{
571    tr_peer_event e = TR_PEER_EVENT_INIT;
572    e.eventType = TR_PEER_CLIENT_GOT_BITFIELD;
573    e.bitfield = bitfield;
574    publish( msgs, &e );
575}
576
577static void
578fireClientGotHave( tr_peermsgs * msgs, tr_piece_index_t index )
579{
580    tr_peer_event e = TR_PEER_EVENT_INIT;
581    e.eventType = TR_PEER_CLIENT_GOT_HAVE;
582    e.pieceIndex = index;
583    publish( msgs, &e );
584}
585
586static void
587firePeerGotData( tr_peermsgs  * msgs,
588                 uint32_t       length,
589                 int            wasPieceData )
590{
591    tr_peer_event e = TR_PEER_EVENT_INIT;
592
593    e.length = length;
594    e.eventType = TR_PEER_PEER_GOT_DATA;
595    e.wasPieceData = wasPieceData;
596
597    publish( msgs, &e );
598}
599
600/**
601***  ALLOWED FAST SET
602***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
603**/
604
605size_t
606tr_generateAllowedSet( tr_piece_index_t * setmePieces,
607                       size_t             desiredSetSize,
608                       size_t             pieceCount,
609                       const uint8_t    * infohash,
610                       const tr_address * addr )
611{
612    size_t setSize = 0;
613
614    assert( setmePieces );
615    assert( desiredSetSize <= pieceCount );
616    assert( desiredSetSize );
617    assert( pieceCount );
618    assert( infohash );
619    assert( addr );
620
621    if( addr->type == TR_AF_INET )
622    {
623        uint8_t w[SHA_DIGEST_LENGTH + 4], *walk=w;
624        uint8_t x[SHA_DIGEST_LENGTH];
625
626        uint32_t ui32 = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 );   /* (1) */
627        memcpy( w, &ui32, sizeof( uint32_t ) );
628        walk += sizeof( uint32_t );
629        memcpy( walk, infohash, SHA_DIGEST_LENGTH );                 /* (2) */
630        walk += SHA_DIGEST_LENGTH;
631        tr_sha1( x, w, walk-w, NULL );                               /* (3) */
632        assert( sizeof( w ) == walk-w );
633
634        while( setSize<desiredSetSize )
635        {
636            int i;
637            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
638            {
639                size_t k;
640                uint32_t j = i * 4;                                  /* (5) */
641                uint32_t y = ntohl( *( uint32_t* )( x + j ) );       /* (6) */
642                uint32_t index = y % pieceCount;                     /* (7) */
643
644                for( k=0; k<setSize; ++k )                           /* (8) */
645                    if( setmePieces[k] == index )
646                        break;
647
648                if( k == setSize )
649                    setmePieces[setSize++] = index;                  /* (9) */
650            }
651
652            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
653        }
654    }
655
656    return setSize;
657}
658
659static void
660updateFastSet( tr_peermsgs * msgs UNUSED )
661{
662#if 0
663    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
664    const int peerIsNeedy = msgs->peer->progress < 0.10;
665
666    if( fext && peerIsNeedy && !msgs->haveFastSet )
667    {
668        size_t i;
669        const struct tr_address * addr = tr_peerIoGetAddress( msgs->peer->io, NULL );
670        const tr_info * inf = &msgs->torrent->info;
671        const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
672
673        /* build the fast set */
674        msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
675        msgs->haveFastSet = 1;
676
677        /* send it to the peer */
678        for( i=0; i<msgs->fastsetSize; ++i )
679            protocolSendAllowedFast( msgs, msgs->fastset[i] );
680    }
681#endif
682}
683
684/**
685***  INTEREST
686**/
687
688static void
689sendInterest( tr_peermsgs * msgs, tr_bool clientIsInterested )
690{
691    struct evbuffer * out = msgs->outMessages;
692
693    assert( msgs );
694    assert( tr_isBool( clientIsInterested ) );
695
696    msgs->peer->clientIsInterested = clientIsInterested;
697    dbgmsg( msgs, "Sending %s", clientIsInterested ? "Interested" : "Not Interested" );
698    evbuffer_add_uint32( out, sizeof( uint8_t ) );
699    evbuffer_add_uint8 ( out, clientIsInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
700
701    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
702    dbgOutMessageLen( msgs );
703}
704
705static void
706updateInterest( tr_peermsgs * msgs UNUSED )
707{
708    /* FIXME -- might need to poke the mgr on startup */
709}
710
711void
712tr_peerMsgsSetInterested( tr_peermsgs * msgs, int isInterested )
713{
714    assert( tr_isBool( isInterested ) );
715
716    if( isInterested != msgs->peer->clientIsInterested )
717        sendInterest( msgs, isInterested );
718}
719
720static tr_bool
721popNextMetadataRequest( tr_peermsgs * msgs, int * piece )
722{
723    if( msgs->peerAskedForMetadataCount == 0 )
724        return FALSE;
725
726    *piece = msgs->peerAskedForMetadata[0];
727
728    tr_removeElementFromArray( msgs->peerAskedForMetadata, 0, sizeof( int ),
729                               msgs->peerAskedForMetadataCount-- );
730
731    return TRUE;
732}
733
734static tr_bool
735popNextRequest( tr_peermsgs * msgs, struct peer_request * setme )
736{
737    if( msgs->peer->pendingReqsToClient == 0 )
738        return FALSE;
739
740    *setme = msgs->peerAskedFor[0];
741
742    tr_removeElementFromArray( msgs->peerAskedFor, 0, sizeof( struct peer_request ),
743                               msgs->peer->pendingReqsToClient-- );
744
745    return TRUE;
746}
747
748static void
749cancelAllRequestsToClient( tr_peermsgs * msgs )
750{
751    struct peer_request req;
752    const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->peer->io );
753
754    while( popNextRequest( msgs, &req ))
755        if( mustSendCancel )
756            protocolSendReject( msgs, &req );
757}
758
759void
760tr_peerMsgsSetChoke( tr_peermsgs * msgs,
761                     int           choke )
762{
763    const time_t now = tr_time( );
764    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
765
766    assert( msgs );
767    assert( msgs->peer );
768    assert( choke == 0 || choke == 1 );
769
770    if( msgs->peer->chokeChangedAt > fibrillationTime )
771    {
772        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
773    }
774    else if( msgs->peer->peerIsChoked != choke )
775    {
776        msgs->peer->peerIsChoked = choke;
777        if( choke )
778            cancelAllRequestsToClient( msgs );
779        protocolSendChoke( msgs, choke );
780        msgs->peer->chokeChangedAt = now;
781    }
782}
783
784/**
785***
786**/
787
788void
789tr_peerMsgsHave( tr_peermsgs * msgs,
790                 uint32_t      index )
791{
792    protocolSendHave( msgs, index );
793
794    /* since we have more pieces now, we might not be interested in this peer */
795    updateInterest( msgs );
796}
797
798/**
799***
800**/
801
802static tr_bool
803reqIsValid( const tr_peermsgs * peer,
804            uint32_t            index,
805            uint32_t            offset,
806            uint32_t            length )
807{
808    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
809}
810
811static tr_bool
812requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
813{
814    return reqIsValid( msgs, req->index, req->offset, req->length );
815}
816
817void
818tr_peerMsgsCancel( tr_peermsgs * msgs, tr_block_index_t block )
819{
820    struct peer_request req;
821/*fprintf( stderr, "SENDING CANCEL MESSAGE FOR BLOCK %zu\n\t\tFROM PEER %p ------------------------------------\n", (size_t)block, msgs->peer );*/
822    blockToReq( msgs->torrent, block, &req );
823    protocolSendCancel( msgs, &req );
824}
825
826/**
827***
828**/
829
830static void
831sendLtepHandshake( tr_peermsgs * msgs )
832{
833    tr_benc val, *m;
834    char * buf;
835    int len;
836    tr_bool allow_pex;
837    tr_bool allow_metadata_xfer;
838    struct evbuffer * out = msgs->outMessages;
839    const unsigned char * ipv6 = tr_globalIPv6();
840
841    if( msgs->clientSentLtepHandshake )
842        return;
843
844    dbgmsg( msgs, "sending an ltep handshake" );
845    msgs->clientSentLtepHandshake = 1;
846
847    /* decide if we want to advertise metadata xfer support (BEP 9) */
848    if( tr_torrentIsPrivate( msgs->torrent ) )
849        allow_metadata_xfer = 0;
850    else
851        allow_metadata_xfer = 1;
852
853    /* decide if we want to advertise pex support */
854    if( !tr_torrentAllowsPex( msgs->torrent ) )
855        allow_pex = 0;
856    else if( msgs->peerSentLtepHandshake )
857        allow_pex = msgs->peerSupportsPex ? 1 : 0;
858    else
859        allow_pex = 1;
860
861    tr_bencInitDict( &val, 8 );
862    tr_bencDictAddInt( &val, "e", getSession(msgs)->encryptionMode != TR_CLEAR_PREFERRED );
863    if( ipv6 != NULL )
864        tr_bencDictAddRaw( &val, "ipv6", ipv6, 16 );
865    if( allow_metadata_xfer && tr_torrentHasMetadata( msgs->torrent )
866                            && ( msgs->torrent->infoDictLength > 0 ) )
867        tr_bencDictAddInt( &val, "metadata_size", msgs->torrent->infoDictLength );
868    tr_bencDictAddInt( &val, "p", tr_sessionGetPublicPeerPort( getSession(msgs) ) );
869    tr_bencDictAddInt( &val, "reqq", REQQ );
870    tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
871    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
872    m  = tr_bencDictAddDict( &val, "m", 2 );
873    if( allow_metadata_xfer )
874        tr_bencDictAddInt( m, "ut_metadata", UT_METADATA_ID );
875    if( allow_pex )
876        tr_bencDictAddInt( m, "ut_pex", UT_PEX_ID );
877
878    buf = tr_bencToStr( &val, TR_FMT_BENC, &len );
879
880    evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + len );
881    evbuffer_add_uint8 ( out, BT_LTEP );
882    evbuffer_add_uint8 ( out, LTEP_HANDSHAKE );
883    evbuffer_add       ( out, buf, len );
884    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
885    dbgOutMessageLen( msgs );
886
887    /* cleanup */
888    tr_bencFree( &val );
889    tr_free( buf );
890}
891
892static void
893parseLtepHandshake( tr_peermsgs *     msgs,
894                    int               len,
895                    struct evbuffer * inbuf )
896{
897    int64_t   i;
898    tr_benc   val, * sub;
899    uint8_t * tmp = tr_new( uint8_t, len );
900    const uint8_t *addr;
901    size_t addr_len;
902    tr_pex pex;
903    int8_t seedProbability = -1;
904
905    memset( &pex, 0, sizeof( tr_pex ) );
906
907    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, len );
908    msgs->peerSentLtepHandshake = 1;
909
910    if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) )
911    {
912        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
913        tr_free( tmp );
914        return;
915    }
916
917    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
918
919    /* does the peer prefer encrypted connections? */
920    if( tr_bencDictFindInt( &val, "e", &i ) ) {
921        msgs->peer->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
922                                              : ENCRYPTION_PREFERENCE_NO;
923        if( i )
924            pex.flags |= ADDED_F_ENCRYPTION_FLAG;
925    }
926
927    /* check supported messages for utorrent pex */
928    msgs->peerSupportsPex = 0;
929    msgs->peerSupportsMetadataXfer = 0;
930
931    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
932        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
933            msgs->peerSupportsPex = i != 0;
934            msgs->ut_pex_id = (uint8_t) i;
935            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
936        }
937        if( tr_bencDictFindInt( sub, "ut_metadata", &i ) ) {
938            msgs->peerSupportsMetadataXfer = i != 0;
939            msgs->ut_metadata_id = (uint8_t) i;
940            dbgmsg( msgs, "msgs->ut_metadata_id is %d", (int)msgs->ut_metadata_id );
941        }
942        if( tr_bencDictFindInt( sub, "ut_holepunch", &i ) ) {
943            /* Mysterious µTorrent extension that we don't grok.  However,
944               it implies support for µTP, so use it to indicate that. */
945            tr_peerMgrSetUtpFailed( msgs->torrent,
946                                    tr_peerIoGetAddress( msgs->peer->io, NULL ),
947                                    FALSE );
948        }
949    }
950
951    /* look for metainfo size (BEP 9) */
952    if( tr_bencDictFindInt( &val, "metadata_size", &i ) )
953        tr_torrentSetMetadataSizeHint( msgs->torrent, i );
954
955    /* look for upload_only (BEP 21) */
956    if( tr_bencDictFindInt( &val, "upload_only", &i ) )
957        seedProbability = i==0 ? 0 : 100;
958
959    /* get peer's listening port */
960    if( tr_bencDictFindInt( &val, "p", &i ) ) {
961        pex.port = htons( (uint16_t)i );
962        fireClientGotPort( msgs, pex.port );
963        dbgmsg( msgs, "peer's port is now %d", (int)i );
964    }
965
966    if( tr_peerIoIsIncoming( msgs->peer->io )
967        && tr_bencDictFindRaw( &val, "ipv4", &addr, &addr_len )
968        && ( addr_len == 4 ) )
969    {
970        pex.addr.type = TR_AF_INET;
971        memcpy( &pex.addr.addr.addr4, addr, 4 );
972        tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability );
973    }
974
975    if( tr_peerIoIsIncoming( msgs->peer->io )
976        && tr_bencDictFindRaw( &val, "ipv6", &addr, &addr_len )
977        && ( addr_len == 16 ) )
978    {
979        pex.addr.type = TR_AF_INET6;
980        memcpy( &pex.addr.addr.addr6, addr, 16 );
981        tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability );
982    }
983
984    /* get peer's maximum request queue size */
985    if( tr_bencDictFindInt( &val, "reqq", &i ) )
986        msgs->reqq = i;
987
988    tr_bencFree( &val );
989    tr_free( tmp );
990}
991
992static void
993parseUtMetadata( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
994{
995    tr_benc dict;
996    char * msg_end;
997    char * benc_end;
998    int64_t msg_type = -1;
999    int64_t piece = -1;
1000    int64_t total_size = 0;
1001    uint8_t * tmp = tr_new( uint8_t, msglen );
1002
1003    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1004    msg_end = (char*)tmp + msglen;
1005
1006    if( !tr_bencLoad( tmp, msglen, &dict, &benc_end ) )
1007    {
1008        tr_bencDictFindInt( &dict, "msg_type", &msg_type );
1009        tr_bencDictFindInt( &dict, "piece", &piece );
1010        tr_bencDictFindInt( &dict, "total_size", &total_size );
1011        tr_bencFree( &dict );
1012    }
1013
1014    dbgmsg( msgs, "got ut_metadata msg: type %d, piece %d, total_size %d",
1015            (int)msg_type, (int)piece, (int)total_size );
1016
1017    if( msg_type == METADATA_MSG_TYPE_REJECT )
1018    {
1019        /* NOOP */
1020    }
1021
1022    if( ( msg_type == METADATA_MSG_TYPE_DATA )
1023        && ( !tr_torrentHasMetadata( msgs->torrent ) )
1024        && ( msg_end - benc_end <= METADATA_PIECE_SIZE )
1025        && ( piece * METADATA_PIECE_SIZE + (msg_end - benc_end) <= total_size ) )
1026    {
1027        const int pieceLen = msg_end - benc_end;
1028        tr_torrentSetMetadataPiece( msgs->torrent, piece, benc_end, pieceLen );
1029    }
1030
1031    if( msg_type == METADATA_MSG_TYPE_REQUEST )
1032    {
1033        if( ( piece >= 0 )
1034            && tr_torrentHasMetadata( msgs->torrent )
1035            && !tr_torrentIsPrivate( msgs->torrent )
1036            && ( msgs->peerAskedForMetadataCount < METADATA_REQQ ) )
1037        {
1038            msgs->peerAskedForMetadata[msgs->peerAskedForMetadataCount++] = piece;
1039        }
1040        else
1041        {
1042            tr_benc tmp;
1043            int payloadLen;
1044            char * payload;
1045            struct evbuffer * out = msgs->outMessages;
1046
1047            /* build the rejection message */
1048            tr_bencInitDict( &tmp, 2 );
1049            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
1050            tr_bencDictAddInt( &tmp, "piece", piece );
1051            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1052            tr_bencFree( &tmp );
1053
1054            /* write it out as a LTEP message to our outMessages buffer */
1055            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen );
1056            evbuffer_add_uint8 ( out, BT_LTEP );
1057            evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1058            evbuffer_add       ( out, payload, payloadLen );
1059            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1060            dbgOutMessageLen( msgs );
1061
1062            tr_free( payload );
1063        }
1064    }
1065
1066    tr_free( tmp );
1067}
1068
1069static void
1070parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1071{
1072    int loaded = 0;
1073    uint8_t * tmp = tr_new( uint8_t, msglen );
1074    tr_benc val;
1075    tr_torrent * tor = msgs->torrent;
1076    const uint8_t * added;
1077    size_t added_len;
1078
1079    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1080
1081    if( tr_torrentAllowsPex( tor )
1082      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) ) )
1083    {
1084        if( tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1085        {
1086            tr_pex * pex;
1087            size_t i, n;
1088            size_t added_f_len = 0;
1089            const uint8_t * added_f = NULL;
1090
1091            tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1092            pex = tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len, &n );
1093
1094            n = MIN( n, MAX_PEX_PEER_COUNT );
1095            for( i=0; i<n; ++i )
1096            {
1097                int seedProbability = -1;
1098                if( i < added_f_len ) seedProbability = ( added_f[i] & ADDED_F_SEED_FLAG ) ? 100 : 0;
1099                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex+i, seedProbability );
1100            }
1101
1102            tr_free( pex );
1103        }
1104
1105        if( tr_bencDictFindRaw( &val, "added6", &added, &added_len ) )
1106        {
1107            tr_pex * pex;
1108            size_t i, n;
1109            size_t added_f_len = 0;
1110            const uint8_t * added_f = NULL;
1111
1112            tr_bencDictFindRaw( &val, "added6.f", &added_f, &added_f_len );
1113            pex = tr_peerMgrCompact6ToPex( added, added_len, added_f, added_f_len, &n );
1114
1115            n = MIN( n, MAX_PEX_PEER_COUNT );
1116            for( i=0; i<n; ++i )
1117            {
1118                int seedProbability = -1;
1119                if( i < added_f_len ) seedProbability = ( added_f[i] & ADDED_F_SEED_FLAG ) ? 100 : 0;
1120                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex+i, seedProbability );
1121            }
1122
1123            tr_free( pex );
1124        }
1125    }
1126
1127    if( loaded )
1128        tr_bencFree( &val );
1129    tr_free( tmp );
1130}
1131
1132static void sendPex( tr_peermsgs * msgs );
1133
1134static void
1135parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer  * inbuf )
1136{
1137    uint8_t ltep_msgid;
1138
1139    tr_peerIoReadUint8( msgs->peer->io, inbuf, &ltep_msgid );
1140    msglen--;
1141
1142    if( ltep_msgid == LTEP_HANDSHAKE )
1143    {
1144        dbgmsg( msgs, "got ltep handshake" );
1145        parseLtepHandshake( msgs, msglen, inbuf );
1146        if( tr_peerIoSupportsLTEP( msgs->peer->io ) )
1147        {
1148            sendLtepHandshake( msgs );
1149            sendPex( msgs );
1150        }
1151    }
1152    else if( ltep_msgid == UT_PEX_ID )
1153    {
1154        dbgmsg( msgs, "got ut pex" );
1155        msgs->peerSupportsPex = 1;
1156        parseUtPex( msgs, msglen, inbuf );
1157    }
1158    else if( ltep_msgid == UT_METADATA_ID )
1159    {
1160        dbgmsg( msgs, "got ut metadata" );
1161        msgs->peerSupportsMetadataXfer = 1;
1162        parseUtMetadata( msgs, msglen, inbuf );
1163    }
1164    else
1165    {
1166        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1167        evbuffer_drain( inbuf, msglen );
1168    }
1169}
1170
1171static int
1172readBtLength( tr_peermsgs *     msgs,
1173              struct evbuffer * inbuf,
1174              size_t            inlen )
1175{
1176    uint32_t len;
1177
1178    if( inlen < sizeof( len ) )
1179        return READ_LATER;
1180
1181    tr_peerIoReadUint32( msgs->peer->io, inbuf, &len );
1182
1183    if( len == 0 ) /* peer sent us a keepalive message */
1184        dbgmsg( msgs, "got KeepAlive" );
1185    else
1186    {
1187        msgs->incoming.length = len;
1188        msgs->state = AWAITING_BT_ID;
1189    }
1190
1191    return READ_NOW;
1192}
1193
1194static int readBtMessage( tr_peermsgs     * msgs,
1195                          struct evbuffer * inbuf,
1196                          size_t            inlen );
1197
1198static int
1199readBtId( tr_peermsgs * msgs, struct evbuffer  * inbuf, size_t inlen )
1200{
1201    uint8_t id;
1202
1203    if( inlen < sizeof( uint8_t ) )
1204        return READ_LATER;
1205
1206    tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
1207    msgs->incoming.id = id;
1208    dbgmsg( msgs, "msgs->incoming.id is now %d; msgs->incoming.length is %zu", id, (size_t)msgs->incoming.length );
1209
1210    if( id == BT_PIECE )
1211    {
1212        msgs->state = AWAITING_BT_PIECE;
1213        return READ_NOW;
1214    }
1215    else if( msgs->incoming.length != 1 )
1216    {
1217        msgs->state = AWAITING_BT_MESSAGE;
1218        return READ_NOW;
1219    }
1220    else return readBtMessage( msgs, inbuf, inlen - 1 );
1221}
1222
1223static void
1224updatePeerProgress( tr_peermsgs * msgs )
1225{
1226    msgs->peer->progress = tr_bitsetPercent( &msgs->peer->have );
1227    dbgmsg( msgs, "peer progress is %f", msgs->peer->progress );
1228    updateFastSet( msgs );
1229    updateInterest( msgs );
1230    firePeerProgress( msgs );
1231}
1232
1233static void
1234prefetchPieces( tr_peermsgs *msgs )
1235{
1236    int i;
1237
1238    if( !getSession(msgs)->isPrefetchEnabled )
1239        return;
1240
1241    /* Maintain 12 prefetched blocks per unchoked peer */
1242    for( i=msgs->prefetchCount; i<msgs->peer->pendingReqsToClient && i<12; ++i )
1243    {
1244        const struct peer_request * req = msgs->peerAskedFor + i;
1245        if( requestIsValid( msgs, req ) )
1246        {
1247            tr_cachePrefetchBlock( getSession(msgs)->cache, msgs->torrent, req->index, req->offset, req->length );
1248            ++msgs->prefetchCount;
1249        }
1250    }
1251}
1252
1253static void
1254peerMadeRequest( tr_peermsgs *               msgs,
1255                 const struct peer_request * req )
1256{
1257    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1258    const int reqIsValid = requestIsValid( msgs, req );
1259    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( &msgs->torrent->completion, req->index );
1260    const int peerIsChoked = msgs->peer->peerIsChoked;
1261
1262    int allow = FALSE;
1263
1264    if( !reqIsValid )
1265        dbgmsg( msgs, "rejecting an invalid request." );
1266    else if( !clientHasPiece )
1267        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1268    else if( peerIsChoked )
1269        dbgmsg( msgs, "rejecting request from choked peer" );
1270    else if( msgs->peer->pendingReqsToClient + 1 >= REQQ )
1271        dbgmsg( msgs, "rejecting request ... reqq is full" );
1272    else
1273        allow = TRUE;
1274
1275    if( allow ) {
1276        msgs->peerAskedFor[msgs->peer->pendingReqsToClient++] = *req;
1277        prefetchPieces( msgs );
1278    } else if( fext ) {
1279        protocolSendReject( msgs, req );
1280    }
1281}
1282
1283static tr_bool
1284messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1285{
1286    switch( id )
1287    {
1288        case BT_CHOKE:
1289        case BT_UNCHOKE:
1290        case BT_INTERESTED:
1291        case BT_NOT_INTERESTED:
1292        case BT_FEXT_HAVE_ALL:
1293        case BT_FEXT_HAVE_NONE:
1294            return len == 1;
1295
1296        case BT_HAVE:
1297        case BT_FEXT_SUGGEST:
1298        case BT_FEXT_ALLOWED_FAST:
1299            return len == 5;
1300
1301        case BT_BITFIELD:
1302            return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1303
1304        case BT_REQUEST:
1305        case BT_CANCEL:
1306        case BT_FEXT_REJECT:
1307            return len == 13;
1308
1309        case BT_PIECE:
1310            return len > 9 && len <= 16393;
1311
1312        case BT_PORT:
1313            return len == 3;
1314
1315        case BT_LTEP:
1316            return len >= 2;
1317
1318        default:
1319            return FALSE;
1320    }
1321}
1322
1323static int clientGotBlock( tr_peermsgs *               msgs,
1324                           struct evbuffer *           block,
1325                           const struct peer_request * req );
1326
1327static int
1328readBtPiece( tr_peermsgs      * msgs,
1329             struct evbuffer  * inbuf,
1330             size_t             inlen,
1331             size_t           * setme_piece_bytes_read )
1332{
1333    struct peer_request * req = &msgs->incoming.blockReq;
1334
1335    assert( evbuffer_get_length( inbuf ) >= inlen );
1336    dbgmsg( msgs, "In readBtPiece" );
1337
1338    if( !req->length )
1339    {
1340        if( inlen < 8 )
1341            return READ_LATER;
1342
1343        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index );
1344        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset );
1345        req->length = msgs->incoming.length - 9;
1346        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1347        return READ_NOW;
1348    }
1349    else
1350    {
1351        int err;
1352
1353        /* read in another chunk of data */
1354        const size_t nLeft = req->length - evbuffer_get_length( msgs->incoming.block );
1355        size_t n = MIN( nLeft, inlen );
1356        size_t i = n;
1357        void * buf = tr_sessionGetBuffer( getSession( msgs ) );
1358        const size_t buflen = SESSION_BUFFER_SIZE;
1359
1360        while( i > 0 )
1361        {
1362            const size_t thisPass = MIN( i, buflen );
1363            tr_peerIoReadBytes( msgs->peer->io, inbuf, buf, thisPass );
1364            evbuffer_add( msgs->incoming.block, buf, thisPass );
1365            i -= thisPass;
1366        }
1367
1368        tr_sessionReleaseBuffer( getSession( msgs ) );
1369        buf = NULL;
1370
1371        fireClientGotData( msgs, n, TRUE );
1372        *setme_piece_bytes_read += n;
1373        dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1374               n, req->index, req->offset, req->length,
1375               (int)( req->length - evbuffer_get_length( msgs->incoming.block ) ) );
1376        if( evbuffer_get_length( msgs->incoming.block ) < req->length )
1377            return READ_LATER;
1378
1379        /* we've got the whole block ... process it */
1380        err = clientGotBlock( msgs, msgs->incoming.block, req );
1381
1382        /* cleanup */
1383        evbuffer_free( msgs->incoming.block );
1384        msgs->incoming.block = evbuffer_new( );
1385        req->length = 0;
1386        msgs->state = AWAITING_BT_LENGTH;
1387        return err ? READ_ERR : READ_NOW;
1388    }
1389}
1390
1391static void updateDesiredRequestCount( tr_peermsgs * msgs );
1392
1393static int
1394readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1395{
1396    uint32_t      ui32;
1397    uint32_t      msglen = msgs->incoming.length;
1398    const uint8_t id = msgs->incoming.id;
1399#ifndef NDEBUG
1400    const size_t  startBufLen = evbuffer_get_length( inbuf );
1401#endif
1402    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1403
1404    --msglen; /* id length */
1405
1406    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
1407
1408    if( inlen < msglen )
1409        return READ_LATER;
1410
1411    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1412    {
1413        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1414        fireError( msgs, EMSGSIZE );
1415        return READ_ERR;
1416    }
1417
1418    switch( id )
1419    {
1420        case BT_CHOKE:
1421            dbgmsg( msgs, "got Choke" );
1422            msgs->peer->clientIsChoked = 1;
1423            if( !fext )
1424                fireGotChoke( msgs );
1425            break;
1426
1427        case BT_UNCHOKE:
1428            dbgmsg( msgs, "got Unchoke" );
1429            msgs->peer->clientIsChoked = 0;
1430            updateDesiredRequestCount( msgs );
1431            break;
1432
1433        case BT_INTERESTED:
1434            dbgmsg( msgs, "got Interested" );
1435            msgs->peer->peerIsInterested = 1;
1436            break;
1437
1438        case BT_NOT_INTERESTED:
1439            dbgmsg( msgs, "got Not Interested" );
1440            msgs->peer->peerIsInterested = 0;
1441            break;
1442
1443        case BT_HAVE:
1444            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1445            dbgmsg( msgs, "got Have: %u", ui32 );
1446            if( tr_torrentHasMetadata( msgs->torrent )
1447                    && ( ui32 >= msgs->torrent->info.pieceCount ) )
1448            {
1449                fireError( msgs, ERANGE );
1450                return READ_ERR;
1451            }
1452
1453            /* a peer can send the same HAVE message twice... */
1454            if( !tr_bitsetHas( &msgs->peer->have, ui32 ) )
1455                if( !tr_bitsetAdd( &msgs->peer->have, ui32 ) )
1456                    fireClientGotHave( msgs, ui32 );
1457            updatePeerProgress( msgs );
1458            break;
1459
1460        case BT_BITFIELD: {
1461            const size_t bitCount = tr_torrentHasMetadata( msgs->torrent )
1462                                  ? msgs->torrent->info.pieceCount
1463                                  : msglen * 8;
1464            dbgmsg( msgs, "got a bitfield" );
1465            tr_bitsetReserve( &msgs->peer->have, bitCount );
1466            tr_peerIoReadBytes( msgs->peer->io, inbuf,
1467                                msgs->peer->have.bitfield.bits, msglen );
1468            fireClientGotBitfield( msgs, &msgs->peer->have.bitfield );
1469            updatePeerProgress( msgs );
1470            break;
1471        }
1472
1473        case BT_REQUEST:
1474        {
1475            struct peer_request r;
1476            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1477            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1478            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1479            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1480            peerMadeRequest( msgs, &r );
1481            break;
1482        }
1483
1484        case BT_CANCEL:
1485        {
1486            int i;
1487            struct peer_request r;
1488            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1489            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1490            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1491            tr_historyAdd( msgs->peer->cancelsSentToClient, tr_time( ), 1 );
1492            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1493
1494            for( i=0; i<msgs->peer->pendingReqsToClient; ++i ) {
1495                const struct peer_request * req = msgs->peerAskedFor + i;
1496                if( ( req->index == r.index ) && ( req->offset == r.offset ) && ( req->length == r.length ) )
1497                    break;
1498            }
1499
1500            if( i < msgs->peer->pendingReqsToClient )
1501                tr_removeElementFromArray( msgs->peerAskedFor, i, sizeof( struct peer_request ),
1502                                           msgs->peer->pendingReqsToClient-- );
1503            break;
1504        }
1505
1506        case BT_PIECE:
1507            assert( 0 ); /* handled elsewhere! */
1508            break;
1509
1510        case BT_PORT:
1511            dbgmsg( msgs, "Got a BT_PORT" );
1512            tr_peerIoReadUint16( msgs->peer->io, inbuf, &msgs->peer->dht_port );
1513            if( msgs->peer->dht_port > 0 )
1514                tr_dhtAddNode( getSession(msgs),
1515                               tr_peerAddress( msgs->peer ),
1516                               msgs->peer->dht_port, 0 );
1517            break;
1518
1519        case BT_FEXT_SUGGEST:
1520            dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1521            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1522            if( fext )
1523                fireClientGotSuggest( msgs, ui32 );
1524            else {
1525                fireError( msgs, EMSGSIZE );
1526                return READ_ERR;
1527            }
1528            break;
1529
1530        case BT_FEXT_ALLOWED_FAST:
1531            dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1532            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1533            if( fext )
1534                fireClientGotAllowedFast( msgs, ui32 );
1535            else {
1536                fireError( msgs, EMSGSIZE );
1537                return READ_ERR;
1538            }
1539            break;
1540
1541        case BT_FEXT_HAVE_ALL:
1542            dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1543            if( fext ) {
1544                tr_bitsetSetHaveAll( &msgs->peer->have );
1545                fireClientGotHaveAll( msgs );
1546                updatePeerProgress( msgs );
1547            } else {
1548                fireError( msgs, EMSGSIZE );
1549                return READ_ERR;
1550            }
1551            break;
1552
1553        case BT_FEXT_HAVE_NONE:
1554            dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1555            if( fext ) {
1556                tr_bitsetSetHaveNone( &msgs->peer->have );
1557                fireClientGotHaveNone( msgs );
1558                updatePeerProgress( msgs );
1559            } else {
1560                fireError( msgs, EMSGSIZE );
1561                return READ_ERR;
1562            }
1563            break;
1564
1565        case BT_FEXT_REJECT:
1566        {
1567            struct peer_request r;
1568            dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1569            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1570            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1571            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1572            if( fext )
1573                fireGotRej( msgs, &r );
1574            else {
1575                fireError( msgs, EMSGSIZE );
1576                return READ_ERR;
1577            }
1578            break;
1579        }
1580
1581        case BT_LTEP:
1582            dbgmsg( msgs, "Got a BT_LTEP" );
1583            parseLtep( msgs, msglen, inbuf );
1584            break;
1585
1586        default:
1587            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1588            tr_peerIoDrain( msgs->peer->io, inbuf, msglen );
1589            break;
1590    }
1591
1592    assert( msglen + 1 == msgs->incoming.length );
1593    assert( evbuffer_get_length( inbuf ) == startBufLen - msglen );
1594
1595    msgs->state = AWAITING_BT_LENGTH;
1596    return READ_NOW;
1597}
1598
1599static void
1600addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1601{
1602    if( !msgs->peer->blame )
1603         msgs->peer->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1604    tr_bitfieldAdd( msgs->peer->blame, index );
1605}
1606
1607/* returns 0 on success, or an errno on failure */
1608static int
1609clientGotBlock( tr_peermsgs *               msgs,
1610                struct evbuffer *           data,
1611                const struct peer_request * req )
1612{
1613    int err;
1614    tr_torrent * tor = msgs->torrent;
1615    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1616
1617    assert( msgs );
1618    assert( req );
1619
1620    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) {
1621        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1622                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1623        return EMSGSIZE;
1624    }
1625
1626    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1627
1628    if( !tr_peerMgrDidPeerRequest( msgs->torrent, msgs->peer, block ) ) {
1629        dbgmsg( msgs, "we didn't ask for this message..." );
1630        return 0;
1631    }
1632    if( tr_cpPieceIsComplete( &msgs->torrent->completion, req->index ) ) {
1633        dbgmsg( msgs, "we did ask for this message, but the piece is already complete..." );
1634        return 0;
1635    }
1636
1637    /**
1638    ***  Save the block
1639    **/
1640
1641    if(( err = tr_cacheWriteBlock( getSession(msgs)->cache, tor, req->index, req->offset, req->length, data )))
1642        return err;
1643
1644    addPeerToBlamefield( msgs, req->index );
1645    fireGotBlock( msgs, req );
1646    return 0;
1647}
1648
1649static int peerPulse( void * vmsgs );
1650
1651static void
1652didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1653{
1654    tr_peermsgs * msgs = vmsgs;
1655    firePeerGotData( msgs, bytesWritten, wasPieceData );
1656
1657    if ( tr_isPeerIo( io ) && io->userData )
1658        peerPulse( msgs );
1659}
1660
1661static ReadState
1662canRead( tr_peerIo * io, void * vmsgs, size_t * piece )
1663{
1664    ReadState         ret;
1665    tr_peermsgs *     msgs = vmsgs;
1666    struct evbuffer * in = tr_peerIoGetReadBuffer( io );
1667    const size_t      inlen = evbuffer_get_length( in );
1668
1669    dbgmsg( msgs, "canRead: inlen is %zu, msgs->state is %d", inlen, msgs->state );
1670
1671    if( !inlen )
1672    {
1673        ret = READ_LATER;
1674    }
1675    else if( msgs->state == AWAITING_BT_PIECE )
1676    {
1677        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1678    }
1679    else switch( msgs->state )
1680    {
1681        case AWAITING_BT_LENGTH:
1682            ret = readBtLength ( msgs, in, inlen ); break;
1683
1684        case AWAITING_BT_ID:
1685            ret = readBtId     ( msgs, in, inlen ); break;
1686
1687        case AWAITING_BT_MESSAGE:
1688            ret = readBtMessage( msgs, in, inlen ); break;
1689
1690        default:
1691            ret = READ_ERR;
1692            assert( 0 );
1693    }
1694
1695    dbgmsg( msgs, "canRead: ret is %d", (int)ret );
1696
1697    /* log the raw data that was read */
1698    if( ( ret != READ_ERR ) && ( evbuffer_get_length( in ) != inlen ) )
1699        fireClientGotData( msgs, inlen - evbuffer_get_length( in ), FALSE );
1700
1701    return ret;
1702}
1703
1704int
1705tr_peerMsgsIsReadingBlock( const tr_peermsgs * msgs, tr_block_index_t block )
1706{
1707    if( msgs->state != AWAITING_BT_PIECE )
1708        return FALSE;
1709
1710    return block == _tr_block( msgs->torrent,
1711                               msgs->incoming.blockReq.index,
1712                               msgs->incoming.blockReq.offset );
1713}
1714
1715/**
1716***
1717**/
1718
1719static void
1720updateDesiredRequestCount( tr_peermsgs * msgs )
1721{
1722    const tr_torrent * const torrent = msgs->torrent;
1723
1724    if( tr_torrentIsSeed( msgs->torrent ) )
1725    {
1726        msgs->desiredRequestCount = 0;
1727    }
1728    else if( msgs->peer->clientIsChoked )
1729    {
1730        msgs->desiredRequestCount = 0;
1731    }
1732    else if( !msgs->peer->clientIsInterested )
1733    {
1734        msgs->desiredRequestCount = 0;
1735    }
1736    else
1737    {
1738        int estimatedBlocksInPeriod;
1739        int rate_Bps;
1740        int irate_Bps;
1741        const int floor = 4;
1742        const int seconds = REQUEST_BUF_SECS;
1743        const uint64_t now = tr_time_msec( );
1744
1745        /* Get the rate limit we should use.
1746         * FIXME: this needs to consider all the other peers as well... */
1747        rate_Bps = tr_peerGetPieceSpeed_Bps( msgs->peer, now, TR_PEER_TO_CLIENT );
1748        if( tr_torrentUsesSpeedLimit( torrent, TR_PEER_TO_CLIENT ) )
1749            rate_Bps = MIN( rate_Bps, tr_torrentGetSpeedLimit_Bps( torrent, TR_PEER_TO_CLIENT ) );
1750
1751        /* honor the session limits, if enabled */
1752        if( tr_torrentUsesSessionLimits( torrent ) )
1753            if( tr_sessionGetActiveSpeedLimit_Bps( torrent->session, TR_PEER_TO_CLIENT, &irate_Bps ) )
1754                rate_Bps = MIN( rate_Bps, irate_Bps );
1755
1756        /* use this desired rate to figure out how
1757         * many requests we should send to this peer */
1758        estimatedBlocksInPeriod = ( rate_Bps * seconds ) / torrent->blockSize;
1759        msgs->desiredRequestCount = MAX( floor, estimatedBlocksInPeriod );
1760
1761        /* honor the peer's maximum request count, if specified */
1762        if( msgs->reqq > 0 )
1763            if( msgs->desiredRequestCount > msgs->reqq )
1764                msgs->desiredRequestCount = msgs->reqq;
1765    }
1766}
1767
1768static void
1769updateMetadataRequests( tr_peermsgs * msgs, time_t now )
1770{
1771    int piece;
1772
1773    if( msgs->peerSupportsMetadataXfer
1774        && tr_torrentGetNextMetadataRequest( msgs->torrent, now, &piece ) )
1775    {
1776        tr_benc tmp;
1777        int payloadLen;
1778        char * payload;
1779        struct evbuffer * out = msgs->outMessages;
1780
1781        /* build the data message */
1782        tr_bencInitDict( &tmp, 3 );
1783        tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REQUEST );
1784        tr_bencDictAddInt( &tmp, "piece", piece );
1785        payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1786        tr_bencFree( &tmp );
1787
1788        dbgmsg( msgs, "requesting metadata piece #%d", piece );
1789
1790        /* write it out as a LTEP message to our outMessages buffer */
1791        evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen );
1792        evbuffer_add_uint8 ( out, BT_LTEP );
1793        evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1794        evbuffer_add       ( out, payload, payloadLen );
1795        pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1796        dbgOutMessageLen( msgs );
1797
1798        tr_free( payload );
1799    }
1800}
1801
1802static void
1803updateBlockRequests( tr_peermsgs * msgs )
1804{
1805    if( tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT )
1806        && ( msgs->desiredRequestCount > 0 )
1807        && ( msgs->peer->pendingReqsToPeer <= ( msgs->desiredRequestCount * 0.66 ) ) )
1808    {
1809        int i;
1810        int n;
1811        const int numwant = msgs->desiredRequestCount - msgs->peer->pendingReqsToPeer;
1812        tr_block_index_t * blocks = tr_new( tr_block_index_t, numwant );
1813
1814        tr_peerMgrGetNextRequests( msgs->torrent, msgs->peer, numwant, blocks, &n );
1815
1816        for( i=0; i<n; ++i )
1817        {
1818            struct peer_request req;
1819            blockToReq( msgs->torrent, blocks[i], &req );
1820            protocolSendRequest( msgs, &req );
1821        }
1822
1823        tr_free( blocks );
1824    }
1825}
1826
1827static size_t
1828fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1829{
1830    int piece;
1831    size_t bytesWritten = 0;
1832    struct peer_request req;
1833    const tr_bool haveMessages = evbuffer_get_length( msgs->outMessages ) != 0;
1834    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1835
1836    /**
1837    ***  Protocol messages
1838    **/
1839
1840    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1841    {
1842        dbgmsg( msgs, "started an outMessages batch (length is %zu)", evbuffer_get_length( msgs->outMessages ) );
1843        msgs->outMessagesBatchedAt = now;
1844    }
1845    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1846    {
1847        const size_t len = evbuffer_get_length( msgs->outMessages );
1848        /* flush the protocol messages */
1849        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len );
1850        tr_peerIoWriteBuf( msgs->peer->io, msgs->outMessages, FALSE );
1851        msgs->clientSentAnythingAt = now;
1852        msgs->outMessagesBatchedAt = 0;
1853        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1854        bytesWritten +=  len;
1855    }
1856
1857    /**
1858    ***  Metadata Pieces
1859    **/
1860
1861    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= METADATA_PIECE_SIZE )
1862        && popNextMetadataRequest( msgs, &piece ) )
1863    {
1864        char * data;
1865        int dataLen;
1866        tr_bool ok = FALSE;
1867
1868        data = tr_torrentGetMetadataPiece( msgs->torrent, piece, &dataLen );
1869        if( ( dataLen > 0 ) && ( data != NULL ) )
1870        {
1871            tr_benc tmp;
1872            int payloadLen;
1873            char * payload;
1874            struct evbuffer * out = msgs->outMessages;
1875
1876            /* build the data message */
1877            tr_bencInitDict( &tmp, 3 );
1878            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_DATA );
1879            tr_bencDictAddInt( &tmp, "piece", piece );
1880            tr_bencDictAddInt( &tmp, "total_size", msgs->torrent->infoDictLength );
1881            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1882            tr_bencFree( &tmp );
1883
1884            /* write it out as a LTEP message to our outMessages buffer */
1885            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen + dataLen );
1886            evbuffer_add_uint8 ( out, BT_LTEP );
1887            evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1888            evbuffer_add       ( out, payload, payloadLen );
1889            evbuffer_add       ( out, data, dataLen );
1890            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1891            dbgOutMessageLen( msgs );
1892
1893            tr_free( payload );
1894            tr_free( data );
1895
1896            ok = TRUE;
1897        }
1898
1899        if( !ok ) /* send a rejection message */
1900        {
1901            tr_benc tmp;
1902            int payloadLen;
1903            char * payload;
1904            struct evbuffer * out = msgs->outMessages;
1905
1906            /* build the rejection message */
1907            tr_bencInitDict( &tmp, 2 );
1908            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
1909            tr_bencDictAddInt( &tmp, "piece", piece );
1910            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1911            tr_bencFree( &tmp );
1912
1913            /* write it out as a LTEP message to our outMessages buffer */
1914            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen );
1915            evbuffer_add_uint8 ( out, BT_LTEP );
1916            evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1917            evbuffer_add       ( out, payload, payloadLen );
1918            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1919            dbgOutMessageLen( msgs );
1920
1921            tr_free( payload );
1922        }
1923    }
1924
1925    /**
1926    ***  Data Blocks
1927    **/
1928
1929    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= msgs->torrent->blockSize )
1930        && popNextRequest( msgs, &req ) )
1931    {
1932        --msgs->prefetchCount;
1933
1934        if( requestIsValid( msgs, &req )
1935            && tr_cpPieceIsComplete( &msgs->torrent->completion, req.index ) )
1936        {
1937            int err;
1938            const uint32_t msglen = 4 + 1 + 4 + 4 + req.length;
1939            struct evbuffer * out;
1940            struct evbuffer_iovec iovec[1];
1941
1942            out = evbuffer_new( );
1943            evbuffer_expand( out, msglen );
1944
1945            evbuffer_add_uint32( out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1946            evbuffer_add_uint8 ( out, BT_PIECE );
1947            evbuffer_add_uint32( out, req.index );
1948            evbuffer_add_uint32( out, req.offset );
1949
1950            evbuffer_reserve_space( out, req.length, iovec, 1 );
1951            err = tr_cacheReadBlock( getSession(msgs)->cache, msgs->torrent, req.index, req.offset, req.length, iovec[0].iov_base );
1952            iovec[0].iov_len = req.length;
1953            evbuffer_commit_space( out, iovec, 1 );
1954
1955            /* check the piece if it needs checking... */
1956            if( !err && tr_torrentPieceNeedsCheck( msgs->torrent, req.index ) )
1957                if(( err = !tr_torrentCheckPiece( msgs->torrent, req.index )))
1958                    tr_torrentSetLocalError( msgs->torrent, _( "Please Verify Local Data! Piece #%zu is corrupt." ), (size_t)req.index );
1959
1960            if( err )
1961            {
1962                if( fext )
1963                    protocolSendReject( msgs, &req );
1964            }
1965            else
1966            {
1967                const size_t n = evbuffer_get_length( out );
1968                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1969                assert( n == msglen );
1970                tr_peerIoWriteBuf( msgs->peer->io, out, TRUE );
1971                bytesWritten += n;
1972                msgs->clientSentAnythingAt = now;
1973                tr_historyAdd( msgs->peer->blocksSentToPeer, tr_time( ), 1 );
1974            }
1975
1976            evbuffer_free( out );
1977
1978            if( err )
1979            {
1980                bytesWritten = 0;
1981                msgs = NULL;
1982            }
1983        }
1984        else if( fext ) /* peer needs a reject message */
1985        {
1986            protocolSendReject( msgs, &req );
1987        }
1988
1989        if( msgs != NULL )
1990            prefetchPieces( msgs );
1991    }
1992
1993    /**
1994    ***  Keepalive
1995    **/
1996
1997    if( ( msgs != NULL )
1998        && ( msgs->clientSentAnythingAt != 0 )
1999        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
2000    {
2001        dbgmsg( msgs, "sending a keepalive message" );
2002        evbuffer_add_uint32( msgs->outMessages, 0 );
2003        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
2004    }
2005
2006    return bytesWritten;
2007}
2008
2009static int
2010peerPulse( void * vmsgs )
2011{
2012    tr_peermsgs * msgs = vmsgs;
2013    const time_t  now = tr_time( );
2014
2015    if ( tr_isPeerIo( msgs->peer->io ) ) {
2016        updateDesiredRequestCount( msgs );
2017        updateBlockRequests( msgs );
2018        updateMetadataRequests( msgs, now );
2019    }
2020
2021    for( ;; )
2022        if( fillOutputBuffer( msgs, now ) < 1 )
2023            break;
2024
2025    return TRUE; /* loop forever */
2026}
2027
2028void
2029tr_peerMsgsPulse( tr_peermsgs * msgs )
2030{
2031    if( msgs != NULL )
2032        peerPulse( msgs );
2033}
2034
2035static void
2036gotError( tr_peerIo  * io UNUSED,
2037          short        what,
2038          void       * vmsgs )
2039{
2040    if( what & BEV_EVENT_TIMEOUT )
2041        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
2042    if( what & ( BEV_EVENT_EOF | BEV_EVENT_ERROR ) )
2043        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
2044               what, errno, tr_strerror( errno ) );
2045    fireError( vmsgs, ENOTCONN );
2046}
2047
2048static void
2049sendBitfield( tr_peermsgs * msgs )
2050{
2051    struct evbuffer * out = msgs->outMessages;
2052    tr_bitfield *     field;
2053    tr_piece_index_t  lazyPieces[LAZY_PIECE_COUNT];
2054    size_t            i;
2055    size_t            lazyCount = 0;
2056
2057    field = tr_bitfieldDup( tr_cpPieceBitfield( &msgs->torrent->completion ) );
2058
2059    if( tr_sessionIsLazyBitfieldEnabled( getSession( msgs ) ) )
2060    {
2061        /** Lazy bitfields aren't a high priority or secure, so I'm opting for
2062            speed over a truly random sample -- let's limit the pool size to
2063            the first 1000 pieces so large torrents don't bog things down */
2064        size_t poolSize;
2065        const size_t maxPoolSize = MIN( msgs->torrent->info.pieceCount, 1000 );
2066        tr_piece_index_t * pool = tr_new( tr_piece_index_t, maxPoolSize );
2067
2068        /* build the pool */
2069        for( i=poolSize=0; i<maxPoolSize; ++i )
2070            if( tr_bitfieldHas( field, i ) )
2071                pool[poolSize++] = i;
2072
2073        /* pull random piece indices from the pool */
2074        while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) )
2075        {
2076            const int pos = tr_cryptoWeakRandInt( poolSize );
2077            const tr_piece_index_t piece = pool[pos];
2078            tr_bitfieldRem( field, piece );
2079            lazyPieces[lazyCount++] = piece;
2080            pool[pos] = pool[--poolSize];
2081        }
2082
2083        /* cleanup */
2084        tr_free( pool );
2085    }
2086
2087    evbuffer_add_uint32( out, sizeof( uint8_t ) + field->byteCount );
2088    evbuffer_add_uint8 ( out, BT_BITFIELD );
2089    evbuffer_add       ( out, field->bits, field->byteCount );
2090    dbgmsg( msgs, "sending bitfield... outMessage size is now %zu",
2091            evbuffer_get_length( out ) );
2092    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
2093
2094    for( i = 0; i < lazyCount; ++i )
2095        protocolSendHave( msgs, lazyPieces[i] );
2096
2097    tr_bitfieldFree( field );
2098}
2099
2100static void
2101tellPeerWhatWeHave( tr_peermsgs * msgs )
2102{
2103    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
2104
2105    if( fext && ( tr_cpGetStatus( &msgs->torrent->completion ) == TR_SEED ) )
2106    {
2107        protocolSendHaveAll( msgs );
2108    }
2109    else if( fext && ( tr_cpHaveValid( &msgs->torrent->completion ) == 0 ) )
2110    {
2111        protocolSendHaveNone( msgs );
2112    }
2113    else
2114    {
2115        sendBitfield( msgs );
2116    }
2117}
2118
2119/**
2120***
2121**/
2122
2123/* some peers give us error messages if we send
2124   more than this many peers in a single pex message
2125   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
2126#define MAX_PEX_ADDED 50
2127#define MAX_PEX_DROPPED 50
2128
2129typedef struct
2130{
2131    tr_pex *  added;
2132    tr_pex *  dropped;
2133    tr_pex *  elements;
2134    int       addedCount;
2135    int       droppedCount;
2136    int       elementCount;
2137}
2138PexDiffs;
2139
2140static void
2141pexAddedCb( void * vpex,
2142            void * userData )
2143{
2144    PexDiffs * diffs = userData;
2145    tr_pex *   pex = vpex;
2146
2147    if( diffs->addedCount < MAX_PEX_ADDED )
2148    {
2149        diffs->added[diffs->addedCount++] = *pex;
2150        diffs->elements[diffs->elementCount++] = *pex;
2151    }
2152}
2153
2154static inline void
2155pexDroppedCb( void * vpex,
2156              void * userData )
2157{
2158    PexDiffs * diffs = userData;
2159    tr_pex *   pex = vpex;
2160
2161    if( diffs->droppedCount < MAX_PEX_DROPPED )
2162    {
2163        diffs->dropped[diffs->droppedCount++] = *pex;
2164    }
2165}
2166
2167static inline void
2168pexElementCb( void * vpex,
2169              void * userData )
2170{
2171    PexDiffs * diffs = userData;
2172    tr_pex * pex = vpex;
2173
2174    diffs->elements[diffs->elementCount++] = *pex;
2175}
2176
2177static void
2178sendPex( tr_peermsgs * msgs )
2179{
2180    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
2181    {
2182        PexDiffs diffs;
2183        PexDiffs diffs6;
2184        tr_pex * newPex = NULL;
2185        tr_pex * newPex6 = NULL;
2186        const int newCount = tr_peerMgrGetPeers( msgs->torrent, &newPex, TR_AF_INET, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
2187        const int newCount6 = tr_peerMgrGetPeers( msgs->torrent, &newPex6, TR_AF_INET6, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
2188
2189        /* build the diffs */
2190        diffs.added = tr_new( tr_pex, newCount );
2191        diffs.addedCount = 0;
2192        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
2193        diffs.droppedCount = 0;
2194        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
2195        diffs.elementCount = 0;
2196        tr_set_compare( msgs->pex, msgs->pexCount,
2197                        newPex, newCount,
2198                        tr_pexCompare, sizeof( tr_pex ),
2199                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
2200        diffs6.added = tr_new( tr_pex, newCount6 );
2201        diffs6.addedCount = 0;
2202        diffs6.dropped = tr_new( tr_pex, msgs->pexCount6 );
2203        diffs6.droppedCount = 0;
2204        diffs6.elements = tr_new( tr_pex, newCount6 + msgs->pexCount6 );
2205        diffs6.elementCount = 0;
2206        tr_set_compare( msgs->pex6, msgs->pexCount6,
2207                        newPex6, newCount6,
2208                        tr_pexCompare, sizeof( tr_pex ),
2209                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6 );
2210        dbgmsg(
2211            msgs,
2212            "pex: old peer count %d+%d, new peer count %d+%d, "
2213            "added %d+%d, removed %d+%d",
2214            msgs->pexCount, msgs->pexCount6, newCount, newCount6,
2215            diffs.addedCount, diffs6.addedCount,
2216            diffs.droppedCount, diffs6.droppedCount );
2217
2218        if( !diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
2219            !diffs6.droppedCount )
2220        {
2221            tr_free( diffs.elements );
2222            tr_free( diffs6.elements );
2223        }
2224        else
2225        {
2226            int  i;
2227            tr_benc val;
2228            char * benc;
2229            int bencLen;
2230            uint8_t * tmp, *walk;
2231            struct evbuffer * out = msgs->outMessages;
2232
2233            /* update peer */
2234            tr_free( msgs->pex );
2235            msgs->pex = diffs.elements;
2236            msgs->pexCount = diffs.elementCount;
2237            tr_free( msgs->pex6 );
2238            msgs->pex6 = diffs6.elements;
2239            msgs->pexCount6 = diffs6.elementCount;
2240
2241            /* build the pex payload */
2242            tr_bencInitDict( &val, 3 ); /* ipv6 support: left as 3:
2243                                         * speed vs. likelihood? */
2244
2245            if( diffs.addedCount > 0)
2246            {
2247                /* "added" */
2248                tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
2249                for( i = 0; i < diffs.addedCount; ++i ) {
2250                    memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
2251                    memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
2252                }
2253                assert( ( walk - tmp ) == diffs.addedCount * 6 );
2254                tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
2255                tr_free( tmp );
2256
2257                /* "added.f"
2258                 * unset each holepunch flag because we don't support it. */
2259                tmp = walk = tr_new( uint8_t, diffs.addedCount );
2260                for( i = 0; i < diffs.addedCount; ++i )
2261                    *walk++ = diffs.added[i].flags & ~ADDED_F_HOLEPUNCH;
2262                assert( ( walk - tmp ) == diffs.addedCount );
2263                tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2264                tr_free( tmp );
2265            }
2266
2267            if( diffs.droppedCount > 0 )
2268            {
2269                /* "dropped" */
2270                tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2271                for( i = 0; i < diffs.droppedCount; ++i ) {
2272                    memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
2273                    memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2274                }
2275                assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2276                tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2277                tr_free( tmp );
2278            }
2279
2280            if( diffs6.addedCount > 0 )
2281            {
2282                /* "added6" */
2283                tmp = walk = tr_new( uint8_t, diffs6.addedCount * 18 );
2284                for( i = 0; i < diffs6.addedCount; ++i ) {
2285                    memcpy( walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16 );
2286                    walk += 16;
2287                    memcpy( walk, &diffs6.added[i].port, 2 );
2288                    walk += 2;
2289                }
2290                assert( ( walk - tmp ) == diffs6.addedCount * 18 );
2291                tr_bencDictAddRaw( &val, "added6", tmp, walk - tmp );
2292                tr_free( tmp );
2293
2294                /* "added6.f"
2295                 * unset each holepunch flag because we don't support it. */
2296                tmp = walk = tr_new( uint8_t, diffs6.addedCount );
2297                for( i = 0; i < diffs6.addedCount; ++i )
2298                    *walk++ = diffs6.added[i].flags & ~ADDED_F_HOLEPUNCH;
2299                assert( ( walk - tmp ) == diffs6.addedCount );
2300                tr_bencDictAddRaw( &val, "added6.f", tmp, walk - tmp );
2301                tr_free( tmp );
2302            }
2303
2304            if( diffs6.droppedCount > 0 )
2305            {
2306                /* "dropped6" */
2307                tmp = walk = tr_new( uint8_t, diffs6.droppedCount * 18 );
2308                for( i = 0; i < diffs6.droppedCount; ++i ) {
2309                    memcpy( walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16 );
2310                    walk += 16;
2311                    memcpy( walk, &diffs6.dropped[i].port, 2 );
2312                    walk += 2;
2313                }
2314                assert( ( walk - tmp ) == diffs6.droppedCount * 18);
2315                tr_bencDictAddRaw( &val, "dropped6", tmp, walk - tmp );
2316                tr_free( tmp );
2317            }
2318
2319            /* write the pex message */
2320            benc = tr_bencToStr( &val, TR_FMT_BENC, &bencLen );
2321            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + bencLen );
2322            evbuffer_add_uint8 ( out, BT_LTEP );
2323            evbuffer_add_uint8 ( out, msgs->ut_pex_id );
2324            evbuffer_add       ( out, benc, bencLen );
2325            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2326            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", evbuffer_get_length( out ) );
2327            dbgOutMessageLen( msgs );
2328
2329            tr_free( benc );
2330            tr_bencFree( &val );
2331        }
2332
2333        /* cleanup */
2334        tr_free( diffs.added );
2335        tr_free( diffs.dropped );
2336        tr_free( newPex );
2337        tr_free( diffs6.added );
2338        tr_free( diffs6.dropped );
2339        tr_free( newPex6 );
2340
2341        /*msgs->clientSentPexAt = tr_time( );*/
2342    }
2343}
2344
2345static void
2346pexPulse( int foo UNUSED, short bar UNUSED, void * vmsgs )
2347{
2348    struct tr_peermsgs * msgs = vmsgs;
2349
2350    sendPex( msgs );
2351
2352    tr_timerAdd( msgs->pexTimer, PEX_INTERVAL_SECS, 0 );
2353}
2354
2355/**
2356***
2357**/
2358
2359tr_peermsgs*
2360tr_peerMsgsNew( struct tr_torrent    * torrent,
2361                struct tr_peer       * peer,
2362                tr_peer_callback     * callback,
2363                void                 * callbackData )
2364{
2365    tr_peermsgs * m;
2366
2367    assert( peer );
2368    assert( peer->io );
2369
2370    m = tr_new0( tr_peermsgs, 1 );
2371    m->callback = callback;
2372    m->callbackData = callbackData;
2373    m->peer = peer;
2374    m->torrent = torrent;
2375    m->peer->clientIsChoked = 1;
2376    m->peer->peerIsChoked = 1;
2377    m->peer->clientIsInterested = 0;
2378    m->peer->peerIsInterested = 0;
2379    m->state = AWAITING_BT_LENGTH;
2380    m->outMessages = evbuffer_new( );
2381    m->outMessagesBatchedAt = 0;
2382    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2383    m->incoming.block = evbuffer_new( );
2384    m->pexTimer = evtimer_new( torrent->session->event_base, pexPulse, m );
2385    peer->msgs = m;
2386    tr_timerAdd( m->pexTimer, PEX_INTERVAL_SECS, 0 );
2387
2388    if( tr_peerIoSupportsUTP( peer->io ) ) {
2389        const tr_address * addr = tr_peerIoGetAddress( peer->io, NULL );
2390        tr_peerMgrSetUtpSupported( torrent, addr );
2391        tr_peerMgrSetUtpFailed( torrent, addr, FALSE );
2392    }
2393
2394    if( tr_peerIoSupportsLTEP( peer->io ) )
2395        sendLtepHandshake( m );
2396
2397    tellPeerWhatWeHave( m );
2398
2399    if( tr_dhtEnabled( torrent->session ) && tr_peerIoSupportsDHT( peer->io ))
2400    {
2401        /* Only send PORT over IPv6 when the IPv6 DHT is running (BEP-32). */
2402        const struct tr_address *addr = tr_peerIoGetAddress( peer->io, NULL );
2403        if( addr->type == TR_AF_INET || tr_globalIPv6() ) {
2404            protocolSendPort( m, tr_dhtPort( torrent->session ) );
2405        }
2406    }
2407
2408    tr_peerIoSetIOFuncs( m->peer->io, canRead, didWrite, gotError, m );
2409    updateDesiredRequestCount( m );
2410
2411    return m;
2412}
2413
2414void
2415tr_peerMsgsFree( tr_peermsgs* msgs )
2416{
2417    if( msgs )
2418    {
2419        event_free( msgs->pexTimer );
2420
2421        evbuffer_free( msgs->incoming.block );
2422        evbuffer_free( msgs->outMessages );
2423        tr_free( msgs->pex6 );
2424        tr_free( msgs->pex );
2425
2426        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2427        tr_free( msgs );
2428    }
2429}
Note: See TracBrowser for help on using the repository browser.