source: trunk/libtransmission/peer-msgs.c @ 9381

Last change on this file since 9381 was 9381, checked in by charles, 13 years ago

(trunk libT) #2508: atom->port never updated

  • Property svn:keywords set to Date Rev Author Id
File size: 64.4 KB
Line 
1/*
2 * This file Copyright (C) 2007-2009 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 9381 2009-10-22 15:05:56Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19
20#include <event.h>
21
22#include "transmission.h"
23#include "bencode.h"
24#include "completion.h"
25#include "crypto.h"
26#include "inout.h"
27#ifdef WIN32
28#include "net.h" /* for ECONN */
29#endif
30#include "peer-io.h"
31#include "peer-mgr.h"
32#include "peer-msgs.h"
33#include "platform.h" /* MAX_STACK_ARRAY_SIZE */
34#include "ratecontrol.h"
35#include "request-list.h"
36#include "session.h"
37#include "stats.h"
38#include "torrent.h"
39#include "tr-dht.h"
40#include "utils.h"
41#include "version.h"
42
43/**
44***
45**/
46
47enum
48{
49    BT_CHOKE                = 0,
50    BT_UNCHOKE              = 1,
51    BT_INTERESTED           = 2,
52    BT_NOT_INTERESTED       = 3,
53    BT_HAVE                 = 4,
54    BT_BITFIELD             = 5,
55    BT_REQUEST              = 6,
56    BT_PIECE                = 7,
57    BT_CANCEL               = 8,
58    BT_PORT                 = 9,
59
60    BT_FEXT_SUGGEST         = 13,
61    BT_FEXT_HAVE_ALL        = 14,
62    BT_FEXT_HAVE_NONE       = 15,
63    BT_FEXT_REJECT          = 16,
64    BT_FEXT_ALLOWED_FAST    = 17,
65
66    BT_LTEP                 = 20,
67
68    LTEP_HANDSHAKE          = 0,
69
70    TR_LTEP_PEX             = 1,
71
72    MAX_PEX_PEER_COUNT      = 50,
73
74    MIN_CHOKE_PERIOD_SEC    = ( 10 ),
75
76    /* idle seconds before we send a keepalive */
77    KEEPALIVE_INTERVAL_SECS = 100,
78
79    PEX_INTERVAL_SECS       = 90, /* sec between sendPex() calls */
80
81
82    MAX_BLOCK_SIZE          = ( 1024 * 16 ),
83
84
85    /* how long an unsent request can stay queued before it's returned
86       back to the peer-mgr's pool of requests */
87    QUEUED_REQUEST_TTL_SECS = 20,
88
89    /* how long a sent request can stay queued before it's returned
90       back to the peer-mgr's pool of requests */
91    SENT_REQUEST_TTL_SECS = 240,
92
93    /* used in lowering the outMessages queue period */
94    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
95    HIGH_PRIORITY_INTERVAL_SECS = 2,
96    LOW_PRIORITY_INTERVAL_SECS = 20,
97
98    /* number of pieces to remove from the bitfield when
99     * lazy bitfields are turned on */
100    LAZY_PIECE_COUNT = 26,
101
102    /* number of pieces we'll allow in our fast set */
103    MAX_FAST_SET_SIZE = 3
104};
105
106enum
107{
108    AWAITING_BT_LENGTH,
109    AWAITING_BT_ID,
110    AWAITING_BT_MESSAGE,
111    AWAITING_BT_PIECE
112};
113
114/**
115***
116**/
117
118/* this is raw, unchanged data from the peer regarding
119 * the current message that it's sending us. */
120struct tr_incoming
121{
122    uint8_t                id;
123    uint32_t               length; /* includes the +1 for id length */
124    struct peer_request    blockReq; /* metadata for incoming blocks */
125    struct evbuffer *      block; /* piece data for incoming blocks */
126};
127
128/**
129 * Low-level communication state information about a connected peer.
130 *
131 * This structure remembers the low-level protocol states that we're
132 * in with this peer, such as active requests, pex messages, and so on.
133 * Its fields are all private to peer-msgs.c.
134 *
135 * Data not directly involved with sending & receiving messages is
136 * stored in tr_peer, where it can be accessed by both peermsgs and
137 * the peer manager.
138 *
139 * @see struct peer_atom
140 * @see tr_peer
141 */
142struct tr_peermsgs
143{
144    tr_bool         peerSupportsPex;
145    tr_bool         clientSentLtepHandshake;
146    tr_bool         peerSentLtepHandshake;
147    /*tr_bool         haveFastSet;*/
148
149    /* how long the outMessages batch should be allowed to grow before
150     * it's flushed -- some messages (like requests >:) should be sent
151     * very quickly; others aren't as urgent. */
152    int8_t          outMessagesBatchPeriod;
153
154    uint8_t         state;
155    uint8_t         ut_pex_id;
156    uint16_t        pexCount;
157    uint16_t        pexCount6;
158    uint16_t        maxActiveRequests;
159
160#if 0
161    size_t                 fastsetSize;
162    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
163#endif
164
165    tr_peer *              peer;
166
167    tr_torrent *           torrent;
168
169    tr_publisher           publisher;
170
171    struct evbuffer *      outMessages; /* all the non-piece messages */
172
173    struct request_list    peerAskedFor;
174    struct request_list    clientAskedFor;
175    struct request_list    clientWillAskFor;
176
177    tr_pex               * pex;
178    tr_pex               * pex6;
179
180    /*time_t                 clientSentPexAt;*/
181    time_t                 clientSentAnythingAt;
182
183    /* when we started batching the outMessages */
184    time_t                outMessagesBatchedAt;
185
186    struct tr_incoming    incoming;
187
188    /* if the peer supports the Extension Protocol in BEP 10 and
189       supplied a reqq argument, it's stored here.  otherwise the
190       value is zero and should be ignored. */
191    int64_t               reqq;
192
193    struct event          pexTimer;
194};
195
196static TR_INLINE tr_session*
197getSession( struct tr_peermsgs * msgs )
198{
199    return msgs->torrent->session;
200}
201
202/**
203***
204**/
205
206static void
207myDebug( const char * file, int line,
208         const struct tr_peermsgs * msgs,
209         const char * fmt, ... )
210{
211    FILE * fp = tr_getLog( );
212
213    if( fp )
214    {
215        va_list           args;
216        char              timestr[64];
217        struct evbuffer * buf = evbuffer_new( );
218        char *            base = tr_basename( file );
219
220        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
221                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
222                             tr_torrentName( msgs->torrent ),
223                             tr_peerIoGetAddrStr( msgs->peer->io ),
224                             msgs->peer->client );
225        va_start( args, fmt );
226        evbuffer_add_vprintf( buf, fmt, args );
227        va_end( args );
228        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
229        /* FIXME(libevent2) tr_getLog() should return an fd, then use evbuffer_write() here */
230        fwrite( EVBUFFER_DATA( buf ), 1, EVBUFFER_LENGTH( buf ), fp );
231
232        tr_free( base );
233        evbuffer_free( buf );
234    }
235}
236
237#define dbgmsg( msgs, ... ) \
238    do { \
239        if( tr_deepLoggingIsActive( ) ) \
240            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
241    } while( 0 )
242
243/**
244***
245**/
246
247static void
248pokeBatchPeriod( tr_peermsgs * msgs,
249                 int           interval )
250{
251    if( msgs->outMessagesBatchPeriod > interval )
252    {
253        msgs->outMessagesBatchPeriod = interval;
254        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
255    }
256}
257
258static TR_INLINE void
259dbgOutMessageLen( tr_peermsgs * msgs )
260{
261    dbgmsg( msgs, "outMessage size is now %zu", EVBUFFER_LENGTH( msgs->outMessages ) );
262}
263
264static void
265protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
266{
267    tr_peerIo       * io  = msgs->peer->io;
268    struct evbuffer * out = msgs->outMessages;
269
270    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
271
272    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
273    tr_peerIoWriteUint8 ( io, out, BT_FEXT_REJECT );
274    tr_peerIoWriteUint32( io, out, req->index );
275    tr_peerIoWriteUint32( io, out, req->offset );
276    tr_peerIoWriteUint32( io, out, req->length );
277
278    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
279    dbgOutMessageLen( msgs );
280}
281
282static void
283protocolSendRequest( tr_peermsgs               * msgs,
284                     const struct peer_request * req )
285{
286    tr_peerIo       * io  = msgs->peer->io;
287    struct evbuffer * out = msgs->outMessages;
288
289    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
290    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
291    tr_peerIoWriteUint32( io, out, req->index );
292    tr_peerIoWriteUint32( io, out, req->offset );
293    tr_peerIoWriteUint32( io, out, req->length );
294
295    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
296    dbgOutMessageLen( msgs );
297    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
298}
299
300static void
301protocolSendCancel( tr_peermsgs               * msgs,
302                    const struct peer_request * req )
303{
304    tr_peerIo       * io  = msgs->peer->io;
305    struct evbuffer * out = msgs->outMessages;
306
307    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
308    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
309    tr_peerIoWriteUint32( io, out, req->index );
310    tr_peerIoWriteUint32( io, out, req->offset );
311    tr_peerIoWriteUint32( io, out, req->length );
312
313    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
314    dbgOutMessageLen( msgs );
315    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
316}
317
318static void
319protocolSendPort(tr_peermsgs *msgs, uint16_t port)
320{
321    tr_peerIo       * io  = msgs->peer->io;
322    struct evbuffer * out = msgs->outMessages;
323
324    dbgmsg( msgs, "sending Port %u", port);
325    tr_peerIoWriteUint32( io, out, 3 );
326    tr_peerIoWriteUint8 ( io, out, BT_PORT );
327    tr_peerIoWriteUint16( io, out, port);
328}
329
330static void
331protocolSendHave( tr_peermsgs * msgs,
332                  uint32_t      index )
333{
334    tr_peerIo       * io  = msgs->peer->io;
335    struct evbuffer * out = msgs->outMessages;
336
337    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
338    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
339    tr_peerIoWriteUint32( io, out, index );
340
341    dbgmsg( msgs, "sending Have %u", index );
342    dbgOutMessageLen( msgs );
343    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
344}
345
346#if 0
347static void
348protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
349{
350    tr_peerIo       * io  = msgs->peer->io;
351    struct evbuffer * out = msgs->outMessages;
352
353    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
354
355    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
356    tr_peerIoWriteUint8 ( io, out, BT_FEXT_ALLOWED_FAST );
357    tr_peerIoWriteUint32( io, out, pieceIndex );
358
359    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
360    dbgOutMessageLen( msgs );
361}
362#endif
363
364static void
365protocolSendChoke( tr_peermsgs * msgs,
366                   int           choke )
367{
368    tr_peerIo       * io  = msgs->peer->io;
369    struct evbuffer * out = msgs->outMessages;
370
371    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
372    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
373
374    dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
375    dbgOutMessageLen( msgs );
376    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
377}
378
379static void
380protocolSendHaveAll( tr_peermsgs * msgs )
381{
382    tr_peerIo       * io  = msgs->peer->io;
383    struct evbuffer * out = msgs->outMessages;
384
385    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
386
387    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
388    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_ALL );
389
390    dbgmsg( msgs, "sending HAVE_ALL..." );
391    dbgOutMessageLen( msgs );
392    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
393}
394
395static void
396protocolSendHaveNone( tr_peermsgs * msgs )
397{
398    tr_peerIo       * io  = msgs->peer->io;
399    struct evbuffer * out = msgs->outMessages;
400
401    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
402
403    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
404    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_NONE );
405
406    dbgmsg( msgs, "sending HAVE_NONE..." );
407    dbgOutMessageLen( msgs );
408    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
409}
410
411/**
412***  EVENTS
413**/
414
415static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0, 0 };
416
417static void
418publish( tr_peermsgs * msgs, tr_peer_event * e )
419{
420    assert( msgs->peer );
421    assert( msgs->peer->msgs == msgs );
422
423    tr_publisherPublish( &msgs->publisher, msgs->peer, e );
424}
425
426static void
427fireError( tr_peermsgs * msgs, int err )
428{
429    tr_peer_event e = blankEvent;
430    e.eventType = TR_PEER_ERROR;
431    e.err = err;
432    publish( msgs, &e );
433}
434
435static void
436fireUploadOnly( tr_peermsgs * msgs, tr_bool uploadOnly )
437{
438    tr_peer_event e = blankEvent;
439    e.eventType = TR_PEER_UPLOAD_ONLY;
440    e.uploadOnly = uploadOnly;
441    publish( msgs, &e );
442}
443
444static void
445fireNeedReq( tr_peermsgs * msgs )
446{
447    tr_peer_event e = blankEvent;
448    e.eventType = TR_PEER_NEED_REQ;
449    publish( msgs, &e );
450}
451
452static void
453firePeerProgress( tr_peermsgs * msgs )
454{
455    tr_peer_event e = blankEvent;
456    e.eventType = TR_PEER_PEER_PROGRESS;
457    e.progress = msgs->peer->progress;
458    publish( msgs, &e );
459}
460
461static void
462fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
463{
464    tr_peer_event e = blankEvent;
465    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
466    e.pieceIndex = req->index;
467    e.offset = req->offset;
468    e.length = req->length;
469    publish( msgs, &e );
470}
471
472static void
473fireClientGotData( tr_peermsgs * msgs,
474                   uint32_t      length,
475                   int           wasPieceData )
476{
477    tr_peer_event e = blankEvent;
478
479    e.length = length;
480    e.eventType = TR_PEER_CLIENT_GOT_DATA;
481    e.wasPieceData = wasPieceData;
482    publish( msgs, &e );
483}
484
485static void
486fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
487{
488    tr_peer_event e = blankEvent;
489    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
490    e.pieceIndex = pieceIndex;
491    publish( msgs, &e );
492}
493
494static void
495fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
496{
497    tr_peer_event e = blankEvent;
498    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
499    e.pieceIndex = pieceIndex;
500    publish( msgs, &e );
501}
502
503static void
504firePeerGotData( tr_peermsgs  * msgs,
505                 uint32_t       length,
506                 int            wasPieceData )
507{
508    tr_peer_event e = blankEvent;
509
510    e.length = length;
511    e.eventType = TR_PEER_PEER_GOT_DATA;
512    e.wasPieceData = wasPieceData;
513
514    publish( msgs, &e );
515}
516
517static void
518fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
519{
520    tr_peer_event e = blankEvent;
521    e.eventType = TR_PEER_CANCEL;
522    e.pieceIndex = req->index;
523    e.offset = req->offset;
524    e.length = req->length;
525    publish( msgs, &e );
526}
527
528/**
529***  ALLOWED FAST SET
530***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
531**/
532
533size_t
534tr_generateAllowedSet( tr_piece_index_t * setmePieces,
535                       size_t             desiredSetSize,
536                       size_t             pieceCount,
537                       const uint8_t    * infohash,
538                       const tr_address * addr )
539{
540    size_t setSize = 0;
541
542    assert( setmePieces );
543    assert( desiredSetSize <= pieceCount );
544    assert( desiredSetSize );
545    assert( pieceCount );
546    assert( infohash );
547    assert( addr );
548
549    if( addr->type == TR_AF_INET )
550    {
551        uint8_t w[SHA_DIGEST_LENGTH + 4], *walk=w;
552        uint8_t x[SHA_DIGEST_LENGTH];
553
554        uint32_t ui32 = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 );   /* (1) */
555        memcpy( w, &ui32, sizeof( uint32_t ) );
556        walk += sizeof( uint32_t );
557        memcpy( walk, infohash, SHA_DIGEST_LENGTH );                 /* (2) */
558        walk += SHA_DIGEST_LENGTH;
559        tr_sha1( x, w, walk-w, NULL );                               /* (3) */
560        assert( sizeof( w ) == walk-w );
561
562        while( setSize<desiredSetSize )
563        {
564            int i;
565            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
566            {
567                size_t k;
568                uint32_t j = i * 4;                                  /* (5) */
569                uint32_t y = ntohl( *( uint32_t* )( x + j ) );       /* (6) */
570                uint32_t index = y % pieceCount;                     /* (7) */
571
572                for( k=0; k<setSize; ++k )                           /* (8) */
573                    if( setmePieces[k] == index )
574                        break;
575
576                if( k == setSize )
577                    setmePieces[setSize++] = index;                  /* (9) */
578            }
579
580            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
581        }
582    }
583
584    return setSize;
585}
586
587static void
588updateFastSet( tr_peermsgs * msgs UNUSED )
589{
590#if 0
591    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
592    const int peerIsNeedy = msgs->peer->progress < 0.10;
593
594    if( fext && peerIsNeedy && !msgs->haveFastSet )
595    {
596        size_t i;
597        const struct tr_address * addr = tr_peerIoGetAddress( msgs->peer->io, NULL );
598        const tr_info * inf = &msgs->torrent->info;
599        const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
600
601        /* build the fast set */
602        msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
603        msgs->haveFastSet = 1;
604
605        /* send it to the peer */
606        for( i=0; i<msgs->fastsetSize; ++i )
607            protocolSendAllowedFast( msgs, msgs->fastset[i] );
608    }
609#endif
610}
611
612/**
613***  INTEREST
614**/
615
616static tr_bool
617isPieceInteresting( const tr_peermsgs * msgs,
618                    tr_piece_index_t    piece )
619{
620    const tr_torrent * torrent = msgs->torrent;
621
622    return ( !torrent->info.pieces[piece].dnd )                 /* we want it */
623          && ( !tr_cpPieceIsComplete( &torrent->completion, piece ) ) /* !have */
624          && ( tr_bitfieldHas( msgs->peer->have, piece ) );    /* peer has it */
625}
626
627/* "interested" means we'll ask for piece data if they unchoke us */
628static tr_bool
629isPeerInteresting( const tr_peermsgs * msgs )
630{
631    tr_piece_index_t    i;
632    const tr_torrent *  torrent;
633    const tr_bitfield * bitfield;
634    const int           clientIsSeed = tr_torrentIsSeed( msgs->torrent );
635
636    if( clientIsSeed )
637        return FALSE;
638
639    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
640        return FALSE;
641
642    torrent = msgs->torrent;
643    bitfield = tr_cpPieceBitfield( &torrent->completion );
644
645    if( !msgs->peer->have )
646        return TRUE;
647
648    assert( bitfield->byteCount == msgs->peer->have->byteCount );
649
650    for( i = 0; i < torrent->info.pieceCount; ++i )
651        if( isPieceInteresting( msgs, i ) )
652            return TRUE;
653
654    return FALSE;
655}
656
657static void
658sendInterest( tr_peermsgs * msgs,
659              int           weAreInterested )
660{
661    struct evbuffer * out = msgs->outMessages;
662
663    assert( msgs );
664    assert( weAreInterested == 0 || weAreInterested == 1 );
665
666    msgs->peer->clientIsInterested = weAreInterested;
667    dbgmsg( msgs, "Sending %s", weAreInterested ? "Interested" : "Not Interested" );
668    tr_peerIoWriteUint32( msgs->peer->io, out, sizeof( uint8_t ) );
669    tr_peerIoWriteUint8 ( msgs->peer->io, out, weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
670
671    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
672    dbgOutMessageLen( msgs );
673}
674
675static void
676updateInterest( tr_peermsgs * msgs )
677{
678    const int i = isPeerInteresting( msgs );
679
680    if( i != msgs->peer->clientIsInterested )
681        sendInterest( msgs, i );
682    if( i )
683        fireNeedReq( msgs );
684}
685
686static tr_bool
687popNextRequest( tr_peermsgs *         msgs,
688                struct peer_request * setme )
689{
690    return reqListPop( &msgs->peerAskedFor, setme );
691}
692
693static void
694cancelAllRequestsToClient( tr_peermsgs * msgs )
695{
696    struct peer_request req;
697    const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->peer->io );
698
699    while( popNextRequest( msgs, &req ) )
700        if( mustSendCancel )
701            protocolSendReject( msgs, &req );
702}
703
704void
705tr_peerMsgsSetChoke( tr_peermsgs * msgs,
706                     int           choke )
707{
708    const time_t now = time( NULL );
709    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
710
711    assert( msgs );
712    assert( msgs->peer );
713    assert( choke == 0 || choke == 1 );
714
715    if( msgs->peer->chokeChangedAt > fibrillationTime )
716    {
717        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
718    }
719    else if( msgs->peer->peerIsChoked != choke )
720    {
721        msgs->peer->peerIsChoked = choke;
722        if( choke )
723            cancelAllRequestsToClient( msgs );
724        protocolSendChoke( msgs, choke );
725        msgs->peer->chokeChangedAt = now;
726    }
727}
728
729/**
730***
731**/
732
733void
734tr_peerMsgsHave( tr_peermsgs * msgs,
735                 uint32_t      index )
736{
737    protocolSendHave( msgs, index );
738
739    /* since we have more pieces now, we might not be interested in this peer */
740    updateInterest( msgs );
741}
742
743/**
744***
745**/
746
747static tr_bool
748reqIsValid( const tr_peermsgs * peer,
749            uint32_t            index,
750            uint32_t            offset,
751            uint32_t            length )
752{
753    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
754}
755
756static tr_bool
757requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
758{
759    return reqIsValid( msgs, req->index, req->offset, req->length );
760}
761
762static void
763expireFromList( tr_peermsgs          * msgs,
764                struct request_list  * list,
765                const time_t           oldestAllowed )
766{
767    size_t i;
768    struct request_list tmp = REQUEST_LIST_INIT;
769
770    /* since the fifo list is sorted by time, the oldest will be first */
771    if( !list->len || ( list->fifo[0].time_requested >= oldestAllowed ) )
772        return;
773
774    /* if we found one too old, start pruning them */
775    reqListCopy( &tmp, list );
776    for( i=0; i<tmp.len; ++i ) {
777        const struct peer_request * req = &tmp.fifo[i];
778        if( req->time_requested >= oldestAllowed )
779            break;
780        tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
781    }
782    reqListClear( &tmp );
783}
784
785static void
786expireOldRequests( tr_peermsgs * msgs, const time_t now  )
787{
788    time_t oldestAllowed;
789    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
790    dbgmsg( msgs, "entering `expire old requests' block" );
791
792    /* cancel requests that have been queued for too long */
793    oldestAllowed = now - QUEUED_REQUEST_TTL_SECS;
794    expireFromList( msgs, &msgs->clientWillAskFor, oldestAllowed );
795
796    /* if the peer doesn't support "Reject Request",
797     * cancel requests that were sent too long ago. */
798    if( !fext ) {
799        oldestAllowed = now - SENT_REQUEST_TTL_SECS;
800        expireFromList( msgs, &msgs->clientAskedFor, oldestAllowed );
801    }
802
803    dbgmsg( msgs, "leaving `expire old requests' block" );
804}
805
806static void
807pumpRequestQueue( tr_peermsgs * msgs, const time_t now )
808{
809    const int           max = msgs->maxActiveRequests;
810    int                 sent = 0;
811    int                 len = msgs->clientAskedFor.len;
812    struct peer_request req;
813
814    dbgmsg( msgs, "clientIsChoked %d, download allowed %d, len %d, max %d, msgs->clientWillAskFor.len %d",
815            (int)msgs->peer->clientIsChoked,
816            (int)tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ),
817            len, max, msgs->clientWillAskFor.len );
818
819    if( msgs->peer->clientIsChoked )
820        return;
821    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
822        return;
823
824    while( ( len < max ) && reqListPop( &msgs->clientWillAskFor, &req ) )
825    {
826        const tr_block_index_t block = _tr_block( msgs->torrent, req.index, req.offset );
827
828        assert( requestIsValid( msgs, &req ) );
829        assert( tr_bitfieldHas( msgs->peer->have, req.index ) );
830
831        /* don't ask for it if we've already got it... this block may have
832         * come in from a different peer after we cancelled a request for it */
833        if( !tr_cpBlockIsComplete( &msgs->torrent->completion, block ) )
834        {
835            protocolSendRequest( msgs, &req );
836            req.time_requested = now;
837            reqListAppend( &msgs->clientAskedFor, &req );
838
839            ++len;
840            ++sent;
841        }
842        else dbgmsg( msgs, "not asking for it because we've already got it..." );
843    }
844
845    if( sent )
846        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
847                sent, msgs->clientAskedFor.len, msgs->clientWillAskFor.len );
848
849    if( len < max )
850        fireNeedReq( msgs );
851}
852
853static TR_INLINE tr_bool
854requestQueueIsFull( const tr_peermsgs * msgs )
855{
856    const int req_max = msgs->maxActiveRequests;
857    return msgs->clientWillAskFor.len >= (size_t)req_max;
858}
859
860tr_addreq_t
861tr_peerMsgsAddRequest( tr_peermsgs *    msgs,
862                       uint32_t         index,
863                       uint32_t         offset,
864                       uint32_t         length )
865{
866    struct peer_request req;
867
868    assert( msgs );
869    assert( msgs->torrent );
870
871    /**
872    ***  Reasons to decline the request
873    **/
874
875    /* don't send requests to choked clients */
876    if( msgs->peer->clientIsChoked ) {
877        dbgmsg( msgs, "declining request because they're choking us" );
878        return TR_ADDREQ_CLIENT_CHOKED;
879    }
880
881    /* peer's queue is full */
882    if( requestQueueIsFull( msgs ) ) {
883        dbgmsg( msgs, "declining request because we're full" );
884        return TR_ADDREQ_FULL;
885    }
886
887    /* peer doesn't have this piece */
888    if( !tr_bitfieldHas( msgs->peer->have, index ) )
889        return TR_ADDREQ_MISSING;
890
891    /* have we already asked for this piece? */
892    req.index = index;
893    req.offset = offset;
894    req.length = length;
895    if( reqListHas( &msgs->clientAskedFor, &req ) ) {
896        dbgmsg( msgs, "declining because it's a duplicate" );
897        return TR_ADDREQ_DUPLICATE;
898    }
899    if( reqListHas( &msgs->clientWillAskFor, &req ) ) {
900        dbgmsg( msgs, "declining because it's a duplicate" );
901        return TR_ADDREQ_DUPLICATE;
902    }
903
904    /**
905    ***  Accept this request
906    **/
907
908    dbgmsg( msgs, "adding req for %"PRIu32":%"PRIu32"->%"PRIu32" to our `will request' list",
909            index, offset, length );
910    req.time_requested = time( NULL );
911    reqListAppend( &msgs->clientWillAskFor, &req );
912    return TR_ADDREQ_OK;
913}
914
915static void
916cancelAllRequestsToPeer( tr_peermsgs * msgs, tr_bool sendCancel )
917{
918    size_t i;
919    struct request_list a = msgs->clientWillAskFor;
920    struct request_list b = msgs->clientAskedFor;
921    dbgmsg( msgs, "cancelling all requests to peer" );
922
923    msgs->clientAskedFor = REQUEST_LIST_INIT;
924    msgs->clientWillAskFor = REQUEST_LIST_INIT;
925
926    for( i=0; i<a.len; ++i )
927        fireCancelledReq( msgs, &a.fifo[i] );
928
929    for( i = 0; i < b.len; ++i ) {
930        fireCancelledReq( msgs, &b.fifo[i] );
931        if( sendCancel )
932            protocolSendCancel( msgs, &b.fifo[i] );
933    }
934
935    reqListClear( &a );
936    reqListClear( &b );
937}
938
939void
940tr_peerMsgsCancel( tr_peermsgs * msgs,
941                   uint32_t      pieceIndex,
942                   uint32_t      offset,
943                   uint32_t      length )
944{
945    struct peer_request req;
946
947    assert( msgs != NULL );
948    assert( length > 0 );
949
950
951    /* have we asked the peer for this piece? */
952    req.index = pieceIndex;
953    req.offset = offset;
954    req.length = length;
955
956    /* if it's only in the queue and hasn't been sent yet, free it */
957    if( reqListRemove( &msgs->clientWillAskFor, &req ) ) {
958        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32, pieceIndex, offset, length );
959        fireCancelledReq( msgs, &req );
960    }
961
962    /* if it's already been sent, send a cancel message too */
963    if( reqListRemove( &msgs->clientAskedFor, &req ) ) {
964        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32, pieceIndex, offset, length );
965        protocolSendCancel( msgs, &req );
966        fireCancelledReq( msgs, &req );
967    }
968}
969
970
971/**
972***
973**/
974
975static void
976sendLtepHandshake( tr_peermsgs * msgs )
977{
978    tr_benc val, *m;
979    char * buf;
980    int len;
981    int pex;
982    struct evbuffer * out = msgs->outMessages;
983
984    if( msgs->clientSentLtepHandshake )
985        return;
986
987    dbgmsg( msgs, "sending an ltep handshake" );
988    msgs->clientSentLtepHandshake = 1;
989
990    /* decide if we want to advertise pex support */
991    if( !tr_torrentAllowsPex( msgs->torrent ) )
992        pex = 0;
993    else if( msgs->peerSentLtepHandshake )
994        pex = msgs->peerSupportsPex ? 1 : 0;
995    else
996        pex = 1;
997
998    tr_bencInitDict( &val, 5 );
999    tr_bencDictAddInt( &val, "e", getSession(msgs)->encryptionMode != TR_CLEAR_PREFERRED );
1000    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( getSession(msgs) ) );
1001    tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
1002    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
1003    m  = tr_bencDictAddDict( &val, "m", 1 );
1004    if( pex )
1005        tr_bencDictAddInt( m, "ut_pex", TR_LTEP_PEX );
1006    buf = tr_bencToStr( &val, TR_FMT_BENC, &len );
1007
1008    tr_peerIoWriteUint32( msgs->peer->io, out, 2 * sizeof( uint8_t ) + len );
1009    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_LTEP );
1010    tr_peerIoWriteUint8 ( msgs->peer->io, out, LTEP_HANDSHAKE );
1011    tr_peerIoWriteBytes ( msgs->peer->io, out, buf, len );
1012    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1013    dbgOutMessageLen( msgs );
1014
1015    /* cleanup */
1016    tr_bencFree( &val );
1017    tr_free( buf );
1018}
1019
1020static void
1021parseLtepHandshake( tr_peermsgs *     msgs,
1022                    int               len,
1023                    struct evbuffer * inbuf )
1024{
1025    int64_t   i;
1026    tr_benc   val, * sub;
1027    uint8_t * tmp = tr_new( uint8_t, len );
1028
1029    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, len );
1030    msgs->peerSentLtepHandshake = 1;
1031
1032    if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) )
1033    {
1034        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
1035        tr_free( tmp );
1036        return;
1037    }
1038
1039    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
1040
1041    /* does the peer prefer encrypted connections? */
1042    if( tr_bencDictFindInt( &val, "e", &i ) )
1043        msgs->peer->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
1044                                              : ENCRYPTION_PREFERENCE_NO;
1045
1046    /* check supported messages for utorrent pex */
1047    msgs->peerSupportsPex = 0;
1048    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
1049        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
1050            msgs->ut_pex_id = (uint8_t) i;
1051            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
1052            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
1053        }
1054    }
1055
1056    /* look for upload_only (BEP 21) */
1057    if( tr_bencDictFindInt( &val, "upload_only", &i ) )
1058        fireUploadOnly( msgs, i!=0 );
1059
1060    /* get peer's listening port */
1061    if( tr_bencDictFindInt( &val, "p", &i ) ) {
1062        tr_atomSetPort( msgs->peer->atom, htons( (uint16_t)i ) );
1063        dbgmsg( msgs, "msgs->port is now %hu", tr_atomGetPort( msgs->peer->atom ) );
1064    }
1065
1066    /* get peer's maximum request queue size */
1067    if( tr_bencDictFindInt( &val, "reqq", &i ) )
1068        msgs->reqq = i;
1069
1070    tr_bencFree( &val );
1071    tr_free( tmp );
1072}
1073
1074static void
1075parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1076{
1077    int loaded = 0;
1078    uint8_t * tmp = tr_new( uint8_t, msglen );
1079    tr_benc val;
1080    tr_torrent * tor = msgs->torrent;
1081    const uint8_t * added;
1082    size_t added_len;
1083
1084    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1085
1086    if( tr_torrentAllowsPex( tor )
1087      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) ) )
1088    {
1089        if( tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1090        {
1091            tr_pex * pex;
1092            size_t i, n;
1093            size_t added_f_len = 0;
1094            const uint8_t * added_f = NULL;
1095
1096            tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1097            pex = tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len, &n );
1098
1099            n = MIN( n, MAX_PEX_PEER_COUNT );
1100            for( i=0; i<n; ++i )
1101                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex + i );
1102
1103            tr_free( pex );
1104        }
1105
1106        if( tr_bencDictFindRaw( &val, "added6", &added, &added_len ) )
1107        {
1108            tr_pex * pex;
1109            size_t i, n;
1110            size_t added_f_len = 0;
1111            const uint8_t * added_f = NULL;
1112
1113            tr_bencDictFindRaw( &val, "added6.f", &added_f, &added_f_len );
1114            pex = tr_peerMgrCompact6ToPex( added, added_len, added_f, added_f_len, &n );
1115
1116            n = MIN( n, MAX_PEX_PEER_COUNT );
1117            for( i=0; i<n; ++i )
1118                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex + i );
1119
1120            tr_free( pex );
1121        }
1122    }
1123
1124    if( loaded )
1125        tr_bencFree( &val );
1126    tr_free( tmp );
1127}
1128
1129static void sendPex( tr_peermsgs * msgs );
1130
1131static void
1132parseLtep( tr_peermsgs *     msgs,
1133           int               msglen,
1134           struct evbuffer * inbuf )
1135{
1136    uint8_t ltep_msgid;
1137
1138    tr_peerIoReadUint8( msgs->peer->io, inbuf, &ltep_msgid );
1139    msglen--;
1140
1141    if( ltep_msgid == LTEP_HANDSHAKE )
1142    {
1143        dbgmsg( msgs, "got ltep handshake" );
1144        parseLtepHandshake( msgs, msglen, inbuf );
1145        if( tr_peerIoSupportsLTEP( msgs->peer->io ) )
1146        {
1147            sendLtepHandshake( msgs );
1148            sendPex( msgs );
1149        }
1150    }
1151    else if( ltep_msgid == TR_LTEP_PEX )
1152    {
1153        dbgmsg( msgs, "got ut pex" );
1154        msgs->peerSupportsPex = 1;
1155        parseUtPex( msgs, msglen, inbuf );
1156    }
1157    else
1158    {
1159        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1160        evbuffer_drain( inbuf, msglen );
1161    }
1162}
1163
1164static int
1165readBtLength( tr_peermsgs *     msgs,
1166              struct evbuffer * inbuf,
1167              size_t            inlen )
1168{
1169    uint32_t len;
1170
1171    if( inlen < sizeof( len ) )
1172        return READ_LATER;
1173
1174    tr_peerIoReadUint32( msgs->peer->io, inbuf, &len );
1175
1176    if( len == 0 ) /* peer sent us a keepalive message */
1177        dbgmsg( msgs, "got KeepAlive" );
1178    else
1179    {
1180        msgs->incoming.length = len;
1181        msgs->state = AWAITING_BT_ID;
1182    }
1183
1184    return READ_NOW;
1185}
1186
1187static int readBtMessage( tr_peermsgs *     msgs,
1188                          struct evbuffer * inbuf,
1189                          size_t            inlen );
1190
1191static int
1192readBtId( tr_peermsgs *     msgs,
1193          struct evbuffer * inbuf,
1194          size_t            inlen )
1195{
1196    uint8_t id;
1197
1198    if( inlen < sizeof( uint8_t ) )
1199        return READ_LATER;
1200
1201    tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
1202    msgs->incoming.id = id;
1203    dbgmsg( msgs, "msgs->incoming.id is now %d; msgs->incoming.length is %zu", id, (size_t)msgs->incoming.length );
1204
1205    if( id == BT_PIECE )
1206    {
1207        msgs->state = AWAITING_BT_PIECE;
1208        return READ_NOW;
1209    }
1210    else if( msgs->incoming.length != 1 )
1211    {
1212        msgs->state = AWAITING_BT_MESSAGE;
1213        return READ_NOW;
1214    }
1215    else return readBtMessage( msgs, inbuf, inlen - 1 );
1216}
1217
1218static void
1219updatePeerProgress( tr_peermsgs * msgs )
1220{
1221    msgs->peer->progress = tr_bitfieldCountTrueBits( msgs->peer->have ) / (float)msgs->torrent->info.pieceCount;
1222    dbgmsg( msgs, "peer progress is %f", msgs->peer->progress );
1223    updateFastSet( msgs );
1224    updateInterest( msgs );
1225    firePeerProgress( msgs );
1226}
1227
1228static void
1229peerMadeRequest( tr_peermsgs *               msgs,
1230                 const struct peer_request * req )
1231{
1232    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1233    const int reqIsValid = requestIsValid( msgs, req );
1234    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( &msgs->torrent->completion, req->index );
1235    const int peerIsChoked = msgs->peer->peerIsChoked;
1236
1237    int allow = FALSE;
1238
1239    if( !reqIsValid )
1240        dbgmsg( msgs, "rejecting an invalid request." );
1241    else if( !clientHasPiece )
1242        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1243    else if( peerIsChoked )
1244        dbgmsg( msgs, "rejecting request from choked peer" );
1245    else
1246        allow = TRUE;
1247
1248    if( allow )
1249        reqListAppend( &msgs->peerAskedFor, req );
1250    else if( fext )
1251        protocolSendReject( msgs, req );
1252}
1253
1254static tr_bool
1255messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1256{
1257    switch( id )
1258    {
1259        case BT_CHOKE:
1260        case BT_UNCHOKE:
1261        case BT_INTERESTED:
1262        case BT_NOT_INTERESTED:
1263        case BT_FEXT_HAVE_ALL:
1264        case BT_FEXT_HAVE_NONE:
1265            return len == 1;
1266
1267        case BT_HAVE:
1268        case BT_FEXT_SUGGEST:
1269        case BT_FEXT_ALLOWED_FAST:
1270            return len == 5;
1271
1272        case BT_BITFIELD:
1273            return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1274
1275        case BT_REQUEST:
1276        case BT_CANCEL:
1277        case BT_FEXT_REJECT:
1278            return len == 13;
1279
1280        case BT_PIECE:
1281            return len > 9 && len <= 16393;
1282
1283        case BT_PORT:
1284            return len == 3;
1285
1286        case BT_LTEP:
1287            return len >= 2;
1288
1289        default:
1290            return FALSE;
1291    }
1292}
1293
1294static int clientGotBlock( tr_peermsgs *               msgs,
1295                           const uint8_t *             block,
1296                           const struct peer_request * req );
1297
1298static int
1299readBtPiece( tr_peermsgs      * msgs,
1300             struct evbuffer  * inbuf,
1301             size_t             inlen,
1302             size_t           * setme_piece_bytes_read )
1303{
1304    struct peer_request * req = &msgs->incoming.blockReq;
1305
1306    assert( EVBUFFER_LENGTH( inbuf ) >= inlen );
1307    dbgmsg( msgs, "In readBtPiece" );
1308
1309    if( !req->length )
1310    {
1311        if( inlen < 8 )
1312            return READ_LATER;
1313
1314        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index );
1315        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset );
1316        req->length = msgs->incoming.length - 9;
1317        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1318        return READ_NOW;
1319    }
1320    else
1321    {
1322        int err;
1323
1324        /* read in another chunk of data */
1325        const size_t nLeft = req->length - EVBUFFER_LENGTH( msgs->incoming.block );
1326        size_t n = MIN( nLeft, inlen );
1327        size_t i = n;
1328
1329        while( i > 0 )
1330        {
1331            uint8_t buf[MAX_STACK_ARRAY_SIZE];
1332            const size_t thisPass = MIN( i, sizeof( buf ) );
1333            tr_peerIoReadBytes( msgs->peer->io, inbuf, buf, thisPass );
1334            evbuffer_add( msgs->incoming.block, buf, thisPass );
1335            i -= thisPass;
1336        }
1337
1338        fireClientGotData( msgs, n, TRUE );
1339        *setme_piece_bytes_read += n;
1340        dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1341               n, req->index, req->offset, req->length,
1342               (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) );
1343        if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length )
1344            return READ_LATER;
1345
1346        /* we've got the whole block ... process it */
1347        err = clientGotBlock( msgs, EVBUFFER_DATA( msgs->incoming.block ), req );
1348
1349        /* cleanup */
1350        evbuffer_free( msgs->incoming.block );
1351        msgs->incoming.block = evbuffer_new( );
1352        req->length = 0;
1353        msgs->state = AWAITING_BT_LENGTH;
1354        return err ? READ_ERR : READ_NOW;
1355    }
1356}
1357
1358static int
1359readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1360{
1361    uint32_t      ui32;
1362    uint32_t      msglen = msgs->incoming.length;
1363    const uint8_t id = msgs->incoming.id;
1364    const size_t  startBufLen = EVBUFFER_LENGTH( inbuf );
1365    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1366
1367    --msglen; /* id length */
1368
1369    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
1370
1371    if( inlen < msglen )
1372        return READ_LATER;
1373
1374    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1375    {
1376        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1377        fireError( msgs, EMSGSIZE );
1378        return READ_ERR;
1379    }
1380
1381    switch( id )
1382    {
1383        case BT_CHOKE:
1384            dbgmsg( msgs, "got Choke" );
1385            msgs->peer->clientIsChoked = 1;
1386            if( !fext )
1387                cancelAllRequestsToPeer( msgs, FALSE );
1388            break;
1389
1390        case BT_UNCHOKE:
1391            dbgmsg( msgs, "got Unchoke" );
1392            msgs->peer->clientIsChoked = 0;
1393            fireNeedReq( msgs );
1394            break;
1395
1396        case BT_INTERESTED:
1397            dbgmsg( msgs, "got Interested" );
1398            msgs->peer->peerIsInterested = 1;
1399            break;
1400
1401        case BT_NOT_INTERESTED:
1402            dbgmsg( msgs, "got Not Interested" );
1403            msgs->peer->peerIsInterested = 0;
1404            break;
1405
1406        case BT_HAVE:
1407            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1408            dbgmsg( msgs, "got Have: %u", ui32 );
1409            if( tr_bitfieldAdd( msgs->peer->have, ui32 ) ) {
1410                fireError( msgs, ERANGE );
1411                return READ_ERR;
1412            }
1413            updatePeerProgress( msgs );
1414            tr_rcTransferred( &msgs->torrent->swarmSpeed,
1415                              msgs->torrent->info.pieceSize );
1416            break;
1417
1418        case BT_BITFIELD:
1419        {
1420            dbgmsg( msgs, "got a bitfield" );
1421            tr_peerIoReadBytes( msgs->peer->io, inbuf, msgs->peer->have->bits, msglen );
1422            updatePeerProgress( msgs );
1423            fireNeedReq( msgs );
1424            break;
1425        }
1426
1427        case BT_REQUEST:
1428        {
1429            struct peer_request r;
1430            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1431            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1432            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1433            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1434            peerMadeRequest( msgs, &r );
1435            break;
1436        }
1437
1438        case BT_CANCEL:
1439        {
1440            struct peer_request r;
1441            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1442            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1443            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1444            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1445            if( reqListRemove( &msgs->peerAskedFor, &r ) && fext )
1446                protocolSendReject( msgs, &r );
1447            break;
1448        }
1449
1450        case BT_PIECE:
1451            assert( 0 ); /* handled elsewhere! */
1452            break;
1453
1454        case BT_PORT:
1455            dbgmsg( msgs, "Got a BT_PORT" );
1456            tr_peerIoReadUint16( msgs->peer->io, inbuf, &msgs->peer->dht_port );
1457            if( msgs->peer->dht_port > 0 )
1458                tr_dhtAddNode( getSession(msgs), tr_atomGetAddr( msgs->peer->atom ), msgs->peer->dht_port, 0 );
1459            break;
1460
1461        case BT_FEXT_SUGGEST:
1462            dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1463            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1464            if( fext )
1465                fireClientGotSuggest( msgs, ui32 );
1466            else {
1467                fireError( msgs, EMSGSIZE );
1468                return READ_ERR;
1469            }
1470            break;
1471
1472        case BT_FEXT_ALLOWED_FAST:
1473            dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1474            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1475            if( fext )
1476                fireClientGotAllowedFast( msgs, ui32 );
1477            else {
1478                fireError( msgs, EMSGSIZE );
1479                return READ_ERR;
1480            }
1481            break;
1482
1483        case BT_FEXT_HAVE_ALL:
1484            dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1485            if( fext ) {
1486                tr_bitfieldAddRange( msgs->peer->have, 0, msgs->torrent->info.pieceCount );
1487                updatePeerProgress( msgs );
1488            } else {
1489                fireError( msgs, EMSGSIZE );
1490                return READ_ERR;
1491            }
1492            break;
1493
1494        case BT_FEXT_HAVE_NONE:
1495            dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1496            if( fext ) {
1497                tr_bitfieldClear( msgs->peer->have );
1498                updatePeerProgress( msgs );
1499            } else {
1500                fireError( msgs, EMSGSIZE );
1501                return READ_ERR;
1502            }
1503            break;
1504
1505        case BT_FEXT_REJECT:
1506        {
1507            struct peer_request r;
1508            dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1509            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1510            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1511            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1512            if( fext )
1513                reqListRemove( &msgs->clientAskedFor, &r );
1514            else {
1515                fireError( msgs, EMSGSIZE );
1516                return READ_ERR;
1517            }
1518            break;
1519        }
1520
1521        case BT_LTEP:
1522            dbgmsg( msgs, "Got a BT_LTEP" );
1523            parseLtep( msgs, msglen, inbuf );
1524            break;
1525
1526        default:
1527            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1528            tr_peerIoDrain( msgs->peer->io, inbuf, msglen );
1529            break;
1530    }
1531
1532    assert( msglen + 1 == msgs->incoming.length );
1533    assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen );
1534
1535    msgs->state = AWAITING_BT_LENGTH;
1536    return READ_NOW;
1537}
1538
1539static TR_INLINE void
1540decrementDownloadedCount( tr_peermsgs * msgs, uint32_t byteCount )
1541{
1542    tr_torrent * tor = msgs->torrent;
1543
1544    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1545}
1546
1547static TR_INLINE void
1548clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1549{
1550    decrementDownloadedCount( msgs, req->length );
1551}
1552
1553static void
1554addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1555{
1556    if( !msgs->peer->blame )
1557         msgs->peer->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1558    tr_bitfieldAdd( msgs->peer->blame, index );
1559}
1560
1561/* returns 0 on success, or an errno on failure */
1562static int
1563clientGotBlock( tr_peermsgs *               msgs,
1564                const uint8_t *             data,
1565                const struct peer_request * req )
1566{
1567    int err;
1568    tr_torrent * tor = msgs->torrent;
1569    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1570
1571    assert( msgs );
1572    assert( req );
1573
1574    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) {
1575        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1576                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1577        return EMSGSIZE;
1578    }
1579
1580    /* save the block */
1581    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1582
1583    /**
1584    *** Remove the block from our `we asked for this' list
1585    **/
1586
1587    if( !reqListRemove( &msgs->clientAskedFor, req ) ) {
1588        clientGotUnwantedBlock( msgs, req );
1589        dbgmsg( msgs, "we didn't ask for this message..." );
1590        return 0;
1591    }
1592
1593    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1594            msgs->clientAskedFor.len );
1595
1596    /**
1597    *** Error checks
1598    **/
1599
1600    if( tr_cpBlockIsComplete( &tor->completion, block ) ) {
1601        dbgmsg( msgs, "we have this block already..." );
1602        clientGotUnwantedBlock( msgs, req );
1603        return 0;
1604    }
1605
1606    /**
1607    ***  Save the block
1608    **/
1609
1610    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1611        return err;
1612
1613    addPeerToBlamefield( msgs, req->index );
1614    fireGotBlock( msgs, req );
1615    return 0;
1616}
1617
1618static int peerPulse( void * vmsgs );
1619
1620static void
1621didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1622{
1623    tr_peermsgs * msgs = vmsgs;
1624    firePeerGotData( msgs, bytesWritten, wasPieceData );
1625
1626    if ( tr_isPeerIo( io ) && io->userData )
1627        peerPulse( msgs );
1628}
1629
1630static ReadState
1631canRead( tr_peerIo * io, void * vmsgs, size_t * piece )
1632{
1633    ReadState         ret;
1634    tr_peermsgs *     msgs = vmsgs;
1635    struct evbuffer * in = tr_peerIoGetReadBuffer( io );
1636    const size_t      inlen = EVBUFFER_LENGTH( in );
1637
1638    dbgmsg( msgs, "canRead: inlen is %zu, msgs->state is %d", inlen, msgs->state );
1639
1640    if( !inlen )
1641    {
1642        ret = READ_LATER;
1643    }
1644    else if( msgs->state == AWAITING_BT_PIECE )
1645    {
1646        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1647    }
1648    else switch( msgs->state )
1649    {
1650        case AWAITING_BT_LENGTH:
1651            ret = readBtLength ( msgs, in, inlen ); break;
1652
1653        case AWAITING_BT_ID:
1654            ret = readBtId     ( msgs, in, inlen ); break;
1655
1656        case AWAITING_BT_MESSAGE:
1657            ret = readBtMessage( msgs, in, inlen ); break;
1658
1659        default:
1660            ret = READ_ERR;
1661            assert( 0 );
1662    }
1663
1664    dbgmsg( msgs, "canRead: ret is %d", (int)ret );
1665
1666    /* log the raw data that was read */
1667    if( ( ret != READ_ERR ) && ( EVBUFFER_LENGTH( in ) != inlen ) )
1668        fireClientGotData( msgs, inlen - EVBUFFER_LENGTH( in ), FALSE );
1669
1670    return ret;
1671}
1672
1673/**
1674***
1675**/
1676
1677static int
1678ratePulse( tr_peermsgs * msgs, uint64_t now )
1679{
1680    int irate;
1681    const int floor = 8;
1682    const int seconds = 10;
1683    double rate;
1684    int estimatedBlocksInPeriod;
1685    const tr_torrent * const torrent = msgs->torrent;
1686
1687    /* Get the rate limit we should use.
1688     * FIXME: this needs to consider all the other peers as well... */
1689    rate = tr_peerGetPieceSpeed( msgs->peer, now, TR_PEER_TO_CLIENT );
1690    if( tr_torrentUsesSpeedLimit( torrent, TR_PEER_TO_CLIENT ) )
1691        rate = MIN( rate, tr_torrentGetSpeedLimit( torrent, TR_PEER_TO_CLIENT ) );
1692    if( tr_torrentUsesSessionLimits( torrent ) )
1693        if( tr_sessionGetActiveSpeedLimit( torrent->session, TR_PEER_TO_CLIENT, &irate ) )
1694            rate = MIN( rate, irate );
1695
1696    estimatedBlocksInPeriod = ( rate * seconds * 1024 ) / torrent->blockSize;
1697    msgs->maxActiveRequests = MAX( floor, estimatedBlocksInPeriod );
1698
1699    if( msgs->reqq > 0 )
1700        msgs->maxActiveRequests = MIN( msgs->maxActiveRequests, msgs->reqq );
1701
1702    return TRUE;
1703}
1704
1705static size_t
1706fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1707{
1708    size_t bytesWritten = 0;
1709    struct peer_request req;
1710    const tr_bool haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1711    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1712
1713    /**
1714    ***  Protocol messages
1715    **/
1716
1717    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1718    {
1719        dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
1720        msgs->outMessagesBatchedAt = now;
1721    }
1722    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1723    {
1724        const size_t len = EVBUFFER_LENGTH( msgs->outMessages );
1725        /* flush the protocol messages */
1726        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len );
1727        tr_peerIoWriteBuf( msgs->peer->io, msgs->outMessages, FALSE );
1728        msgs->clientSentAnythingAt = now;
1729        msgs->outMessagesBatchedAt = 0;
1730        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1731        bytesWritten +=  len;
1732    }
1733
1734    /**
1735    ***  Blocks
1736    **/
1737
1738    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= msgs->torrent->blockSize )
1739        && popNextRequest( msgs, &req ) )
1740    {
1741        if( requestIsValid( msgs, &req )
1742            && tr_cpPieceIsComplete( &msgs->torrent->completion, req.index ) )
1743        {
1744            /* FIXME(libevent2) use evbuffer_reserve_space() + evbuffer_commit_space() */
1745            int err;
1746            const uint32_t msglen = 4 + 1 + 4 + 4 + req.length;
1747            struct evbuffer * out;
1748            tr_peerIo * io = msgs->peer->io;
1749
1750            out = evbuffer_new( );
1751            evbuffer_expand( out, msglen );
1752
1753            tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1754            tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1755            tr_peerIoWriteUint32( io, out, req.index );
1756            tr_peerIoWriteUint32( io, out, req.offset );
1757
1758            err = tr_ioRead( msgs->torrent, req.index, req.offset, req.length, EVBUFFER_DATA(out)+EVBUFFER_LENGTH(out) );
1759            if( err )
1760            {
1761                if( fext )
1762                    protocolSendReject( msgs, &req );
1763            }
1764            else
1765            {
1766                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1767                EVBUFFER_LENGTH(out) += req.length;
1768                assert( EVBUFFER_LENGTH( out ) == msglen );
1769                tr_peerIoWriteBuf( io, out, TRUE );
1770                bytesWritten += EVBUFFER_LENGTH( out );
1771                msgs->clientSentAnythingAt = now;
1772            }
1773
1774            evbuffer_free( out );
1775
1776            if( err )
1777            {
1778                bytesWritten = 0;
1779                msgs = NULL;
1780            }
1781        }
1782        else if( fext ) /* peer needs a reject message */
1783        {
1784            protocolSendReject( msgs, &req );
1785        }
1786    }
1787
1788    /**
1789    ***  Keepalive
1790    **/
1791
1792    if( ( msgs != NULL )
1793        && ( msgs->clientSentAnythingAt != 0 )
1794        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1795    {
1796        dbgmsg( msgs, "sending a keepalive message" );
1797        tr_peerIoWriteUint32( msgs->peer->io, msgs->outMessages, 0 );
1798        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1799    }
1800
1801    return bytesWritten;
1802}
1803
1804static int
1805peerPulse( void * vmsgs )
1806{
1807    tr_peermsgs * msgs = vmsgs;
1808    const time_t  now = time( NULL );
1809
1810    ratePulse( msgs, now );
1811
1812    pumpRequestQueue( msgs, now );
1813    expireOldRequests( msgs, now );
1814
1815    for( ;; )
1816        if( fillOutputBuffer( msgs, now ) < 1 )
1817            break;
1818
1819    return TRUE; /* loop forever */
1820}
1821
1822void
1823tr_peerMsgsPulse( tr_peermsgs * msgs )
1824{
1825    if( msgs != NULL )
1826        peerPulse( msgs );
1827}
1828
1829static void
1830gotError( tr_peerIo  * io UNUSED,
1831          short        what,
1832          void       * vmsgs )
1833{
1834    if( what & EVBUFFER_TIMEOUT )
1835        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1836    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1837        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1838               what, errno, tr_strerror( errno ) );
1839    fireError( vmsgs, ENOTCONN );
1840}
1841
1842static void
1843sendBitfield( tr_peermsgs * msgs )
1844{
1845    struct evbuffer * out = msgs->outMessages;
1846    tr_bitfield *     field;
1847    tr_piece_index_t  lazyPieces[LAZY_PIECE_COUNT];
1848    size_t            i;
1849    size_t            lazyCount = 0;
1850
1851    field = tr_bitfieldDup( tr_cpPieceBitfield( &msgs->torrent->completion ) );
1852
1853    if( tr_sessionIsLazyBitfieldEnabled( getSession( msgs ) ) )
1854    {
1855        /** Lazy bitfields aren't a high priority or secure, so I'm opting for
1856            speed over a truly random sample -- let's limit the pool size to
1857            the first 1000 pieces so large torrents don't bog things down */
1858        size_t poolSize;
1859        const size_t maxPoolSize = MIN( msgs->torrent->info.pieceCount, 1000 );
1860        tr_piece_index_t * pool = tr_new( tr_piece_index_t, maxPoolSize );
1861
1862        /* build the pool */
1863        for( i=poolSize=0; i<maxPoolSize; ++i )
1864            if( tr_bitfieldHas( field, i ) )
1865                pool[poolSize++] = i;
1866
1867        /* pull random piece indices from the pool */
1868        while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) )
1869        {
1870            const int pos = tr_cryptoWeakRandInt( poolSize );
1871            const tr_piece_index_t piece = pool[pos];
1872            tr_bitfieldRem( field, piece );
1873            lazyPieces[lazyCount++] = piece;
1874            pool[pos] = pool[--poolSize];
1875        }
1876
1877        /* cleanup */
1878        tr_free( pool );
1879    }
1880
1881    tr_peerIoWriteUint32( msgs->peer->io, out,
1882                          sizeof( uint8_t ) + field->byteCount );
1883    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_BITFIELD );
1884    /* FIXME(libevent2): use evbuffer_add_reference() */
1885    tr_peerIoWriteBytes ( msgs->peer->io, out, field->bits, field->byteCount );
1886    dbgmsg( msgs, "sending bitfield... outMessage size is now %zu",
1887            EVBUFFER_LENGTH( out ) );
1888    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1889
1890    for( i = 0; i < lazyCount; ++i )
1891        protocolSendHave( msgs, lazyPieces[i] );
1892
1893    tr_bitfieldFree( field );
1894}
1895
1896static void
1897tellPeerWhatWeHave( tr_peermsgs * msgs )
1898{
1899    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1900
1901    if( fext && ( tr_cpGetStatus( &msgs->torrent->completion ) == TR_SEED ) )
1902    {
1903        protocolSendHaveAll( msgs );
1904    }
1905    else if( fext && ( tr_cpHaveValid( &msgs->torrent->completion ) == 0 ) )
1906    {
1907        protocolSendHaveNone( msgs );
1908    }
1909    else
1910    {
1911        sendBitfield( msgs );
1912    }
1913}
1914
1915/**
1916***
1917**/
1918
1919/* some peers give us error messages if we send
1920   more than this many peers in a single pex message
1921   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
1922#define MAX_PEX_ADDED 50
1923#define MAX_PEX_DROPPED 50
1924
1925typedef struct
1926{
1927    tr_pex *  added;
1928    tr_pex *  dropped;
1929    tr_pex *  elements;
1930    int       addedCount;
1931    int       droppedCount;
1932    int       elementCount;
1933}
1934PexDiffs;
1935
1936static void
1937pexAddedCb( void * vpex,
1938            void * userData )
1939{
1940    PexDiffs * diffs = userData;
1941    tr_pex *   pex = vpex;
1942
1943    if( diffs->addedCount < MAX_PEX_ADDED )
1944    {
1945        diffs->added[diffs->addedCount++] = *pex;
1946        diffs->elements[diffs->elementCount++] = *pex;
1947    }
1948}
1949
1950static TR_INLINE void
1951pexDroppedCb( void * vpex,
1952              void * userData )
1953{
1954    PexDiffs * diffs = userData;
1955    tr_pex *   pex = vpex;
1956
1957    if( diffs->droppedCount < MAX_PEX_DROPPED )
1958    {
1959        diffs->dropped[diffs->droppedCount++] = *pex;
1960    }
1961}
1962
1963static TR_INLINE void
1964pexElementCb( void * vpex,
1965              void * userData )
1966{
1967    PexDiffs * diffs = userData;
1968    tr_pex * pex = vpex;
1969
1970    diffs->elements[diffs->elementCount++] = *pex;
1971}
1972
1973static void
1974sendPex( tr_peermsgs * msgs )
1975{
1976    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
1977    {
1978        PexDiffs diffs;
1979        PexDiffs diffs6;
1980        tr_pex * newPex = NULL;
1981        tr_pex * newPex6 = NULL;
1982        const int newCount = tr_peerMgrGetPeers( msgs->torrent, &newPex, TR_AF_INET, MAX_PEX_PEER_COUNT );
1983        const int newCount6 = tr_peerMgrGetPeers( msgs->torrent, &newPex6, TR_AF_INET6, MAX_PEX_PEER_COUNT );
1984
1985        /* build the diffs */
1986        diffs.added = tr_new( tr_pex, newCount );
1987        diffs.addedCount = 0;
1988        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1989        diffs.droppedCount = 0;
1990        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1991        diffs.elementCount = 0;
1992        tr_set_compare( msgs->pex, msgs->pexCount,
1993                        newPex, newCount,
1994                        tr_pexCompare, sizeof( tr_pex ),
1995                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
1996        diffs6.added = tr_new( tr_pex, newCount6 );
1997        diffs6.addedCount = 0;
1998        diffs6.dropped = tr_new( tr_pex, msgs->pexCount6 );
1999        diffs6.droppedCount = 0;
2000        diffs6.elements = tr_new( tr_pex, newCount6 + msgs->pexCount6 );
2001        diffs6.elementCount = 0;
2002        tr_set_compare( msgs->pex6, msgs->pexCount6,
2003                        newPex6, newCount6,
2004                        tr_pexCompare, sizeof( tr_pex ),
2005                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6 );
2006        dbgmsg(
2007            msgs,
2008            "pex: old peer count %d, new peer count %d, added %d, removed %d",
2009            msgs->pexCount, newCount + newCount6,
2010            diffs.addedCount + diffs6.addedCount,
2011            diffs.droppedCount + diffs6.droppedCount );
2012
2013        if( !diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
2014            !diffs6.droppedCount )
2015        {
2016            tr_free( diffs.elements );
2017            tr_free( diffs6.elements );
2018        }
2019        else
2020        {
2021            int  i;
2022            tr_benc val;
2023            char * benc;
2024            int bencLen;
2025            uint8_t * tmp, *walk;
2026            tr_peerIo       * io  = msgs->peer->io;
2027            struct evbuffer * out = msgs->outMessages;
2028
2029            /* update peer */
2030            tr_free( msgs->pex );
2031            msgs->pex = diffs.elements;
2032            msgs->pexCount = diffs.elementCount;
2033            tr_free( msgs->pex6 );
2034            msgs->pex6 = diffs6.elements;
2035            msgs->pexCount6 = diffs6.elementCount;
2036
2037            /* build the pex payload */
2038            tr_bencInitDict( &val, 3 ); /* ipv6 support: left as 3:
2039                                         * speed vs. likelihood? */
2040
2041            /* "added" */
2042            tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
2043            for( i = 0; i < diffs.addedCount; ++i )
2044            {
2045                memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
2046                memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
2047            }
2048            assert( ( walk - tmp ) == diffs.addedCount * 6 );
2049            tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
2050            tr_free( tmp );
2051
2052            /* "added.f" */
2053            tmp = walk = tr_new( uint8_t, diffs.addedCount );
2054            for( i = 0; i < diffs.addedCount; ++i )
2055                *walk++ = diffs.added[i].flags;
2056            assert( ( walk - tmp ) == diffs.addedCount );
2057            tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2058            tr_free( tmp );
2059
2060            /* "dropped" */
2061            tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2062            for( i = 0; i < diffs.droppedCount; ++i )
2063            {
2064                memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
2065                memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2066            }
2067            assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2068            tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2069            tr_free( tmp );
2070
2071            /* "added6" */
2072            tmp = walk = tr_new( uint8_t, diffs6.addedCount * 18 );
2073            for( i = 0; i < diffs6.addedCount; ++i )
2074            {
2075                memcpy( walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16 );
2076                walk += 16;
2077                memcpy( walk, &diffs6.added[i].port, 2 );
2078                walk += 2;
2079            }
2080            assert( ( walk - tmp ) == diffs6.addedCount * 18 );
2081            tr_bencDictAddRaw( &val, "added6", tmp, walk - tmp );
2082            tr_free( tmp );
2083
2084            /* "added6.f" */
2085            tmp = walk = tr_new( uint8_t, diffs6.addedCount );
2086            for( i = 0; i < diffs6.addedCount; ++i )
2087                *walk++ = diffs6.added[i].flags;
2088            assert( ( walk - tmp ) == diffs6.addedCount );
2089            tr_bencDictAddRaw( &val, "added6.f", tmp, walk - tmp );
2090            tr_free( tmp );
2091
2092            /* "dropped6" */
2093            tmp = walk = tr_new( uint8_t, diffs6.droppedCount * 18 );
2094            for( i = 0; i < diffs6.droppedCount; ++i )
2095            {
2096                memcpy( walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16 );
2097                walk += 16;
2098                memcpy( walk, &diffs6.dropped[i].port, 2 );
2099                walk += 2;
2100            }
2101            assert( ( walk - tmp ) == diffs6.droppedCount * 18);
2102            tr_bencDictAddRaw( &val, "dropped6", tmp, walk - tmp );
2103            tr_free( tmp );
2104
2105            /* write the pex message */
2106            benc = tr_bencToStr( &val, TR_FMT_BENC, &bencLen );
2107            tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + bencLen );
2108            tr_peerIoWriteUint8 ( io, out, BT_LTEP );
2109            tr_peerIoWriteUint8 ( io, out, msgs->ut_pex_id );
2110            tr_peerIoWriteBytes ( io, out, benc, bencLen );
2111            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2112            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", EVBUFFER_LENGTH( out ) );
2113            dbgOutMessageLen( msgs );
2114
2115            tr_free( benc );
2116            tr_bencFree( &val );
2117        }
2118
2119        /* cleanup */
2120        tr_free( diffs.added );
2121        tr_free( diffs.dropped );
2122        tr_free( newPex );
2123        tr_free( diffs6.added );
2124        tr_free( diffs6.dropped );
2125        tr_free( newPex6 );
2126
2127        /*msgs->clientSentPexAt = time( NULL );*/
2128    }
2129}
2130
2131static void
2132pexPulse( int foo UNUSED, short bar UNUSED, void * vmsgs )
2133{
2134    struct tr_peermsgs * msgs = vmsgs;
2135
2136    sendPex( msgs );
2137
2138    tr_timerAdd( &msgs->pexTimer, PEX_INTERVAL_SECS, 0 );
2139}
2140
2141/**
2142***
2143**/
2144
2145tr_peermsgs*
2146tr_peerMsgsNew( struct tr_torrent * torrent,
2147                struct tr_peer    * peer,
2148                tr_delivery_func    func,
2149                void              * userData,
2150                tr_publisher_tag  * setme )
2151{
2152    tr_peermsgs * m;
2153
2154    assert( peer );
2155    assert( peer->io );
2156
2157    m = tr_new0( tr_peermsgs, 1 );
2158    m->publisher = TR_PUBLISHER_INIT;
2159    m->peer = peer;
2160    m->torrent = torrent;
2161    m->peer->clientIsChoked = 1;
2162    m->peer->peerIsChoked = 1;
2163    m->peer->clientIsInterested = 0;
2164    m->peer->peerIsInterested = 0;
2165    m->peer->have = tr_bitfieldNew( torrent->info.pieceCount );
2166    m->state = AWAITING_BT_LENGTH;
2167    m->outMessages = evbuffer_new( );
2168    m->outMessagesBatchedAt = 0;
2169    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2170    m->incoming.block = evbuffer_new( );
2171    m->peerAskedFor = REQUEST_LIST_INIT;
2172    m->clientAskedFor = REQUEST_LIST_INIT;
2173    m->clientWillAskFor = REQUEST_LIST_INIT;
2174    evtimer_set( &m->pexTimer, pexPulse, m );
2175    tr_timerAdd( &m->pexTimer, PEX_INTERVAL_SECS, 0 );
2176    peer->msgs = m;
2177
2178    *setme = tr_publisherSubscribe( &m->publisher, func, userData );
2179
2180    if( tr_peerIoSupportsLTEP( peer->io ) )
2181        sendLtepHandshake( m );
2182
2183    if(tr_peerIoSupportsDHT(peer->io))
2184        protocolSendPort(m, tr_dhtPort(torrent->session));
2185
2186    tellPeerWhatWeHave( m );
2187
2188    tr_peerIoSetIOFuncs( m->peer->io, canRead, didWrite, gotError, m );
2189    ratePulse( m, tr_date() );
2190
2191    return m;
2192}
2193
2194void
2195tr_peerMsgsFree( tr_peermsgs* msgs )
2196{
2197    if( msgs )
2198    {
2199        evtimer_del( &msgs->pexTimer );
2200        tr_publisherDestruct( &msgs->publisher );
2201        reqListClear( &msgs->clientWillAskFor );
2202        reqListClear( &msgs->clientAskedFor );
2203        reqListClear( &msgs->peerAskedFor );
2204
2205        evbuffer_free( msgs->incoming.block );
2206        evbuffer_free( msgs->outMessages );
2207        tr_free( msgs->pex6 );
2208        tr_free( msgs->pex );
2209
2210        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2211        tr_free( msgs );
2212    }
2213}
2214
2215void
2216tr_peerMsgsUnsubscribe( tr_peermsgs *    peer,
2217                        tr_publisher_tag tag )
2218{
2219    tr_publisherUnsubscribe( &peer->publisher, tag );
2220}
2221
Note: See TracBrowser for help on using the repository browser.