source: trunk/libtransmission/peer-msgs.c @ 9434

Last change on this file since 9434 was 9434, checked in by charles, 13 years ago

(trunk libT) #2508 atom->port never updated

  • Property svn:keywords set to Date Rev Author Id
File size: 64.6 KB
Line 
1/*
2 * This file Copyright (C) 2007-2009 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 9434 2009-10-29 16:10:03Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19
20#include <event.h>
21
22#include "transmission.h"
23#include "bencode.h"
24#include "completion.h"
25#include "crypto.h"
26#include "inout.h"
27#ifdef WIN32
28#include "net.h" /* for ECONN */
29#endif
30#include "peer-io.h"
31#include "peer-mgr.h"
32#include "peer-msgs.h"
33#include "platform.h" /* MAX_STACK_ARRAY_SIZE */
34#include "ratecontrol.h"
35#include "request-list.h"
36#include "session.h"
37#include "stats.h"
38#include "torrent.h"
39#include "tr-dht.h"
40#include "utils.h"
41#include "version.h"
42
43/**
44***
45**/
46
47enum
48{
49    BT_CHOKE                = 0,
50    BT_UNCHOKE              = 1,
51    BT_INTERESTED           = 2,
52    BT_NOT_INTERESTED       = 3,
53    BT_HAVE                 = 4,
54    BT_BITFIELD             = 5,
55    BT_REQUEST              = 6,
56    BT_PIECE                = 7,
57    BT_CANCEL               = 8,
58    BT_PORT                 = 9,
59
60    BT_FEXT_SUGGEST         = 13,
61    BT_FEXT_HAVE_ALL        = 14,
62    BT_FEXT_HAVE_NONE       = 15,
63    BT_FEXT_REJECT          = 16,
64    BT_FEXT_ALLOWED_FAST    = 17,
65
66    BT_LTEP                 = 20,
67
68    LTEP_HANDSHAKE          = 0,
69
70    TR_LTEP_PEX             = 1,
71
72    MAX_PEX_PEER_COUNT      = 50,
73
74    MIN_CHOKE_PERIOD_SEC    = ( 10 ),
75
76    /* idle seconds before we send a keepalive */
77    KEEPALIVE_INTERVAL_SECS = 100,
78
79    PEX_INTERVAL_SECS       = 90, /* sec between sendPex() calls */
80
81
82    MAX_BLOCK_SIZE          = ( 1024 * 16 ),
83
84
85    /* how long an unsent request can stay queued before it's returned
86       back to the peer-mgr's pool of requests */
87    QUEUED_REQUEST_TTL_SECS = 20,
88
89    /* how long a sent request can stay queued before it's returned
90       back to the peer-mgr's pool of requests */
91    SENT_REQUEST_TTL_SECS = 240,
92
93    /* used in lowering the outMessages queue period */
94    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
95    HIGH_PRIORITY_INTERVAL_SECS = 2,
96    LOW_PRIORITY_INTERVAL_SECS = 20,
97
98    /* number of pieces to remove from the bitfield when
99     * lazy bitfields are turned on */
100    LAZY_PIECE_COUNT = 26,
101
102    /* number of pieces we'll allow in our fast set */
103    MAX_FAST_SET_SIZE = 3
104};
105
106enum
107{
108    AWAITING_BT_LENGTH,
109    AWAITING_BT_ID,
110    AWAITING_BT_MESSAGE,
111    AWAITING_BT_PIECE
112};
113
114/**
115***
116**/
117
118/* this is raw, unchanged data from the peer regarding
119 * the current message that it's sending us. */
120struct tr_incoming
121{
122    uint8_t                id;
123    uint32_t               length; /* includes the +1 for id length */
124    struct peer_request    blockReq; /* metadata for incoming blocks */
125    struct evbuffer *      block; /* piece data for incoming blocks */
126};
127
128/**
129 * Low-level communication state information about a connected peer.
130 *
131 * This structure remembers the low-level protocol states that we're
132 * in with this peer, such as active requests, pex messages, and so on.
133 * Its fields are all private to peer-msgs.c.
134 *
135 * Data not directly involved with sending & receiving messages is
136 * stored in tr_peer, where it can be accessed by both peermsgs and
137 * the peer manager.
138 *
139 * @see struct peer_atom
140 * @see tr_peer
141 */
142struct tr_peermsgs
143{
144    tr_bool         peerSupportsPex;
145    tr_bool         clientSentLtepHandshake;
146    tr_bool         peerSentLtepHandshake;
147    /*tr_bool         haveFastSet;*/
148
149    /* how long the outMessages batch should be allowed to grow before
150     * it's flushed -- some messages (like requests >:) should be sent
151     * very quickly; others aren't as urgent. */
152    int8_t          outMessagesBatchPeriod;
153
154    uint8_t         state;
155    uint8_t         ut_pex_id;
156    uint16_t        pexCount;
157    uint16_t        pexCount6;
158    uint16_t        maxActiveRequests;
159
160#if 0
161    size_t                 fastsetSize;
162    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
163#endif
164
165    tr_peer *              peer;
166
167    tr_torrent *           torrent;
168
169    tr_publisher           publisher;
170
171    struct evbuffer *      outMessages; /* all the non-piece messages */
172
173    struct request_list    peerAskedFor;
174    struct request_list    clientAskedFor;
175    struct request_list    clientWillAskFor;
176
177    tr_pex               * pex;
178    tr_pex               * pex6;
179
180    /*time_t                 clientSentPexAt;*/
181    time_t                 clientSentAnythingAt;
182
183    /* when we started batching the outMessages */
184    time_t                outMessagesBatchedAt;
185
186    struct tr_incoming    incoming;
187
188    /* if the peer supports the Extension Protocol in BEP 10 and
189       supplied a reqq argument, it's stored here.  otherwise the
190       value is zero and should be ignored. */
191    int64_t               reqq;
192
193    struct event          pexTimer;
194};
195
196static TR_INLINE tr_session*
197getSession( struct tr_peermsgs * msgs )
198{
199    return msgs->torrent->session;
200}
201
202/**
203***
204**/
205
206static void
207myDebug( const char * file, int line,
208         const struct tr_peermsgs * msgs,
209         const char * fmt, ... )
210{
211    FILE * fp = tr_getLog( );
212
213    if( fp )
214    {
215        va_list           args;
216        char              timestr[64];
217        struct evbuffer * buf = evbuffer_new( );
218        char *            base = tr_basename( file );
219
220        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
221                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
222                             tr_torrentName( msgs->torrent ),
223                             tr_peerIoGetAddrStr( msgs->peer->io ),
224                             msgs->peer->client );
225        va_start( args, fmt );
226        evbuffer_add_vprintf( buf, fmt, args );
227        va_end( args );
228        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
229        /* FIXME(libevent2) tr_getLog() should return an fd, then use evbuffer_write() here */
230        fwrite( EVBUFFER_DATA( buf ), 1, EVBUFFER_LENGTH( buf ), fp );
231
232        tr_free( base );
233        evbuffer_free( buf );
234    }
235}
236
237#define dbgmsg( msgs, ... ) \
238    do { \
239        if( tr_deepLoggingIsActive( ) ) \
240            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
241    } while( 0 )
242
243/**
244***
245**/
246
247static void
248pokeBatchPeriod( tr_peermsgs * msgs,
249                 int           interval )
250{
251    if( msgs->outMessagesBatchPeriod > interval )
252    {
253        msgs->outMessagesBatchPeriod = interval;
254        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
255    }
256}
257
258static TR_INLINE void
259dbgOutMessageLen( tr_peermsgs * msgs )
260{
261    dbgmsg( msgs, "outMessage size is now %zu", EVBUFFER_LENGTH( msgs->outMessages ) );
262}
263
264static void
265protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
266{
267    tr_peerIo       * io  = msgs->peer->io;
268    struct evbuffer * out = msgs->outMessages;
269
270    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
271
272    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
273    tr_peerIoWriteUint8 ( io, out, BT_FEXT_REJECT );
274    tr_peerIoWriteUint32( io, out, req->index );
275    tr_peerIoWriteUint32( io, out, req->offset );
276    tr_peerIoWriteUint32( io, out, req->length );
277
278    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
279    dbgOutMessageLen( msgs );
280}
281
282static void
283protocolSendRequest( tr_peermsgs               * msgs,
284                     const struct peer_request * req )
285{
286    tr_peerIo       * io  = msgs->peer->io;
287    struct evbuffer * out = msgs->outMessages;
288
289    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
290    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
291    tr_peerIoWriteUint32( io, out, req->index );
292    tr_peerIoWriteUint32( io, out, req->offset );
293    tr_peerIoWriteUint32( io, out, req->length );
294
295    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
296    dbgOutMessageLen( msgs );
297    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
298}
299
300static void
301protocolSendCancel( tr_peermsgs               * msgs,
302                    const struct peer_request * req )
303{
304    tr_peerIo       * io  = msgs->peer->io;
305    struct evbuffer * out = msgs->outMessages;
306
307    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
308    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
309    tr_peerIoWriteUint32( io, out, req->index );
310    tr_peerIoWriteUint32( io, out, req->offset );
311    tr_peerIoWriteUint32( io, out, req->length );
312
313    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
314    dbgOutMessageLen( msgs );
315    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
316}
317
318static void
319protocolSendPort(tr_peermsgs *msgs, uint16_t port)
320{
321    tr_peerIo       * io  = msgs->peer->io;
322    struct evbuffer * out = msgs->outMessages;
323
324    dbgmsg( msgs, "sending Port %u", port);
325    tr_peerIoWriteUint32( io, out, 3 );
326    tr_peerIoWriteUint8 ( io, out, BT_PORT );
327    tr_peerIoWriteUint16( io, out, port);
328}
329
330static void
331protocolSendHave( tr_peermsgs * msgs,
332                  uint32_t      index )
333{
334    tr_peerIo       * io  = msgs->peer->io;
335    struct evbuffer * out = msgs->outMessages;
336
337    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
338    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
339    tr_peerIoWriteUint32( io, out, index );
340
341    dbgmsg( msgs, "sending Have %u", index );
342    dbgOutMessageLen( msgs );
343    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
344}
345
346#if 0
347static void
348protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
349{
350    tr_peerIo       * io  = msgs->peer->io;
351    struct evbuffer * out = msgs->outMessages;
352
353    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
354
355    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
356    tr_peerIoWriteUint8 ( io, out, BT_FEXT_ALLOWED_FAST );
357    tr_peerIoWriteUint32( io, out, pieceIndex );
358
359    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
360    dbgOutMessageLen( msgs );
361}
362#endif
363
364static void
365protocolSendChoke( tr_peermsgs * msgs,
366                   int           choke )
367{
368    tr_peerIo       * io  = msgs->peer->io;
369    struct evbuffer * out = msgs->outMessages;
370
371    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
372    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
373
374    dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
375    dbgOutMessageLen( msgs );
376    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
377}
378
379static void
380protocolSendHaveAll( tr_peermsgs * msgs )
381{
382    tr_peerIo       * io  = msgs->peer->io;
383    struct evbuffer * out = msgs->outMessages;
384
385    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
386
387    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
388    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_ALL );
389
390    dbgmsg( msgs, "sending HAVE_ALL..." );
391    dbgOutMessageLen( msgs );
392    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
393}
394
395static void
396protocolSendHaveNone( tr_peermsgs * msgs )
397{
398    tr_peerIo       * io  = msgs->peer->io;
399    struct evbuffer * out = msgs->outMessages;
400
401    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
402
403    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
404    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_NONE );
405
406    dbgmsg( msgs, "sending HAVE_NONE..." );
407    dbgOutMessageLen( msgs );
408    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
409}
410
411/**
412***  EVENTS
413**/
414
415static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0, 0, 0 };
416
417static void
418publish( tr_peermsgs * msgs, tr_peer_event * e )
419{
420    assert( msgs->peer );
421    assert( msgs->peer->msgs == msgs );
422
423    tr_publisherPublish( &msgs->publisher, msgs->peer, e );
424}
425
426static void
427fireError( tr_peermsgs * msgs, int err )
428{
429    tr_peer_event e = blankEvent;
430    e.eventType = TR_PEER_ERROR;
431    e.err = err;
432    publish( msgs, &e );
433}
434
435static void
436fireUploadOnly( tr_peermsgs * msgs, tr_bool uploadOnly )
437{
438    tr_peer_event e = blankEvent;
439    e.eventType = TR_PEER_UPLOAD_ONLY;
440    e.uploadOnly = uploadOnly;
441    publish( msgs, &e );
442}
443
444static void
445fireNeedReq( tr_peermsgs * msgs )
446{
447    tr_peer_event e = blankEvent;
448    e.eventType = TR_PEER_NEED_REQ;
449    publish( msgs, &e );
450}
451
452static void
453firePeerProgress( tr_peermsgs * msgs )
454{
455    tr_peer_event e = blankEvent;
456    e.eventType = TR_PEER_PEER_PROGRESS;
457    e.progress = msgs->peer->progress;
458    publish( msgs, &e );
459}
460
461static void
462fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
463{
464    tr_peer_event e = blankEvent;
465    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
466    e.pieceIndex = req->index;
467    e.offset = req->offset;
468    e.length = req->length;
469    publish( msgs, &e );
470}
471
472static void
473fireClientGotData( tr_peermsgs * msgs,
474                   uint32_t      length,
475                   int           wasPieceData )
476{
477    tr_peer_event e = blankEvent;
478
479    e.length = length;
480    e.eventType = TR_PEER_CLIENT_GOT_DATA;
481    e.wasPieceData = wasPieceData;
482    publish( msgs, &e );
483}
484
485static void
486fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
487{
488    tr_peer_event e = blankEvent;
489    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
490    e.pieceIndex = pieceIndex;
491    publish( msgs, &e );
492}
493
494static void
495fireClientGotPort( tr_peermsgs * msgs, tr_port port )
496{
497    tr_peer_event e = blankEvent;
498    e.eventType = TR_PEER_CLIENT_GOT_PORT;
499    e.port = port;
500    publish( msgs, &e );
501}
502
503static void
504fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
505{
506    tr_peer_event e = blankEvent;
507    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
508    e.pieceIndex = pieceIndex;
509    publish( msgs, &e );
510}
511
512static void
513firePeerGotData( tr_peermsgs  * msgs,
514                 uint32_t       length,
515                 int            wasPieceData )
516{
517    tr_peer_event e = blankEvent;
518
519    e.length = length;
520    e.eventType = TR_PEER_PEER_GOT_DATA;
521    e.wasPieceData = wasPieceData;
522
523    publish( msgs, &e );
524}
525
526static void
527fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
528{
529    tr_peer_event e = blankEvent;
530    e.eventType = TR_PEER_CANCEL;
531    e.pieceIndex = req->index;
532    e.offset = req->offset;
533    e.length = req->length;
534    publish( msgs, &e );
535}
536
537/**
538***  ALLOWED FAST SET
539***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
540**/
541
542size_t
543tr_generateAllowedSet( tr_piece_index_t * setmePieces,
544                       size_t             desiredSetSize,
545                       size_t             pieceCount,
546                       const uint8_t    * infohash,
547                       const tr_address * addr )
548{
549    size_t setSize = 0;
550
551    assert( setmePieces );
552    assert( desiredSetSize <= pieceCount );
553    assert( desiredSetSize );
554    assert( pieceCount );
555    assert( infohash );
556    assert( addr );
557
558    if( addr->type == TR_AF_INET )
559    {
560        uint8_t w[SHA_DIGEST_LENGTH + 4], *walk=w;
561        uint8_t x[SHA_DIGEST_LENGTH];
562
563        uint32_t ui32 = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 );   /* (1) */
564        memcpy( w, &ui32, sizeof( uint32_t ) );
565        walk += sizeof( uint32_t );
566        memcpy( walk, infohash, SHA_DIGEST_LENGTH );                 /* (2) */
567        walk += SHA_DIGEST_LENGTH;
568        tr_sha1( x, w, walk-w, NULL );                               /* (3) */
569        assert( sizeof( w ) == walk-w );
570
571        while( setSize<desiredSetSize )
572        {
573            int i;
574            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
575            {
576                size_t k;
577                uint32_t j = i * 4;                                  /* (5) */
578                uint32_t y = ntohl( *( uint32_t* )( x + j ) );       /* (6) */
579                uint32_t index = y % pieceCount;                     /* (7) */
580
581                for( k=0; k<setSize; ++k )                           /* (8) */
582                    if( setmePieces[k] == index )
583                        break;
584
585                if( k == setSize )
586                    setmePieces[setSize++] = index;                  /* (9) */
587            }
588
589            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
590        }
591    }
592
593    return setSize;
594}
595
596static void
597updateFastSet( tr_peermsgs * msgs UNUSED )
598{
599#if 0
600    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
601    const int peerIsNeedy = msgs->peer->progress < 0.10;
602
603    if( fext && peerIsNeedy && !msgs->haveFastSet )
604    {
605        size_t i;
606        const struct tr_address * addr = tr_peerIoGetAddress( msgs->peer->io, NULL );
607        const tr_info * inf = &msgs->torrent->info;
608        const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
609
610        /* build the fast set */
611        msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
612        msgs->haveFastSet = 1;
613
614        /* send it to the peer */
615        for( i=0; i<msgs->fastsetSize; ++i )
616            protocolSendAllowedFast( msgs, msgs->fastset[i] );
617    }
618#endif
619}
620
621/**
622***  INTEREST
623**/
624
625static tr_bool
626isPieceInteresting( const tr_peermsgs * msgs,
627                    tr_piece_index_t    piece )
628{
629    const tr_torrent * torrent = msgs->torrent;
630
631    return ( !torrent->info.pieces[piece].dnd )                 /* we want it */
632          && ( !tr_cpPieceIsComplete( &torrent->completion, piece ) ) /* !have */
633          && ( tr_bitfieldHas( msgs->peer->have, piece ) );    /* peer has it */
634}
635
636/* "interested" means we'll ask for piece data if they unchoke us */
637static tr_bool
638isPeerInteresting( const tr_peermsgs * msgs )
639{
640    tr_piece_index_t    i;
641    const tr_torrent *  torrent;
642    const tr_bitfield * bitfield;
643    const int           clientIsSeed = tr_torrentIsSeed( msgs->torrent );
644
645    if( clientIsSeed )
646        return FALSE;
647
648    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
649        return FALSE;
650
651    torrent = msgs->torrent;
652    bitfield = tr_cpPieceBitfield( &torrent->completion );
653
654    if( !msgs->peer->have )
655        return TRUE;
656
657    assert( bitfield->byteCount == msgs->peer->have->byteCount );
658
659    for( i = 0; i < torrent->info.pieceCount; ++i )
660        if( isPieceInteresting( msgs, i ) )
661            return TRUE;
662
663    return FALSE;
664}
665
666static void
667sendInterest( tr_peermsgs * msgs,
668              int           weAreInterested )
669{
670    struct evbuffer * out = msgs->outMessages;
671
672    assert( msgs );
673    assert( weAreInterested == 0 || weAreInterested == 1 );
674
675    msgs->peer->clientIsInterested = weAreInterested;
676    dbgmsg( msgs, "Sending %s", weAreInterested ? "Interested" : "Not Interested" );
677    tr_peerIoWriteUint32( msgs->peer->io, out, sizeof( uint8_t ) );
678    tr_peerIoWriteUint8 ( msgs->peer->io, out, weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
679
680    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
681    dbgOutMessageLen( msgs );
682}
683
684static void
685updateInterest( tr_peermsgs * msgs )
686{
687    const int i = isPeerInteresting( msgs );
688
689    if( i != msgs->peer->clientIsInterested )
690        sendInterest( msgs, i );
691    if( i )
692        fireNeedReq( msgs );
693}
694
695static tr_bool
696popNextRequest( tr_peermsgs *         msgs,
697                struct peer_request * setme )
698{
699    return reqListPop( &msgs->peerAskedFor, setme );
700}
701
702static void
703cancelAllRequestsToClient( tr_peermsgs * msgs )
704{
705    struct peer_request req;
706    const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->peer->io );
707
708    while( popNextRequest( msgs, &req ) )
709        if( mustSendCancel )
710            protocolSendReject( msgs, &req );
711}
712
713void
714tr_peerMsgsSetChoke( tr_peermsgs * msgs,
715                     int           choke )
716{
717    const time_t now = time( NULL );
718    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
719
720    assert( msgs );
721    assert( msgs->peer );
722    assert( choke == 0 || choke == 1 );
723
724    if( msgs->peer->chokeChangedAt > fibrillationTime )
725    {
726        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
727    }
728    else if( msgs->peer->peerIsChoked != choke )
729    {
730        msgs->peer->peerIsChoked = choke;
731        if( choke )
732            cancelAllRequestsToClient( msgs );
733        protocolSendChoke( msgs, choke );
734        msgs->peer->chokeChangedAt = now;
735    }
736}
737
738/**
739***
740**/
741
742void
743tr_peerMsgsHave( tr_peermsgs * msgs,
744                 uint32_t      index )
745{
746    protocolSendHave( msgs, index );
747
748    /* since we have more pieces now, we might not be interested in this peer */
749    updateInterest( msgs );
750}
751
752/**
753***
754**/
755
756static tr_bool
757reqIsValid( const tr_peermsgs * peer,
758            uint32_t            index,
759            uint32_t            offset,
760            uint32_t            length )
761{
762    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
763}
764
765static tr_bool
766requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
767{
768    return reqIsValid( msgs, req->index, req->offset, req->length );
769}
770
771static void
772expireFromList( tr_peermsgs          * msgs,
773                struct request_list  * list,
774                const time_t           oldestAllowed )
775{
776    size_t i;
777    struct request_list tmp = REQUEST_LIST_INIT;
778
779    /* since the fifo list is sorted by time, the oldest will be first */
780    if( !list->len || ( list->fifo[0].time_requested >= oldestAllowed ) )
781        return;
782
783    /* if we found one too old, start pruning them */
784    reqListCopy( &tmp, list );
785    for( i=0; i<tmp.len; ++i ) {
786        const struct peer_request * req = &tmp.fifo[i];
787        if( req->time_requested >= oldestAllowed )
788            break;
789        tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
790    }
791    reqListClear( &tmp );
792}
793
794static void
795expireOldRequests( tr_peermsgs * msgs, const time_t now  )
796{
797    time_t oldestAllowed;
798    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
799    dbgmsg( msgs, "entering `expire old requests' block" );
800
801    /* cancel requests that have been queued for too long */
802    oldestAllowed = now - QUEUED_REQUEST_TTL_SECS;
803    expireFromList( msgs, &msgs->clientWillAskFor, oldestAllowed );
804
805    /* if the peer doesn't support "Reject Request",
806     * cancel requests that were sent too long ago. */
807    if( !fext ) {
808        oldestAllowed = now - SENT_REQUEST_TTL_SECS;
809        expireFromList( msgs, &msgs->clientAskedFor, oldestAllowed );
810    }
811
812    dbgmsg( msgs, "leaving `expire old requests' block" );
813}
814
815static void
816pumpRequestQueue( tr_peermsgs * msgs, const time_t now )
817{
818    const int           max = msgs->maxActiveRequests;
819    int                 sent = 0;
820    int                 len = msgs->clientAskedFor.len;
821    struct peer_request req;
822
823    dbgmsg( msgs, "clientIsChoked %d, download allowed %d, len %d, max %d, msgs->clientWillAskFor.len %d",
824            (int)msgs->peer->clientIsChoked,
825            (int)tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ),
826            len, max, msgs->clientWillAskFor.len );
827
828    if( msgs->peer->clientIsChoked )
829        return;
830    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
831        return;
832
833    while( ( len < max ) && reqListPop( &msgs->clientWillAskFor, &req ) )
834    {
835        const tr_block_index_t block = _tr_block( msgs->torrent, req.index, req.offset );
836
837        assert( requestIsValid( msgs, &req ) );
838        assert( tr_bitfieldHas( msgs->peer->have, req.index ) );
839
840        /* don't ask for it if we've already got it... this block may have
841         * come in from a different peer after we cancelled a request for it */
842        if( !tr_cpBlockIsComplete( &msgs->torrent->completion, block ) )
843        {
844            protocolSendRequest( msgs, &req );
845            req.time_requested = now;
846            reqListAppend( &msgs->clientAskedFor, &req );
847
848            ++len;
849            ++sent;
850        }
851        else dbgmsg( msgs, "not asking for it because we've already got it..." );
852    }
853
854    if( sent )
855        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
856                sent, msgs->clientAskedFor.len, msgs->clientWillAskFor.len );
857
858    if( len < max )
859        fireNeedReq( msgs );
860}
861
862static TR_INLINE tr_bool
863requestQueueIsFull( const tr_peermsgs * msgs )
864{
865    const int req_max = msgs->maxActiveRequests;
866    return msgs->clientWillAskFor.len >= (size_t)req_max;
867}
868
869tr_addreq_t
870tr_peerMsgsAddRequest( tr_peermsgs *    msgs,
871                       uint32_t         index,
872                       uint32_t         offset,
873                       uint32_t         length )
874{
875    struct peer_request req;
876
877    assert( msgs );
878    assert( msgs->torrent );
879
880    /**
881    ***  Reasons to decline the request
882    **/
883
884    /* don't send requests to choked clients */
885    if( msgs->peer->clientIsChoked ) {
886        dbgmsg( msgs, "declining request because they're choking us" );
887        return TR_ADDREQ_CLIENT_CHOKED;
888    }
889
890    /* peer's queue is full */
891    if( requestQueueIsFull( msgs ) ) {
892        dbgmsg( msgs, "declining request because we're full" );
893        return TR_ADDREQ_FULL;
894    }
895
896    /* peer doesn't have this piece */
897    if( !tr_bitfieldHas( msgs->peer->have, index ) )
898        return TR_ADDREQ_MISSING;
899
900    /* have we already asked for this piece? */
901    req.index = index;
902    req.offset = offset;
903    req.length = length;
904    if( reqListHas( &msgs->clientAskedFor, &req ) ) {
905        dbgmsg( msgs, "declining because it's a duplicate" );
906        return TR_ADDREQ_DUPLICATE;
907    }
908    if( reqListHas( &msgs->clientWillAskFor, &req ) ) {
909        dbgmsg( msgs, "declining because it's a duplicate" );
910        return TR_ADDREQ_DUPLICATE;
911    }
912
913    /**
914    ***  Accept this request
915    **/
916
917    dbgmsg( msgs, "adding req for %"PRIu32":%"PRIu32"->%"PRIu32" to our `will request' list",
918            index, offset, length );
919    req.time_requested = time( NULL );
920    reqListAppend( &msgs->clientWillAskFor, &req );
921    return TR_ADDREQ_OK;
922}
923
924static void
925cancelAllRequestsToPeer( tr_peermsgs * msgs, tr_bool sendCancel )
926{
927    size_t i;
928    struct request_list a = msgs->clientWillAskFor;
929    struct request_list b = msgs->clientAskedFor;
930    dbgmsg( msgs, "cancelling all requests to peer" );
931
932    msgs->clientAskedFor = REQUEST_LIST_INIT;
933    msgs->clientWillAskFor = REQUEST_LIST_INIT;
934
935    for( i=0; i<a.len; ++i )
936        fireCancelledReq( msgs, &a.fifo[i] );
937
938    for( i = 0; i < b.len; ++i ) {
939        fireCancelledReq( msgs, &b.fifo[i] );
940        if( sendCancel )
941            protocolSendCancel( msgs, &b.fifo[i] );
942    }
943
944    reqListClear( &a );
945    reqListClear( &b );
946}
947
948void
949tr_peerMsgsCancel( tr_peermsgs * msgs,
950                   uint32_t      pieceIndex,
951                   uint32_t      offset,
952                   uint32_t      length )
953{
954    struct peer_request req;
955
956    assert( msgs != NULL );
957    assert( length > 0 );
958
959
960    /* have we asked the peer for this piece? */
961    req.index = pieceIndex;
962    req.offset = offset;
963    req.length = length;
964
965    /* if it's only in the queue and hasn't been sent yet, free it */
966    if( reqListRemove( &msgs->clientWillAskFor, &req ) ) {
967        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32, pieceIndex, offset, length );
968        fireCancelledReq( msgs, &req );
969    }
970
971    /* if it's already been sent, send a cancel message too */
972    if( reqListRemove( &msgs->clientAskedFor, &req ) ) {
973        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32, pieceIndex, offset, length );
974        protocolSendCancel( msgs, &req );
975        fireCancelledReq( msgs, &req );
976    }
977}
978
979
980/**
981***
982**/
983
984static void
985sendLtepHandshake( tr_peermsgs * msgs )
986{
987    tr_benc val, *m;
988    char * buf;
989    int len;
990    int pex;
991    struct evbuffer * out = msgs->outMessages;
992
993    if( msgs->clientSentLtepHandshake )
994        return;
995
996    dbgmsg( msgs, "sending an ltep handshake" );
997    msgs->clientSentLtepHandshake = 1;
998
999    /* decide if we want to advertise pex support */
1000    if( !tr_torrentAllowsPex( msgs->torrent ) )
1001        pex = 0;
1002    else if( msgs->peerSentLtepHandshake )
1003        pex = msgs->peerSupportsPex ? 1 : 0;
1004    else
1005        pex = 1;
1006
1007    tr_bencInitDict( &val, 5 );
1008    tr_bencDictAddInt( &val, "e", getSession(msgs)->encryptionMode != TR_CLEAR_PREFERRED );
1009    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( getSession(msgs) ) );
1010    tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
1011    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
1012    m  = tr_bencDictAddDict( &val, "m", 1 );
1013    if( pex )
1014        tr_bencDictAddInt( m, "ut_pex", TR_LTEP_PEX );
1015    buf = tr_bencToStr( &val, TR_FMT_BENC, &len );
1016
1017    tr_peerIoWriteUint32( msgs->peer->io, out, 2 * sizeof( uint8_t ) + len );
1018    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_LTEP );
1019    tr_peerIoWriteUint8 ( msgs->peer->io, out, LTEP_HANDSHAKE );
1020    tr_peerIoWriteBytes ( msgs->peer->io, out, buf, len );
1021    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1022    dbgOutMessageLen( msgs );
1023
1024    /* cleanup */
1025    tr_bencFree( &val );
1026    tr_free( buf );
1027}
1028
1029static void
1030parseLtepHandshake( tr_peermsgs *     msgs,
1031                    int               len,
1032                    struct evbuffer * inbuf )
1033{
1034    int64_t   i;
1035    tr_benc   val, * sub;
1036    uint8_t * tmp = tr_new( uint8_t, len );
1037
1038    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, len );
1039    msgs->peerSentLtepHandshake = 1;
1040
1041    if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) )
1042    {
1043        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
1044        tr_free( tmp );
1045        return;
1046    }
1047
1048    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
1049
1050    /* does the peer prefer encrypted connections? */
1051    if( tr_bencDictFindInt( &val, "e", &i ) )
1052        msgs->peer->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
1053                                              : ENCRYPTION_PREFERENCE_NO;
1054
1055    /* check supported messages for utorrent pex */
1056    msgs->peerSupportsPex = 0;
1057    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
1058        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
1059            msgs->ut_pex_id = (uint8_t) i;
1060            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
1061            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
1062        }
1063    }
1064
1065    /* look for upload_only (BEP 21) */
1066    if( tr_bencDictFindInt( &val, "upload_only", &i ) )
1067        fireUploadOnly( msgs, i!=0 );
1068
1069    /* get peer's listening port */
1070    if( tr_bencDictFindInt( &val, "p", &i ) ) {
1071        fireClientGotPort( msgs, (tr_port)i );
1072        dbgmsg( msgs, "peer's port is now %d", (int)i );
1073    }
1074
1075    /* get peer's maximum request queue size */
1076    if( tr_bencDictFindInt( &val, "reqq", &i ) )
1077        msgs->reqq = i;
1078
1079    tr_bencFree( &val );
1080    tr_free( tmp );
1081}
1082
1083static void
1084parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1085{
1086    int loaded = 0;
1087    uint8_t * tmp = tr_new( uint8_t, msglen );
1088    tr_benc val;
1089    tr_torrent * tor = msgs->torrent;
1090    const uint8_t * added;
1091    size_t added_len;
1092
1093    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1094
1095    if( tr_torrentAllowsPex( tor )
1096      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) ) )
1097    {
1098        if( tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1099        {
1100            tr_pex * pex;
1101            size_t i, n;
1102            size_t added_f_len = 0;
1103            const uint8_t * added_f = NULL;
1104
1105            tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1106            pex = tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len, &n );
1107
1108            n = MIN( n, MAX_PEX_PEER_COUNT );
1109            for( i=0; i<n; ++i )
1110                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex + i );
1111
1112            tr_free( pex );
1113        }
1114
1115        if( tr_bencDictFindRaw( &val, "added6", &added, &added_len ) )
1116        {
1117            tr_pex * pex;
1118            size_t i, n;
1119            size_t added_f_len = 0;
1120            const uint8_t * added_f = NULL;
1121
1122            tr_bencDictFindRaw( &val, "added6.f", &added_f, &added_f_len );
1123            pex = tr_peerMgrCompact6ToPex( added, added_len, added_f, added_f_len, &n );
1124
1125            n = MIN( n, MAX_PEX_PEER_COUNT );
1126            for( i=0; i<n; ++i )
1127                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex + i );
1128
1129            tr_free( pex );
1130        }
1131    }
1132
1133    if( loaded )
1134        tr_bencFree( &val );
1135    tr_free( tmp );
1136}
1137
1138static void sendPex( tr_peermsgs * msgs );
1139
1140static void
1141parseLtep( tr_peermsgs *     msgs,
1142           int               msglen,
1143           struct evbuffer * inbuf )
1144{
1145    uint8_t ltep_msgid;
1146
1147    tr_peerIoReadUint8( msgs->peer->io, inbuf, &ltep_msgid );
1148    msglen--;
1149
1150    if( ltep_msgid == LTEP_HANDSHAKE )
1151    {
1152        dbgmsg( msgs, "got ltep handshake" );
1153        parseLtepHandshake( msgs, msglen, inbuf );
1154        if( tr_peerIoSupportsLTEP( msgs->peer->io ) )
1155        {
1156            sendLtepHandshake( msgs );
1157            sendPex( msgs );
1158        }
1159    }
1160    else if( ltep_msgid == TR_LTEP_PEX )
1161    {
1162        dbgmsg( msgs, "got ut pex" );
1163        msgs->peerSupportsPex = 1;
1164        parseUtPex( msgs, msglen, inbuf );
1165    }
1166    else
1167    {
1168        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1169        evbuffer_drain( inbuf, msglen );
1170    }
1171}
1172
1173static int
1174readBtLength( tr_peermsgs *     msgs,
1175              struct evbuffer * inbuf,
1176              size_t            inlen )
1177{
1178    uint32_t len;
1179
1180    if( inlen < sizeof( len ) )
1181        return READ_LATER;
1182
1183    tr_peerIoReadUint32( msgs->peer->io, inbuf, &len );
1184
1185    if( len == 0 ) /* peer sent us a keepalive message */
1186        dbgmsg( msgs, "got KeepAlive" );
1187    else
1188    {
1189        msgs->incoming.length = len;
1190        msgs->state = AWAITING_BT_ID;
1191    }
1192
1193    return READ_NOW;
1194}
1195
1196static int readBtMessage( tr_peermsgs *     msgs,
1197                          struct evbuffer * inbuf,
1198                          size_t            inlen );
1199
1200static int
1201readBtId( tr_peermsgs *     msgs,
1202          struct evbuffer * inbuf,
1203          size_t            inlen )
1204{
1205    uint8_t id;
1206
1207    if( inlen < sizeof( uint8_t ) )
1208        return READ_LATER;
1209
1210    tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
1211    msgs->incoming.id = id;
1212    dbgmsg( msgs, "msgs->incoming.id is now %d; msgs->incoming.length is %zu", id, (size_t)msgs->incoming.length );
1213
1214    if( id == BT_PIECE )
1215    {
1216        msgs->state = AWAITING_BT_PIECE;
1217        return READ_NOW;
1218    }
1219    else if( msgs->incoming.length != 1 )
1220    {
1221        msgs->state = AWAITING_BT_MESSAGE;
1222        return READ_NOW;
1223    }
1224    else return readBtMessage( msgs, inbuf, inlen - 1 );
1225}
1226
1227static void
1228updatePeerProgress( tr_peermsgs * msgs )
1229{
1230    msgs->peer->progress = tr_bitfieldCountTrueBits( msgs->peer->have ) / (float)msgs->torrent->info.pieceCount;
1231    dbgmsg( msgs, "peer progress is %f", msgs->peer->progress );
1232    updateFastSet( msgs );
1233    updateInterest( msgs );
1234    firePeerProgress( msgs );
1235}
1236
1237static void
1238peerMadeRequest( tr_peermsgs *               msgs,
1239                 const struct peer_request * req )
1240{
1241    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1242    const int reqIsValid = requestIsValid( msgs, req );
1243    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( &msgs->torrent->completion, req->index );
1244    const int peerIsChoked = msgs->peer->peerIsChoked;
1245
1246    int allow = FALSE;
1247
1248    if( !reqIsValid )
1249        dbgmsg( msgs, "rejecting an invalid request." );
1250    else if( !clientHasPiece )
1251        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1252    else if( peerIsChoked )
1253        dbgmsg( msgs, "rejecting request from choked peer" );
1254    else
1255        allow = TRUE;
1256
1257    if( allow )
1258        reqListAppend( &msgs->peerAskedFor, req );
1259    else if( fext )
1260        protocolSendReject( msgs, req );
1261}
1262
1263static tr_bool
1264messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1265{
1266    switch( id )
1267    {
1268        case BT_CHOKE:
1269        case BT_UNCHOKE:
1270        case BT_INTERESTED:
1271        case BT_NOT_INTERESTED:
1272        case BT_FEXT_HAVE_ALL:
1273        case BT_FEXT_HAVE_NONE:
1274            return len == 1;
1275
1276        case BT_HAVE:
1277        case BT_FEXT_SUGGEST:
1278        case BT_FEXT_ALLOWED_FAST:
1279            return len == 5;
1280
1281        case BT_BITFIELD:
1282            return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1283
1284        case BT_REQUEST:
1285        case BT_CANCEL:
1286        case BT_FEXT_REJECT:
1287            return len == 13;
1288
1289        case BT_PIECE:
1290            return len > 9 && len <= 16393;
1291
1292        case BT_PORT:
1293            return len == 3;
1294
1295        case BT_LTEP:
1296            return len >= 2;
1297
1298        default:
1299            return FALSE;
1300    }
1301}
1302
1303static int clientGotBlock( tr_peermsgs *               msgs,
1304                           const uint8_t *             block,
1305                           const struct peer_request * req );
1306
1307static int
1308readBtPiece( tr_peermsgs      * msgs,
1309             struct evbuffer  * inbuf,
1310             size_t             inlen,
1311             size_t           * setme_piece_bytes_read )
1312{
1313    struct peer_request * req = &msgs->incoming.blockReq;
1314
1315    assert( EVBUFFER_LENGTH( inbuf ) >= inlen );
1316    dbgmsg( msgs, "In readBtPiece" );
1317
1318    if( !req->length )
1319    {
1320        if( inlen < 8 )
1321            return READ_LATER;
1322
1323        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index );
1324        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset );
1325        req->length = msgs->incoming.length - 9;
1326        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1327        return READ_NOW;
1328    }
1329    else
1330    {
1331        int err;
1332
1333        /* read in another chunk of data */
1334        const size_t nLeft = req->length - EVBUFFER_LENGTH( msgs->incoming.block );
1335        size_t n = MIN( nLeft, inlen );
1336        size_t i = n;
1337
1338        while( i > 0 )
1339        {
1340            uint8_t buf[MAX_STACK_ARRAY_SIZE];
1341            const size_t thisPass = MIN( i, sizeof( buf ) );
1342            tr_peerIoReadBytes( msgs->peer->io, inbuf, buf, thisPass );
1343            evbuffer_add( msgs->incoming.block, buf, thisPass );
1344            i -= thisPass;
1345        }
1346
1347        fireClientGotData( msgs, n, TRUE );
1348        *setme_piece_bytes_read += n;
1349        dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1350               n, req->index, req->offset, req->length,
1351               (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) );
1352        if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length )
1353            return READ_LATER;
1354
1355        /* we've got the whole block ... process it */
1356        err = clientGotBlock( msgs, EVBUFFER_DATA( msgs->incoming.block ), req );
1357
1358        /* cleanup */
1359        evbuffer_free( msgs->incoming.block );
1360        msgs->incoming.block = evbuffer_new( );
1361        req->length = 0;
1362        msgs->state = AWAITING_BT_LENGTH;
1363        return err ? READ_ERR : READ_NOW;
1364    }
1365}
1366
1367static int
1368readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1369{
1370    uint32_t      ui32;
1371    uint32_t      msglen = msgs->incoming.length;
1372    const uint8_t id = msgs->incoming.id;
1373    const size_t  startBufLen = EVBUFFER_LENGTH( inbuf );
1374    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1375
1376    --msglen; /* id length */
1377
1378    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
1379
1380    if( inlen < msglen )
1381        return READ_LATER;
1382
1383    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1384    {
1385        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1386        fireError( msgs, EMSGSIZE );
1387        return READ_ERR;
1388    }
1389
1390    switch( id )
1391    {
1392        case BT_CHOKE:
1393            dbgmsg( msgs, "got Choke" );
1394            msgs->peer->clientIsChoked = 1;
1395            if( !fext )
1396                cancelAllRequestsToPeer( msgs, FALSE );
1397            break;
1398
1399        case BT_UNCHOKE:
1400            dbgmsg( msgs, "got Unchoke" );
1401            msgs->peer->clientIsChoked = 0;
1402            fireNeedReq( msgs );
1403            break;
1404
1405        case BT_INTERESTED:
1406            dbgmsg( msgs, "got Interested" );
1407            msgs->peer->peerIsInterested = 1;
1408            break;
1409
1410        case BT_NOT_INTERESTED:
1411            dbgmsg( msgs, "got Not Interested" );
1412            msgs->peer->peerIsInterested = 0;
1413            break;
1414
1415        case BT_HAVE:
1416            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1417            dbgmsg( msgs, "got Have: %u", ui32 );
1418            if( tr_bitfieldAdd( msgs->peer->have, ui32 ) ) {
1419                fireError( msgs, ERANGE );
1420                return READ_ERR;
1421            }
1422            updatePeerProgress( msgs );
1423            tr_rcTransferred( &msgs->torrent->swarmSpeed,
1424                              msgs->torrent->info.pieceSize );
1425            break;
1426
1427        case BT_BITFIELD:
1428        {
1429            dbgmsg( msgs, "got a bitfield" );
1430            tr_peerIoReadBytes( msgs->peer->io, inbuf, msgs->peer->have->bits, msglen );
1431            updatePeerProgress( msgs );
1432            fireNeedReq( msgs );
1433            break;
1434        }
1435
1436        case BT_REQUEST:
1437        {
1438            struct peer_request r;
1439            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1440            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1441            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1442            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1443            peerMadeRequest( msgs, &r );
1444            break;
1445        }
1446
1447        case BT_CANCEL:
1448        {
1449            struct peer_request r;
1450            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1451            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1452            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1453            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1454            if( reqListRemove( &msgs->peerAskedFor, &r ) && fext )
1455                protocolSendReject( msgs, &r );
1456            break;
1457        }
1458
1459        case BT_PIECE:
1460            assert( 0 ); /* handled elsewhere! */
1461            break;
1462
1463        case BT_PORT:
1464            dbgmsg( msgs, "Got a BT_PORT" );
1465            tr_peerIoReadUint16( msgs->peer->io, inbuf, &msgs->peer->dht_port );
1466            if( msgs->peer->dht_port > 0 )
1467                tr_dhtAddNode( getSession(msgs),
1468                               tr_peerAddress( msgs->peer ),
1469                               msgs->peer->dht_port, 0 );
1470            break;
1471
1472        case BT_FEXT_SUGGEST:
1473            dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1474            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1475            if( fext )
1476                fireClientGotSuggest( msgs, ui32 );
1477            else {
1478                fireError( msgs, EMSGSIZE );
1479                return READ_ERR;
1480            }
1481            break;
1482
1483        case BT_FEXT_ALLOWED_FAST:
1484            dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1485            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1486            if( fext )
1487                fireClientGotAllowedFast( msgs, ui32 );
1488            else {
1489                fireError( msgs, EMSGSIZE );
1490                return READ_ERR;
1491            }
1492            break;
1493
1494        case BT_FEXT_HAVE_ALL:
1495            dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1496            if( fext ) {
1497                tr_bitfieldAddRange( msgs->peer->have, 0, msgs->torrent->info.pieceCount );
1498                updatePeerProgress( msgs );
1499            } else {
1500                fireError( msgs, EMSGSIZE );
1501                return READ_ERR;
1502            }
1503            break;
1504
1505        case BT_FEXT_HAVE_NONE:
1506            dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1507            if( fext ) {
1508                tr_bitfieldClear( msgs->peer->have );
1509                updatePeerProgress( msgs );
1510            } else {
1511                fireError( msgs, EMSGSIZE );
1512                return READ_ERR;
1513            }
1514            break;
1515
1516        case BT_FEXT_REJECT:
1517        {
1518            struct peer_request r;
1519            dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1520            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1521            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1522            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1523            if( fext )
1524                reqListRemove( &msgs->clientAskedFor, &r );
1525            else {
1526                fireError( msgs, EMSGSIZE );
1527                return READ_ERR;
1528            }
1529            break;
1530        }
1531
1532        case BT_LTEP:
1533            dbgmsg( msgs, "Got a BT_LTEP" );
1534            parseLtep( msgs, msglen, inbuf );
1535            break;
1536
1537        default:
1538            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1539            tr_peerIoDrain( msgs->peer->io, inbuf, msglen );
1540            break;
1541    }
1542
1543    assert( msglen + 1 == msgs->incoming.length );
1544    assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen );
1545
1546    msgs->state = AWAITING_BT_LENGTH;
1547    return READ_NOW;
1548}
1549
1550static TR_INLINE void
1551decrementDownloadedCount( tr_peermsgs * msgs, uint32_t byteCount )
1552{
1553    tr_torrent * tor = msgs->torrent;
1554
1555    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1556}
1557
1558static TR_INLINE void
1559clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1560{
1561    decrementDownloadedCount( msgs, req->length );
1562}
1563
1564static void
1565addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1566{
1567    if( !msgs->peer->blame )
1568         msgs->peer->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1569    tr_bitfieldAdd( msgs->peer->blame, index );
1570}
1571
1572/* returns 0 on success, or an errno on failure */
1573static int
1574clientGotBlock( tr_peermsgs *               msgs,
1575                const uint8_t *             data,
1576                const struct peer_request * req )
1577{
1578    int err;
1579    tr_torrent * tor = msgs->torrent;
1580    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1581
1582    assert( msgs );
1583    assert( req );
1584
1585    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) {
1586        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1587                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1588        return EMSGSIZE;
1589    }
1590
1591    /* save the block */
1592    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1593
1594    /**
1595    *** Remove the block from our `we asked for this' list
1596    **/
1597
1598    if( !reqListRemove( &msgs->clientAskedFor, req ) ) {
1599        clientGotUnwantedBlock( msgs, req );
1600        dbgmsg( msgs, "we didn't ask for this message..." );
1601        return 0;
1602    }
1603
1604    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1605            msgs->clientAskedFor.len );
1606
1607    /**
1608    *** Error checks
1609    **/
1610
1611    if( tr_cpBlockIsComplete( &tor->completion, block ) ) {
1612        dbgmsg( msgs, "we have this block already..." );
1613        clientGotUnwantedBlock( msgs, req );
1614        return 0;
1615    }
1616
1617    /**
1618    ***  Save the block
1619    **/
1620
1621    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1622        return err;
1623
1624    addPeerToBlamefield( msgs, req->index );
1625    fireGotBlock( msgs, req );
1626    return 0;
1627}
1628
1629static int peerPulse( void * vmsgs );
1630
1631static void
1632didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1633{
1634    tr_peermsgs * msgs = vmsgs;
1635    firePeerGotData( msgs, bytesWritten, wasPieceData );
1636
1637    if ( tr_isPeerIo( io ) && io->userData )
1638        peerPulse( msgs );
1639}
1640
1641static ReadState
1642canRead( tr_peerIo * io, void * vmsgs, size_t * piece )
1643{
1644    ReadState         ret;
1645    tr_peermsgs *     msgs = vmsgs;
1646    struct evbuffer * in = tr_peerIoGetReadBuffer( io );
1647    const size_t      inlen = EVBUFFER_LENGTH( in );
1648
1649    dbgmsg( msgs, "canRead: inlen is %zu, msgs->state is %d", inlen, msgs->state );
1650
1651    if( !inlen )
1652    {
1653        ret = READ_LATER;
1654    }
1655    else if( msgs->state == AWAITING_BT_PIECE )
1656    {
1657        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1658    }
1659    else switch( msgs->state )
1660    {
1661        case AWAITING_BT_LENGTH:
1662            ret = readBtLength ( msgs, in, inlen ); break;
1663
1664        case AWAITING_BT_ID:
1665            ret = readBtId     ( msgs, in, inlen ); break;
1666
1667        case AWAITING_BT_MESSAGE:
1668            ret = readBtMessage( msgs, in, inlen ); break;
1669
1670        default:
1671            ret = READ_ERR;
1672            assert( 0 );
1673    }
1674
1675    dbgmsg( msgs, "canRead: ret is %d", (int)ret );
1676
1677    /* log the raw data that was read */
1678    if( ( ret != READ_ERR ) && ( EVBUFFER_LENGTH( in ) != inlen ) )
1679        fireClientGotData( msgs, inlen - EVBUFFER_LENGTH( in ), FALSE );
1680
1681    return ret;
1682}
1683
1684/**
1685***
1686**/
1687
1688static int
1689ratePulse( tr_peermsgs * msgs, uint64_t now )
1690{
1691    int irate;
1692    const int floor = 8;
1693    const int seconds = 10;
1694    double rate;
1695    int estimatedBlocksInPeriod;
1696    const tr_torrent * const torrent = msgs->torrent;
1697
1698    /* Get the rate limit we should use.
1699     * FIXME: this needs to consider all the other peers as well... */
1700    rate = tr_peerGetPieceSpeed( msgs->peer, now, TR_PEER_TO_CLIENT );
1701    if( tr_torrentUsesSpeedLimit( torrent, TR_PEER_TO_CLIENT ) )
1702        rate = MIN( rate, tr_torrentGetSpeedLimit( torrent, TR_PEER_TO_CLIENT ) );
1703    if( tr_torrentUsesSessionLimits( torrent ) )
1704        if( tr_sessionGetActiveSpeedLimit( torrent->session, TR_PEER_TO_CLIENT, &irate ) )
1705            rate = MIN( rate, irate );
1706
1707    estimatedBlocksInPeriod = ( rate * seconds * 1024 ) / torrent->blockSize;
1708    msgs->maxActiveRequests = MAX( floor, estimatedBlocksInPeriod );
1709
1710    if( msgs->reqq > 0 )
1711        msgs->maxActiveRequests = MIN( msgs->maxActiveRequests, msgs->reqq );
1712
1713    return TRUE;
1714}
1715
1716static size_t
1717fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1718{
1719    size_t bytesWritten = 0;
1720    struct peer_request req;
1721    const tr_bool haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1722    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1723
1724    /**
1725    ***  Protocol messages
1726    **/
1727
1728    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1729    {
1730        dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
1731        msgs->outMessagesBatchedAt = now;
1732    }
1733    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1734    {
1735        const size_t len = EVBUFFER_LENGTH( msgs->outMessages );
1736        /* flush the protocol messages */
1737        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len );
1738        tr_peerIoWriteBuf( msgs->peer->io, msgs->outMessages, FALSE );
1739        msgs->clientSentAnythingAt = now;
1740        msgs->outMessagesBatchedAt = 0;
1741        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1742        bytesWritten +=  len;
1743    }
1744
1745    /**
1746    ***  Blocks
1747    **/
1748
1749    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= msgs->torrent->blockSize )
1750        && popNextRequest( msgs, &req ) )
1751    {
1752        if( requestIsValid( msgs, &req )
1753            && tr_cpPieceIsComplete( &msgs->torrent->completion, req.index ) )
1754        {
1755            /* FIXME(libevent2) use evbuffer_reserve_space() + evbuffer_commit_space() */
1756            int err;
1757            const uint32_t msglen = 4 + 1 + 4 + 4 + req.length;
1758            struct evbuffer * out;
1759            tr_peerIo * io = msgs->peer->io;
1760
1761            out = evbuffer_new( );
1762            evbuffer_expand( out, msglen );
1763
1764            tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1765            tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1766            tr_peerIoWriteUint32( io, out, req.index );
1767            tr_peerIoWriteUint32( io, out, req.offset );
1768
1769            err = tr_ioRead( msgs->torrent, req.index, req.offset, req.length, EVBUFFER_DATA(out)+EVBUFFER_LENGTH(out) );
1770            if( err )
1771            {
1772                if( fext )
1773                    protocolSendReject( msgs, &req );
1774            }
1775            else
1776            {
1777                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1778                EVBUFFER_LENGTH(out) += req.length;
1779                assert( EVBUFFER_LENGTH( out ) == msglen );
1780                tr_peerIoWriteBuf( io, out, TRUE );
1781                bytesWritten += EVBUFFER_LENGTH( out );
1782                msgs->clientSentAnythingAt = now;
1783            }
1784
1785            evbuffer_free( out );
1786
1787            if( err )
1788            {
1789                bytesWritten = 0;
1790                msgs = NULL;
1791            }
1792        }
1793        else if( fext ) /* peer needs a reject message */
1794        {
1795            protocolSendReject( msgs, &req );
1796        }
1797    }
1798
1799    /**
1800    ***  Keepalive
1801    **/
1802
1803    if( ( msgs != NULL )
1804        && ( msgs->clientSentAnythingAt != 0 )
1805        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1806    {
1807        dbgmsg( msgs, "sending a keepalive message" );
1808        tr_peerIoWriteUint32( msgs->peer->io, msgs->outMessages, 0 );
1809        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1810    }
1811
1812    return bytesWritten;
1813}
1814
1815static int
1816peerPulse( void * vmsgs )
1817{
1818    tr_peermsgs * msgs = vmsgs;
1819    const time_t  now = time( NULL );
1820
1821    ratePulse( msgs, now );
1822
1823    pumpRequestQueue( msgs, now );
1824    expireOldRequests( msgs, now );
1825
1826    for( ;; )
1827        if( fillOutputBuffer( msgs, now ) < 1 )
1828            break;
1829
1830    return TRUE; /* loop forever */
1831}
1832
1833void
1834tr_peerMsgsPulse( tr_peermsgs * msgs )
1835{
1836    if( msgs != NULL )
1837        peerPulse( msgs );
1838}
1839
1840static void
1841gotError( tr_peerIo  * io UNUSED,
1842          short        what,
1843          void       * vmsgs )
1844{
1845    if( what & EVBUFFER_TIMEOUT )
1846        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1847    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1848        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1849               what, errno, tr_strerror( errno ) );
1850    fireError( vmsgs, ENOTCONN );
1851}
1852
1853static void
1854sendBitfield( tr_peermsgs * msgs )
1855{
1856    struct evbuffer * out = msgs->outMessages;
1857    tr_bitfield *     field;
1858    tr_piece_index_t  lazyPieces[LAZY_PIECE_COUNT];
1859    size_t            i;
1860    size_t            lazyCount = 0;
1861
1862    field = tr_bitfieldDup( tr_cpPieceBitfield( &msgs->torrent->completion ) );
1863
1864    if( tr_sessionIsLazyBitfieldEnabled( getSession( msgs ) ) )
1865    {
1866        /** Lazy bitfields aren't a high priority or secure, so I'm opting for
1867            speed over a truly random sample -- let's limit the pool size to
1868            the first 1000 pieces so large torrents don't bog things down */
1869        size_t poolSize;
1870        const size_t maxPoolSize = MIN( msgs->torrent->info.pieceCount, 1000 );
1871        tr_piece_index_t * pool = tr_new( tr_piece_index_t, maxPoolSize );
1872
1873        /* build the pool */
1874        for( i=poolSize=0; i<maxPoolSize; ++i )
1875            if( tr_bitfieldHas( field, i ) )
1876                pool[poolSize++] = i;
1877
1878        /* pull random piece indices from the pool */
1879        while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) )
1880        {
1881            const int pos = tr_cryptoWeakRandInt( poolSize );
1882            const tr_piece_index_t piece = pool[pos];
1883            tr_bitfieldRem( field, piece );
1884            lazyPieces[lazyCount++] = piece;
1885            pool[pos] = pool[--poolSize];
1886        }
1887
1888        /* cleanup */
1889        tr_free( pool );
1890    }
1891
1892    tr_peerIoWriteUint32( msgs->peer->io, out,
1893                          sizeof( uint8_t ) + field->byteCount );
1894    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_BITFIELD );
1895    /* FIXME(libevent2): use evbuffer_add_reference() */
1896    tr_peerIoWriteBytes ( msgs->peer->io, out, field->bits, field->byteCount );
1897    dbgmsg( msgs, "sending bitfield... outMessage size is now %zu",
1898            EVBUFFER_LENGTH( out ) );
1899    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1900
1901    for( i = 0; i < lazyCount; ++i )
1902        protocolSendHave( msgs, lazyPieces[i] );
1903
1904    tr_bitfieldFree( field );
1905}
1906
1907static void
1908tellPeerWhatWeHave( tr_peermsgs * msgs )
1909{
1910    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1911
1912    if( fext && ( tr_cpGetStatus( &msgs->torrent->completion ) == TR_SEED ) )
1913    {
1914        protocolSendHaveAll( msgs );
1915    }
1916    else if( fext && ( tr_cpHaveValid( &msgs->torrent->completion ) == 0 ) )
1917    {
1918        protocolSendHaveNone( msgs );
1919    }
1920    else
1921    {
1922        sendBitfield( msgs );
1923    }
1924}
1925
1926/**
1927***
1928**/
1929
1930/* some peers give us error messages if we send
1931   more than this many peers in a single pex message
1932   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
1933#define MAX_PEX_ADDED 50
1934#define MAX_PEX_DROPPED 50
1935
1936typedef struct
1937{
1938    tr_pex *  added;
1939    tr_pex *  dropped;
1940    tr_pex *  elements;
1941    int       addedCount;
1942    int       droppedCount;
1943    int       elementCount;
1944}
1945PexDiffs;
1946
1947static void
1948pexAddedCb( void * vpex,
1949            void * userData )
1950{
1951    PexDiffs * diffs = userData;
1952    tr_pex *   pex = vpex;
1953
1954    if( diffs->addedCount < MAX_PEX_ADDED )
1955    {
1956        diffs->added[diffs->addedCount++] = *pex;
1957        diffs->elements[diffs->elementCount++] = *pex;
1958    }
1959}
1960
1961static TR_INLINE void
1962pexDroppedCb( void * vpex,
1963              void * userData )
1964{
1965    PexDiffs * diffs = userData;
1966    tr_pex *   pex = vpex;
1967
1968    if( diffs->droppedCount < MAX_PEX_DROPPED )
1969    {
1970        diffs->dropped[diffs->droppedCount++] = *pex;
1971    }
1972}
1973
1974static TR_INLINE void
1975pexElementCb( void * vpex,
1976              void * userData )
1977{
1978    PexDiffs * diffs = userData;
1979    tr_pex * pex = vpex;
1980
1981    diffs->elements[diffs->elementCount++] = *pex;
1982}
1983
1984static void
1985sendPex( tr_peermsgs * msgs )
1986{
1987    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
1988    {
1989        PexDiffs diffs;
1990        PexDiffs diffs6;
1991        tr_pex * newPex = NULL;
1992        tr_pex * newPex6 = NULL;
1993        const int newCount = tr_peerMgrGetPeers( msgs->torrent, &newPex, TR_AF_INET, MAX_PEX_PEER_COUNT );
1994        const int newCount6 = tr_peerMgrGetPeers( msgs->torrent, &newPex6, TR_AF_INET6, MAX_PEX_PEER_COUNT );
1995
1996        /* build the diffs */
1997        diffs.added = tr_new( tr_pex, newCount );
1998        diffs.addedCount = 0;
1999        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
2000        diffs.droppedCount = 0;
2001        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
2002        diffs.elementCount = 0;
2003        tr_set_compare( msgs->pex, msgs->pexCount,
2004                        newPex, newCount,
2005                        tr_pexCompare, sizeof( tr_pex ),
2006                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
2007        diffs6.added = tr_new( tr_pex, newCount6 );
2008        diffs6.addedCount = 0;
2009        diffs6.dropped = tr_new( tr_pex, msgs->pexCount6 );
2010        diffs6.droppedCount = 0;
2011        diffs6.elements = tr_new( tr_pex, newCount6 + msgs->pexCount6 );
2012        diffs6.elementCount = 0;
2013        tr_set_compare( msgs->pex6, msgs->pexCount6,
2014                        newPex6, newCount6,
2015                        tr_pexCompare, sizeof( tr_pex ),
2016                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6 );
2017        dbgmsg(
2018            msgs,
2019            "pex: old peer count %d, new peer count %d, added %d, removed %d",
2020            msgs->pexCount, newCount + newCount6,
2021            diffs.addedCount + diffs6.addedCount,
2022            diffs.droppedCount + diffs6.droppedCount );
2023
2024        if( !diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
2025            !diffs6.droppedCount )
2026        {
2027            tr_free( diffs.elements );
2028            tr_free( diffs6.elements );
2029        }
2030        else
2031        {
2032            int  i;
2033            tr_benc val;
2034            char * benc;
2035            int bencLen;
2036            uint8_t * tmp, *walk;
2037            tr_peerIo       * io  = msgs->peer->io;
2038            struct evbuffer * out = msgs->outMessages;
2039
2040            /* update peer */
2041            tr_free( msgs->pex );
2042            msgs->pex = diffs.elements;
2043            msgs->pexCount = diffs.elementCount;
2044            tr_free( msgs->pex6 );
2045            msgs->pex6 = diffs6.elements;
2046            msgs->pexCount6 = diffs6.elementCount;
2047
2048            /* build the pex payload */
2049            tr_bencInitDict( &val, 3 ); /* ipv6 support: left as 3:
2050                                         * speed vs. likelihood? */
2051
2052            /* "added" */
2053            tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
2054            for( i = 0; i < diffs.addedCount; ++i )
2055            {
2056                memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
2057                memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
2058            }
2059            assert( ( walk - tmp ) == diffs.addedCount * 6 );
2060            tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
2061            tr_free( tmp );
2062
2063            /* "added.f" */
2064            tmp = walk = tr_new( uint8_t, diffs.addedCount );
2065            for( i = 0; i < diffs.addedCount; ++i )
2066                *walk++ = diffs.added[i].flags;
2067            assert( ( walk - tmp ) == diffs.addedCount );
2068            tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2069            tr_free( tmp );
2070
2071            /* "dropped" */
2072            tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2073            for( i = 0; i < diffs.droppedCount; ++i )
2074            {
2075                memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
2076                memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2077            }
2078            assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2079            tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2080            tr_free( tmp );
2081
2082            /* "added6" */
2083            tmp = walk = tr_new( uint8_t, diffs6.addedCount * 18 );
2084            for( i = 0; i < diffs6.addedCount; ++i )
2085            {
2086                memcpy( walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16 );
2087                walk += 16;
2088                memcpy( walk, &diffs6.added[i].port, 2 );
2089                walk += 2;
2090            }
2091            assert( ( walk - tmp ) == diffs6.addedCount * 18 );
2092            tr_bencDictAddRaw( &val, "added6", tmp, walk - tmp );
2093            tr_free( tmp );
2094
2095            /* "added6.f" */
2096            tmp = walk = tr_new( uint8_t, diffs6.addedCount );
2097            for( i = 0; i < diffs6.addedCount; ++i )
2098                *walk++ = diffs6.added[i].flags;
2099            assert( ( walk - tmp ) == diffs6.addedCount );
2100            tr_bencDictAddRaw( &val, "added6.f", tmp, walk - tmp );
2101            tr_free( tmp );
2102
2103            /* "dropped6" */
2104            tmp = walk = tr_new( uint8_t, diffs6.droppedCount * 18 );
2105            for( i = 0; i < diffs6.droppedCount; ++i )
2106            {
2107                memcpy( walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16 );
2108                walk += 16;
2109                memcpy( walk, &diffs6.dropped[i].port, 2 );
2110                walk += 2;
2111            }
2112            assert( ( walk - tmp ) == diffs6.droppedCount * 18);
2113            tr_bencDictAddRaw( &val, "dropped6", tmp, walk - tmp );
2114            tr_free( tmp );
2115
2116            /* write the pex message */
2117            benc = tr_bencToStr( &val, TR_FMT_BENC, &bencLen );
2118            tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + bencLen );
2119            tr_peerIoWriteUint8 ( io, out, BT_LTEP );
2120            tr_peerIoWriteUint8 ( io, out, msgs->ut_pex_id );
2121            tr_peerIoWriteBytes ( io, out, benc, bencLen );
2122            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2123            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", EVBUFFER_LENGTH( out ) );
2124            dbgOutMessageLen( msgs );
2125
2126            tr_free( benc );
2127            tr_bencFree( &val );
2128        }
2129
2130        /* cleanup */
2131        tr_free( diffs.added );
2132        tr_free( diffs.dropped );
2133        tr_free( newPex );
2134        tr_free( diffs6.added );
2135        tr_free( diffs6.dropped );
2136        tr_free( newPex6 );
2137
2138        /*msgs->clientSentPexAt = time( NULL );*/
2139    }
2140}
2141
2142static void
2143pexPulse( int foo UNUSED, short bar UNUSED, void * vmsgs )
2144{
2145    struct tr_peermsgs * msgs = vmsgs;
2146
2147    sendPex( msgs );
2148
2149    tr_timerAdd( &msgs->pexTimer, PEX_INTERVAL_SECS, 0 );
2150}
2151
2152/**
2153***
2154**/
2155
2156tr_peermsgs*
2157tr_peerMsgsNew( struct tr_torrent * torrent,
2158                struct tr_peer    * peer,
2159                tr_delivery_func    func,
2160                void              * userData,
2161                tr_publisher_tag  * setme )
2162{
2163    tr_peermsgs * m;
2164
2165    assert( peer );
2166    assert( peer->io );
2167
2168    m = tr_new0( tr_peermsgs, 1 );
2169    m->publisher = TR_PUBLISHER_INIT;
2170    m->peer = peer;
2171    m->torrent = torrent;
2172    m->peer->clientIsChoked = 1;
2173    m->peer->peerIsChoked = 1;
2174    m->peer->clientIsInterested = 0;
2175    m->peer->peerIsInterested = 0;
2176    m->peer->have = tr_bitfieldNew( torrent->info.pieceCount );
2177    m->state = AWAITING_BT_LENGTH;
2178    m->outMessages = evbuffer_new( );
2179    m->outMessagesBatchedAt = 0;
2180    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2181    m->incoming.block = evbuffer_new( );
2182    m->peerAskedFor = REQUEST_LIST_INIT;
2183    m->clientAskedFor = REQUEST_LIST_INIT;
2184    m->clientWillAskFor = REQUEST_LIST_INIT;
2185    evtimer_set( &m->pexTimer, pexPulse, m );
2186    tr_timerAdd( &m->pexTimer, PEX_INTERVAL_SECS, 0 );
2187    peer->msgs = m;
2188
2189    *setme = tr_publisherSubscribe( &m->publisher, func, userData );
2190
2191    if( tr_peerIoSupportsLTEP( peer->io ) )
2192        sendLtepHandshake( m );
2193
2194    if(tr_peerIoSupportsDHT(peer->io))
2195        protocolSendPort(m, tr_dhtPort(torrent->session));
2196
2197    tellPeerWhatWeHave( m );
2198
2199    tr_peerIoSetIOFuncs( m->peer->io, canRead, didWrite, gotError, m );
2200    ratePulse( m, tr_date() );
2201
2202    return m;
2203}
2204
2205void
2206tr_peerMsgsFree( tr_peermsgs* msgs )
2207{
2208    if( msgs )
2209    {
2210        evtimer_del( &msgs->pexTimer );
2211        tr_publisherDestruct( &msgs->publisher );
2212        reqListClear( &msgs->clientWillAskFor );
2213        reqListClear( &msgs->clientAskedFor );
2214        reqListClear( &msgs->peerAskedFor );
2215
2216        evbuffer_free( msgs->incoming.block );
2217        evbuffer_free( msgs->outMessages );
2218        tr_free( msgs->pex6 );
2219        tr_free( msgs->pex );
2220
2221        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2222        tr_free( msgs );
2223    }
2224}
2225
2226void
2227tr_peerMsgsUnsubscribe( tr_peermsgs *    peer,
2228                        tr_publisher_tag tag )
2229{
2230    tr_publisherUnsubscribe( &peer->publisher, tag );
2231}
2232
Note: See TracBrowser for help on using the repository browser.