source: trunk/libtransmission/peer-msgs.c @ 12065

Last change on this file since 12065 was 12065, checked in by jordan, 11 years ago

(trunk libT) #4078 "Better calculation of the bitfield length while still a magnet link" -- fixed.

  • Property svn:keywords set to Date Rev Author Id
File size: 70.5 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 12065 2011-03-02 15:00:12Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdarg.h>
17#include <stdio.h>
18#include <stdlib.h>
19#include <string.h>
20
21#include <event2/bufferevent.h>
22#include <event2/event.h>
23
24#include "transmission.h"
25#include "bencode.h"
26#include "cache.h"
27#include "completion.h"
28#include "crypto.h"
29#ifdef WIN32
30#include "net.h" /* for ECONN */
31#endif
32#include "peer-io.h"
33#include "peer-mgr.h"
34#include "peer-msgs.h"
35#include "session.h"
36#include "stats.h"
37#include "torrent.h"
38#include "torrent-magnet.h"
39#include "tr-dht.h"
40#include "utils.h"
41#include "version.h"
42
43/**
44***
45**/
46
47enum
48{
49    BT_CHOKE                = 0,
50    BT_UNCHOKE              = 1,
51    BT_INTERESTED           = 2,
52    BT_NOT_INTERESTED       = 3,
53    BT_HAVE                 = 4,
54    BT_BITFIELD             = 5,
55    BT_REQUEST              = 6,
56    BT_PIECE                = 7,
57    BT_CANCEL               = 8,
58    BT_PORT                 = 9,
59
60    BT_FEXT_SUGGEST         = 13,
61    BT_FEXT_HAVE_ALL        = 14,
62    BT_FEXT_HAVE_NONE       = 15,
63    BT_FEXT_REJECT          = 16,
64    BT_FEXT_ALLOWED_FAST    = 17,
65
66    BT_LTEP                 = 20,
67
68    LTEP_HANDSHAKE          = 0,
69
70    UT_PEX_ID               = 1,
71    UT_METADATA_ID          = 3,
72
73    MAX_PEX_PEER_COUNT      = 50,
74
75    MIN_CHOKE_PERIOD_SEC    = 10,
76
77    /* idle seconds before we send a keepalive */
78    KEEPALIVE_INTERVAL_SECS = 100,
79
80    PEX_INTERVAL_SECS       = 90, /* sec between sendPex() calls */
81
82    REQQ                    = 512,
83
84    METADATA_REQQ           = 64,
85
86    /* used in lowering the outMessages queue period */
87    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
88    HIGH_PRIORITY_INTERVAL_SECS = 2,
89    LOW_PRIORITY_INTERVAL_SECS = 10,
90
91    /* number of pieces we'll allow in our fast set */
92    MAX_FAST_SET_SIZE = 3,
93
94    /* defined in BEP #9 */
95    METADATA_MSG_TYPE_REQUEST = 0,
96    METADATA_MSG_TYPE_DATA = 1,
97    METADATA_MSG_TYPE_REJECT = 2
98};
99
100enum
101{
102    AWAITING_BT_LENGTH,
103    AWAITING_BT_ID,
104    AWAITING_BT_MESSAGE,
105    AWAITING_BT_PIECE
106};
107
108/**
109***
110**/
111
112struct peer_request
113{
114    uint32_t    index;
115    uint32_t    offset;
116    uint32_t    length;
117};
118
119static void
120blockToReq( const tr_torrent     * tor,
121            tr_block_index_t       block,
122            struct peer_request  * setme )
123{
124    tr_torrentGetBlockLocation( tor, block, &setme->index,
125                                            &setme->offset,
126                                            &setme->length );
127}
128
129/**
130***
131**/
132
133/* this is raw, unchanged data from the peer regarding
134 * the current message that it's sending us. */
135struct tr_incoming
136{
137    uint8_t                id;
138    uint32_t               length; /* includes the +1 for id length */
139    struct peer_request    blockReq; /* metadata for incoming blocks */
140    struct evbuffer *      block; /* piece data for incoming blocks */
141};
142
143/**
144 * Low-level communication state information about a connected peer.
145 *
146 * This structure remembers the low-level protocol states that we're
147 * in with this peer, such as active requests, pex messages, and so on.
148 * Its fields are all private to peer-msgs.c.
149 *
150 * Data not directly involved with sending & receiving messages is
151 * stored in tr_peer, where it can be accessed by both peermsgs and
152 * the peer manager.
153 *
154 * @see struct peer_atom
155 * @see tr_peer
156 */
157struct tr_peermsgs
158{
159    tr_bool         peerSupportsPex;
160    tr_bool         peerSupportsMetadataXfer;
161    tr_bool         clientSentLtepHandshake;
162    tr_bool         peerSentLtepHandshake;
163
164    /*tr_bool         haveFastSet;*/
165
166    int             desiredRequestCount;
167
168    int             prefetchCount;
169
170    /* how long the outMessages batch should be allowed to grow before
171     * it's flushed -- some messages (like requests >:) should be sent
172     * very quickly; others aren't as urgent. */
173    int8_t          outMessagesBatchPeriod;
174
175    uint8_t         state;
176    uint8_t         ut_pex_id;
177    uint8_t         ut_metadata_id;
178    uint16_t        pexCount;
179    uint16_t        pexCount6;
180
181    size_t          metadata_size_hint;
182#if 0
183    size_t                 fastsetSize;
184    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
185#endif
186
187    tr_peer *              peer;
188
189    tr_torrent *           torrent;
190
191    tr_peer_callback      * callback;
192    void                  * callbackData;
193
194    struct evbuffer *      outMessages; /* all the non-piece messages */
195
196    struct peer_request    peerAskedFor[REQQ];
197
198    int                    peerAskedForMetadata[METADATA_REQQ];
199    int                    peerAskedForMetadataCount;
200
201    tr_pex               * pex;
202    tr_pex               * pex6;
203
204    /*time_t                 clientSentPexAt;*/
205    time_t                 clientSentAnythingAt;
206
207    /* when we started batching the outMessages */
208    time_t                outMessagesBatchedAt;
209
210    struct tr_incoming    incoming;
211
212    /* if the peer supports the Extension Protocol in BEP 10 and
213       supplied a reqq argument, it's stored here. Otherwise, the
214       value is zero and should be ignored. */
215    int64_t               reqq;
216
217    struct event        * pexTimer;
218};
219
220/**
221***
222**/
223
224#if 0
225static tr_bitfield*
226getHave( const struct tr_peermsgs * msgs )
227{
228    if( msgs->peer->have == NULL )
229        msgs->peer->have = tr_bitfieldNew( msgs->torrent->info.pieceCount );
230    return msgs->peer->have;
231}
232#endif
233
234static inline tr_session*
235getSession( struct tr_peermsgs * msgs )
236{
237    return msgs->torrent->session;
238}
239
240/**
241***
242**/
243
244static void
245myDebug( const char * file, int line,
246         const struct tr_peermsgs * msgs,
247         const char * fmt, ... )
248{
249    FILE * fp = tr_getLog( );
250
251    if( fp )
252    {
253        va_list           args;
254        char              timestr[64];
255        struct evbuffer * buf = evbuffer_new( );
256        char *            base = tr_basename( file );
257
258        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
259                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
260                             tr_torrentName( msgs->torrent ),
261                             tr_peerIoGetAddrStr( msgs->peer->io ),
262                             msgs->peer->client );
263        va_start( args, fmt );
264        evbuffer_add_vprintf( buf, fmt, args );
265        va_end( args );
266        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
267        fputs( (const char*)evbuffer_pullup( buf, -1 ), fp );
268
269        tr_free( base );
270        evbuffer_free( buf );
271    }
272}
273
274#define dbgmsg( msgs, ... ) \
275    do { \
276        if( tr_deepLoggingIsActive( ) ) \
277            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
278    } while( 0 )
279
280/**
281***
282**/
283
284static void
285pokeBatchPeriod( tr_peermsgs * msgs,
286                 int           interval )
287{
288    if( msgs->outMessagesBatchPeriod > interval )
289    {
290        msgs->outMessagesBatchPeriod = interval;
291        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
292    }
293}
294
295static void
296dbgOutMessageLen( tr_peermsgs * msgs )
297{
298    dbgmsg( msgs, "outMessage size is now %zu", evbuffer_get_length( msgs->outMessages ) );
299}
300
301static void
302protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
303{
304    struct evbuffer * out = msgs->outMessages;
305
306    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
307
308    evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
309    evbuffer_add_uint8 ( out, BT_FEXT_REJECT );
310    evbuffer_add_uint32( out, req->index );
311    evbuffer_add_uint32( out, req->offset );
312    evbuffer_add_uint32( out, req->length );
313
314    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
315    dbgOutMessageLen( msgs );
316}
317
318static void
319protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
320{
321    struct evbuffer * out = msgs->outMessages;
322
323    evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
324    evbuffer_add_uint8 ( out, BT_REQUEST );
325    evbuffer_add_uint32( out, req->index );
326    evbuffer_add_uint32( out, req->offset );
327    evbuffer_add_uint32( out, req->length );
328
329    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
330    dbgOutMessageLen( msgs );
331    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
332}
333
334static void
335protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
336{
337    struct evbuffer * out = msgs->outMessages;
338
339    evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
340    evbuffer_add_uint8 ( out, BT_CANCEL );
341    evbuffer_add_uint32( out, req->index );
342    evbuffer_add_uint32( out, req->offset );
343    evbuffer_add_uint32( out, req->length );
344
345    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
346    dbgOutMessageLen( msgs );
347    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
348}
349
350static void
351protocolSendPort(tr_peermsgs *msgs, uint16_t port)
352{
353    struct evbuffer * out = msgs->outMessages;
354
355    dbgmsg( msgs, "sending Port %u", port);
356    evbuffer_add_uint32( out, 3 );
357    evbuffer_add_uint8 ( out, BT_PORT );
358    evbuffer_add_uint16( out, port);
359}
360
361static void
362protocolSendHave( tr_peermsgs * msgs, uint32_t index )
363{
364    struct evbuffer * out = msgs->outMessages;
365
366    evbuffer_add_uint32( out, sizeof(uint8_t) + sizeof(uint32_t) );
367    evbuffer_add_uint8 ( out, BT_HAVE );
368    evbuffer_add_uint32( out, index );
369
370    dbgmsg( msgs, "sending Have %u", index );
371    dbgOutMessageLen( msgs );
372    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
373}
374
375#if 0
376static void
377protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
378{
379    tr_peerIo       * io  = msgs->peer->io;
380    struct evbuffer * out = msgs->outMessages;
381
382    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
383
384    evbuffer_add_uint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
385    evbuffer_add_uint8 ( io, out, BT_FEXT_ALLOWED_FAST );
386    evbuffer_add_uint32( io, out, pieceIndex );
387
388    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
389    dbgOutMessageLen( msgs );
390}
391#endif
392
393static void
394protocolSendChoke( tr_peermsgs * msgs, int choke )
395{
396    struct evbuffer * out = msgs->outMessages;
397
398    evbuffer_add_uint32( out, sizeof( uint8_t ) );
399    evbuffer_add_uint8 ( out, choke ? BT_CHOKE : BT_UNCHOKE );
400
401    dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
402    dbgOutMessageLen( msgs );
403    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
404}
405
406static void
407protocolSendHaveAll( tr_peermsgs * msgs )
408{
409    struct evbuffer * out = msgs->outMessages;
410
411    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
412
413    evbuffer_add_uint32( out, sizeof( uint8_t ) );
414    evbuffer_add_uint8 ( out, BT_FEXT_HAVE_ALL );
415
416    dbgmsg( msgs, "sending HAVE_ALL..." );
417    dbgOutMessageLen( msgs );
418    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
419}
420
421static void
422protocolSendHaveNone( tr_peermsgs * msgs )
423{
424    struct evbuffer * out = msgs->outMessages;
425
426    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
427
428    evbuffer_add_uint32( out, sizeof( uint8_t ) );
429    evbuffer_add_uint8 ( out, BT_FEXT_HAVE_NONE );
430
431    dbgmsg( msgs, "sending HAVE_NONE..." );
432    dbgOutMessageLen( msgs );
433    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
434}
435
436/**
437***  EVENTS
438**/
439
440static void
441publish( tr_peermsgs * msgs, tr_peer_event * e )
442{
443    assert( msgs->peer );
444    assert( msgs->peer->msgs == msgs );
445
446    if( msgs->callback != NULL )
447        msgs->callback( msgs->peer, e, msgs->callbackData );
448}
449
450static void
451fireError( tr_peermsgs * msgs, int err )
452{
453    tr_peer_event e = TR_PEER_EVENT_INIT;
454    e.eventType = TR_PEER_ERROR;
455    e.err = err;
456    publish( msgs, &e );
457}
458
459static void
460fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
461{
462    tr_peer_event e = TR_PEER_EVENT_INIT;
463    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
464    e.pieceIndex = req->index;
465    e.offset = req->offset;
466    e.length = req->length;
467    publish( msgs, &e );
468}
469
470static void
471fireGotRej( tr_peermsgs * msgs, const struct peer_request * req )
472{
473    tr_peer_event e = TR_PEER_EVENT_INIT;
474    e.eventType = TR_PEER_CLIENT_GOT_REJ;
475    e.pieceIndex = req->index;
476    e.offset = req->offset;
477    e.length = req->length;
478    publish( msgs, &e );
479}
480
481static void
482fireGotChoke( tr_peermsgs * msgs )
483{
484    tr_peer_event e = TR_PEER_EVENT_INIT;
485    e.eventType = TR_PEER_CLIENT_GOT_CHOKE;
486    publish( msgs, &e );
487}
488
489static void
490fireClientGotHaveAll( tr_peermsgs * msgs )
491{
492    tr_peer_event e = TR_PEER_EVENT_INIT;
493    e.eventType = TR_PEER_CLIENT_GOT_HAVE_ALL;
494    publish( msgs, &e );
495}
496
497static void
498fireClientGotHaveNone( tr_peermsgs * msgs )
499{
500    tr_peer_event e = TR_PEER_EVENT_INIT;
501    e.eventType = TR_PEER_CLIENT_GOT_HAVE_NONE;
502    publish( msgs, &e );
503}
504
505static void
506fireClientGotData( tr_peermsgs * msgs,
507                   uint32_t      length,
508                   int           wasPieceData )
509{
510    tr_peer_event e = TR_PEER_EVENT_INIT;
511
512    e.length = length;
513    e.eventType = TR_PEER_CLIENT_GOT_DATA;
514    e.wasPieceData = wasPieceData;
515    publish( msgs, &e );
516}
517
518static void
519fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
520{
521    tr_peer_event e = TR_PEER_EVENT_INIT;
522    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
523    e.pieceIndex = pieceIndex;
524    publish( msgs, &e );
525}
526
527static void
528fireClientGotPort( tr_peermsgs * msgs, tr_port port )
529{
530    tr_peer_event e = TR_PEER_EVENT_INIT;
531    e.eventType = TR_PEER_CLIENT_GOT_PORT;
532    e.port = port;
533    publish( msgs, &e );
534}
535
536static void
537fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
538{
539    tr_peer_event e = TR_PEER_EVENT_INIT;
540    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
541    e.pieceIndex = pieceIndex;
542    publish( msgs, &e );
543}
544
545static void
546fireClientGotBitfield( tr_peermsgs * msgs, tr_bitfield * bitfield )
547{
548    tr_peer_event e = TR_PEER_EVENT_INIT;
549    e.eventType = TR_PEER_CLIENT_GOT_BITFIELD;
550    e.bitfield = bitfield;
551    publish( msgs, &e );
552}
553
554static void
555fireClientGotHave( tr_peermsgs * msgs, tr_piece_index_t index )
556{
557    tr_peer_event e = TR_PEER_EVENT_INIT;
558    e.eventType = TR_PEER_CLIENT_GOT_HAVE;
559    e.pieceIndex = index;
560    publish( msgs, &e );
561}
562
563static void
564firePeerGotData( tr_peermsgs  * msgs,
565                 uint32_t       length,
566                 int            wasPieceData )
567{
568    tr_peer_event e = TR_PEER_EVENT_INIT;
569
570    e.length = length;
571    e.eventType = TR_PEER_PEER_GOT_DATA;
572    e.wasPieceData = wasPieceData;
573
574    publish( msgs, &e );
575}
576
577/**
578***  ALLOWED FAST SET
579***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
580**/
581
582size_t
583tr_generateAllowedSet( tr_piece_index_t * setmePieces,
584                       size_t             desiredSetSize,
585                       size_t             pieceCount,
586                       const uint8_t    * infohash,
587                       const tr_address * addr )
588{
589    size_t setSize = 0;
590
591    assert( setmePieces );
592    assert( desiredSetSize <= pieceCount );
593    assert( desiredSetSize );
594    assert( pieceCount );
595    assert( infohash );
596    assert( addr );
597
598    if( addr->type == TR_AF_INET )
599    {
600        uint8_t w[SHA_DIGEST_LENGTH + 4], *walk=w;
601        uint8_t x[SHA_DIGEST_LENGTH];
602
603        uint32_t ui32 = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 );   /* (1) */
604        memcpy( w, &ui32, sizeof( uint32_t ) );
605        walk += sizeof( uint32_t );
606        memcpy( walk, infohash, SHA_DIGEST_LENGTH );                 /* (2) */
607        walk += SHA_DIGEST_LENGTH;
608        tr_sha1( x, w, walk-w, NULL );                               /* (3) */
609        assert( sizeof( w ) == walk-w );
610
611        while( setSize<desiredSetSize )
612        {
613            int i;
614            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
615            {
616                size_t k;
617                uint32_t j = i * 4;                                  /* (5) */
618                uint32_t y = ntohl( *( uint32_t* )( x + j ) );       /* (6) */
619                uint32_t index = y % pieceCount;                     /* (7) */
620
621                for( k=0; k<setSize; ++k )                           /* (8) */
622                    if( setmePieces[k] == index )
623                        break;
624
625                if( k == setSize )
626                    setmePieces[setSize++] = index;                  /* (9) */
627            }
628
629            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
630        }
631    }
632
633    return setSize;
634}
635
636static void
637updateFastSet( tr_peermsgs * msgs UNUSED )
638{
639#if 0
640    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
641    const int peerIsNeedy = msgs->peer->progress < 0.10;
642
643    if( fext && peerIsNeedy && !msgs->haveFastSet )
644    {
645        size_t i;
646        const struct tr_address * addr = tr_peerIoGetAddress( msgs->peer->io, NULL );
647        const tr_info * inf = &msgs->torrent->info;
648        const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
649
650        /* build the fast set */
651        msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
652        msgs->haveFastSet = 1;
653
654        /* send it to the peer */
655        for( i=0; i<msgs->fastsetSize; ++i )
656            protocolSendAllowedFast( msgs, msgs->fastset[i] );
657    }
658#endif
659}
660
661/**
662***  INTEREST
663**/
664
665static void
666sendInterest( tr_peermsgs * msgs, tr_bool clientIsInterested )
667{
668    struct evbuffer * out = msgs->outMessages;
669
670    assert( msgs );
671    assert( tr_isBool( clientIsInterested ) );
672
673    msgs->peer->clientIsInterested = clientIsInterested;
674    dbgmsg( msgs, "Sending %s", clientIsInterested ? "Interested" : "Not Interested" );
675    evbuffer_add_uint32( out, sizeof( uint8_t ) );
676    evbuffer_add_uint8 ( out, clientIsInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
677
678    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
679    dbgOutMessageLen( msgs );
680}
681
682static void
683updateInterest( tr_peermsgs * msgs UNUSED )
684{
685    /* FIXME -- might need to poke the mgr on startup */
686}
687
688void
689tr_peerMsgsSetInterested( tr_peermsgs * msgs, int isInterested )
690{
691    assert( tr_isBool( isInterested ) );
692
693    if( isInterested != msgs->peer->clientIsInterested )
694        sendInterest( msgs, isInterested );
695}
696
697static tr_bool
698popNextMetadataRequest( tr_peermsgs * msgs, int * piece )
699{
700    if( msgs->peerAskedForMetadataCount == 0 )
701        return FALSE;
702
703    *piece = msgs->peerAskedForMetadata[0];
704
705    tr_removeElementFromArray( msgs->peerAskedForMetadata, 0, sizeof( int ),
706                               msgs->peerAskedForMetadataCount-- );
707
708    return TRUE;
709}
710
711static tr_bool
712popNextRequest( tr_peermsgs * msgs, struct peer_request * setme )
713{
714    if( msgs->peer->pendingReqsToClient == 0 )
715        return FALSE;
716
717    *setme = msgs->peerAskedFor[0];
718
719    tr_removeElementFromArray( msgs->peerAskedFor, 0, sizeof( struct peer_request ),
720                               msgs->peer->pendingReqsToClient-- );
721
722    return TRUE;
723}
724
725static void
726cancelAllRequestsToClient( tr_peermsgs * msgs )
727{
728    struct peer_request req;
729    const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->peer->io );
730
731    while( popNextRequest( msgs, &req ))
732        if( mustSendCancel )
733            protocolSendReject( msgs, &req );
734}
735
736void
737tr_peerMsgsSetChoke( tr_peermsgs * msgs,
738                     int           choke )
739{
740    const time_t now = tr_time( );
741    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
742
743    assert( msgs );
744    assert( msgs->peer );
745    assert( choke == 0 || choke == 1 );
746
747    if( msgs->peer->chokeChangedAt > fibrillationTime )
748    {
749        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
750    }
751    else if( msgs->peer->peerIsChoked != choke )
752    {
753        msgs->peer->peerIsChoked = choke;
754        if( choke )
755            cancelAllRequestsToClient( msgs );
756        protocolSendChoke( msgs, choke );
757        msgs->peer->chokeChangedAt = now;
758    }
759}
760
761/**
762***
763**/
764
765void
766tr_peerMsgsHave( tr_peermsgs * msgs,
767                 uint32_t      index )
768{
769    protocolSendHave( msgs, index );
770
771    /* since we have more pieces now, we might not be interested in this peer */
772    updateInterest( msgs );
773}
774
775/**
776***
777**/
778
779static tr_bool
780reqIsValid( const tr_peermsgs * peer,
781            uint32_t            index,
782            uint32_t            offset,
783            uint32_t            length )
784{
785    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
786}
787
788static tr_bool
789requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
790{
791    return reqIsValid( msgs, req->index, req->offset, req->length );
792}
793
794void
795tr_peerMsgsCancel( tr_peermsgs * msgs, tr_block_index_t block )
796{
797    struct peer_request req;
798/*fprintf( stderr, "SENDING CANCEL MESSAGE FOR BLOCK %zu\n\t\tFROM PEER %p ------------------------------------\n", (size_t)block, msgs->peer );*/
799    blockToReq( msgs->torrent, block, &req );
800    protocolSendCancel( msgs, &req );
801}
802
803/**
804***
805**/
806
807static void
808sendLtepHandshake( tr_peermsgs * msgs )
809{
810    tr_benc val, *m;
811    char * buf;
812    int len;
813    tr_bool allow_pex;
814    tr_bool allow_metadata_xfer;
815    struct evbuffer * out = msgs->outMessages;
816    const unsigned char * ipv6 = tr_globalIPv6();
817
818    if( msgs->clientSentLtepHandshake )
819        return;
820
821    dbgmsg( msgs, "sending an ltep handshake" );
822    msgs->clientSentLtepHandshake = 1;
823
824    /* decide if we want to advertise metadata xfer support (BEP 9) */
825    if( tr_torrentIsPrivate( msgs->torrent ) )
826        allow_metadata_xfer = 0;
827    else
828        allow_metadata_xfer = 1;
829
830    /* decide if we want to advertise pex support */
831    if( !tr_torrentAllowsPex( msgs->torrent ) )
832        allow_pex = 0;
833    else if( msgs->peerSentLtepHandshake )
834        allow_pex = msgs->peerSupportsPex ? 1 : 0;
835    else
836        allow_pex = 1;
837
838    tr_bencInitDict( &val, 8 );
839    tr_bencDictAddInt( &val, "e", getSession(msgs)->encryptionMode != TR_CLEAR_PREFERRED );
840    if( ipv6 != NULL )
841        tr_bencDictAddRaw( &val, "ipv6", ipv6, 16 );
842    if( allow_metadata_xfer && tr_torrentHasMetadata( msgs->torrent )
843                            && ( msgs->torrent->infoDictLength > 0 ) )
844        tr_bencDictAddInt( &val, "metadata_size", msgs->torrent->infoDictLength );
845    tr_bencDictAddInt( &val, "p", tr_sessionGetPublicPeerPort( getSession(msgs) ) );
846    tr_bencDictAddInt( &val, "reqq", REQQ );
847    tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
848    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
849    m  = tr_bencDictAddDict( &val, "m", 2 );
850    if( allow_metadata_xfer )
851        tr_bencDictAddInt( m, "ut_metadata", UT_METADATA_ID );
852    if( allow_pex )
853        tr_bencDictAddInt( m, "ut_pex", UT_PEX_ID );
854
855    buf = tr_bencToStr( &val, TR_FMT_BENC, &len );
856
857    evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + len );
858    evbuffer_add_uint8 ( out, BT_LTEP );
859    evbuffer_add_uint8 ( out, LTEP_HANDSHAKE );
860    evbuffer_add       ( out, buf, len );
861    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
862    dbgOutMessageLen( msgs );
863
864    /* cleanup */
865    tr_bencFree( &val );
866    tr_free( buf );
867}
868
869static void
870parseLtepHandshake( tr_peermsgs *     msgs,
871                    int               len,
872                    struct evbuffer * inbuf )
873{
874    int64_t   i;
875    tr_benc   val, * sub;
876    uint8_t * tmp = tr_new( uint8_t, len );
877    const uint8_t *addr;
878    size_t addr_len;
879    tr_pex pex;
880    int8_t seedProbability = -1;
881
882    memset( &pex, 0, sizeof( tr_pex ) );
883
884    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, len );
885    msgs->peerSentLtepHandshake = 1;
886
887    if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) )
888    {
889        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
890        tr_free( tmp );
891        return;
892    }
893
894    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
895
896    /* does the peer prefer encrypted connections? */
897    if( tr_bencDictFindInt( &val, "e", &i ) ) {
898        msgs->peer->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
899                                              : ENCRYPTION_PREFERENCE_NO;
900        if( i )
901            pex.flags |= ADDED_F_ENCRYPTION_FLAG;
902    }
903
904    /* check supported messages for utorrent pex */
905    msgs->peerSupportsPex = 0;
906    msgs->peerSupportsMetadataXfer = 0;
907
908    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
909        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
910            msgs->peerSupportsPex = i != 0;
911            msgs->ut_pex_id = (uint8_t) i;
912            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
913        }
914        if( tr_bencDictFindInt( sub, "ut_metadata", &i ) ) {
915            msgs->peerSupportsMetadataXfer = i != 0;
916            msgs->ut_metadata_id = (uint8_t) i;
917            dbgmsg( msgs, "msgs->ut_metadata_id is %d", (int)msgs->ut_metadata_id );
918        }
919        if( tr_bencDictFindInt( sub, "ut_holepunch", &i ) ) {
920            /* Mysterious µTorrent extension that we don't grok.  However,
921               it implies support for µTP, so use it to indicate that. */
922            tr_peerMgrSetUtpFailed( msgs->torrent,
923                                    tr_peerIoGetAddress( msgs->peer->io, NULL ),
924                                    FALSE );
925        }
926    }
927
928    /* look for metainfo size (BEP 9) */
929    if( tr_bencDictFindInt( &val, "metadata_size", &i ) ) {
930        tr_torrentSetMetadataSizeHint( msgs->torrent, i );
931        msgs->metadata_size_hint = (size_t) i;
932    }
933
934    /* look for upload_only (BEP 21) */
935    if( tr_bencDictFindInt( &val, "upload_only", &i ) )
936        seedProbability = i==0 ? 0 : 100;
937
938    /* get peer's listening port */
939    if( tr_bencDictFindInt( &val, "p", &i ) ) {
940        pex.port = htons( (uint16_t)i );
941        fireClientGotPort( msgs, pex.port );
942        dbgmsg( msgs, "peer's port is now %d", (int)i );
943    }
944
945    if( tr_peerIoIsIncoming( msgs->peer->io )
946        && tr_bencDictFindRaw( &val, "ipv4", &addr, &addr_len )
947        && ( addr_len == 4 ) )
948    {
949        pex.addr.type = TR_AF_INET;
950        memcpy( &pex.addr.addr.addr4, addr, 4 );
951        tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability );
952    }
953
954    if( tr_peerIoIsIncoming( msgs->peer->io )
955        && tr_bencDictFindRaw( &val, "ipv6", &addr, &addr_len )
956        && ( addr_len == 16 ) )
957    {
958        pex.addr.type = TR_AF_INET6;
959        memcpy( &pex.addr.addr.addr6, addr, 16 );
960        tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability );
961    }
962
963    /* get peer's maximum request queue size */
964    if( tr_bencDictFindInt( &val, "reqq", &i ) )
965        msgs->reqq = i;
966
967    tr_bencFree( &val );
968    tr_free( tmp );
969}
970
971static void
972parseUtMetadata( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
973{
974    tr_benc dict;
975    char * msg_end;
976    char * benc_end;
977    int64_t msg_type = -1;
978    int64_t piece = -1;
979    int64_t total_size = 0;
980    uint8_t * tmp = tr_new( uint8_t, msglen );
981
982    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
983    msg_end = (char*)tmp + msglen;
984
985    if( !tr_bencLoad( tmp, msglen, &dict, &benc_end ) )
986    {
987        tr_bencDictFindInt( &dict, "msg_type", &msg_type );
988        tr_bencDictFindInt( &dict, "piece", &piece );
989        tr_bencDictFindInt( &dict, "total_size", &total_size );
990        tr_bencFree( &dict );
991    }
992
993    dbgmsg( msgs, "got ut_metadata msg: type %d, piece %d, total_size %d",
994            (int)msg_type, (int)piece, (int)total_size );
995
996    if( msg_type == METADATA_MSG_TYPE_REJECT )
997    {
998        /* NOOP */
999    }
1000
1001    if( ( msg_type == METADATA_MSG_TYPE_DATA )
1002        && ( !tr_torrentHasMetadata( msgs->torrent ) )
1003        && ( msg_end - benc_end <= METADATA_PIECE_SIZE )
1004        && ( piece * METADATA_PIECE_SIZE + (msg_end - benc_end) <= total_size ) )
1005    {
1006        const int pieceLen = msg_end - benc_end;
1007        tr_torrentSetMetadataPiece( msgs->torrent, piece, benc_end, pieceLen );
1008    }
1009
1010    if( msg_type == METADATA_MSG_TYPE_REQUEST )
1011    {
1012        if( ( piece >= 0 )
1013            && tr_torrentHasMetadata( msgs->torrent )
1014            && !tr_torrentIsPrivate( msgs->torrent )
1015            && ( msgs->peerAskedForMetadataCount < METADATA_REQQ ) )
1016        {
1017            msgs->peerAskedForMetadata[msgs->peerAskedForMetadataCount++] = piece;
1018        }
1019        else
1020        {
1021            tr_benc tmp;
1022            int payloadLen;
1023            char * payload;
1024            struct evbuffer * out = msgs->outMessages;
1025
1026            /* build the rejection message */
1027            tr_bencInitDict( &tmp, 2 );
1028            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
1029            tr_bencDictAddInt( &tmp, "piece", piece );
1030            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1031            tr_bencFree( &tmp );
1032
1033            /* write it out as a LTEP message to our outMessages buffer */
1034            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen );
1035            evbuffer_add_uint8 ( out, BT_LTEP );
1036            evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1037            evbuffer_add       ( out, payload, payloadLen );
1038            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1039            dbgOutMessageLen( msgs );
1040
1041            tr_free( payload );
1042        }
1043    }
1044
1045    tr_free( tmp );
1046}
1047
1048static void
1049parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1050{
1051    int loaded = 0;
1052    uint8_t * tmp = tr_new( uint8_t, msglen );
1053    tr_benc val;
1054    tr_torrent * tor = msgs->torrent;
1055    const uint8_t * added;
1056    size_t added_len;
1057
1058    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1059
1060    if( tr_torrentAllowsPex( tor )
1061      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) ) )
1062    {
1063        if( tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1064        {
1065            tr_pex * pex;
1066            size_t i, n;
1067            size_t added_f_len = 0;
1068            const uint8_t * added_f = NULL;
1069
1070            tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1071            pex = tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len, &n );
1072
1073            n = MIN( n, MAX_PEX_PEER_COUNT );
1074            for( i=0; i<n; ++i )
1075            {
1076                int seedProbability = -1;
1077                if( i < added_f_len ) seedProbability = ( added_f[i] & ADDED_F_SEED_FLAG ) ? 100 : 0;
1078                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex+i, seedProbability );
1079            }
1080
1081            tr_free( pex );
1082        }
1083
1084        if( tr_bencDictFindRaw( &val, "added6", &added, &added_len ) )
1085        {
1086            tr_pex * pex;
1087            size_t i, n;
1088            size_t added_f_len = 0;
1089            const uint8_t * added_f = NULL;
1090
1091            tr_bencDictFindRaw( &val, "added6.f", &added_f, &added_f_len );
1092            pex = tr_peerMgrCompact6ToPex( added, added_len, added_f, added_f_len, &n );
1093
1094            n = MIN( n, MAX_PEX_PEER_COUNT );
1095            for( i=0; i<n; ++i )
1096            {
1097                int seedProbability = -1;
1098                if( i < added_f_len ) seedProbability = ( added_f[i] & ADDED_F_SEED_FLAG ) ? 100 : 0;
1099                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex+i, seedProbability );
1100            }
1101
1102            tr_free( pex );
1103        }
1104    }
1105
1106    if( loaded )
1107        tr_bencFree( &val );
1108    tr_free( tmp );
1109}
1110
1111static void sendPex( tr_peermsgs * msgs );
1112
1113static void
1114parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer  * inbuf )
1115{
1116    uint8_t ltep_msgid;
1117
1118    tr_peerIoReadUint8( msgs->peer->io, inbuf, &ltep_msgid );
1119    msglen--;
1120
1121    if( ltep_msgid == LTEP_HANDSHAKE )
1122    {
1123        dbgmsg( msgs, "got ltep handshake" );
1124        parseLtepHandshake( msgs, msglen, inbuf );
1125        if( tr_peerIoSupportsLTEP( msgs->peer->io ) )
1126        {
1127            sendLtepHandshake( msgs );
1128            sendPex( msgs );
1129        }
1130    }
1131    else if( ltep_msgid == UT_PEX_ID )
1132    {
1133        dbgmsg( msgs, "got ut pex" );
1134        msgs->peerSupportsPex = 1;
1135        parseUtPex( msgs, msglen, inbuf );
1136    }
1137    else if( ltep_msgid == UT_METADATA_ID )
1138    {
1139        dbgmsg( msgs, "got ut metadata" );
1140        msgs->peerSupportsMetadataXfer = 1;
1141        parseUtMetadata( msgs, msglen, inbuf );
1142    }
1143    else
1144    {
1145        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1146        evbuffer_drain( inbuf, msglen );
1147    }
1148}
1149
1150static int
1151readBtLength( tr_peermsgs *     msgs,
1152              struct evbuffer * inbuf,
1153              size_t            inlen )
1154{
1155    uint32_t len;
1156
1157    if( inlen < sizeof( len ) )
1158        return READ_LATER;
1159
1160    tr_peerIoReadUint32( msgs->peer->io, inbuf, &len );
1161
1162    if( len == 0 ) /* peer sent us a keepalive message */
1163        dbgmsg( msgs, "got KeepAlive" );
1164    else
1165    {
1166        msgs->incoming.length = len;
1167        msgs->state = AWAITING_BT_ID;
1168    }
1169
1170    return READ_NOW;
1171}
1172
1173static int readBtMessage( tr_peermsgs     * msgs,
1174                          struct evbuffer * inbuf,
1175                          size_t            inlen );
1176
1177static int
1178readBtId( tr_peermsgs * msgs, struct evbuffer  * inbuf, size_t inlen )
1179{
1180    uint8_t id;
1181
1182    if( inlen < sizeof( uint8_t ) )
1183        return READ_LATER;
1184
1185    tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
1186    msgs->incoming.id = id;
1187    dbgmsg( msgs, "msgs->incoming.id is now %d; msgs->incoming.length is %zu", id, (size_t)msgs->incoming.length );
1188
1189    if( id == BT_PIECE )
1190    {
1191        msgs->state = AWAITING_BT_PIECE;
1192        return READ_NOW;
1193    }
1194    else if( msgs->incoming.length != 1 )
1195    {
1196        msgs->state = AWAITING_BT_MESSAGE;
1197        return READ_NOW;
1198    }
1199    else return readBtMessage( msgs, inbuf, inlen - 1 );
1200}
1201
1202static void
1203updatePeerProgress( tr_peermsgs * msgs )
1204{
1205    tr_peerUpdateProgress( msgs->torrent, msgs->peer );
1206
1207    updateFastSet( msgs );
1208    updateInterest( msgs );
1209}
1210
1211static void
1212prefetchPieces( tr_peermsgs *msgs )
1213{
1214    int i;
1215
1216    if( !getSession(msgs)->isPrefetchEnabled )
1217        return;
1218
1219    /* Maintain 12 prefetched blocks per unchoked peer */
1220    for( i=msgs->prefetchCount; i<msgs->peer->pendingReqsToClient && i<12; ++i )
1221    {
1222        const struct peer_request * req = msgs->peerAskedFor + i;
1223        if( requestIsValid( msgs, req ) )
1224        {
1225            tr_cachePrefetchBlock( getSession(msgs)->cache, msgs->torrent, req->index, req->offset, req->length );
1226            ++msgs->prefetchCount;
1227        }
1228    }
1229}
1230
1231static void
1232peerMadeRequest( tr_peermsgs *               msgs,
1233                 const struct peer_request * req )
1234{
1235    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1236    const int reqIsValid = requestIsValid( msgs, req );
1237    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( &msgs->torrent->completion, req->index );
1238    const int peerIsChoked = msgs->peer->peerIsChoked;
1239
1240    int allow = FALSE;
1241
1242    if( !reqIsValid )
1243        dbgmsg( msgs, "rejecting an invalid request." );
1244    else if( !clientHasPiece )
1245        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1246    else if( peerIsChoked )
1247        dbgmsg( msgs, "rejecting request from choked peer" );
1248    else if( msgs->peer->pendingReqsToClient + 1 >= REQQ )
1249        dbgmsg( msgs, "rejecting request ... reqq is full" );
1250    else
1251        allow = TRUE;
1252
1253    if( allow ) {
1254        msgs->peerAskedFor[msgs->peer->pendingReqsToClient++] = *req;
1255        prefetchPieces( msgs );
1256    } else if( fext ) {
1257        protocolSendReject( msgs, req );
1258    }
1259}
1260
1261static tr_bool
1262messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1263{
1264    switch( id )
1265    {
1266        case BT_CHOKE:
1267        case BT_UNCHOKE:
1268        case BT_INTERESTED:
1269        case BT_NOT_INTERESTED:
1270        case BT_FEXT_HAVE_ALL:
1271        case BT_FEXT_HAVE_NONE:
1272            return len == 1;
1273
1274        case BT_HAVE:
1275        case BT_FEXT_SUGGEST:
1276        case BT_FEXT_ALLOWED_FAST:
1277            return len == 5;
1278
1279        case BT_BITFIELD:
1280            if( tr_torrentHasMetadata( msg->torrent ) )
1281                return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1282            /* we don't know the piece count yet,
1283               so we can only guess whether to send TRUE or FALSE */
1284            if( msg->metadata_size_hint > 0 )
1285                return len <= msg->metadata_size_hint;
1286            return TRUE;
1287
1288        case BT_REQUEST:
1289        case BT_CANCEL:
1290        case BT_FEXT_REJECT:
1291            return len == 13;
1292
1293        case BT_PIECE:
1294            return len > 9 && len <= 16393;
1295
1296        case BT_PORT:
1297            return len == 3;
1298
1299        case BT_LTEP:
1300            return len >= 2;
1301
1302        default:
1303            return FALSE;
1304    }
1305}
1306
1307static int clientGotBlock( tr_peermsgs *               msgs,
1308                           struct evbuffer *           block,
1309                           const struct peer_request * req );
1310
1311static int
1312readBtPiece( tr_peermsgs      * msgs,
1313             struct evbuffer  * inbuf,
1314             size_t             inlen,
1315             size_t           * setme_piece_bytes_read )
1316{
1317    struct peer_request * req = &msgs->incoming.blockReq;
1318
1319    assert( evbuffer_get_length( inbuf ) >= inlen );
1320    dbgmsg( msgs, "In readBtPiece" );
1321
1322    if( !req->length )
1323    {
1324        if( inlen < 8 )
1325            return READ_LATER;
1326
1327        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index );
1328        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset );
1329        req->length = msgs->incoming.length - 9;
1330        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1331        return READ_NOW;
1332    }
1333    else
1334    {
1335        int err;
1336
1337        /* read in another chunk of data */
1338        const size_t nLeft = req->length - evbuffer_get_length( msgs->incoming.block );
1339        size_t n = MIN( nLeft, inlen );
1340        size_t i = n;
1341        void * buf = tr_sessionGetBuffer( getSession( msgs ) );
1342        const size_t buflen = SESSION_BUFFER_SIZE;
1343
1344        while( i > 0 )
1345        {
1346            const size_t thisPass = MIN( i, buflen );
1347            tr_peerIoReadBytes( msgs->peer->io, inbuf, buf, thisPass );
1348            evbuffer_add( msgs->incoming.block, buf, thisPass );
1349            i -= thisPass;
1350        }
1351
1352        tr_sessionReleaseBuffer( getSession( msgs ) );
1353        buf = NULL;
1354
1355        fireClientGotData( msgs, n, TRUE );
1356        *setme_piece_bytes_read += n;
1357        dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1358               n, req->index, req->offset, req->length,
1359               (int)( req->length - evbuffer_get_length( msgs->incoming.block ) ) );
1360        if( evbuffer_get_length( msgs->incoming.block ) < req->length )
1361            return READ_LATER;
1362
1363        /* we've got the whole block ... process it */
1364        err = clientGotBlock( msgs, msgs->incoming.block, req );
1365
1366        /* cleanup */
1367        evbuffer_free( msgs->incoming.block );
1368        msgs->incoming.block = evbuffer_new( );
1369        req->length = 0;
1370        msgs->state = AWAITING_BT_LENGTH;
1371        return err ? READ_ERR : READ_NOW;
1372    }
1373}
1374
1375static void updateDesiredRequestCount( tr_peermsgs * msgs );
1376
1377static int
1378readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1379{
1380    uint32_t      ui32;
1381    uint32_t      msglen = msgs->incoming.length;
1382    const uint8_t id = msgs->incoming.id;
1383#ifndef NDEBUG
1384    const size_t  startBufLen = evbuffer_get_length( inbuf );
1385#endif
1386    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1387
1388    --msglen; /* id length */
1389
1390    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
1391
1392    if( inlen < msglen )
1393        return READ_LATER;
1394
1395    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1396    {
1397        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1398        fireError( msgs, EMSGSIZE );
1399        return READ_ERR;
1400    }
1401
1402    switch( id )
1403    {
1404        case BT_CHOKE:
1405            dbgmsg( msgs, "got Choke" );
1406            msgs->peer->clientIsChoked = 1;
1407            if( !fext )
1408                fireGotChoke( msgs );
1409            break;
1410
1411        case BT_UNCHOKE:
1412            dbgmsg( msgs, "got Unchoke" );
1413            msgs->peer->clientIsChoked = 0;
1414            updateDesiredRequestCount( msgs );
1415            break;
1416
1417        case BT_INTERESTED:
1418            dbgmsg( msgs, "got Interested" );
1419            msgs->peer->peerIsInterested = 1;
1420            break;
1421
1422        case BT_NOT_INTERESTED:
1423            dbgmsg( msgs, "got Not Interested" );
1424            msgs->peer->peerIsInterested = 0;
1425            break;
1426
1427        case BT_HAVE:
1428            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1429            dbgmsg( msgs, "got Have: %u", ui32 );
1430            if( tr_torrentHasMetadata( msgs->torrent )
1431                    && ( ui32 >= msgs->torrent->info.pieceCount ) )
1432            {
1433                fireError( msgs, ERANGE );
1434                return READ_ERR;
1435            }
1436
1437            /* a peer can send the same HAVE message twice... */
1438            if( !tr_bitsetHas( &msgs->peer->have, ui32 ) ) {
1439                tr_bitsetAdd( &msgs->peer->have, ui32 );
1440                fireClientGotHave( msgs, ui32 );
1441            }
1442            updatePeerProgress( msgs );
1443            break;
1444
1445        case BT_BITFIELD: {
1446            tr_bitfield tmp = TR_BITFIELD_INIT;
1447            const size_t bitCount = tr_torrentHasMetadata( msgs->torrent )
1448                                  ? msgs->torrent->info.pieceCount
1449                                  : msglen * 8;
1450            tr_bitfieldConstruct( &tmp, bitCount );
1451            dbgmsg( msgs, "got a bitfield" );
1452            tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp.bits, msglen );
1453            tr_bitsetSetBitfield( &msgs->peer->have, &tmp );
1454            fireClientGotBitfield( msgs, &tmp );
1455            tr_bitfieldDestruct( &tmp );
1456            updatePeerProgress( msgs );
1457            break;
1458        }
1459
1460        case BT_REQUEST:
1461        {
1462            struct peer_request r;
1463            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1464            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1465            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1466            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1467            peerMadeRequest( msgs, &r );
1468            break;
1469        }
1470
1471        case BT_CANCEL:
1472        {
1473            int i;
1474            struct peer_request r;
1475            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1476            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1477            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1478            tr_historyAdd( &msgs->peer->cancelsSentToClient, tr_time( ), 1 );
1479            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1480
1481            for( i=0; i<msgs->peer->pendingReqsToClient; ++i ) {
1482                const struct peer_request * req = msgs->peerAskedFor + i;
1483                if( ( req->index == r.index ) && ( req->offset == r.offset ) && ( req->length == r.length ) )
1484                    break;
1485            }
1486
1487            if( i < msgs->peer->pendingReqsToClient )
1488                tr_removeElementFromArray( msgs->peerAskedFor, i, sizeof( struct peer_request ),
1489                                           msgs->peer->pendingReqsToClient-- );
1490            break;
1491        }
1492
1493        case BT_PIECE:
1494            assert( 0 ); /* handled elsewhere! */
1495            break;
1496
1497        case BT_PORT:
1498            dbgmsg( msgs, "Got a BT_PORT" );
1499            tr_peerIoReadUint16( msgs->peer->io, inbuf, &msgs->peer->dht_port );
1500            if( msgs->peer->dht_port > 0 )
1501                tr_dhtAddNode( getSession(msgs),
1502                               tr_peerAddress( msgs->peer ),
1503                               msgs->peer->dht_port, 0 );
1504            break;
1505
1506        case BT_FEXT_SUGGEST:
1507            dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1508            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1509            if( fext )
1510                fireClientGotSuggest( msgs, ui32 );
1511            else {
1512                fireError( msgs, EMSGSIZE );
1513                return READ_ERR;
1514            }
1515            break;
1516
1517        case BT_FEXT_ALLOWED_FAST:
1518            dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1519            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1520            if( fext )
1521                fireClientGotAllowedFast( msgs, ui32 );
1522            else {
1523                fireError( msgs, EMSGSIZE );
1524                return READ_ERR;
1525            }
1526            break;
1527
1528        case BT_FEXT_HAVE_ALL:
1529            dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1530            if( fext ) {
1531                tr_bitsetSetHaveAll( &msgs->peer->have );
1532                fireClientGotHaveAll( msgs );
1533                updatePeerProgress( msgs );
1534            } else {
1535                fireError( msgs, EMSGSIZE );
1536                return READ_ERR;
1537            }
1538            break;
1539
1540        case BT_FEXT_HAVE_NONE:
1541            dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1542            if( fext ) {
1543                tr_bitsetSetHaveNone( &msgs->peer->have );
1544                fireClientGotHaveNone( msgs );
1545                updatePeerProgress( msgs );
1546            } else {
1547                fireError( msgs, EMSGSIZE );
1548                return READ_ERR;
1549            }
1550            break;
1551
1552        case BT_FEXT_REJECT:
1553        {
1554            struct peer_request r;
1555            dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1556            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1557            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1558            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1559            if( fext )
1560                fireGotRej( msgs, &r );
1561            else {
1562                fireError( msgs, EMSGSIZE );
1563                return READ_ERR;
1564            }
1565            break;
1566        }
1567
1568        case BT_LTEP:
1569            dbgmsg( msgs, "Got a BT_LTEP" );
1570            parseLtep( msgs, msglen, inbuf );
1571            break;
1572
1573        default:
1574            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1575            tr_peerIoDrain( msgs->peer->io, inbuf, msglen );
1576            break;
1577    }
1578
1579    assert( msglen + 1 == msgs->incoming.length );
1580    assert( evbuffer_get_length( inbuf ) == startBufLen - msglen );
1581
1582    msgs->state = AWAITING_BT_LENGTH;
1583    return READ_NOW;
1584}
1585
1586static void
1587addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1588{
1589    if( !msgs->peer->blame )
1590         msgs->peer->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1591    tr_bitfieldAdd( msgs->peer->blame, index );
1592}
1593
1594/* returns 0 on success, or an errno on failure */
1595static int
1596clientGotBlock( tr_peermsgs *               msgs,
1597                struct evbuffer *           data,
1598                const struct peer_request * req )
1599{
1600    int err;
1601    tr_torrent * tor = msgs->torrent;
1602    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1603
1604    assert( msgs );
1605    assert( req );
1606
1607    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) {
1608        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1609                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1610        return EMSGSIZE;
1611    }
1612
1613    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1614
1615    if( !tr_peerMgrDidPeerRequest( msgs->torrent, msgs->peer, block ) ) {
1616        dbgmsg( msgs, "we didn't ask for this message..." );
1617        return 0;
1618    }
1619    if( tr_cpPieceIsComplete( &msgs->torrent->completion, req->index ) ) {
1620        dbgmsg( msgs, "we did ask for this message, but the piece is already complete..." );
1621        return 0;
1622    }
1623
1624    /**
1625    ***  Save the block
1626    **/
1627
1628    if(( err = tr_cacheWriteBlock( getSession(msgs)->cache, tor, req->index, req->offset, req->length, data )))
1629        return err;
1630
1631    addPeerToBlamefield( msgs, req->index );
1632    fireGotBlock( msgs, req );
1633    return 0;
1634}
1635
1636static int peerPulse( void * vmsgs );
1637
1638static void
1639didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1640{
1641    tr_peermsgs * msgs = vmsgs;
1642    firePeerGotData( msgs, bytesWritten, wasPieceData );
1643
1644    if ( tr_isPeerIo( io ) && io->userData )
1645        peerPulse( msgs );
1646}
1647
1648static ReadState
1649canRead( tr_peerIo * io, void * vmsgs, size_t * piece )
1650{
1651    ReadState         ret;
1652    tr_peermsgs *     msgs = vmsgs;
1653    struct evbuffer * in = tr_peerIoGetReadBuffer( io );
1654    const size_t      inlen = evbuffer_get_length( in );
1655
1656    dbgmsg( msgs, "canRead: inlen is %zu, msgs->state is %d", inlen, msgs->state );
1657
1658    if( !inlen )
1659    {
1660        ret = READ_LATER;
1661    }
1662    else if( msgs->state == AWAITING_BT_PIECE )
1663    {
1664        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1665    }
1666    else switch( msgs->state )
1667    {
1668        case AWAITING_BT_LENGTH:
1669            ret = readBtLength ( msgs, in, inlen ); break;
1670
1671        case AWAITING_BT_ID:
1672            ret = readBtId     ( msgs, in, inlen ); break;
1673
1674        case AWAITING_BT_MESSAGE:
1675            ret = readBtMessage( msgs, in, inlen ); break;
1676
1677        default:
1678            ret = READ_ERR;
1679            assert( 0 );
1680    }
1681
1682    dbgmsg( msgs, "canRead: ret is %d", (int)ret );
1683
1684    /* log the raw data that was read */
1685    if( ( ret != READ_ERR ) && ( evbuffer_get_length( in ) != inlen ) )
1686        fireClientGotData( msgs, inlen - evbuffer_get_length( in ), FALSE );
1687
1688    return ret;
1689}
1690
1691int
1692tr_peerMsgsIsReadingBlock( const tr_peermsgs * msgs, tr_block_index_t block )
1693{
1694    if( msgs->state != AWAITING_BT_PIECE )
1695        return FALSE;
1696
1697    return block == _tr_block( msgs->torrent,
1698                               msgs->incoming.blockReq.index,
1699                               msgs->incoming.blockReq.offset );
1700}
1701
1702/**
1703***
1704**/
1705
1706static void
1707updateDesiredRequestCount( tr_peermsgs * msgs )
1708{
1709    const tr_torrent * const torrent = msgs->torrent;
1710
1711    if( tr_torrentIsSeed( msgs->torrent ) )
1712    {
1713        msgs->desiredRequestCount = 0;
1714    }
1715    else if( msgs->peer->clientIsChoked )
1716    {
1717        msgs->desiredRequestCount = 0;
1718    }
1719    else if( !msgs->peer->clientIsInterested )
1720    {
1721        msgs->desiredRequestCount = 0;
1722    }
1723    else
1724    {
1725        int estimatedBlocksInPeriod;
1726        int rate_Bps;
1727        int irate_Bps;
1728        const int floor = 4;
1729        const int seconds = REQUEST_BUF_SECS;
1730        const uint64_t now = tr_time_msec( );
1731
1732        /* Get the rate limit we should use.
1733         * FIXME: this needs to consider all the other peers as well... */
1734        rate_Bps = tr_peerGetPieceSpeed_Bps( msgs->peer, now, TR_PEER_TO_CLIENT );
1735        if( tr_torrentUsesSpeedLimit( torrent, TR_PEER_TO_CLIENT ) )
1736            rate_Bps = MIN( rate_Bps, tr_torrentGetSpeedLimit_Bps( torrent, TR_PEER_TO_CLIENT ) );
1737
1738        /* honor the session limits, if enabled */
1739        if( tr_torrentUsesSessionLimits( torrent ) )
1740            if( tr_sessionGetActiveSpeedLimit_Bps( torrent->session, TR_PEER_TO_CLIENT, &irate_Bps ) )
1741                rate_Bps = MIN( rate_Bps, irate_Bps );
1742
1743        /* use this desired rate to figure out how
1744         * many requests we should send to this peer */
1745        estimatedBlocksInPeriod = ( rate_Bps * seconds ) / torrent->blockSize;
1746        msgs->desiredRequestCount = MAX( floor, estimatedBlocksInPeriod );
1747
1748        /* honor the peer's maximum request count, if specified */
1749        if( msgs->reqq > 0 )
1750            if( msgs->desiredRequestCount > msgs->reqq )
1751                msgs->desiredRequestCount = msgs->reqq;
1752    }
1753}
1754
1755static void
1756updateMetadataRequests( tr_peermsgs * msgs, time_t now )
1757{
1758    int piece;
1759
1760    if( msgs->peerSupportsMetadataXfer
1761        && tr_torrentGetNextMetadataRequest( msgs->torrent, now, &piece ) )
1762    {
1763        tr_benc tmp;
1764        int payloadLen;
1765        char * payload;
1766        struct evbuffer * out = msgs->outMessages;
1767
1768        /* build the data message */
1769        tr_bencInitDict( &tmp, 3 );
1770        tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REQUEST );
1771        tr_bencDictAddInt( &tmp, "piece", piece );
1772        payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1773        tr_bencFree( &tmp );
1774
1775        dbgmsg( msgs, "requesting metadata piece #%d", piece );
1776
1777        /* write it out as a LTEP message to our outMessages buffer */
1778        evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen );
1779        evbuffer_add_uint8 ( out, BT_LTEP );
1780        evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1781        evbuffer_add       ( out, payload, payloadLen );
1782        pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1783        dbgOutMessageLen( msgs );
1784
1785        tr_free( payload );
1786    }
1787}
1788
1789static void
1790updateBlockRequests( tr_peermsgs * msgs )
1791{
1792    if( tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT )
1793        && ( msgs->desiredRequestCount > 0 )
1794        && ( msgs->peer->pendingReqsToPeer <= ( msgs->desiredRequestCount * 0.66 ) ) )
1795    {
1796        int i;
1797        int n;
1798        const int numwant = msgs->desiredRequestCount - msgs->peer->pendingReqsToPeer;
1799        tr_block_index_t * blocks = tr_new( tr_block_index_t, numwant );
1800
1801        tr_peerMgrGetNextRequests( msgs->torrent, msgs->peer, numwant, blocks, &n );
1802
1803        for( i=0; i<n; ++i )
1804        {
1805            struct peer_request req;
1806            blockToReq( msgs->torrent, blocks[i], &req );
1807            protocolSendRequest( msgs, &req );
1808        }
1809
1810        tr_free( blocks );
1811    }
1812}
1813
1814static size_t
1815fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1816{
1817    int piece;
1818    size_t bytesWritten = 0;
1819    struct peer_request req;
1820    const tr_bool haveMessages = evbuffer_get_length( msgs->outMessages ) != 0;
1821    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1822
1823    /**
1824    ***  Protocol messages
1825    **/
1826
1827    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1828    {
1829        dbgmsg( msgs, "started an outMessages batch (length is %zu)", evbuffer_get_length( msgs->outMessages ) );
1830        msgs->outMessagesBatchedAt = now;
1831    }
1832    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1833    {
1834        const size_t len = evbuffer_get_length( msgs->outMessages );
1835        /* flush the protocol messages */
1836        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len );
1837        tr_peerIoWriteBuf( msgs->peer->io, msgs->outMessages, FALSE );
1838        msgs->clientSentAnythingAt = now;
1839        msgs->outMessagesBatchedAt = 0;
1840        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1841        bytesWritten +=  len;
1842    }
1843
1844    /**
1845    ***  Metadata Pieces
1846    **/
1847
1848    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= METADATA_PIECE_SIZE )
1849        && popNextMetadataRequest( msgs, &piece ) )
1850    {
1851        char * data;
1852        int dataLen;
1853        tr_bool ok = FALSE;
1854
1855        data = tr_torrentGetMetadataPiece( msgs->torrent, piece, &dataLen );
1856        if( ( dataLen > 0 ) && ( data != NULL ) )
1857        {
1858            tr_benc tmp;
1859            int payloadLen;
1860            char * payload;
1861            struct evbuffer * out = msgs->outMessages;
1862
1863            /* build the data message */
1864            tr_bencInitDict( &tmp, 3 );
1865            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_DATA );
1866            tr_bencDictAddInt( &tmp, "piece", piece );
1867            tr_bencDictAddInt( &tmp, "total_size", msgs->torrent->infoDictLength );
1868            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1869            tr_bencFree( &tmp );
1870
1871            /* write it out as a LTEP message to our outMessages buffer */
1872            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen + dataLen );
1873            evbuffer_add_uint8 ( out, BT_LTEP );
1874            evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1875            evbuffer_add       ( out, payload, payloadLen );
1876            evbuffer_add       ( out, data, dataLen );
1877            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1878            dbgOutMessageLen( msgs );
1879
1880            tr_free( payload );
1881            tr_free( data );
1882
1883            ok = TRUE;
1884        }
1885
1886        if( !ok ) /* send a rejection message */
1887        {
1888            tr_benc tmp;
1889            int payloadLen;
1890            char * payload;
1891            struct evbuffer * out = msgs->outMessages;
1892
1893            /* build the rejection message */
1894            tr_bencInitDict( &tmp, 2 );
1895            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
1896            tr_bencDictAddInt( &tmp, "piece", piece );
1897            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1898            tr_bencFree( &tmp );
1899
1900            /* write it out as a LTEP message to our outMessages buffer */
1901            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen );
1902            evbuffer_add_uint8 ( out, BT_LTEP );
1903            evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1904            evbuffer_add       ( out, payload, payloadLen );
1905            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1906            dbgOutMessageLen( msgs );
1907
1908            tr_free( payload );
1909        }
1910    }
1911
1912    /**
1913    ***  Data Blocks
1914    **/
1915
1916    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= msgs->torrent->blockSize )
1917        && popNextRequest( msgs, &req ) )
1918    {
1919        --msgs->prefetchCount;
1920
1921        if( requestIsValid( msgs, &req )
1922            && tr_cpPieceIsComplete( &msgs->torrent->completion, req.index ) )
1923        {
1924            int err;
1925            const uint32_t msglen = 4 + 1 + 4 + 4 + req.length;
1926            struct evbuffer * out;
1927            struct evbuffer_iovec iovec[1];
1928
1929            out = evbuffer_new( );
1930            evbuffer_expand( out, msglen );
1931
1932            evbuffer_add_uint32( out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1933            evbuffer_add_uint8 ( out, BT_PIECE );
1934            evbuffer_add_uint32( out, req.index );
1935            evbuffer_add_uint32( out, req.offset );
1936
1937            evbuffer_reserve_space( out, req.length, iovec, 1 );
1938            err = tr_cacheReadBlock( getSession(msgs)->cache, msgs->torrent, req.index, req.offset, req.length, iovec[0].iov_base );
1939            iovec[0].iov_len = req.length;
1940            evbuffer_commit_space( out, iovec, 1 );
1941
1942            /* check the piece if it needs checking... */
1943            if( !err && tr_torrentPieceNeedsCheck( msgs->torrent, req.index ) )
1944                if(( err = !tr_torrentCheckPiece( msgs->torrent, req.index )))
1945                    tr_torrentSetLocalError( msgs->torrent, _( "Please Verify Local Data! Piece #%zu is corrupt." ), (size_t)req.index );
1946
1947            if( err )
1948            {
1949                if( fext )
1950                    protocolSendReject( msgs, &req );
1951            }
1952            else
1953            {
1954                const size_t n = evbuffer_get_length( out );
1955                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1956                assert( n == msglen );
1957                tr_peerIoWriteBuf( msgs->peer->io, out, TRUE );
1958                bytesWritten += n;
1959                msgs->clientSentAnythingAt = now;
1960                tr_historyAdd( &msgs->peer->blocksSentToPeer, tr_time( ), 1 );
1961            }
1962
1963            evbuffer_free( out );
1964
1965            if( err )
1966            {
1967                bytesWritten = 0;
1968                msgs = NULL;
1969            }
1970        }
1971        else if( fext ) /* peer needs a reject message */
1972        {
1973            protocolSendReject( msgs, &req );
1974        }
1975
1976        if( msgs != NULL )
1977            prefetchPieces( msgs );
1978    }
1979
1980    /**
1981    ***  Keepalive
1982    **/
1983
1984    if( ( msgs != NULL )
1985        && ( msgs->clientSentAnythingAt != 0 )
1986        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1987    {
1988        dbgmsg( msgs, "sending a keepalive message" );
1989        evbuffer_add_uint32( msgs->outMessages, 0 );
1990        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1991    }
1992
1993    return bytesWritten;
1994}
1995
1996static int
1997peerPulse( void * vmsgs )
1998{
1999    tr_peermsgs * msgs = vmsgs;
2000    const time_t  now = tr_time( );
2001
2002    if ( tr_isPeerIo( msgs->peer->io ) ) {
2003        updateDesiredRequestCount( msgs );
2004        updateBlockRequests( msgs );
2005        updateMetadataRequests( msgs, now );
2006    }
2007
2008    for( ;; )
2009        if( fillOutputBuffer( msgs, now ) < 1 )
2010            break;
2011
2012    return TRUE; /* loop forever */
2013}
2014
2015void
2016tr_peerMsgsPulse( tr_peermsgs * msgs )
2017{
2018    if( msgs != NULL )
2019        peerPulse( msgs );
2020}
2021
2022static void
2023gotError( tr_peerIo  * io UNUSED,
2024          short        what,
2025          void       * vmsgs )
2026{
2027    if( what & BEV_EVENT_TIMEOUT )
2028        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
2029    if( what & ( BEV_EVENT_EOF | BEV_EVENT_ERROR ) )
2030        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
2031               what, errno, tr_strerror( errno ) );
2032    fireError( vmsgs, ENOTCONN );
2033}
2034
2035static void
2036sendBitfield( tr_peermsgs * msgs )
2037{
2038    struct evbuffer * out = msgs->outMessages;
2039    tr_bitfield * bf = tr_cpCreatePieceBitfield( &msgs->torrent->completion );
2040
2041    evbuffer_add_uint32( out, sizeof( uint8_t ) + bf->byteCount );
2042    evbuffer_add_uint8 ( out, BT_BITFIELD );
2043    evbuffer_add       ( out, bf->bits, bf->byteCount );
2044    dbgmsg( msgs, "sending bitfield... outMessage size is now %zu", evbuffer_get_length( out ) );
2045    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
2046
2047    tr_bitfieldFree( bf );
2048}
2049
2050static void
2051tellPeerWhatWeHave( tr_peermsgs * msgs )
2052{
2053    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
2054
2055    if( fext && ( tr_cpBlockBitset( &msgs->torrent->completion )->haveAll ) )
2056    {
2057        protocolSendHaveAll( msgs );
2058    }
2059    else if( fext && ( tr_cpBlockBitset( &msgs->torrent->completion )->haveNone ) )
2060    {
2061        protocolSendHaveNone( msgs );
2062    }
2063    else
2064    {
2065        sendBitfield( msgs );
2066    }
2067}
2068
2069/**
2070***
2071**/
2072
2073/* some peers give us error messages if we send
2074   more than this many peers in a single pex message
2075   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
2076#define MAX_PEX_ADDED 50
2077#define MAX_PEX_DROPPED 50
2078
2079typedef struct
2080{
2081    tr_pex *  added;
2082    tr_pex *  dropped;
2083    tr_pex *  elements;
2084    int       addedCount;
2085    int       droppedCount;
2086    int       elementCount;
2087}
2088PexDiffs;
2089
2090static void
2091pexAddedCb( void * vpex,
2092            void * userData )
2093{
2094    PexDiffs * diffs = userData;
2095    tr_pex *   pex = vpex;
2096
2097    if( diffs->addedCount < MAX_PEX_ADDED )
2098    {
2099        diffs->added[diffs->addedCount++] = *pex;
2100        diffs->elements[diffs->elementCount++] = *pex;
2101    }
2102}
2103
2104static inline void
2105pexDroppedCb( void * vpex,
2106              void * userData )
2107{
2108    PexDiffs * diffs = userData;
2109    tr_pex *   pex = vpex;
2110
2111    if( diffs->droppedCount < MAX_PEX_DROPPED )
2112    {
2113        diffs->dropped[diffs->droppedCount++] = *pex;
2114    }
2115}
2116
2117static inline void
2118pexElementCb( void * vpex,
2119              void * userData )
2120{
2121    PexDiffs * diffs = userData;
2122    tr_pex * pex = vpex;
2123
2124    diffs->elements[diffs->elementCount++] = *pex;
2125}
2126
2127static void
2128sendPex( tr_peermsgs * msgs )
2129{
2130    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
2131    {
2132        PexDiffs diffs;
2133        PexDiffs diffs6;
2134        tr_pex * newPex = NULL;
2135        tr_pex * newPex6 = NULL;
2136        const int newCount = tr_peerMgrGetPeers( msgs->torrent, &newPex, TR_AF_INET, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
2137        const int newCount6 = tr_peerMgrGetPeers( msgs->torrent, &newPex6, TR_AF_INET6, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
2138
2139        /* build the diffs */
2140        diffs.added = tr_new( tr_pex, newCount );
2141        diffs.addedCount = 0;
2142        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
2143        diffs.droppedCount = 0;
2144        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
2145        diffs.elementCount = 0;
2146        tr_set_compare( msgs->pex, msgs->pexCount,
2147                        newPex, newCount,
2148                        tr_pexCompare, sizeof( tr_pex ),
2149                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
2150        diffs6.added = tr_new( tr_pex, newCount6 );
2151        diffs6.addedCount = 0;
2152        diffs6.dropped = tr_new( tr_pex, msgs->pexCount6 );
2153        diffs6.droppedCount = 0;
2154        diffs6.elements = tr_new( tr_pex, newCount6 + msgs->pexCount6 );
2155        diffs6.elementCount = 0;
2156        tr_set_compare( msgs->pex6, msgs->pexCount6,
2157                        newPex6, newCount6,
2158                        tr_pexCompare, sizeof( tr_pex ),
2159                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6 );
2160        dbgmsg(
2161            msgs,
2162            "pex: old peer count %d+%d, new peer count %d+%d, "
2163            "added %d+%d, removed %d+%d",
2164            msgs->pexCount, msgs->pexCount6, newCount, newCount6,
2165            diffs.addedCount, diffs6.addedCount,
2166            diffs.droppedCount, diffs6.droppedCount );
2167
2168        if( !diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
2169            !diffs6.droppedCount )
2170        {
2171            tr_free( diffs.elements );
2172            tr_free( diffs6.elements );
2173        }
2174        else
2175        {
2176            int  i;
2177            tr_benc val;
2178            char * benc;
2179            int bencLen;
2180            uint8_t * tmp, *walk;
2181            struct evbuffer * out = msgs->outMessages;
2182
2183            /* update peer */
2184            tr_free( msgs->pex );
2185            msgs->pex = diffs.elements;
2186            msgs->pexCount = diffs.elementCount;
2187            tr_free( msgs->pex6 );
2188            msgs->pex6 = diffs6.elements;
2189            msgs->pexCount6 = diffs6.elementCount;
2190
2191            /* build the pex payload */
2192            tr_bencInitDict( &val, 3 ); /* ipv6 support: left as 3:
2193                                         * speed vs. likelihood? */
2194
2195            if( diffs.addedCount > 0)
2196            {
2197                /* "added" */
2198                tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
2199                for( i = 0; i < diffs.addedCount; ++i ) {
2200                    memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
2201                    memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
2202                }
2203                assert( ( walk - tmp ) == diffs.addedCount * 6 );
2204                tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
2205                tr_free( tmp );
2206
2207                /* "added.f"
2208                 * unset each holepunch flag because we don't support it. */
2209                tmp = walk = tr_new( uint8_t, diffs.addedCount );
2210                for( i = 0; i < diffs.addedCount; ++i )
2211                    *walk++ = diffs.added[i].flags & ~ADDED_F_HOLEPUNCH;
2212                assert( ( walk - tmp ) == diffs.addedCount );
2213                tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2214                tr_free( tmp );
2215            }
2216
2217            if( diffs.droppedCount > 0 )
2218            {
2219                /* "dropped" */
2220                tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2221                for( i = 0; i < diffs.droppedCount; ++i ) {
2222                    memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
2223                    memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2224                }
2225                assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2226                tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2227                tr_free( tmp );
2228            }
2229
2230            if( diffs6.addedCount > 0 )
2231            {
2232                /* "added6" */
2233                tmp = walk = tr_new( uint8_t, diffs6.addedCount * 18 );
2234                for( i = 0; i < diffs6.addedCount; ++i ) {
2235                    memcpy( walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16 );
2236                    walk += 16;
2237                    memcpy( walk, &diffs6.added[i].port, 2 );
2238                    walk += 2;
2239                }
2240                assert( ( walk - tmp ) == diffs6.addedCount * 18 );
2241                tr_bencDictAddRaw( &val, "added6", tmp, walk - tmp );
2242                tr_free( tmp );
2243
2244                /* "added6.f"
2245                 * unset each holepunch flag because we don't support it. */
2246                tmp = walk = tr_new( uint8_t, diffs6.addedCount );
2247                for( i = 0; i < diffs6.addedCount; ++i )
2248                    *walk++ = diffs6.added[i].flags & ~ADDED_F_HOLEPUNCH;
2249                assert( ( walk - tmp ) == diffs6.addedCount );
2250                tr_bencDictAddRaw( &val, "added6.f", tmp, walk - tmp );
2251                tr_free( tmp );
2252            }
2253
2254            if( diffs6.droppedCount > 0 )
2255            {
2256                /* "dropped6" */
2257                tmp = walk = tr_new( uint8_t, diffs6.droppedCount * 18 );
2258                for( i = 0; i < diffs6.droppedCount; ++i ) {
2259                    memcpy( walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16 );
2260                    walk += 16;
2261                    memcpy( walk, &diffs6.dropped[i].port, 2 );
2262                    walk += 2;
2263                }
2264                assert( ( walk - tmp ) == diffs6.droppedCount * 18);
2265                tr_bencDictAddRaw( &val, "dropped6", tmp, walk - tmp );
2266                tr_free( tmp );
2267            }
2268
2269            /* write the pex message */
2270            benc = tr_bencToStr( &val, TR_FMT_BENC, &bencLen );
2271            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + bencLen );
2272            evbuffer_add_uint8 ( out, BT_LTEP );
2273            evbuffer_add_uint8 ( out, msgs->ut_pex_id );
2274            evbuffer_add       ( out, benc, bencLen );
2275            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2276            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", evbuffer_get_length( out ) );
2277            dbgOutMessageLen( msgs );
2278
2279            tr_free( benc );
2280            tr_bencFree( &val );
2281        }
2282
2283        /* cleanup */
2284        tr_free( diffs.added );
2285        tr_free( diffs.dropped );
2286        tr_free( newPex );
2287        tr_free( diffs6.added );
2288        tr_free( diffs6.dropped );
2289        tr_free( newPex6 );
2290
2291        /*msgs->clientSentPexAt = tr_time( );*/
2292    }
2293}
2294
2295static void
2296pexPulse( int foo UNUSED, short bar UNUSED, void * vmsgs )
2297{
2298    struct tr_peermsgs * msgs = vmsgs;
2299
2300    sendPex( msgs );
2301
2302    tr_timerAdd( msgs->pexTimer, PEX_INTERVAL_SECS, 0 );
2303}
2304
2305/**
2306***
2307**/
2308
2309tr_peermsgs*
2310tr_peerMsgsNew( struct tr_torrent    * torrent,
2311                struct tr_peer       * peer,
2312                tr_peer_callback     * callback,
2313                void                 * callbackData )
2314{
2315    tr_peermsgs * m;
2316
2317    assert( peer );
2318    assert( peer->io );
2319
2320    m = tr_new0( tr_peermsgs, 1 );
2321    m->callback = callback;
2322    m->callbackData = callbackData;
2323    m->peer = peer;
2324    m->torrent = torrent;
2325    m->peer->clientIsChoked = 1;
2326    m->peer->peerIsChoked = 1;
2327    m->peer->clientIsInterested = 0;
2328    m->peer->peerIsInterested = 0;
2329    m->state = AWAITING_BT_LENGTH;
2330    m->outMessages = evbuffer_new( );
2331    m->outMessagesBatchedAt = 0;
2332    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2333    m->incoming.block = evbuffer_new( );
2334    m->pexTimer = evtimer_new( torrent->session->event_base, pexPulse, m );
2335    peer->msgs = m;
2336    tr_timerAdd( m->pexTimer, PEX_INTERVAL_SECS, 0 );
2337
2338    if( tr_peerIoSupportsUTP( peer->io ) ) {
2339        const tr_address * addr = tr_peerIoGetAddress( peer->io, NULL );
2340        tr_peerMgrSetUtpSupported( torrent, addr );
2341        tr_peerMgrSetUtpFailed( torrent, addr, FALSE );
2342    }
2343
2344    if( tr_peerIoSupportsLTEP( peer->io ) )
2345        sendLtepHandshake( m );
2346
2347    tellPeerWhatWeHave( m );
2348
2349    if( tr_dhtEnabled( torrent->session ) && tr_peerIoSupportsDHT( peer->io ))
2350    {
2351        /* Only send PORT over IPv6 when the IPv6 DHT is running (BEP-32). */
2352        const struct tr_address *addr = tr_peerIoGetAddress( peer->io, NULL );
2353        if( addr->type == TR_AF_INET || tr_globalIPv6() ) {
2354            protocolSendPort( m, tr_dhtPort( torrent->session ) );
2355        }
2356    }
2357
2358    tr_peerIoSetIOFuncs( m->peer->io, canRead, didWrite, gotError, m );
2359    updateDesiredRequestCount( m );
2360
2361    return m;
2362}
2363
2364void
2365tr_peerMsgsFree( tr_peermsgs* msgs )
2366{
2367    if( msgs )
2368    {
2369        event_free( msgs->pexTimer );
2370
2371        evbuffer_free( msgs->incoming.block );
2372        evbuffer_free( msgs->outMessages );
2373        tr_free( msgs->pex6 );
2374        tr_free( msgs->pex );
2375
2376        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2377        tr_free( msgs );
2378    }
2379}
Note: See TracBrowser for help on using the repository browser.