source: trunk/libtransmission/peer-msgs.c @ 12111

Last change on this file since 12111 was 12111, checked in by jordan, 11 years ago

(trunk libT) copyediting.

  1. make some accidentally-public functions/variables private
  2. comment out tr_generateAllowedSet() since we're not using it
  3. argument list indentation
  • Property svn:keywords set to Date Rev Author Id
File size: 70.0 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 12111 2011-03-07 14:33:45Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdarg.h>
17#include <stdio.h>
18#include <stdlib.h>
19#include <string.h>
20
21#include <event2/bufferevent.h>
22#include <event2/event.h>
23
24#include "transmission.h"
25#include "bencode.h"
26#include "cache.h"
27#include "completion.h"
28#include "crypto.h"
29#ifdef WIN32
30#include "net.h" /* for ECONN */
31#endif
32#include "peer-io.h"
33#include "peer-mgr.h"
34#include "peer-msgs.h"
35#include "session.h"
36#include "stats.h"
37#include "torrent.h"
38#include "torrent-magnet.h"
39#include "tr-dht.h"
40#include "utils.h"
41#include "version.h"
42
43/**
44***
45**/
46
47enum
48{
49    BT_CHOKE                = 0,
50    BT_UNCHOKE              = 1,
51    BT_INTERESTED           = 2,
52    BT_NOT_INTERESTED       = 3,
53    BT_HAVE                 = 4,
54    BT_BITFIELD             = 5,
55    BT_REQUEST              = 6,
56    BT_PIECE                = 7,
57    BT_CANCEL               = 8,
58    BT_PORT                 = 9,
59
60    BT_FEXT_SUGGEST         = 13,
61    BT_FEXT_HAVE_ALL        = 14,
62    BT_FEXT_HAVE_NONE       = 15,
63    BT_FEXT_REJECT          = 16,
64    BT_FEXT_ALLOWED_FAST    = 17,
65
66    BT_LTEP                 = 20,
67
68    LTEP_HANDSHAKE          = 0,
69
70    UT_PEX_ID               = 1,
71    UT_METADATA_ID          = 3,
72
73    MAX_PEX_PEER_COUNT      = 50,
74
75    MIN_CHOKE_PERIOD_SEC    = 10,
76
77    /* idle seconds before we send a keepalive */
78    KEEPALIVE_INTERVAL_SECS = 100,
79
80    PEX_INTERVAL_SECS       = 90, /* sec between sendPex() calls */
81
82    REQQ                    = 512,
83
84    METADATA_REQQ           = 64,
85
86    /* used in lowering the outMessages queue period */
87    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
88    HIGH_PRIORITY_INTERVAL_SECS = 2,
89    LOW_PRIORITY_INTERVAL_SECS = 10,
90
91    /* number of pieces we'll allow in our fast set */
92    MAX_FAST_SET_SIZE = 3,
93
94    /* defined in BEP #9 */
95    METADATA_MSG_TYPE_REQUEST = 0,
96    METADATA_MSG_TYPE_DATA = 1,
97    METADATA_MSG_TYPE_REJECT = 2
98};
99
100enum
101{
102    AWAITING_BT_LENGTH,
103    AWAITING_BT_ID,
104    AWAITING_BT_MESSAGE,
105    AWAITING_BT_PIECE
106};
107
108/**
109***
110**/
111
112struct peer_request
113{
114    uint32_t    index;
115    uint32_t    offset;
116    uint32_t    length;
117};
118
119static void
120blockToReq( const tr_torrent     * tor,
121            tr_block_index_t       block,
122            struct peer_request  * setme )
123{
124    tr_torrentGetBlockLocation( tor, block, &setme->index,
125                                            &setme->offset,
126                                            &setme->length );
127}
128
129/**
130***
131**/
132
133/* this is raw, unchanged data from the peer regarding
134 * the current message that it's sending us. */
135struct tr_incoming
136{
137    uint8_t                id;
138    uint32_t               length; /* includes the +1 for id length */
139    struct peer_request    blockReq; /* metadata for incoming blocks */
140    struct evbuffer *      block; /* piece data for incoming blocks */
141};
142
143/**
144 * Low-level communication state information about a connected peer.
145 *
146 * This structure remembers the low-level protocol states that we're
147 * in with this peer, such as active requests, pex messages, and so on.
148 * Its fields are all private to peer-msgs.c.
149 *
150 * Data not directly involved with sending & receiving messages is
151 * stored in tr_peer, where it can be accessed by both peermsgs and
152 * the peer manager.
153 *
154 * @see struct peer_atom
155 * @see tr_peer
156 */
157struct tr_peermsgs
158{
159    tr_bool         peerSupportsPex;
160    tr_bool         peerSupportsMetadataXfer;
161    tr_bool         clientSentLtepHandshake;
162    tr_bool         peerSentLtepHandshake;
163
164    /*tr_bool         haveFastSet;*/
165
166    int             desiredRequestCount;
167
168    int             prefetchCount;
169
170    /* how long the outMessages batch should be allowed to grow before
171     * it's flushed -- some messages (like requests >:) should be sent
172     * very quickly; others aren't as urgent. */
173    int8_t          outMessagesBatchPeriod;
174
175    uint8_t         state;
176    uint8_t         ut_pex_id;
177    uint8_t         ut_metadata_id;
178    uint16_t        pexCount;
179    uint16_t        pexCount6;
180
181    size_t          metadata_size_hint;
182#if 0
183    size_t                 fastsetSize;
184    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
185#endif
186
187    tr_peer *              peer;
188
189    tr_torrent *           torrent;
190
191    tr_peer_callback      * callback;
192    void                  * callbackData;
193
194    struct evbuffer *      outMessages; /* all the non-piece messages */
195
196    struct peer_request    peerAskedFor[REQQ];
197
198    int                    peerAskedForMetadata[METADATA_REQQ];
199    int                    peerAskedForMetadataCount;
200
201    tr_pex               * pex;
202    tr_pex               * pex6;
203
204    /*time_t                 clientSentPexAt;*/
205    time_t                 clientSentAnythingAt;
206
207    /* when we started batching the outMessages */
208    time_t                outMessagesBatchedAt;
209
210    struct tr_incoming    incoming;
211
212    /* if the peer supports the Extension Protocol in BEP 10 and
213       supplied a reqq argument, it's stored here. Otherwise, the
214       value is zero and should be ignored. */
215    int64_t               reqq;
216
217    struct event        * pexTimer;
218};
219
220/**
221***
222**/
223
224#if 0
225static tr_bitfield*
226getHave( const struct tr_peermsgs * msgs )
227{
228    if( msgs->peer->have == NULL )
229        msgs->peer->have = tr_bitfieldNew( msgs->torrent->info.pieceCount );
230    return msgs->peer->have;
231}
232#endif
233
234static inline tr_session*
235getSession( struct tr_peermsgs * msgs )
236{
237    return msgs->torrent->session;
238}
239
240/**
241***
242**/
243
244static void
245myDebug( const char * file, int line,
246         const struct tr_peermsgs * msgs,
247         const char * fmt, ... )
248{
249    FILE * fp = tr_getLog( );
250
251    if( fp )
252    {
253        va_list           args;
254        char              timestr[64];
255        struct evbuffer * buf = evbuffer_new( );
256        char *            base = tr_basename( file );
257
258        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
259                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
260                             tr_torrentName( msgs->torrent ),
261                             tr_peerIoGetAddrStr( msgs->peer->io ),
262                             msgs->peer->client );
263        va_start( args, fmt );
264        evbuffer_add_vprintf( buf, fmt, args );
265        va_end( args );
266        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
267        fputs( (const char*)evbuffer_pullup( buf, -1 ), fp );
268
269        tr_free( base );
270        evbuffer_free( buf );
271    }
272}
273
274#define dbgmsg( msgs, ... ) \
275    do { \
276        if( tr_deepLoggingIsActive( ) ) \
277            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
278    } while( 0 )
279
280/**
281***
282**/
283
284static void
285pokeBatchPeriod( tr_peermsgs * msgs, int interval )
286{
287    if( msgs->outMessagesBatchPeriod > interval )
288    {
289        msgs->outMessagesBatchPeriod = interval;
290        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
291    }
292}
293
294static void
295dbgOutMessageLen( tr_peermsgs * msgs )
296{
297    dbgmsg( msgs, "outMessage size is now %zu", evbuffer_get_length( msgs->outMessages ) );
298}
299
300static void
301protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
302{
303    struct evbuffer * out = msgs->outMessages;
304
305    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
306
307    evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
308    evbuffer_add_uint8 ( out, BT_FEXT_REJECT );
309    evbuffer_add_uint32( out, req->index );
310    evbuffer_add_uint32( out, req->offset );
311    evbuffer_add_uint32( out, req->length );
312
313    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
314    dbgOutMessageLen( msgs );
315}
316
317static void
318protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
319{
320    struct evbuffer * out = msgs->outMessages;
321
322    evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
323    evbuffer_add_uint8 ( out, BT_REQUEST );
324    evbuffer_add_uint32( out, req->index );
325    evbuffer_add_uint32( out, req->offset );
326    evbuffer_add_uint32( out, req->length );
327
328    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
329    dbgOutMessageLen( msgs );
330    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
331}
332
333static void
334protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
335{
336    struct evbuffer * out = msgs->outMessages;
337
338    evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
339    evbuffer_add_uint8 ( out, BT_CANCEL );
340    evbuffer_add_uint32( out, req->index );
341    evbuffer_add_uint32( out, req->offset );
342    evbuffer_add_uint32( out, req->length );
343
344    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
345    dbgOutMessageLen( msgs );
346    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
347}
348
349static void
350protocolSendPort(tr_peermsgs *msgs, uint16_t port)
351{
352    struct evbuffer * out = msgs->outMessages;
353
354    dbgmsg( msgs, "sending Port %u", port);
355    evbuffer_add_uint32( out, 3 );
356    evbuffer_add_uint8 ( out, BT_PORT );
357    evbuffer_add_uint16( out, port);
358}
359
360static void
361protocolSendHave( tr_peermsgs * msgs, uint32_t index )
362{
363    struct evbuffer * out = msgs->outMessages;
364
365    evbuffer_add_uint32( out, sizeof(uint8_t) + sizeof(uint32_t) );
366    evbuffer_add_uint8 ( out, BT_HAVE );
367    evbuffer_add_uint32( out, index );
368
369    dbgmsg( msgs, "sending Have %u", index );
370    dbgOutMessageLen( msgs );
371    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
372}
373
374#if 0
375static void
376protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
377{
378    tr_peerIo       * io  = msgs->peer->io;
379    struct evbuffer * out = msgs->outMessages;
380
381    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
382
383    evbuffer_add_uint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
384    evbuffer_add_uint8 ( io, out, BT_FEXT_ALLOWED_FAST );
385    evbuffer_add_uint32( io, out, pieceIndex );
386
387    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
388    dbgOutMessageLen( msgs );
389}
390#endif
391
392static void
393protocolSendChoke( tr_peermsgs * msgs, int choke )
394{
395    struct evbuffer * out = msgs->outMessages;
396
397    evbuffer_add_uint32( out, sizeof( uint8_t ) );
398    evbuffer_add_uint8 ( out, choke ? BT_CHOKE : BT_UNCHOKE );
399
400    dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
401    dbgOutMessageLen( msgs );
402    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
403}
404
405static void
406protocolSendHaveAll( tr_peermsgs * msgs )
407{
408    struct evbuffer * out = msgs->outMessages;
409
410    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
411
412    evbuffer_add_uint32( out, sizeof( uint8_t ) );
413    evbuffer_add_uint8 ( out, BT_FEXT_HAVE_ALL );
414
415    dbgmsg( msgs, "sending HAVE_ALL..." );
416    dbgOutMessageLen( msgs );
417    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
418}
419
420static void
421protocolSendHaveNone( tr_peermsgs * msgs )
422{
423    struct evbuffer * out = msgs->outMessages;
424
425    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
426
427    evbuffer_add_uint32( out, sizeof( uint8_t ) );
428    evbuffer_add_uint8 ( out, BT_FEXT_HAVE_NONE );
429
430    dbgmsg( msgs, "sending HAVE_NONE..." );
431    dbgOutMessageLen( msgs );
432    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
433}
434
435/**
436***  EVENTS
437**/
438
439static void
440publish( tr_peermsgs * msgs, tr_peer_event * e )
441{
442    assert( msgs->peer );
443    assert( msgs->peer->msgs == msgs );
444
445    if( msgs->callback != NULL )
446        msgs->callback( msgs->peer, e, msgs->callbackData );
447}
448
449static void
450fireError( tr_peermsgs * msgs, int err )
451{
452    tr_peer_event e = TR_PEER_EVENT_INIT;
453    e.eventType = TR_PEER_ERROR;
454    e.err = err;
455    publish( msgs, &e );
456}
457
458static void
459fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
460{
461    tr_peer_event e = TR_PEER_EVENT_INIT;
462    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
463    e.pieceIndex = req->index;
464    e.offset = req->offset;
465    e.length = req->length;
466    publish( msgs, &e );
467}
468
469static void
470fireGotRej( tr_peermsgs * msgs, const struct peer_request * req )
471{
472    tr_peer_event e = TR_PEER_EVENT_INIT;
473    e.eventType = TR_PEER_CLIENT_GOT_REJ;
474    e.pieceIndex = req->index;
475    e.offset = req->offset;
476    e.length = req->length;
477    publish( msgs, &e );
478}
479
480static void
481fireGotChoke( tr_peermsgs * msgs )
482{
483    tr_peer_event e = TR_PEER_EVENT_INIT;
484    e.eventType = TR_PEER_CLIENT_GOT_CHOKE;
485    publish( msgs, &e );
486}
487
488static void
489fireClientGotHaveAll( tr_peermsgs * msgs )
490{
491    tr_peer_event e = TR_PEER_EVENT_INIT;
492    e.eventType = TR_PEER_CLIENT_GOT_HAVE_ALL;
493    publish( msgs, &e );
494}
495
496static void
497fireClientGotHaveNone( tr_peermsgs * msgs )
498{
499    tr_peer_event e = TR_PEER_EVENT_INIT;
500    e.eventType = TR_PEER_CLIENT_GOT_HAVE_NONE;
501    publish( msgs, &e );
502}
503
504static void
505fireClientGotData( tr_peermsgs * msgs, uint32_t length, int wasPieceData )
506{
507    tr_peer_event e = TR_PEER_EVENT_INIT;
508
509    e.length = length;
510    e.eventType = TR_PEER_CLIENT_GOT_DATA;
511    e.wasPieceData = wasPieceData;
512    publish( msgs, &e );
513}
514
515static void
516fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
517{
518    tr_peer_event e = TR_PEER_EVENT_INIT;
519    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
520    e.pieceIndex = pieceIndex;
521    publish( msgs, &e );
522}
523
524static void
525fireClientGotPort( tr_peermsgs * msgs, tr_port port )
526{
527    tr_peer_event e = TR_PEER_EVENT_INIT;
528    e.eventType = TR_PEER_CLIENT_GOT_PORT;
529    e.port = port;
530    publish( msgs, &e );
531}
532
533static void
534fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
535{
536    tr_peer_event e = TR_PEER_EVENT_INIT;
537    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
538    e.pieceIndex = pieceIndex;
539    publish( msgs, &e );
540}
541
542static void
543fireClientGotBitfield( tr_peermsgs * msgs, tr_bitfield * bitfield )
544{
545    tr_peer_event e = TR_PEER_EVENT_INIT;
546    e.eventType = TR_PEER_CLIENT_GOT_BITFIELD;
547    e.bitfield = bitfield;
548    publish( msgs, &e );
549}
550
551static void
552fireClientGotHave( tr_peermsgs * msgs, tr_piece_index_t index )
553{
554    tr_peer_event e = TR_PEER_EVENT_INIT;
555    e.eventType = TR_PEER_CLIENT_GOT_HAVE;
556    e.pieceIndex = index;
557    publish( msgs, &e );
558}
559
560static void
561firePeerGotData( tr_peermsgs * msgs, uint32_t length, tr_bool wasPieceData )
562{
563    tr_peer_event e = TR_PEER_EVENT_INIT;
564
565    e.length = length;
566    e.eventType = TR_PEER_PEER_GOT_DATA;
567    e.wasPieceData = wasPieceData;
568
569    publish( msgs, &e );
570}
571
572/**
573***  ALLOWED FAST SET
574***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
575**/
576
577#if 0
578size_t
579tr_generateAllowedSet( tr_piece_index_t * setmePieces,
580                       size_t             desiredSetSize,
581                       size_t             pieceCount,
582                       const uint8_t    * infohash,
583                       const tr_address * addr )
584{
585    size_t setSize = 0;
586
587    assert( setmePieces );
588    assert( desiredSetSize <= pieceCount );
589    assert( desiredSetSize );
590    assert( pieceCount );
591    assert( infohash );
592    assert( addr );
593
594    if( addr->type == TR_AF_INET )
595    {
596        uint8_t w[SHA_DIGEST_LENGTH + 4], *walk=w;
597        uint8_t x[SHA_DIGEST_LENGTH];
598
599        uint32_t ui32 = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 );   /* (1) */
600        memcpy( w, &ui32, sizeof( uint32_t ) );
601        walk += sizeof( uint32_t );
602        memcpy( walk, infohash, SHA_DIGEST_LENGTH );                 /* (2) */
603        walk += SHA_DIGEST_LENGTH;
604        tr_sha1( x, w, walk-w, NULL );                               /* (3) */
605        assert( sizeof( w ) == walk-w );
606
607        while( setSize<desiredSetSize )
608        {
609            int i;
610            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
611            {
612                size_t k;
613                uint32_t j = i * 4;                                  /* (5) */
614                uint32_t y = ntohl( *( uint32_t* )( x + j ) );       /* (6) */
615                uint32_t index = y % pieceCount;                     /* (7) */
616
617                for( k=0; k<setSize; ++k )                           /* (8) */
618                    if( setmePieces[k] == index )
619                        break;
620
621                if( k == setSize )
622                    setmePieces[setSize++] = index;                  /* (9) */
623            }
624
625            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
626        }
627    }
628
629    return setSize;
630}
631
632static void
633updateFastSet( tr_peermsgs * msgs UNUSED )
634{
635    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
636    const int peerIsNeedy = msgs->peer->progress < 0.10;
637
638    if( fext && peerIsNeedy && !msgs->haveFastSet )
639    {
640        size_t i;
641        const struct tr_address * addr = tr_peerIoGetAddress( msgs->peer->io, NULL );
642        const tr_info * inf = &msgs->torrent->info;
643        const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
644
645        /* build the fast set */
646        msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
647        msgs->haveFastSet = 1;
648
649        /* send it to the peer */
650        for( i=0; i<msgs->fastsetSize; ++i )
651            protocolSendAllowedFast( msgs, msgs->fastset[i] );
652    }
653}
654#endif
655
656/**
657***  INTEREST
658**/
659
660static void
661sendInterest( tr_peermsgs * msgs, tr_bool clientIsInterested )
662{
663    struct evbuffer * out = msgs->outMessages;
664
665    assert( msgs );
666    assert( tr_isBool( clientIsInterested ) );
667
668    msgs->peer->clientIsInterested = clientIsInterested;
669    dbgmsg( msgs, "Sending %s", clientIsInterested ? "Interested" : "Not Interested" );
670    evbuffer_add_uint32( out, sizeof( uint8_t ) );
671    evbuffer_add_uint8 ( out, clientIsInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
672
673    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
674    dbgOutMessageLen( msgs );
675}
676
677static void
678updateInterest( tr_peermsgs * msgs UNUSED )
679{
680    /* FIXME -- might need to poke the mgr on startup */
681}
682
683void
684tr_peerMsgsSetInterested( tr_peermsgs * msgs, int isInterested )
685{
686    assert( tr_isBool( isInterested ) );
687
688    if( isInterested != msgs->peer->clientIsInterested )
689        sendInterest( msgs, isInterested );
690}
691
692static tr_bool
693popNextMetadataRequest( tr_peermsgs * msgs, int * piece )
694{
695    if( msgs->peerAskedForMetadataCount == 0 )
696        return FALSE;
697
698    *piece = msgs->peerAskedForMetadata[0];
699
700    tr_removeElementFromArray( msgs->peerAskedForMetadata, 0, sizeof( int ),
701                               msgs->peerAskedForMetadataCount-- );
702
703    return TRUE;
704}
705
706static tr_bool
707popNextRequest( tr_peermsgs * msgs, struct peer_request * setme )
708{
709    if( msgs->peer->pendingReqsToClient == 0 )
710        return FALSE;
711
712    *setme = msgs->peerAskedFor[0];
713
714    tr_removeElementFromArray( msgs->peerAskedFor, 0, sizeof( struct peer_request ),
715                               msgs->peer->pendingReqsToClient-- );
716
717    return TRUE;
718}
719
720static void
721cancelAllRequestsToClient( tr_peermsgs * msgs )
722{
723    struct peer_request req;
724    const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->peer->io );
725
726    while( popNextRequest( msgs, &req ))
727        if( mustSendCancel )
728            protocolSendReject( msgs, &req );
729}
730
731void
732tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
733{
734    const time_t now = tr_time( );
735    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
736
737    assert( msgs );
738    assert( msgs->peer );
739    assert( choke == 0 || choke == 1 );
740
741    if( msgs->peer->chokeChangedAt > fibrillationTime )
742    {
743        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
744    }
745    else if( msgs->peer->peerIsChoked != choke )
746    {
747        msgs->peer->peerIsChoked = choke;
748        if( choke )
749            cancelAllRequestsToClient( msgs );
750        protocolSendChoke( msgs, choke );
751        msgs->peer->chokeChangedAt = now;
752    }
753}
754
755/**
756***
757**/
758
759void
760tr_peerMsgsHave( tr_peermsgs * msgs, uint32_t index )
761{
762    protocolSendHave( msgs, index );
763
764    /* since we have more pieces now, we might not be interested in this peer */
765    updateInterest( msgs );
766}
767
768/**
769***
770**/
771
772static tr_bool
773reqIsValid( const tr_peermsgs * peer,
774            uint32_t            index,
775            uint32_t            offset,
776            uint32_t            length )
777{
778    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
779}
780
781static tr_bool
782requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
783{
784    return reqIsValid( msgs, req->index, req->offset, req->length );
785}
786
787void
788tr_peerMsgsCancel( tr_peermsgs * msgs, tr_block_index_t block )
789{
790    struct peer_request req;
791/*fprintf( stderr, "SENDING CANCEL MESSAGE FOR BLOCK %zu\n\t\tFROM PEER %p ------------------------------------\n", (size_t)block, msgs->peer );*/
792    blockToReq( msgs->torrent, block, &req );
793    protocolSendCancel( msgs, &req );
794}
795
796/**
797***
798**/
799
800static void
801sendLtepHandshake( tr_peermsgs * msgs )
802{
803    tr_benc val, *m;
804    char * buf;
805    int len;
806    tr_bool allow_pex;
807    tr_bool allow_metadata_xfer;
808    struct evbuffer * out = msgs->outMessages;
809    const unsigned char * ipv6 = tr_globalIPv6();
810
811    if( msgs->clientSentLtepHandshake )
812        return;
813
814    dbgmsg( msgs, "sending an ltep handshake" );
815    msgs->clientSentLtepHandshake = 1;
816
817    /* decide if we want to advertise metadata xfer support (BEP 9) */
818    if( tr_torrentIsPrivate( msgs->torrent ) )
819        allow_metadata_xfer = 0;
820    else
821        allow_metadata_xfer = 1;
822
823    /* decide if we want to advertise pex support */
824    if( !tr_torrentAllowsPex( msgs->torrent ) )
825        allow_pex = 0;
826    else if( msgs->peerSentLtepHandshake )
827        allow_pex = msgs->peerSupportsPex ? 1 : 0;
828    else
829        allow_pex = 1;
830
831    tr_bencInitDict( &val, 8 );
832    tr_bencDictAddInt( &val, "e", getSession(msgs)->encryptionMode != TR_CLEAR_PREFERRED );
833    if( ipv6 != NULL )
834        tr_bencDictAddRaw( &val, "ipv6", ipv6, 16 );
835    if( allow_metadata_xfer && tr_torrentHasMetadata( msgs->torrent )
836                            && ( msgs->torrent->infoDictLength > 0 ) )
837        tr_bencDictAddInt( &val, "metadata_size", msgs->torrent->infoDictLength );
838    tr_bencDictAddInt( &val, "p", tr_sessionGetPublicPeerPort( getSession(msgs) ) );
839    tr_bencDictAddInt( &val, "reqq", REQQ );
840    tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
841    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
842    m  = tr_bencDictAddDict( &val, "m", 2 );
843    if( allow_metadata_xfer )
844        tr_bencDictAddInt( m, "ut_metadata", UT_METADATA_ID );
845    if( allow_pex )
846        tr_bencDictAddInt( m, "ut_pex", UT_PEX_ID );
847
848    buf = tr_bencToStr( &val, TR_FMT_BENC, &len );
849
850    evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + len );
851    evbuffer_add_uint8 ( out, BT_LTEP );
852    evbuffer_add_uint8 ( out, LTEP_HANDSHAKE );
853    evbuffer_add       ( out, buf, len );
854    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
855    dbgOutMessageLen( msgs );
856
857    /* cleanup */
858    tr_bencFree( &val );
859    tr_free( buf );
860}
861
862static void
863parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
864{
865    int64_t   i;
866    tr_benc   val, * sub;
867    uint8_t * tmp = tr_new( uint8_t, len );
868    const uint8_t *addr;
869    size_t addr_len;
870    tr_pex pex;
871    int8_t seedProbability = -1;
872
873    memset( &pex, 0, sizeof( tr_pex ) );
874
875    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, len );
876    msgs->peerSentLtepHandshake = 1;
877
878    if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) )
879    {
880        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
881        tr_free( tmp );
882        return;
883    }
884
885    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
886
887    /* does the peer prefer encrypted connections? */
888    if( tr_bencDictFindInt( &val, "e", &i ) ) {
889        msgs->peer->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
890                                              : ENCRYPTION_PREFERENCE_NO;
891        if( i )
892            pex.flags |= ADDED_F_ENCRYPTION_FLAG;
893    }
894
895    /* check supported messages for utorrent pex */
896    msgs->peerSupportsPex = 0;
897    msgs->peerSupportsMetadataXfer = 0;
898
899    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
900        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
901            msgs->peerSupportsPex = i != 0;
902            msgs->ut_pex_id = (uint8_t) i;
903            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
904        }
905        if( tr_bencDictFindInt( sub, "ut_metadata", &i ) ) {
906            msgs->peerSupportsMetadataXfer = i != 0;
907            msgs->ut_metadata_id = (uint8_t) i;
908            dbgmsg( msgs, "msgs->ut_metadata_id is %d", (int)msgs->ut_metadata_id );
909        }
910        if( tr_bencDictFindInt( sub, "ut_holepunch", &i ) ) {
911            /* Mysterious µTorrent extension that we don't grok.  However,
912               it implies support for µTP, so use it to indicate that. */
913            tr_peerMgrSetUtpFailed( msgs->torrent,
914                                    tr_peerIoGetAddress( msgs->peer->io, NULL ),
915                                    FALSE );
916        }
917    }
918
919    /* look for metainfo size (BEP 9) */
920    if( tr_bencDictFindInt( &val, "metadata_size", &i ) ) {
921        tr_torrentSetMetadataSizeHint( msgs->torrent, i );
922        msgs->metadata_size_hint = (size_t) i;
923    }
924
925    /* look for upload_only (BEP 21) */
926    if( tr_bencDictFindInt( &val, "upload_only", &i ) )
927        seedProbability = i==0 ? 0 : 100;
928
929    /* get peer's listening port */
930    if( tr_bencDictFindInt( &val, "p", &i ) ) {
931        pex.port = htons( (uint16_t)i );
932        fireClientGotPort( msgs, pex.port );
933        dbgmsg( msgs, "peer's port is now %d", (int)i );
934    }
935
936    if( tr_peerIoIsIncoming( msgs->peer->io )
937        && tr_bencDictFindRaw( &val, "ipv4", &addr, &addr_len )
938        && ( addr_len == 4 ) )
939    {
940        pex.addr.type = TR_AF_INET;
941        memcpy( &pex.addr.addr.addr4, addr, 4 );
942        tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability );
943    }
944
945    if( tr_peerIoIsIncoming( msgs->peer->io )
946        && tr_bencDictFindRaw( &val, "ipv6", &addr, &addr_len )
947        && ( addr_len == 16 ) )
948    {
949        pex.addr.type = TR_AF_INET6;
950        memcpy( &pex.addr.addr.addr6, addr, 16 );
951        tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability );
952    }
953
954    /* get peer's maximum request queue size */
955    if( tr_bencDictFindInt( &val, "reqq", &i ) )
956        msgs->reqq = i;
957
958    tr_bencFree( &val );
959    tr_free( tmp );
960}
961
962static void
963parseUtMetadata( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
964{
965    tr_benc dict;
966    char * msg_end;
967    char * benc_end;
968    int64_t msg_type = -1;
969    int64_t piece = -1;
970    int64_t total_size = 0;
971    uint8_t * tmp = tr_new( uint8_t, msglen );
972
973    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
974    msg_end = (char*)tmp + msglen;
975
976    if( !tr_bencLoad( tmp, msglen, &dict, &benc_end ) )
977    {
978        tr_bencDictFindInt( &dict, "msg_type", &msg_type );
979        tr_bencDictFindInt( &dict, "piece", &piece );
980        tr_bencDictFindInt( &dict, "total_size", &total_size );
981        tr_bencFree( &dict );
982    }
983
984    dbgmsg( msgs, "got ut_metadata msg: type %d, piece %d, total_size %d",
985            (int)msg_type, (int)piece, (int)total_size );
986
987    if( msg_type == METADATA_MSG_TYPE_REJECT )
988    {
989        /* NOOP */
990    }
991
992    if( ( msg_type == METADATA_MSG_TYPE_DATA )
993        && ( !tr_torrentHasMetadata( msgs->torrent ) )
994        && ( msg_end - benc_end <= METADATA_PIECE_SIZE )
995        && ( piece * METADATA_PIECE_SIZE + (msg_end - benc_end) <= total_size ) )
996    {
997        const int pieceLen = msg_end - benc_end;
998        tr_torrentSetMetadataPiece( msgs->torrent, piece, benc_end, pieceLen );
999    }
1000
1001    if( msg_type == METADATA_MSG_TYPE_REQUEST )
1002    {
1003        if( ( piece >= 0 )
1004            && tr_torrentHasMetadata( msgs->torrent )
1005            && !tr_torrentIsPrivate( msgs->torrent )
1006            && ( msgs->peerAskedForMetadataCount < METADATA_REQQ ) )
1007        {
1008            msgs->peerAskedForMetadata[msgs->peerAskedForMetadataCount++] = piece;
1009        }
1010        else
1011        {
1012            tr_benc tmp;
1013            int payloadLen;
1014            char * payload;
1015            struct evbuffer * out = msgs->outMessages;
1016
1017            /* build the rejection message */
1018            tr_bencInitDict( &tmp, 2 );
1019            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
1020            tr_bencDictAddInt( &tmp, "piece", piece );
1021            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1022            tr_bencFree( &tmp );
1023
1024            /* write it out as a LTEP message to our outMessages buffer */
1025            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen );
1026            evbuffer_add_uint8 ( out, BT_LTEP );
1027            evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1028            evbuffer_add       ( out, payload, payloadLen );
1029            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1030            dbgOutMessageLen( msgs );
1031
1032            tr_free( payload );
1033        }
1034    }
1035
1036    tr_free( tmp );
1037}
1038
1039static void
1040parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1041{
1042    int loaded = 0;
1043    uint8_t * tmp = tr_new( uint8_t, msglen );
1044    tr_benc val;
1045    tr_torrent * tor = msgs->torrent;
1046    const uint8_t * added;
1047    size_t added_len;
1048
1049    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1050
1051    if( tr_torrentAllowsPex( tor )
1052      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) ) )
1053    {
1054        if( tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1055        {
1056            tr_pex * pex;
1057            size_t i, n;
1058            size_t added_f_len = 0;
1059            const uint8_t * added_f = NULL;
1060
1061            tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1062            pex = tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len, &n );
1063
1064            n = MIN( n, MAX_PEX_PEER_COUNT );
1065            for( i=0; i<n; ++i )
1066            {
1067                int seedProbability = -1;
1068                if( i < added_f_len ) seedProbability = ( added_f[i] & ADDED_F_SEED_FLAG ) ? 100 : 0;
1069                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex+i, seedProbability );
1070            }
1071
1072            tr_free( pex );
1073        }
1074
1075        if( tr_bencDictFindRaw( &val, "added6", &added, &added_len ) )
1076        {
1077            tr_pex * pex;
1078            size_t i, n;
1079            size_t added_f_len = 0;
1080            const uint8_t * added_f = NULL;
1081
1082            tr_bencDictFindRaw( &val, "added6.f", &added_f, &added_f_len );
1083            pex = tr_peerMgrCompact6ToPex( added, added_len, added_f, added_f_len, &n );
1084
1085            n = MIN( n, MAX_PEX_PEER_COUNT );
1086            for( i=0; i<n; ++i )
1087            {
1088                int seedProbability = -1;
1089                if( i < added_f_len ) seedProbability = ( added_f[i] & ADDED_F_SEED_FLAG ) ? 100 : 0;
1090                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex+i, seedProbability );
1091            }
1092
1093            tr_free( pex );
1094        }
1095    }
1096
1097    if( loaded )
1098        tr_bencFree( &val );
1099    tr_free( tmp );
1100}
1101
1102static void sendPex( tr_peermsgs * msgs );
1103
1104static void
1105parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer  * inbuf )
1106{
1107    uint8_t ltep_msgid;
1108
1109    tr_peerIoReadUint8( msgs->peer->io, inbuf, &ltep_msgid );
1110    msglen--;
1111
1112    if( ltep_msgid == LTEP_HANDSHAKE )
1113    {
1114        dbgmsg( msgs, "got ltep handshake" );
1115        parseLtepHandshake( msgs, msglen, inbuf );
1116        if( tr_peerIoSupportsLTEP( msgs->peer->io ) )
1117        {
1118            sendLtepHandshake( msgs );
1119            sendPex( msgs );
1120        }
1121    }
1122    else if( ltep_msgid == UT_PEX_ID )
1123    {
1124        dbgmsg( msgs, "got ut pex" );
1125        msgs->peerSupportsPex = 1;
1126        parseUtPex( msgs, msglen, inbuf );
1127    }
1128    else if( ltep_msgid == UT_METADATA_ID )
1129    {
1130        dbgmsg( msgs, "got ut metadata" );
1131        msgs->peerSupportsMetadataXfer = 1;
1132        parseUtMetadata( msgs, msglen, inbuf );
1133    }
1134    else
1135    {
1136        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1137        evbuffer_drain( inbuf, msglen );
1138    }
1139}
1140
1141static int
1142readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1143{
1144    uint32_t len;
1145
1146    if( inlen < sizeof( len ) )
1147        return READ_LATER;
1148
1149    tr_peerIoReadUint32( msgs->peer->io, inbuf, &len );
1150
1151    if( len == 0 ) /* peer sent us a keepalive message */
1152        dbgmsg( msgs, "got KeepAlive" );
1153    else
1154    {
1155        msgs->incoming.length = len;
1156        msgs->state = AWAITING_BT_ID;
1157    }
1158
1159    return READ_NOW;
1160}
1161
1162static int readBtMessage( tr_peermsgs *, struct evbuffer *, size_t );
1163
1164static int
1165readBtId( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1166{
1167    uint8_t id;
1168
1169    if( inlen < sizeof( uint8_t ) )
1170        return READ_LATER;
1171
1172    tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
1173    msgs->incoming.id = id;
1174    dbgmsg( msgs, "msgs->incoming.id is now %d; msgs->incoming.length is %zu", id, (size_t)msgs->incoming.length );
1175
1176    if( id == BT_PIECE )
1177    {
1178        msgs->state = AWAITING_BT_PIECE;
1179        return READ_NOW;
1180    }
1181    else if( msgs->incoming.length != 1 )
1182    {
1183        msgs->state = AWAITING_BT_MESSAGE;
1184        return READ_NOW;
1185    }
1186    else return readBtMessage( msgs, inbuf, inlen - 1 );
1187}
1188
1189static void
1190updatePeerProgress( tr_peermsgs * msgs )
1191{
1192    tr_peerUpdateProgress( msgs->torrent, msgs->peer );
1193
1194    /*updateFastSet( msgs );*/
1195    updateInterest( msgs );
1196}
1197
1198static void
1199prefetchPieces( tr_peermsgs *msgs )
1200{
1201    int i;
1202
1203    if( !getSession(msgs)->isPrefetchEnabled )
1204        return;
1205
1206    /* Maintain 12 prefetched blocks per unchoked peer */
1207    for( i=msgs->prefetchCount; i<msgs->peer->pendingReqsToClient && i<12; ++i )
1208    {
1209        const struct peer_request * req = msgs->peerAskedFor + i;
1210        if( requestIsValid( msgs, req ) )
1211        {
1212            tr_cachePrefetchBlock( getSession(msgs)->cache, msgs->torrent, req->index, req->offset, req->length );
1213            ++msgs->prefetchCount;
1214        }
1215    }
1216}
1217
1218static void
1219peerMadeRequest( tr_peermsgs * msgs, const struct peer_request * req )
1220{
1221    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1222    const int reqIsValid = requestIsValid( msgs, req );
1223    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( &msgs->torrent->completion, req->index );
1224    const int peerIsChoked = msgs->peer->peerIsChoked;
1225
1226    int allow = FALSE;
1227
1228    if( !reqIsValid )
1229        dbgmsg( msgs, "rejecting an invalid request." );
1230    else if( !clientHasPiece )
1231        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1232    else if( peerIsChoked )
1233        dbgmsg( msgs, "rejecting request from choked peer" );
1234    else if( msgs->peer->pendingReqsToClient + 1 >= REQQ )
1235        dbgmsg( msgs, "rejecting request ... reqq is full" );
1236    else
1237        allow = TRUE;
1238
1239    if( allow ) {
1240        msgs->peerAskedFor[msgs->peer->pendingReqsToClient++] = *req;
1241        prefetchPieces( msgs );
1242    } else if( fext ) {
1243        protocolSendReject( msgs, req );
1244    }
1245}
1246
1247static tr_bool
1248messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1249{
1250    switch( id )
1251    {
1252        case BT_CHOKE:
1253        case BT_UNCHOKE:
1254        case BT_INTERESTED:
1255        case BT_NOT_INTERESTED:
1256        case BT_FEXT_HAVE_ALL:
1257        case BT_FEXT_HAVE_NONE:
1258            return len == 1;
1259
1260        case BT_HAVE:
1261        case BT_FEXT_SUGGEST:
1262        case BT_FEXT_ALLOWED_FAST:
1263            return len == 5;
1264
1265        case BT_BITFIELD:
1266            if( tr_torrentHasMetadata( msg->torrent ) )
1267                return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1268            /* we don't know the piece count yet,
1269               so we can only guess whether to send TRUE or FALSE */
1270            if( msg->metadata_size_hint > 0 )
1271                return len <= msg->metadata_size_hint;
1272            return TRUE;
1273
1274        case BT_REQUEST:
1275        case BT_CANCEL:
1276        case BT_FEXT_REJECT:
1277            return len == 13;
1278
1279        case BT_PIECE:
1280            return len > 9 && len <= 16393;
1281
1282        case BT_PORT:
1283            return len == 3;
1284
1285        case BT_LTEP:
1286            return len >= 2;
1287
1288        default:
1289            return FALSE;
1290    }
1291}
1292
1293static int clientGotBlock( tr_peermsgs *               msgs,
1294                           struct evbuffer *           block,
1295                           const struct peer_request * req );
1296
1297static int
1298readBtPiece( tr_peermsgs      * msgs,
1299             struct evbuffer  * inbuf,
1300             size_t             inlen,
1301             size_t           * setme_piece_bytes_read )
1302{
1303    struct peer_request * req = &msgs->incoming.blockReq;
1304
1305    assert( evbuffer_get_length( inbuf ) >= inlen );
1306    dbgmsg( msgs, "In readBtPiece" );
1307
1308    if( !req->length )
1309    {
1310        if( inlen < 8 )
1311            return READ_LATER;
1312
1313        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index );
1314        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset );
1315        req->length = msgs->incoming.length - 9;
1316        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1317        return READ_NOW;
1318    }
1319    else
1320    {
1321        int err;
1322
1323        /* read in another chunk of data */
1324        const size_t nLeft = req->length - evbuffer_get_length( msgs->incoming.block );
1325        size_t n = MIN( nLeft, inlen );
1326        size_t i = n;
1327        void * buf = tr_sessionGetBuffer( getSession( msgs ) );
1328        const size_t buflen = SESSION_BUFFER_SIZE;
1329
1330        while( i > 0 )
1331        {
1332            const size_t thisPass = MIN( i, buflen );
1333            tr_peerIoReadBytes( msgs->peer->io, inbuf, buf, thisPass );
1334            evbuffer_add( msgs->incoming.block, buf, thisPass );
1335            i -= thisPass;
1336        }
1337
1338        tr_sessionReleaseBuffer( getSession( msgs ) );
1339        buf = NULL;
1340
1341        fireClientGotData( msgs, n, TRUE );
1342        *setme_piece_bytes_read += n;
1343        dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1344               n, req->index, req->offset, req->length,
1345               (int)( req->length - evbuffer_get_length( msgs->incoming.block ) ) );
1346        if( evbuffer_get_length( msgs->incoming.block ) < req->length )
1347            return READ_LATER;
1348
1349        /* we've got the whole block ... process it */
1350        err = clientGotBlock( msgs, msgs->incoming.block, req );
1351
1352        /* cleanup */
1353        evbuffer_free( msgs->incoming.block );
1354        msgs->incoming.block = evbuffer_new( );
1355        req->length = 0;
1356        msgs->state = AWAITING_BT_LENGTH;
1357        return err ? READ_ERR : READ_NOW;
1358    }
1359}
1360
1361static void updateDesiredRequestCount( tr_peermsgs * msgs );
1362
1363static int
1364readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1365{
1366    uint32_t      ui32;
1367    uint32_t      msglen = msgs->incoming.length;
1368    const uint8_t id = msgs->incoming.id;
1369#ifndef NDEBUG
1370    const size_t  startBufLen = evbuffer_get_length( inbuf );
1371#endif
1372    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1373
1374    --msglen; /* id length */
1375
1376    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
1377
1378    if( inlen < msglen )
1379        return READ_LATER;
1380
1381    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1382    {
1383        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1384        fireError( msgs, EMSGSIZE );
1385        return READ_ERR;
1386    }
1387
1388    switch( id )
1389    {
1390        case BT_CHOKE:
1391            dbgmsg( msgs, "got Choke" );
1392            msgs->peer->clientIsChoked = 1;
1393            if( !fext )
1394                fireGotChoke( msgs );
1395            break;
1396
1397        case BT_UNCHOKE:
1398            dbgmsg( msgs, "got Unchoke" );
1399            msgs->peer->clientIsChoked = 0;
1400            updateDesiredRequestCount( msgs );
1401            break;
1402
1403        case BT_INTERESTED:
1404            dbgmsg( msgs, "got Interested" );
1405            msgs->peer->peerIsInterested = 1;
1406            break;
1407
1408        case BT_NOT_INTERESTED:
1409            dbgmsg( msgs, "got Not Interested" );
1410            msgs->peer->peerIsInterested = 0;
1411            break;
1412
1413        case BT_HAVE:
1414            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1415            dbgmsg( msgs, "got Have: %u", ui32 );
1416            if( tr_torrentHasMetadata( msgs->torrent )
1417                    && ( ui32 >= msgs->torrent->info.pieceCount ) )
1418            {
1419                fireError( msgs, ERANGE );
1420                return READ_ERR;
1421            }
1422
1423            /* a peer can send the same HAVE message twice... */
1424            if( !tr_bitsetHas( &msgs->peer->have, ui32 ) ) {
1425                tr_bitsetAdd( &msgs->peer->have, ui32 );
1426                fireClientGotHave( msgs, ui32 );
1427            }
1428            updatePeerProgress( msgs );
1429            break;
1430
1431        case BT_BITFIELD: {
1432            tr_bitfield tmp = TR_BITFIELD_INIT;
1433            const size_t bitCount = tr_torrentHasMetadata( msgs->torrent )
1434                                  ? msgs->torrent->info.pieceCount
1435                                  : msglen * 8;
1436            tr_bitfieldConstruct( &tmp, bitCount );
1437            dbgmsg( msgs, "got a bitfield" );
1438            tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp.bits, msglen );
1439            tr_bitsetSetBitfield( &msgs->peer->have, &tmp );
1440            fireClientGotBitfield( msgs, &tmp );
1441            tr_bitfieldDestruct( &tmp );
1442            updatePeerProgress( msgs );
1443            break;
1444        }
1445
1446        case BT_REQUEST:
1447        {
1448            struct peer_request r;
1449            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1450            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1451            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1452            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1453            peerMadeRequest( msgs, &r );
1454            break;
1455        }
1456
1457        case BT_CANCEL:
1458        {
1459            int i;
1460            struct peer_request r;
1461            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1462            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1463            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1464            tr_historyAdd( &msgs->peer->cancelsSentToClient, tr_time( ), 1 );
1465            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1466
1467            for( i=0; i<msgs->peer->pendingReqsToClient; ++i ) {
1468                const struct peer_request * req = msgs->peerAskedFor + i;
1469                if( ( req->index == r.index ) && ( req->offset == r.offset ) && ( req->length == r.length ) )
1470                    break;
1471            }
1472
1473            if( i < msgs->peer->pendingReqsToClient )
1474                tr_removeElementFromArray( msgs->peerAskedFor, i, sizeof( struct peer_request ),
1475                                           msgs->peer->pendingReqsToClient-- );
1476            break;
1477        }
1478
1479        case BT_PIECE:
1480            assert( 0 ); /* handled elsewhere! */
1481            break;
1482
1483        case BT_PORT:
1484            dbgmsg( msgs, "Got a BT_PORT" );
1485            tr_peerIoReadUint16( msgs->peer->io, inbuf, &msgs->peer->dht_port );
1486            if( msgs->peer->dht_port > 0 )
1487                tr_dhtAddNode( getSession(msgs),
1488                               tr_peerAddress( msgs->peer ),
1489                               msgs->peer->dht_port, 0 );
1490            break;
1491
1492        case BT_FEXT_SUGGEST:
1493            dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1494            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1495            if( fext )
1496                fireClientGotSuggest( msgs, ui32 );
1497            else {
1498                fireError( msgs, EMSGSIZE );
1499                return READ_ERR;
1500            }
1501            break;
1502
1503        case BT_FEXT_ALLOWED_FAST:
1504            dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1505            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1506            if( fext )
1507                fireClientGotAllowedFast( msgs, ui32 );
1508            else {
1509                fireError( msgs, EMSGSIZE );
1510                return READ_ERR;
1511            }
1512            break;
1513
1514        case BT_FEXT_HAVE_ALL:
1515            dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1516            if( fext ) {
1517                tr_bitsetSetHaveAll( &msgs->peer->have );
1518                fireClientGotHaveAll( msgs );
1519                updatePeerProgress( msgs );
1520            } else {
1521                fireError( msgs, EMSGSIZE );
1522                return READ_ERR;
1523            }
1524            break;
1525
1526        case BT_FEXT_HAVE_NONE:
1527            dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1528            if( fext ) {
1529                tr_bitsetSetHaveNone( &msgs->peer->have );
1530                fireClientGotHaveNone( msgs );
1531                updatePeerProgress( msgs );
1532            } else {
1533                fireError( msgs, EMSGSIZE );
1534                return READ_ERR;
1535            }
1536            break;
1537
1538        case BT_FEXT_REJECT:
1539        {
1540            struct peer_request r;
1541            dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1542            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1543            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1544            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1545            if( fext )
1546                fireGotRej( msgs, &r );
1547            else {
1548                fireError( msgs, EMSGSIZE );
1549                return READ_ERR;
1550            }
1551            break;
1552        }
1553
1554        case BT_LTEP:
1555            dbgmsg( msgs, "Got a BT_LTEP" );
1556            parseLtep( msgs, msglen, inbuf );
1557            break;
1558
1559        default:
1560            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1561            tr_peerIoDrain( msgs->peer->io, inbuf, msglen );
1562            break;
1563    }
1564
1565    assert( msglen + 1 == msgs->incoming.length );
1566    assert( evbuffer_get_length( inbuf ) == startBufLen - msglen );
1567
1568    msgs->state = AWAITING_BT_LENGTH;
1569    return READ_NOW;
1570}
1571
1572static void
1573addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1574{
1575    if( !msgs->peer->blame )
1576         msgs->peer->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1577    tr_bitfieldAdd( msgs->peer->blame, index );
1578}
1579
1580/* returns 0 on success, or an errno on failure */
1581static int
1582clientGotBlock( tr_peermsgs                * msgs,
1583                struct evbuffer            * data,
1584                const struct peer_request  * req )
1585{
1586    int err;
1587    tr_torrent * tor = msgs->torrent;
1588    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1589
1590    assert( msgs );
1591    assert( req );
1592
1593    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) {
1594        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1595                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1596        return EMSGSIZE;
1597    }
1598
1599    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1600
1601    if( !tr_peerMgrDidPeerRequest( msgs->torrent, msgs->peer, block ) ) {
1602        dbgmsg( msgs, "we didn't ask for this message..." );
1603        return 0;
1604    }
1605    if( tr_cpPieceIsComplete( &msgs->torrent->completion, req->index ) ) {
1606        dbgmsg( msgs, "we did ask for this message, but the piece is already complete..." );
1607        return 0;
1608    }
1609
1610    /**
1611    ***  Save the block
1612    **/
1613
1614    if(( err = tr_cacheWriteBlock( getSession(msgs)->cache, tor, req->index, req->offset, req->length, data )))
1615        return err;
1616
1617    addPeerToBlamefield( msgs, req->index );
1618    fireGotBlock( msgs, req );
1619    return 0;
1620}
1621
1622static int peerPulse( void * vmsgs );
1623
1624static void
1625didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1626{
1627    tr_peermsgs * msgs = vmsgs;
1628    firePeerGotData( msgs, bytesWritten, wasPieceData );
1629
1630    if ( tr_isPeerIo( io ) && io->userData )
1631        peerPulse( msgs );
1632}
1633
1634static ReadState
1635canRead( tr_peerIo * io, void * vmsgs, size_t * piece )
1636{
1637    ReadState         ret;
1638    tr_peermsgs *     msgs = vmsgs;
1639    struct evbuffer * in = tr_peerIoGetReadBuffer( io );
1640    const size_t      inlen = evbuffer_get_length( in );
1641
1642    dbgmsg( msgs, "canRead: inlen is %zu, msgs->state is %d", inlen, msgs->state );
1643
1644    if( !inlen )
1645    {
1646        ret = READ_LATER;
1647    }
1648    else if( msgs->state == AWAITING_BT_PIECE )
1649    {
1650        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1651    }
1652    else switch( msgs->state )
1653    {
1654        case AWAITING_BT_LENGTH:
1655            ret = readBtLength ( msgs, in, inlen ); break;
1656
1657        case AWAITING_BT_ID:
1658            ret = readBtId     ( msgs, in, inlen ); break;
1659
1660        case AWAITING_BT_MESSAGE:
1661            ret = readBtMessage( msgs, in, inlen ); break;
1662
1663        default:
1664            ret = READ_ERR;
1665            assert( 0 );
1666    }
1667
1668    dbgmsg( msgs, "canRead: ret is %d", (int)ret );
1669
1670    /* log the raw data that was read */
1671    if( ( ret != READ_ERR ) && ( evbuffer_get_length( in ) != inlen ) )
1672        fireClientGotData( msgs, inlen - evbuffer_get_length( in ), FALSE );
1673
1674    return ret;
1675}
1676
1677int
1678tr_peerMsgsIsReadingBlock( const tr_peermsgs * msgs, tr_block_index_t block )
1679{
1680    if( msgs->state != AWAITING_BT_PIECE )
1681        return FALSE;
1682
1683    return block == _tr_block( msgs->torrent,
1684                               msgs->incoming.blockReq.index,
1685                               msgs->incoming.blockReq.offset );
1686}
1687
1688/**
1689***
1690**/
1691
1692static void
1693updateDesiredRequestCount( tr_peermsgs * msgs )
1694{
1695    const tr_torrent * const torrent = msgs->torrent;
1696
1697    if( tr_torrentIsSeed( msgs->torrent ) )
1698    {
1699        msgs->desiredRequestCount = 0;
1700    }
1701    else if( msgs->peer->clientIsChoked )
1702    {
1703        msgs->desiredRequestCount = 0;
1704    }
1705    else if( !msgs->peer->clientIsInterested )
1706    {
1707        msgs->desiredRequestCount = 0;
1708    }
1709    else
1710    {
1711        int estimatedBlocksInPeriod;
1712        int rate_Bps;
1713        int irate_Bps;
1714        const int floor = 4;
1715        const int seconds = REQUEST_BUF_SECS;
1716        const uint64_t now = tr_time_msec( );
1717
1718        /* Get the rate limit we should use.
1719         * FIXME: this needs to consider all the other peers as well... */
1720        rate_Bps = tr_peerGetPieceSpeed_Bps( msgs->peer, now, TR_PEER_TO_CLIENT );
1721        if( tr_torrentUsesSpeedLimit( torrent, TR_PEER_TO_CLIENT ) )
1722            rate_Bps = MIN( rate_Bps, tr_torrentGetSpeedLimit_Bps( torrent, TR_PEER_TO_CLIENT ) );
1723
1724        /* honor the session limits, if enabled */
1725        if( tr_torrentUsesSessionLimits( torrent ) )
1726            if( tr_sessionGetActiveSpeedLimit_Bps( torrent->session, TR_PEER_TO_CLIENT, &irate_Bps ) )
1727                rate_Bps = MIN( rate_Bps, irate_Bps );
1728
1729        /* use this desired rate to figure out how
1730         * many requests we should send to this peer */
1731        estimatedBlocksInPeriod = ( rate_Bps * seconds ) / torrent->blockSize;
1732        msgs->desiredRequestCount = MAX( floor, estimatedBlocksInPeriod );
1733
1734        /* honor the peer's maximum request count, if specified */
1735        if( msgs->reqq > 0 )
1736            if( msgs->desiredRequestCount > msgs->reqq )
1737                msgs->desiredRequestCount = msgs->reqq;
1738    }
1739}
1740
1741static void
1742updateMetadataRequests( tr_peermsgs * msgs, time_t now )
1743{
1744    int piece;
1745
1746    if( msgs->peerSupportsMetadataXfer
1747        && tr_torrentGetNextMetadataRequest( msgs->torrent, now, &piece ) )
1748    {
1749        tr_benc tmp;
1750        int payloadLen;
1751        char * payload;
1752        struct evbuffer * out = msgs->outMessages;
1753
1754        /* build the data message */
1755        tr_bencInitDict( &tmp, 3 );
1756        tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REQUEST );
1757        tr_bencDictAddInt( &tmp, "piece", piece );
1758        payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1759        tr_bencFree( &tmp );
1760
1761        dbgmsg( msgs, "requesting metadata piece #%d", piece );
1762
1763        /* write it out as a LTEP message to our outMessages buffer */
1764        evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen );
1765        evbuffer_add_uint8 ( out, BT_LTEP );
1766        evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1767        evbuffer_add       ( out, payload, payloadLen );
1768        pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1769        dbgOutMessageLen( msgs );
1770
1771        tr_free( payload );
1772    }
1773}
1774
1775static void
1776updateBlockRequests( tr_peermsgs * msgs )
1777{
1778    if( tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT )
1779        && ( msgs->desiredRequestCount > 0 )
1780        && ( msgs->peer->pendingReqsToPeer <= ( msgs->desiredRequestCount * 0.66 ) ) )
1781    {
1782        int i;
1783        int n;
1784        const int numwant = msgs->desiredRequestCount - msgs->peer->pendingReqsToPeer;
1785        tr_block_index_t * blocks = tr_new( tr_block_index_t, numwant );
1786
1787        tr_peerMgrGetNextRequests( msgs->torrent, msgs->peer, numwant, blocks, &n );
1788
1789        for( i=0; i<n; ++i )
1790        {
1791            struct peer_request req;
1792            blockToReq( msgs->torrent, blocks[i], &req );
1793            protocolSendRequest( msgs, &req );
1794        }
1795
1796        tr_free( blocks );
1797    }
1798}
1799
1800static size_t
1801fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1802{
1803    int piece;
1804    size_t bytesWritten = 0;
1805    struct peer_request req;
1806    const tr_bool haveMessages = evbuffer_get_length( msgs->outMessages ) != 0;
1807    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1808
1809    /**
1810    ***  Protocol messages
1811    **/
1812
1813    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1814    {
1815        dbgmsg( msgs, "started an outMessages batch (length is %zu)", evbuffer_get_length( msgs->outMessages ) );
1816        msgs->outMessagesBatchedAt = now;
1817    }
1818    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1819    {
1820        const size_t len = evbuffer_get_length( msgs->outMessages );
1821        /* flush the protocol messages */
1822        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len );
1823        tr_peerIoWriteBuf( msgs->peer->io, msgs->outMessages, FALSE );
1824        msgs->clientSentAnythingAt = now;
1825        msgs->outMessagesBatchedAt = 0;
1826        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1827        bytesWritten +=  len;
1828    }
1829
1830    /**
1831    ***  Metadata Pieces
1832    **/
1833
1834    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= METADATA_PIECE_SIZE )
1835        && popNextMetadataRequest( msgs, &piece ) )
1836    {
1837        char * data;
1838        int dataLen;
1839        tr_bool ok = FALSE;
1840
1841        data = tr_torrentGetMetadataPiece( msgs->torrent, piece, &dataLen );
1842        if( ( dataLen > 0 ) && ( data != NULL ) )
1843        {
1844            tr_benc tmp;
1845            int payloadLen;
1846            char * payload;
1847            struct evbuffer * out = msgs->outMessages;
1848
1849            /* build the data message */
1850            tr_bencInitDict( &tmp, 3 );
1851            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_DATA );
1852            tr_bencDictAddInt( &tmp, "piece", piece );
1853            tr_bencDictAddInt( &tmp, "total_size", msgs->torrent->infoDictLength );
1854            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1855            tr_bencFree( &tmp );
1856
1857            /* write it out as a LTEP message to our outMessages buffer */
1858            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen + dataLen );
1859            evbuffer_add_uint8 ( out, BT_LTEP );
1860            evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1861            evbuffer_add       ( out, payload, payloadLen );
1862            evbuffer_add       ( out, data, dataLen );
1863            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1864            dbgOutMessageLen( msgs );
1865
1866            tr_free( payload );
1867            tr_free( data );
1868
1869            ok = TRUE;
1870        }
1871
1872        if( !ok ) /* send a rejection message */
1873        {
1874            tr_benc tmp;
1875            int payloadLen;
1876            char * payload;
1877            struct evbuffer * out = msgs->outMessages;
1878
1879            /* build the rejection message */
1880            tr_bencInitDict( &tmp, 2 );
1881            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
1882            tr_bencDictAddInt( &tmp, "piece", piece );
1883            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1884            tr_bencFree( &tmp );
1885
1886            /* write it out as a LTEP message to our outMessages buffer */
1887            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen );
1888            evbuffer_add_uint8 ( out, BT_LTEP );
1889            evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1890            evbuffer_add       ( out, payload, payloadLen );
1891            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1892            dbgOutMessageLen( msgs );
1893
1894            tr_free( payload );
1895        }
1896    }
1897
1898    /**
1899    ***  Data Blocks
1900    **/
1901
1902    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= msgs->torrent->blockSize )
1903        && popNextRequest( msgs, &req ) )
1904    {
1905        --msgs->prefetchCount;
1906
1907        if( requestIsValid( msgs, &req )
1908            && tr_cpPieceIsComplete( &msgs->torrent->completion, req.index ) )
1909        {
1910            int err;
1911            const uint32_t msglen = 4 + 1 + 4 + 4 + req.length;
1912            struct evbuffer * out;
1913            struct evbuffer_iovec iovec[1];
1914
1915            out = evbuffer_new( );
1916            evbuffer_expand( out, msglen );
1917
1918            evbuffer_add_uint32( out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1919            evbuffer_add_uint8 ( out, BT_PIECE );
1920            evbuffer_add_uint32( out, req.index );
1921            evbuffer_add_uint32( out, req.offset );
1922
1923            evbuffer_reserve_space( out, req.length, iovec, 1 );
1924            err = tr_cacheReadBlock( getSession(msgs)->cache, msgs->torrent, req.index, req.offset, req.length, iovec[0].iov_base );
1925            iovec[0].iov_len = req.length;
1926            evbuffer_commit_space( out, iovec, 1 );
1927
1928            /* check the piece if it needs checking... */
1929            if( !err && tr_torrentPieceNeedsCheck( msgs->torrent, req.index ) )
1930                if(( err = !tr_torrentCheckPiece( msgs->torrent, req.index )))
1931                    tr_torrentSetLocalError( msgs->torrent, _( "Please Verify Local Data! Piece #%zu is corrupt." ), (size_t)req.index );
1932
1933            if( err )
1934            {
1935                if( fext )
1936                    protocolSendReject( msgs, &req );
1937            }
1938            else
1939            {
1940                const size_t n = evbuffer_get_length( out );
1941                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1942                assert( n == msglen );
1943                tr_peerIoWriteBuf( msgs->peer->io, out, TRUE );
1944                bytesWritten += n;
1945                msgs->clientSentAnythingAt = now;
1946                tr_historyAdd( &msgs->peer->blocksSentToPeer, tr_time( ), 1 );
1947            }
1948
1949            evbuffer_free( out );
1950
1951            if( err )
1952            {
1953                bytesWritten = 0;
1954                msgs = NULL;
1955            }
1956        }
1957        else if( fext ) /* peer needs a reject message */
1958        {
1959            protocolSendReject( msgs, &req );
1960        }
1961
1962        if( msgs != NULL )
1963            prefetchPieces( msgs );
1964    }
1965
1966    /**
1967    ***  Keepalive
1968    **/
1969
1970    if( ( msgs != NULL )
1971        && ( msgs->clientSentAnythingAt != 0 )
1972        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1973    {
1974        dbgmsg( msgs, "sending a keepalive message" );
1975        evbuffer_add_uint32( msgs->outMessages, 0 );
1976        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1977    }
1978
1979    return bytesWritten;
1980}
1981
1982static int
1983peerPulse( void * vmsgs )
1984{
1985    tr_peermsgs * msgs = vmsgs;
1986    const time_t  now = tr_time( );
1987
1988    if ( tr_isPeerIo( msgs->peer->io ) ) {
1989        updateDesiredRequestCount( msgs );
1990        updateBlockRequests( msgs );
1991        updateMetadataRequests( msgs, now );
1992    }
1993
1994    for( ;; )
1995        if( fillOutputBuffer( msgs, now ) < 1 )
1996            break;
1997
1998    return TRUE; /* loop forever */
1999}
2000
2001void
2002tr_peerMsgsPulse( tr_peermsgs * msgs )
2003{
2004    if( msgs != NULL )
2005        peerPulse( msgs );
2006}
2007
2008static void
2009gotError( tr_peerIo * io UNUSED, short what, void * vmsgs )
2010{
2011    if( what & BEV_EVENT_TIMEOUT )
2012        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
2013    if( what & ( BEV_EVENT_EOF | BEV_EVENT_ERROR ) )
2014        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
2015               what, errno, tr_strerror( errno ) );
2016    fireError( vmsgs, ENOTCONN );
2017}
2018
2019static void
2020sendBitfield( tr_peermsgs * msgs )
2021{
2022    struct evbuffer * out = msgs->outMessages;
2023    tr_bitfield * bf = tr_cpCreatePieceBitfield( &msgs->torrent->completion );
2024
2025    evbuffer_add_uint32( out, sizeof( uint8_t ) + bf->byteCount );
2026    evbuffer_add_uint8 ( out, BT_BITFIELD );
2027    evbuffer_add       ( out, bf->bits, bf->byteCount );
2028    dbgmsg( msgs, "sending bitfield... outMessage size is now %zu", evbuffer_get_length( out ) );
2029    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
2030
2031    tr_bitfieldFree( bf );
2032}
2033
2034static void
2035tellPeerWhatWeHave( tr_peermsgs * msgs )
2036{
2037    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
2038
2039    if( fext && ( tr_cpBlockBitset( &msgs->torrent->completion )->haveAll ) )
2040    {
2041        protocolSendHaveAll( msgs );
2042    }
2043    else if( fext && ( tr_cpBlockBitset( &msgs->torrent->completion )->haveNone ) )
2044    {
2045        protocolSendHaveNone( msgs );
2046    }
2047    else
2048    {
2049        sendBitfield( msgs );
2050    }
2051}
2052
2053/**
2054***
2055**/
2056
2057/* some peers give us error messages if we send
2058   more than this many peers in a single pex message
2059   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
2060#define MAX_PEX_ADDED 50
2061#define MAX_PEX_DROPPED 50
2062
2063typedef struct
2064{
2065    tr_pex *  added;
2066    tr_pex *  dropped;
2067    tr_pex *  elements;
2068    int       addedCount;
2069    int       droppedCount;
2070    int       elementCount;
2071}
2072PexDiffs;
2073
2074static void
2075pexAddedCb( void * vpex, void * userData )
2076{
2077    PexDiffs * diffs = userData;
2078    tr_pex *   pex = vpex;
2079
2080    if( diffs->addedCount < MAX_PEX_ADDED )
2081    {
2082        diffs->added[diffs->addedCount++] = *pex;
2083        diffs->elements[diffs->elementCount++] = *pex;
2084    }
2085}
2086
2087static inline void
2088pexDroppedCb( void * vpex, void * userData )
2089{
2090    PexDiffs * diffs = userData;
2091    tr_pex *   pex = vpex;
2092
2093    if( diffs->droppedCount < MAX_PEX_DROPPED )
2094    {
2095        diffs->dropped[diffs->droppedCount++] = *pex;
2096    }
2097}
2098
2099static inline void
2100pexElementCb( void * vpex, void * userData )
2101{
2102    PexDiffs * diffs = userData;
2103    tr_pex * pex = vpex;
2104
2105    diffs->elements[diffs->elementCount++] = *pex;
2106}
2107
2108static void
2109sendPex( tr_peermsgs * msgs )
2110{
2111    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
2112    {
2113        PexDiffs diffs;
2114        PexDiffs diffs6;
2115        tr_pex * newPex = NULL;
2116        tr_pex * newPex6 = NULL;
2117        const int newCount = tr_peerMgrGetPeers( msgs->torrent, &newPex, TR_AF_INET, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
2118        const int newCount6 = tr_peerMgrGetPeers( msgs->torrent, &newPex6, TR_AF_INET6, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
2119
2120        /* build the diffs */
2121        diffs.added = tr_new( tr_pex, newCount );
2122        diffs.addedCount = 0;
2123        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
2124        diffs.droppedCount = 0;
2125        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
2126        diffs.elementCount = 0;
2127        tr_set_compare( msgs->pex, msgs->pexCount,
2128                        newPex, newCount,
2129                        tr_pexCompare, sizeof( tr_pex ),
2130                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
2131        diffs6.added = tr_new( tr_pex, newCount6 );
2132        diffs6.addedCount = 0;
2133        diffs6.dropped = tr_new( tr_pex, msgs->pexCount6 );
2134        diffs6.droppedCount = 0;
2135        diffs6.elements = tr_new( tr_pex, newCount6 + msgs->pexCount6 );
2136        diffs6.elementCount = 0;
2137        tr_set_compare( msgs->pex6, msgs->pexCount6,
2138                        newPex6, newCount6,
2139                        tr_pexCompare, sizeof( tr_pex ),
2140                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6 );
2141        dbgmsg(
2142            msgs,
2143            "pex: old peer count %d+%d, new peer count %d+%d, "
2144            "added %d+%d, removed %d+%d",
2145            msgs->pexCount, msgs->pexCount6, newCount, newCount6,
2146            diffs.addedCount, diffs6.addedCount,
2147            diffs.droppedCount, diffs6.droppedCount );
2148
2149        if( !diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
2150            !diffs6.droppedCount )
2151        {
2152            tr_free( diffs.elements );
2153            tr_free( diffs6.elements );
2154        }
2155        else
2156        {
2157            int  i;
2158            tr_benc val;
2159            char * benc;
2160            int bencLen;
2161            uint8_t * tmp, *walk;
2162            struct evbuffer * out = msgs->outMessages;
2163
2164            /* update peer */
2165            tr_free( msgs->pex );
2166            msgs->pex = diffs.elements;
2167            msgs->pexCount = diffs.elementCount;
2168            tr_free( msgs->pex6 );
2169            msgs->pex6 = diffs6.elements;
2170            msgs->pexCount6 = diffs6.elementCount;
2171
2172            /* build the pex payload */
2173            tr_bencInitDict( &val, 3 ); /* ipv6 support: left as 3:
2174                                         * speed vs. likelihood? */
2175
2176            if( diffs.addedCount > 0)
2177            {
2178                /* "added" */
2179                tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
2180                for( i = 0; i < diffs.addedCount; ++i ) {
2181                    memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
2182                    memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
2183                }
2184                assert( ( walk - tmp ) == diffs.addedCount * 6 );
2185                tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
2186                tr_free( tmp );
2187
2188                /* "added.f"
2189                 * unset each holepunch flag because we don't support it. */
2190                tmp = walk = tr_new( uint8_t, diffs.addedCount );
2191                for( i = 0; i < diffs.addedCount; ++i )
2192                    *walk++ = diffs.added[i].flags & ~ADDED_F_HOLEPUNCH;
2193                assert( ( walk - tmp ) == diffs.addedCount );
2194                tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2195                tr_free( tmp );
2196            }
2197
2198            if( diffs.droppedCount > 0 )
2199            {
2200                /* "dropped" */
2201                tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2202                for( i = 0; i < diffs.droppedCount; ++i ) {
2203                    memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
2204                    memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2205                }
2206                assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2207                tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2208                tr_free( tmp );
2209            }
2210
2211            if( diffs6.addedCount > 0 )
2212            {
2213                /* "added6" */
2214                tmp = walk = tr_new( uint8_t, diffs6.addedCount * 18 );
2215                for( i = 0; i < diffs6.addedCount; ++i ) {
2216                    memcpy( walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16 );
2217                    walk += 16;
2218                    memcpy( walk, &diffs6.added[i].port, 2 );
2219                    walk += 2;
2220                }
2221                assert( ( walk - tmp ) == diffs6.addedCount * 18 );
2222                tr_bencDictAddRaw( &val, "added6", tmp, walk - tmp );
2223                tr_free( tmp );
2224
2225                /* "added6.f"
2226                 * unset each holepunch flag because we don't support it. */
2227                tmp = walk = tr_new( uint8_t, diffs6.addedCount );
2228                for( i = 0; i < diffs6.addedCount; ++i )
2229                    *walk++ = diffs6.added[i].flags & ~ADDED_F_HOLEPUNCH;
2230                assert( ( walk - tmp ) == diffs6.addedCount );
2231                tr_bencDictAddRaw( &val, "added6.f", tmp, walk - tmp );
2232                tr_free( tmp );
2233            }
2234
2235            if( diffs6.droppedCount > 0 )
2236            {
2237                /* "dropped6" */
2238                tmp = walk = tr_new( uint8_t, diffs6.droppedCount * 18 );
2239                for( i = 0; i < diffs6.droppedCount; ++i ) {
2240                    memcpy( walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16 );
2241                    walk += 16;
2242                    memcpy( walk, &diffs6.dropped[i].port, 2 );
2243                    walk += 2;
2244                }
2245                assert( ( walk - tmp ) == diffs6.droppedCount * 18);
2246                tr_bencDictAddRaw( &val, "dropped6", tmp, walk - tmp );
2247                tr_free( tmp );
2248            }
2249
2250            /* write the pex message */
2251            benc = tr_bencToStr( &val, TR_FMT_BENC, &bencLen );
2252            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + bencLen );
2253            evbuffer_add_uint8 ( out, BT_LTEP );
2254            evbuffer_add_uint8 ( out, msgs->ut_pex_id );
2255            evbuffer_add       ( out, benc, bencLen );
2256            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2257            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", evbuffer_get_length( out ) );
2258            dbgOutMessageLen( msgs );
2259
2260            tr_free( benc );
2261            tr_bencFree( &val );
2262        }
2263
2264        /* cleanup */
2265        tr_free( diffs.added );
2266        tr_free( diffs.dropped );
2267        tr_free( newPex );
2268        tr_free( diffs6.added );
2269        tr_free( diffs6.dropped );
2270        tr_free( newPex6 );
2271
2272        /*msgs->clientSentPexAt = tr_time( );*/
2273    }
2274}
2275
2276static void
2277pexPulse( int foo UNUSED, short bar UNUSED, void * vmsgs )
2278{
2279    struct tr_peermsgs * msgs = vmsgs;
2280
2281    sendPex( msgs );
2282
2283    tr_timerAdd( msgs->pexTimer, PEX_INTERVAL_SECS, 0 );
2284}
2285
2286/**
2287***
2288**/
2289
2290tr_peermsgs*
2291tr_peerMsgsNew( struct tr_torrent    * torrent,
2292                struct tr_peer       * peer,
2293                tr_peer_callback     * callback,
2294                void                 * callbackData )
2295{
2296    tr_peermsgs * m;
2297
2298    assert( peer );
2299    assert( peer->io );
2300
2301    m = tr_new0( tr_peermsgs, 1 );
2302    m->callback = callback;
2303    m->callbackData = callbackData;
2304    m->peer = peer;
2305    m->torrent = torrent;
2306    m->peer->clientIsChoked = 1;
2307    m->peer->peerIsChoked = 1;
2308    m->peer->clientIsInterested = 0;
2309    m->peer->peerIsInterested = 0;
2310    m->state = AWAITING_BT_LENGTH;
2311    m->outMessages = evbuffer_new( );
2312    m->outMessagesBatchedAt = 0;
2313    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2314    m->incoming.block = evbuffer_new( );
2315    m->pexTimer = evtimer_new( torrent->session->event_base, pexPulse, m );
2316    peer->msgs = m;
2317    tr_timerAdd( m->pexTimer, PEX_INTERVAL_SECS, 0 );
2318
2319    if( tr_peerIoSupportsUTP( peer->io ) ) {
2320        const tr_address * addr = tr_peerIoGetAddress( peer->io, NULL );
2321        tr_peerMgrSetUtpSupported( torrent, addr );
2322        tr_peerMgrSetUtpFailed( torrent, addr, FALSE );
2323    }
2324
2325    if( tr_peerIoSupportsLTEP( peer->io ) )
2326        sendLtepHandshake( m );
2327
2328    tellPeerWhatWeHave( m );
2329
2330    if( tr_dhtEnabled( torrent->session ) && tr_peerIoSupportsDHT( peer->io ))
2331    {
2332        /* Only send PORT over IPv6 when the IPv6 DHT is running (BEP-32). */
2333        const struct tr_address *addr = tr_peerIoGetAddress( peer->io, NULL );
2334        if( addr->type == TR_AF_INET || tr_globalIPv6() ) {
2335            protocolSendPort( m, tr_dhtPort( torrent->session ) );
2336        }
2337    }
2338
2339    tr_peerIoSetIOFuncs( m->peer->io, canRead, didWrite, gotError, m );
2340    updateDesiredRequestCount( m );
2341
2342    return m;
2343}
2344
2345void
2346tr_peerMsgsFree( tr_peermsgs* msgs )
2347{
2348    if( msgs )
2349    {
2350        event_free( msgs->pexTimer );
2351
2352        evbuffer_free( msgs->incoming.block );
2353        evbuffer_free( msgs->outMessages );
2354        tr_free( msgs->pex6 );
2355        tr_free( msgs->pex );
2356
2357        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2358        tr_free( msgs );
2359    }
2360}
Note: See TracBrowser for help on using the repository browser.