source: trunk/libtransmission/peer-msgs.c @ 7921

Last change on this file since 7921 was 7921, checked in by charles, 13 years ago

(trunk libT) some minor tr_bool correctness

  • Property svn:keywords set to Date Rev Author Id
File size: 62.7 KB
Line 
1/*
2 * This file Copyright (C) 2007-2009 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 7921 2009-02-19 21:55:00Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19
20#include <event.h>
21
22#include "transmission.h"
23#include "session.h"
24#include "bencode.h"
25#include "completion.h"
26#include "crypto.h"
27#include "inout.h"
28#ifdef WIN32
29#include "net.h" /* for ECONN */
30#endif
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-msgs.h"
34#include "platform.h" /* MAX_STACK_ARRAY_SIZE */
35#include "ratecontrol.h"
36#include "request-list.h"
37#include "stats.h"
38#include "torrent.h"
39#include "trevent.h"
40#include "utils.h"
41
42/**
43***
44**/
45
46enum
47{
48    BT_CHOKE                = 0,
49    BT_UNCHOKE              = 1,
50    BT_INTERESTED           = 2,
51    BT_NOT_INTERESTED       = 3,
52    BT_HAVE                 = 4,
53    BT_BITFIELD             = 5,
54    BT_REQUEST              = 6,
55    BT_PIECE                = 7,
56    BT_CANCEL               = 8,
57    BT_PORT                 = 9,
58
59    BT_FEXT_SUGGEST         = 13,
60    BT_FEXT_HAVE_ALL        = 14,
61    BT_FEXT_HAVE_NONE       = 15,
62    BT_FEXT_REJECT          = 16,
63    BT_FEXT_ALLOWED_FAST    = 17,
64
65    BT_LTEP                 = 20,
66
67    LTEP_HANDSHAKE          = 0,
68
69    TR_LTEP_PEX             = 1,
70
71
72
73    MIN_CHOKE_PERIOD_SEC    = ( 10 ),
74
75    /* idle seconds before we send a keepalive */
76    KEEPALIVE_INTERVAL_SECS = 100,
77
78    PEX_INTERVAL            = ( 90 * 1000 ), /* msec between sendPex() calls */
79
80
81    MAX_BLOCK_SIZE          = ( 1024 * 16 ),
82
83
84    /* how long an unsent request can stay queued before it's returned
85       back to the peer-mgr's pool of requests */
86    QUEUED_REQUEST_TTL_SECS = 20,
87
88    /* how long a sent request can stay queued before it's returned
89       back to the peer-mgr's pool of requests */
90    SENT_REQUEST_TTL_SECS = 240,
91
92    /* used in lowering the outMessages queue period */
93    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
94    HIGH_PRIORITY_INTERVAL_SECS = 2,
95    LOW_PRIORITY_INTERVAL_SECS = 20,
96
97    /* number of pieces to remove from the bitfield when
98     * lazy bitfields are turned on */
99    LAZY_PIECE_COUNT = 26,
100
101    /* number of pieces we'll allow in our fast set */
102    MAX_FAST_SET_SIZE = 3
103};
104
105enum
106{
107    AWAITING_BT_LENGTH,
108    AWAITING_BT_ID,
109    AWAITING_BT_MESSAGE,
110    AWAITING_BT_PIECE
111};
112
113/**
114***
115**/
116
117/* this is raw, unchanged data from the peer regarding
118 * the current message that it's sending us. */
119struct tr_incoming
120{
121    uint8_t                id;
122    uint32_t               length; /* includes the +1 for id length */
123    struct peer_request    blockReq; /* metadata for incoming blocks */
124    struct evbuffer *      block; /* piece data for incoming blocks */
125};
126
127/**
128 * Low-level communication state information about a connected peer.
129 *
130 * This structure remembers the low-level protocol states that we're
131 * in with this peer, such as active requests, pex messages, and so on.
132 * Its fields are all private to peer-msgs.c.
133 *
134 * Data not directly involved with sending & receiving messages is
135 * stored in tr_peer, where it can be accessed by both peermsgs and
136 * the peer manager.
137 *
138 * @see struct peer_atom
139 * @see tr_peer
140 */
141struct tr_peermsgs
142{
143    tr_bool         peerSupportsPex;
144    tr_bool         clientSentLtepHandshake;
145    tr_bool         peerSentLtepHandshake;
146    tr_bool         haveFastSet;
147
148    uint8_t         state;
149    uint8_t         ut_pex_id;
150    uint16_t        pexCount;
151    uint16_t        pexCount6;
152    uint16_t        maxActiveRequests;
153
154    size_t                 fastsetSize;
155    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
156
157    /* how long the outMessages batch should be allowed to grow before
158     * it's flushed -- some messages (like requests >:) should be sent
159     * very quickly; others aren't as urgent. */
160    int                    outMessagesBatchPeriod;
161
162    tr_peer *              peer;
163
164    tr_session *           session;
165    tr_torrent *           torrent;
166
167    tr_publisher           publisher;
168
169    struct evbuffer *      outMessages; /* all the non-piece messages */
170
171    struct request_list    peerAskedFor;
172    struct request_list    clientAskedFor;
173    struct request_list    clientWillAskFor;
174
175    tr_timer             * pexTimer;
176    tr_pex               * pex;
177    tr_pex               * pex6;
178
179    time_t                 clientSentPexAt;
180    time_t                 clientSentAnythingAt;
181
182    /* when we started batching the outMessages */
183    time_t                outMessagesBatchedAt;
184
185    struct tr_incoming    incoming;
186
187    /* if the peer supports the Extension Protocol in BEP 10 and
188       supplied a reqq argument, it's stored here.  otherwise the
189       value is zero and should be ignored. */
190    int64_t               reqq;
191};
192
193/**
194***
195**/
196
197static void
198myDebug( const char * file, int line,
199         const struct tr_peermsgs * msgs,
200         const char * fmt, ... )
201{
202    FILE * fp = tr_getLog( );
203
204    if( fp )
205    {
206        va_list           args;
207        char              timestr[64];
208        struct evbuffer * buf = tr_getBuffer( );
209        char *            base = tr_basename( file );
210
211        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
212                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
213                             msgs->torrent->info.name,
214                             tr_peerIoGetAddrStr( msgs->peer->io ),
215                             msgs->peer->client );
216        va_start( args, fmt );
217        evbuffer_add_vprintf( buf, fmt, args );
218        va_end( args );
219        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
220        fwrite( EVBUFFER_DATA( buf ), 1, EVBUFFER_LENGTH( buf ), fp );
221
222        tr_free( base );
223        tr_releaseBuffer( buf );
224    }
225}
226
227#define dbgmsg( msgs, ... ) \
228    do { \
229        if( tr_deepLoggingIsActive( ) ) \
230            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
231    } while( 0 )
232
233/**
234***
235**/
236
237static void
238pokeBatchPeriod( tr_peermsgs * msgs,
239                 int           interval )
240{
241    if( msgs->outMessagesBatchPeriod > interval )
242    {
243        msgs->outMessagesBatchPeriod = interval;
244        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
245    }
246}
247
248static TR_INLINE void
249dbgOutMessageLen( tr_peermsgs * msgs )
250{
251    dbgmsg( msgs, "outMessage size is now %zu", EVBUFFER_LENGTH( msgs->outMessages ) );
252}
253
254static void
255protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
256{
257    tr_peerIo       * io  = msgs->peer->io;
258    struct evbuffer * out = msgs->outMessages;
259
260    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
261
262    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
263    tr_peerIoWriteUint8 ( io, out, BT_FEXT_REJECT );
264    tr_peerIoWriteUint32( io, out, req->index );
265    tr_peerIoWriteUint32( io, out, req->offset );
266    tr_peerIoWriteUint32( io, out, req->length );
267
268    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
269    dbgOutMessageLen( msgs );
270}
271
272static void
273protocolSendRequest( tr_peermsgs               * msgs,
274                     const struct peer_request * req )
275{
276    tr_peerIo       * io  = msgs->peer->io;
277    struct evbuffer * out = msgs->outMessages;
278
279    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
280    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
281    tr_peerIoWriteUint32( io, out, req->index );
282    tr_peerIoWriteUint32( io, out, req->offset );
283    tr_peerIoWriteUint32( io, out, req->length );
284
285    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
286    dbgOutMessageLen( msgs );
287    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
288}
289
290static void
291protocolSendCancel( tr_peermsgs               * msgs,
292                    const struct peer_request * req )
293{
294    tr_peerIo       * io  = msgs->peer->io;
295    struct evbuffer * out = msgs->outMessages;
296
297    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
298    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
299    tr_peerIoWriteUint32( io, out, req->index );
300    tr_peerIoWriteUint32( io, out, req->offset );
301    tr_peerIoWriteUint32( io, out, req->length );
302
303    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
304    dbgOutMessageLen( msgs );
305    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
306}
307
308static void
309protocolSendHave( tr_peermsgs * msgs,
310                  uint32_t      index )
311{
312    tr_peerIo       * io  = msgs->peer->io;
313    struct evbuffer * out = msgs->outMessages;
314
315    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
316    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
317    tr_peerIoWriteUint32( io, out, index );
318
319    dbgmsg( msgs, "sending Have %u", index );
320    dbgOutMessageLen( msgs );
321    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
322}
323
324#if 0
325static void
326protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
327{
328    tr_peerIo       * io  = msgs->peer->io;
329    struct evbuffer * out = msgs->outMessages;
330
331    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
332
333    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
334    tr_peerIoWriteUint8 ( io, out, BT_FEXT_ALLOWED_FAST );
335    tr_peerIoWriteUint32( io, out, pieceIndex );
336
337    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
338    dbgOutMessageLen( msgs );
339}
340#endif
341
342static void
343protocolSendChoke( tr_peermsgs * msgs,
344                   int           choke )
345{
346    tr_peerIo       * io  = msgs->peer->io;
347    struct evbuffer * out = msgs->outMessages;
348
349    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
350    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
351
352    dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
353    dbgOutMessageLen( msgs );
354    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
355}
356
357static void
358protocolSendHaveAll( tr_peermsgs * msgs )
359{
360    tr_peerIo       * io  = msgs->peer->io;
361    struct evbuffer * out = msgs->outMessages;
362
363    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
364
365    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
366    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_ALL );
367
368    dbgmsg( msgs, "sending HAVE_ALL..." );
369    dbgOutMessageLen( msgs );
370    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
371}
372
373static void
374protocolSendHaveNone( tr_peermsgs * msgs )
375{
376    tr_peerIo       * io  = msgs->peer->io;
377    struct evbuffer * out = msgs->outMessages;
378
379    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
380
381    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
382    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_NONE );
383
384    dbgmsg( msgs, "sending HAVE_NONE..." );
385    dbgOutMessageLen( msgs );
386    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
387}
388
389/**
390***  EVENTS
391**/
392
393static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0, 0 };
394
395static void
396publish( tr_peermsgs * msgs, tr_peer_event * e )
397{
398    assert( msgs->peer );
399    assert( msgs->peer->msgs == msgs );
400
401    tr_publisherPublish( &msgs->publisher, msgs->peer, e );
402}
403
404static void
405fireError( tr_peermsgs * msgs, int err )
406{
407    tr_peer_event e = blankEvent;
408    e.eventType = TR_PEER_ERROR;
409    e.err = err;
410    publish( msgs, &e );
411}
412
413static void
414fireUploadOnly( tr_peermsgs * msgs, tr_bool uploadOnly )
415{
416    tr_peer_event e = blankEvent;
417    e.eventType = TR_PEER_UPLOAD_ONLY;
418    e.uploadOnly = uploadOnly;
419    publish( msgs, &e );
420}
421
422static void
423fireNeedReq( tr_peermsgs * msgs )
424{
425    tr_peer_event e = blankEvent;
426    e.eventType = TR_PEER_NEED_REQ;
427    publish( msgs, &e );
428}
429
430static void
431firePeerProgress( tr_peermsgs * msgs )
432{
433    tr_peer_event e = blankEvent;
434    e.eventType = TR_PEER_PEER_PROGRESS;
435    e.progress = msgs->peer->progress;
436    publish( msgs, &e );
437}
438
439static void
440fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
441{
442    tr_peer_event e = blankEvent;
443    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
444    e.pieceIndex = req->index;
445    e.offset = req->offset;
446    e.length = req->length;
447    publish( msgs, &e );
448}
449
450static void
451fireClientGotData( tr_peermsgs * msgs,
452                   uint32_t      length,
453                   int           wasPieceData )
454{
455    tr_peer_event e = blankEvent;
456
457    e.length = length;
458    e.eventType = TR_PEER_CLIENT_GOT_DATA;
459    e.wasPieceData = wasPieceData;
460    publish( msgs, &e );
461}
462
463static void
464fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
465{
466    tr_peer_event e = blankEvent;
467    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
468    e.pieceIndex = pieceIndex;
469    publish( msgs, &e );
470}
471
472static void
473fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
474{
475    tr_peer_event e = blankEvent;
476    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
477    e.pieceIndex = pieceIndex;
478    publish( msgs, &e );
479}
480
481static void
482firePeerGotData( tr_peermsgs  * msgs,
483                 uint32_t       length,
484                 int            wasPieceData )
485{
486    tr_peer_event e = blankEvent;
487
488    e.length = length;
489    e.eventType = TR_PEER_PEER_GOT_DATA;
490    e.wasPieceData = wasPieceData;
491
492    publish( msgs, &e );
493}
494
495static void
496fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
497{
498    tr_peer_event e = blankEvent;
499    e.eventType = TR_PEER_CANCEL;
500    e.pieceIndex = req->index;
501    e.offset = req->offset;
502    e.length = req->length;
503    publish( msgs, &e );
504}
505
506/**
507***  ALLOWED FAST SET
508***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
509**/
510
511size_t
512tr_generateAllowedSet( tr_piece_index_t * setmePieces,
513                       size_t             desiredSetSize,
514                       size_t             pieceCount,
515                       const uint8_t    * infohash,
516                       const tr_address * addr )
517{
518    size_t setSize = 0;
519
520    assert( setmePieces );
521    assert( desiredSetSize <= pieceCount );
522    assert( desiredSetSize );
523    assert( pieceCount );
524    assert( infohash );
525    assert( addr );
526
527    if( addr->type == TR_AF_INET )
528    {
529        uint8_t w[SHA_DIGEST_LENGTH + 4];
530        uint8_t x[SHA_DIGEST_LENGTH];
531
532        *(uint32_t*)w = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 );   /* (1) */
533        memcpy( w + 4, infohash, SHA_DIGEST_LENGTH );                /* (2) */
534        tr_sha1( x, w, sizeof( w ), NULL );                          /* (3) */
535
536        while( setSize<desiredSetSize )
537        {
538            int i;
539            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
540            {
541                size_t k;
542                uint32_t j = i * 4;                                  /* (5) */
543                uint32_t y = ntohl( *( uint32_t* )( x + j ) );       /* (6) */
544                uint32_t index = y % pieceCount;                     /* (7) */
545
546                for( k=0; k<setSize; ++k )                           /* (8) */
547                    if( setmePieces[k] == index )
548                        break;
549
550                if( k == setSize )
551                    setmePieces[setSize++] = index;                  /* (9) */
552            }
553
554            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
555        }
556    }
557
558    return setSize;
559}
560
561static void
562updateFastSet( tr_peermsgs * msgs UNUSED )
563{
564#if 0
565    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
566    const int peerIsNeedy = msgs->peer->progress < 0.10;
567
568    if( fext && peerIsNeedy && !msgs->haveFastSet )
569    {
570        size_t i;
571        const struct tr_address * addr = tr_peerIoGetAddress( msgs->peer->io, NULL );
572        const tr_info * inf = &msgs->torrent->info;
573        const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
574
575        /* build the fast set */
576        msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
577        msgs->haveFastSet = 1;
578
579        /* send it to the peer */
580        for( i=0; i<msgs->fastsetSize; ++i )
581            protocolSendAllowedFast( msgs, msgs->fastset[i] );
582    }
583#endif
584}
585
586/**
587***  INTEREST
588**/
589
590static tr_bool
591isPieceInteresting( const tr_peermsgs * msgs,
592                    tr_piece_index_t    piece )
593{
594    const tr_torrent * torrent = msgs->torrent;
595
596    return ( !torrent->info.pieces[piece].dnd )                 /* we want it */
597          && ( !tr_cpPieceIsComplete( &torrent->completion, piece ) ) /* !have */
598          && ( tr_bitfieldHas( msgs->peer->have, piece ) );    /* peer has it */
599}
600
601/* "interested" means we'll ask for piece data if they unchoke us */
602static tr_bool
603isPeerInteresting( const tr_peermsgs * msgs )
604{
605    tr_piece_index_t    i;
606    const tr_torrent *  torrent;
607    const tr_bitfield * bitfield;
608    const int           clientIsSeed = tr_torrentIsSeed( msgs->torrent );
609
610    if( clientIsSeed )
611        return FALSE;
612
613    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
614        return FALSE;
615
616    torrent = msgs->torrent;
617    bitfield = tr_cpPieceBitfield( &torrent->completion );
618
619    if( !msgs->peer->have )
620        return TRUE;
621
622    assert( bitfield->byteCount == msgs->peer->have->byteCount );
623
624    for( i = 0; i < torrent->info.pieceCount; ++i )
625        if( isPieceInteresting( msgs, i ) )
626            return TRUE;
627
628    return FALSE;
629}
630
631static void
632sendInterest( tr_peermsgs * msgs,
633              int           weAreInterested )
634{
635    struct evbuffer * out = msgs->outMessages;
636
637    assert( msgs );
638    assert( weAreInterested == 0 || weAreInterested == 1 );
639
640    msgs->peer->clientIsInterested = weAreInterested;
641    dbgmsg( msgs, "Sending %s", weAreInterested ? "Interested" : "Not Interested" );
642    tr_peerIoWriteUint32( msgs->peer->io, out, sizeof( uint8_t ) );
643    tr_peerIoWriteUint8 ( msgs->peer->io, out, weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
644
645    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
646    dbgOutMessageLen( msgs );
647}
648
649static void
650updateInterest( tr_peermsgs * msgs )
651{
652    const int i = isPeerInteresting( msgs );
653
654    if( i != msgs->peer->clientIsInterested )
655        sendInterest( msgs, i );
656    if( i )
657        fireNeedReq( msgs );
658}
659
660static tr_bool
661popNextRequest( tr_peermsgs *         msgs,
662                struct peer_request * setme )
663{
664    return reqListPop( &msgs->peerAskedFor, setme );
665}
666
667static void
668cancelAllRequestsToClient( tr_peermsgs * msgs )
669{
670    struct peer_request req;
671    const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->peer->io );
672
673    while( popNextRequest( msgs, &req ) )
674        if( mustSendCancel )
675            protocolSendReject( msgs, &req );
676}
677
678void
679tr_peerMsgsSetChoke( tr_peermsgs * msgs,
680                     int           choke )
681{
682    const time_t now = time( NULL );
683    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
684
685    assert( msgs );
686    assert( msgs->peer );
687    assert( choke == 0 || choke == 1 );
688
689    if( msgs->peer->chokeChangedAt > fibrillationTime )
690    {
691        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
692    }
693    else if( msgs->peer->peerIsChoked != choke )
694    {
695        msgs->peer->peerIsChoked = choke;
696        if( choke )
697            cancelAllRequestsToClient( msgs );
698        protocolSendChoke( msgs, choke );
699        msgs->peer->chokeChangedAt = now;
700    }
701}
702
703/**
704***
705**/
706
707void
708tr_peerMsgsHave( tr_peermsgs * msgs,
709                 uint32_t      index )
710{
711    protocolSendHave( msgs, index );
712
713    /* since we have more pieces now, we might not be interested in this peer */
714    updateInterest( msgs );
715}
716
717/**
718***
719**/
720
721static tr_bool
722reqIsValid( const tr_peermsgs * peer,
723            uint32_t            index,
724            uint32_t            offset,
725            uint32_t            length )
726{
727    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
728}
729
730static tr_bool
731requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
732{
733    return reqIsValid( msgs, req->index, req->offset, req->length );
734}
735
736static void
737expireFromList( tr_peermsgs          * msgs,
738                struct request_list  * list,
739                const time_t           oldestAllowed )
740{
741    size_t i;
742    struct request_list tmp = REQUEST_LIST_INIT;
743
744    /* since the fifo list is sorted by time, the oldest will be first */
745    if( !list->len || ( list->fifo[0].time_requested >= oldestAllowed ) )
746        return;
747
748    /* if we found one too old, start pruning them */
749    reqListCopy( &tmp, list );
750    for( i=0; i<tmp.len; ++i ) {
751        const struct peer_request * req = &tmp.fifo[i];
752        if( req->time_requested >= oldestAllowed )
753            break;
754        tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
755    }
756    reqListClear( &tmp );
757}
758
759static void
760expireOldRequests( tr_peermsgs * msgs, const time_t now  )
761{
762    time_t oldestAllowed;
763    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
764    dbgmsg( msgs, "entering `expire old requests' block" );
765
766    /* cancel requests that have been queued for too long */
767    oldestAllowed = now - QUEUED_REQUEST_TTL_SECS;
768    expireFromList( msgs, &msgs->clientWillAskFor, oldestAllowed );
769
770    /* if the peer doesn't support "Reject Request",
771     * cancel requests that were sent too long ago. */
772    if( !fext ) {
773        oldestAllowed = now - SENT_REQUEST_TTL_SECS;
774        expireFromList( msgs, &msgs->clientAskedFor, oldestAllowed );
775    }
776
777    dbgmsg( msgs, "leaving `expire old requests' block" );
778}
779
780static void
781pumpRequestQueue( tr_peermsgs * msgs, const time_t now )
782{
783    const int           max = msgs->maxActiveRequests;
784    int                 sent = 0;
785    int                 len = msgs->clientAskedFor.len;
786    struct peer_request req;
787
788    dbgmsg( msgs, "clientIsChoked %d, download allowed %d, len %d, max %d, msgs->clientWillAskFor.len %d",
789            (int)msgs->peer->clientIsChoked,
790            (int)tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ),
791            len, max, msgs->clientWillAskFor.len );
792
793    if( msgs->peer->clientIsChoked )
794        return;
795    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
796        return;
797
798    while( ( len < max ) && reqListPop( &msgs->clientWillAskFor, &req ) )
799    {
800        const tr_block_index_t block = _tr_block( msgs->torrent, req.index, req.offset );
801
802        assert( requestIsValid( msgs, &req ) );
803        assert( tr_bitfieldHas( msgs->peer->have, req.index ) );
804
805        /* don't ask for it if we've already got it... this block may have
806         * come in from a different peer after we cancelled a request for it */
807        if( !tr_cpBlockIsComplete( &msgs->torrent->completion, block ) )
808        {
809            protocolSendRequest( msgs, &req );
810            req.time_requested = now;
811            reqListAppend( &msgs->clientAskedFor, &req );
812
813            ++len;
814            ++sent;
815        }
816        else dbgmsg( msgs, "not asking for it because we've already got it..." );
817    }
818
819    if( sent )
820        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
821                sent, msgs->clientAskedFor.len, msgs->clientWillAskFor.len );
822
823    if( len < max )
824        fireNeedReq( msgs );
825}
826
827static TR_INLINE tr_bool
828requestQueueIsFull( const tr_peermsgs * msgs )
829{
830    const int req_max = msgs->maxActiveRequests;
831    return msgs->clientWillAskFor.len >= (size_t)req_max;
832}
833
834tr_addreq_t
835tr_peerMsgsAddRequest( tr_peermsgs *    msgs,
836                       uint32_t         index,
837                       uint32_t         offset,
838                       uint32_t         length )
839{
840    struct peer_request req;
841
842    assert( msgs );
843    assert( msgs->torrent );
844
845    /**
846    ***  Reasons to decline the request
847    **/
848
849    /* don't send requests to choked clients */
850    if( msgs->peer->clientIsChoked ) {
851        dbgmsg( msgs, "declining request because they're choking us" );
852        return TR_ADDREQ_CLIENT_CHOKED;
853    }
854
855    /* peer's queue is full */
856    if( requestQueueIsFull( msgs ) ) {
857        dbgmsg( msgs, "declining request because we're full" );
858        return TR_ADDREQ_FULL;
859    }
860
861    /* peer doesn't have this piece */
862    if( !tr_bitfieldHas( msgs->peer->have, index ) )
863        return TR_ADDREQ_MISSING;
864
865    /* have we already asked for this piece? */
866    req.index = index;
867    req.offset = offset;
868    req.length = length;
869    if( reqListHas( &msgs->clientAskedFor, &req ) ) {
870        dbgmsg( msgs, "declining because it's a duplicate" );
871        return TR_ADDREQ_DUPLICATE;
872    }
873    if( reqListHas( &msgs->clientWillAskFor, &req ) ) {
874        dbgmsg( msgs, "declining because it's a duplicate" );
875        return TR_ADDREQ_DUPLICATE;
876    }
877
878    /**
879    ***  Accept this request
880    **/
881
882    dbgmsg( msgs, "adding req for %"PRIu32":%"PRIu32"->%"PRIu32" to our `will request' list",
883            index, offset, length );
884    req.time_requested = time( NULL );
885    reqListAppend( &msgs->clientWillAskFor, &req );
886    return TR_ADDREQ_OK;
887}
888
889static void
890cancelAllRequestsToPeer( tr_peermsgs * msgs, tr_bool sendCancel )
891{
892    size_t i;
893    struct request_list a = msgs->clientWillAskFor;
894    struct request_list b = msgs->clientAskedFor;
895    dbgmsg( msgs, "cancelling all requests to peer" );
896
897    msgs->clientAskedFor = REQUEST_LIST_INIT;
898    msgs->clientWillAskFor = REQUEST_LIST_INIT;
899
900    for( i=0; i<a.len; ++i )
901        fireCancelledReq( msgs, &a.fifo[i] );
902
903    for( i = 0; i < b.len; ++i ) {
904        fireCancelledReq( msgs, &b.fifo[i] );
905        if( sendCancel )
906            protocolSendCancel( msgs, &b.fifo[i] );
907    }
908
909    reqListClear( &a );
910    reqListClear( &b );
911}
912
913void
914tr_peerMsgsCancel( tr_peermsgs * msgs,
915                   uint32_t      pieceIndex,
916                   uint32_t      offset,
917                   uint32_t      length )
918{
919    struct peer_request req;
920
921    assert( msgs != NULL );
922    assert( length > 0 );
923
924
925    /* have we asked the peer for this piece? */
926    req.index = pieceIndex;
927    req.offset = offset;
928    req.length = length;
929
930    /* if it's only in the queue and hasn't been sent yet, free it */
931    if( reqListRemove( &msgs->clientWillAskFor, &req ) ) {
932        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32, pieceIndex, offset, length );
933        fireCancelledReq( msgs, &req );
934    }
935
936    /* if it's already been sent, send a cancel message too */
937    if( reqListRemove( &msgs->clientAskedFor, &req ) ) {
938        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32, pieceIndex, offset, length );
939        protocolSendCancel( msgs, &req );
940        fireCancelledReq( msgs, &req );
941    }
942}
943
944
945/**
946***
947**/
948
949static void
950sendLtepHandshake( tr_peermsgs * msgs )
951{
952    tr_benc val, *m;
953    char * buf;
954    int len;
955    int pex;
956    struct evbuffer * out = msgs->outMessages;
957
958    if( msgs->clientSentLtepHandshake )
959        return;
960
961    dbgmsg( msgs, "sending an ltep handshake" );
962    msgs->clientSentLtepHandshake = 1;
963
964    /* decide if we want to advertise pex support */
965    if( !tr_torrentAllowsPex( msgs->torrent ) )
966        pex = 0;
967    else if( msgs->peerSentLtepHandshake )
968        pex = msgs->peerSupportsPex ? 1 : 0;
969    else
970        pex = 1;
971
972    tr_bencInitDict( &val, 5 );
973    tr_bencDictAddInt( &val, "e", msgs->session->encryptionMode != TR_CLEAR_PREFERRED );
974    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( msgs->session ) );
975    tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
976    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
977    m  = tr_bencDictAddDict( &val, "m", 1 );
978    if( pex )
979        tr_bencDictAddInt( m, "ut_pex", TR_LTEP_PEX );
980    buf = tr_bencSave( &val, &len );
981
982    tr_peerIoWriteUint32( msgs->peer->io, out, 2 * sizeof( uint8_t ) + len );
983    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_LTEP );
984    tr_peerIoWriteUint8 ( msgs->peer->io, out, LTEP_HANDSHAKE );
985    tr_peerIoWriteBytes ( msgs->peer->io, out, buf, len );
986    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
987    dbgOutMessageLen( msgs );
988
989    /* cleanup */
990    tr_bencFree( &val );
991    tr_free( buf );
992}
993
994static void
995parseLtepHandshake( tr_peermsgs *     msgs,
996                    int               len,
997                    struct evbuffer * inbuf )
998{
999    int64_t   i;
1000    tr_benc   val, * sub;
1001    uint8_t * tmp = tr_new( uint8_t, len );
1002
1003    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, len );
1004    msgs->peerSentLtepHandshake = 1;
1005
1006    if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) )
1007    {
1008        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
1009        tr_free( tmp );
1010        return;
1011    }
1012
1013    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
1014
1015    /* does the peer prefer encrypted connections? */
1016    if( tr_bencDictFindInt( &val, "e", &i ) )
1017        msgs->peer->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
1018                                              : ENCRYPTION_PREFERENCE_NO;
1019
1020    /* check supported messages for utorrent pex */
1021    msgs->peerSupportsPex = 0;
1022    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
1023        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
1024            msgs->ut_pex_id = (uint8_t) i;
1025            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
1026            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
1027        }
1028    }
1029
1030    /* look for upload_only (BEP 21) */
1031    if( tr_bencDictFindInt( &val, "upload_only", &i ) )
1032        fireUploadOnly( msgs, i!=0 );
1033
1034    /* get peer's listening port */
1035    if( tr_bencDictFindInt( &val, "p", &i ) ) {
1036        msgs->peer->port = htons( (uint16_t)i );
1037        dbgmsg( msgs, "msgs->port is now %hu", msgs->peer->port );
1038    }
1039
1040    /* get peer's maximum request queue size */
1041    if( tr_bencDictFindInt( &val, "reqq", &i ) )
1042        msgs->reqq = i;
1043
1044    tr_bencFree( &val );
1045    tr_free( tmp );
1046}
1047
1048static void
1049parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1050{
1051    int loaded = 0;
1052    uint8_t * tmp = tr_new( uint8_t, msglen );
1053    tr_benc val;
1054    tr_torrent * tor = msgs->torrent;
1055    const uint8_t * added;
1056    size_t added_len;
1057
1058    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1059
1060    if( tr_torrentAllowsPex( tor )
1061      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) ) )
1062    {
1063        if( tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1064        {
1065            const uint8_t * added_f = NULL;
1066            tr_pex *        pex;
1067            size_t          i, n;
1068            size_t          added_f_len = 0;
1069            tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1070            pex =
1071                tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len,
1072                                        &n );
1073            for( i = 0; i < n; ++i )
1074                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex + i );
1075            tr_free( pex );
1076        }
1077       
1078        if( tr_bencDictFindRaw( &val, "added6", &added, &added_len ) )
1079        {
1080            const uint8_t * added_f = NULL;
1081            tr_pex *        pex;
1082            size_t          i, n;
1083            size_t          added_f_len = 0;
1084            tr_bencDictFindRaw( &val, "added6.f", &added_f, &added_f_len );
1085            pex =
1086                tr_peerMgrCompact6ToPex( added, added_len, added_f, added_f_len,
1087                                         &n );
1088            for( i = 0; i < n; ++i )
1089                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex + i );
1090            tr_free( pex );
1091        }
1092       
1093    }
1094
1095    if( loaded )
1096        tr_bencFree( &val );
1097    tr_free( tmp );
1098}
1099
1100static void sendPex( tr_peermsgs * msgs );
1101
1102static void
1103parseLtep( tr_peermsgs *     msgs,
1104           int               msglen,
1105           struct evbuffer * inbuf )
1106{
1107    uint8_t ltep_msgid;
1108
1109    tr_peerIoReadUint8( msgs->peer->io, inbuf, &ltep_msgid );
1110    msglen--;
1111
1112    if( ltep_msgid == LTEP_HANDSHAKE )
1113    {
1114        dbgmsg( msgs, "got ltep handshake" );
1115        parseLtepHandshake( msgs, msglen, inbuf );
1116        if( tr_peerIoSupportsLTEP( msgs->peer->io ) )
1117        {
1118            sendLtepHandshake( msgs );
1119            sendPex( msgs );
1120        }
1121    }
1122    else if( ltep_msgid == TR_LTEP_PEX )
1123    {
1124        dbgmsg( msgs, "got ut pex" );
1125        msgs->peerSupportsPex = 1;
1126        parseUtPex( msgs, msglen, inbuf );
1127    }
1128    else
1129    {
1130        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1131        evbuffer_drain( inbuf, msglen );
1132    }
1133}
1134
1135static int
1136readBtLength( tr_peermsgs *     msgs,
1137              struct evbuffer * inbuf,
1138              size_t            inlen )
1139{
1140    uint32_t len;
1141
1142    if( inlen < sizeof( len ) )
1143        return READ_LATER;
1144
1145    tr_peerIoReadUint32( msgs->peer->io, inbuf, &len );
1146
1147    if( len == 0 ) /* peer sent us a keepalive message */
1148        dbgmsg( msgs, "got KeepAlive" );
1149    else
1150    {
1151        msgs->incoming.length = len;
1152        msgs->state = AWAITING_BT_ID;
1153    }
1154
1155    return READ_NOW;
1156}
1157
1158static int readBtMessage( tr_peermsgs *     msgs,
1159                          struct evbuffer * inbuf,
1160                          size_t            inlen );
1161
1162static int
1163readBtId( tr_peermsgs *     msgs,
1164          struct evbuffer * inbuf,
1165          size_t            inlen )
1166{
1167    uint8_t id;
1168
1169    if( inlen < sizeof( uint8_t ) )
1170        return READ_LATER;
1171
1172    tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
1173    msgs->incoming.id = id;
1174    dbgmsg( msgs, "msgs->incoming.id is now %d; msgs->incoming.length is %zu", id, (size_t)msgs->incoming.length );
1175
1176    if( id == BT_PIECE )
1177    {
1178        msgs->state = AWAITING_BT_PIECE;
1179        return READ_NOW;
1180    }
1181    else if( msgs->incoming.length != 1 )
1182    {
1183        msgs->state = AWAITING_BT_MESSAGE;
1184        return READ_NOW;
1185    }
1186    else return readBtMessage( msgs, inbuf, inlen - 1 );
1187}
1188
1189static void
1190updatePeerProgress( tr_peermsgs * msgs )
1191{
1192    msgs->peer->progress = tr_bitfieldCountTrueBits( msgs->peer->have ) / (float)msgs->torrent->info.pieceCount;
1193    dbgmsg( msgs, "peer progress is %f", msgs->peer->progress );
1194    updateFastSet( msgs );
1195    updateInterest( msgs );
1196    firePeerProgress( msgs );
1197}
1198
1199static void
1200peerMadeRequest( tr_peermsgs *               msgs,
1201                 const struct peer_request * req )
1202{
1203    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1204    const int reqIsValid = requestIsValid( msgs, req );
1205    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( &msgs->torrent->completion, req->index );
1206    const int peerIsChoked = msgs->peer->peerIsChoked;
1207
1208    int allow = FALSE;
1209
1210    if( !reqIsValid )
1211        dbgmsg( msgs, "rejecting an invalid request." );
1212    else if( !clientHasPiece )
1213        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1214    else if( peerIsChoked )
1215        dbgmsg( msgs, "rejecting request from choked peer" );
1216    else
1217        allow = TRUE;
1218
1219    if( allow )
1220        reqListAppend( &msgs->peerAskedFor, req );
1221    else if( fext )
1222        protocolSendReject( msgs, req );
1223}
1224
1225static tr_bool
1226messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1227{
1228    switch( id )
1229    {
1230        case BT_CHOKE:
1231        case BT_UNCHOKE:
1232        case BT_INTERESTED:
1233        case BT_NOT_INTERESTED:
1234        case BT_FEXT_HAVE_ALL:
1235        case BT_FEXT_HAVE_NONE:
1236            return len == 1;
1237
1238        case BT_HAVE:
1239        case BT_FEXT_SUGGEST:
1240        case BT_FEXT_ALLOWED_FAST:
1241            return len == 5;
1242
1243        case BT_BITFIELD:
1244            return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1245
1246        case BT_REQUEST:
1247        case BT_CANCEL:
1248        case BT_FEXT_REJECT:
1249            return len == 13;
1250
1251        case BT_PIECE:
1252            return len > 9 && len <= 16393;
1253
1254        case BT_PORT:
1255            return len == 3;
1256
1257        case BT_LTEP:
1258            return len >= 2;
1259
1260        default:
1261            return FALSE;
1262    }
1263}
1264
1265static int clientGotBlock( tr_peermsgs *               msgs,
1266                           const uint8_t *             block,
1267                           const struct peer_request * req );
1268
1269static int
1270readBtPiece( tr_peermsgs      * msgs,
1271             struct evbuffer  * inbuf,
1272             size_t             inlen,
1273             size_t           * setme_piece_bytes_read )
1274{
1275    struct peer_request * req = &msgs->incoming.blockReq;
1276
1277    assert( EVBUFFER_LENGTH( inbuf ) >= inlen );
1278    dbgmsg( msgs, "In readBtPiece" );
1279
1280    if( !req->length )
1281    {
1282        if( inlen < 8 )
1283            return READ_LATER;
1284
1285        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index );
1286        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset );
1287        req->length = msgs->incoming.length - 9;
1288        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1289        return READ_NOW;
1290    }
1291    else
1292    {
1293        int err;
1294
1295        /* read in another chunk of data */
1296        const size_t nLeft = req->length - EVBUFFER_LENGTH( msgs->incoming.block );
1297        size_t n = MIN( nLeft, inlen );
1298        size_t i = n;
1299
1300        while( i > 0 )
1301        {
1302            uint8_t buf[MAX_STACK_ARRAY_SIZE];
1303            const size_t thisPass = MIN( i, sizeof( buf ) );
1304            tr_peerIoReadBytes( msgs->peer->io, inbuf, buf, thisPass );
1305            evbuffer_add( msgs->incoming.block, buf, thisPass );
1306            i -= thisPass;
1307        }
1308
1309        fireClientGotData( msgs, n, TRUE );
1310        *setme_piece_bytes_read += n;
1311        dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1312               n, req->index, req->offset, req->length,
1313               (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) );
1314        if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length )
1315            return READ_LATER;
1316
1317        /* we've got the whole block ... process it */
1318        err = clientGotBlock( msgs, EVBUFFER_DATA( msgs->incoming.block ), req );
1319
1320        /* cleanup */
1321        evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH( msgs->incoming.block ) );
1322        req->length = 0;
1323        msgs->state = AWAITING_BT_LENGTH;
1324        if( !err )
1325            return READ_NOW;
1326        else {
1327            fireError( msgs, err );
1328            return READ_ERR;
1329        }
1330    }
1331}
1332
1333static int
1334readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1335{
1336    uint32_t      ui32;
1337    uint32_t      msglen = msgs->incoming.length;
1338    const uint8_t id = msgs->incoming.id;
1339    const size_t  startBufLen = EVBUFFER_LENGTH( inbuf );
1340    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1341
1342    --msglen; /* id length */
1343
1344    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
1345
1346    if( inlen < msglen )
1347        return READ_LATER;
1348
1349    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1350    {
1351        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1352        fireError( msgs, EMSGSIZE );
1353        return READ_ERR;
1354    }
1355
1356    switch( id )
1357    {
1358        case BT_CHOKE:
1359            dbgmsg( msgs, "got Choke" );
1360            msgs->peer->clientIsChoked = 1;
1361            if( !fext )
1362                cancelAllRequestsToPeer( msgs, FALSE );
1363            break;
1364
1365        case BT_UNCHOKE:
1366            dbgmsg( msgs, "got Unchoke" );
1367            msgs->peer->clientIsChoked = 0;
1368            fireNeedReq( msgs );
1369            break;
1370
1371        case BT_INTERESTED:
1372            dbgmsg( msgs, "got Interested" );
1373            msgs->peer->peerIsInterested = 1;
1374            break;
1375
1376        case BT_NOT_INTERESTED:
1377            dbgmsg( msgs, "got Not Interested" );
1378            msgs->peer->peerIsInterested = 0;
1379            break;
1380
1381        case BT_HAVE:
1382            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1383            dbgmsg( msgs, "got Have: %u", ui32 );
1384            if( tr_bitfieldAdd( msgs->peer->have, ui32 ) ) {
1385                fireError( msgs, ERANGE );
1386                return READ_ERR;
1387            }
1388            updatePeerProgress( msgs );
1389            tr_rcTransferred( &msgs->torrent->swarmSpeed,
1390                              msgs->torrent->info.pieceSize );
1391            break;
1392
1393        case BT_BITFIELD:
1394        {
1395            dbgmsg( msgs, "got a bitfield" );
1396            tr_peerIoReadBytes( msgs->peer->io, inbuf, msgs->peer->have->bits, msglen );
1397            updatePeerProgress( msgs );
1398            fireNeedReq( msgs );
1399            break;
1400        }
1401
1402        case BT_REQUEST:
1403        {
1404            struct peer_request r;
1405            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1406            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1407            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1408            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1409            peerMadeRequest( msgs, &r );
1410            break;
1411        }
1412
1413        case BT_CANCEL:
1414        {
1415            struct peer_request r;
1416            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1417            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1418            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1419            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1420            if( reqListRemove( &msgs->peerAskedFor, &r ) && fext )
1421                protocolSendReject( msgs, &r );
1422            break;
1423        }
1424
1425        case BT_PIECE:
1426            assert( 0 ); /* handled elsewhere! */
1427            break;
1428
1429        case BT_PORT:
1430            dbgmsg( msgs, "Got a BT_PORT" );
1431            tr_peerIoReadUint16( msgs->peer->io, inbuf, &msgs->peer->port );
1432            break;
1433
1434        case BT_FEXT_SUGGEST:
1435            dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1436            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1437            if( fext )
1438                fireClientGotSuggest( msgs, ui32 );
1439            else {
1440                fireError( msgs, EMSGSIZE );
1441                return READ_ERR;
1442            }
1443            break;
1444
1445        case BT_FEXT_ALLOWED_FAST:
1446            dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1447            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1448            if( fext )
1449                fireClientGotAllowedFast( msgs, ui32 );
1450            else {
1451                fireError( msgs, EMSGSIZE );
1452                return READ_ERR;
1453            }
1454            break;
1455
1456        case BT_FEXT_HAVE_ALL:
1457            dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1458            if( fext ) {
1459                tr_bitfieldAddRange( msgs->peer->have, 0, msgs->torrent->info.pieceCount );
1460                updatePeerProgress( msgs );
1461            } else {
1462                fireError( msgs, EMSGSIZE );
1463                return READ_ERR;
1464            }
1465            break;
1466
1467        case BT_FEXT_HAVE_NONE:
1468            dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1469            if( fext ) {
1470                tr_bitfieldClear( msgs->peer->have );
1471                updatePeerProgress( msgs );
1472            } else {
1473                fireError( msgs, EMSGSIZE );
1474                return READ_ERR;
1475            }
1476            break;
1477
1478        case BT_FEXT_REJECT:
1479        {
1480            struct peer_request r;
1481            dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1482            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1483            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1484            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1485            if( fext )
1486                reqListRemove( &msgs->clientAskedFor, &r );
1487            else {
1488                fireError( msgs, EMSGSIZE );
1489                return READ_ERR;
1490            }
1491            break;
1492        }
1493
1494        case BT_LTEP:
1495            dbgmsg( msgs, "Got a BT_LTEP" );
1496            parseLtep( msgs, msglen, inbuf );
1497            break;
1498
1499        default:
1500            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1501            tr_peerIoDrain( msgs->peer->io, inbuf, msglen );
1502            break;
1503    }
1504
1505    assert( msglen + 1 == msgs->incoming.length );
1506    assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen );
1507
1508    msgs->state = AWAITING_BT_LENGTH;
1509    return READ_NOW;
1510}
1511
1512static TR_INLINE void
1513decrementDownloadedCount( tr_peermsgs * msgs, uint32_t byteCount )
1514{
1515    tr_torrent * tor = msgs->torrent;
1516
1517    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1518}
1519
1520static TR_INLINE void
1521clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1522{
1523    decrementDownloadedCount( msgs, req->length );
1524}
1525
1526static void
1527addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1528{
1529    if( !msgs->peer->blame )
1530         msgs->peer->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1531    tr_bitfieldAdd( msgs->peer->blame, index );
1532}
1533
1534/* returns 0 on success, or an errno on failure */
1535static int
1536clientGotBlock( tr_peermsgs *               msgs,
1537                const uint8_t *             data,
1538                const struct peer_request * req )
1539{
1540    int err;
1541    tr_torrent * tor = msgs->torrent;
1542    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1543
1544    assert( msgs );
1545    assert( req );
1546
1547    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) {
1548        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1549                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1550        return EMSGSIZE;
1551    }
1552
1553    /* save the block */
1554    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1555
1556    /**
1557    *** Remove the block from our `we asked for this' list
1558    **/
1559
1560    if( !reqListRemove( &msgs->clientAskedFor, req ) ) {
1561        clientGotUnwantedBlock( msgs, req );
1562        dbgmsg( msgs, "we didn't ask for this message..." );
1563        return 0;
1564    }
1565
1566    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1567            msgs->clientAskedFor.len );
1568
1569    /**
1570    *** Error checks
1571    **/
1572
1573    if( tr_cpBlockIsComplete( &tor->completion, block ) ) {
1574        dbgmsg( msgs, "we have this block already..." );
1575        clientGotUnwantedBlock( msgs, req );
1576        return 0;
1577    }
1578
1579    /**
1580    ***  Save the block
1581    **/
1582
1583    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1584        return err;
1585
1586    addPeerToBlamefield( msgs, req->index );
1587    fireGotBlock( msgs, req );
1588    return 0;
1589}
1590
1591static int peerPulse( void * vmsgs );
1592
1593static void
1594didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1595{
1596    tr_peermsgs * msgs = vmsgs;
1597    firePeerGotData( msgs, bytesWritten, wasPieceData );
1598
1599    if ( tr_isPeerIo( io ) && io->userData )
1600        peerPulse( msgs );
1601}
1602
1603static ReadState
1604canRead( tr_peerIo * io, void * vmsgs, size_t * piece )
1605{
1606    ReadState         ret;
1607    tr_peermsgs *     msgs = vmsgs;
1608    struct evbuffer * in = tr_peerIoGetReadBuffer( io );
1609    const size_t      inlen = EVBUFFER_LENGTH( in );
1610
1611    dbgmsg( msgs, "canRead: inlen is %zu, msgs->state is %d", inlen, msgs->state );
1612
1613    if( !inlen )
1614    {
1615        ret = READ_LATER;
1616    }
1617    else if( msgs->state == AWAITING_BT_PIECE )
1618    {
1619        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1620    }
1621    else switch( msgs->state )
1622    {
1623        case AWAITING_BT_LENGTH:
1624            ret = readBtLength ( msgs, in, inlen ); break;
1625
1626        case AWAITING_BT_ID:
1627            ret = readBtId     ( msgs, in, inlen ); break;
1628
1629        case AWAITING_BT_MESSAGE:
1630            ret = readBtMessage( msgs, in, inlen ); break;
1631
1632        default:
1633            ret = READ_ERR;
1634            assert( 0 );
1635    }
1636
1637    dbgmsg( msgs, "canRead: ret is %d", (int)ret );
1638
1639    /* log the raw data that was read */
1640    if( ( ret != READ_ERR ) && ( EVBUFFER_LENGTH( in ) != inlen ) )
1641        fireClientGotData( msgs, inlen - EVBUFFER_LENGTH( in ), FALSE );
1642
1643    return ret;
1644}
1645
1646/**
1647***
1648**/
1649
1650static int
1651ratePulse( tr_peermsgs * msgs, uint64_t now )
1652{
1653    const double rateToClient = tr_peerGetPieceSpeed( msgs->peer, now, TR_PEER_TO_CLIENT );
1654    const int seconds = 10;
1655    const int floor = 8;
1656    const int estimatedBlocksInPeriod = ( rateToClient * seconds * 1024 ) / msgs->torrent->blockSize;
1657
1658    msgs->maxActiveRequests = floor + estimatedBlocksInPeriod;
1659
1660    if( msgs->reqq > 0 )
1661        msgs->maxActiveRequests = MIN( msgs->maxActiveRequests, msgs->reqq );
1662
1663    return TRUE;
1664}
1665
1666static size_t
1667fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1668{
1669    size_t bytesWritten = 0;
1670    struct peer_request req;
1671    const tr_bool haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1672    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1673
1674    /**
1675    ***  Protocol messages
1676    **/
1677
1678    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1679    {
1680        dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
1681        msgs->outMessagesBatchedAt = now;
1682    }
1683    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1684    {
1685        const size_t len = EVBUFFER_LENGTH( msgs->outMessages );
1686        /* flush the protocol messages */
1687        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len );
1688        tr_peerIoWriteBuf( msgs->peer->io, msgs->outMessages, FALSE );
1689        msgs->clientSentAnythingAt = now;
1690        msgs->outMessagesBatchedAt = 0;
1691        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1692        bytesWritten +=  len;
1693    }
1694
1695    /**
1696    ***  Blocks
1697    **/
1698
1699    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= msgs->torrent->blockSize )
1700        && popNextRequest( msgs, &req ) )
1701    {
1702        if( requestIsValid( msgs, &req )
1703            && tr_cpPieceIsComplete( &msgs->torrent->completion, req.index ) )
1704        {
1705            int err;
1706            static uint8_t buf[MAX_BLOCK_SIZE];
1707
1708            /* send a block */
1709            if(( err = tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf ))) {
1710                fireError( msgs, err );
1711                bytesWritten = 0;
1712                msgs = NULL;
1713            } else {
1714                tr_peerIo * io = msgs->peer->io;
1715                struct evbuffer * out = tr_getBuffer( );
1716                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1717                tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1718                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1719                tr_peerIoWriteUint32( io, out, req.index );
1720                tr_peerIoWriteUint32( io, out, req.offset );
1721                tr_peerIoWriteBytes ( io, out, buf, req.length );
1722                tr_peerIoWriteBuf( io, out, TRUE );
1723                bytesWritten += EVBUFFER_LENGTH( out );
1724                msgs->clientSentAnythingAt = now;
1725                tr_releaseBuffer( out );
1726            }
1727        }
1728        else if( fext ) /* peer needs a reject message */
1729        {
1730            protocolSendReject( msgs, &req );
1731        }
1732    }
1733
1734    /**
1735    ***  Keepalive
1736    **/
1737
1738    if( ( msgs != NULL )
1739        && ( msgs->clientSentAnythingAt != 0 )
1740        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1741    {
1742        dbgmsg( msgs, "sending a keepalive message" );
1743        tr_peerIoWriteUint32( msgs->peer->io, msgs->outMessages, 0 );
1744        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1745    }
1746
1747    return bytesWritten;
1748}
1749
1750static int
1751peerPulse( void * vmsgs )
1752{
1753    tr_peermsgs * msgs = vmsgs;
1754    const time_t  now = time( NULL );
1755
1756    ratePulse( msgs, now );
1757
1758    pumpRequestQueue( msgs, now );
1759    expireOldRequests( msgs, now );
1760
1761    for( ;; )
1762        if( fillOutputBuffer( msgs, now ) < 1 )
1763            break;
1764
1765    return TRUE; /* loop forever */
1766}
1767
1768void
1769tr_peerMsgsPulse( tr_peermsgs * msgs )
1770{
1771    if( msgs != NULL )
1772        peerPulse( msgs );
1773}
1774
1775static void
1776gotError( tr_peerIo  * io UNUSED,
1777          short        what,
1778          void       * vmsgs )
1779{
1780    if( what & EVBUFFER_TIMEOUT )
1781        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1782    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1783        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1784               what, errno, tr_strerror( errno ) );
1785    fireError( vmsgs, ENOTCONN );
1786}
1787
1788static void
1789sendBitfield( tr_peermsgs * msgs )
1790{
1791    struct evbuffer * out = msgs->outMessages;
1792    tr_bitfield *     field;
1793    tr_piece_index_t  lazyPieces[LAZY_PIECE_COUNT];
1794    size_t            i;
1795    size_t            lazyCount = 0;
1796
1797    field = tr_bitfieldDup( tr_cpPieceBitfield( &msgs->torrent->completion ) );
1798
1799    if( tr_sessionIsLazyBitfieldEnabled( msgs->session ) )
1800    {
1801        /** Lazy bitfields aren't a high priority or secure, so I'm opting for
1802            speed over a truly random sample -- let's limit the pool size to
1803            the first 1000 pieces so large torrents don't bog things down */
1804        size_t poolSize;
1805        const size_t maxPoolSize = MIN( msgs->torrent->info.pieceCount, 1000 );
1806        tr_piece_index_t * pool = tr_new( tr_piece_index_t, maxPoolSize );
1807
1808        /* build the pool */
1809        for( i=poolSize=0; i<maxPoolSize; ++i )
1810            if( tr_bitfieldHas( field, i ) )
1811                pool[poolSize++] = i;
1812
1813        /* pull random piece indices from the pool */
1814        while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) )
1815        {
1816            const int pos = tr_cryptoWeakRandInt( poolSize );
1817            const tr_piece_index_t piece = pool[pos];
1818            tr_bitfieldRem( field, piece );
1819            lazyPieces[lazyCount++] = piece;
1820            pool[pos] = pool[--poolSize];
1821        }
1822
1823        /* cleanup */
1824        tr_free( pool );
1825    }
1826
1827    tr_peerIoWriteUint32( msgs->peer->io, out,
1828                          sizeof( uint8_t ) + field->byteCount );
1829    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_BITFIELD );
1830    tr_peerIoWriteBytes ( msgs->peer->io, out, field->bits, field->byteCount );
1831    dbgmsg( msgs, "sending bitfield... outMessage size is now %zu",
1832            EVBUFFER_LENGTH( out ) );
1833    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1834
1835    for( i = 0; i < lazyCount; ++i )
1836        protocolSendHave( msgs, lazyPieces[i] );
1837
1838    tr_bitfieldFree( field );
1839}
1840
1841static void
1842tellPeerWhatWeHave( tr_peermsgs * msgs )
1843{
1844    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1845
1846    if( fext && ( tr_cpGetStatus( &msgs->torrent->completion ) == TR_SEED ) )
1847    {
1848        protocolSendHaveAll( msgs );
1849    }
1850    else if( fext && ( tr_cpHaveValid( &msgs->torrent->completion ) == 0 ) )
1851    {
1852        protocolSendHaveNone( msgs );
1853    }
1854    else
1855    {
1856        sendBitfield( msgs );
1857    }
1858}
1859
1860/**
1861***
1862**/
1863
1864/* some peers give us error messages if we send
1865   more than this many peers in a single pex message
1866   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
1867#define MAX_PEX_ADDED 50
1868#define MAX_PEX_DROPPED 50
1869
1870typedef struct
1871{
1872    tr_pex *  added;
1873    tr_pex *  dropped;
1874    tr_pex *  elements;
1875    int       addedCount;
1876    int       droppedCount;
1877    int       elementCount;
1878}
1879PexDiffs;
1880
1881static void
1882pexAddedCb( void * vpex,
1883            void * userData )
1884{
1885    PexDiffs * diffs = userData;
1886    tr_pex *   pex = vpex;
1887
1888    if( diffs->addedCount < MAX_PEX_ADDED )
1889    {
1890        diffs->added[diffs->addedCount++] = *pex;
1891        diffs->elements[diffs->elementCount++] = *pex;
1892    }
1893}
1894
1895static TR_INLINE void
1896pexDroppedCb( void * vpex,
1897              void * userData )
1898{
1899    PexDiffs * diffs = userData;
1900    tr_pex *   pex = vpex;
1901
1902    if( diffs->droppedCount < MAX_PEX_DROPPED )
1903    {
1904        diffs->dropped[diffs->droppedCount++] = *pex;
1905    }
1906}
1907
1908static TR_INLINE void
1909pexElementCb( void * vpex,
1910              void * userData )
1911{
1912    PexDiffs * diffs = userData;
1913    tr_pex * pex = vpex;
1914
1915    diffs->elements[diffs->elementCount++] = *pex;
1916}
1917
1918static void
1919sendPex( tr_peermsgs * msgs )
1920{
1921    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
1922    {
1923        PexDiffs diffs;
1924        PexDiffs diffs6;
1925        tr_pex * newPex = NULL;
1926        tr_pex * newPex6 = NULL;
1927        const int newCount = tr_peerMgrGetPeers( msgs->torrent, &newPex, TR_AF_INET );
1928        const int newCount6 = tr_peerMgrGetPeers( msgs->torrent, &newPex6, TR_AF_INET6 );
1929
1930        /* build the diffs */
1931        diffs.added = tr_new( tr_pex, newCount );
1932        diffs.addedCount = 0;
1933        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1934        diffs.droppedCount = 0;
1935        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1936        diffs.elementCount = 0;
1937        tr_set_compare( msgs->pex, msgs->pexCount,
1938                        newPex, newCount,
1939                        tr_pexCompare, sizeof( tr_pex ),
1940                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
1941        diffs6.added = tr_new( tr_pex, newCount6 );
1942        diffs6.addedCount = 0;
1943        diffs6.dropped = tr_new( tr_pex, msgs->pexCount6 );
1944        diffs6.droppedCount = 0;
1945        diffs6.elements = tr_new( tr_pex, newCount6 + msgs->pexCount6 );
1946        diffs6.elementCount = 0;
1947        tr_set_compare( msgs->pex6, msgs->pexCount6,
1948                        newPex6, newCount6,
1949                        tr_pexCompare, sizeof( tr_pex ),
1950                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6 );
1951        dbgmsg(
1952            msgs,
1953            "pex: old peer count %d, new peer count %d, added %d, removed %d",
1954            msgs->pexCount, newCount + newCount6,
1955            diffs.addedCount + diffs6.addedCount,
1956            diffs.droppedCount + diffs6.droppedCount );
1957
1958        if( !diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
1959            !diffs6.droppedCount )
1960        {
1961            tr_free( diffs.elements );
1962            tr_free( diffs6.elements );
1963        }
1964        else
1965        {
1966            int  i;
1967            tr_benc val;
1968            char * benc;
1969            int bencLen;
1970            uint8_t * tmp, *walk;
1971            tr_peerIo       * io  = msgs->peer->io;
1972            struct evbuffer * out = msgs->outMessages;
1973
1974            /* update peer */
1975            tr_free( msgs->pex );
1976            msgs->pex = diffs.elements;
1977            msgs->pexCount = diffs.elementCount;
1978            tr_free( msgs->pex6 );
1979            msgs->pex6 = diffs6.elements;
1980            msgs->pexCount6 = diffs6.elementCount;
1981
1982            /* build the pex payload */
1983            tr_bencInitDict( &val, 3 ); /* ipv6 support: left as 3:
1984                                         * speed vs. likelihood? */
1985
1986            /* "added" */
1987            tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1988            for( i = 0; i < diffs.addedCount; ++i )
1989            {
1990                tr_suspectAddress( &diffs.added[i].addr, "pex" );
1991                memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
1992                memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1993            }
1994            assert( ( walk - tmp ) == diffs.addedCount * 6 );
1995            tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
1996            tr_free( tmp );
1997
1998            /* "added.f" */
1999            tmp = walk = tr_new( uint8_t, diffs.addedCount );
2000            for( i = 0; i < diffs.addedCount; ++i )
2001                *walk++ = diffs.added[i].flags;
2002            assert( ( walk - tmp ) == diffs.addedCount );
2003            tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2004            tr_free( tmp );
2005
2006            /* "dropped" */
2007            tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2008            for( i = 0; i < diffs.droppedCount; ++i )
2009            {
2010                memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
2011                memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2012            }
2013            assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2014            tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2015            tr_free( tmp );
2016           
2017            /* "added6" */
2018            tmp = walk = tr_new( uint8_t, diffs6.addedCount * 18 );
2019            for( i = 0; i < diffs6.addedCount; ++i )
2020            {
2021                tr_suspectAddress( &diffs6.added[i].addr, "pex6" );
2022                memcpy( walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16 );
2023                walk += 16;
2024                memcpy( walk, &diffs6.added[i].port, 2 );
2025                walk += 2;
2026            }
2027            assert( ( walk - tmp ) == diffs6.addedCount * 18 );
2028            tr_bencDictAddRaw( &val, "added6", tmp, walk - tmp );
2029            tr_free( tmp );
2030           
2031            /* "added6.f" */
2032            tmp = walk = tr_new( uint8_t, diffs6.addedCount );
2033            for( i = 0; i < diffs6.addedCount; ++i )
2034                *walk++ = diffs6.added[i].flags;
2035            assert( ( walk - tmp ) == diffs6.addedCount );
2036            tr_bencDictAddRaw( &val, "added6.f", tmp, walk - tmp );
2037            tr_free( tmp );
2038           
2039            /* "dropped6" */
2040            tmp = walk = tr_new( uint8_t, diffs6.droppedCount * 18 );
2041            for( i = 0; i < diffs6.droppedCount; ++i )
2042            {
2043                memcpy( walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16 );
2044                walk += 16;
2045                memcpy( walk, &diffs6.dropped[i].port, 2 );
2046                walk += 2;
2047            }
2048            assert( ( walk - tmp ) == diffs6.droppedCount * 18);
2049            tr_bencDictAddRaw( &val, "dropped6", tmp, walk - tmp );
2050            tr_free( tmp );
2051
2052            /* write the pex message */
2053            benc = tr_bencSave( &val, &bencLen );
2054            tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + bencLen );
2055            tr_peerIoWriteUint8 ( io, out, BT_LTEP );
2056            tr_peerIoWriteUint8 ( io, out, msgs->ut_pex_id );
2057            tr_peerIoWriteBytes ( io, out, benc, bencLen );
2058            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2059            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", EVBUFFER_LENGTH( out ) );
2060            dbgOutMessageLen( msgs );
2061
2062            tr_free( benc );
2063            tr_bencFree( &val );
2064        }
2065
2066        /* cleanup */
2067        tr_free( diffs.added );
2068        tr_free( diffs.dropped );
2069        tr_free( newPex );
2070        tr_free( diffs6.added );
2071        tr_free( diffs6.dropped );
2072        tr_free( newPex6 );
2073
2074        msgs->clientSentPexAt = time( NULL );
2075    }
2076}
2077
2078static TR_INLINE int
2079pexPulse( void * vpeer )
2080{
2081    sendPex( vpeer );
2082    return TRUE;
2083}
2084
2085/**
2086***
2087**/
2088
2089tr_peermsgs*
2090tr_peerMsgsNew( struct tr_torrent * torrent,
2091                struct tr_peer    * peer,
2092                tr_delivery_func    func,
2093                void              * userData,
2094                tr_publisher_tag  * setme )
2095{
2096    tr_peermsgs * m;
2097
2098    assert( peer );
2099    assert( peer->io );
2100
2101    m = tr_new0( tr_peermsgs, 1 );
2102    m->publisher = TR_PUBLISHER_INIT;
2103    m->peer = peer;
2104    m->session = torrent->session;
2105    m->torrent = torrent;
2106    m->peer->clientIsChoked = 1;
2107    m->peer->peerIsChoked = 1;
2108    m->peer->clientIsInterested = 0;
2109    m->peer->peerIsInterested = 0;
2110    m->peer->have = tr_bitfieldNew( torrent->info.pieceCount );
2111    m->state = AWAITING_BT_LENGTH;
2112    m->pexTimer = tr_timerNew( m->session, pexPulse, m, PEX_INTERVAL );
2113    m->outMessages = evbuffer_new( );
2114    m->outMessagesBatchedAt = 0;
2115    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2116    m->incoming.block = evbuffer_new( );
2117    m->peerAskedFor = REQUEST_LIST_INIT;
2118    m->clientAskedFor = REQUEST_LIST_INIT;
2119    m->clientWillAskFor = REQUEST_LIST_INIT;
2120    peer->msgs = m;
2121
2122    *setme = tr_publisherSubscribe( &m->publisher, func, userData );
2123
2124    if( tr_peerIoSupportsLTEP( peer->io ) )
2125        sendLtepHandshake( m );
2126
2127    tellPeerWhatWeHave( m );
2128
2129    tr_peerIoSetIOFuncs( m->peer->io, canRead, didWrite, gotError, m );
2130    ratePulse( m, tr_date() );
2131
2132    return m;
2133}
2134
2135void
2136tr_peerMsgsFree( tr_peermsgs* msgs )
2137{
2138    if( msgs )
2139    {
2140        tr_timerFree( &msgs->pexTimer );
2141        tr_publisherDestruct( &msgs->publisher );
2142        reqListClear( &msgs->clientWillAskFor );
2143        reqListClear( &msgs->clientAskedFor );
2144        reqListClear( &msgs->peerAskedFor );
2145
2146        evbuffer_free( msgs->incoming.block );
2147        evbuffer_free( msgs->outMessages );
2148        tr_free( msgs->pex6 );
2149        tr_free( msgs->pex );
2150
2151        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2152        tr_free( msgs );
2153    }
2154}
2155
2156void
2157tr_peerMsgsUnsubscribe( tr_peermsgs *    peer,
2158                        tr_publisher_tag tag )
2159{
2160    tr_publisherUnsubscribe( &peer->publisher, tag );
2161}
2162
Note: See TracBrowser for help on using the repository browser.