source: trunk/libtransmission/peer-msgs.c @ 7588

Last change on this file since 7588 was 7588, checked in by charles, 12 years ago

(trunk libT) revert r7548, which broke very low speed download limits.. the simplified peer-msgs parsing didn't distinguish between piece & raw data until the piece was done downloading.

  • Property svn:keywords set to Date Rev Author Id
File size: 65.4 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 7588 2009-01-02 23:28:57Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19
20#include <event.h>
21
22#include "transmission.h"
23#include "session.h"
24#include "bencode.h"
25#include "completion.h"
26#include "crypto.h"
27#include "inout.h"
28#ifdef WIN32
29#include "net.h" /* for ECONN */
30#endif
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-mgr-private.h"
34#include "peer-msgs.h"
35#include "platform.h" /* MAX_STACK_ARRAY_SIZE */
36#include "ratecontrol.h"
37#include "stats.h"
38#include "torrent.h"
39#include "trevent.h"
40#include "utils.h"
41
42/**
43***
44**/
45
46enum
47{
48    BT_CHOKE                = 0,
49    BT_UNCHOKE              = 1,
50    BT_INTERESTED           = 2,
51    BT_NOT_INTERESTED       = 3,
52    BT_HAVE                 = 4,
53    BT_BITFIELD             = 5,
54    BT_REQUEST              = 6,
55    BT_PIECE                = 7,
56    BT_CANCEL               = 8,
57    BT_PORT                 = 9,
58
59    BT_FEXT_SUGGEST         = 13,
60    BT_FEXT_HAVE_ALL        = 14,
61    BT_FEXT_HAVE_NONE       = 15,
62    BT_FEXT_REJECT          = 16,
63    BT_FEXT_ALLOWED_FAST    = 17,
64
65    BT_LTEP                 = 20,
66
67    LTEP_HANDSHAKE          = 0,
68
69    TR_LTEP_PEX             = 1,
70
71
72
73    MIN_CHOKE_PERIOD_SEC    = ( 10 ),
74
75    /* idle seconds before we send a keepalive */
76    KEEPALIVE_INTERVAL_SECS = 100,
77
78    PEX_INTERVAL            = ( 90 * 1000 ), /* msec between sendPex() calls */
79
80
81    MAX_BLOCK_SIZE          = ( 1024 * 16 ),
82
83
84    /* how long an unsent request can stay queued before it's returned
85       back to the peer-mgr's pool of requests */
86    QUEUED_REQUEST_TTL_SECS = 20,
87
88    /* how long a sent request can stay queued before it's returned
89       back to the peer-mgr's pool of requests */
90    SENT_REQUEST_TTL_SECS = 240,
91
92    /* used in lowering the outMessages queue period */
93    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
94    HIGH_PRIORITY_INTERVAL_SECS = 2,
95    LOW_PRIORITY_INTERVAL_SECS = 20,
96
97    /* number of pieces to remove from the bitfield when
98     * lazy bitfields are turned on */
99    LAZY_PIECE_COUNT = 26,
100
101    /* number of pieces we'll allow in our fast set */
102    MAX_FAST_SET_SIZE = 3
103};
104
105/**
106***  REQUEST MANAGEMENT
107**/
108
109enum
110{
111    AWAITING_BT_LENGTH,
112    AWAITING_BT_ID,
113    AWAITING_BT_MESSAGE,
114    AWAITING_BT_PIECE
115};
116
117struct peer_request
118{
119    uint32_t    index;
120    uint32_t    offset;
121    uint32_t    length;
122    time_t      time_requested;
123};
124
125static int
126compareRequest( const void * va, const void * vb )
127{
128    const struct peer_request * a = va;
129    const struct peer_request * b = vb;
130
131    if( a->index != b->index )
132        return a->index < b->index ? -1 : 1;
133
134    if( a->offset != b->offset )
135        return a->offset < b->offset ? -1 : 1;
136
137    if( a->length != b->length )
138        return a->length < b->length ? -1 : 1;
139
140    return 0;
141}
142
143struct request_list
144{
145    uint16_t               count;
146    uint16_t               max;
147    struct peer_request *  requests;
148};
149
150static const struct request_list REQUEST_LIST_INIT = { 0, 0, NULL };
151
152static void
153reqListReserve( struct request_list * list,
154                uint16_t              max )
155{
156    if( list->max < max )
157    {
158        list->max = max;
159        list->requests = tr_renew( struct peer_request,
160                                   list->requests,
161                                   list->max );
162    }
163}
164
165static void
166reqListClear( struct request_list * list )
167{
168    tr_free( list->requests );
169    *list = REQUEST_LIST_INIT;
170}
171
172static void
173reqListCopy( struct request_list * dest, const struct request_list * src )
174{
175    dest->count = dest->max = src->count;
176    dest->requests = tr_memdup( src->requests, dest->count * sizeof( struct peer_request ) );
177}
178
179static void
180reqListRemoveOne( struct request_list * list,
181                  int                   i )
182{
183    assert( 0 <= i && i < list->count );
184
185    memmove( &list->requests[i],
186             &list->requests[i + 1],
187             sizeof( struct peer_request ) * ( --list->count - i ) );
188}
189
190static void
191reqListAppend( struct request_list *       list,
192               const struct peer_request * req )
193{
194    if( ++list->count >= list->max )
195        reqListReserve( list, list->max + 8 );
196
197    list->requests[list->count - 1] = *req;
198}
199
200static int
201reqListPop( struct request_list * list,
202            struct peer_request * setme )
203{
204    int success;
205
206    if( !list->count )
207        success = FALSE;
208    else {
209        *setme = list->requests[0];
210        reqListRemoveOne( list, 0 );
211        success = TRUE;
212    }
213
214    return success;
215}
216
217static int
218reqListFind( struct request_list *       list,
219             const struct peer_request * key )
220{
221    uint16_t i;
222
223    for( i = 0; i < list->count; ++i )
224        if( !compareRequest( key, list->requests + i ) )
225            return i;
226
227    return -1;
228}
229
230static int
231reqListRemove( struct request_list *       list,
232               const struct peer_request * key )
233{
234    int success;
235    const int i = reqListFind( list, key );
236
237    if( i < 0 )
238        success = FALSE;
239    else {
240        reqListRemoveOne( list, i );
241        success = TRUE;
242    }
243
244    return success;
245}
246
247/**
248***
249**/
250
251/* this is raw, unchanged data from the peer regarding
252 * the current message that it's sending us. */
253struct tr_incoming
254{
255    uint8_t                id;
256    uint32_t               length; /* includes the +1 for id length */
257    struct peer_request    blockReq; /* metadata for incoming blocks */
258    struct evbuffer *      block; /* piece data for incoming blocks */
259};
260
261/**
262 * Low-level communication state information about a connected peer.
263 *
264 * This structure remembers the low-level protocol states that we're
265 * in with this peer, such as active requests, pex messages, and so on.
266 * Its fields are all private to peer-msgs.c.
267 *
268 * Data not directly involved with sending & receiving messages is
269 * stored in tr_peer, where it can be accessed by both peermsgs and
270 * the peer manager.
271 *
272 * @see struct peer_atom
273 * @see tr_peer
274 */
275struct tr_peermsgs
276{
277    tr_bool         peerSupportsPex;
278    tr_bool         clientSentLtepHandshake;
279    tr_bool         peerSentLtepHandshake;
280    tr_bool         haveFastSet;
281
282    uint8_t         state;
283    uint8_t         ut_pex_id;
284    uint16_t        pexCount;
285    uint16_t        pexCount6;
286    uint16_t        maxActiveRequests;
287
288    size_t                 fastsetSize;
289    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
290
291    /* how long the outMessages batch should be allowed to grow before
292     * it's flushed -- some messages (like requests >:) should be sent
293     * very quickly; others aren't as urgent. */
294    int                    outMessagesBatchPeriod;
295
296    tr_peer *              peer;
297
298    tr_session *           session;
299    tr_torrent *           torrent;
300
301    tr_publisher           publisher;
302
303    struct evbuffer *      outMessages; /* all the non-piece messages */
304
305    struct request_list    peerAskedFor;
306    struct request_list    clientAskedFor;
307    struct request_list    clientWillAskFor;
308
309    tr_timer             * pexTimer;
310    tr_pex               * pex;
311    tr_pex               * pex6;
312
313    time_t                 clientSentPexAt;
314    time_t                 clientSentAnythingAt;
315
316    /* when we started batching the outMessages */
317    time_t                outMessagesBatchedAt;
318
319    struct tr_incoming    incoming;
320
321    /* if the peer supports the Extension Protocol in BEP 10 and
322       supplied a reqq argument, it's stored here.  otherwise the
323       value is zero and should be ignored. */
324    int64_t               reqq;
325};
326
327/**
328***
329**/
330
331static void
332myDebug( const char * file, int line,
333         const struct tr_peermsgs * msgs,
334         const char * fmt, ... )
335{
336    FILE * fp = tr_getLog( );
337
338    if( fp )
339    {
340        va_list           args;
341        char              timestr[64];
342        struct evbuffer * buf = tr_getBuffer( );
343        char *            base = tr_basename( file );
344
345        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
346                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
347                             msgs->torrent->info.name,
348                             tr_peerIoGetAddrStr( msgs->peer->io ),
349                             msgs->peer->client );
350        va_start( args, fmt );
351        evbuffer_add_vprintf( buf, fmt, args );
352        va_end( args );
353        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
354        fwrite( EVBUFFER_DATA( buf ), 1, EVBUFFER_LENGTH( buf ), fp );
355
356        tr_free( base );
357        tr_releaseBuffer( buf );
358    }
359}
360
361#define dbgmsg( msgs, ... ) \
362    do { \
363        if( tr_deepLoggingIsActive( ) ) \
364            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
365    } while( 0 )
366
367/**
368***
369**/
370
371static void
372pokeBatchPeriod( tr_peermsgs * msgs,
373                 int           interval )
374{
375    if( msgs->outMessagesBatchPeriod > interval )
376    {
377        msgs->outMessagesBatchPeriod = interval;
378        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
379    }
380}
381
382static inline void
383dbgOutMessageLen( tr_peermsgs * msgs )
384{
385    dbgmsg( msgs, "outMessage size is now %zu", EVBUFFER_LENGTH( msgs->outMessages ) );
386}
387
388static void
389protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
390{
391    tr_peerIo       * io  = msgs->peer->io;
392    struct evbuffer * out = msgs->outMessages;
393
394    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
395
396    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
397    tr_peerIoWriteUint8 ( io, out, BT_FEXT_REJECT );
398    tr_peerIoWriteUint32( io, out, req->index );
399    tr_peerIoWriteUint32( io, out, req->offset );
400    tr_peerIoWriteUint32( io, out, req->length );
401
402    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
403    dbgOutMessageLen( msgs );
404}
405
406static void
407protocolSendRequest( tr_peermsgs               * msgs,
408                     const struct peer_request * req )
409{
410    tr_peerIo       * io  = msgs->peer->io;
411    struct evbuffer * out = msgs->outMessages;
412
413    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
414    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
415    tr_peerIoWriteUint32( io, out, req->index );
416    tr_peerIoWriteUint32( io, out, req->offset );
417    tr_peerIoWriteUint32( io, out, req->length );
418
419    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
420    dbgOutMessageLen( msgs );
421    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
422}
423
424static void
425protocolSendCancel( tr_peermsgs               * msgs,
426                    const struct peer_request * req )
427{
428    tr_peerIo       * io  = msgs->peer->io;
429    struct evbuffer * out = msgs->outMessages;
430
431    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
432    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
433    tr_peerIoWriteUint32( io, out, req->index );
434    tr_peerIoWriteUint32( io, out, req->offset );
435    tr_peerIoWriteUint32( io, out, req->length );
436
437    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
438    dbgOutMessageLen( msgs );
439    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
440}
441
442static void
443protocolSendHave( tr_peermsgs * msgs,
444                  uint32_t      index )
445{
446    tr_peerIo       * io  = msgs->peer->io;
447    struct evbuffer * out = msgs->outMessages;
448
449    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
450    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
451    tr_peerIoWriteUint32( io, out, index );
452
453    dbgmsg( msgs, "sending Have %u", index );
454    dbgOutMessageLen( msgs );
455    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
456}
457
458#if 0
459static void
460protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
461{
462    tr_peerIo       * io  = msgs->peer->io;
463    struct evbuffer * out = msgs->outMessages;
464
465    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
466
467    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
468    tr_peerIoWriteUint8 ( io, out, BT_FEXT_ALLOWED_FAST );
469    tr_peerIoWriteUint32( io, out, pieceIndex );
470
471    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
472    dbgOutMessageLen( msgs );
473}
474#endif
475
476static void
477protocolSendChoke( tr_peermsgs * msgs,
478                   int           choke )
479{
480    tr_peerIo       * io  = msgs->peer->io;
481    struct evbuffer * out = msgs->outMessages;
482
483    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
484    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
485
486    dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
487    dbgOutMessageLen( msgs );
488    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
489}
490
491static void
492protocolSendHaveAll( tr_peermsgs * msgs )
493{
494    tr_peerIo       * io  = msgs->peer->io;
495    struct evbuffer * out = msgs->outMessages;
496
497    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
498
499    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
500    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_ALL );
501
502    dbgmsg( msgs, "sending HAVE_ALL..." );
503    dbgOutMessageLen( msgs );
504    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
505}
506
507static void
508protocolSendHaveNone( tr_peermsgs * msgs )
509{
510    tr_peerIo       * io  = msgs->peer->io;
511    struct evbuffer * out = msgs->outMessages;
512
513    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
514
515    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
516    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_NONE );
517
518    dbgmsg( msgs, "sending HAVE_NONE..." );
519    dbgOutMessageLen( msgs );
520    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
521}
522
523/**
524***  EVENTS
525**/
526
527static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0, 0 };
528
529static void
530publish( tr_peermsgs * msgs, tr_peer_event * e )
531{
532    assert( msgs->peer );
533    assert( msgs->peer->msgs == msgs );
534
535    tr_publisherPublish( &msgs->publisher, msgs->peer, e );
536}
537
538static void
539fireError( tr_peermsgs * msgs, int err )
540{
541    tr_peer_event e = blankEvent;
542    e.eventType = TR_PEER_ERROR;
543    e.err = err;
544    publish( msgs, &e );
545}
546
547static void
548fireUploadOnly( tr_peermsgs * msgs, tr_bool uploadOnly )
549{
550    tr_peer_event e = blankEvent;
551    e.eventType = TR_PEER_UPLOAD_ONLY;
552    e.uploadOnly = uploadOnly;
553    publish( msgs, &e );
554}
555
556static void
557fireNeedReq( tr_peermsgs * msgs )
558{
559    tr_peer_event e = blankEvent;
560    e.eventType = TR_PEER_NEED_REQ;
561    publish( msgs, &e );
562}
563
564static void
565firePeerProgress( tr_peermsgs * msgs )
566{
567    tr_peer_event e = blankEvent;
568    e.eventType = TR_PEER_PEER_PROGRESS;
569    e.progress = msgs->peer->progress;
570    publish( msgs, &e );
571}
572
573static void
574fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
575{
576    tr_peer_event e = blankEvent;
577    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
578    e.pieceIndex = req->index;
579    e.offset = req->offset;
580    e.length = req->length;
581    publish( msgs, &e );
582}
583
584static void
585fireClientGotData( tr_peermsgs * msgs,
586                   uint32_t      length,
587                   int           wasPieceData )
588{
589    tr_peer_event e = blankEvent;
590
591    e.length = length;
592    e.eventType = TR_PEER_CLIENT_GOT_DATA;
593    e.wasPieceData = wasPieceData;
594    publish( msgs, &e );
595}
596
597static void
598fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
599{
600    tr_peer_event e = blankEvent;
601    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
602    e.pieceIndex = pieceIndex;
603    publish( msgs, &e );
604}
605
606static void
607fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
608{
609    tr_peer_event e = blankEvent;
610    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
611    e.pieceIndex = pieceIndex;
612    publish( msgs, &e );
613}
614
615static void
616firePeerGotData( tr_peermsgs  * msgs,
617                 uint32_t       length,
618                 int            wasPieceData )
619{
620    tr_peer_event e = blankEvent;
621
622    e.length = length;
623    e.eventType = TR_PEER_PEER_GOT_DATA;
624    e.wasPieceData = wasPieceData;
625
626    publish( msgs, &e );
627}
628
629static void
630fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
631{
632    tr_peer_event e = blankEvent;
633    e.eventType = TR_PEER_CANCEL;
634    e.pieceIndex = req->index;
635    e.offset = req->offset;
636    e.length = req->length;
637    publish( msgs, &e );
638}
639
640/**
641***  ALLOWED FAST SET
642***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
643**/
644
645size_t
646tr_generateAllowedSet( tr_piece_index_t * setmePieces,
647                       size_t             desiredSetSize,
648                       size_t             pieceCount,
649                       const uint8_t    * infohash,
650                       const tr_address * addr )
651{
652    size_t setSize = 0;
653
654    assert( setmePieces );
655    assert( desiredSetSize <= pieceCount );
656    assert( desiredSetSize );
657    assert( pieceCount );
658    assert( infohash );
659    assert( addr );
660
661    if( addr->type == TR_AF_INET )
662    {
663        uint8_t w[SHA_DIGEST_LENGTH + 4];
664        uint8_t x[SHA_DIGEST_LENGTH];
665
666        *(uint32_t*)w = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 );   /* (1) */
667        memcpy( w + 4, infohash, SHA_DIGEST_LENGTH );                /* (2) */
668        tr_sha1( x, w, sizeof( w ), NULL );                          /* (3) */
669
670        while( setSize<desiredSetSize )
671        {
672            int i;
673            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
674            {
675                size_t k;
676                uint32_t j = i * 4;                                  /* (5) */
677                uint32_t y = ntohl( *( uint32_t* )( x + j ) );       /* (6) */
678                uint32_t index = y % pieceCount;                     /* (7) */
679
680                for( k=0; k<setSize; ++k )                           /* (8) */
681                    if( setmePieces[k] == index )
682                        break;
683
684                if( k == setSize )
685                    setmePieces[setSize++] = index;                  /* (9) */
686            }
687
688            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
689        }
690    }
691
692    return setSize;
693}
694
695static void
696updateFastSet( tr_peermsgs * msgs UNUSED )
697{
698#if 0
699    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
700    const int peerIsNeedy = msgs->peer->progress < 0.10;
701
702    if( fext && peerIsNeedy && !msgs->haveFastSet )
703    {
704        size_t i;
705        const struct tr_address * addr = tr_peerIoGetAddress( msgs->peer->io, NULL );
706        const tr_info * inf = &msgs->torrent->info;
707        const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
708
709        /* build the fast set */
710        msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
711        msgs->haveFastSet = 1;
712
713        /* send it to the peer */
714        for( i=0; i<msgs->fastsetSize; ++i )
715            protocolSendAllowedFast( msgs, msgs->fastset[i] );
716    }
717#endif
718}
719
720/**
721***  INTEREST
722**/
723
724static int
725isPieceInteresting( const tr_peermsgs * msgs,
726                    tr_piece_index_t    piece )
727{
728    const tr_torrent * torrent = msgs->torrent;
729
730    return ( !torrent->info.pieces[piece].dnd )                 /* we want it */
731          && ( !tr_cpPieceIsComplete( &torrent->completion, piece ) ) /* !have */
732          && ( tr_bitfieldHas( msgs->peer->have, piece ) );    /* peer has it */
733}
734
735/* "interested" means we'll ask for piece data if they unchoke us */
736static int
737isPeerInteresting( const tr_peermsgs * msgs )
738{
739    tr_piece_index_t    i;
740    const tr_torrent *  torrent;
741    const tr_bitfield * bitfield;
742    const int           clientIsSeed = tr_torrentIsSeed( msgs->torrent );
743
744    if( clientIsSeed )
745        return FALSE;
746
747    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
748        return FALSE;
749
750    torrent = msgs->torrent;
751    bitfield = tr_cpPieceBitfield( &torrent->completion );
752
753    if( !msgs->peer->have )
754        return TRUE;
755
756    assert( bitfield->byteCount == msgs->peer->have->byteCount );
757
758    for( i = 0; i < torrent->info.pieceCount; ++i )
759        if( isPieceInteresting( msgs, i ) )
760            return TRUE;
761
762    return FALSE;
763}
764
765static void
766sendInterest( tr_peermsgs * msgs,
767              int           weAreInterested )
768{
769    struct evbuffer * out = msgs->outMessages;
770
771    assert( msgs );
772    assert( weAreInterested == 0 || weAreInterested == 1 );
773
774    msgs->peer->clientIsInterested = weAreInterested;
775    dbgmsg( msgs, "Sending %s", weAreInterested ? "Interested" : "Not Interested" );
776    tr_peerIoWriteUint32( msgs->peer->io, out, sizeof( uint8_t ) );
777    tr_peerIoWriteUint8 ( msgs->peer->io, out, weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
778
779    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
780    dbgOutMessageLen( msgs );
781}
782
783static void
784updateInterest( tr_peermsgs * msgs )
785{
786    const int i = isPeerInteresting( msgs );
787
788    if( i != msgs->peer->clientIsInterested )
789        sendInterest( msgs, i );
790    if( i )
791        fireNeedReq( msgs );
792}
793
794static int
795popNextRequest( tr_peermsgs *         msgs,
796                struct peer_request * setme )
797{
798    return reqListPop( &msgs->peerAskedFor, setme );
799}
800
801static void
802cancelAllRequestsToClient( tr_peermsgs * msgs )
803{
804    struct peer_request req;
805    const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->peer->io );
806
807    while( popNextRequest( msgs, &req ) )
808        if( mustSendCancel )
809            protocolSendReject( msgs, &req );
810}
811
812void
813tr_peerMsgsSetChoke( tr_peermsgs * msgs,
814                     int           choke )
815{
816    const time_t now = time( NULL );
817    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
818
819    assert( msgs );
820    assert( msgs->peer );
821    assert( choke == 0 || choke == 1 );
822
823    if( msgs->peer->chokeChangedAt > fibrillationTime )
824    {
825        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
826    }
827    else if( msgs->peer->peerIsChoked != choke )
828    {
829        msgs->peer->peerIsChoked = choke;
830        if( choke )
831            cancelAllRequestsToClient( msgs );
832        protocolSendChoke( msgs, choke );
833        msgs->peer->chokeChangedAt = now;
834    }
835}
836
837/**
838***
839**/
840
841void
842tr_peerMsgsHave( tr_peermsgs * msgs,
843                 uint32_t      index )
844{
845    protocolSendHave( msgs, index );
846
847    /* since we have more pieces now, we might not be interested in this peer */
848    updateInterest( msgs );
849}
850
851/**
852***
853**/
854
855static int
856reqIsValid( const tr_peermsgs * peer,
857            uint32_t            index,
858            uint32_t            offset,
859            uint32_t            length )
860{
861    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
862}
863
864static int
865requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
866{
867    return reqIsValid( msgs, req->index, req->offset, req->length );
868}
869
870static void
871expireFromList( tr_peermsgs          * msgs,
872                struct request_list  * list,
873                const time_t           oldestAllowed )
874{
875    int i;
876
877    /* walk through the list, looking for the first req that's too old */
878    for( i=0; i<list->count; ++i ) {
879        const struct peer_request * req = &list->requests[i];
880        if( req->time_requested < oldestAllowed )
881            break;
882    }
883
884    /* if we found one too old, start pruning them */
885    if( i < list->count ) {
886        struct request_list tmp = REQUEST_LIST_INIT;
887        reqListCopy( &tmp, list );
888        for( ; i<tmp.count; ++i ) {
889            const struct peer_request * req = &tmp.requests[i];
890            if( req->time_requested < oldestAllowed )
891                tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
892        }
893        reqListClear( &tmp );
894    }
895}
896
897static void
898expireOldRequests( tr_peermsgs * msgs, const time_t now  )
899{
900    time_t oldestAllowed;
901    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
902    dbgmsg( msgs, "entering `expire old requests' block" );
903
904    /* cancel requests that have been queued for too long */
905    oldestAllowed = now - QUEUED_REQUEST_TTL_SECS;
906    expireFromList( msgs, &msgs->clientWillAskFor, oldestAllowed );
907
908    /* if the peer doesn't support "Reject Request",
909     * cancel requests that were sent too long ago. */
910    if( !fext ) {
911        oldestAllowed = now - SENT_REQUEST_TTL_SECS;
912        expireFromList( msgs, &msgs->clientAskedFor, oldestAllowed );
913    }
914
915    dbgmsg( msgs, "leaving `expire old requests' block" );
916}
917
918static void
919pumpRequestQueue( tr_peermsgs * msgs, const time_t now )
920{
921    const int           max = msgs->maxActiveRequests;
922    int                 sent = 0;
923    int                 count = msgs->clientAskedFor.count;
924    struct peer_request req;
925
926    if( msgs->peer->clientIsChoked )
927        return;
928    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
929        return;
930
931    while( ( count < max ) && reqListPop( &msgs->clientWillAskFor, &req ) )
932    {
933        const tr_block_index_t block = _tr_block( msgs->torrent, req.index, req.offset );
934
935        assert( requestIsValid( msgs, &req ) );
936        assert( tr_bitfieldHas( msgs->peer->have, req.index ) );
937
938        /* don't ask for it if we've already got it... this block may have
939         * come in from a different peer after we cancelled a request for it */
940        if( !tr_cpBlockIsComplete( &msgs->torrent->completion, block ) )
941        {
942            protocolSendRequest( msgs, &req );
943            req.time_requested = now;
944            reqListAppend( &msgs->clientAskedFor, &req );
945
946            ++count;
947            ++sent;
948        }
949    }
950
951    if( sent )
952        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
953                sent, msgs->clientAskedFor.count, msgs->clientWillAskFor.count );
954
955    if( count < max )
956        fireNeedReq( msgs );
957}
958
959static inline int
960requestQueueIsFull( const tr_peermsgs * msgs )
961{
962    const int req_max = msgs->maxActiveRequests;
963    return msgs->clientWillAskFor.count >= req_max;
964}
965
966tr_addreq_t
967tr_peerMsgsAddRequest( tr_peermsgs *    msgs,
968                       uint32_t         index,
969                       uint32_t         offset,
970                       uint32_t         length )
971{
972    struct peer_request req;
973
974    assert( msgs );
975    assert( msgs->torrent );
976    assert( reqIsValid( msgs, index, offset, length ) );
977
978    /**
979    ***  Reasons to decline the request
980    **/
981
982    /* don't send requests to choked clients */
983    if( msgs->peer->clientIsChoked ) {
984        dbgmsg( msgs, "declining request because they're choking us" );
985        return TR_ADDREQ_CLIENT_CHOKED;
986    }
987
988    /* peer doesn't have this piece */
989    if( !tr_bitfieldHas( msgs->peer->have, index ) )
990        return TR_ADDREQ_MISSING;
991
992    /* peer's queue is full */
993    if( requestQueueIsFull( msgs ) ) {
994        dbgmsg( msgs, "declining request because we're full" );
995        return TR_ADDREQ_FULL;
996    }
997
998    /* have we already asked for this piece? */
999    req.index = index;
1000    req.offset = offset;
1001    req.length = length;
1002    if( reqListFind( &msgs->clientAskedFor, &req ) != -1 ) {
1003        dbgmsg( msgs, "declining because it's a duplicate" );
1004        return TR_ADDREQ_DUPLICATE;
1005    }
1006    if( reqListFind( &msgs->clientWillAskFor, &req ) != -1 ) {
1007        dbgmsg( msgs, "declining because it's a duplicate" );
1008        return TR_ADDREQ_DUPLICATE;
1009    }
1010
1011    /**
1012    ***  Accept this request
1013    **/
1014
1015    dbgmsg( msgs, "adding req for %"PRIu32":%"PRIu32"->%"PRIu32" to our `will request' list",
1016            index, offset, length );
1017    req.time_requested = time( NULL );
1018    reqListAppend( &msgs->clientWillAskFor, &req );
1019    return TR_ADDREQ_OK;
1020}
1021
1022static void
1023cancelAllRequestsToPeer( tr_peermsgs * msgs, tr_bool sendCancel )
1024{
1025    int i;
1026    struct request_list a = msgs->clientWillAskFor;
1027    struct request_list b = msgs->clientAskedFor;
1028    dbgmsg( msgs, "cancelling all requests to peer" );
1029
1030    msgs->clientAskedFor = REQUEST_LIST_INIT;
1031    msgs->clientWillAskFor = REQUEST_LIST_INIT;
1032
1033    for( i=0; i<a.count; ++i )
1034        fireCancelledReq( msgs, &a.requests[i] );
1035
1036    for( i = 0; i < b.count; ++i ) {
1037        fireCancelledReq( msgs, &b.requests[i] );
1038        if( sendCancel )
1039            protocolSendCancel( msgs, &b.requests[i] );
1040    }
1041
1042    reqListClear( &a );
1043    reqListClear( &b );
1044}
1045
1046void
1047tr_peerMsgsCancel( tr_peermsgs * msgs,
1048                   uint32_t      pieceIndex,
1049                   uint32_t      offset,
1050                   uint32_t      length )
1051{
1052    struct peer_request req;
1053
1054    assert( msgs != NULL );
1055    assert( length > 0 );
1056
1057
1058    /* have we asked the peer for this piece? */
1059    req.index = pieceIndex;
1060    req.offset = offset;
1061    req.length = length;
1062
1063    /* if it's only in the queue and hasn't been sent yet, free it */
1064    if( reqListRemove( &msgs->clientWillAskFor, &req ) ) {
1065        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32, pieceIndex, offset, length );
1066        fireCancelledReq( msgs, &req );
1067    }
1068
1069    /* if it's already been sent, send a cancel message too */
1070    if( reqListRemove( &msgs->clientAskedFor, &req ) ) {
1071        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32, pieceIndex, offset, length );
1072        protocolSendCancel( msgs, &req );
1073        fireCancelledReq( msgs, &req );
1074    }
1075}
1076
1077
1078/**
1079***
1080**/
1081
1082static void
1083sendLtepHandshake( tr_peermsgs * msgs )
1084{
1085    tr_benc val, *m;
1086    char * buf;
1087    int len;
1088    int pex;
1089    struct evbuffer * out = msgs->outMessages;
1090
1091    if( msgs->clientSentLtepHandshake )
1092        return;
1093
1094    dbgmsg( msgs, "sending an ltep handshake" );
1095    msgs->clientSentLtepHandshake = 1;
1096
1097    /* decide if we want to advertise pex support */
1098    if( !tr_torrentAllowsPex( msgs->torrent ) )
1099        pex = 0;
1100    else if( msgs->peerSentLtepHandshake )
1101        pex = msgs->peerSupportsPex ? 1 : 0;
1102    else
1103        pex = 1;
1104
1105    tr_bencInitDict( &val, 5 );
1106    tr_bencDictAddInt( &val, "e", msgs->session->encryptionMode != TR_CLEAR_PREFERRED );
1107    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( msgs->session ) );
1108    tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
1109    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
1110    m  = tr_bencDictAddDict( &val, "m", 1 );
1111    if( pex )
1112        tr_bencDictAddInt( m, "ut_pex", TR_LTEP_PEX );
1113    buf = tr_bencSave( &val, &len );
1114
1115    tr_peerIoWriteUint32( msgs->peer->io, out, 2 * sizeof( uint8_t ) + len );
1116    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_LTEP );
1117    tr_peerIoWriteUint8 ( msgs->peer->io, out, LTEP_HANDSHAKE );
1118    tr_peerIoWriteBytes ( msgs->peer->io, out, buf, len );
1119    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1120    dbgOutMessageLen( msgs );
1121
1122    /* cleanup */
1123    tr_bencFree( &val );
1124    tr_free( buf );
1125}
1126
1127static void
1128parseLtepHandshake( tr_peermsgs *     msgs,
1129                    int               len,
1130                    struct evbuffer * inbuf )
1131{
1132    int64_t   i;
1133    tr_benc   val, * sub;
1134    uint8_t * tmp = tr_new( uint8_t, len );
1135
1136    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, len );
1137    msgs->peerSentLtepHandshake = 1;
1138
1139    if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) )
1140    {
1141        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
1142        tr_free( tmp );
1143        return;
1144    }
1145
1146    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
1147
1148    /* does the peer prefer encrypted connections? */
1149    if( tr_bencDictFindInt( &val, "e", &i ) )
1150        msgs->peer->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
1151                                              : ENCRYPTION_PREFERENCE_NO;
1152
1153    /* check supported messages for utorrent pex */
1154    msgs->peerSupportsPex = 0;
1155    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
1156        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
1157            msgs->ut_pex_id = (uint8_t) i;
1158            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
1159            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
1160        }
1161    }
1162
1163    /* look for upload_only (BEP 21) */
1164    if( tr_bencDictFindInt( &val, "upload_only", &i ) )
1165        fireUploadOnly( msgs, i!=0 );
1166
1167    /* get peer's listening port */
1168    if( tr_bencDictFindInt( &val, "p", &i ) ) {
1169        msgs->peer->port = htons( (uint16_t)i );
1170        dbgmsg( msgs, "msgs->port is now %hu", msgs->peer->port );
1171    }
1172
1173    /* get peer's maximum request queue size */
1174    if( tr_bencDictFindInt( &val, "reqq", &i ) )
1175        msgs->reqq = i;
1176
1177    tr_bencFree( &val );
1178    tr_free( tmp );
1179}
1180
1181static void
1182parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1183{
1184    int loaded = 0;
1185    uint8_t * tmp = tr_new( uint8_t, msglen );
1186    tr_benc val;
1187    const tr_torrent * tor = msgs->torrent;
1188    const uint8_t * added;
1189    size_t added_len;
1190
1191    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1192
1193    if( tr_torrentAllowsPex( tor )
1194      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) ) )
1195    {
1196        if( tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1197        {
1198            const uint8_t * added_f = NULL;
1199            tr_pex *        pex;
1200            size_t          i, n;
1201            size_t          added_f_len = 0;
1202            tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1203            pex =
1204                tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len,
1205                                        &n );
1206            for( i = 0; i < n; ++i )
1207                tr_peerMgrAddPex( msgs->session->peerMgr, tor->info.hash,
1208                                  TR_PEER_FROM_PEX, pex + i );
1209            tr_free( pex );
1210        }
1211       
1212        if( tr_bencDictFindRaw( &val, "added6", &added, &added_len ) )
1213        {
1214            const uint8_t * added_f = NULL;
1215            tr_pex *        pex;
1216            size_t          i, n;
1217            size_t          added_f_len = 0;
1218            tr_bencDictFindRaw( &val, "added6.f", &added_f, &added_f_len );
1219            pex =
1220                tr_peerMgrCompact6ToPex( added, added_len, added_f, added_f_len,
1221                                         &n );
1222            for( i = 0; i < n; ++i )
1223                tr_peerMgrAddPex( msgs->session->peerMgr, tor->info.hash,
1224                                  TR_PEER_FROM_PEX, pex + i );
1225            tr_free( pex );
1226        }
1227       
1228    }
1229
1230    if( loaded )
1231        tr_bencFree( &val );
1232    tr_free( tmp );
1233}
1234
1235static void sendPex( tr_peermsgs * msgs );
1236
1237static void
1238parseLtep( tr_peermsgs *     msgs,
1239           int               msglen,
1240           struct evbuffer * inbuf )
1241{
1242    uint8_t ltep_msgid;
1243
1244    tr_peerIoReadUint8( msgs->peer->io, inbuf, &ltep_msgid );
1245    msglen--;
1246
1247    if( ltep_msgid == LTEP_HANDSHAKE )
1248    {
1249        dbgmsg( msgs, "got ltep handshake" );
1250        parseLtepHandshake( msgs, msglen, inbuf );
1251        if( tr_peerIoSupportsLTEP( msgs->peer->io ) )
1252        {
1253            sendLtepHandshake( msgs );
1254            sendPex( msgs );
1255        }
1256    }
1257    else if( ltep_msgid == TR_LTEP_PEX )
1258    {
1259        dbgmsg( msgs, "got ut pex" );
1260        msgs->peerSupportsPex = 1;
1261        parseUtPex( msgs, msglen, inbuf );
1262    }
1263    else
1264    {
1265        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1266        evbuffer_drain( inbuf, msglen );
1267    }
1268}
1269
1270static int
1271readBtLength( tr_peermsgs *     msgs,
1272              struct evbuffer * inbuf,
1273              size_t            inlen )
1274{
1275    uint32_t len;
1276
1277    if( inlen < sizeof( len ) )
1278        return READ_LATER;
1279
1280    tr_peerIoReadUint32( msgs->peer->io, inbuf, &len );
1281
1282    if( len == 0 ) /* peer sent us a keepalive message */
1283        dbgmsg( msgs, "got KeepAlive" );
1284    else
1285    {
1286        msgs->incoming.length = len;
1287        msgs->state = AWAITING_BT_ID;
1288    }
1289
1290    return READ_NOW;
1291}
1292
1293static int readBtMessage( tr_peermsgs *     msgs,
1294                          struct evbuffer * inbuf,
1295                          size_t            inlen );
1296
1297static int
1298readBtId( tr_peermsgs *     msgs,
1299          struct evbuffer * inbuf,
1300          size_t            inlen )
1301{
1302    uint8_t id;
1303
1304    if( inlen < sizeof( uint8_t ) )
1305        return READ_LATER;
1306
1307    tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
1308    msgs->incoming.id = id;
1309
1310    if( id == BT_PIECE )
1311    {
1312        msgs->state = AWAITING_BT_PIECE;
1313        return READ_NOW;
1314    }
1315    else if( msgs->incoming.length != 1 )
1316    {
1317        msgs->state = AWAITING_BT_MESSAGE;
1318        return READ_NOW;
1319    }
1320    else return readBtMessage( msgs, inbuf, inlen - 1 );
1321}
1322
1323static void
1324updatePeerProgress( tr_peermsgs * msgs )
1325{
1326    msgs->peer->progress = tr_bitfieldCountTrueBits( msgs->peer->have ) / (float)msgs->torrent->info.pieceCount;
1327    dbgmsg( msgs, "peer progress is %f", msgs->peer->progress );
1328    updateFastSet( msgs );
1329    updateInterest( msgs );
1330    firePeerProgress( msgs );
1331}
1332
1333static void
1334peerMadeRequest( tr_peermsgs *               msgs,
1335                 const struct peer_request * req )
1336{
1337    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1338    const int reqIsValid = requestIsValid( msgs, req );
1339    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( &msgs->torrent->completion, req->index );
1340    const int peerIsChoked = msgs->peer->peerIsChoked;
1341
1342    int allow = FALSE;
1343
1344    if( !reqIsValid )
1345        dbgmsg( msgs, "rejecting an invalid request." );
1346    else if( !clientHasPiece )
1347        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1348    else if( peerIsChoked )
1349        dbgmsg( msgs, "rejecting request from choked peer" );
1350    else
1351        allow = TRUE;
1352
1353    if( allow )
1354        reqListAppend( &msgs->peerAskedFor, req );
1355    else if( fext )
1356        protocolSendReject( msgs, req );
1357}
1358
1359static int
1360messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1361{
1362    switch( id )
1363    {
1364        case BT_CHOKE:
1365        case BT_UNCHOKE:
1366        case BT_INTERESTED:
1367        case BT_NOT_INTERESTED:
1368        case BT_FEXT_HAVE_ALL:
1369        case BT_FEXT_HAVE_NONE:
1370            return len == 1;
1371
1372        case BT_HAVE:
1373        case BT_FEXT_SUGGEST:
1374        case BT_FEXT_ALLOWED_FAST:
1375            return len == 5;
1376
1377        case BT_BITFIELD:
1378            return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1379
1380        case BT_REQUEST:
1381        case BT_CANCEL:
1382        case BT_FEXT_REJECT:
1383            return len == 13;
1384
1385        case BT_PIECE:
1386            return len > 9 && len <= 16393;
1387
1388        case BT_PORT:
1389            return len == 3;
1390
1391        case BT_LTEP:
1392            return len >= 2;
1393
1394        default:
1395            return FALSE;
1396    }
1397}
1398
1399static int clientGotBlock( tr_peermsgs *               msgs,
1400                           const uint8_t *             block,
1401                           const struct peer_request * req );
1402
1403static int
1404readBtPiece( tr_peermsgs      * msgs,
1405             struct evbuffer  * inbuf,
1406             size_t             inlen,
1407             size_t           * setme_piece_bytes_read )
1408{
1409    struct peer_request * req = &msgs->incoming.blockReq;
1410
1411    assert( EVBUFFER_LENGTH( inbuf ) >= inlen );
1412    dbgmsg( msgs, "In readBtPiece" );
1413
1414    if( !req->length )
1415    {
1416        if( inlen < 8 )
1417            return READ_LATER;
1418
1419        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index );
1420        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset );
1421        req->length = msgs->incoming.length - 9;
1422        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1423        return READ_NOW;
1424    }
1425    else
1426    {
1427        int err;
1428
1429        /* read in another chunk of data */
1430        const size_t nLeft = req->length - EVBUFFER_LENGTH( msgs->incoming.block );
1431        size_t n = MIN( nLeft, inlen );
1432        size_t i = n;
1433
1434        while( i > 0 )
1435        {
1436            uint8_t buf[MAX_STACK_ARRAY_SIZE];
1437            const size_t thisPass = MIN( i, sizeof( buf ) );
1438            tr_peerIoReadBytes( msgs->peer->io, inbuf, buf, thisPass );
1439            evbuffer_add( msgs->incoming.block, buf, thisPass );
1440            i -= thisPass;
1441        }
1442
1443        fireClientGotData( msgs, n, TRUE );
1444        *setme_piece_bytes_read += n;
1445        dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1446               n, req->index, req->offset, req->length,
1447               (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) );
1448        if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length )
1449            return READ_LATER;
1450
1451        /* we've got the whole block ... process it */
1452        err = clientGotBlock( msgs, EVBUFFER_DATA( msgs->incoming.block ), req );
1453
1454        /* cleanup */
1455        evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH( msgs->incoming.block ) );
1456        req->length = 0;
1457        msgs->state = AWAITING_BT_LENGTH;
1458        if( !err )
1459            return READ_NOW;
1460        else {
1461            fireError( msgs, err );
1462            return READ_ERR;
1463        }
1464    }
1465}
1466
1467static int
1468readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1469{
1470    uint32_t      ui32;
1471    uint32_t      msglen = msgs->incoming.length;
1472    const uint8_t id = msgs->incoming.id;
1473    const size_t  startBufLen = EVBUFFER_LENGTH( inbuf );
1474    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1475
1476    --msglen; /* id length */
1477
1478    if( inlen < msglen )
1479        return READ_LATER;
1480
1481    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
1482
1483    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1484    {
1485        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1486        fireError( msgs, EMSGSIZE );
1487        return READ_ERR;
1488    }
1489
1490    switch( id )
1491    {
1492        case BT_CHOKE:
1493            dbgmsg( msgs, "got Choke" );
1494            msgs->peer->clientIsChoked = 1;
1495            if( !fext )
1496                cancelAllRequestsToPeer( msgs, FALSE );
1497            break;
1498
1499        case BT_UNCHOKE:
1500            dbgmsg( msgs, "got Unchoke" );
1501            msgs->peer->clientIsChoked = 0;
1502            fireNeedReq( msgs );
1503            break;
1504
1505        case BT_INTERESTED:
1506            dbgmsg( msgs, "got Interested" );
1507            msgs->peer->peerIsInterested = 1;
1508            break;
1509
1510        case BT_NOT_INTERESTED:
1511            dbgmsg( msgs, "got Not Interested" );
1512            msgs->peer->peerIsInterested = 0;
1513            break;
1514
1515        case BT_HAVE:
1516            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1517            dbgmsg( msgs, "got Have: %u", ui32 );
1518            if( tr_bitfieldAdd( msgs->peer->have, ui32 ) ) {
1519                fireError( msgs, ERANGE );
1520                return READ_ERR;
1521            }
1522            updatePeerProgress( msgs );
1523            tr_rcTransferred( &msgs->torrent->swarmSpeed,
1524                              msgs->torrent->info.pieceSize );
1525            break;
1526
1527        case BT_BITFIELD:
1528        {
1529            dbgmsg( msgs, "got a bitfield" );
1530            tr_peerIoReadBytes( msgs->peer->io, inbuf, msgs->peer->have->bits, msglen );
1531            updatePeerProgress( msgs );
1532            fireNeedReq( msgs );
1533            break;
1534        }
1535
1536        case BT_REQUEST:
1537        {
1538            struct peer_request r;
1539            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1540            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1541            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1542            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1543            peerMadeRequest( msgs, &r );
1544            break;
1545        }
1546
1547        case BT_CANCEL:
1548        {
1549            struct peer_request r;
1550            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1551            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1552            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1553            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1554            if( reqListRemove( &msgs->peerAskedFor, &r ) && fext )
1555                protocolSendReject( msgs, &r );
1556            break;
1557        }
1558
1559        case BT_PIECE:
1560            assert( 0 ); /* handled elsewhere! */
1561            break;
1562
1563        case BT_PORT:
1564            dbgmsg( msgs, "Got a BT_PORT" );
1565            tr_peerIoReadUint16( msgs->peer->io, inbuf, &msgs->peer->port );
1566            break;
1567
1568        case BT_FEXT_SUGGEST:
1569            dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1570            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1571            if( fext )
1572                fireClientGotSuggest( msgs, ui32 );
1573            else {
1574                fireError( msgs, EMSGSIZE );
1575                return READ_ERR;
1576            }
1577            break;
1578
1579        case BT_FEXT_ALLOWED_FAST:
1580            dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1581            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1582            if( fext )
1583                fireClientGotAllowedFast( msgs, ui32 );
1584            else {
1585                fireError( msgs, EMSGSIZE );
1586                return READ_ERR;
1587            }
1588            break;
1589
1590        case BT_FEXT_HAVE_ALL:
1591            dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1592            if( fext ) {
1593                tr_bitfieldAddRange( msgs->peer->have, 0, msgs->torrent->info.pieceCount );
1594                updatePeerProgress( msgs );
1595            } else {
1596                fireError( msgs, EMSGSIZE );
1597                return READ_ERR;
1598            }
1599            break;
1600
1601        case BT_FEXT_HAVE_NONE:
1602            dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1603            if( fext ) {
1604                tr_bitfieldClear( msgs->peer->have );
1605                updatePeerProgress( msgs );
1606            } else {
1607                fireError( msgs, EMSGSIZE );
1608                return READ_ERR;
1609            }
1610            break;
1611
1612        case BT_FEXT_REJECT:
1613        {
1614            struct peer_request r;
1615            dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1616            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1617            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1618            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1619            if( fext )
1620                reqListRemove( &msgs->clientAskedFor, &r );
1621            else {
1622                fireError( msgs, EMSGSIZE );
1623                return READ_ERR;
1624            }
1625            break;
1626        }
1627
1628        case BT_LTEP:
1629            dbgmsg( msgs, "Got a BT_LTEP" );
1630            parseLtep( msgs, msglen, inbuf );
1631            break;
1632
1633        default:
1634            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1635            tr_peerIoDrain( msgs->peer->io, inbuf, msglen );
1636            break;
1637    }
1638
1639    assert( msglen + 1 == msgs->incoming.length );
1640    assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen );
1641
1642    msgs->state = AWAITING_BT_LENGTH;
1643    return READ_NOW;
1644}
1645
1646static inline void
1647decrementDownloadedCount( tr_peermsgs * msgs, uint32_t byteCount )
1648{
1649    tr_torrent * tor = msgs->torrent;
1650
1651    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1652}
1653
1654static inline void
1655clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1656{
1657    decrementDownloadedCount( msgs, req->length );
1658}
1659
1660static void
1661addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1662{
1663    if( !msgs->peer->blame )
1664         msgs->peer->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1665    tr_bitfieldAdd( msgs->peer->blame, index );
1666}
1667
1668/* returns 0 on success, or an errno on failure */
1669static int
1670clientGotBlock( tr_peermsgs *               msgs,
1671                const uint8_t *             data,
1672                const struct peer_request * req )
1673{
1674    int err;
1675    tr_torrent * tor = msgs->torrent;
1676    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1677
1678    assert( msgs );
1679    assert( req );
1680
1681    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) {
1682        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1683                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1684        return EMSGSIZE;
1685    }
1686
1687    /* save the block */
1688    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1689
1690    /**
1691    *** Remove the block from our `we asked for this' list
1692    **/
1693
1694    if( !reqListRemove( &msgs->clientAskedFor, req ) ) {
1695        clientGotUnwantedBlock( msgs, req );
1696        dbgmsg( msgs, "we didn't ask for this message..." );
1697        return 0;
1698    }
1699
1700    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1701            msgs->clientAskedFor.count );
1702
1703    /**
1704    *** Error checks
1705    **/
1706
1707    if( tr_cpBlockIsComplete( &tor->completion, block ) ) {
1708        dbgmsg( msgs, "we have this block already..." );
1709        clientGotUnwantedBlock( msgs, req );
1710        return 0;
1711    }
1712
1713    /**
1714    ***  Save the block
1715    **/
1716
1717    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1718        return err;
1719
1720    addPeerToBlamefield( msgs, req->index );
1721    fireGotBlock( msgs, req );
1722    return 0;
1723}
1724
1725static int peerPulse( void * vmsgs );
1726
1727static void
1728didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1729{
1730    tr_peermsgs * msgs = vmsgs;
1731    firePeerGotData( msgs, bytesWritten, wasPieceData );
1732    peerPulse( msgs );
1733}
1734
1735static ReadState
1736canRead( tr_peerIo * io, void * vmsgs, size_t * piece )
1737{
1738    ReadState         ret;
1739    tr_peermsgs *     msgs = vmsgs;
1740    struct evbuffer * in = tr_peerIoGetReadBuffer( io );
1741    const size_t      inlen = EVBUFFER_LENGTH( in );
1742
1743    if( !inlen )
1744    {
1745        ret = READ_LATER;
1746    }
1747    else if( msgs->state == AWAITING_BT_PIECE )
1748    {
1749        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1750    }
1751    else switch( msgs->state )
1752    {
1753        case AWAITING_BT_LENGTH:
1754            ret = readBtLength ( msgs, in, inlen ); break;
1755
1756        case AWAITING_BT_ID:
1757            ret = readBtId     ( msgs, in, inlen ); break;
1758
1759        case AWAITING_BT_MESSAGE:
1760            ret = readBtMessage( msgs, in, inlen ); break;
1761
1762        default:
1763            assert( 0 );
1764    }
1765
1766    /* log the raw data that was read */
1767    if( ( ret != READ_ERR ) && ( EVBUFFER_LENGTH( in ) != inlen ) )
1768        fireClientGotData( msgs, inlen - EVBUFFER_LENGTH( in ), FALSE );
1769
1770    return ret;
1771}
1772
1773/**
1774***
1775**/
1776
1777static int
1778ratePulse( void * vmsgs )
1779{
1780    tr_peermsgs * msgs = vmsgs;
1781    const double rateToClient = tr_peerGetPieceSpeed( msgs->peer, TR_PEER_TO_CLIENT );
1782    const int seconds = 10;
1783    const int floor = 8;
1784    const int estimatedBlocksInPeriod = ( rateToClient * seconds * 1024 ) / msgs->torrent->blockSize;
1785
1786    msgs->maxActiveRequests = floor + estimatedBlocksInPeriod;
1787
1788    if( msgs->reqq > 0 )
1789        msgs->maxActiveRequests = MIN( msgs->maxActiveRequests, msgs->reqq );
1790
1791    return TRUE;
1792}
1793
1794static size_t
1795fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1796{
1797    size_t bytesWritten = 0;
1798    struct peer_request req;
1799    const tr_bool haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1800    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1801
1802    /**
1803    ***  Protocol messages
1804    **/
1805
1806    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1807    {
1808        dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
1809        msgs->outMessagesBatchedAt = now;
1810    }
1811    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1812    {
1813        const size_t len = EVBUFFER_LENGTH( msgs->outMessages );
1814        /* flush the protocol messages */
1815        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len );
1816        tr_peerIoWriteBuf( msgs->peer->io, msgs->outMessages, FALSE );
1817        msgs->clientSentAnythingAt = now;
1818        msgs->outMessagesBatchedAt = 0;
1819        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1820        bytesWritten +=  len;
1821    }
1822
1823    /**
1824    ***  Blocks
1825    **/
1826
1827    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io ) >= msgs->torrent->blockSize )
1828        && popNextRequest( msgs, &req ) )
1829    {
1830        if( requestIsValid( msgs, &req )
1831            && tr_cpPieceIsComplete( &msgs->torrent->completion, req.index ) )
1832        {
1833            int err;
1834            static uint8_t * buf = NULL;
1835
1836            if( buf == NULL )
1837                buf = tr_new( uint8_t, MAX_BLOCK_SIZE );
1838
1839            /* send a block */
1840            if(( err = tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf ))) {
1841                fireError( msgs, err );
1842                bytesWritten = 0;
1843                msgs = NULL;
1844            } else {
1845                tr_peerIo * io = msgs->peer->io;
1846                struct evbuffer * out = tr_getBuffer( );
1847                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1848                tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1849                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1850                tr_peerIoWriteUint32( io, out, req.index );
1851                tr_peerIoWriteUint32( io, out, req.offset );
1852                tr_peerIoWriteBytes ( io, out, buf, req.length );
1853                tr_peerIoWriteBuf( io, out, TRUE );
1854                bytesWritten += EVBUFFER_LENGTH( out );
1855                msgs->clientSentAnythingAt = now;
1856                tr_releaseBuffer( out );
1857            }
1858        }
1859        else if( fext ) /* peer needs a reject message */
1860        {
1861            protocolSendReject( msgs, &req );
1862        }
1863    }
1864
1865    /**
1866    ***  Keepalive
1867    **/
1868
1869    if( ( msgs != NULL )
1870        && ( msgs->clientSentAnythingAt != 0 )
1871        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1872    {
1873        dbgmsg( msgs, "sending a keepalive message" );
1874        tr_peerIoWriteUint32( msgs->peer->io, msgs->outMessages, 0 );
1875        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1876    }
1877
1878    return bytesWritten;
1879}
1880
1881static int
1882peerPulse( void * vmsgs )
1883{
1884    tr_peermsgs * msgs = vmsgs;
1885    const time_t  now = time( NULL );
1886
1887    ratePulse( msgs );
1888
1889    pumpRequestQueue( msgs, now );
1890    expireOldRequests( msgs, now );
1891
1892    for( ;; )
1893        if( fillOutputBuffer( msgs, now ) < 1 )
1894            break;
1895
1896    return TRUE; /* loop forever */
1897}
1898
1899void
1900tr_peerMsgsPulse( tr_peermsgs * msgs )
1901{
1902    if( msgs != NULL )
1903        peerPulse( msgs );
1904}
1905
1906static void
1907gotError( tr_peerIo  * io UNUSED,
1908          short        what,
1909          void       * vmsgs )
1910{
1911    if( what & EVBUFFER_TIMEOUT )
1912        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1913    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1914        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1915               what, errno, tr_strerror( errno ) );
1916    fireError( vmsgs, ENOTCONN );
1917}
1918
1919static void
1920sendBitfield( tr_peermsgs * msgs )
1921{
1922    struct evbuffer * out = msgs->outMessages;
1923    tr_bitfield *     field;
1924    tr_piece_index_t  lazyPieces[LAZY_PIECE_COUNT];
1925    size_t            i;
1926    size_t            lazyCount = 0;
1927
1928    field = tr_bitfieldDup( tr_cpPieceBitfield( &msgs->torrent->completion ) );
1929
1930    if( tr_sessionIsLazyBitfieldEnabled( msgs->session ) )
1931    {
1932        /** Lazy bitfields aren't a high priority or secure, so I'm opting for
1933            speed over a truly random sample -- let's limit the pool size to
1934            the first 1000 pieces so large torrents don't bog things down */
1935        size_t poolSize;
1936        const size_t maxPoolSize = MIN( msgs->torrent->info.pieceCount, 1000 );
1937        tr_piece_index_t * pool = tr_new( tr_piece_index_t, maxPoolSize );
1938
1939        /* build the pool */
1940        for( i=poolSize=0; i<maxPoolSize; ++i )
1941            if( tr_bitfieldHas( field, i ) )
1942                pool[poolSize++] = i;
1943
1944        /* pull random piece indices from the pool */
1945        while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) )
1946        {
1947            const int pos = tr_cryptoWeakRandInt( poolSize );
1948            const tr_piece_index_t piece = pool[pos];
1949            tr_bitfieldRem( field, piece );
1950            lazyPieces[lazyCount++] = piece;
1951            pool[pos] = pool[--poolSize];
1952        }
1953
1954        /* cleanup */
1955        tr_free( pool );
1956    }
1957
1958    tr_peerIoWriteUint32( msgs->peer->io, out,
1959                          sizeof( uint8_t ) + field->byteCount );
1960    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_BITFIELD );
1961    tr_peerIoWriteBytes ( msgs->peer->io, out, field->bits, field->byteCount );
1962    dbgmsg( msgs, "sending bitfield... outMessage size is now %zu",
1963            EVBUFFER_LENGTH( out ) );
1964    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1965
1966    for( i = 0; i < lazyCount; ++i )
1967        protocolSendHave( msgs, lazyPieces[i] );
1968
1969    tr_bitfieldFree( field );
1970}
1971
1972static void
1973tellPeerWhatWeHave( tr_peermsgs * msgs )
1974{
1975    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1976
1977    if( fext && ( tr_cpGetStatus( &msgs->torrent->completion ) == TR_SEED ) )
1978    {
1979        protocolSendHaveAll( msgs );
1980    }
1981    else if( fext && ( tr_cpHaveValid( &msgs->torrent->completion ) == 0 ) )
1982    {
1983        protocolSendHaveNone( msgs );
1984    }
1985    else
1986    {
1987        sendBitfield( msgs );
1988    }
1989}
1990
1991/**
1992***
1993**/
1994
1995/* some peers give us error messages if we send
1996   more than this many peers in a single pex message
1997   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
1998#define MAX_PEX_ADDED 50
1999#define MAX_PEX_DROPPED 50
2000
2001typedef struct
2002{
2003    tr_pex *  added;
2004    tr_pex *  dropped;
2005    tr_pex *  elements;
2006    int       addedCount;
2007    int       droppedCount;
2008    int       elementCount;
2009}
2010PexDiffs;
2011
2012static void
2013pexAddedCb( void * vpex,
2014            void * userData )
2015{
2016    PexDiffs * diffs = userData;
2017    tr_pex *   pex = vpex;
2018
2019    if( diffs->addedCount < MAX_PEX_ADDED )
2020    {
2021        diffs->added[diffs->addedCount++] = *pex;
2022        diffs->elements[diffs->elementCount++] = *pex;
2023    }
2024}
2025
2026static inline void
2027pexDroppedCb( void * vpex,
2028              void * userData )
2029{
2030    PexDiffs * diffs = userData;
2031    tr_pex *   pex = vpex;
2032
2033    if( diffs->droppedCount < MAX_PEX_DROPPED )
2034    {
2035        diffs->dropped[diffs->droppedCount++] = *pex;
2036    }
2037}
2038
2039static inline void
2040pexElementCb( void * vpex,
2041              void * userData )
2042{
2043    PexDiffs * diffs = userData;
2044    tr_pex * pex = vpex;
2045
2046    diffs->elements[diffs->elementCount++] = *pex;
2047}
2048
2049static void
2050sendPex( tr_peermsgs * msgs )
2051{
2052    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
2053    {
2054        PexDiffs diffs;
2055        PexDiffs diffs6;
2056        tr_pex * newPex = NULL;
2057        tr_pex * newPex6 = NULL;
2058        const int newCount = tr_peerMgrGetPeers( msgs->session->peerMgr,
2059                                                 msgs->torrent->info.hash,
2060                                                 &newPex, TR_AF_INET );
2061        const int newCount6 = tr_peerMgrGetPeers( msgs->session->peerMgr,
2062                                                  msgs->torrent->info.hash,
2063                                                  &newPex6, TR_AF_INET6 );
2064
2065        /* build the diffs */
2066        diffs.added = tr_new( tr_pex, newCount );
2067        diffs.addedCount = 0;
2068        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
2069        diffs.droppedCount = 0;
2070        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
2071        diffs.elementCount = 0;
2072        tr_set_compare( msgs->pex, msgs->pexCount,
2073                        newPex, newCount,
2074                        tr_pexCompare, sizeof( tr_pex ),
2075                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
2076        diffs6.added = tr_new( tr_pex, newCount6 );
2077        diffs6.addedCount = 0;
2078        diffs6.dropped = tr_new( tr_pex, msgs->pexCount6 );
2079        diffs6.droppedCount = 0;
2080        diffs6.elements = tr_new( tr_pex, newCount6 + msgs->pexCount6 );
2081        diffs6.elementCount = 0;
2082        tr_set_compare( msgs->pex6, msgs->pexCount6,
2083                        newPex6, newCount6,
2084                        tr_pexCompare, sizeof( tr_pex ),
2085                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6 );
2086        dbgmsg(
2087            msgs,
2088            "pex: old peer count %d, new peer count %d, added %d, removed %d",
2089            msgs->pexCount, newCount + newCount6,
2090            diffs.addedCount + diffs6.addedCount,
2091            diffs.droppedCount + diffs6.droppedCount );
2092
2093        if( !diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
2094            !diffs6.droppedCount )
2095        {
2096            tr_free( diffs.elements );
2097            tr_free( diffs6.elements );
2098        }
2099        else
2100        {
2101            int  i;
2102            tr_benc val;
2103            char * benc;
2104            int bencLen;
2105            uint8_t * tmp, *walk;
2106            struct evbuffer * out = msgs->outMessages;
2107
2108            /* update peer */
2109            tr_free( msgs->pex );
2110            msgs->pex = diffs.elements;
2111            msgs->pexCount = diffs.elementCount;
2112            tr_free( msgs->pex6 );
2113            msgs->pex6 = diffs6.elements;
2114            msgs->pexCount6 = diffs6.elementCount;
2115
2116            /* build the pex payload */
2117            tr_bencInitDict( &val, 3 ); /* ipv6 support: left as 3:
2118                                         * speed vs. likelihood? */
2119
2120            /* "added" */
2121            tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
2122            for( i = 0; i < diffs.addedCount; ++i )
2123            {
2124                tr_suspectAddress( &diffs.added[i].addr, "pex" );
2125                memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
2126                memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
2127            }
2128            assert( ( walk - tmp ) == diffs.addedCount * 6 );
2129            tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
2130            tr_free( tmp );
2131
2132            /* "added.f" */
2133            tmp = walk = tr_new( uint8_t, diffs.addedCount );
2134            for( i = 0; i < diffs.addedCount; ++i )
2135                *walk++ = diffs.added[i].flags;
2136            assert( ( walk - tmp ) == diffs.addedCount );
2137            tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2138            tr_free( tmp );
2139
2140            /* "dropped" */
2141            tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2142            for( i = 0; i < diffs.droppedCount; ++i )
2143            {
2144                memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
2145                memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2146            }
2147            assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2148            tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2149            tr_free( tmp );
2150           
2151            /* "added6" */
2152            tmp = walk = tr_new( uint8_t, diffs6.addedCount * 18 );
2153            for( i = 0; i < diffs6.addedCount; ++i )
2154            {
2155                tr_suspectAddress( &diffs6.added[i].addr, "pex6" );
2156                memcpy( walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16 );
2157                walk += 16;
2158                memcpy( walk, &diffs6.added[i].port, 2 );
2159                walk += 2;
2160            }
2161            assert( ( walk - tmp ) == diffs6.addedCount * 18 );
2162            tr_bencDictAddRaw( &val, "added6", tmp, walk - tmp );
2163            tr_free( tmp );
2164           
2165            /* "added6.f" */
2166            tmp = walk = tr_new( uint8_t, diffs6.addedCount );
2167            for( i = 0; i < diffs6.addedCount; ++i )
2168                *walk++ = diffs6.added[i].flags;
2169            assert( ( walk - tmp ) == diffs6.addedCount );
2170            tr_bencDictAddRaw( &val, "added6.f", tmp, walk - tmp );
2171            tr_free( tmp );
2172           
2173            /* "dropped6" */
2174            tmp = walk = tr_new( uint8_t, diffs6.droppedCount * 18 );
2175            for( i = 0; i < diffs6.droppedCount; ++i )
2176            {
2177                memcpy( walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16 );
2178                walk += 16;
2179                memcpy( walk, &diffs6.dropped[i].port, 2 );
2180                walk += 2;
2181            }
2182            assert( ( walk - tmp ) == diffs6.droppedCount * 18);
2183            tr_bencDictAddRaw( &val, "dropped6", tmp, walk - tmp );
2184            tr_free( tmp );
2185
2186            /* write the pex message */
2187            benc = tr_bencSave( &val, &bencLen );
2188            tr_peerIoWriteUint32( msgs->peer->io, out, 2 * sizeof( uint8_t ) + bencLen );
2189            tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_LTEP );
2190            tr_peerIoWriteUint8 ( msgs->peer->io, out, msgs->ut_pex_id );
2191            tr_peerIoWriteBytes ( msgs->peer->io, out, benc, bencLen );
2192            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2193            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", EVBUFFER_LENGTH( out ) );
2194
2195            tr_free( benc );
2196            tr_bencFree( &val );
2197        }
2198
2199        /* cleanup */
2200        tr_free( diffs.added );
2201        tr_free( diffs.dropped );
2202        tr_free( newPex );
2203        tr_free( diffs6.added );
2204        tr_free( diffs6.dropped );
2205        tr_free( newPex6 );
2206
2207        msgs->clientSentPexAt = time( NULL );
2208    }
2209}
2210
2211static inline int
2212pexPulse( void * vpeer )
2213{
2214    sendPex( vpeer );
2215    return TRUE;
2216}
2217
2218/**
2219***
2220**/
2221
2222tr_peermsgs*
2223tr_peerMsgsNew( struct tr_torrent * torrent,
2224                struct tr_peer    * peer,
2225                tr_delivery_func    func,
2226                void              * userData,
2227                tr_publisher_tag  * setme )
2228{
2229    tr_peermsgs * m;
2230
2231    assert( peer );
2232    assert( peer->io );
2233
2234    m = tr_new0( tr_peermsgs, 1 );
2235    m->publisher = TR_PUBLISHER_INIT;
2236    m->peer = peer;
2237    m->session = torrent->session;
2238    m->torrent = torrent;
2239    m->peer->clientIsChoked = 1;
2240    m->peer->peerIsChoked = 1;
2241    m->peer->clientIsInterested = 0;
2242    m->peer->peerIsInterested = 0;
2243    m->peer->have = tr_bitfieldNew( torrent->info.pieceCount );
2244    m->state = AWAITING_BT_LENGTH;
2245    m->pexTimer = tr_timerNew( m->session, pexPulse, m, PEX_INTERVAL );
2246    m->outMessages = evbuffer_new( );
2247    m->outMessagesBatchedAt = 0;
2248    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2249    m->incoming.block = evbuffer_new( );
2250    m->peerAskedFor = REQUEST_LIST_INIT;
2251    m->clientAskedFor = REQUEST_LIST_INIT;
2252    m->clientWillAskFor = REQUEST_LIST_INIT;
2253    peer->msgs = m;
2254
2255    *setme = tr_publisherSubscribe( &m->publisher, func, userData );
2256
2257    if( tr_peerIoSupportsLTEP( peer->io ) )
2258        sendLtepHandshake( m );
2259
2260    tellPeerWhatWeHave( m );
2261
2262    tr_peerIoSetIOFuncs( m->peer->io, canRead, didWrite, gotError, m );
2263    ratePulse( m );
2264
2265    return m;
2266}
2267
2268void
2269tr_peerMsgsFree( tr_peermsgs* msgs )
2270{
2271    if( msgs )
2272    {
2273        tr_timerFree( &msgs->pexTimer );
2274        tr_publisherDestruct( &msgs->publisher );
2275        reqListClear( &msgs->clientWillAskFor );
2276        reqListClear( &msgs->clientAskedFor );
2277        reqListClear( &msgs->peerAskedFor );
2278
2279        evbuffer_free( msgs->incoming.block );
2280        evbuffer_free( msgs->outMessages );
2281        tr_free( msgs->pex6 );
2282        tr_free( msgs->pex );
2283
2284        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2285        tr_free( msgs );
2286    }
2287}
2288
2289void
2290tr_peerMsgsUnsubscribe( tr_peermsgs *    peer,
2291                        tr_publisher_tag tag )
2292{
2293    tr_publisherUnsubscribe( &peer->publisher, tag );
2294}
2295
Note: See TracBrowser for help on using the repository browser.