source: trunk/libtransmission/peer-msgs.c @ 12224

Last change on this file since 12224 was 12224, checked in by jordan, 11 years ago

(trunk libT) copyediting: clean up the "#include <event2/*>" directives in libtransmission's header files

  • Property svn:keywords set to Date Rev Author Id
File size: 72.0 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 12224 2011-03-24 22:45:04Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdarg.h>
17#include <stdio.h>
18#include <stdlib.h>
19#include <string.h>
20
21#include <event2/buffer.h>
22#include <event2/bufferevent.h>
23#include <event2/event.h>
24
25#include "transmission.h"
26#include "bencode.h"
27#include "cache.h"
28#include "completion.h"
29#include "crypto.h"
30#ifdef WIN32
31#include "net.h" /* for ECONN */
32#endif
33#include "peer-io.h"
34#include "peer-mgr.h"
35#include "peer-msgs.h"
36#include "session.h"
37#include "stats.h"
38#include "torrent.h"
39#include "torrent-magnet.h"
40#include "tr-dht.h"
41#include "utils.h"
42#include "version.h"
43
44/**
45***
46**/
47
48enum
49{
50    BT_CHOKE                = 0,
51    BT_UNCHOKE              = 1,
52    BT_INTERESTED           = 2,
53    BT_NOT_INTERESTED       = 3,
54    BT_HAVE                 = 4,
55    BT_BITFIELD             = 5,
56    BT_REQUEST              = 6,
57    BT_PIECE                = 7,
58    BT_CANCEL               = 8,
59    BT_PORT                 = 9,
60
61    BT_FEXT_SUGGEST         = 13,
62    BT_FEXT_HAVE_ALL        = 14,
63    BT_FEXT_HAVE_NONE       = 15,
64    BT_FEXT_REJECT          = 16,
65    BT_FEXT_ALLOWED_FAST    = 17,
66
67    BT_LTEP                 = 20,
68
69    LTEP_HANDSHAKE          = 0,
70
71    UT_PEX_ID               = 1,
72    UT_METADATA_ID          = 3,
73
74    MAX_PEX_PEER_COUNT      = 50,
75
76    MIN_CHOKE_PERIOD_SEC    = 10,
77
78    /* idle seconds before we send a keepalive */
79    KEEPALIVE_INTERVAL_SECS = 100,
80
81    PEX_INTERVAL_SECS       = 90, /* sec between sendPex() calls */
82
83    REQQ                    = 512,
84
85    METADATA_REQQ           = 64,
86
87    /* used in lowering the outMessages queue period */
88    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
89    HIGH_PRIORITY_INTERVAL_SECS = 2,
90    LOW_PRIORITY_INTERVAL_SECS = 10,
91
92    /* number of pieces we'll allow in our fast set */
93    MAX_FAST_SET_SIZE = 3,
94
95    /* defined in BEP #9 */
96    METADATA_MSG_TYPE_REQUEST = 0,
97    METADATA_MSG_TYPE_DATA = 1,
98    METADATA_MSG_TYPE_REJECT = 2
99};
100
101enum
102{
103    AWAITING_BT_LENGTH,
104    AWAITING_BT_ID,
105    AWAITING_BT_MESSAGE,
106    AWAITING_BT_PIECE
107};
108
109/**
110***
111**/
112
113struct peer_request
114{
115    uint32_t    index;
116    uint32_t    offset;
117    uint32_t    length;
118};
119
120static void
121blockToReq( const tr_torrent     * tor,
122            tr_block_index_t       block,
123            struct peer_request  * setme )
124{
125    tr_torrentGetBlockLocation( tor, block, &setme->index,
126                                            &setme->offset,
127                                            &setme->length );
128}
129
130/**
131***
132**/
133
134/* this is raw, unchanged data from the peer regarding
135 * the current message that it's sending us. */
136struct tr_incoming
137{
138    uint8_t                id;
139    uint32_t               length; /* includes the +1 for id length */
140    struct peer_request    blockReq; /* metadata for incoming blocks */
141    struct evbuffer *      block; /* piece data for incoming blocks */
142};
143
144/**
145 * Low-level communication state information about a connected peer.
146 *
147 * This structure remembers the low-level protocol states that we're
148 * in with this peer, such as active requests, pex messages, and so on.
149 * Its fields are all private to peer-msgs.c.
150 *
151 * Data not directly involved with sending & receiving messages is
152 * stored in tr_peer, where it can be accessed by both peermsgs and
153 * the peer manager.
154 *
155 * @see struct peer_atom
156 * @see tr_peer
157 */
158struct tr_peermsgs
159{
160    bool            peerSupportsPex;
161    bool            peerSupportsMetadataXfer;
162    bool            clientSentLtepHandshake;
163    bool            peerSentLtepHandshake;
164
165    /*bool          haveFastSet;*/
166
167    int             desiredRequestCount;
168
169    int             prefetchCount;
170
171    /* how long the outMessages batch should be allowed to grow before
172     * it's flushed -- some messages (like requests >:) should be sent
173     * very quickly; others aren't as urgent. */
174    int8_t          outMessagesBatchPeriod;
175
176    uint8_t         state;
177    uint8_t         ut_pex_id;
178    uint8_t         ut_metadata_id;
179    uint16_t        pexCount;
180    uint16_t        pexCount6;
181
182    size_t          metadata_size_hint;
183#if 0
184    size_t                 fastsetSize;
185    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
186#endif
187
188    tr_peer *              peer;
189
190    tr_torrent *           torrent;
191
192    tr_peer_callback      * callback;
193    void                  * callbackData;
194
195    struct evbuffer *      outMessages; /* all the non-piece messages */
196
197    struct peer_request    peerAskedFor[REQQ];
198
199    int                    peerAskedForMetadata[METADATA_REQQ];
200    int                    peerAskedForMetadataCount;
201
202    tr_pex               * pex;
203    tr_pex               * pex6;
204
205    /*time_t                 clientSentPexAt;*/
206    time_t                 clientSentAnythingAt;
207
208    /* when we started batching the outMessages */
209    time_t                outMessagesBatchedAt;
210
211    struct tr_incoming    incoming;
212
213    /* if the peer supports the Extension Protocol in BEP 10 and
214       supplied a reqq argument, it's stored here. Otherwise, the
215       value is zero and should be ignored. */
216    int64_t               reqq;
217
218    struct event        * pexTimer;
219};
220
221/**
222***
223**/
224
225#if 0
226static tr_bitfield*
227getHave( const struct tr_peermsgs * msgs )
228{
229    if( msgs->peer->have == NULL )
230        msgs->peer->have = tr_bitfieldNew( msgs->torrent->info.pieceCount );
231    return msgs->peer->have;
232}
233#endif
234
235static inline tr_session*
236getSession( struct tr_peermsgs * msgs )
237{
238    return msgs->torrent->session;
239}
240
241/**
242***
243**/
244
245static void
246myDebug( const char * file, int line,
247         const struct tr_peermsgs * msgs,
248         const char * fmt, ... )
249{
250    FILE * fp = tr_getLog( );
251
252    if( fp )
253    {
254        va_list           args;
255        char              timestr[64];
256        struct evbuffer * buf = evbuffer_new( );
257        char *            base = tr_basename( file );
258
259        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
260                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
261                             tr_torrentName( msgs->torrent ),
262                             tr_peerIoGetAddrStr( msgs->peer->io ),
263                             msgs->peer->client );
264        va_start( args, fmt );
265        evbuffer_add_vprintf( buf, fmt, args );
266        va_end( args );
267        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
268        fputs( (const char*)evbuffer_pullup( buf, -1 ), fp );
269
270        tr_free( base );
271        evbuffer_free( buf );
272    }
273}
274
275#define dbgmsg( msgs, ... ) \
276    do { \
277        if( tr_deepLoggingIsActive( ) ) \
278            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
279    } while( 0 )
280
281/**
282***
283**/
284
285static void
286pokeBatchPeriod( tr_peermsgs * msgs, int interval )
287{
288    if( msgs->outMessagesBatchPeriod > interval )
289    {
290        msgs->outMessagesBatchPeriod = interval;
291        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
292    }
293}
294
295static void
296dbgOutMessageLen( tr_peermsgs * msgs )
297{
298    dbgmsg( msgs, "outMessage size is now %zu", evbuffer_get_length( msgs->outMessages ) );
299}
300
301static void
302protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
303{
304    struct evbuffer * out = msgs->outMessages;
305
306    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
307
308    evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
309    evbuffer_add_uint8 ( out, BT_FEXT_REJECT );
310    evbuffer_add_uint32( out, req->index );
311    evbuffer_add_uint32( out, req->offset );
312    evbuffer_add_uint32( out, req->length );
313
314    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
315    dbgOutMessageLen( msgs );
316}
317
318static void
319protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
320{
321    struct evbuffer * out = msgs->outMessages;
322
323    evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
324    evbuffer_add_uint8 ( out, BT_REQUEST );
325    evbuffer_add_uint32( out, req->index );
326    evbuffer_add_uint32( out, req->offset );
327    evbuffer_add_uint32( out, req->length );
328
329    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
330    dbgOutMessageLen( msgs );
331    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
332}
333
334static void
335protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
336{
337    struct evbuffer * out = msgs->outMessages;
338
339    evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
340    evbuffer_add_uint8 ( out, BT_CANCEL );
341    evbuffer_add_uint32( out, req->index );
342    evbuffer_add_uint32( out, req->offset );
343    evbuffer_add_uint32( out, req->length );
344
345    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
346    dbgOutMessageLen( msgs );
347    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
348}
349
350static void
351protocolSendPort(tr_peermsgs *msgs, uint16_t port)
352{
353    struct evbuffer * out = msgs->outMessages;
354
355    dbgmsg( msgs, "sending Port %u", port);
356    evbuffer_add_uint32( out, 3 );
357    evbuffer_add_uint8 ( out, BT_PORT );
358    evbuffer_add_uint16( out, port);
359}
360
361static void
362protocolSendHave( tr_peermsgs * msgs, uint32_t index )
363{
364    struct evbuffer * out = msgs->outMessages;
365
366    evbuffer_add_uint32( out, sizeof(uint8_t) + sizeof(uint32_t) );
367    evbuffer_add_uint8 ( out, BT_HAVE );
368    evbuffer_add_uint32( out, index );
369
370    dbgmsg( msgs, "sending Have %u", index );
371    dbgOutMessageLen( msgs );
372    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
373}
374
375#if 0
376static void
377protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
378{
379    tr_peerIo       * io  = msgs->peer->io;
380    struct evbuffer * out = msgs->outMessages;
381
382    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
383
384    evbuffer_add_uint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
385    evbuffer_add_uint8 ( io, out, BT_FEXT_ALLOWED_FAST );
386    evbuffer_add_uint32( io, out, pieceIndex );
387
388    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
389    dbgOutMessageLen( msgs );
390}
391#endif
392
393static void
394protocolSendChoke( tr_peermsgs * msgs, int choke )
395{
396    struct evbuffer * out = msgs->outMessages;
397
398    evbuffer_add_uint32( out, sizeof( uint8_t ) );
399    evbuffer_add_uint8 ( out, choke ? BT_CHOKE : BT_UNCHOKE );
400
401    dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
402    dbgOutMessageLen( msgs );
403    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
404}
405
406static void
407protocolSendHaveAll( tr_peermsgs * msgs )
408{
409    struct evbuffer * out = msgs->outMessages;
410
411    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
412
413    evbuffer_add_uint32( out, sizeof( uint8_t ) );
414    evbuffer_add_uint8 ( out, BT_FEXT_HAVE_ALL );
415
416    dbgmsg( msgs, "sending HAVE_ALL..." );
417    dbgOutMessageLen( msgs );
418    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
419}
420
421static void
422protocolSendHaveNone( tr_peermsgs * msgs )
423{
424    struct evbuffer * out = msgs->outMessages;
425
426    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
427
428    evbuffer_add_uint32( out, sizeof( uint8_t ) );
429    evbuffer_add_uint8 ( out, BT_FEXT_HAVE_NONE );
430
431    dbgmsg( msgs, "sending HAVE_NONE..." );
432    dbgOutMessageLen( msgs );
433    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
434}
435
436/**
437***  EVENTS
438**/
439
440static void
441publish( tr_peermsgs * msgs, tr_peer_event * e )
442{
443    assert( msgs->peer );
444    assert( msgs->peer->msgs == msgs );
445
446    if( msgs->callback != NULL )
447        msgs->callback( msgs->peer, e, msgs->callbackData );
448}
449
450static void
451fireError( tr_peermsgs * msgs, int err )
452{
453    tr_peer_event e = TR_PEER_EVENT_INIT;
454    e.eventType = TR_PEER_ERROR;
455    e.err = err;
456    publish( msgs, &e );
457}
458
459static void
460fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
461{
462    tr_peer_event e = TR_PEER_EVENT_INIT;
463    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
464    e.pieceIndex = req->index;
465    e.offset = req->offset;
466    e.length = req->length;
467    publish( msgs, &e );
468}
469
470static void
471fireGotRej( tr_peermsgs * msgs, const struct peer_request * req )
472{
473    tr_peer_event e = TR_PEER_EVENT_INIT;
474    e.eventType = TR_PEER_CLIENT_GOT_REJ;
475    e.pieceIndex = req->index;
476    e.offset = req->offset;
477    e.length = req->length;
478    publish( msgs, &e );
479}
480
481static void
482fireGotChoke( tr_peermsgs * msgs )
483{
484    tr_peer_event e = TR_PEER_EVENT_INIT;
485    e.eventType = TR_PEER_CLIENT_GOT_CHOKE;
486    publish( msgs, &e );
487}
488
489static void
490fireClientGotHaveAll( tr_peermsgs * msgs )
491{
492    tr_peer_event e = TR_PEER_EVENT_INIT;
493    e.eventType = TR_PEER_CLIENT_GOT_HAVE_ALL;
494    publish( msgs, &e );
495}
496
497static void
498fireClientGotHaveNone( tr_peermsgs * msgs )
499{
500    tr_peer_event e = TR_PEER_EVENT_INIT;
501    e.eventType = TR_PEER_CLIENT_GOT_HAVE_NONE;
502    publish( msgs, &e );
503}
504
505static void
506fireClientGotData( tr_peermsgs * msgs, uint32_t length, int wasPieceData )
507{
508    tr_peer_event e = TR_PEER_EVENT_INIT;
509
510    e.length = length;
511    e.eventType = TR_PEER_CLIENT_GOT_DATA;
512    e.wasPieceData = wasPieceData;
513    publish( msgs, &e );
514}
515
516static void
517fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
518{
519    tr_peer_event e = TR_PEER_EVENT_INIT;
520    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
521    e.pieceIndex = pieceIndex;
522    publish( msgs, &e );
523}
524
525static void
526fireClientGotPort( tr_peermsgs * msgs, tr_port port )
527{
528    tr_peer_event e = TR_PEER_EVENT_INIT;
529    e.eventType = TR_PEER_CLIENT_GOT_PORT;
530    e.port = port;
531    publish( msgs, &e );
532}
533
534static void
535fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
536{
537    tr_peer_event e = TR_PEER_EVENT_INIT;
538    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
539    e.pieceIndex = pieceIndex;
540    publish( msgs, &e );
541}
542
543static void
544fireClientGotBitfield( tr_peermsgs * msgs, tr_bitfield * bitfield )
545{
546    tr_peer_event e = TR_PEER_EVENT_INIT;
547    e.eventType = TR_PEER_CLIENT_GOT_BITFIELD;
548    e.bitfield = bitfield;
549    publish( msgs, &e );
550}
551
552static void
553fireClientGotHave( tr_peermsgs * msgs, tr_piece_index_t index )
554{
555    tr_peer_event e = TR_PEER_EVENT_INIT;
556    e.eventType = TR_PEER_CLIENT_GOT_HAVE;
557    e.pieceIndex = index;
558    publish( msgs, &e );
559}
560
561static void
562firePeerGotData( tr_peermsgs * msgs, uint32_t length, bool wasPieceData )
563{
564    tr_peer_event e = TR_PEER_EVENT_INIT;
565
566    e.length = length;
567    e.eventType = TR_PEER_PEER_GOT_DATA;
568    e.wasPieceData = wasPieceData;
569
570    publish( msgs, &e );
571}
572
573/**
574***  ALLOWED FAST SET
575***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
576**/
577
578#if 0
579size_t
580tr_generateAllowedSet( tr_piece_index_t * setmePieces,
581                       size_t             desiredSetSize,
582                       size_t             pieceCount,
583                       const uint8_t    * infohash,
584                       const tr_address * addr )
585{
586    size_t setSize = 0;
587
588    assert( setmePieces );
589    assert( desiredSetSize <= pieceCount );
590    assert( desiredSetSize );
591    assert( pieceCount );
592    assert( infohash );
593    assert( addr );
594
595    if( addr->type == TR_AF_INET )
596    {
597        uint8_t w[SHA_DIGEST_LENGTH + 4], *walk=w;
598        uint8_t x[SHA_DIGEST_LENGTH];
599
600        uint32_t ui32 = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 );   /* (1) */
601        memcpy( w, &ui32, sizeof( uint32_t ) );
602        walk += sizeof( uint32_t );
603        memcpy( walk, infohash, SHA_DIGEST_LENGTH );                 /* (2) */
604        walk += SHA_DIGEST_LENGTH;
605        tr_sha1( x, w, walk-w, NULL );                               /* (3) */
606        assert( sizeof( w ) == walk-w );
607
608        while( setSize<desiredSetSize )
609        {
610            int i;
611            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
612            {
613                size_t k;
614                uint32_t j = i * 4;                                  /* (5) */
615                uint32_t y = ntohl( *( uint32_t* )( x + j ) );       /* (6) */
616                uint32_t index = y % pieceCount;                     /* (7) */
617
618                for( k=0; k<setSize; ++k )                           /* (8) */
619                    if( setmePieces[k] == index )
620                        break;
621
622                if( k == setSize )
623                    setmePieces[setSize++] = index;                  /* (9) */
624            }
625
626            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
627        }
628    }
629
630    return setSize;
631}
632
633static void
634updateFastSet( tr_peermsgs * msgs UNUSED )
635{
636    const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
637    const int peerIsNeedy = msgs->peer->progress < 0.10;
638
639    if( fext && peerIsNeedy && !msgs->haveFastSet )
640    {
641        size_t i;
642        const struct tr_address * addr = tr_peerIoGetAddress( msgs->peer->io, NULL );
643        const tr_info * inf = &msgs->torrent->info;
644        const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
645
646        /* build the fast set */
647        msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
648        msgs->haveFastSet = 1;
649
650        /* send it to the peer */
651        for( i=0; i<msgs->fastsetSize; ++i )
652            protocolSendAllowedFast( msgs, msgs->fastset[i] );
653    }
654}
655#endif
656
657/**
658***  INTEREST
659**/
660
661static void
662sendInterest( tr_peermsgs * msgs, bool clientIsInterested )
663{
664    struct evbuffer * out = msgs->outMessages;
665
666    assert( msgs );
667    assert( tr_isBool( clientIsInterested ) );
668
669    msgs->peer->clientIsInterested = clientIsInterested;
670    dbgmsg( msgs, "Sending %s", clientIsInterested ? "Interested" : "Not Interested" );
671    evbuffer_add_uint32( out, sizeof( uint8_t ) );
672    evbuffer_add_uint8 ( out, clientIsInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
673
674    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
675    dbgOutMessageLen( msgs );
676}
677
678static void
679updateInterest( tr_peermsgs * msgs UNUSED )
680{
681    /* FIXME -- might need to poke the mgr on startup */
682}
683
684void
685tr_peerMsgsSetInterested( tr_peermsgs * msgs, int isInterested )
686{
687    assert( tr_isBool( isInterested ) );
688
689    if( isInterested != msgs->peer->clientIsInterested )
690        sendInterest( msgs, isInterested );
691}
692
693static bool
694popNextMetadataRequest( tr_peermsgs * msgs, int * piece )
695{
696    if( msgs->peerAskedForMetadataCount == 0 )
697        return false;
698
699    *piece = msgs->peerAskedForMetadata[0];
700
701    tr_removeElementFromArray( msgs->peerAskedForMetadata, 0, sizeof( int ),
702                               msgs->peerAskedForMetadataCount-- );
703
704    return true;
705}
706
707static bool
708popNextRequest( tr_peermsgs * msgs, struct peer_request * setme )
709{
710    if( msgs->peer->pendingReqsToClient == 0 )
711        return false;
712
713    *setme = msgs->peerAskedFor[0];
714
715    tr_removeElementFromArray( msgs->peerAskedFor, 0, sizeof( struct peer_request ),
716                               msgs->peer->pendingReqsToClient-- );
717
718    return true;
719}
720
721static void
722cancelAllRequestsToClient( tr_peermsgs * msgs )
723{
724    struct peer_request req;
725    const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->peer->io );
726
727    while( popNextRequest( msgs, &req ))
728        if( mustSendCancel )
729            protocolSendReject( msgs, &req );
730}
731
732void
733tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
734{
735    const time_t now = tr_time( );
736    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
737
738    assert( msgs );
739    assert( msgs->peer );
740    assert( choke == 0 || choke == 1 );
741
742    if( msgs->peer->chokeChangedAt > fibrillationTime )
743    {
744        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
745    }
746    else if( msgs->peer->peerIsChoked != choke )
747    {
748        msgs->peer->peerIsChoked = choke;
749        if( choke )
750            cancelAllRequestsToClient( msgs );
751        protocolSendChoke( msgs, choke );
752        msgs->peer->chokeChangedAt = now;
753    }
754}
755
756/**
757***
758**/
759
760void
761tr_peerMsgsHave( tr_peermsgs * msgs, uint32_t index )
762{
763    protocolSendHave( msgs, index );
764
765    /* since we have more pieces now, we might not be interested in this peer */
766    updateInterest( msgs );
767}
768
769/**
770***
771**/
772
773static bool
774reqIsValid( const tr_peermsgs * peer,
775            uint32_t            index,
776            uint32_t            offset,
777            uint32_t            length )
778{
779    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
780}
781
782static bool
783requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
784{
785    return reqIsValid( msgs, req->index, req->offset, req->length );
786}
787
788void
789tr_peerMsgsCancel( tr_peermsgs * msgs, tr_block_index_t block )
790{
791    struct peer_request req;
792/*fprintf( stderr, "SENDING CANCEL MESSAGE FOR BLOCK %zu\n\t\tFROM PEER %p ------------------------------------\n", (size_t)block, msgs->peer );*/
793    blockToReq( msgs->torrent, block, &req );
794    protocolSendCancel( msgs, &req );
795}
796
797/**
798***
799**/
800
801static void
802sendLtepHandshake( tr_peermsgs * msgs )
803{
804    tr_benc val, *m;
805    char * buf;
806    int len;
807    bool allow_pex;
808    bool allow_metadata_xfer;
809    struct evbuffer * out = msgs->outMessages;
810    const unsigned char * ipv6 = tr_globalIPv6();
811
812    if( msgs->clientSentLtepHandshake )
813        return;
814
815    dbgmsg( msgs, "sending an ltep handshake" );
816    msgs->clientSentLtepHandshake = 1;
817
818    /* decide if we want to advertise metadata xfer support (BEP 9) */
819    if( tr_torrentIsPrivate( msgs->torrent ) )
820        allow_metadata_xfer = 0;
821    else
822        allow_metadata_xfer = 1;
823
824    /* decide if we want to advertise pex support */
825    if( !tr_torrentAllowsPex( msgs->torrent ) )
826        allow_pex = 0;
827    else if( msgs->peerSentLtepHandshake )
828        allow_pex = msgs->peerSupportsPex ? 1 : 0;
829    else
830        allow_pex = 1;
831
832    tr_bencInitDict( &val, 8 );
833    tr_bencDictAddInt( &val, "e", getSession(msgs)->encryptionMode != TR_CLEAR_PREFERRED );
834    if( ipv6 != NULL )
835        tr_bencDictAddRaw( &val, "ipv6", ipv6, 16 );
836    if( allow_metadata_xfer && tr_torrentHasMetadata( msgs->torrent )
837                            && ( msgs->torrent->infoDictLength > 0 ) )
838        tr_bencDictAddInt( &val, "metadata_size", msgs->torrent->infoDictLength );
839    tr_bencDictAddInt( &val, "p", tr_sessionGetPublicPeerPort( getSession(msgs) ) );
840    tr_bencDictAddInt( &val, "reqq", REQQ );
841    tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
842    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
843    m  = tr_bencDictAddDict( &val, "m", 2 );
844    if( allow_metadata_xfer )
845        tr_bencDictAddInt( m, "ut_metadata", UT_METADATA_ID );
846    if( allow_pex )
847        tr_bencDictAddInt( m, "ut_pex", UT_PEX_ID );
848
849    buf = tr_bencToStr( &val, TR_FMT_BENC, &len );
850
851    evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + len );
852    evbuffer_add_uint8 ( out, BT_LTEP );
853    evbuffer_add_uint8 ( out, LTEP_HANDSHAKE );
854    evbuffer_add       ( out, buf, len );
855    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
856    dbgOutMessageLen( msgs );
857
858    /* cleanup */
859    tr_bencFree( &val );
860    tr_free( buf );
861}
862
863static void
864parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
865{
866    int64_t   i;
867    tr_benc   val, * sub;
868    uint8_t * tmp = tr_new( uint8_t, len );
869    const uint8_t *addr;
870    size_t addr_len;
871    tr_pex pex;
872    int8_t seedProbability = -1;
873
874    memset( &pex, 0, sizeof( tr_pex ) );
875
876    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, len );
877    msgs->peerSentLtepHandshake = 1;
878
879    if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) )
880    {
881        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
882        tr_free( tmp );
883        return;
884    }
885
886    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
887
888    /* does the peer prefer encrypted connections? */
889    if( tr_bencDictFindInt( &val, "e", &i ) ) {
890        msgs->peer->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
891                                              : ENCRYPTION_PREFERENCE_NO;
892        if( i )
893            pex.flags |= ADDED_F_ENCRYPTION_FLAG;
894    }
895
896    /* check supported messages for utorrent pex */
897    msgs->peerSupportsPex = 0;
898    msgs->peerSupportsMetadataXfer = 0;
899
900    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
901        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
902            msgs->peerSupportsPex = i != 0;
903            msgs->ut_pex_id = (uint8_t) i;
904            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
905        }
906        if( tr_bencDictFindInt( sub, "ut_metadata", &i ) ) {
907            msgs->peerSupportsMetadataXfer = i != 0;
908            msgs->ut_metadata_id = (uint8_t) i;
909            dbgmsg( msgs, "msgs->ut_metadata_id is %d", (int)msgs->ut_metadata_id );
910        }
911        if( tr_bencDictFindInt( sub, "ut_holepunch", &i ) ) {
912            /* Mysterious µTorrent extension that we don't grok.  However,
913               it implies support for µTP, so use it to indicate that. */
914            tr_peerMgrSetUtpFailed( msgs->torrent,
915                                    tr_peerIoGetAddress( msgs->peer->io, NULL ),
916                                    false );
917        }
918    }
919
920    /* look for metainfo size (BEP 9) */
921    if( tr_bencDictFindInt( &val, "metadata_size", &i ) ) {
922        tr_torrentSetMetadataSizeHint( msgs->torrent, i );
923        msgs->metadata_size_hint = (size_t) i;
924    }
925
926    /* look for upload_only (BEP 21) */
927    if( tr_bencDictFindInt( &val, "upload_only", &i ) )
928        seedProbability = i==0 ? 0 : 100;
929
930    /* get peer's listening port */
931    if( tr_bencDictFindInt( &val, "p", &i ) ) {
932        pex.port = htons( (uint16_t)i );
933        fireClientGotPort( msgs, pex.port );
934        dbgmsg( msgs, "peer's port is now %d", (int)i );
935    }
936
937    if( tr_peerIoIsIncoming( msgs->peer->io )
938        && tr_bencDictFindRaw( &val, "ipv4", &addr, &addr_len )
939        && ( addr_len == 4 ) )
940    {
941        pex.addr.type = TR_AF_INET;
942        memcpy( &pex.addr.addr.addr4, addr, 4 );
943        tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability );
944    }
945
946    if( tr_peerIoIsIncoming( msgs->peer->io )
947        && tr_bencDictFindRaw( &val, "ipv6", &addr, &addr_len )
948        && ( addr_len == 16 ) )
949    {
950        pex.addr.type = TR_AF_INET6;
951        memcpy( &pex.addr.addr.addr6, addr, 16 );
952        tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability );
953    }
954
955    /* get peer's maximum request queue size */
956    if( tr_bencDictFindInt( &val, "reqq", &i ) )
957        msgs->reqq = i;
958
959    tr_bencFree( &val );
960    tr_free( tmp );
961}
962
963static void
964parseUtMetadata( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
965{
966    tr_benc dict;
967    char * msg_end;
968    char * benc_end;
969    int64_t msg_type = -1;
970    int64_t piece = -1;
971    int64_t total_size = 0;
972    uint8_t * tmp = tr_new( uint8_t, msglen );
973
974    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
975    msg_end = (char*)tmp + msglen;
976
977    if( !tr_bencLoad( tmp, msglen, &dict, &benc_end ) )
978    {
979        tr_bencDictFindInt( &dict, "msg_type", &msg_type );
980        tr_bencDictFindInt( &dict, "piece", &piece );
981        tr_bencDictFindInt( &dict, "total_size", &total_size );
982        tr_bencFree( &dict );
983    }
984
985    dbgmsg( msgs, "got ut_metadata msg: type %d, piece %d, total_size %d",
986            (int)msg_type, (int)piece, (int)total_size );
987
988    if( msg_type == METADATA_MSG_TYPE_REJECT )
989    {
990        /* NOOP */
991    }
992
993    if( ( msg_type == METADATA_MSG_TYPE_DATA )
994        && ( !tr_torrentHasMetadata( msgs->torrent ) )
995        && ( msg_end - benc_end <= METADATA_PIECE_SIZE )
996        && ( piece * METADATA_PIECE_SIZE + (msg_end - benc_end) <= total_size ) )
997    {
998        const int pieceLen = msg_end - benc_end;
999        tr_torrentSetMetadataPiece( msgs->torrent, piece, benc_end, pieceLen );
1000    }
1001
1002    if( msg_type == METADATA_MSG_TYPE_REQUEST )
1003    {
1004        if( ( piece >= 0 )
1005            && tr_torrentHasMetadata( msgs->torrent )
1006            && !tr_torrentIsPrivate( msgs->torrent )
1007            && ( msgs->peerAskedForMetadataCount < METADATA_REQQ ) )
1008        {
1009            msgs->peerAskedForMetadata[msgs->peerAskedForMetadataCount++] = piece;
1010        }
1011        else
1012        {
1013            tr_benc tmp;
1014            int payloadLen;
1015            char * payload;
1016            struct evbuffer * out = msgs->outMessages;
1017
1018            /* build the rejection message */
1019            tr_bencInitDict( &tmp, 2 );
1020            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
1021            tr_bencDictAddInt( &tmp, "piece", piece );
1022            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1023            tr_bencFree( &tmp );
1024
1025            /* write it out as a LTEP message to our outMessages buffer */
1026            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen );
1027            evbuffer_add_uint8 ( out, BT_LTEP );
1028            evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1029            evbuffer_add       ( out, payload, payloadLen );
1030            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1031            dbgOutMessageLen( msgs );
1032
1033            tr_free( payload );
1034        }
1035    }
1036
1037    tr_free( tmp );
1038}
1039
1040static void
1041parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1042{
1043    int loaded = 0;
1044    uint8_t * tmp = tr_new( uint8_t, msglen );
1045    tr_benc val;
1046    tr_torrent * tor = msgs->torrent;
1047    const uint8_t * added;
1048    size_t added_len;
1049
1050    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1051
1052    if( tr_torrentAllowsPex( tor )
1053      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) ) )
1054    {
1055        if( tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1056        {
1057            tr_pex * pex;
1058            size_t i, n;
1059            size_t added_f_len = 0;
1060            const uint8_t * added_f = NULL;
1061
1062            tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1063            pex = tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len, &n );
1064
1065            n = MIN( n, MAX_PEX_PEER_COUNT );
1066            for( i=0; i<n; ++i )
1067            {
1068                int seedProbability = -1;
1069                if( i < added_f_len ) seedProbability = ( added_f[i] & ADDED_F_SEED_FLAG ) ? 100 : 0;
1070                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex+i, seedProbability );
1071            }
1072
1073            tr_free( pex );
1074        }
1075
1076        if( tr_bencDictFindRaw( &val, "added6", &added, &added_len ) )
1077        {
1078            tr_pex * pex;
1079            size_t i, n;
1080            size_t added_f_len = 0;
1081            const uint8_t * added_f = NULL;
1082
1083            tr_bencDictFindRaw( &val, "added6.f", &added_f, &added_f_len );
1084            pex = tr_peerMgrCompact6ToPex( added, added_len, added_f, added_f_len, &n );
1085
1086            n = MIN( n, MAX_PEX_PEER_COUNT );
1087            for( i=0; i<n; ++i )
1088            {
1089                int seedProbability = -1;
1090                if( i < added_f_len ) seedProbability = ( added_f[i] & ADDED_F_SEED_FLAG ) ? 100 : 0;
1091                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex+i, seedProbability );
1092            }
1093
1094            tr_free( pex );
1095        }
1096    }
1097
1098    if( loaded )
1099        tr_bencFree( &val );
1100    tr_free( tmp );
1101}
1102
1103static void sendPex( tr_peermsgs * msgs );
1104
1105static void
1106parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer  * inbuf )
1107{
1108    uint8_t ltep_msgid;
1109
1110    tr_peerIoReadUint8( msgs->peer->io, inbuf, &ltep_msgid );
1111    msglen--;
1112
1113    if( ltep_msgid == LTEP_HANDSHAKE )
1114    {
1115        dbgmsg( msgs, "got ltep handshake" );
1116        parseLtepHandshake( msgs, msglen, inbuf );
1117        if( tr_peerIoSupportsLTEP( msgs->peer->io ) )
1118        {
1119            sendLtepHandshake( msgs );
1120            sendPex( msgs );
1121        }
1122    }
1123    else if( ltep_msgid == UT_PEX_ID )
1124    {
1125        dbgmsg( msgs, "got ut pex" );
1126        msgs->peerSupportsPex = 1;
1127        parseUtPex( msgs, msglen, inbuf );
1128    }
1129    else if( ltep_msgid == UT_METADATA_ID )
1130    {
1131        dbgmsg( msgs, "got ut metadata" );
1132        msgs->peerSupportsMetadataXfer = 1;
1133        parseUtMetadata( msgs, msglen, inbuf );
1134    }
1135    else
1136    {
1137        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1138        evbuffer_drain( inbuf, msglen );
1139    }
1140}
1141
1142static int
1143readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1144{
1145    uint32_t len;
1146
1147    if( inlen < sizeof( len ) )
1148        return READ_LATER;
1149
1150    tr_peerIoReadUint32( msgs->peer->io, inbuf, &len );
1151
1152    if( len == 0 ) /* peer sent us a keepalive message */
1153        dbgmsg( msgs, "got KeepAlive" );
1154    else
1155    {
1156        msgs->incoming.length = len;
1157        msgs->state = AWAITING_BT_ID;
1158    }
1159
1160    return READ_NOW;
1161}
1162
1163static int readBtMessage( tr_peermsgs *, struct evbuffer *, size_t );
1164
1165static int
1166readBtId( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1167{
1168    uint8_t id;
1169
1170    if( inlen < sizeof( uint8_t ) )
1171        return READ_LATER;
1172
1173    tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
1174    msgs->incoming.id = id;
1175    dbgmsg( msgs, "msgs->incoming.id is now %d; msgs->incoming.length is %zu", id, (size_t)msgs->incoming.length );
1176
1177    if( id == BT_PIECE )
1178    {
1179        msgs->state = AWAITING_BT_PIECE;
1180        return READ_NOW;
1181    }
1182    else if( msgs->incoming.length != 1 )
1183    {
1184        msgs->state = AWAITING_BT_MESSAGE;
1185        return READ_NOW;
1186    }
1187    else return readBtMessage( msgs, inbuf, inlen - 1 );
1188}
1189
1190static void
1191updatePeerProgress( tr_peermsgs * msgs )
1192{
1193    tr_peerUpdateProgress( msgs->torrent, msgs->peer );
1194
1195    /*updateFastSet( msgs );*/
1196    updateInterest( msgs );
1197}
1198
1199static void
1200prefetchPieces( tr_peermsgs *msgs )
1201{
1202    int i;
1203
1204    if( !getSession(msgs)->isPrefetchEnabled )
1205        return;
1206
1207    /* Maintain 12 prefetched blocks per unchoked peer */
1208    for( i=msgs->prefetchCount; i<msgs->peer->pendingReqsToClient && i<12; ++i )
1209    {
1210        const struct peer_request * req = msgs->peerAskedFor + i;
1211        if( requestIsValid( msgs, req ) )
1212        {
1213            tr_cachePrefetchBlock( getSession(msgs)->cache, msgs->torrent, req->index, req->offset, req->length );
1214            ++msgs->prefetchCount;
1215        }
1216    }
1217}
1218
1219static void
1220peerMadeRequest( tr_peermsgs * msgs, const struct peer_request * req )
1221{
1222    const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1223    const int reqIsValid = requestIsValid( msgs, req );
1224    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( &msgs->torrent->completion, req->index );
1225    const int peerIsChoked = msgs->peer->peerIsChoked;
1226
1227    int allow = false;
1228
1229    if( !reqIsValid )
1230        dbgmsg( msgs, "rejecting an invalid request." );
1231    else if( !clientHasPiece )
1232        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1233    else if( peerIsChoked )
1234        dbgmsg( msgs, "rejecting request from choked peer" );
1235    else if( msgs->peer->pendingReqsToClient + 1 >= REQQ )
1236        dbgmsg( msgs, "rejecting request ... reqq is full" );
1237    else
1238        allow = true;
1239
1240    if( allow ) {
1241        msgs->peerAskedFor[msgs->peer->pendingReqsToClient++] = *req;
1242        prefetchPieces( msgs );
1243    } else if( fext ) {
1244        protocolSendReject( msgs, req );
1245    }
1246}
1247
1248static bool
1249messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1250{
1251    switch( id )
1252    {
1253        case BT_CHOKE:
1254        case BT_UNCHOKE:
1255        case BT_INTERESTED:
1256        case BT_NOT_INTERESTED:
1257        case BT_FEXT_HAVE_ALL:
1258        case BT_FEXT_HAVE_NONE:
1259            return len == 1;
1260
1261        case BT_HAVE:
1262        case BT_FEXT_SUGGEST:
1263        case BT_FEXT_ALLOWED_FAST:
1264            return len == 5;
1265
1266        case BT_BITFIELD:
1267            if( tr_torrentHasMetadata( msg->torrent ) )
1268                return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1269            /* we don't know the piece count yet,
1270               so we can only guess whether to send true or false */
1271            if( msg->metadata_size_hint > 0 )
1272                return len <= msg->metadata_size_hint;
1273            return true;
1274
1275        case BT_REQUEST:
1276        case BT_CANCEL:
1277        case BT_FEXT_REJECT:
1278            return len == 13;
1279
1280        case BT_PIECE:
1281            return len > 9 && len <= 16393;
1282
1283        case BT_PORT:
1284            return len == 3;
1285
1286        case BT_LTEP:
1287            return len >= 2;
1288
1289        default:
1290            return false;
1291    }
1292}
1293
1294static int clientGotBlock( tr_peermsgs *               msgs,
1295                           struct evbuffer *           block,
1296                           const struct peer_request * req );
1297
1298static int
1299readBtPiece( tr_peermsgs      * msgs,
1300             struct evbuffer  * inbuf,
1301             size_t             inlen,
1302             size_t           * setme_piece_bytes_read )
1303{
1304    struct peer_request * req = &msgs->incoming.blockReq;
1305
1306    assert( evbuffer_get_length( inbuf ) >= inlen );
1307    dbgmsg( msgs, "In readBtPiece" );
1308
1309    if( !req->length )
1310    {
1311        if( inlen < 8 )
1312            return READ_LATER;
1313
1314        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index );
1315        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset );
1316        req->length = msgs->incoming.length - 9;
1317        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1318        return READ_NOW;
1319    }
1320    else
1321    {
1322        int err;
1323
1324        /* read in another chunk of data */
1325        const size_t nLeft = req->length - evbuffer_get_length( msgs->incoming.block );
1326        size_t n = MIN( nLeft, inlen );
1327        size_t i = n;
1328        void * buf = tr_sessionGetBuffer( getSession( msgs ) );
1329        const size_t buflen = SESSION_BUFFER_SIZE;
1330
1331        while( i > 0 )
1332        {
1333            const size_t thisPass = MIN( i, buflen );
1334            tr_peerIoReadBytes( msgs->peer->io, inbuf, buf, thisPass );
1335            evbuffer_add( msgs->incoming.block, buf, thisPass );
1336            i -= thisPass;
1337        }
1338
1339        tr_sessionReleaseBuffer( getSession( msgs ) );
1340        buf = NULL;
1341
1342        fireClientGotData( msgs, n, true );
1343        *setme_piece_bytes_read += n;
1344        dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1345               n, req->index, req->offset, req->length,
1346               (int)( req->length - evbuffer_get_length( msgs->incoming.block ) ) );
1347        if( evbuffer_get_length( msgs->incoming.block ) < req->length )
1348            return READ_LATER;
1349
1350        /* we've got the whole block ... process it */
1351        err = clientGotBlock( msgs, msgs->incoming.block, req );
1352
1353        /* cleanup */
1354        evbuffer_free( msgs->incoming.block );
1355        msgs->incoming.block = evbuffer_new( );
1356        req->length = 0;
1357        msgs->state = AWAITING_BT_LENGTH;
1358        return err ? READ_ERR : READ_NOW;
1359    }
1360}
1361
1362static void updateDesiredRequestCount( tr_peermsgs * msgs );
1363
1364static int
1365readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1366{
1367    uint32_t      ui32;
1368    uint32_t      msglen = msgs->incoming.length;
1369    const uint8_t id = msgs->incoming.id;
1370#ifndef NDEBUG
1371    const size_t  startBufLen = evbuffer_get_length( inbuf );
1372#endif
1373    const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1374
1375    --msglen; /* id length */
1376
1377    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
1378
1379    if( inlen < msglen )
1380        return READ_LATER;
1381
1382    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1383    {
1384        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1385        fireError( msgs, EMSGSIZE );
1386        return READ_ERR;
1387    }
1388
1389    switch( id )
1390    {
1391        case BT_CHOKE:
1392            dbgmsg( msgs, "got Choke" );
1393            msgs->peer->clientIsChoked = 1;
1394            if( !fext )
1395                fireGotChoke( msgs );
1396            break;
1397
1398        case BT_UNCHOKE:
1399            dbgmsg( msgs, "got Unchoke" );
1400            msgs->peer->clientIsChoked = 0;
1401            updateDesiredRequestCount( msgs );
1402            break;
1403
1404        case BT_INTERESTED:
1405            dbgmsg( msgs, "got Interested" );
1406            msgs->peer->peerIsInterested = 1;
1407            break;
1408
1409        case BT_NOT_INTERESTED:
1410            dbgmsg( msgs, "got Not Interested" );
1411            msgs->peer->peerIsInterested = 0;
1412            break;
1413
1414        case BT_HAVE:
1415            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1416            dbgmsg( msgs, "got Have: %u", ui32 );
1417            if( tr_torrentHasMetadata( msgs->torrent )
1418                    && ( ui32 >= msgs->torrent->info.pieceCount ) )
1419            {
1420                fireError( msgs, ERANGE );
1421                return READ_ERR;
1422            }
1423
1424            /* a peer can send the same HAVE message twice... */
1425            if( !tr_bitsetHas( &msgs->peer->have, ui32 ) ) {
1426                tr_bitsetAdd( &msgs->peer->have, ui32 );
1427                fireClientGotHave( msgs, ui32 );
1428            }
1429            updatePeerProgress( msgs );
1430            break;
1431
1432        case BT_BITFIELD: {
1433            tr_bitfield tmp = TR_BITFIELD_INIT;
1434            const size_t bitCount = tr_torrentHasMetadata( msgs->torrent )
1435                                  ? msgs->torrent->info.pieceCount
1436                                  : msglen * 8;
1437            tr_bitfieldConstruct( &tmp, bitCount );
1438            dbgmsg( msgs, "got a bitfield" );
1439            tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp.bits, msglen );
1440            tr_bitsetSetBitfield( &msgs->peer->have, &tmp );
1441            fireClientGotBitfield( msgs, &tmp );
1442            tr_bitfieldDestruct( &tmp );
1443            updatePeerProgress( msgs );
1444            break;
1445        }
1446
1447        case BT_REQUEST:
1448        {
1449            struct peer_request r;
1450            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1451            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1452            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1453            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1454            peerMadeRequest( msgs, &r );
1455            break;
1456        }
1457
1458        case BT_CANCEL:
1459        {
1460            int i;
1461            struct peer_request r;
1462            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1463            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1464            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1465            tr_historyAdd( &msgs->peer->cancelsSentToClient, tr_time( ), 1 );
1466            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1467
1468            for( i=0; i<msgs->peer->pendingReqsToClient; ++i ) {
1469                const struct peer_request * req = msgs->peerAskedFor + i;
1470                if( ( req->index == r.index ) && ( req->offset == r.offset ) && ( req->length == r.length ) )
1471                    break;
1472            }
1473
1474            if( i < msgs->peer->pendingReqsToClient )
1475                tr_removeElementFromArray( msgs->peerAskedFor, i, sizeof( struct peer_request ),
1476                                           msgs->peer->pendingReqsToClient-- );
1477            break;
1478        }
1479
1480        case BT_PIECE:
1481            assert( 0 ); /* handled elsewhere! */
1482            break;
1483
1484        case BT_PORT:
1485            dbgmsg( msgs, "Got a BT_PORT" );
1486            tr_peerIoReadUint16( msgs->peer->io, inbuf, &msgs->peer->dht_port );
1487            if( msgs->peer->dht_port > 0 )
1488                tr_dhtAddNode( getSession(msgs),
1489                               tr_peerAddress( msgs->peer ),
1490                               msgs->peer->dht_port, 0 );
1491            break;
1492
1493        case BT_FEXT_SUGGEST:
1494            dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1495            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1496            if( fext )
1497                fireClientGotSuggest( msgs, ui32 );
1498            else {
1499                fireError( msgs, EMSGSIZE );
1500                return READ_ERR;
1501            }
1502            break;
1503
1504        case BT_FEXT_ALLOWED_FAST:
1505            dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1506            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1507            if( fext )
1508                fireClientGotAllowedFast( msgs, ui32 );
1509            else {
1510                fireError( msgs, EMSGSIZE );
1511                return READ_ERR;
1512            }
1513            break;
1514
1515        case BT_FEXT_HAVE_ALL:
1516            dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1517            if( fext ) {
1518                tr_bitsetSetHaveAll( &msgs->peer->have );
1519                fireClientGotHaveAll( msgs );
1520                updatePeerProgress( msgs );
1521            } else {
1522                fireError( msgs, EMSGSIZE );
1523                return READ_ERR;
1524            }
1525            break;
1526
1527        case BT_FEXT_HAVE_NONE:
1528            dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1529            if( fext ) {
1530                tr_bitsetSetHaveNone( &msgs->peer->have );
1531                fireClientGotHaveNone( msgs );
1532                updatePeerProgress( msgs );
1533            } else {
1534                fireError( msgs, EMSGSIZE );
1535                return READ_ERR;
1536            }
1537            break;
1538
1539        case BT_FEXT_REJECT:
1540        {
1541            struct peer_request r;
1542            dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1543            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1544            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1545            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1546            if( fext )
1547                fireGotRej( msgs, &r );
1548            else {
1549                fireError( msgs, EMSGSIZE );
1550                return READ_ERR;
1551            }
1552            break;
1553        }
1554
1555        case BT_LTEP:
1556            dbgmsg( msgs, "Got a BT_LTEP" );
1557            parseLtep( msgs, msglen, inbuf );
1558            break;
1559
1560        default:
1561            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1562            tr_peerIoDrain( msgs->peer->io, inbuf, msglen );
1563            break;
1564    }
1565
1566    assert( msglen + 1 == msgs->incoming.length );
1567    assert( evbuffer_get_length( inbuf ) == startBufLen - msglen );
1568
1569    msgs->state = AWAITING_BT_LENGTH;
1570    return READ_NOW;
1571}
1572
1573static void
1574addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1575{
1576    if( !msgs->peer->blame )
1577         msgs->peer->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1578    tr_bitfieldAdd( msgs->peer->blame, index );
1579}
1580
1581/* returns 0 on success, or an errno on failure */
1582static int
1583clientGotBlock( tr_peermsgs                * msgs,
1584                struct evbuffer            * data,
1585                const struct peer_request  * req )
1586{
1587    int err;
1588    tr_torrent * tor = msgs->torrent;
1589    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1590
1591    assert( msgs );
1592    assert( req );
1593
1594    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) {
1595        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1596                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1597        return EMSGSIZE;
1598    }
1599
1600    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1601
1602    if( !tr_peerMgrDidPeerRequest( msgs->torrent, msgs->peer, block ) ) {
1603        dbgmsg( msgs, "we didn't ask for this message..." );
1604        return 0;
1605    }
1606    if( tr_cpPieceIsComplete( &msgs->torrent->completion, req->index ) ) {
1607        dbgmsg( msgs, "we did ask for this message, but the piece is already complete..." );
1608        return 0;
1609    }
1610
1611    /**
1612    ***  Save the block
1613    **/
1614
1615    if(( err = tr_cacheWriteBlock( getSession(msgs)->cache, tor, req->index, req->offset, req->length, data )))
1616        return err;
1617
1618    addPeerToBlamefield( msgs, req->index );
1619    fireGotBlock( msgs, req );
1620    return 0;
1621}
1622
1623static int peerPulse( void * vmsgs );
1624
1625static void
1626didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1627{
1628    tr_peermsgs * msgs = vmsgs;
1629    firePeerGotData( msgs, bytesWritten, wasPieceData );
1630
1631    if ( tr_isPeerIo( io ) && io->userData )
1632        peerPulse( msgs );
1633}
1634
1635static ReadState
1636canRead( tr_peerIo * io, void * vmsgs, size_t * piece )
1637{
1638    ReadState         ret;
1639    tr_peermsgs *     msgs = vmsgs;
1640    struct evbuffer * in = tr_peerIoGetReadBuffer( io );
1641    const size_t      inlen = evbuffer_get_length( in );
1642
1643    dbgmsg( msgs, "canRead: inlen is %zu, msgs->state is %d", inlen, msgs->state );
1644
1645    if( !inlen )
1646    {
1647        ret = READ_LATER;
1648    }
1649    else if( msgs->state == AWAITING_BT_PIECE )
1650    {
1651        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1652    }
1653    else switch( msgs->state )
1654    {
1655        case AWAITING_BT_LENGTH:
1656            ret = readBtLength ( msgs, in, inlen ); break;
1657
1658        case AWAITING_BT_ID:
1659            ret = readBtId     ( msgs, in, inlen ); break;
1660
1661        case AWAITING_BT_MESSAGE:
1662            ret = readBtMessage( msgs, in, inlen ); break;
1663
1664        default:
1665            ret = READ_ERR;
1666            assert( 0 );
1667    }
1668
1669    dbgmsg( msgs, "canRead: ret is %d", (int)ret );
1670
1671    /* log the raw data that was read */
1672    if( ( ret != READ_ERR ) && ( evbuffer_get_length( in ) != inlen ) )
1673        fireClientGotData( msgs, inlen - evbuffer_get_length( in ), false );
1674
1675    return ret;
1676}
1677
1678int
1679tr_peerMsgsIsReadingBlock( const tr_peermsgs * msgs, tr_block_index_t block )
1680{
1681    if( msgs->state != AWAITING_BT_PIECE )
1682        return false;
1683
1684    return block == _tr_block( msgs->torrent,
1685                               msgs->incoming.blockReq.index,
1686                               msgs->incoming.blockReq.offset );
1687}
1688
1689/**
1690***
1691**/
1692
1693static void
1694updateDesiredRequestCount( tr_peermsgs * msgs )
1695{
1696    const tr_torrent * const torrent = msgs->torrent;
1697
1698    if( tr_torrentIsSeed( msgs->torrent ) )
1699    {
1700        msgs->desiredRequestCount = 0;
1701    }
1702    else if( msgs->peer->clientIsChoked )
1703    {
1704        msgs->desiredRequestCount = 0;
1705    }
1706    else if( !msgs->peer->clientIsInterested )
1707    {
1708        msgs->desiredRequestCount = 0;
1709    }
1710    else
1711    {
1712        int estimatedBlocksInPeriod;
1713        int rate_Bps;
1714        int irate_Bps;
1715        const int floor = 4;
1716        const int seconds = REQUEST_BUF_SECS;
1717        const uint64_t now = tr_time_msec( );
1718
1719        /* Get the rate limit we should use.
1720         * FIXME: this needs to consider all the other peers as well... */
1721        rate_Bps = tr_peerGetPieceSpeed_Bps( msgs->peer, now, TR_PEER_TO_CLIENT );
1722        if( tr_torrentUsesSpeedLimit( torrent, TR_PEER_TO_CLIENT ) )
1723            rate_Bps = MIN( rate_Bps, tr_torrentGetSpeedLimit_Bps( torrent, TR_PEER_TO_CLIENT ) );
1724
1725        /* honor the session limits, if enabled */
1726        if( tr_torrentUsesSessionLimits( torrent ) )
1727            if( tr_sessionGetActiveSpeedLimit_Bps( torrent->session, TR_PEER_TO_CLIENT, &irate_Bps ) )
1728                rate_Bps = MIN( rate_Bps, irate_Bps );
1729
1730        /* use this desired rate to figure out how
1731         * many requests we should send to this peer */
1732        estimatedBlocksInPeriod = ( rate_Bps * seconds ) / torrent->blockSize;
1733        msgs->desiredRequestCount = MAX( floor, estimatedBlocksInPeriod );
1734
1735        /* honor the peer's maximum request count, if specified */
1736        if( msgs->reqq > 0 )
1737            if( msgs->desiredRequestCount > msgs->reqq )
1738                msgs->desiredRequestCount = msgs->reqq;
1739    }
1740}
1741
1742static void
1743updateMetadataRequests( tr_peermsgs * msgs, time_t now )
1744{
1745    int piece;
1746
1747    if( msgs->peerSupportsMetadataXfer
1748        && tr_torrentGetNextMetadataRequest( msgs->torrent, now, &piece ) )
1749    {
1750        tr_benc tmp;
1751        int payloadLen;
1752        char * payload;
1753        struct evbuffer * out = msgs->outMessages;
1754
1755        /* build the data message */
1756        tr_bencInitDict( &tmp, 3 );
1757        tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REQUEST );
1758        tr_bencDictAddInt( &tmp, "piece", piece );
1759        payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1760        tr_bencFree( &tmp );
1761
1762        dbgmsg( msgs, "requesting metadata piece #%d", piece );
1763
1764        /* write it out as a LTEP message to our outMessages buffer */
1765        evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen );
1766        evbuffer_add_uint8 ( out, BT_LTEP );
1767        evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1768        evbuffer_add       ( out, payload, payloadLen );
1769        pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1770        dbgOutMessageLen( msgs );
1771
1772        tr_free( payload );
1773    }
1774}
1775
1776static void
1777updateBlockRequests( tr_peermsgs * msgs )
1778{
1779    if( tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT )
1780        && ( msgs->desiredRequestCount > 0 )
1781        && ( msgs->peer->pendingReqsToPeer <= ( msgs->desiredRequestCount * 0.66 ) ) )
1782    {
1783        int i;
1784        int n;
1785        const int numwant = msgs->desiredRequestCount - msgs->peer->pendingReqsToPeer;
1786        tr_block_index_t * blocks = tr_new( tr_block_index_t, numwant );
1787
1788        tr_peerMgrGetNextRequests( msgs->torrent, msgs->peer, numwant, blocks, &n );
1789
1790        for( i=0; i<n; ++i )
1791        {
1792            struct peer_request req;
1793            blockToReq( msgs->torrent, blocks[i], &req );
1794            protocolSendRequest( msgs, &req );
1795        }
1796
1797        tr_free( blocks );
1798    }
1799}
1800
1801static size_t
1802fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1803{
1804    int piece;
1805    size_t bytesWritten = 0;
1806    struct peer_request req;
1807    const bool haveMessages = evbuffer_get_length( msgs->outMessages ) != 0;
1808    const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1809
1810    /**
1811    ***  Protocol messages
1812    **/
1813
1814    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1815    {
1816        dbgmsg( msgs, "started an outMessages batch (length is %zu)", evbuffer_get_length( msgs->outMessages ) );
1817        msgs->outMessagesBatchedAt = now;
1818    }
1819    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1820    {
1821        const size_t len = evbuffer_get_length( msgs->outMessages );
1822        /* flush the protocol messages */
1823        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len );
1824        tr_peerIoWriteBuf( msgs->peer->io, msgs->outMessages, false );
1825        msgs->clientSentAnythingAt = now;
1826        msgs->outMessagesBatchedAt = 0;
1827        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1828        bytesWritten +=  len;
1829    }
1830
1831    /**
1832    ***  Metadata Pieces
1833    **/
1834
1835    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= METADATA_PIECE_SIZE )
1836        && popNextMetadataRequest( msgs, &piece ) )
1837    {
1838        char * data;
1839        int dataLen;
1840        bool ok = false;
1841
1842        data = tr_torrentGetMetadataPiece( msgs->torrent, piece, &dataLen );
1843        if( ( dataLen > 0 ) && ( data != NULL ) )
1844        {
1845            tr_benc tmp;
1846            int payloadLen;
1847            char * payload;
1848            struct evbuffer * out = msgs->outMessages;
1849
1850            /* build the data message */
1851            tr_bencInitDict( &tmp, 3 );
1852            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_DATA );
1853            tr_bencDictAddInt( &tmp, "piece", piece );
1854            tr_bencDictAddInt( &tmp, "total_size", msgs->torrent->infoDictLength );
1855            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1856            tr_bencFree( &tmp );
1857
1858            /* write it out as a LTEP message to our outMessages buffer */
1859            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen + dataLen );
1860            evbuffer_add_uint8 ( out, BT_LTEP );
1861            evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1862            evbuffer_add       ( out, payload, payloadLen );
1863            evbuffer_add       ( out, data, dataLen );
1864            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1865            dbgOutMessageLen( msgs );
1866
1867            tr_free( payload );
1868            tr_free( data );
1869
1870            ok = true;
1871        }
1872
1873        if( !ok ) /* send a rejection message */
1874        {
1875            tr_benc tmp;
1876            int payloadLen;
1877            char * payload;
1878            struct evbuffer * out = msgs->outMessages;
1879
1880            /* build the rejection message */
1881            tr_bencInitDict( &tmp, 2 );
1882            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
1883            tr_bencDictAddInt( &tmp, "piece", piece );
1884            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1885            tr_bencFree( &tmp );
1886
1887            /* write it out as a LTEP message to our outMessages buffer */
1888            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen );
1889            evbuffer_add_uint8 ( out, BT_LTEP );
1890            evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1891            evbuffer_add       ( out, payload, payloadLen );
1892            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1893            dbgOutMessageLen( msgs );
1894
1895            tr_free( payload );
1896        }
1897    }
1898
1899    /**
1900    ***  Data Blocks
1901    **/
1902
1903    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= msgs->torrent->blockSize )
1904        && popNextRequest( msgs, &req ) )
1905    {
1906        --msgs->prefetchCount;
1907
1908        if( requestIsValid( msgs, &req )
1909            && tr_cpPieceIsComplete( &msgs->torrent->completion, req.index ) )
1910        {
1911            int err;
1912            const uint32_t msglen = 4 + 1 + 4 + 4 + req.length;
1913            struct evbuffer * out;
1914            struct evbuffer_iovec iovec[1];
1915
1916            out = evbuffer_new( );
1917            evbuffer_expand( out, msglen );
1918
1919            evbuffer_add_uint32( out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1920            evbuffer_add_uint8 ( out, BT_PIECE );
1921            evbuffer_add_uint32( out, req.index );
1922            evbuffer_add_uint32( out, req.offset );
1923
1924            evbuffer_reserve_space( out, req.length, iovec, 1 );
1925            err = tr_cacheReadBlock( getSession(msgs)->cache, msgs->torrent, req.index, req.offset, req.length, iovec[0].iov_base );
1926            iovec[0].iov_len = req.length;
1927            evbuffer_commit_space( out, iovec, 1 );
1928
1929            /* check the piece if it needs checking... */
1930            if( !err && tr_torrentPieceNeedsCheck( msgs->torrent, req.index ) )
1931                if(( err = !tr_torrentCheckPiece( msgs->torrent, req.index )))
1932                    tr_torrentSetLocalError( msgs->torrent, _( "Please Verify Local Data! Piece #%zu is corrupt." ), (size_t)req.index );
1933
1934            if( err )
1935            {
1936                if( fext )
1937                    protocolSendReject( msgs, &req );
1938            }
1939            else
1940            {
1941                const size_t n = evbuffer_get_length( out );
1942                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1943                assert( n == msglen );
1944                tr_peerIoWriteBuf( msgs->peer->io, out, true );
1945                bytesWritten += n;
1946                msgs->clientSentAnythingAt = now;
1947                tr_historyAdd( &msgs->peer->blocksSentToPeer, tr_time( ), 1 );
1948            }
1949
1950            evbuffer_free( out );
1951
1952            if( err )
1953            {
1954                bytesWritten = 0;
1955                msgs = NULL;
1956            }
1957        }
1958        else if( fext ) /* peer needs a reject message */
1959        {
1960            protocolSendReject( msgs, &req );
1961        }
1962
1963        if( msgs != NULL )
1964            prefetchPieces( msgs );
1965    }
1966
1967    /**
1968    ***  Keepalive
1969    **/
1970
1971    if( ( msgs != NULL )
1972        && ( msgs->clientSentAnythingAt != 0 )
1973        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1974    {
1975        dbgmsg( msgs, "sending a keepalive message" );
1976        evbuffer_add_uint32( msgs->outMessages, 0 );
1977        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1978    }
1979
1980    return bytesWritten;
1981}
1982
1983static int
1984peerPulse( void * vmsgs )
1985{
1986    tr_peermsgs * msgs = vmsgs;
1987    const time_t  now = tr_time( );
1988
1989    if ( tr_isPeerIo( msgs->peer->io ) ) {
1990        updateDesiredRequestCount( msgs );
1991        updateBlockRequests( msgs );
1992        updateMetadataRequests( msgs, now );
1993    }
1994
1995    for( ;; )
1996        if( fillOutputBuffer( msgs, now ) < 1 )
1997            break;
1998
1999    return true; /* loop forever */
2000}
2001
2002void
2003tr_peerMsgsPulse( tr_peermsgs * msgs )
2004{
2005    if( msgs != NULL )
2006        peerPulse( msgs );
2007}
2008
2009static void
2010gotError( tr_peerIo * io UNUSED, short what, void * vmsgs )
2011{
2012    if( what & BEV_EVENT_TIMEOUT )
2013        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
2014    if( what & ( BEV_EVENT_EOF | BEV_EVENT_ERROR ) )
2015        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
2016               what, errno, tr_strerror( errno ) );
2017    fireError( vmsgs, ENOTCONN );
2018}
2019
2020static void
2021sendBitfield( tr_peermsgs * msgs )
2022{
2023    struct evbuffer * out = msgs->outMessages;
2024    tr_bitfield * bf = tr_cpCreatePieceBitfield( &msgs->torrent->completion );
2025
2026    evbuffer_add_uint32( out, sizeof( uint8_t ) + bf->byteCount );
2027    evbuffer_add_uint8 ( out, BT_BITFIELD );
2028    evbuffer_add       ( out, bf->bits, bf->byteCount );
2029    dbgmsg( msgs, "sending bitfield... outMessage size is now %zu", evbuffer_get_length( out ) );
2030    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
2031
2032    tr_bitfieldFree( bf );
2033}
2034
2035static void
2036tellPeerWhatWeHave( tr_peermsgs * msgs )
2037{
2038    const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
2039
2040    if( fext && ( tr_cpBlockBitset( &msgs->torrent->completion )->haveAll ) )
2041    {
2042        protocolSendHaveAll( msgs );
2043    }
2044    else if( fext && ( tr_cpBlockBitset( &msgs->torrent->completion )->haveNone ) )
2045    {
2046        protocolSendHaveNone( msgs );
2047    }
2048    else
2049    {
2050        sendBitfield( msgs );
2051    }
2052}
2053
2054/**
2055***
2056**/
2057
2058/* some peers give us error messages if we send
2059   more than this many peers in a single pex message
2060   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
2061#define MAX_PEX_ADDED 50
2062#define MAX_PEX_DROPPED 50
2063
2064typedef struct
2065{
2066    tr_pex *  added;
2067    tr_pex *  dropped;
2068    tr_pex *  elements;
2069    int       addedCount;
2070    int       droppedCount;
2071    int       elementCount;
2072}
2073PexDiffs;
2074
2075static void
2076pexAddedCb( void * vpex, void * userData )
2077{
2078    PexDiffs * diffs = userData;
2079    tr_pex *   pex = vpex;
2080
2081    if( diffs->addedCount < MAX_PEX_ADDED )
2082    {
2083        diffs->added[diffs->addedCount++] = *pex;
2084        diffs->elements[diffs->elementCount++] = *pex;
2085    }
2086}
2087
2088static inline void
2089pexDroppedCb( void * vpex, void * userData )
2090{
2091    PexDiffs * diffs = userData;
2092    tr_pex *   pex = vpex;
2093
2094    if( diffs->droppedCount < MAX_PEX_DROPPED )
2095    {
2096        diffs->dropped[diffs->droppedCount++] = *pex;
2097    }
2098}
2099
2100static inline void
2101pexElementCb( void * vpex, void * userData )
2102{
2103    PexDiffs * diffs = userData;
2104    tr_pex * pex = vpex;
2105
2106    diffs->elements[diffs->elementCount++] = *pex;
2107}
2108
2109typedef void ( tr_set_func )( void * element, void * userData );
2110
2111/**
2112 * @brief find the differences and commonalities in two sorted sets
2113 * @param a the first set
2114 * @param aCount the number of elements in the set 'a'
2115 * @param b the second set
2116 * @param bCount the number of elements in the set 'b'
2117 * @param compare the sorting method for both sets
2118 * @param elementSize the sizeof the element in the two sorted sets
2119 * @param in_a called for items in set 'a' but not set 'b'
2120 * @param in_b called for items in set 'b' but not set 'a'
2121 * @param in_both called for items that are in both sets
2122 * @param userData user data passed along to in_a, in_b, and in_both
2123 */
2124static void
2125tr_set_compare( const void * va, size_t aCount,
2126                const void * vb, size_t bCount,
2127                int compare( const void * a, const void * b ),
2128                size_t elementSize,
2129                tr_set_func in_a_cb,
2130                tr_set_func in_b_cb,
2131                tr_set_func in_both_cb,
2132                void * userData )
2133{
2134    const uint8_t * a = va;
2135    const uint8_t * b = vb;
2136    const uint8_t * aend = a + elementSize * aCount;
2137    const uint8_t * bend = b + elementSize * bCount;
2138
2139    while( a != aend || b != bend )
2140    {
2141        if( a == aend )
2142        {
2143            ( *in_b_cb )( (void*)b, userData );
2144            b += elementSize;
2145        }
2146        else if( b == bend )
2147        {
2148            ( *in_a_cb )( (void*)a, userData );
2149            a += elementSize;
2150        }
2151        else
2152        {
2153            const int val = ( *compare )( a, b );
2154
2155            if( !val )
2156            {
2157                ( *in_both_cb )( (void*)a, userData );
2158                a += elementSize;
2159                b += elementSize;
2160            }
2161            else if( val < 0 )
2162            {
2163                ( *in_a_cb )( (void*)a, userData );
2164                a += elementSize;
2165            }
2166            else if( val > 0 )
2167            {
2168                ( *in_b_cb )( (void*)b, userData );
2169                b += elementSize;
2170            }
2171        }
2172    }
2173}
2174
2175
2176static void
2177sendPex( tr_peermsgs * msgs )
2178{
2179    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
2180    {
2181        PexDiffs diffs;
2182        PexDiffs diffs6;
2183        tr_pex * newPex = NULL;
2184        tr_pex * newPex6 = NULL;
2185        const int newCount = tr_peerMgrGetPeers( msgs->torrent, &newPex, TR_AF_INET, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
2186        const int newCount6 = tr_peerMgrGetPeers( msgs->torrent, &newPex6, TR_AF_INET6, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
2187
2188        /* build the diffs */
2189        diffs.added = tr_new( tr_pex, newCount );
2190        diffs.addedCount = 0;
2191        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
2192        diffs.droppedCount = 0;
2193        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
2194        diffs.elementCount = 0;
2195        tr_set_compare( msgs->pex, msgs->pexCount,
2196                        newPex, newCount,
2197                        tr_pexCompare, sizeof( tr_pex ),
2198                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
2199        diffs6.added = tr_new( tr_pex, newCount6 );
2200        diffs6.addedCount = 0;
2201        diffs6.dropped = tr_new( tr_pex, msgs->pexCount6 );
2202        diffs6.droppedCount = 0;
2203        diffs6.elements = tr_new( tr_pex, newCount6 + msgs->pexCount6 );
2204        diffs6.elementCount = 0;
2205        tr_set_compare( msgs->pex6, msgs->pexCount6,
2206                        newPex6, newCount6,
2207                        tr_pexCompare, sizeof( tr_pex ),
2208                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6 );
2209        dbgmsg(
2210            msgs,
2211            "pex: old peer count %d+%d, new peer count %d+%d, "
2212            "added %d+%d, removed %d+%d",
2213            msgs->pexCount, msgs->pexCount6, newCount, newCount6,
2214            diffs.addedCount, diffs6.addedCount,
2215            diffs.droppedCount, diffs6.droppedCount );
2216
2217        if( !diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
2218            !diffs6.droppedCount )
2219        {
2220            tr_free( diffs.elements );
2221            tr_free( diffs6.elements );
2222        }
2223        else
2224        {
2225            int  i;
2226            tr_benc val;
2227            char * benc;
2228            int bencLen;
2229            uint8_t * tmp, *walk;
2230            struct evbuffer * out = msgs->outMessages;
2231
2232            /* update peer */
2233            tr_free( msgs->pex );
2234            msgs->pex = diffs.elements;
2235            msgs->pexCount = diffs.elementCount;
2236            tr_free( msgs->pex6 );
2237            msgs->pex6 = diffs6.elements;
2238            msgs->pexCount6 = diffs6.elementCount;
2239
2240            /* build the pex payload */
2241            tr_bencInitDict( &val, 3 ); /* ipv6 support: left as 3:
2242                                         * speed vs. likelihood? */
2243
2244            if( diffs.addedCount > 0)
2245            {
2246                /* "added" */
2247                tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
2248                for( i = 0; i < diffs.addedCount; ++i ) {
2249                    memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
2250                    memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
2251                }
2252                assert( ( walk - tmp ) == diffs.addedCount * 6 );
2253                tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
2254                tr_free( tmp );
2255
2256                /* "added.f"
2257                 * unset each holepunch flag because we don't support it. */
2258                tmp = walk = tr_new( uint8_t, diffs.addedCount );
2259                for( i = 0; i < diffs.addedCount; ++i )
2260                    *walk++ = diffs.added[i].flags & ~ADDED_F_HOLEPUNCH;
2261                assert( ( walk - tmp ) == diffs.addedCount );
2262                tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2263                tr_free( tmp );
2264            }
2265
2266            if( diffs.droppedCount > 0 )
2267            {
2268                /* "dropped" */
2269                tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2270                for( i = 0; i < diffs.droppedCount; ++i ) {
2271                    memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
2272                    memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2273                }
2274                assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2275                tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2276                tr_free( tmp );
2277            }
2278
2279            if( diffs6.addedCount > 0 )
2280            {
2281                /* "added6" */
2282                tmp = walk = tr_new( uint8_t, diffs6.addedCount * 18 );
2283                for( i = 0; i < diffs6.addedCount; ++i ) {
2284                    memcpy( walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16 );
2285                    walk += 16;
2286                    memcpy( walk, &diffs6.added[i].port, 2 );
2287                    walk += 2;
2288                }
2289                assert( ( walk - tmp ) == diffs6.addedCount * 18 );
2290                tr_bencDictAddRaw( &val, "added6", tmp, walk - tmp );
2291                tr_free( tmp );
2292
2293                /* "added6.f"
2294                 * unset each holepunch flag because we don't support it. */
2295                tmp = walk = tr_new( uint8_t, diffs6.addedCount );
2296                for( i = 0; i < diffs6.addedCount; ++i )
2297                    *walk++ = diffs6.added[i].flags & ~ADDED_F_HOLEPUNCH;
2298                assert( ( walk - tmp ) == diffs6.addedCount );
2299                tr_bencDictAddRaw( &val, "added6.f", tmp, walk - tmp );
2300                tr_free( tmp );
2301            }
2302
2303            if( diffs6.droppedCount > 0 )
2304            {
2305                /* "dropped6" */
2306                tmp = walk = tr_new( uint8_t, diffs6.droppedCount * 18 );
2307                for( i = 0; i < diffs6.droppedCount; ++i ) {
2308                    memcpy( walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16 );
2309                    walk += 16;
2310                    memcpy( walk, &diffs6.dropped[i].port, 2 );
2311                    walk += 2;
2312                }
2313                assert( ( walk - tmp ) == diffs6.droppedCount * 18);
2314                tr_bencDictAddRaw( &val, "dropped6", tmp, walk - tmp );
2315                tr_free( tmp );
2316            }
2317
2318            /* write the pex message */
2319            benc = tr_bencToStr( &val, TR_FMT_BENC, &bencLen );
2320            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + bencLen );
2321            evbuffer_add_uint8 ( out, BT_LTEP );
2322            evbuffer_add_uint8 ( out, msgs->ut_pex_id );
2323            evbuffer_add       ( out, benc, bencLen );
2324            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2325            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", evbuffer_get_length( out ) );
2326            dbgOutMessageLen( msgs );
2327
2328            tr_free( benc );
2329            tr_bencFree( &val );
2330        }
2331
2332        /* cleanup */
2333        tr_free( diffs.added );
2334        tr_free( diffs.dropped );
2335        tr_free( newPex );
2336        tr_free( diffs6.added );
2337        tr_free( diffs6.dropped );
2338        tr_free( newPex6 );
2339
2340        /*msgs->clientSentPexAt = tr_time( );*/
2341    }
2342}
2343
2344static void
2345pexPulse( int foo UNUSED, short bar UNUSED, void * vmsgs )
2346{
2347    struct tr_peermsgs * msgs = vmsgs;
2348
2349    sendPex( msgs );
2350
2351    tr_timerAdd( msgs->pexTimer, PEX_INTERVAL_SECS, 0 );
2352}
2353
2354/**
2355***
2356**/
2357
2358tr_peermsgs*
2359tr_peerMsgsNew( struct tr_torrent    * torrent,
2360                struct tr_peer       * peer,
2361                tr_peer_callback     * callback,
2362                void                 * callbackData )
2363{
2364    tr_peermsgs * m;
2365
2366    assert( peer );
2367    assert( peer->io );
2368
2369    m = tr_new0( tr_peermsgs, 1 );
2370    m->callback = callback;
2371    m->callbackData = callbackData;
2372    m->peer = peer;
2373    m->torrent = torrent;
2374    m->peer->clientIsChoked = 1;
2375    m->peer->peerIsChoked = 1;
2376    m->peer->clientIsInterested = 0;
2377    m->peer->peerIsInterested = 0;
2378    m->state = AWAITING_BT_LENGTH;
2379    m->outMessages = evbuffer_new( );
2380    m->outMessagesBatchedAt = 0;
2381    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2382    m->incoming.block = evbuffer_new( );
2383    m->pexTimer = evtimer_new( torrent->session->event_base, pexPulse, m );
2384    peer->msgs = m;
2385    tr_timerAdd( m->pexTimer, PEX_INTERVAL_SECS, 0 );
2386
2387    if( tr_peerIoSupportsUTP( peer->io ) ) {
2388        const tr_address * addr = tr_peerIoGetAddress( peer->io, NULL );
2389        tr_peerMgrSetUtpSupported( torrent, addr );
2390        tr_peerMgrSetUtpFailed( torrent, addr, false );
2391    }
2392
2393    if( tr_peerIoSupportsLTEP( peer->io ) )
2394        sendLtepHandshake( m );
2395
2396    tellPeerWhatWeHave( m );
2397
2398    if( tr_dhtEnabled( torrent->session ) && tr_peerIoSupportsDHT( peer->io ))
2399    {
2400        /* Only send PORT over IPv6 when the IPv6 DHT is running (BEP-32). */
2401        const struct tr_address *addr = tr_peerIoGetAddress( peer->io, NULL );
2402        if( addr->type == TR_AF_INET || tr_globalIPv6() ) {
2403            protocolSendPort( m, tr_dhtPort( torrent->session ) );
2404        }
2405    }
2406
2407    tr_peerIoSetIOFuncs( m->peer->io, canRead, didWrite, gotError, m );
2408    updateDesiredRequestCount( m );
2409
2410    return m;
2411}
2412
2413void
2414tr_peerMsgsFree( tr_peermsgs* msgs )
2415{
2416    if( msgs )
2417    {
2418        event_free( msgs->pexTimer );
2419
2420        evbuffer_free( msgs->incoming.block );
2421        evbuffer_free( msgs->outMessages );
2422        tr_free( msgs->pex6 );
2423        tr_free( msgs->pex );
2424
2425        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2426        tr_free( msgs );
2427    }
2428}
Note: See TracBrowser for help on using the repository browser.