source: trunk/libtransmission/peer-msgs.c @ 7824

Last change on this file since 7824 was 7824, checked in by charles, 13 years ago

(trunk libT) #1748: possible fix for the kqueue corruption errors by consolidating the three per-torrent libevent timers into three session-wide timers. Since most people reporting this error have lots of torrents loaded, consider a hypothetical example: if you had 500 torrents, this patch will reduce 1,500 libevent timers down to just three timers. On top of that, those three have simpler life cycles too...

  • Property svn:keywords set to Date Rev Author Id
File size: 62.4 KB
Line 
1/*
2 * This file Copyright (C) 2007-2009 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 7824 2009-02-04 16:58:52Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19
20#include <event.h>
21
22#include "transmission.h"
23#include "session.h"
24#include "bencode.h"
25#include "completion.h"
26#include "crypto.h"
27#include "inout.h"
28#ifdef WIN32
29#include "net.h" /* for ECONN */
30#endif
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-msgs.h"
34#include "platform.h" /* MAX_STACK_ARRAY_SIZE */
35#include "ratecontrol.h"
36#include "request-list.h"
37#include "stats.h"
38#include "torrent.h"
39#include "trevent.h"
40#include "utils.h"
41
42/**
43***
44**/
45
46enum
47{
48    BT_CHOKE                = 0,
49    BT_UNCHOKE              = 1,
50    BT_INTERESTED           = 2,
51    BT_NOT_INTERESTED       = 3,
52    BT_HAVE                 = 4,
53    BT_BITFIELD             = 5,
54    BT_REQUEST              = 6,
55    BT_PIECE                = 7,
56    BT_CANCEL               = 8,
57    BT_PORT                 = 9,
58
59    BT_FEXT_SUGGEST         = 13,
60    BT_FEXT_HAVE_ALL        = 14,
61    BT_FEXT_HAVE_NONE       = 15,
62    BT_FEXT_REJECT          = 16,
63    BT_FEXT_ALLOWED_FAST    = 17,
64
65    BT_LTEP                 = 20,
66
67    LTEP_HANDSHAKE          = 0,
68
69    TR_LTEP_PEX             = 1,
70
71
72
73    MIN_CHOKE_PERIOD_SEC    = ( 10 ),
74
75    /* idle seconds before we send a keepalive */
76    KEEPALIVE_INTERVAL_SECS = 100,
77
78    PEX_INTERVAL            = ( 90 * 1000 ), /* msec between sendPex() calls */
79
80
81    MAX_BLOCK_SIZE          = ( 1024 * 16 ),
82
83
84    /* how long an unsent request can stay queued before it's returned
85       back to the peer-mgr's pool of requests */
86    QUEUED_REQUEST_TTL_SECS = 20,
87
88    /* how long a sent request can stay queued before it's returned
89       back to the peer-mgr's pool of requests */
90    SENT_REQUEST_TTL_SECS = 240,
91
92    /* used in lowering the outMessages queue period */
93    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
94    HIGH_PRIORITY_INTERVAL_SECS = 2,
95    LOW_PRIORITY_INTERVAL_SECS = 20,
96
97    /* number of pieces to remove from the bitfield when
98     * lazy bitfields are turned on */
99    LAZY_PIECE_COUNT = 26,
100
101    /* number of pieces we'll allow in our fast set */
102    MAX_FAST_SET_SIZE = 3
103};
104
105enum
106{
107    AWAITING_BT_LENGTH,
108    AWAITING_BT_ID,
109    AWAITING_BT_MESSAGE,
110    AWAITING_BT_PIECE
111};
112
113/**
114***
115**/
116
117/* this is raw, unchanged data from the peer regarding
118 * the current message that it's sending us. */
119struct tr_incoming
120{
121    uint8_t                id;
122    uint32_t               length; /* includes the +1 for id length */
123    struct peer_request    blockReq; /* metadata for incoming blocks */
124    struct evbuffer *      block; /* piece data for incoming blocks */
125};
126
127/**
128 * Low-level communication state information about a connected peer.
129 *
130 * This structure remembers the low-level protocol states that we're
131 * in with this peer, such as active requests, pex messages, and so on.
132 * Its fields are all private to peer-msgs.c.
133 *
134 * Data not directly involved with sending & receiving messages is
135 * stored in tr_peer, where it can be accessed by both peermsgs and
136 * the peer manager.
137 *
138 * @see struct peer_atom
139 * @see tr_peer
140 */
141struct tr_peermsgs
142{
143    tr_bool         peerSupportsPex;
144    tr_bool         clientSentLtepHandshake;
145    tr_bool         peerSentLtepHandshake;
146    tr_bool         haveFastSet;
147
148    uint8_t         state;
149    uint8_t         ut_pex_id;
150    uint16_t        pexCount;
151    uint16_t        pexCount6;
152    uint16_t        maxActiveRequests;
153
154    size_t                 fastsetSize;
155    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
156
157    /* how long the outMessages batch should be allowed to grow before
158     * it's flushed -- some messages (like requests >:) should be sent
159     * very quickly; others aren't as urgent. */
160    int                    outMessagesBatchPeriod;
161
162    tr_peer *              peer;
163
164    tr_session *           session;
165    tr_torrent *           torrent;
166
167    tr_publisher           publisher;
168
169    struct evbuffer *      outMessages; /* all the non-piece messages */
170
171    struct request_list    peerAskedFor;
172    struct request_list    clientAskedFor;
173    struct request_list    clientWillAskFor;
174
175    tr_timer             * pexTimer;
176    tr_pex               * pex;
177    tr_pex               * pex6;
178
179    time_t                 clientSentPexAt;
180    time_t                 clientSentAnythingAt;
181
182    /* when we started batching the outMessages */
183    time_t                outMessagesBatchedAt;
184
185    struct tr_incoming    incoming;
186
187    /* if the peer supports the Extension Protocol in BEP 10 and
188       supplied a reqq argument, it's stored here.  otherwise the
189       value is zero and should be ignored. */
190    int64_t               reqq;
191};
192
193/**
194***
195**/
196
197static void
198myDebug( const char * file, int line,
199         const struct tr_peermsgs * msgs,
200         const char * fmt, ... )
201{
202    FILE * fp = tr_getLog( );
203
204    if( fp )
205    {
206        va_list           args;
207        char              timestr[64];
208        struct evbuffer * buf = tr_getBuffer( );
209        char *            base = tr_basename( file );
210
211        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
212                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
213                             msgs->torrent->info.name,
214                             tr_peerIoGetAddrStr( msgs->peer->io ),
215                             msgs->peer->client );
216        va_start( args, fmt );
217        evbuffer_add_vprintf( buf, fmt, args );
218        va_end( args );
219        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
220        fwrite( EVBUFFER_DATA( buf ), 1, EVBUFFER_LENGTH( buf ), fp );
221
222        tr_free( base );
223        tr_releaseBuffer( buf );
224    }
225}
226
227#define dbgmsg( msgs, ... ) \
228    do { \
229        if( tr_deepLoggingIsActive( ) ) \
230            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
231    } while( 0 )
232
233/**
234***
235**/
236
237static void
238pokeBatchPeriod( tr_peermsgs * msgs,
239                 int           interval )
240{
241    if( msgs->outMessagesBatchPeriod > interval )
242    {
243        msgs->outMessagesBatchPeriod = interval;
244        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
245    }
246}
247
248static TR_INLINE void
249dbgOutMessageLen( tr_peermsgs * msgs )
250{
251    dbgmsg( msgs, "outMessage size is now %zu", EVBUFFER_LENGTH( msgs->outMessages ) );
252}
253
254static void
255protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
256{
257    tr_peerIo       * io  = msgs->peer->io;
258    struct evbuffer * out = msgs->outMessages;
259
260    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
261
262    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
263    tr_peerIoWriteUint8 ( io, out, BT_FEXT_REJECT );
264    tr_peerIoWriteUint32( io, out, req->index );
265    tr_peerIoWriteUint32( io, out, req->offset );
266    tr_peerIoWriteUint32( io, out, req->length );
267
268    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
269    dbgOutMessageLen( msgs );
270}
271
272static void
273protocolSendRequest( tr_peermsgs               * msgs,
274                     const struct peer_request * req )
275{
276    tr_peerIo       * io  = msgs->peer->io;
277    struct evbuffer * out = msgs->outMessages;
278
279    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
280    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
281    tr_peerIoWriteUint32( io, out, req->index );
282    tr_peerIoWriteUint32( io, out, req->offset );
283    tr_peerIoWriteUint32( io, out, req->length );
284
285    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
286    dbgOutMessageLen( msgs );
287    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
288}
289
290static void
291protocolSendCancel( tr_peermsgs               * msgs,
292                    const struct peer_request * req )
293{
294    tr_peerIo       * io  = msgs->peer->io;
295    struct evbuffer * out = msgs->outMessages;
296
297    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
298    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
299    tr_peerIoWriteUint32( io, out, req->index );
300    tr_peerIoWriteUint32( io, out, req->offset );
301    tr_peerIoWriteUint32( io, out, req->length );
302
303    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
304    dbgOutMessageLen( msgs );
305    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
306}
307
308static void
309protocolSendHave( tr_peermsgs * msgs,
310                  uint32_t      index )
311{
312    tr_peerIo       * io  = msgs->peer->io;
313    struct evbuffer * out = msgs->outMessages;
314
315    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
316    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
317    tr_peerIoWriteUint32( io, out, index );
318
319    dbgmsg( msgs, "sending Have %u", index );
320    dbgOutMessageLen( msgs );
321    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
322}
323
324#if 0
325static void
326protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
327{
328    tr_peerIo       * io  = msgs->peer->io;
329    struct evbuffer * out = msgs->outMessages;
330
331    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
332
333    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
334    tr_peerIoWriteUint8 ( io, out, BT_FEXT_ALLOWED_FAST );
335    tr_peerIoWriteUint32( io, out, pieceIndex );
336
337    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
338    dbgOutMessageLen( msgs );
339}
340#endif
341
342static void
343protocolSendChoke( tr_peermsgs * msgs,
344                   int           choke )
345{
346    tr_peerIo       * io  = msgs->peer->io;
347    struct evbuffer * out = msgs->outMessages;
348
349    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
350    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
351
352    dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
353    dbgOutMessageLen( msgs );
354    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
355}
356
357static void
358protocolSendHaveAll( tr_peermsgs * msgs )
359{
360    tr_peerIo       * io  = msgs->peer->io;
361    struct evbuffer * out = msgs->outMessages;
362
363    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
364
365    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
366    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_ALL );
367
368    dbgmsg( msgs, "sending HAVE_ALL..." );
369    dbgOutMessageLen( msgs );
370    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
371}
372
373static void
374protocolSendHaveNone( tr_peermsgs * msgs )
375{
376    tr_peerIo       * io  = msgs->peer->io;
377    struct evbuffer * out = msgs->outMessages;
378
379    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
380
381    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
382    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_NONE );
383
384    dbgmsg( msgs, "sending HAVE_NONE..." );
385    dbgOutMessageLen( msgs );
386    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
387}
388
389/**
390***  EVENTS
391**/
392
393static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0, 0 };
394
395static void
396publish( tr_peermsgs * msgs, tr_peer_event * e )
397{
398    assert( msgs->peer );
399    assert( msgs->peer->msgs == msgs );
400
401    tr_publisherPublish( &msgs->publisher, msgs->peer, e );
402}
403
404static void
405fireError( tr_peermsgs * msgs, int err )
406{
407    tr_peer_event e = blankEvent;
408    e.eventType = TR_PEER_ERROR;
409    e.err = err;
410    publish( msgs, &e );
411}
412
413static void
414fireUploadOnly( tr_peermsgs * msgs, tr_bool uploadOnly )
415{
416    tr_peer_event e = blankEvent;
417    e.eventType = TR_PEER_UPLOAD_ONLY;
418    e.uploadOnly = uploadOnly;
419    publish( msgs, &e );
420}
421
422static void
423firePeerProgress( tr_peermsgs * msgs )
424{
425    tr_peer_event e = blankEvent;
426    e.eventType = TR_PEER_PEER_PROGRESS;
427    e.progress = msgs->peer->progress;
428    publish( msgs, &e );
429}
430
431static void
432fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
433{
434    tr_peer_event e = blankEvent;
435    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
436    e.pieceIndex = req->index;
437    e.offset = req->offset;
438    e.length = req->length;
439    publish( msgs, &e );
440}
441
442static void
443fireClientGotData( tr_peermsgs * msgs,
444                   uint32_t      length,
445                   int           wasPieceData )
446{
447    tr_peer_event e = blankEvent;
448
449    e.length = length;
450    e.eventType = TR_PEER_CLIENT_GOT_DATA;
451    e.wasPieceData = wasPieceData;
452    publish( msgs, &e );
453}
454
455static void
456fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
457{
458    tr_peer_event e = blankEvent;
459    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
460    e.pieceIndex = pieceIndex;
461    publish( msgs, &e );
462}
463
464static void
465fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
466{
467    tr_peer_event e = blankEvent;
468    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
469    e.pieceIndex = pieceIndex;
470    publish( msgs, &e );
471}
472
473static void
474firePeerGotData( tr_peermsgs  * msgs,
475                 uint32_t       length,
476                 int            wasPieceData )
477{
478    tr_peer_event e = blankEvent;
479
480    e.length = length;
481    e.eventType = TR_PEER_PEER_GOT_DATA;
482    e.wasPieceData = wasPieceData;
483
484    publish( msgs, &e );
485}
486
487static void
488fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
489{
490    tr_peer_event e = blankEvent;
491    e.eventType = TR_PEER_CANCEL;
492    e.pieceIndex = req->index;
493    e.offset = req->offset;
494    e.length = req->length;
495    publish( msgs, &e );
496}
497
498/**
499***  ALLOWED FAST SET
500***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
501**/
502
503size_t
504tr_generateAllowedSet( tr_piece_index_t * setmePieces,
505                       size_t             desiredSetSize,
506                       size_t             pieceCount,
507                       const uint8_t    * infohash,
508                       const tr_address * addr )
509{
510    size_t setSize = 0;
511
512    assert( setmePieces );
513    assert( desiredSetSize <= pieceCount );
514    assert( desiredSetSize );
515    assert( pieceCount );
516    assert( infohash );
517    assert( addr );
518
519    if( addr->type == TR_AF_INET )
520    {
521        uint8_t w[SHA_DIGEST_LENGTH + 4];
522        uint8_t x[SHA_DIGEST_LENGTH];
523
524        *(uint32_t*)w = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 );   /* (1) */
525        memcpy( w + 4, infohash, SHA_DIGEST_LENGTH );                /* (2) */
526        tr_sha1( x, w, sizeof( w ), NULL );                          /* (3) */
527
528        while( setSize<desiredSetSize )
529        {
530            int i;
531            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
532            {
533                size_t k;
534                uint32_t j = i * 4;                                  /* (5) */
535                uint32_t y = ntohl( *( uint32_t* )( x + j ) );       /* (6) */
536                uint32_t index = y % pieceCount;                     /* (7) */
537
538                for( k=0; k<setSize; ++k )                           /* (8) */
539                    if( setmePieces[k] == index )
540                        break;
541
542                if( k == setSize )
543                    setmePieces[setSize++] = index;                  /* (9) */
544            }
545
546            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
547        }
548    }
549
550    return setSize;
551}
552
553static void
554updateFastSet( tr_peermsgs * msgs UNUSED )
555{
556#if 0
557    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
558    const int peerIsNeedy = msgs->peer->progress < 0.10;
559
560    if( fext && peerIsNeedy && !msgs->haveFastSet )
561    {
562        size_t i;
563        const struct tr_address * addr = tr_peerIoGetAddress( msgs->peer->io, NULL );
564        const tr_info * inf = &msgs->torrent->info;
565        const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
566
567        /* build the fast set */
568        msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
569        msgs->haveFastSet = 1;
570
571        /* send it to the peer */
572        for( i=0; i<msgs->fastsetSize; ++i )
573            protocolSendAllowedFast( msgs, msgs->fastset[i] );
574    }
575#endif
576}
577
578/**
579***  INTEREST
580**/
581
582static int
583isPieceInteresting( const tr_peermsgs * msgs,
584                    tr_piece_index_t    piece )
585{
586    const tr_torrent * torrent = msgs->torrent;
587
588    return ( !torrent->info.pieces[piece].dnd )                 /* we want it */
589          && ( !tr_cpPieceIsComplete( &torrent->completion, piece ) ) /* !have */
590          && ( tr_bitfieldHas( msgs->peer->have, piece ) );    /* peer has it */
591}
592
593/* "interested" means we'll ask for piece data if they unchoke us */
594static int
595isPeerInteresting( const tr_peermsgs * msgs )
596{
597    tr_piece_index_t    i;
598    const tr_torrent *  torrent;
599    const tr_bitfield * bitfield;
600    const int           clientIsSeed = tr_torrentIsSeed( msgs->torrent );
601
602    if( clientIsSeed )
603        return FALSE;
604
605    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
606        return FALSE;
607
608    torrent = msgs->torrent;
609    bitfield = tr_cpPieceBitfield( &torrent->completion );
610
611    if( !msgs->peer->have )
612        return TRUE;
613
614    assert( bitfield->byteCount == msgs->peer->have->byteCount );
615
616    for( i = 0; i < torrent->info.pieceCount; ++i )
617        if( isPieceInteresting( msgs, i ) )
618            return TRUE;
619
620    return FALSE;
621}
622
623static void
624sendInterest( tr_peermsgs * msgs,
625              int           weAreInterested )
626{
627    struct evbuffer * out = msgs->outMessages;
628
629    assert( msgs );
630    assert( weAreInterested == 0 || weAreInterested == 1 );
631
632    msgs->peer->clientIsInterested = weAreInterested;
633    dbgmsg( msgs, "Sending %s", weAreInterested ? "Interested" : "Not Interested" );
634    tr_peerIoWriteUint32( msgs->peer->io, out, sizeof( uint8_t ) );
635    tr_peerIoWriteUint8 ( msgs->peer->io, out, weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
636
637    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
638    dbgOutMessageLen( msgs );
639}
640
641static void
642updateInterest( tr_peermsgs * msgs )
643{
644    const int i = isPeerInteresting( msgs );
645
646    if( i != msgs->peer->clientIsInterested )
647        sendInterest( msgs, i );
648}
649
650static int
651popNextRequest( tr_peermsgs *         msgs,
652                struct peer_request * setme )
653{
654    return reqListPop( &msgs->peerAskedFor, setme );
655}
656
657static void
658cancelAllRequestsToClient( tr_peermsgs * msgs )
659{
660    struct peer_request req;
661    const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->peer->io );
662
663    while( popNextRequest( msgs, &req ) )
664        if( mustSendCancel )
665            protocolSendReject( msgs, &req );
666}
667
668void
669tr_peerMsgsSetChoke( tr_peermsgs * msgs,
670                     int           choke )
671{
672    const time_t now = time( NULL );
673    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
674
675    assert( msgs );
676    assert( msgs->peer );
677    assert( choke == 0 || choke == 1 );
678
679    if( msgs->peer->chokeChangedAt > fibrillationTime )
680    {
681        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
682    }
683    else if( msgs->peer->peerIsChoked != choke )
684    {
685        msgs->peer->peerIsChoked = choke;
686        if( choke )
687            cancelAllRequestsToClient( msgs );
688        protocolSendChoke( msgs, choke );
689        msgs->peer->chokeChangedAt = now;
690    }
691}
692
693/**
694***
695**/
696
697void
698tr_peerMsgsHave( tr_peermsgs * msgs,
699                 uint32_t      index )
700{
701    protocolSendHave( msgs, index );
702
703    /* since we have more pieces now, we might not be interested in this peer */
704    updateInterest( msgs );
705}
706
707/**
708***
709**/
710
711static int
712reqIsValid( const tr_peermsgs * peer,
713            uint32_t            index,
714            uint32_t            offset,
715            uint32_t            length )
716{
717    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
718}
719
720static int
721requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
722{
723    return reqIsValid( msgs, req->index, req->offset, req->length );
724}
725
726static void
727expireFromList( tr_peermsgs          * msgs,
728                struct request_list  * list,
729                const time_t           oldestAllowed )
730{
731    size_t i;
732    struct request_list tmp = REQUEST_LIST_INIT;
733
734    /* since the fifo list is sorted by time, the oldest will be first */
735    if( !list->len || ( list->fifo[0].time_requested >= oldestAllowed ) )
736        return;
737
738    /* if we found one too old, start pruning them */
739    reqListCopy( &tmp, list );
740    for( i=0; i<tmp.len; ++i ) {
741        const struct peer_request * req = &tmp.fifo[i];
742        if( req->time_requested >= oldestAllowed )
743            break;
744        tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
745    }
746    reqListClear( &tmp );
747}
748
749static void
750expireOldRequests( tr_peermsgs * msgs, const time_t now  )
751{
752    time_t oldestAllowed;
753    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
754    dbgmsg( msgs, "entering `expire old requests' block" );
755
756    /* cancel requests that have been queued for too long */
757    oldestAllowed = now - QUEUED_REQUEST_TTL_SECS;
758    expireFromList( msgs, &msgs->clientWillAskFor, oldestAllowed );
759
760    /* if the peer doesn't support "Reject Request",
761     * cancel requests that were sent too long ago. */
762    if( !fext ) {
763        oldestAllowed = now - SENT_REQUEST_TTL_SECS;
764        expireFromList( msgs, &msgs->clientAskedFor, oldestAllowed );
765    }
766
767    dbgmsg( msgs, "leaving `expire old requests' block" );
768}
769
770static void
771pumpRequestQueue( tr_peermsgs * msgs, const time_t now )
772{
773    const int           max = msgs->maxActiveRequests;
774    int                 sent = 0;
775    int                 len = msgs->clientAskedFor.len;
776    struct peer_request req;
777
778    dbgmsg( msgs, "clientIsChoked %d, download allowed %d, len %d, max %d, msgs->clientWillAskFor.len %d",
779            (int)msgs->peer->clientIsChoked,
780            (int)tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ),
781            len, max, msgs->clientWillAskFor.len );
782
783    if( msgs->peer->clientIsChoked )
784        return;
785    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
786        return;
787
788    while( ( len < max ) && reqListPop( &msgs->clientWillAskFor, &req ) )
789    {
790        const tr_block_index_t block = _tr_block( msgs->torrent, req.index, req.offset );
791
792        assert( requestIsValid( msgs, &req ) );
793        assert( tr_bitfieldHas( msgs->peer->have, req.index ) );
794
795        /* don't ask for it if we've already got it... this block may have
796         * come in from a different peer after we cancelled a request for it */
797        if( !tr_cpBlockIsComplete( &msgs->torrent->completion, block ) )
798        {
799            protocolSendRequest( msgs, &req );
800            req.time_requested = now;
801            reqListAppend( &msgs->clientAskedFor, &req );
802
803            ++len;
804            ++sent;
805        }
806        else dbgmsg( msgs, "not asking for it because we've already got it..." );
807    }
808
809    if( sent )
810        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
811                sent, msgs->clientAskedFor.len, msgs->clientWillAskFor.len );
812}
813
814static TR_INLINE int
815requestQueueIsFull( const tr_peermsgs * msgs )
816{
817    const int req_max = msgs->maxActiveRequests;
818    return msgs->clientWillAskFor.len >= (size_t)req_max;
819}
820
821tr_addreq_t
822tr_peerMsgsAddRequest( tr_peermsgs *    msgs,
823                       uint32_t         index,
824                       uint32_t         offset,
825                       uint32_t         length )
826{
827    struct peer_request req;
828
829    assert( msgs );
830    assert( msgs->torrent );
831    assert( reqIsValid( msgs, index, offset, length ) );
832
833    /**
834    ***  Reasons to decline the request
835    **/
836
837    /* don't send requests to choked clients */
838    if( msgs->peer->clientIsChoked ) {
839        dbgmsg( msgs, "declining request because they're choking us" );
840        return TR_ADDREQ_CLIENT_CHOKED;
841    }
842
843    /* peer doesn't have this piece */
844    if( !tr_bitfieldHas( msgs->peer->have, index ) )
845        return TR_ADDREQ_MISSING;
846
847    /* peer's queue is full */
848    if( requestQueueIsFull( msgs ) ) {
849        dbgmsg( msgs, "declining request because we're full" );
850        return TR_ADDREQ_FULL;
851    }
852
853    /* have we already asked for this piece? */
854    req.index = index;
855    req.offset = offset;
856    req.length = length;
857    if( reqListHas( &msgs->clientAskedFor, &req ) ) {
858        dbgmsg( msgs, "declining because it's a duplicate" );
859        return TR_ADDREQ_DUPLICATE;
860    }
861    if( reqListHas( &msgs->clientWillAskFor, &req ) ) {
862        dbgmsg( msgs, "declining because it's a duplicate" );
863        return TR_ADDREQ_DUPLICATE;
864    }
865
866    /**
867    ***  Accept this request
868    **/
869
870    dbgmsg( msgs, "adding req for %"PRIu32":%"PRIu32"->%"PRIu32" to our `will request' list",
871            index, offset, length );
872    req.time_requested = time( NULL );
873    reqListAppend( &msgs->clientWillAskFor, &req );
874    return TR_ADDREQ_OK;
875}
876
877static void
878cancelAllRequestsToPeer( tr_peermsgs * msgs, tr_bool sendCancel )
879{
880    size_t i;
881    struct request_list a = msgs->clientWillAskFor;
882    struct request_list b = msgs->clientAskedFor;
883    dbgmsg( msgs, "cancelling all requests to peer" );
884
885    msgs->clientAskedFor = REQUEST_LIST_INIT;
886    msgs->clientWillAskFor = REQUEST_LIST_INIT;
887
888    for( i=0; i<a.len; ++i )
889        fireCancelledReq( msgs, &a.fifo[i] );
890
891    for( i = 0; i < b.len; ++i ) {
892        fireCancelledReq( msgs, &b.fifo[i] );
893        if( sendCancel )
894            protocolSendCancel( msgs, &b.fifo[i] );
895    }
896
897    reqListClear( &a );
898    reqListClear( &b );
899}
900
901void
902tr_peerMsgsCancel( tr_peermsgs * msgs,
903                   uint32_t      pieceIndex,
904                   uint32_t      offset,
905                   uint32_t      length )
906{
907    struct peer_request req;
908
909    assert( msgs != NULL );
910    assert( length > 0 );
911
912
913    /* have we asked the peer for this piece? */
914    req.index = pieceIndex;
915    req.offset = offset;
916    req.length = length;
917
918    /* if it's only in the queue and hasn't been sent yet, free it */
919    if( reqListRemove( &msgs->clientWillAskFor, &req ) ) {
920        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32, pieceIndex, offset, length );
921        fireCancelledReq( msgs, &req );
922    }
923
924    /* if it's already been sent, send a cancel message too */
925    if( reqListRemove( &msgs->clientAskedFor, &req ) ) {
926        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32, pieceIndex, offset, length );
927        protocolSendCancel( msgs, &req );
928        fireCancelledReq( msgs, &req );
929    }
930}
931
932
933/**
934***
935**/
936
937static void
938sendLtepHandshake( tr_peermsgs * msgs )
939{
940    tr_benc val, *m;
941    char * buf;
942    int len;
943    int pex;
944    struct evbuffer * out = msgs->outMessages;
945
946    if( msgs->clientSentLtepHandshake )
947        return;
948
949    dbgmsg( msgs, "sending an ltep handshake" );
950    msgs->clientSentLtepHandshake = 1;
951
952    /* decide if we want to advertise pex support */
953    if( !tr_torrentAllowsPex( msgs->torrent ) )
954        pex = 0;
955    else if( msgs->peerSentLtepHandshake )
956        pex = msgs->peerSupportsPex ? 1 : 0;
957    else
958        pex = 1;
959
960    tr_bencInitDict( &val, 5 );
961    tr_bencDictAddInt( &val, "e", msgs->session->encryptionMode != TR_CLEAR_PREFERRED );
962    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( msgs->session ) );
963    tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
964    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
965    m  = tr_bencDictAddDict( &val, "m", 1 );
966    if( pex )
967        tr_bencDictAddInt( m, "ut_pex", TR_LTEP_PEX );
968    buf = tr_bencSave( &val, &len );
969
970    tr_peerIoWriteUint32( msgs->peer->io, out, 2 * sizeof( uint8_t ) + len );
971    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_LTEP );
972    tr_peerIoWriteUint8 ( msgs->peer->io, out, LTEP_HANDSHAKE );
973    tr_peerIoWriteBytes ( msgs->peer->io, out, buf, len );
974    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
975    dbgOutMessageLen( msgs );
976
977    /* cleanup */
978    tr_bencFree( &val );
979    tr_free( buf );
980}
981
982static void
983parseLtepHandshake( tr_peermsgs *     msgs,
984                    int               len,
985                    struct evbuffer * inbuf )
986{
987    int64_t   i;
988    tr_benc   val, * sub;
989    uint8_t * tmp = tr_new( uint8_t, len );
990
991    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, len );
992    msgs->peerSentLtepHandshake = 1;
993
994    if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) )
995    {
996        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
997        tr_free( tmp );
998        return;
999    }
1000
1001    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
1002
1003    /* does the peer prefer encrypted connections? */
1004    if( tr_bencDictFindInt( &val, "e", &i ) )
1005        msgs->peer->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
1006                                              : ENCRYPTION_PREFERENCE_NO;
1007
1008    /* check supported messages for utorrent pex */
1009    msgs->peerSupportsPex = 0;
1010    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
1011        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
1012            msgs->ut_pex_id = (uint8_t) i;
1013            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
1014            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
1015        }
1016    }
1017
1018    /* look for upload_only (BEP 21) */
1019    if( tr_bencDictFindInt( &val, "upload_only", &i ) )
1020        fireUploadOnly( msgs, i!=0 );
1021
1022    /* get peer's listening port */
1023    if( tr_bencDictFindInt( &val, "p", &i ) ) {
1024        msgs->peer->port = htons( (uint16_t)i );
1025        dbgmsg( msgs, "msgs->port is now %hu", msgs->peer->port );
1026    }
1027
1028    /* get peer's maximum request queue size */
1029    if( tr_bencDictFindInt( &val, "reqq", &i ) )
1030        msgs->reqq = i;
1031
1032    tr_bencFree( &val );
1033    tr_free( tmp );
1034}
1035
1036static void
1037parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1038{
1039    int loaded = 0;
1040    uint8_t * tmp = tr_new( uint8_t, msglen );
1041    tr_benc val;
1042    tr_torrent * tor = msgs->torrent;
1043    const uint8_t * added;
1044    size_t added_len;
1045
1046    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1047
1048    if( tr_torrentAllowsPex( tor )
1049      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) ) )
1050    {
1051        if( tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1052        {
1053            const uint8_t * added_f = NULL;
1054            tr_pex *        pex;
1055            size_t          i, n;
1056            size_t          added_f_len = 0;
1057            tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1058            pex =
1059                tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len,
1060                                        &n );
1061            for( i = 0; i < n; ++i )
1062                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex + i );
1063            tr_free( pex );
1064        }
1065       
1066        if( tr_bencDictFindRaw( &val, "added6", &added, &added_len ) )
1067        {
1068            const uint8_t * added_f = NULL;
1069            tr_pex *        pex;
1070            size_t          i, n;
1071            size_t          added_f_len = 0;
1072            tr_bencDictFindRaw( &val, "added6.f", &added_f, &added_f_len );
1073            pex =
1074                tr_peerMgrCompact6ToPex( added, added_len, added_f, added_f_len,
1075                                         &n );
1076            for( i = 0; i < n; ++i )
1077                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex + i );
1078            tr_free( pex );
1079        }
1080       
1081    }
1082
1083    if( loaded )
1084        tr_bencFree( &val );
1085    tr_free( tmp );
1086}
1087
1088static void sendPex( tr_peermsgs * msgs );
1089
1090static void
1091parseLtep( tr_peermsgs *     msgs,
1092           int               msglen,
1093           struct evbuffer * inbuf )
1094{
1095    uint8_t ltep_msgid;
1096
1097    tr_peerIoReadUint8( msgs->peer->io, inbuf, &ltep_msgid );
1098    msglen--;
1099
1100    if( ltep_msgid == LTEP_HANDSHAKE )
1101    {
1102        dbgmsg( msgs, "got ltep handshake" );
1103        parseLtepHandshake( msgs, msglen, inbuf );
1104        if( tr_peerIoSupportsLTEP( msgs->peer->io ) )
1105        {
1106            sendLtepHandshake( msgs );
1107            sendPex( msgs );
1108        }
1109    }
1110    else if( ltep_msgid == TR_LTEP_PEX )
1111    {
1112        dbgmsg( msgs, "got ut pex" );
1113        msgs->peerSupportsPex = 1;
1114        parseUtPex( msgs, msglen, inbuf );
1115    }
1116    else
1117    {
1118        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1119        evbuffer_drain( inbuf, msglen );
1120    }
1121}
1122
1123static int
1124readBtLength( tr_peermsgs *     msgs,
1125              struct evbuffer * inbuf,
1126              size_t            inlen )
1127{
1128    uint32_t len;
1129
1130    if( inlen < sizeof( len ) )
1131        return READ_LATER;
1132
1133    tr_peerIoReadUint32( msgs->peer->io, inbuf, &len );
1134
1135    if( len == 0 ) /* peer sent us a keepalive message */
1136        dbgmsg( msgs, "got KeepAlive" );
1137    else
1138    {
1139        msgs->incoming.length = len;
1140        msgs->state = AWAITING_BT_ID;
1141    }
1142
1143    return READ_NOW;
1144}
1145
1146static int readBtMessage( tr_peermsgs *     msgs,
1147                          struct evbuffer * inbuf,
1148                          size_t            inlen );
1149
1150static int
1151readBtId( tr_peermsgs *     msgs,
1152          struct evbuffer * inbuf,
1153          size_t            inlen )
1154{
1155    uint8_t id;
1156
1157    if( inlen < sizeof( uint8_t ) )
1158        return READ_LATER;
1159
1160    tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
1161    msgs->incoming.id = id;
1162    dbgmsg( msgs, "msgs->incoming.id is now %d; msgs->incoming.length is %zu", id, (size_t)msgs->incoming.length );
1163
1164    if( id == BT_PIECE )
1165    {
1166        msgs->state = AWAITING_BT_PIECE;
1167        return READ_NOW;
1168    }
1169    else if( msgs->incoming.length != 1 )
1170    {
1171        msgs->state = AWAITING_BT_MESSAGE;
1172        return READ_NOW;
1173    }
1174    else return readBtMessage( msgs, inbuf, inlen - 1 );
1175}
1176
1177static void
1178updatePeerProgress( tr_peermsgs * msgs )
1179{
1180    msgs->peer->progress = tr_bitfieldCountTrueBits( msgs->peer->have ) / (float)msgs->torrent->info.pieceCount;
1181    dbgmsg( msgs, "peer progress is %f", msgs->peer->progress );
1182    updateFastSet( msgs );
1183    updateInterest( msgs );
1184    firePeerProgress( msgs );
1185}
1186
1187static void
1188peerMadeRequest( tr_peermsgs *               msgs,
1189                 const struct peer_request * req )
1190{
1191    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1192    const int reqIsValid = requestIsValid( msgs, req );
1193    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( &msgs->torrent->completion, req->index );
1194    const int peerIsChoked = msgs->peer->peerIsChoked;
1195
1196    int allow = FALSE;
1197
1198    if( !reqIsValid )
1199        dbgmsg( msgs, "rejecting an invalid request." );
1200    else if( !clientHasPiece )
1201        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1202    else if( peerIsChoked )
1203        dbgmsg( msgs, "rejecting request from choked peer" );
1204    else
1205        allow = TRUE;
1206
1207    if( allow )
1208        reqListAppend( &msgs->peerAskedFor, req );
1209    else if( fext )
1210        protocolSendReject( msgs, req );
1211}
1212
1213static int
1214messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1215{
1216    switch( id )
1217    {
1218        case BT_CHOKE:
1219        case BT_UNCHOKE:
1220        case BT_INTERESTED:
1221        case BT_NOT_INTERESTED:
1222        case BT_FEXT_HAVE_ALL:
1223        case BT_FEXT_HAVE_NONE:
1224            return len == 1;
1225
1226        case BT_HAVE:
1227        case BT_FEXT_SUGGEST:
1228        case BT_FEXT_ALLOWED_FAST:
1229            return len == 5;
1230
1231        case BT_BITFIELD:
1232            return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1233
1234        case BT_REQUEST:
1235        case BT_CANCEL:
1236        case BT_FEXT_REJECT:
1237            return len == 13;
1238
1239        case BT_PIECE:
1240            return len > 9 && len <= 16393;
1241
1242        case BT_PORT:
1243            return len == 3;
1244
1245        case BT_LTEP:
1246            return len >= 2;
1247
1248        default:
1249            return FALSE;
1250    }
1251}
1252
1253static int clientGotBlock( tr_peermsgs *               msgs,
1254                           const uint8_t *             block,
1255                           const struct peer_request * req );
1256
1257static int
1258readBtPiece( tr_peermsgs      * msgs,
1259             struct evbuffer  * inbuf,
1260             size_t             inlen,
1261             size_t           * setme_piece_bytes_read )
1262{
1263    struct peer_request * req = &msgs->incoming.blockReq;
1264
1265    assert( EVBUFFER_LENGTH( inbuf ) >= inlen );
1266    dbgmsg( msgs, "In readBtPiece" );
1267
1268    if( !req->length )
1269    {
1270        if( inlen < 8 )
1271            return READ_LATER;
1272
1273        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index );
1274        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset );
1275        req->length = msgs->incoming.length - 9;
1276        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1277        return READ_NOW;
1278    }
1279    else
1280    {
1281        int err;
1282
1283        /* read in another chunk of data */
1284        const size_t nLeft = req->length - EVBUFFER_LENGTH( msgs->incoming.block );
1285        size_t n = MIN( nLeft, inlen );
1286        size_t i = n;
1287
1288        while( i > 0 )
1289        {
1290            uint8_t buf[MAX_STACK_ARRAY_SIZE];
1291            const size_t thisPass = MIN( i, sizeof( buf ) );
1292            tr_peerIoReadBytes( msgs->peer->io, inbuf, buf, thisPass );
1293            evbuffer_add( msgs->incoming.block, buf, thisPass );
1294            i -= thisPass;
1295        }
1296
1297        fireClientGotData( msgs, n, TRUE );
1298        *setme_piece_bytes_read += n;
1299        dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1300               n, req->index, req->offset, req->length,
1301               (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) );
1302        if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length )
1303            return READ_LATER;
1304
1305        /* we've got the whole block ... process it */
1306        err = clientGotBlock( msgs, EVBUFFER_DATA( msgs->incoming.block ), req );
1307
1308        /* cleanup */
1309        evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH( msgs->incoming.block ) );
1310        req->length = 0;
1311        msgs->state = AWAITING_BT_LENGTH;
1312        if( !err )
1313            return READ_NOW;
1314        else {
1315            fireError( msgs, err );
1316            return READ_ERR;
1317        }
1318    }
1319}
1320
1321static int
1322readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1323{
1324    uint32_t      ui32;
1325    uint32_t      msglen = msgs->incoming.length;
1326    const uint8_t id = msgs->incoming.id;
1327    const size_t  startBufLen = EVBUFFER_LENGTH( inbuf );
1328    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1329
1330    --msglen; /* id length */
1331
1332    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
1333
1334    if( inlen < msglen )
1335        return READ_LATER;
1336
1337    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1338    {
1339        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1340        fireError( msgs, EMSGSIZE );
1341        return READ_ERR;
1342    }
1343
1344    switch( id )
1345    {
1346        case BT_CHOKE:
1347            dbgmsg( msgs, "got Choke" );
1348            msgs->peer->clientIsChoked = 1;
1349            if( !fext )
1350                cancelAllRequestsToPeer( msgs, FALSE );
1351            break;
1352
1353        case BT_UNCHOKE:
1354            dbgmsg( msgs, "got Unchoke" );
1355            msgs->peer->clientIsChoked = 0;
1356            break;
1357
1358        case BT_INTERESTED:
1359            dbgmsg( msgs, "got Interested" );
1360            msgs->peer->peerIsInterested = 1;
1361            break;
1362
1363        case BT_NOT_INTERESTED:
1364            dbgmsg( msgs, "got Not Interested" );
1365            msgs->peer->peerIsInterested = 0;
1366            break;
1367
1368        case BT_HAVE:
1369            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1370            dbgmsg( msgs, "got Have: %u", ui32 );
1371            if( tr_bitfieldAdd( msgs->peer->have, ui32 ) ) {
1372                fireError( msgs, ERANGE );
1373                return READ_ERR;
1374            }
1375            updatePeerProgress( msgs );
1376            tr_rcTransferred( &msgs->torrent->swarmSpeed,
1377                              msgs->torrent->info.pieceSize );
1378            break;
1379
1380        case BT_BITFIELD:
1381            dbgmsg( msgs, "got a bitfield" );
1382            tr_peerIoReadBytes( msgs->peer->io, inbuf, msgs->peer->have->bits, msglen );
1383            updatePeerProgress( msgs );
1384            break;
1385
1386        case BT_REQUEST:
1387        {
1388            struct peer_request r;
1389            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1390            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1391            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1392            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1393            peerMadeRequest( msgs, &r );
1394            break;
1395        }
1396
1397        case BT_CANCEL:
1398        {
1399            struct peer_request r;
1400            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1401            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1402            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1403            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1404            if( reqListRemove( &msgs->peerAskedFor, &r ) && fext )
1405                protocolSendReject( msgs, &r );
1406            break;
1407        }
1408
1409        case BT_PIECE:
1410            assert( 0 ); /* handled elsewhere! */
1411            break;
1412
1413        case BT_PORT:
1414            dbgmsg( msgs, "Got a BT_PORT" );
1415            tr_peerIoReadUint16( msgs->peer->io, inbuf, &msgs->peer->port );
1416            break;
1417
1418        case BT_FEXT_SUGGEST:
1419            dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1420            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1421            if( fext )
1422                fireClientGotSuggest( msgs, ui32 );
1423            else {
1424                fireError( msgs, EMSGSIZE );
1425                return READ_ERR;
1426            }
1427            break;
1428
1429        case BT_FEXT_ALLOWED_FAST:
1430            dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1431            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1432            if( fext )
1433                fireClientGotAllowedFast( msgs, ui32 );
1434            else {
1435                fireError( msgs, EMSGSIZE );
1436                return READ_ERR;
1437            }
1438            break;
1439
1440        case BT_FEXT_HAVE_ALL:
1441            dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1442            if( fext ) {
1443                tr_bitfieldAddRange( msgs->peer->have, 0, msgs->torrent->info.pieceCount );
1444                updatePeerProgress( msgs );
1445            } else {
1446                fireError( msgs, EMSGSIZE );
1447                return READ_ERR;
1448            }
1449            break;
1450
1451        case BT_FEXT_HAVE_NONE:
1452            dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1453            if( fext ) {
1454                tr_bitfieldClear( msgs->peer->have );
1455                updatePeerProgress( msgs );
1456            } else {
1457                fireError( msgs, EMSGSIZE );
1458                return READ_ERR;
1459            }
1460            break;
1461
1462        case BT_FEXT_REJECT:
1463        {
1464            struct peer_request r;
1465            dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1466            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1467            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1468            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1469            if( fext )
1470                reqListRemove( &msgs->clientAskedFor, &r );
1471            else {
1472                fireError( msgs, EMSGSIZE );
1473                return READ_ERR;
1474            }
1475            break;
1476        }
1477
1478        case BT_LTEP:
1479            dbgmsg( msgs, "Got a BT_LTEP" );
1480            parseLtep( msgs, msglen, inbuf );
1481            break;
1482
1483        default:
1484            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1485            tr_peerIoDrain( msgs->peer->io, inbuf, msglen );
1486            break;
1487    }
1488
1489    assert( msglen + 1 == msgs->incoming.length );
1490    assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen );
1491
1492    msgs->state = AWAITING_BT_LENGTH;
1493    return READ_NOW;
1494}
1495
1496static TR_INLINE void
1497decrementDownloadedCount( tr_peermsgs * msgs, uint32_t byteCount )
1498{
1499    tr_torrent * tor = msgs->torrent;
1500
1501    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1502}
1503
1504static TR_INLINE void
1505clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1506{
1507    decrementDownloadedCount( msgs, req->length );
1508}
1509
1510static void
1511addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1512{
1513    if( !msgs->peer->blame )
1514         msgs->peer->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1515    tr_bitfieldAdd( msgs->peer->blame, index );
1516}
1517
1518/* returns 0 on success, or an errno on failure */
1519static int
1520clientGotBlock( tr_peermsgs *               msgs,
1521                const uint8_t *             data,
1522                const struct peer_request * req )
1523{
1524    int err;
1525    tr_torrent * tor = msgs->torrent;
1526    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1527
1528    assert( msgs );
1529    assert( req );
1530
1531    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) {
1532        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1533                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1534        return EMSGSIZE;
1535    }
1536
1537    /* save the block */
1538    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1539
1540    /**
1541    *** Remove the block from our `we asked for this' list
1542    **/
1543
1544    if( !reqListRemove( &msgs->clientAskedFor, req ) ) {
1545        clientGotUnwantedBlock( msgs, req );
1546        dbgmsg( msgs, "we didn't ask for this message..." );
1547        return 0;
1548    }
1549
1550    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1551            msgs->clientAskedFor.len );
1552
1553    /**
1554    *** Error checks
1555    **/
1556
1557    if( tr_cpBlockIsComplete( &tor->completion, block ) ) {
1558        dbgmsg( msgs, "we have this block already..." );
1559        clientGotUnwantedBlock( msgs, req );
1560        return 0;
1561    }
1562
1563    /**
1564    ***  Save the block
1565    **/
1566
1567    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1568        return err;
1569
1570    addPeerToBlamefield( msgs, req->index );
1571    fireGotBlock( msgs, req );
1572    return 0;
1573}
1574
1575static int peerPulse( void * vmsgs );
1576
1577static void
1578didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1579{
1580    tr_peermsgs * msgs = vmsgs;
1581    firePeerGotData( msgs, bytesWritten, wasPieceData );
1582    peerPulse( msgs );
1583}
1584
1585static ReadState
1586canRead( tr_peerIo * io, void * vmsgs, size_t * piece )
1587{
1588    ReadState         ret;
1589    tr_peermsgs *     msgs = vmsgs;
1590    struct evbuffer * in = tr_peerIoGetReadBuffer( io );
1591    const size_t      inlen = EVBUFFER_LENGTH( in );
1592
1593    dbgmsg( msgs, "canRead: inlen is %zu, msgs->state is %d", inlen, msgs->state );
1594
1595    if( !inlen )
1596    {
1597        ret = READ_LATER;
1598    }
1599    else if( msgs->state == AWAITING_BT_PIECE )
1600    {
1601        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1602    }
1603    else switch( msgs->state )
1604    {
1605        case AWAITING_BT_LENGTH:
1606            ret = readBtLength ( msgs, in, inlen ); break;
1607
1608        case AWAITING_BT_ID:
1609            ret = readBtId     ( msgs, in, inlen ); break;
1610
1611        case AWAITING_BT_MESSAGE:
1612            ret = readBtMessage( msgs, in, inlen ); break;
1613
1614        default:
1615            ret = READ_ERR;
1616            assert( 0 );
1617    }
1618
1619    dbgmsg( msgs, "canRead: ret is %d", (int)ret );
1620
1621    /* log the raw data that was read */
1622    if( ( ret != READ_ERR ) && ( EVBUFFER_LENGTH( in ) != inlen ) )
1623        fireClientGotData( msgs, inlen - EVBUFFER_LENGTH( in ), FALSE );
1624
1625    return ret;
1626}
1627
1628/**
1629***
1630**/
1631
1632static int
1633ratePulse( tr_peermsgs * msgs, uint64_t now )
1634{
1635    const double rateToClient = tr_peerGetPieceSpeed( msgs->peer, now, TR_PEER_TO_CLIENT );
1636    const int seconds = 10;
1637    const int floor = 8;
1638    const int estimatedBlocksInPeriod = ( rateToClient * seconds * 1024 ) / msgs->torrent->blockSize;
1639
1640    msgs->maxActiveRequests = floor + estimatedBlocksInPeriod;
1641
1642    if( msgs->reqq > 0 )
1643        msgs->maxActiveRequests = MIN( msgs->maxActiveRequests, msgs->reqq );
1644
1645    return TRUE;
1646}
1647
1648static size_t
1649fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1650{
1651    size_t bytesWritten = 0;
1652    struct peer_request req;
1653    const tr_bool haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1654    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1655
1656    /**
1657    ***  Protocol messages
1658    **/
1659
1660    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1661    {
1662        dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
1663        msgs->outMessagesBatchedAt = now;
1664    }
1665    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1666    {
1667        const size_t len = EVBUFFER_LENGTH( msgs->outMessages );
1668        /* flush the protocol messages */
1669        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len );
1670        tr_peerIoWriteBuf( msgs->peer->io, msgs->outMessages, FALSE );
1671        msgs->clientSentAnythingAt = now;
1672        msgs->outMessagesBatchedAt = 0;
1673        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1674        bytesWritten +=  len;
1675    }
1676
1677    /**
1678    ***  Blocks
1679    **/
1680
1681    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= msgs->torrent->blockSize )
1682        && popNextRequest( msgs, &req ) )
1683    {
1684        if( requestIsValid( msgs, &req )
1685            && tr_cpPieceIsComplete( &msgs->torrent->completion, req.index ) )
1686        {
1687            int err;
1688            static uint8_t buf[MAX_BLOCK_SIZE];
1689
1690            /* send a block */
1691            if(( err = tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf ))) {
1692                fireError( msgs, err );
1693                bytesWritten = 0;
1694                msgs = NULL;
1695            } else {
1696                tr_peerIo * io = msgs->peer->io;
1697                struct evbuffer * out = tr_getBuffer( );
1698                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1699                tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1700                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1701                tr_peerIoWriteUint32( io, out, req.index );
1702                tr_peerIoWriteUint32( io, out, req.offset );
1703                tr_peerIoWriteBytes ( io, out, buf, req.length );
1704                tr_peerIoWriteBuf( io, out, TRUE );
1705                bytesWritten += EVBUFFER_LENGTH( out );
1706                msgs->clientSentAnythingAt = now;
1707                tr_releaseBuffer( out );
1708            }
1709        }
1710        else if( fext ) /* peer needs a reject message */
1711        {
1712            protocolSendReject( msgs, &req );
1713        }
1714    }
1715
1716    /**
1717    ***  Keepalive
1718    **/
1719
1720    if( ( msgs != NULL )
1721        && ( msgs->clientSentAnythingAt != 0 )
1722        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1723    {
1724        dbgmsg( msgs, "sending a keepalive message" );
1725        tr_peerIoWriteUint32( msgs->peer->io, msgs->outMessages, 0 );
1726        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1727    }
1728
1729    return bytesWritten;
1730}
1731
1732static int
1733peerPulse( void * vmsgs )
1734{
1735    tr_peermsgs * msgs = vmsgs;
1736    const time_t  now = time( NULL );
1737
1738    ratePulse( msgs, now );
1739
1740    pumpRequestQueue( msgs, now );
1741    expireOldRequests( msgs, now );
1742
1743    for( ;; )
1744        if( fillOutputBuffer( msgs, now ) < 1 )
1745            break;
1746
1747    return TRUE; /* loop forever */
1748}
1749
1750void
1751tr_peerMsgsPulse( tr_peermsgs * msgs )
1752{
1753    if( msgs != NULL )
1754        peerPulse( msgs );
1755}
1756
1757static void
1758gotError( tr_peerIo  * io UNUSED,
1759          short        what,
1760          void       * vmsgs )
1761{
1762    if( what & EVBUFFER_TIMEOUT )
1763        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1764    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1765        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1766               what, errno, tr_strerror( errno ) );
1767    fireError( vmsgs, ENOTCONN );
1768}
1769
1770static void
1771sendBitfield( tr_peermsgs * msgs )
1772{
1773    struct evbuffer * out = msgs->outMessages;
1774    tr_bitfield *     field;
1775    tr_piece_index_t  lazyPieces[LAZY_PIECE_COUNT];
1776    size_t            i;
1777    size_t            lazyCount = 0;
1778
1779    field = tr_bitfieldDup( tr_cpPieceBitfield( &msgs->torrent->completion ) );
1780
1781    if( tr_sessionIsLazyBitfieldEnabled( msgs->session ) )
1782    {
1783        /** Lazy bitfields aren't a high priority or secure, so I'm opting for
1784            speed over a truly random sample -- let's limit the pool size to
1785            the first 1000 pieces so large torrents don't bog things down */
1786        size_t poolSize;
1787        const size_t maxPoolSize = MIN( msgs->torrent->info.pieceCount, 1000 );
1788        tr_piece_index_t * pool = tr_new( tr_piece_index_t, maxPoolSize );
1789
1790        /* build the pool */
1791        for( i=poolSize=0; i<maxPoolSize; ++i )
1792            if( tr_bitfieldHas( field, i ) )
1793                pool[poolSize++] = i;
1794
1795        /* pull random piece indices from the pool */
1796        while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) )
1797        {
1798            const int pos = tr_cryptoWeakRandInt( poolSize );
1799            const tr_piece_index_t piece = pool[pos];
1800            tr_bitfieldRem( field, piece );
1801            lazyPieces[lazyCount++] = piece;
1802            pool[pos] = pool[--poolSize];
1803        }
1804
1805        /* cleanup */
1806        tr_free( pool );
1807    }
1808
1809    tr_peerIoWriteUint32( msgs->peer->io, out,
1810                          sizeof( uint8_t ) + field->byteCount );
1811    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_BITFIELD );
1812    tr_peerIoWriteBytes ( msgs->peer->io, out, field->bits, field->byteCount );
1813    dbgmsg( msgs, "sending bitfield... outMessage size is now %zu",
1814            EVBUFFER_LENGTH( out ) );
1815    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1816
1817    for( i = 0; i < lazyCount; ++i )
1818        protocolSendHave( msgs, lazyPieces[i] );
1819
1820    tr_bitfieldFree( field );
1821}
1822
1823static void
1824tellPeerWhatWeHave( tr_peermsgs * msgs )
1825{
1826    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1827
1828    if( fext && ( tr_cpGetStatus( &msgs->torrent->completion ) == TR_SEED ) )
1829    {
1830        protocolSendHaveAll( msgs );
1831    }
1832    else if( fext && ( tr_cpHaveValid( &msgs->torrent->completion ) == 0 ) )
1833    {
1834        protocolSendHaveNone( msgs );
1835    }
1836    else
1837    {
1838        sendBitfield( msgs );
1839    }
1840}
1841
1842/**
1843***
1844**/
1845
1846/* some peers give us error messages if we send
1847   more than this many peers in a single pex message
1848   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
1849#define MAX_PEX_ADDED 50
1850#define MAX_PEX_DROPPED 50
1851
1852typedef struct
1853{
1854    tr_pex *  added;
1855    tr_pex *  dropped;
1856    tr_pex *  elements;
1857    int       addedCount;
1858    int       droppedCount;
1859    int       elementCount;
1860}
1861PexDiffs;
1862
1863static void
1864pexAddedCb( void * vpex,
1865            void * userData )
1866{
1867    PexDiffs * diffs = userData;
1868    tr_pex *   pex = vpex;
1869
1870    if( diffs->addedCount < MAX_PEX_ADDED )
1871    {
1872        diffs->added[diffs->addedCount++] = *pex;
1873        diffs->elements[diffs->elementCount++] = *pex;
1874    }
1875}
1876
1877static TR_INLINE void
1878pexDroppedCb( void * vpex,
1879              void * userData )
1880{
1881    PexDiffs * diffs = userData;
1882    tr_pex *   pex = vpex;
1883
1884    if( diffs->droppedCount < MAX_PEX_DROPPED )
1885    {
1886        diffs->dropped[diffs->droppedCount++] = *pex;
1887    }
1888}
1889
1890static TR_INLINE void
1891pexElementCb( void * vpex,
1892              void * userData )
1893{
1894    PexDiffs * diffs = userData;
1895    tr_pex * pex = vpex;
1896
1897    diffs->elements[diffs->elementCount++] = *pex;
1898}
1899
1900static void
1901sendPex( tr_peermsgs * msgs )
1902{
1903    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
1904    {
1905        PexDiffs diffs;
1906        PexDiffs diffs6;
1907        tr_pex * newPex = NULL;
1908        tr_pex * newPex6 = NULL;
1909        const int newCount = tr_peerMgrGetPeers( msgs->torrent, &newPex, TR_AF_INET );
1910        const int newCount6 = tr_peerMgrGetPeers( msgs->torrent, &newPex6, TR_AF_INET6 );
1911
1912        /* build the diffs */
1913        diffs.added = tr_new( tr_pex, newCount );
1914        diffs.addedCount = 0;
1915        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1916        diffs.droppedCount = 0;
1917        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1918        diffs.elementCount = 0;
1919        tr_set_compare( msgs->pex, msgs->pexCount,
1920                        newPex, newCount,
1921                        tr_pexCompare, sizeof( tr_pex ),
1922                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
1923        diffs6.added = tr_new( tr_pex, newCount6 );
1924        diffs6.addedCount = 0;
1925        diffs6.dropped = tr_new( tr_pex, msgs->pexCount6 );
1926        diffs6.droppedCount = 0;
1927        diffs6.elements = tr_new( tr_pex, newCount6 + msgs->pexCount6 );
1928        diffs6.elementCount = 0;
1929        tr_set_compare( msgs->pex6, msgs->pexCount6,
1930                        newPex6, newCount6,
1931                        tr_pexCompare, sizeof( tr_pex ),
1932                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6 );
1933        dbgmsg(
1934            msgs,
1935            "pex: old peer count %d, new peer count %d, added %d, removed %d",
1936            msgs->pexCount, newCount + newCount6,
1937            diffs.addedCount + diffs6.addedCount,
1938            diffs.droppedCount + diffs6.droppedCount );
1939
1940        if( !diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
1941            !diffs6.droppedCount )
1942        {
1943            tr_free( diffs.elements );
1944            tr_free( diffs6.elements );
1945        }
1946        else
1947        {
1948            int  i;
1949            tr_benc val;
1950            char * benc;
1951            int bencLen;
1952            uint8_t * tmp, *walk;
1953            tr_peerIo       * io  = msgs->peer->io;
1954            struct evbuffer * out = msgs->outMessages;
1955
1956            /* update peer */
1957            tr_free( msgs->pex );
1958            msgs->pex = diffs.elements;
1959            msgs->pexCount = diffs.elementCount;
1960            tr_free( msgs->pex6 );
1961            msgs->pex6 = diffs6.elements;
1962            msgs->pexCount6 = diffs6.elementCount;
1963
1964            /* build the pex payload */
1965            tr_bencInitDict( &val, 3 ); /* ipv6 support: left as 3:
1966                                         * speed vs. likelihood? */
1967
1968            /* "added" */
1969            tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1970            for( i = 0; i < diffs.addedCount; ++i )
1971            {
1972                tr_suspectAddress( &diffs.added[i].addr, "pex" );
1973                memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
1974                memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1975            }
1976            assert( ( walk - tmp ) == diffs.addedCount * 6 );
1977            tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
1978            tr_free( tmp );
1979
1980            /* "added.f" */
1981            tmp = walk = tr_new( uint8_t, diffs.addedCount );
1982            for( i = 0; i < diffs.addedCount; ++i )
1983                *walk++ = diffs.added[i].flags;
1984            assert( ( walk - tmp ) == diffs.addedCount );
1985            tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
1986            tr_free( tmp );
1987
1988            /* "dropped" */
1989            tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
1990            for( i = 0; i < diffs.droppedCount; ++i )
1991            {
1992                memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
1993                memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
1994            }
1995            assert( ( walk - tmp ) == diffs.droppedCount * 6 );
1996            tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
1997            tr_free( tmp );
1998           
1999            /* "added6" */
2000            tmp = walk = tr_new( uint8_t, diffs6.addedCount * 18 );
2001            for( i = 0; i < diffs6.addedCount; ++i )
2002            {
2003                tr_suspectAddress( &diffs6.added[i].addr, "pex6" );
2004                memcpy( walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16 );
2005                walk += 16;
2006                memcpy( walk, &diffs6.added[i].port, 2 );
2007                walk += 2;
2008            }
2009            assert( ( walk - tmp ) == diffs6.addedCount * 18 );
2010            tr_bencDictAddRaw( &val, "added6", tmp, walk - tmp );
2011            tr_free( tmp );
2012           
2013            /* "added6.f" */
2014            tmp = walk = tr_new( uint8_t, diffs6.addedCount );
2015            for( i = 0; i < diffs6.addedCount; ++i )
2016                *walk++ = diffs6.added[i].flags;
2017            assert( ( walk - tmp ) == diffs6.addedCount );
2018            tr_bencDictAddRaw( &val, "added6.f", tmp, walk - tmp );
2019            tr_free( tmp );
2020           
2021            /* "dropped6" */
2022            tmp = walk = tr_new( uint8_t, diffs6.droppedCount * 18 );
2023            for( i = 0; i < diffs6.droppedCount; ++i )
2024            {
2025                memcpy( walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16 );
2026                walk += 16;
2027                memcpy( walk, &diffs6.dropped[i].port, 2 );
2028                walk += 2;
2029            }
2030            assert( ( walk - tmp ) == diffs6.droppedCount * 18);
2031            tr_bencDictAddRaw( &val, "dropped6", tmp, walk - tmp );
2032            tr_free( tmp );
2033
2034            /* write the pex message */
2035            benc = tr_bencSave( &val, &bencLen );
2036            tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + bencLen );
2037            tr_peerIoWriteUint8 ( io, out, BT_LTEP );
2038            tr_peerIoWriteUint8 ( io, out, msgs->ut_pex_id );
2039            tr_peerIoWriteBytes ( io, out, benc, bencLen );
2040            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2041            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", EVBUFFER_LENGTH( out ) );
2042            dbgOutMessageLen( msgs );
2043
2044            tr_free( benc );
2045            tr_bencFree( &val );
2046        }
2047
2048        /* cleanup */
2049        tr_free( diffs.added );
2050        tr_free( diffs.dropped );
2051        tr_free( newPex );
2052        tr_free( diffs6.added );
2053        tr_free( diffs6.dropped );
2054        tr_free( newPex6 );
2055
2056        msgs->clientSentPexAt = time( NULL );
2057    }
2058}
2059
2060static TR_INLINE int
2061pexPulse( void * vpeer )
2062{
2063    sendPex( vpeer );
2064    return TRUE;
2065}
2066
2067/**
2068***
2069**/
2070
2071tr_peermsgs*
2072tr_peerMsgsNew( struct tr_torrent * torrent,
2073                struct tr_peer    * peer,
2074                tr_delivery_func    func,
2075                void              * userData,
2076                tr_publisher_tag  * setme )
2077{
2078    tr_peermsgs * m;
2079
2080    assert( peer );
2081    assert( peer->io );
2082
2083    m = tr_new0( tr_peermsgs, 1 );
2084    m->publisher = TR_PUBLISHER_INIT;
2085    m->peer = peer;
2086    m->session = torrent->session;
2087    m->torrent = torrent;
2088    m->peer->clientIsChoked = 1;
2089    m->peer->peerIsChoked = 1;
2090    m->peer->clientIsInterested = 0;
2091    m->peer->peerIsInterested = 0;
2092    m->peer->have = tr_bitfieldNew( torrent->info.pieceCount );
2093    m->state = AWAITING_BT_LENGTH;
2094    m->pexTimer = tr_timerNew( m->session, pexPulse, m, PEX_INTERVAL );
2095    m->outMessages = evbuffer_new( );
2096    m->outMessagesBatchedAt = 0;
2097    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2098    m->incoming.block = evbuffer_new( );
2099    m->peerAskedFor = REQUEST_LIST_INIT;
2100    m->clientAskedFor = REQUEST_LIST_INIT;
2101    m->clientWillAskFor = REQUEST_LIST_INIT;
2102    peer->msgs = m;
2103
2104    *setme = tr_publisherSubscribe( &m->publisher, func, userData );
2105
2106    if( tr_peerIoSupportsLTEP( peer->io ) )
2107        sendLtepHandshake( m );
2108
2109    tellPeerWhatWeHave( m );
2110
2111    tr_peerIoSetIOFuncs( m->peer->io, canRead, didWrite, gotError, m );
2112    ratePulse( m, tr_date() );
2113
2114    return m;
2115}
2116
2117void
2118tr_peerMsgsFree( tr_peermsgs* msgs )
2119{
2120    if( msgs )
2121    {
2122        tr_timerFree( &msgs->pexTimer );
2123        tr_publisherDestruct( &msgs->publisher );
2124        reqListClear( &msgs->clientWillAskFor );
2125        reqListClear( &msgs->clientAskedFor );
2126        reqListClear( &msgs->peerAskedFor );
2127
2128        evbuffer_free( msgs->incoming.block );
2129        evbuffer_free( msgs->outMessages );
2130        tr_free( msgs->pex6 );
2131        tr_free( msgs->pex );
2132
2133        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2134        tr_free( msgs );
2135    }
2136}
2137
2138void
2139tr_peerMsgsUnsubscribe( tr_peermsgs *    peer,
2140                        tr_publisher_tag tag )
2141{
2142    tr_publisherUnsubscribe( &peer->publisher, tag );
2143}
2144
Note: See TracBrowser for help on using the repository browser.