source: trunk/libtransmission/peer-msgs.c @ 9465

Last change on this file since 9465 was 9465, checked in by charles, 12 years ago

(trunk libT) #2548: T's request queue can send out too many duplicate requests

  • Property svn:keywords set to Date Rev Author Id
File size: 64.8 KB
Line 
1/*
2 * This file Copyright (C) 2007-2009 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 9465 2009-11-01 02:10:47Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19
20#include <event.h>
21
22#include "transmission.h"
23#include "bencode.h"
24#include "completion.h"
25#include "crypto.h"
26#include "inout.h"
27#ifdef WIN32
28#include "net.h" /* for ECONN */
29#endif
30#include "peer-io.h"
31#include "peer-mgr.h"
32#include "peer-msgs.h"
33#include "platform.h" /* MAX_STACK_ARRAY_SIZE */
34#include "ratecontrol.h"
35#include "request-list.h"
36#include "session.h"
37#include "stats.h"
38#include "torrent.h"
39#include "tr-dht.h"
40#include "utils.h"
41#include "version.h"
42
43/**
44***
45**/
46
47enum
48{
49    BT_CHOKE                = 0,
50    BT_UNCHOKE              = 1,
51    BT_INTERESTED           = 2,
52    BT_NOT_INTERESTED       = 3,
53    BT_HAVE                 = 4,
54    BT_BITFIELD             = 5,
55    BT_REQUEST              = 6,
56    BT_PIECE                = 7,
57    BT_CANCEL               = 8,
58    BT_PORT                 = 9,
59
60    BT_FEXT_SUGGEST         = 13,
61    BT_FEXT_HAVE_ALL        = 14,
62    BT_FEXT_HAVE_NONE       = 15,
63    BT_FEXT_REJECT          = 16,
64    BT_FEXT_ALLOWED_FAST    = 17,
65
66    BT_LTEP                 = 20,
67
68    LTEP_HANDSHAKE          = 0,
69
70    TR_LTEP_PEX             = 1,
71
72    MAX_PEX_PEER_COUNT      = 50,
73
74    MIN_CHOKE_PERIOD_SEC    = ( 10 ),
75
76    /* idle seconds before we send a keepalive */
77    KEEPALIVE_INTERVAL_SECS = 100,
78
79    PEX_INTERVAL_SECS       = 90, /* sec between sendPex() calls */
80
81
82    MAX_BLOCK_SIZE          = ( 1024 * 16 ),
83
84
85    /* how long an unsent request can stay queued before it's returned
86       back to the peer-mgr's pool of requests */
87    QUEUED_REQUEST_TTL_SECS = 20,
88
89    /* how long a sent request can stay queued before it's returned
90       back to the peer-mgr's pool of requests */
91    SENT_REQUEST_TTL_SECS = 240,
92
93    /* used in lowering the outMessages queue period */
94    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
95    HIGH_PRIORITY_INTERVAL_SECS = 2,
96    LOW_PRIORITY_INTERVAL_SECS = 20,
97
98    /* number of pieces to remove from the bitfield when
99     * lazy bitfields are turned on */
100    LAZY_PIECE_COUNT = 26,
101
102    /* number of pieces we'll allow in our fast set */
103    MAX_FAST_SET_SIZE = 3
104};
105
106enum
107{
108    AWAITING_BT_LENGTH,
109    AWAITING_BT_ID,
110    AWAITING_BT_MESSAGE,
111    AWAITING_BT_PIECE
112};
113
114/**
115***
116**/
117
118/* this is raw, unchanged data from the peer regarding
119 * the current message that it's sending us. */
120struct tr_incoming
121{
122    uint8_t                id;
123    uint32_t               length; /* includes the +1 for id length */
124    struct peer_request    blockReq; /* metadata for incoming blocks */
125    struct evbuffer *      block; /* piece data for incoming blocks */
126};
127
128/**
129 * Low-level communication state information about a connected peer.
130 *
131 * This structure remembers the low-level protocol states that we're
132 * in with this peer, such as active requests, pex messages, and so on.
133 * Its fields are all private to peer-msgs.c.
134 *
135 * Data not directly involved with sending & receiving messages is
136 * stored in tr_peer, where it can be accessed by both peermsgs and
137 * the peer manager.
138 *
139 * @see struct peer_atom
140 * @see tr_peer
141 */
142struct tr_peermsgs
143{
144    tr_bool         peerSupportsPex;
145    tr_bool         clientSentLtepHandshake;
146    tr_bool         peerSentLtepHandshake;
147    /*tr_bool         haveFastSet;*/
148
149    /* how long the outMessages batch should be allowed to grow before
150     * it's flushed -- some messages (like requests >:) should be sent
151     * very quickly; others aren't as urgent. */
152    int8_t          outMessagesBatchPeriod;
153
154    uint8_t         state;
155    uint8_t         ut_pex_id;
156    uint16_t        pexCount;
157    uint16_t        pexCount6;
158    uint16_t        maxActiveRequests;
159
160#if 0
161    size_t                 fastsetSize;
162    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
163#endif
164
165    tr_peer *              peer;
166
167    tr_torrent *           torrent;
168
169    tr_publisher           publisher;
170
171    struct evbuffer *      outMessages; /* all the non-piece messages */
172
173    struct request_list    peerAskedFor;
174    struct request_list    clientAskedFor;
175    struct request_list    clientWillAskFor;
176
177    tr_pex               * pex;
178    tr_pex               * pex6;
179
180    /*time_t                 clientSentPexAt;*/
181    time_t                 clientSentAnythingAt;
182
183    /* when we started batching the outMessages */
184    time_t                outMessagesBatchedAt;
185
186    struct tr_incoming    incoming;
187
188    /* if the peer supports the Extension Protocol in BEP 10 and
189       supplied a reqq argument, it's stored here.  otherwise the
190       value is zero and should be ignored. */
191    int64_t               reqq;
192
193    struct event          pexTimer;
194};
195
196static TR_INLINE tr_session*
197getSession( struct tr_peermsgs * msgs )
198{
199    return msgs->torrent->session;
200}
201
202/**
203***
204**/
205
206static void
207myDebug( const char * file, int line,
208         const struct tr_peermsgs * msgs,
209         const char * fmt, ... )
210{
211    FILE * fp = tr_getLog( );
212
213    if( fp )
214    {
215        va_list           args;
216        char              timestr[64];
217        struct evbuffer * buf = evbuffer_new( );
218        char *            base = tr_basename( file );
219
220        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
221                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
222                             tr_torrentName( msgs->torrent ),
223                             tr_peerIoGetAddrStr( msgs->peer->io ),
224                             msgs->peer->client );
225        va_start( args, fmt );
226        evbuffer_add_vprintf( buf, fmt, args );
227        va_end( args );
228        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
229        /* FIXME(libevent2) tr_getLog() should return an fd, then use evbuffer_write() here */
230        fwrite( EVBUFFER_DATA( buf ), 1, EVBUFFER_LENGTH( buf ), fp );
231
232        tr_free( base );
233        evbuffer_free( buf );
234    }
235}
236
237#define dbgmsg( msgs, ... ) \
238    do { \
239        if( tr_deepLoggingIsActive( ) ) \
240            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
241    } while( 0 )
242
243/**
244***
245**/
246
247static void
248pokeBatchPeriod( tr_peermsgs * msgs,
249                 int           interval )
250{
251    if( msgs->outMessagesBatchPeriod > interval )
252    {
253        msgs->outMessagesBatchPeriod = interval;
254        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
255    }
256}
257
258static TR_INLINE void
259dbgOutMessageLen( tr_peermsgs * msgs )
260{
261    dbgmsg( msgs, "outMessage size is now %zu", EVBUFFER_LENGTH( msgs->outMessages ) );
262}
263
264static void
265protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
266{
267    tr_peerIo       * io  = msgs->peer->io;
268    struct evbuffer * out = msgs->outMessages;
269
270    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
271
272    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
273    tr_peerIoWriteUint8 ( io, out, BT_FEXT_REJECT );
274    tr_peerIoWriteUint32( io, out, req->index );
275    tr_peerIoWriteUint32( io, out, req->offset );
276    tr_peerIoWriteUint32( io, out, req->length );
277
278    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
279    dbgOutMessageLen( msgs );
280}
281
282static void
283protocolSendRequest( tr_peermsgs               * msgs,
284                     const struct peer_request * req )
285{
286    tr_peerIo       * io  = msgs->peer->io;
287    struct evbuffer * out = msgs->outMessages;
288
289    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
290    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
291    tr_peerIoWriteUint32( io, out, req->index );
292    tr_peerIoWriteUint32( io, out, req->offset );
293    tr_peerIoWriteUint32( io, out, req->length );
294
295    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
296    dbgOutMessageLen( msgs );
297    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
298}
299
300static void
301protocolSendCancel( tr_peermsgs               * msgs,
302                    const struct peer_request * req )
303{
304    tr_peerIo       * io  = msgs->peer->io;
305    struct evbuffer * out = msgs->outMessages;
306
307    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
308    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
309    tr_peerIoWriteUint32( io, out, req->index );
310    tr_peerIoWriteUint32( io, out, req->offset );
311    tr_peerIoWriteUint32( io, out, req->length );
312
313    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
314    dbgOutMessageLen( msgs );
315    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
316}
317
318static void
319protocolSendPort(tr_peermsgs *msgs, uint16_t port)
320{
321    tr_peerIo       * io  = msgs->peer->io;
322    struct evbuffer * out = msgs->outMessages;
323
324    dbgmsg( msgs, "sending Port %u", port);
325    tr_peerIoWriteUint32( io, out, 3 );
326    tr_peerIoWriteUint8 ( io, out, BT_PORT );
327    tr_peerIoWriteUint16( io, out, port);
328}
329
330static void
331protocolSendHave( tr_peermsgs * msgs,
332                  uint32_t      index )
333{
334    tr_peerIo       * io  = msgs->peer->io;
335    struct evbuffer * out = msgs->outMessages;
336
337    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
338    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
339    tr_peerIoWriteUint32( io, out, index );
340
341    dbgmsg( msgs, "sending Have %u", index );
342    dbgOutMessageLen( msgs );
343    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
344}
345
346#if 0
347static void
348protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
349{
350    tr_peerIo       * io  = msgs->peer->io;
351    struct evbuffer * out = msgs->outMessages;
352
353    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
354
355    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
356    tr_peerIoWriteUint8 ( io, out, BT_FEXT_ALLOWED_FAST );
357    tr_peerIoWriteUint32( io, out, pieceIndex );
358
359    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
360    dbgOutMessageLen( msgs );
361}
362#endif
363
364static void
365protocolSendChoke( tr_peermsgs * msgs,
366                   int           choke )
367{
368    tr_peerIo       * io  = msgs->peer->io;
369    struct evbuffer * out = msgs->outMessages;
370
371    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
372    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
373
374    dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
375    dbgOutMessageLen( msgs );
376    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
377}
378
379static void
380protocolSendHaveAll( tr_peermsgs * msgs )
381{
382    tr_peerIo       * io  = msgs->peer->io;
383    struct evbuffer * out = msgs->outMessages;
384
385    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
386
387    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
388    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_ALL );
389
390    dbgmsg( msgs, "sending HAVE_ALL..." );
391    dbgOutMessageLen( msgs );
392    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
393}
394
395static void
396protocolSendHaveNone( tr_peermsgs * msgs )
397{
398    tr_peerIo       * io  = msgs->peer->io;
399    struct evbuffer * out = msgs->outMessages;
400
401    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
402
403    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
404    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_NONE );
405
406    dbgmsg( msgs, "sending HAVE_NONE..." );
407    dbgOutMessageLen( msgs );
408    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
409}
410
411/**
412***  EVENTS
413**/
414
415static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0, 0, 0 };
416
417static void
418publish( tr_peermsgs * msgs, tr_peer_event * e )
419{
420    assert( msgs->peer );
421    assert( msgs->peer->msgs == msgs );
422
423    tr_publisherPublish( &msgs->publisher, msgs->peer, e );
424}
425
426static void
427fireError( tr_peermsgs * msgs, int err )
428{
429    tr_peer_event e = blankEvent;
430    e.eventType = TR_PEER_ERROR;
431    e.err = err;
432    publish( msgs, &e );
433}
434
435static void
436fireUploadOnly( tr_peermsgs * msgs, tr_bool uploadOnly )
437{
438    tr_peer_event e = blankEvent;
439    e.eventType = TR_PEER_UPLOAD_ONLY;
440    e.uploadOnly = uploadOnly;
441    publish( msgs, &e );
442}
443
444static void
445fireNeedReq( tr_peermsgs * msgs )
446{
447    tr_peer_event e = blankEvent;
448    e.eventType = TR_PEER_NEED_REQ;
449    publish( msgs, &e );
450}
451
452static void
453firePeerProgress( tr_peermsgs * msgs )
454{
455    tr_peer_event e = blankEvent;
456    e.eventType = TR_PEER_PEER_PROGRESS;
457    e.progress = msgs->peer->progress;
458    publish( msgs, &e );
459}
460
461static double blocksGotten = 0.0;
462
463static void
464fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
465{
466    tr_peer_event e = blankEvent;
467++blocksGotten;
468    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
469    e.pieceIndex = req->index;
470    e.offset = req->offset;
471    e.length = req->length;
472    publish( msgs, &e );
473}
474
475static void
476fireClientGotData( tr_peermsgs * msgs,
477                   uint32_t      length,
478                   int           wasPieceData )
479{
480    tr_peer_event e = blankEvent;
481
482    e.length = length;
483    e.eventType = TR_PEER_CLIENT_GOT_DATA;
484    e.wasPieceData = wasPieceData;
485    publish( msgs, &e );
486}
487
488static void
489fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
490{
491    tr_peer_event e = blankEvent;
492    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
493    e.pieceIndex = pieceIndex;
494    publish( msgs, &e );
495}
496
497static void
498fireClientGotPort( tr_peermsgs * msgs, tr_port port )
499{
500    tr_peer_event e = blankEvent;
501    e.eventType = TR_PEER_CLIENT_GOT_PORT;
502    e.port = port;
503    publish( msgs, &e );
504}
505
506static void
507fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
508{
509    tr_peer_event e = blankEvent;
510    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
511    e.pieceIndex = pieceIndex;
512    publish( msgs, &e );
513}
514
515static void
516firePeerGotData( tr_peermsgs  * msgs,
517                 uint32_t       length,
518                 int            wasPieceData )
519{
520    tr_peer_event e = blankEvent;
521
522    e.length = length;
523    e.eventType = TR_PEER_PEER_GOT_DATA;
524    e.wasPieceData = wasPieceData;
525
526    publish( msgs, &e );
527}
528
529static void
530fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
531{
532    tr_peer_event e = blankEvent;
533    e.eventType = TR_PEER_CANCEL;
534    e.pieceIndex = req->index;
535    e.offset = req->offset;
536    e.length = req->length;
537    publish( msgs, &e );
538}
539
540/**
541***  ALLOWED FAST SET
542***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
543**/
544
545size_t
546tr_generateAllowedSet( tr_piece_index_t * setmePieces,
547                       size_t             desiredSetSize,
548                       size_t             pieceCount,
549                       const uint8_t    * infohash,
550                       const tr_address * addr )
551{
552    size_t setSize = 0;
553
554    assert( setmePieces );
555    assert( desiredSetSize <= pieceCount );
556    assert( desiredSetSize );
557    assert( pieceCount );
558    assert( infohash );
559    assert( addr );
560
561    if( addr->type == TR_AF_INET )
562    {
563        uint8_t w[SHA_DIGEST_LENGTH + 4], *walk=w;
564        uint8_t x[SHA_DIGEST_LENGTH];
565
566        uint32_t ui32 = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 );   /* (1) */
567        memcpy( w, &ui32, sizeof( uint32_t ) );
568        walk += sizeof( uint32_t );
569        memcpy( walk, infohash, SHA_DIGEST_LENGTH );                 /* (2) */
570        walk += SHA_DIGEST_LENGTH;
571        tr_sha1( x, w, walk-w, NULL );                               /* (3) */
572        assert( sizeof( w ) == walk-w );
573
574        while( setSize<desiredSetSize )
575        {
576            int i;
577            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
578            {
579                size_t k;
580                uint32_t j = i * 4;                                  /* (5) */
581                uint32_t y = ntohl( *( uint32_t* )( x + j ) );       /* (6) */
582                uint32_t index = y % pieceCount;                     /* (7) */
583
584                for( k=0; k<setSize; ++k )                           /* (8) */
585                    if( setmePieces[k] == index )
586                        break;
587
588                if( k == setSize )
589                    setmePieces[setSize++] = index;                  /* (9) */
590            }
591
592            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
593        }
594    }
595
596    return setSize;
597}
598
599static void
600updateFastSet( tr_peermsgs * msgs UNUSED )
601{
602#if 0
603    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
604    const int peerIsNeedy = msgs->peer->progress < 0.10;
605
606    if( fext && peerIsNeedy && !msgs->haveFastSet )
607    {
608        size_t i;
609        const struct tr_address * addr = tr_peerIoGetAddress( msgs->peer->io, NULL );
610        const tr_info * inf = &msgs->torrent->info;
611        const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
612
613        /* build the fast set */
614        msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
615        msgs->haveFastSet = 1;
616
617        /* send it to the peer */
618        for( i=0; i<msgs->fastsetSize; ++i )
619            protocolSendAllowedFast( msgs, msgs->fastset[i] );
620    }
621#endif
622}
623
624/**
625***  INTEREST
626**/
627
628static tr_bool
629isPieceInteresting( const tr_peermsgs * msgs,
630                    tr_piece_index_t    piece )
631{
632    const tr_torrent * torrent = msgs->torrent;
633
634    return ( !torrent->info.pieces[piece].dnd )                 /* we want it */
635          && ( !tr_cpPieceIsComplete( &torrent->completion, piece ) ) /* !have */
636          && ( tr_bitfieldHas( msgs->peer->have, piece ) );    /* peer has it */
637}
638
639/* "interested" means we'll ask for piece data if they unchoke us */
640static tr_bool
641isPeerInteresting( const tr_peermsgs * msgs )
642{
643    tr_piece_index_t    i;
644    const tr_torrent *  torrent;
645    const tr_bitfield * bitfield;
646    const int           clientIsSeed = tr_torrentIsSeed( msgs->torrent );
647
648    if( clientIsSeed )
649        return FALSE;
650
651    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
652        return FALSE;
653
654    torrent = msgs->torrent;
655    bitfield = tr_cpPieceBitfield( &torrent->completion );
656
657    if( !msgs->peer->have )
658        return TRUE;
659
660    assert( bitfield->byteCount == msgs->peer->have->byteCount );
661
662    for( i = 0; i < torrent->info.pieceCount; ++i )
663        if( isPieceInteresting( msgs, i ) )
664            return TRUE;
665
666    return FALSE;
667}
668
669static void
670sendInterest( tr_peermsgs * msgs,
671              int           weAreInterested )
672{
673    struct evbuffer * out = msgs->outMessages;
674
675    assert( msgs );
676    assert( weAreInterested == 0 || weAreInterested == 1 );
677
678    msgs->peer->clientIsInterested = weAreInterested;
679    dbgmsg( msgs, "Sending %s", weAreInterested ? "Interested" : "Not Interested" );
680    tr_peerIoWriteUint32( msgs->peer->io, out, sizeof( uint8_t ) );
681    tr_peerIoWriteUint8 ( msgs->peer->io, out, weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
682
683    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
684    dbgOutMessageLen( msgs );
685}
686
687static void
688updateInterest( tr_peermsgs * msgs )
689{
690    const int i = isPeerInteresting( msgs );
691
692    if( i != msgs->peer->clientIsInterested )
693        sendInterest( msgs, i );
694    if( i )
695        fireNeedReq( msgs );
696}
697
698static tr_bool
699popNextRequest( tr_peermsgs *         msgs,
700                struct peer_request * setme )
701{
702    return reqListPop( &msgs->peerAskedFor, setme );
703}
704
705static void
706cancelAllRequestsToClient( tr_peermsgs * msgs )
707{
708    struct peer_request req;
709    const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->peer->io );
710
711    while( popNextRequest( msgs, &req ) )
712        if( mustSendCancel )
713            protocolSendReject( msgs, &req );
714}
715
716void
717tr_peerMsgsSetChoke( tr_peermsgs * msgs,
718                     int           choke )
719{
720    const time_t now = time( NULL );
721    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
722
723    assert( msgs );
724    assert( msgs->peer );
725    assert( choke == 0 || choke == 1 );
726
727    if( msgs->peer->chokeChangedAt > fibrillationTime )
728    {
729        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
730    }
731    else if( msgs->peer->peerIsChoked != choke )
732    {
733        msgs->peer->peerIsChoked = choke;
734        if( choke )
735            cancelAllRequestsToClient( msgs );
736        protocolSendChoke( msgs, choke );
737        msgs->peer->chokeChangedAt = now;
738    }
739}
740
741/**
742***
743**/
744
745void
746tr_peerMsgsHave( tr_peermsgs * msgs,
747                 uint32_t      index )
748{
749    protocolSendHave( msgs, index );
750
751    /* since we have more pieces now, we might not be interested in this peer */
752    updateInterest( msgs );
753}
754
755/**
756***
757**/
758
759static tr_bool
760reqIsValid( const tr_peermsgs * peer,
761            uint32_t            index,
762            uint32_t            offset,
763            uint32_t            length )
764{
765    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
766}
767
768static tr_bool
769requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
770{
771    return reqIsValid( msgs, req->index, req->offset, req->length );
772}
773
774static void
775expireFromList( tr_peermsgs          * msgs,
776                struct request_list  * list,
777                const time_t           oldestAllowed )
778{
779    size_t i;
780    struct request_list tmp = REQUEST_LIST_INIT;
781
782    /* since the fifo list is sorted by time, the oldest will be first */
783    if( !list->len || ( list->fifo[0].time_requested >= oldestAllowed ) )
784        return;
785
786    /* if we found one too old, start pruning them */
787    reqListCopy( &tmp, list );
788    for( i=0; i<tmp.len; ++i ) {
789        const struct peer_request * req = &tmp.fifo[i];
790        if( req->time_requested >= oldestAllowed )
791            break;
792        tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
793    }
794    reqListClear( &tmp );
795}
796
797static void
798expireOldRequests( tr_peermsgs * msgs, const time_t now  )
799{
800    time_t oldestAllowed;
801    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
802    dbgmsg( msgs, "entering `expire old requests' block" );
803
804    /* cancel requests that have been queued for too long */
805    oldestAllowed = now - QUEUED_REQUEST_TTL_SECS;
806    expireFromList( msgs, &msgs->clientWillAskFor, oldestAllowed );
807
808    /* if the peer doesn't support "Reject Request",
809     * cancel requests that were sent too long ago. */
810    if( !fext ) {
811        oldestAllowed = now - SENT_REQUEST_TTL_SECS;
812        expireFromList( msgs, &msgs->clientAskedFor, oldestAllowed );
813    }
814
815    dbgmsg( msgs, "leaving `expire old requests' block" );
816}
817
818static void
819pumpRequestQueue( tr_peermsgs * msgs, const time_t now )
820{
821    const int           max = msgs->maxActiveRequests;
822    int                 sent = 0;
823    int                 len = msgs->clientAskedFor.len;
824    struct peer_request req;
825
826    dbgmsg( msgs, "clientIsChoked %d, download allowed %d, len %d, max %d, msgs->clientWillAskFor.len %d",
827            (int)msgs->peer->clientIsChoked,
828            (int)tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ),
829            len, max, msgs->clientWillAskFor.len );
830
831    if( msgs->peer->clientIsChoked )
832        return;
833    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
834        return;
835
836    while( ( len < max ) && reqListPop( &msgs->clientWillAskFor, &req ) )
837    {
838        const tr_block_index_t block = _tr_block( msgs->torrent, req.index, req.offset );
839
840        assert( requestIsValid( msgs, &req ) );
841        assert( tr_bitfieldHas( msgs->peer->have, req.index ) );
842
843        /* don't ask for it if we've already got it... this block may have
844         * come in from a different peer after we cancelled a request for it */
845        if( !tr_cpBlockIsComplete( &msgs->torrent->completion, block ) )
846        {
847            protocolSendRequest( msgs, &req );
848            req.time_requested = now;
849            reqListAppend( &msgs->clientAskedFor, &req );
850
851            ++len;
852            ++sent;
853        }
854        else dbgmsg( msgs, "not asking for it because we've already got it..." );
855    }
856
857    if( sent )
858        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
859                sent, msgs->clientAskedFor.len, msgs->clientWillAskFor.len );
860
861    if( len < max )
862        fireNeedReq( msgs );
863}
864
865static TR_INLINE tr_bool
866requestQueueIsFull( const tr_peermsgs * msgs )
867{
868    const int req_max = msgs->maxActiveRequests;
869    return msgs->clientWillAskFor.len >= (size_t)req_max;
870}
871
872tr_addreq_t
873tr_peerMsgsAddRequest( tr_peermsgs *    msgs,
874                       uint32_t         index,
875                       uint32_t         offset,
876                       uint32_t         length )
877{
878    struct peer_request req;
879
880    assert( msgs );
881    assert( msgs->torrent );
882
883    /**
884    ***  Reasons to decline the request
885    **/
886
887    /* don't send requests to choked clients */
888    if( msgs->peer->clientIsChoked ) {
889        dbgmsg( msgs, "declining request because they're choking us" );
890        return TR_ADDREQ_CLIENT_CHOKED;
891    }
892
893    /* peer's queue is full */
894    if( requestQueueIsFull( msgs ) ) {
895        dbgmsg( msgs, "declining request because we're full" );
896        return TR_ADDREQ_FULL;
897    }
898
899    /* peer doesn't have this piece */
900    if( !tr_bitfieldHas( msgs->peer->have, index ) )
901        return TR_ADDREQ_MISSING;
902
903    /* have we already asked for this piece? */
904    req.index = index;
905    req.offset = offset;
906    req.length = length;
907    if( reqListHas( &msgs->clientAskedFor, &req ) ) {
908        dbgmsg( msgs, "declining because it's a duplicate" );
909        return TR_ADDREQ_DUPLICATE;
910    }
911    if( reqListHas( &msgs->clientWillAskFor, &req ) ) {
912        dbgmsg( msgs, "declining because it's a duplicate" );
913        return TR_ADDREQ_DUPLICATE;
914    }
915
916    /**
917    ***  Accept this request
918    **/
919
920    dbgmsg( msgs, "adding req for %"PRIu32":%"PRIu32"->%"PRIu32" to our `will request' list",
921            index, offset, length );
922    req.time_requested = time( NULL );
923    reqListAppend( &msgs->clientWillAskFor, &req );
924    return TR_ADDREQ_OK;
925}
926
927static void
928cancelAllRequestsToPeer( tr_peermsgs * msgs, tr_bool sendCancel )
929{
930    size_t i;
931    struct request_list a = msgs->clientWillAskFor;
932    struct request_list b = msgs->clientAskedFor;
933    dbgmsg( msgs, "cancelling all requests to peer" );
934
935    msgs->clientAskedFor = REQUEST_LIST_INIT;
936    msgs->clientWillAskFor = REQUEST_LIST_INIT;
937
938    for( i=0; i<a.len; ++i )
939        fireCancelledReq( msgs, &a.fifo[i] );
940
941    for( i = 0; i < b.len; ++i ) {
942        fireCancelledReq( msgs, &b.fifo[i] );
943        if( sendCancel )
944            protocolSendCancel( msgs, &b.fifo[i] );
945    }
946
947    reqListClear( &a );
948    reqListClear( &b );
949}
950
951void
952tr_peerMsgsCancel( tr_peermsgs * msgs,
953                   uint32_t      pieceIndex,
954                   uint32_t      offset,
955                   uint32_t      length )
956{
957    struct peer_request req;
958
959    assert( msgs != NULL );
960    assert( length > 0 );
961
962
963    /* have we asked the peer for this piece? */
964    req.index = pieceIndex;
965    req.offset = offset;
966    req.length = length;
967
968    /* if it's only in the queue and hasn't been sent yet, free it */
969    if( reqListRemove( &msgs->clientWillAskFor, &req ) ) {
970        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32, pieceIndex, offset, length );
971        fireCancelledReq( msgs, &req );
972    }
973
974    /* if it's already been sent, send a cancel message too */
975    if( reqListRemove( &msgs->clientAskedFor, &req ) ) {
976        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32, pieceIndex, offset, length );
977        protocolSendCancel( msgs, &req );
978        fireCancelledReq( msgs, &req );
979    }
980}
981
982
983/**
984***
985**/
986
987static void
988sendLtepHandshake( tr_peermsgs * msgs )
989{
990    tr_benc val, *m;
991    char * buf;
992    int len;
993    int pex;
994    struct evbuffer * out = msgs->outMessages;
995
996    if( msgs->clientSentLtepHandshake )
997        return;
998
999    dbgmsg( msgs, "sending an ltep handshake" );
1000    msgs->clientSentLtepHandshake = 1;
1001
1002    /* decide if we want to advertise pex support */
1003    if( !tr_torrentAllowsPex( msgs->torrent ) )
1004        pex = 0;
1005    else if( msgs->peerSentLtepHandshake )
1006        pex = msgs->peerSupportsPex ? 1 : 0;
1007    else
1008        pex = 1;
1009
1010    tr_bencInitDict( &val, 5 );
1011    tr_bencDictAddInt( &val, "e", getSession(msgs)->encryptionMode != TR_CLEAR_PREFERRED );
1012    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( getSession(msgs) ) );
1013    tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
1014    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
1015    m  = tr_bencDictAddDict( &val, "m", 1 );
1016    if( pex )
1017        tr_bencDictAddInt( m, "ut_pex", TR_LTEP_PEX );
1018    buf = tr_bencToStr( &val, TR_FMT_BENC, &len );
1019
1020    tr_peerIoWriteUint32( msgs->peer->io, out, 2 * sizeof( uint8_t ) + len );
1021    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_LTEP );
1022    tr_peerIoWriteUint8 ( msgs->peer->io, out, LTEP_HANDSHAKE );
1023    tr_peerIoWriteBytes ( msgs->peer->io, out, buf, len );
1024    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1025    dbgOutMessageLen( msgs );
1026
1027    /* cleanup */
1028    tr_bencFree( &val );
1029    tr_free( buf );
1030}
1031
1032static void
1033parseLtepHandshake( tr_peermsgs *     msgs,
1034                    int               len,
1035                    struct evbuffer * inbuf )
1036{
1037    int64_t   i;
1038    tr_benc   val, * sub;
1039    uint8_t * tmp = tr_new( uint8_t, len );
1040
1041    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, len );
1042    msgs->peerSentLtepHandshake = 1;
1043
1044    if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) )
1045    {
1046        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
1047        tr_free( tmp );
1048        return;
1049    }
1050
1051    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
1052
1053    /* does the peer prefer encrypted connections? */
1054    if( tr_bencDictFindInt( &val, "e", &i ) )
1055        msgs->peer->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
1056                                              : ENCRYPTION_PREFERENCE_NO;
1057
1058    /* check supported messages for utorrent pex */
1059    msgs->peerSupportsPex = 0;
1060    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
1061        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
1062            msgs->ut_pex_id = (uint8_t) i;
1063            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
1064            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
1065        }
1066    }
1067
1068    /* look for upload_only (BEP 21) */
1069    if( tr_bencDictFindInt( &val, "upload_only", &i ) )
1070        fireUploadOnly( msgs, i!=0 );
1071
1072    /* get peer's listening port */
1073    if( tr_bencDictFindInt( &val, "p", &i ) ) {
1074        fireClientGotPort( msgs, (tr_port)i );
1075        dbgmsg( msgs, "peer's port is now %d", (int)i );
1076    }
1077
1078    /* get peer's maximum request queue size */
1079    if( tr_bencDictFindInt( &val, "reqq", &i ) )
1080        msgs->reqq = i;
1081
1082    tr_bencFree( &val );
1083    tr_free( tmp );
1084}
1085
1086static void
1087parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1088{
1089    int loaded = 0;
1090    uint8_t * tmp = tr_new( uint8_t, msglen );
1091    tr_benc val;
1092    tr_torrent * tor = msgs->torrent;
1093    const uint8_t * added;
1094    size_t added_len;
1095
1096    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1097
1098    if( tr_torrentAllowsPex( tor )
1099      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) ) )
1100    {
1101        if( tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1102        {
1103            tr_pex * pex;
1104            size_t i, n;
1105            size_t added_f_len = 0;
1106            const uint8_t * added_f = NULL;
1107
1108            tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1109            pex = tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len, &n );
1110
1111            n = MIN( n, MAX_PEX_PEER_COUNT );
1112            for( i=0; i<n; ++i )
1113                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex + i );
1114
1115            tr_free( pex );
1116        }
1117
1118        if( tr_bencDictFindRaw( &val, "added6", &added, &added_len ) )
1119        {
1120            tr_pex * pex;
1121            size_t i, n;
1122            size_t added_f_len = 0;
1123            const uint8_t * added_f = NULL;
1124
1125            tr_bencDictFindRaw( &val, "added6.f", &added_f, &added_f_len );
1126            pex = tr_peerMgrCompact6ToPex( added, added_len, added_f, added_f_len, &n );
1127
1128            n = MIN( n, MAX_PEX_PEER_COUNT );
1129            for( i=0; i<n; ++i )
1130                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex + i );
1131
1132            tr_free( pex );
1133        }
1134    }
1135
1136    if( loaded )
1137        tr_bencFree( &val );
1138    tr_free( tmp );
1139}
1140
1141static void sendPex( tr_peermsgs * msgs );
1142
1143static void
1144parseLtep( tr_peermsgs *     msgs,
1145           int               msglen,
1146           struct evbuffer * inbuf )
1147{
1148    uint8_t ltep_msgid;
1149
1150    tr_peerIoReadUint8( msgs->peer->io, inbuf, &ltep_msgid );
1151    msglen--;
1152
1153    if( ltep_msgid == LTEP_HANDSHAKE )
1154    {
1155        dbgmsg( msgs, "got ltep handshake" );
1156        parseLtepHandshake( msgs, msglen, inbuf );
1157        if( tr_peerIoSupportsLTEP( msgs->peer->io ) )
1158        {
1159            sendLtepHandshake( msgs );
1160            sendPex( msgs );
1161        }
1162    }
1163    else if( ltep_msgid == TR_LTEP_PEX )
1164    {
1165        dbgmsg( msgs, "got ut pex" );
1166        msgs->peerSupportsPex = 1;
1167        parseUtPex( msgs, msglen, inbuf );
1168    }
1169    else
1170    {
1171        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1172        evbuffer_drain( inbuf, msglen );
1173    }
1174}
1175
1176static int
1177readBtLength( tr_peermsgs *     msgs,
1178              struct evbuffer * inbuf,
1179              size_t            inlen )
1180{
1181    uint32_t len;
1182
1183    if( inlen < sizeof( len ) )
1184        return READ_LATER;
1185
1186    tr_peerIoReadUint32( msgs->peer->io, inbuf, &len );
1187
1188    if( len == 0 ) /* peer sent us a keepalive message */
1189        dbgmsg( msgs, "got KeepAlive" );
1190    else
1191    {
1192        msgs->incoming.length = len;
1193        msgs->state = AWAITING_BT_ID;
1194    }
1195
1196    return READ_NOW;
1197}
1198
1199static int readBtMessage( tr_peermsgs *     msgs,
1200                          struct evbuffer * inbuf,
1201                          size_t            inlen );
1202
1203static int
1204readBtId( tr_peermsgs *     msgs,
1205          struct evbuffer * inbuf,
1206          size_t            inlen )
1207{
1208    uint8_t id;
1209
1210    if( inlen < sizeof( uint8_t ) )
1211        return READ_LATER;
1212
1213    tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
1214    msgs->incoming.id = id;
1215    dbgmsg( msgs, "msgs->incoming.id is now %d; msgs->incoming.length is %zu", id, (size_t)msgs->incoming.length );
1216
1217    if( id == BT_PIECE )
1218    {
1219        msgs->state = AWAITING_BT_PIECE;
1220        return READ_NOW;
1221    }
1222    else if( msgs->incoming.length != 1 )
1223    {
1224        msgs->state = AWAITING_BT_MESSAGE;
1225        return READ_NOW;
1226    }
1227    else return readBtMessage( msgs, inbuf, inlen - 1 );
1228}
1229
1230static void
1231updatePeerProgress( tr_peermsgs * msgs )
1232{
1233    msgs->peer->progress = tr_bitfieldCountTrueBits( msgs->peer->have ) / (float)msgs->torrent->info.pieceCount;
1234    dbgmsg( msgs, "peer progress is %f", msgs->peer->progress );
1235    updateFastSet( msgs );
1236    updateInterest( msgs );
1237    firePeerProgress( msgs );
1238}
1239
1240static void
1241peerMadeRequest( tr_peermsgs *               msgs,
1242                 const struct peer_request * req )
1243{
1244    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1245    const int reqIsValid = requestIsValid( msgs, req );
1246    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( &msgs->torrent->completion, req->index );
1247    const int peerIsChoked = msgs->peer->peerIsChoked;
1248
1249    int allow = FALSE;
1250
1251    if( !reqIsValid )
1252        dbgmsg( msgs, "rejecting an invalid request." );
1253    else if( !clientHasPiece )
1254        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1255    else if( peerIsChoked )
1256        dbgmsg( msgs, "rejecting request from choked peer" );
1257    else
1258        allow = TRUE;
1259
1260    if( allow )
1261        reqListAppend( &msgs->peerAskedFor, req );
1262    else if( fext )
1263        protocolSendReject( msgs, req );
1264}
1265
1266static tr_bool
1267messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1268{
1269    switch( id )
1270    {
1271        case BT_CHOKE:
1272        case BT_UNCHOKE:
1273        case BT_INTERESTED:
1274        case BT_NOT_INTERESTED:
1275        case BT_FEXT_HAVE_ALL:
1276        case BT_FEXT_HAVE_NONE:
1277            return len == 1;
1278
1279        case BT_HAVE:
1280        case BT_FEXT_SUGGEST:
1281        case BT_FEXT_ALLOWED_FAST:
1282            return len == 5;
1283
1284        case BT_BITFIELD:
1285            return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1286
1287        case BT_REQUEST:
1288        case BT_CANCEL:
1289        case BT_FEXT_REJECT:
1290            return len == 13;
1291
1292        case BT_PIECE:
1293            return len > 9 && len <= 16393;
1294
1295        case BT_PORT:
1296            return len == 3;
1297
1298        case BT_LTEP:
1299            return len >= 2;
1300
1301        default:
1302            return FALSE;
1303    }
1304}
1305
1306static int clientGotBlock( tr_peermsgs *               msgs,
1307                           const uint8_t *             block,
1308                           const struct peer_request * req );
1309
1310static int
1311readBtPiece( tr_peermsgs      * msgs,
1312             struct evbuffer  * inbuf,
1313             size_t             inlen,
1314             size_t           * setme_piece_bytes_read )
1315{
1316    struct peer_request * req = &msgs->incoming.blockReq;
1317
1318    assert( EVBUFFER_LENGTH( inbuf ) >= inlen );
1319    dbgmsg( msgs, "In readBtPiece" );
1320
1321    if( !req->length )
1322    {
1323        if( inlen < 8 )
1324            return READ_LATER;
1325
1326        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index );
1327        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset );
1328        req->length = msgs->incoming.length - 9;
1329        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1330        return READ_NOW;
1331    }
1332    else
1333    {
1334        int err;
1335
1336        /* read in another chunk of data */
1337        const size_t nLeft = req->length - EVBUFFER_LENGTH( msgs->incoming.block );
1338        size_t n = MIN( nLeft, inlen );
1339        size_t i = n;
1340
1341        while( i > 0 )
1342        {
1343            uint8_t buf[MAX_STACK_ARRAY_SIZE];
1344            const size_t thisPass = MIN( i, sizeof( buf ) );
1345            tr_peerIoReadBytes( msgs->peer->io, inbuf, buf, thisPass );
1346            evbuffer_add( msgs->incoming.block, buf, thisPass );
1347            i -= thisPass;
1348        }
1349
1350        fireClientGotData( msgs, n, TRUE );
1351        *setme_piece_bytes_read += n;
1352        dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1353               n, req->index, req->offset, req->length,
1354               (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) );
1355        if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length )
1356            return READ_LATER;
1357
1358        /* we've got the whole block ... process it */
1359        err = clientGotBlock( msgs, EVBUFFER_DATA( msgs->incoming.block ), req );
1360
1361        /* cleanup */
1362        evbuffer_free( msgs->incoming.block );
1363        msgs->incoming.block = evbuffer_new( );
1364        req->length = 0;
1365        msgs->state = AWAITING_BT_LENGTH;
1366        return err ? READ_ERR : READ_NOW;
1367    }
1368}
1369
1370static int
1371readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1372{
1373    uint32_t      ui32;
1374    uint32_t      msglen = msgs->incoming.length;
1375    const uint8_t id = msgs->incoming.id;
1376    const size_t  startBufLen = EVBUFFER_LENGTH( inbuf );
1377    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1378
1379    --msglen; /* id length */
1380
1381    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
1382
1383    if( inlen < msglen )
1384        return READ_LATER;
1385
1386    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1387    {
1388        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1389        fireError( msgs, EMSGSIZE );
1390        return READ_ERR;
1391    }
1392
1393    switch( id )
1394    {
1395        case BT_CHOKE:
1396            dbgmsg( msgs, "got Choke" );
1397            msgs->peer->clientIsChoked = 1;
1398            if( !fext )
1399                cancelAllRequestsToPeer( msgs, FALSE );
1400            break;
1401
1402        case BT_UNCHOKE:
1403            dbgmsg( msgs, "got Unchoke" );
1404            msgs->peer->clientIsChoked = 0;
1405            fireNeedReq( msgs );
1406            break;
1407
1408        case BT_INTERESTED:
1409            dbgmsg( msgs, "got Interested" );
1410            msgs->peer->peerIsInterested = 1;
1411            break;
1412
1413        case BT_NOT_INTERESTED:
1414            dbgmsg( msgs, "got Not Interested" );
1415            msgs->peer->peerIsInterested = 0;
1416            break;
1417
1418        case BT_HAVE:
1419            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1420            dbgmsg( msgs, "got Have: %u", ui32 );
1421            if( tr_bitfieldAdd( msgs->peer->have, ui32 ) ) {
1422                fireError( msgs, ERANGE );
1423                return READ_ERR;
1424            }
1425            updatePeerProgress( msgs );
1426            tr_rcTransferred( &msgs->torrent->swarmSpeed,
1427                              msgs->torrent->info.pieceSize );
1428            break;
1429
1430        case BT_BITFIELD:
1431        {
1432            dbgmsg( msgs, "got a bitfield" );
1433            tr_peerIoReadBytes( msgs->peer->io, inbuf, msgs->peer->have->bits, msglen );
1434            updatePeerProgress( msgs );
1435            fireNeedReq( msgs );
1436            break;
1437        }
1438
1439        case BT_REQUEST:
1440        {
1441            struct peer_request r;
1442            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1443            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1444            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1445            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1446            peerMadeRequest( msgs, &r );
1447            break;
1448        }
1449
1450        case BT_CANCEL:
1451        {
1452            struct peer_request r;
1453            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1454            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1455            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1456            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1457            if( reqListRemove( &msgs->peerAskedFor, &r ) && fext )
1458                protocolSendReject( msgs, &r );
1459            break;
1460        }
1461
1462        case BT_PIECE:
1463            assert( 0 ); /* handled elsewhere! */
1464            break;
1465
1466        case BT_PORT:
1467            dbgmsg( msgs, "Got a BT_PORT" );
1468            tr_peerIoReadUint16( msgs->peer->io, inbuf, &msgs->peer->dht_port );
1469            if( msgs->peer->dht_port > 0 )
1470                tr_dhtAddNode( getSession(msgs),
1471                               tr_peerAddress( msgs->peer ),
1472                               msgs->peer->dht_port, 0 );
1473            break;
1474
1475        case BT_FEXT_SUGGEST:
1476            dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1477            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1478            if( fext )
1479                fireClientGotSuggest( msgs, ui32 );
1480            else {
1481                fireError( msgs, EMSGSIZE );
1482                return READ_ERR;
1483            }
1484            break;
1485
1486        case BT_FEXT_ALLOWED_FAST:
1487            dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1488            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1489            if( fext )
1490                fireClientGotAllowedFast( msgs, ui32 );
1491            else {
1492                fireError( msgs, EMSGSIZE );
1493                return READ_ERR;
1494            }
1495            break;
1496
1497        case BT_FEXT_HAVE_ALL:
1498            dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1499            if( fext ) {
1500                tr_bitfieldAddRange( msgs->peer->have, 0, msgs->torrent->info.pieceCount );
1501                updatePeerProgress( msgs );
1502            } else {
1503                fireError( msgs, EMSGSIZE );
1504                return READ_ERR;
1505            }
1506            break;
1507
1508        case BT_FEXT_HAVE_NONE:
1509            dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1510            if( fext ) {
1511                tr_bitfieldClear( msgs->peer->have );
1512                updatePeerProgress( msgs );
1513            } else {
1514                fireError( msgs, EMSGSIZE );
1515                return READ_ERR;
1516            }
1517            break;
1518
1519        case BT_FEXT_REJECT:
1520        {
1521            struct peer_request r;
1522            dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1523            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1524            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1525            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1526            if( fext )
1527                reqListRemove( &msgs->clientAskedFor, &r );
1528            else {
1529                fireError( msgs, EMSGSIZE );
1530                return READ_ERR;
1531            }
1532            break;
1533        }
1534
1535        case BT_LTEP:
1536            dbgmsg( msgs, "Got a BT_LTEP" );
1537            parseLtep( msgs, msglen, inbuf );
1538            break;
1539
1540        default:
1541            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1542            tr_peerIoDrain( msgs->peer->io, inbuf, msglen );
1543            break;
1544    }
1545
1546    assert( msglen + 1 == msgs->incoming.length );
1547    assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen );
1548
1549    msgs->state = AWAITING_BT_LENGTH;
1550    return READ_NOW;
1551}
1552
1553static TR_INLINE void
1554decrementDownloadedCount( tr_peermsgs * msgs, uint32_t byteCount )
1555{
1556    tr_torrent * tor = msgs->torrent;
1557
1558    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1559}
1560
1561static TR_INLINE void
1562clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1563{
1564static double unwantedGotten = 0.0;
1565fprintf( stderr, "dupe ratio: %f\n", ++unwantedGotten / blocksGotten );
1566    decrementDownloadedCount( msgs, req->length );
1567}
1568
1569static void
1570addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1571{
1572    if( !msgs->peer->blame )
1573         msgs->peer->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1574    tr_bitfieldAdd( msgs->peer->blame, index );
1575}
1576
1577/* returns 0 on success, or an errno on failure */
1578static int
1579clientGotBlock( tr_peermsgs *               msgs,
1580                const uint8_t *             data,
1581                const struct peer_request * req )
1582{
1583    int err;
1584    tr_torrent * tor = msgs->torrent;
1585    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1586
1587    assert( msgs );
1588    assert( req );
1589
1590    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) {
1591        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1592                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1593        return EMSGSIZE;
1594    }
1595
1596    /* save the block */
1597    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1598
1599    /**
1600    *** Remove the block from our `we asked for this' list
1601    **/
1602
1603    if( !reqListRemove( &msgs->clientAskedFor, req ) ) {
1604        clientGotUnwantedBlock( msgs, req );
1605        dbgmsg( msgs, "we didn't ask for this message..." );
1606        return 0;
1607    }
1608
1609    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1610            msgs->clientAskedFor.len );
1611
1612    /**
1613    *** Error checks
1614    **/
1615
1616    if( tr_cpBlockIsComplete( &tor->completion, block ) ) {
1617        dbgmsg( msgs, "we have this block already..." );
1618        clientGotUnwantedBlock( msgs, req );
1619        return 0;
1620    }
1621
1622    /**
1623    ***  Save the block
1624    **/
1625
1626    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1627        return err;
1628
1629    addPeerToBlamefield( msgs, req->index );
1630    fireGotBlock( msgs, req );
1631    return 0;
1632}
1633
1634static int peerPulse( void * vmsgs );
1635
1636static void
1637didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1638{
1639    tr_peermsgs * msgs = vmsgs;
1640    firePeerGotData( msgs, bytesWritten, wasPieceData );
1641
1642    if ( tr_isPeerIo( io ) && io->userData )
1643        peerPulse( msgs );
1644}
1645
1646static ReadState
1647canRead( tr_peerIo * io, void * vmsgs, size_t * piece )
1648{
1649    ReadState         ret;
1650    tr_peermsgs *     msgs = vmsgs;
1651    struct evbuffer * in = tr_peerIoGetReadBuffer( io );
1652    const size_t      inlen = EVBUFFER_LENGTH( in );
1653
1654    dbgmsg( msgs, "canRead: inlen is %zu, msgs->state is %d", inlen, msgs->state );
1655
1656    if( !inlen )
1657    {
1658        ret = READ_LATER;
1659    }
1660    else if( msgs->state == AWAITING_BT_PIECE )
1661    {
1662        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1663    }
1664    else switch( msgs->state )
1665    {
1666        case AWAITING_BT_LENGTH:
1667            ret = readBtLength ( msgs, in, inlen ); break;
1668
1669        case AWAITING_BT_ID:
1670            ret = readBtId     ( msgs, in, inlen ); break;
1671
1672        case AWAITING_BT_MESSAGE:
1673            ret = readBtMessage( msgs, in, inlen ); break;
1674
1675        default:
1676            ret = READ_ERR;
1677            assert( 0 );
1678    }
1679
1680    dbgmsg( msgs, "canRead: ret is %d", (int)ret );
1681
1682    /* log the raw data that was read */
1683    if( ( ret != READ_ERR ) && ( EVBUFFER_LENGTH( in ) != inlen ) )
1684        fireClientGotData( msgs, inlen - EVBUFFER_LENGTH( in ), FALSE );
1685
1686    return ret;
1687}
1688
1689/**
1690***
1691**/
1692
1693static int
1694ratePulse( tr_peermsgs * msgs, uint64_t now )
1695{
1696    int irate;
1697    const int floor = 8;
1698    const int seconds = 10;
1699    double rate;
1700    int estimatedBlocksInPeriod;
1701    const tr_torrent * const torrent = msgs->torrent;
1702
1703    /* Get the rate limit we should use.
1704     * FIXME: this needs to consider all the other peers as well... */
1705    rate = tr_peerGetPieceSpeed( msgs->peer, now, TR_PEER_TO_CLIENT );
1706    if( tr_torrentUsesSpeedLimit( torrent, TR_PEER_TO_CLIENT ) )
1707        rate = MIN( rate, tr_torrentGetSpeedLimit( torrent, TR_PEER_TO_CLIENT ) );
1708    if( tr_torrentUsesSessionLimits( torrent ) )
1709        if( tr_sessionGetActiveSpeedLimit( torrent->session, TR_PEER_TO_CLIENT, &irate ) )
1710            rate = MIN( rate, irate );
1711
1712    estimatedBlocksInPeriod = ( rate * seconds * 1024 ) / torrent->blockSize;
1713    msgs->maxActiveRequests = MAX( floor, estimatedBlocksInPeriod );
1714
1715    if( msgs->reqq > 0 )
1716        msgs->maxActiveRequests = MIN( msgs->maxActiveRequests, msgs->reqq );
1717
1718    return TRUE;
1719}
1720
1721static size_t
1722fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1723{
1724    size_t bytesWritten = 0;
1725    struct peer_request req;
1726    const tr_bool haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1727    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1728
1729    /**
1730    ***  Protocol messages
1731    **/
1732
1733    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1734    {
1735        dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
1736        msgs->outMessagesBatchedAt = now;
1737    }
1738    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1739    {
1740        const size_t len = EVBUFFER_LENGTH( msgs->outMessages );
1741        /* flush the protocol messages */
1742        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len );
1743        tr_peerIoWriteBuf( msgs->peer->io, msgs->outMessages, FALSE );
1744        msgs->clientSentAnythingAt = now;
1745        msgs->outMessagesBatchedAt = 0;
1746        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1747        bytesWritten +=  len;
1748    }
1749
1750    /**
1751    ***  Blocks
1752    **/
1753
1754    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= msgs->torrent->blockSize )
1755        && popNextRequest( msgs, &req ) )
1756    {
1757        if( requestIsValid( msgs, &req )
1758            && tr_cpPieceIsComplete( &msgs->torrent->completion, req.index ) )
1759        {
1760            /* FIXME(libevent2) use evbuffer_reserve_space() + evbuffer_commit_space() */
1761            int err;
1762            const uint32_t msglen = 4 + 1 + 4 + 4 + req.length;
1763            struct evbuffer * out;
1764            tr_peerIo * io = msgs->peer->io;
1765
1766            out = evbuffer_new( );
1767            evbuffer_expand( out, msglen );
1768
1769            tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1770            tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1771            tr_peerIoWriteUint32( io, out, req.index );
1772            tr_peerIoWriteUint32( io, out, req.offset );
1773
1774            err = tr_ioRead( msgs->torrent, req.index, req.offset, req.length, EVBUFFER_DATA(out)+EVBUFFER_LENGTH(out) );
1775            if( err )
1776            {
1777                if( fext )
1778                    protocolSendReject( msgs, &req );
1779            }
1780            else
1781            {
1782                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1783                EVBUFFER_LENGTH(out) += req.length;
1784                assert( EVBUFFER_LENGTH( out ) == msglen );
1785                tr_peerIoWriteBuf( io, out, TRUE );
1786                bytesWritten += EVBUFFER_LENGTH( out );
1787                msgs->clientSentAnythingAt = now;
1788            }
1789
1790            evbuffer_free( out );
1791
1792            if( err )
1793            {
1794                bytesWritten = 0;
1795                msgs = NULL;
1796            }
1797        }
1798        else if( fext ) /* peer needs a reject message */
1799        {
1800            protocolSendReject( msgs, &req );
1801        }
1802    }
1803
1804    /**
1805    ***  Keepalive
1806    **/
1807
1808    if( ( msgs != NULL )
1809        && ( msgs->clientSentAnythingAt != 0 )
1810        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1811    {
1812        dbgmsg( msgs, "sending a keepalive message" );
1813        tr_peerIoWriteUint32( msgs->peer->io, msgs->outMessages, 0 );
1814        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1815    }
1816
1817    return bytesWritten;
1818}
1819
1820static int
1821peerPulse( void * vmsgs )
1822{
1823    tr_peermsgs * msgs = vmsgs;
1824    const time_t  now = time( NULL );
1825
1826    ratePulse( msgs, now );
1827
1828    pumpRequestQueue( msgs, now );
1829    expireOldRequests( msgs, now );
1830
1831    for( ;; )
1832        if( fillOutputBuffer( msgs, now ) < 1 )
1833            break;
1834
1835    return TRUE; /* loop forever */
1836}
1837
1838void
1839tr_peerMsgsPulse( tr_peermsgs * msgs )
1840{
1841    if( msgs != NULL )
1842        peerPulse( msgs );
1843}
1844
1845static void
1846gotError( tr_peerIo  * io UNUSED,
1847          short        what,
1848          void       * vmsgs )
1849{
1850    if( what & EVBUFFER_TIMEOUT )
1851        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1852    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1853        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1854               what, errno, tr_strerror( errno ) );
1855    fireError( vmsgs, ENOTCONN );
1856}
1857
1858static void
1859sendBitfield( tr_peermsgs * msgs )
1860{
1861    struct evbuffer * out = msgs->outMessages;
1862    tr_bitfield *     field;
1863    tr_piece_index_t  lazyPieces[LAZY_PIECE_COUNT];
1864    size_t            i;
1865    size_t            lazyCount = 0;
1866
1867    field = tr_bitfieldDup( tr_cpPieceBitfield( &msgs->torrent->completion ) );
1868
1869    if( tr_sessionIsLazyBitfieldEnabled( getSession( msgs ) ) )
1870    {
1871        /** Lazy bitfields aren't a high priority or secure, so I'm opting for
1872            speed over a truly random sample -- let's limit the pool size to
1873            the first 1000 pieces so large torrents don't bog things down */
1874        size_t poolSize;
1875        const size_t maxPoolSize = MIN( msgs->torrent->info.pieceCount, 1000 );
1876        tr_piece_index_t * pool = tr_new( tr_piece_index_t, maxPoolSize );
1877
1878        /* build the pool */
1879        for( i=poolSize=0; i<maxPoolSize; ++i )
1880            if( tr_bitfieldHas( field, i ) )
1881                pool[poolSize++] = i;
1882
1883        /* pull random piece indices from the pool */
1884        while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) )
1885        {
1886            const int pos = tr_cryptoWeakRandInt( poolSize );
1887            const tr_piece_index_t piece = pool[pos];
1888            tr_bitfieldRem( field, piece );
1889            lazyPieces[lazyCount++] = piece;
1890            pool[pos] = pool[--poolSize];
1891        }
1892
1893        /* cleanup */
1894        tr_free( pool );
1895    }
1896
1897    tr_peerIoWriteUint32( msgs->peer->io, out,
1898                          sizeof( uint8_t ) + field->byteCount );
1899    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_BITFIELD );
1900    /* FIXME(libevent2): use evbuffer_add_reference() */
1901    tr_peerIoWriteBytes ( msgs->peer->io, out, field->bits, field->byteCount );
1902    dbgmsg( msgs, "sending bitfield... outMessage size is now %zu",
1903            EVBUFFER_LENGTH( out ) );
1904    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1905
1906    for( i = 0; i < lazyCount; ++i )
1907        protocolSendHave( msgs, lazyPieces[i] );
1908
1909    tr_bitfieldFree( field );
1910}
1911
1912static void
1913tellPeerWhatWeHave( tr_peermsgs * msgs )
1914{
1915    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1916
1917    if( fext && ( tr_cpGetStatus( &msgs->torrent->completion ) == TR_SEED ) )
1918    {
1919        protocolSendHaveAll( msgs );
1920    }
1921    else if( fext && ( tr_cpHaveValid( &msgs->torrent->completion ) == 0 ) )
1922    {
1923        protocolSendHaveNone( msgs );
1924    }
1925    else
1926    {
1927        sendBitfield( msgs );
1928    }
1929}
1930
1931/**
1932***
1933**/
1934
1935/* some peers give us error messages if we send
1936   more than this many peers in a single pex message
1937   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
1938#define MAX_PEX_ADDED 50
1939#define MAX_PEX_DROPPED 50
1940
1941typedef struct
1942{
1943    tr_pex *  added;
1944    tr_pex *  dropped;
1945    tr_pex *  elements;
1946    int       addedCount;
1947    int       droppedCount;
1948    int       elementCount;
1949}
1950PexDiffs;
1951
1952static void
1953pexAddedCb( void * vpex,
1954            void * userData )
1955{
1956    PexDiffs * diffs = userData;
1957    tr_pex *   pex = vpex;
1958
1959    if( diffs->addedCount < MAX_PEX_ADDED )
1960    {
1961        diffs->added[diffs->addedCount++] = *pex;
1962        diffs->elements[diffs->elementCount++] = *pex;
1963    }
1964}
1965
1966static TR_INLINE void
1967pexDroppedCb( void * vpex,
1968              void * userData )
1969{
1970    PexDiffs * diffs = userData;
1971    tr_pex *   pex = vpex;
1972
1973    if( diffs->droppedCount < MAX_PEX_DROPPED )
1974    {
1975        diffs->dropped[diffs->droppedCount++] = *pex;
1976    }
1977}
1978
1979static TR_INLINE void
1980pexElementCb( void * vpex,
1981              void * userData )
1982{
1983    PexDiffs * diffs = userData;
1984    tr_pex * pex = vpex;
1985
1986    diffs->elements[diffs->elementCount++] = *pex;
1987}
1988
1989static void
1990sendPex( tr_peermsgs * msgs )
1991{
1992    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
1993    {
1994        PexDiffs diffs;
1995        PexDiffs diffs6;
1996        tr_pex * newPex = NULL;
1997        tr_pex * newPex6 = NULL;
1998        const int newCount = tr_peerMgrGetPeers( msgs->torrent, &newPex, TR_AF_INET, MAX_PEX_PEER_COUNT );
1999        const int newCount6 = tr_peerMgrGetPeers( msgs->torrent, &newPex6, TR_AF_INET6, MAX_PEX_PEER_COUNT );
2000
2001        /* build the diffs */
2002        diffs.added = tr_new( tr_pex, newCount );
2003        diffs.addedCount = 0;
2004        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
2005        diffs.droppedCount = 0;
2006        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
2007        diffs.elementCount = 0;
2008        tr_set_compare( msgs->pex, msgs->pexCount,
2009                        newPex, newCount,
2010                        tr_pexCompare, sizeof( tr_pex ),
2011                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
2012        diffs6.added = tr_new( tr_pex, newCount6 );
2013        diffs6.addedCount = 0;
2014        diffs6.dropped = tr_new( tr_pex, msgs->pexCount6 );
2015        diffs6.droppedCount = 0;
2016        diffs6.elements = tr_new( tr_pex, newCount6 + msgs->pexCount6 );
2017        diffs6.elementCount = 0;
2018        tr_set_compare( msgs->pex6, msgs->pexCount6,
2019                        newPex6, newCount6,
2020                        tr_pexCompare, sizeof( tr_pex ),
2021                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6 );
2022        dbgmsg(
2023            msgs,
2024            "pex: old peer count %d, new peer count %d, added %d, removed %d",
2025            msgs->pexCount, newCount + newCount6,
2026            diffs.addedCount + diffs6.addedCount,
2027            diffs.droppedCount + diffs6.droppedCount );
2028
2029        if( !diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
2030            !diffs6.droppedCount )
2031        {
2032            tr_free( diffs.elements );
2033            tr_free( diffs6.elements );
2034        }
2035        else
2036        {
2037            int  i;
2038            tr_benc val;
2039            char * benc;
2040            int bencLen;
2041            uint8_t * tmp, *walk;
2042            tr_peerIo       * io  = msgs->peer->io;
2043            struct evbuffer * out = msgs->outMessages;
2044
2045            /* update peer */
2046            tr_free( msgs->pex );
2047            msgs->pex = diffs.elements;
2048            msgs->pexCount = diffs.elementCount;
2049            tr_free( msgs->pex6 );
2050            msgs->pex6 = diffs6.elements;
2051            msgs->pexCount6 = diffs6.elementCount;
2052
2053            /* build the pex payload */
2054            tr_bencInitDict( &val, 3 ); /* ipv6 support: left as 3:
2055                                         * speed vs. likelihood? */
2056
2057            /* "added" */
2058            tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
2059            for( i = 0; i < diffs.addedCount; ++i )
2060            {
2061                memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
2062                memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
2063            }
2064            assert( ( walk - tmp ) == diffs.addedCount * 6 );
2065            tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
2066            tr_free( tmp );
2067
2068            /* "added.f" */
2069            tmp = walk = tr_new( uint8_t, diffs.addedCount );
2070            for( i = 0; i < diffs.addedCount; ++i )
2071                *walk++ = diffs.added[i].flags;
2072            assert( ( walk - tmp ) == diffs.addedCount );
2073            tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2074            tr_free( tmp );
2075
2076            /* "dropped" */
2077            tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2078            for( i = 0; i < diffs.droppedCount; ++i )
2079            {
2080                memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
2081                memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2082            }
2083            assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2084            tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2085            tr_free( tmp );
2086
2087            /* "added6" */
2088            tmp = walk = tr_new( uint8_t, diffs6.addedCount * 18 );
2089            for( i = 0; i < diffs6.addedCount; ++i )
2090            {
2091                memcpy( walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16 );
2092                walk += 16;
2093                memcpy( walk, &diffs6.added[i].port, 2 );
2094                walk += 2;
2095            }
2096            assert( ( walk - tmp ) == diffs6.addedCount * 18 );
2097            tr_bencDictAddRaw( &val, "added6", tmp, walk - tmp );
2098            tr_free( tmp );
2099
2100            /* "added6.f" */
2101            tmp = walk = tr_new( uint8_t, diffs6.addedCount );
2102            for( i = 0; i < diffs6.addedCount; ++i )
2103                *walk++ = diffs6.added[i].flags;
2104            assert( ( walk - tmp ) == diffs6.addedCount );
2105            tr_bencDictAddRaw( &val, "added6.f", tmp, walk - tmp );
2106            tr_free( tmp );
2107
2108            /* "dropped6" */
2109            tmp = walk = tr_new( uint8_t, diffs6.droppedCount * 18 );
2110            for( i = 0; i < diffs6.droppedCount; ++i )
2111            {
2112                memcpy( walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16 );
2113                walk += 16;
2114                memcpy( walk, &diffs6.dropped[i].port, 2 );
2115                walk += 2;
2116            }
2117            assert( ( walk - tmp ) == diffs6.droppedCount * 18);
2118            tr_bencDictAddRaw( &val, "dropped6", tmp, walk - tmp );
2119            tr_free( tmp );
2120
2121            /* write the pex message */
2122            benc = tr_bencToStr( &val, TR_FMT_BENC, &bencLen );
2123            tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + bencLen );
2124            tr_peerIoWriteUint8 ( io, out, BT_LTEP );
2125            tr_peerIoWriteUint8 ( io, out, msgs->ut_pex_id );
2126            tr_peerIoWriteBytes ( io, out, benc, bencLen );
2127            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2128            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", EVBUFFER_LENGTH( out ) );
2129            dbgOutMessageLen( msgs );
2130
2131            tr_free( benc );
2132            tr_bencFree( &val );
2133        }
2134
2135        /* cleanup */
2136        tr_free( diffs.added );
2137        tr_free( diffs.dropped );
2138        tr_free( newPex );
2139        tr_free( diffs6.added );
2140        tr_free( diffs6.dropped );
2141        tr_free( newPex6 );
2142
2143        /*msgs->clientSentPexAt = time( NULL );*/
2144    }
2145}
2146
2147static void
2148pexPulse( int foo UNUSED, short bar UNUSED, void * vmsgs )
2149{
2150    struct tr_peermsgs * msgs = vmsgs;
2151
2152    sendPex( msgs );
2153
2154    tr_timerAdd( &msgs->pexTimer, PEX_INTERVAL_SECS, 0 );
2155}
2156
2157/**
2158***
2159**/
2160
2161tr_peermsgs*
2162tr_peerMsgsNew( struct tr_torrent * torrent,
2163                struct tr_peer    * peer,
2164                tr_delivery_func    func,
2165                void              * userData,
2166                tr_publisher_tag  * setme )
2167{
2168    tr_peermsgs * m;
2169
2170    assert( peer );
2171    assert( peer->io );
2172
2173    m = tr_new0( tr_peermsgs, 1 );
2174    m->publisher = TR_PUBLISHER_INIT;
2175    m->peer = peer;
2176    m->torrent = torrent;
2177    m->peer->clientIsChoked = 1;
2178    m->peer->peerIsChoked = 1;
2179    m->peer->clientIsInterested = 0;
2180    m->peer->peerIsInterested = 0;
2181    m->peer->have = tr_bitfieldNew( torrent->info.pieceCount );
2182    m->state = AWAITING_BT_LENGTH;
2183    m->outMessages = evbuffer_new( );
2184    m->outMessagesBatchedAt = 0;
2185    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2186    m->incoming.block = evbuffer_new( );
2187    m->peerAskedFor = REQUEST_LIST_INIT;
2188    m->clientAskedFor = REQUEST_LIST_INIT;
2189    m->clientWillAskFor = REQUEST_LIST_INIT;
2190    evtimer_set( &m->pexTimer, pexPulse, m );
2191    tr_timerAdd( &m->pexTimer, PEX_INTERVAL_SECS, 0 );
2192    peer->msgs = m;
2193
2194    *setme = tr_publisherSubscribe( &m->publisher, func, userData );
2195
2196    if( tr_peerIoSupportsLTEP( peer->io ) )
2197        sendLtepHandshake( m );
2198
2199    if(tr_peerIoSupportsDHT(peer->io))
2200        protocolSendPort(m, tr_dhtPort(torrent->session));
2201
2202    tellPeerWhatWeHave( m );
2203
2204    tr_peerIoSetIOFuncs( m->peer->io, canRead, didWrite, gotError, m );
2205    ratePulse( m, tr_date() );
2206
2207    return m;
2208}
2209
2210void
2211tr_peerMsgsFree( tr_peermsgs* msgs )
2212{
2213    if( msgs )
2214    {
2215        evtimer_del( &msgs->pexTimer );
2216        tr_publisherDestruct( &msgs->publisher );
2217        reqListClear( &msgs->clientWillAskFor );
2218        reqListClear( &msgs->clientAskedFor );
2219        reqListClear( &msgs->peerAskedFor );
2220
2221        evbuffer_free( msgs->incoming.block );
2222        evbuffer_free( msgs->outMessages );
2223        tr_free( msgs->pex6 );
2224        tr_free( msgs->pex );
2225
2226        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2227        tr_free( msgs );
2228    }
2229}
2230
2231void
2232tr_peerMsgsUnsubscribe( tr_peermsgs *    peer,
2233                        tr_publisher_tag tag )
2234{
2235    tr_publisherUnsubscribe( &peer->publisher, tag );
2236}
2237
Note: See TracBrowser for help on using the repository browser.