source: trunk/libtransmission/peer-msgs.c @ 7404

Last change on this file since 7404 was 7404, checked in by charles, 12 years ago

updated email address

  • Property svn:keywords set to Date Rev Author Id
File size: 65.2 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 7404 2008-12-16 00:20:44Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19
20#include <event.h>
21
22#include "transmission.h"
23#include "bencode.h"
24#include "completion.h"
25#include "crypto.h"
26#include "inout.h"
27#include "iobuf.h"
28#ifdef WIN32
29#include "net.h" /* for ECONN */
30#endif
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-mgr-private.h"
34#include "peer-msgs.h"
35#include "ratecontrol.h"
36#include "stats.h"
37#include "torrent.h"
38#include "trevent.h"
39#include "utils.h"
40
41/**
42***
43**/
44
45enum
46{
47    BT_CHOKE                = 0,
48    BT_UNCHOKE              = 1,
49    BT_INTERESTED           = 2,
50    BT_NOT_INTERESTED       = 3,
51    BT_HAVE                 = 4,
52    BT_BITFIELD             = 5,
53    BT_REQUEST              = 6,
54    BT_PIECE                = 7,
55    BT_CANCEL               = 8,
56    BT_PORT                 = 9,
57
58    BT_FEXT_SUGGEST         = 13,
59    BT_FEXT_HAVE_ALL        = 14,
60    BT_FEXT_HAVE_NONE       = 15,
61    BT_FEXT_REJECT          = 16,
62    BT_FEXT_ALLOWED_FAST    = 17,
63
64    BT_LTEP                 = 20,
65
66    LTEP_HANDSHAKE          = 0,
67
68    TR_LTEP_PEX             = 1,
69
70
71
72    MIN_CHOKE_PERIOD_SEC    = ( 10 ),
73
74    /* idle seconds before we send a keepalive */
75    KEEPALIVE_INTERVAL_SECS = 100,
76
77    PEX_INTERVAL            = ( 90 * 1000 ), /* msec between sendPex() calls */
78    PEER_PULSE_INTERVAL     = ( 250 ),       /* msec between peerPulse() calls
79                                               */
80
81    MAX_QUEUE_SIZE          = ( 100 ),
82
83    /* how long an unsent request can stay queued before it's returned
84       back to the peer-mgr's pool of requests */
85    QUEUED_REQUEST_TTL_SECS = 20,
86
87    /* how long a sent request can stay queued before it's returned
88       back to the peer-mgr's pool of requests */
89    SENT_REQUEST_TTL_SECS = 240,
90
91    /* used in lowering the outMessages queue period */
92    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
93    HIGH_PRIORITY_INTERVAL_SECS = 2,
94    LOW_PRIORITY_INTERVAL_SECS = 20,
95
96    /* number of pieces to remove from the bitfield when
97     * lazy bitfields are turned on */
98    LAZY_PIECE_COUNT = 26,
99
100    /* number of pieces we'll allow in our fast set */
101    MAX_FAST_SET_SIZE = 3
102};
103
104/**
105***  REQUEST MANAGEMENT
106**/
107
108enum
109{
110    AWAITING_BT_LENGTH,
111    AWAITING_BT_ID,
112    AWAITING_BT_MESSAGE,
113    AWAITING_BT_PIECE
114};
115
116struct peer_request
117{
118    uint32_t    index;
119    uint32_t    offset;
120    uint32_t    length;
121    time_t      time_requested;
122};
123
124static int
125compareRequest( const void * va, const void * vb )
126{
127    const struct peer_request * a = va;
128    const struct peer_request * b = vb;
129
130    if( a->index != b->index )
131        return a->index < b->index ? -1 : 1;
132
133    if( a->offset != b->offset )
134        return a->offset < b->offset ? -1 : 1;
135
136    if( a->length != b->length )
137        return a->length < b->length ? -1 : 1;
138
139    return 0;
140}
141
142struct request_list
143{
144    uint16_t               count;
145    uint16_t               max;
146    struct peer_request *  requests;
147};
148
149static const struct request_list REQUEST_LIST_INIT = { 0, 0, NULL };
150
151static void
152reqListReserve( struct request_list * list,
153                uint16_t              max )
154{
155    if( list->max < max )
156    {
157        list->max = max;
158        list->requests = tr_renew( struct peer_request,
159                                   list->requests,
160                                   list->max );
161    }
162}
163
164static void
165reqListClear( struct request_list * list )
166{
167    tr_free( list->requests );
168    *list = REQUEST_LIST_INIT;
169}
170
171static void
172reqListCopy( struct request_list * dest, const struct request_list * src )
173{
174    dest->count = dest->max = src->count;
175    dest->requests = tr_memdup( src->requests, dest->count * sizeof( struct peer_request ) );
176}
177
178static void
179reqListRemoveOne( struct request_list * list,
180                  int                   i )
181{
182    assert( 0 <= i && i < list->count );
183
184    memmove( &list->requests[i],
185             &list->requests[i + 1],
186             sizeof( struct peer_request ) * ( --list->count - i ) );
187}
188
189static void
190reqListAppend( struct request_list *       list,
191               const struct peer_request * req )
192{
193    if( ++list->count >= list->max )
194        reqListReserve( list, list->max + 8 );
195
196    list->requests[list->count - 1] = *req;
197}
198
199static int
200reqListPop( struct request_list * list,
201            struct peer_request * setme )
202{
203    int success;
204
205    if( !list->count )
206        success = FALSE;
207    else {
208        *setme = list->requests[0];
209        reqListRemoveOne( list, 0 );
210        success = TRUE;
211    }
212
213    return success;
214}
215
216static int
217reqListFind( struct request_list *       list,
218             const struct peer_request * key )
219{
220    uint16_t i;
221
222    for( i = 0; i < list->count; ++i )
223        if( !compareRequest( key, list->requests + i ) )
224            return i;
225
226    return -1;
227}
228
229static int
230reqListRemove( struct request_list *       list,
231               const struct peer_request * key )
232{
233    int success;
234    const int i = reqListFind( list, key );
235
236    if( i < 0 )
237        success = FALSE;
238    else {
239        reqListRemoveOne( list, i );
240        success = TRUE;
241    }
242
243    return success;
244}
245
246/**
247***
248**/
249
250/* this is raw, unchanged data from the peer regarding
251 * the current message that it's sending us. */
252struct tr_incoming
253{
254    uint8_t                id;
255    uint32_t               length; /* includes the +1 for id length */
256    struct peer_request    blockReq; /* metadata for incoming blocks */
257    struct evbuffer *      block; /* piece data for incoming blocks */
258};
259
260/**
261 * Low-level communication state information about a connected peer.
262 *
263 * This structure remembers the low-level protocol states that we're
264 * in with this peer, such as active requests, pex messages, and so on.
265 * Its fields are all private to peer-msgs.c.
266 *
267 * Data not directly involved with sending & receiving messages is
268 * stored in tr_peer, where it can be accessed by both peermsgs and
269 * the peer manager.
270 *
271 * @see struct peer_atom
272 * @see tr_peer
273 */
274struct tr_peermsgs
275{
276    tr_bool         peerSentBitfield;
277    tr_bool         peerSupportsPex;
278    tr_bool         clientSentLtepHandshake;
279    tr_bool         peerSentLtepHandshake;
280    tr_bool         haveFastSet;
281
282    uint8_t         state;
283    uint8_t         ut_pex_id;
284    uint16_t        pexCount;
285    uint16_t        pexCount6;
286    uint16_t        minActiveRequests;
287    uint16_t        maxActiveRequests;
288
289    size_t                 fastsetSize;
290    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
291
292    /* how long the outMessages batch should be allowed to grow before
293     * it's flushed -- some messages (like requests >:) should be sent
294     * very quickly; others aren't as urgent. */
295    int                    outMessagesBatchPeriod;
296
297    tr_peer *              peer;
298
299    tr_session *           session;
300    tr_torrent *           torrent;
301
302    tr_publisher_t *       publisher;
303
304    struct evbuffer *      outMessages; /* all the non-piece messages */
305
306    struct request_list    peerAskedFor;
307    struct request_list    clientAskedFor;
308    struct request_list    clientWillAskFor;
309
310    tr_timer             * pexTimer;
311    tr_pex               * pex;
312    tr_pex               * pex6;
313
314    time_t                 clientSentPexAt;
315    time_t                 clientSentAnythingAt;
316
317    /* when we started batching the outMessages */
318    time_t                outMessagesBatchedAt;
319
320    struct tr_incoming    incoming;
321
322    /* if the peer supports the Extension Protocol in BEP 10 and
323       supplied a reqq argument, it's stored here.  otherwise the
324       value is zero and should be ignored. */
325    int64_t               reqq;
326};
327
328/**
329***
330**/
331
332static void
333myDebug( const char * file, int line,
334         const struct tr_peermsgs * msgs,
335         const char * fmt, ... )
336{
337    FILE * fp = tr_getLog( );
338
339    if( fp )
340    {
341        va_list           args;
342        char              timestr[64];
343        struct evbuffer * buf = evbuffer_new( );
344        char *            base = tr_basename( file );
345
346        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
347                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
348                             msgs->torrent->info.name,
349                             tr_peerIoGetAddrStr( msgs->peer->io ),
350                             msgs->peer->client );
351        va_start( args, fmt );
352        evbuffer_add_vprintf( buf, fmt, args );
353        va_end( args );
354        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
355        fwrite( EVBUFFER_DATA( buf ), 1, EVBUFFER_LENGTH( buf ), fp );
356
357        tr_free( base );
358        evbuffer_free( buf );
359    }
360}
361
362#define dbgmsg( msgs, ... ) \
363    do { \
364        if( tr_deepLoggingIsActive( ) ) \
365            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
366    } while( 0 )
367
368/**
369***
370**/
371
372static void
373pokeBatchPeriod( tr_peermsgs * msgs,
374                 int           interval )
375{
376    if( msgs->outMessagesBatchPeriod > interval )
377    {
378        msgs->outMessagesBatchPeriod = interval;
379        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
380    }
381}
382
383static void
384dbgOutMessageLen( tr_peermsgs * msgs )
385{
386    dbgmsg( msgs, "outMessage size is now %zu", EVBUFFER_LENGTH( msgs->outMessages ) );
387}
388
389static void
390protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
391{
392    tr_peerIo       * io  = msgs->peer->io;
393    struct evbuffer * out = msgs->outMessages;
394
395    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
396
397    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
398    tr_peerIoWriteUint8 ( io, out, BT_FEXT_REJECT );
399    tr_peerIoWriteUint32( io, out, req->index );
400    tr_peerIoWriteUint32( io, out, req->offset );
401    tr_peerIoWriteUint32( io, out, req->length );
402
403    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
404    dbgOutMessageLen( msgs );
405}
406
407static void
408protocolSendRequest( tr_peermsgs *               msgs,
409                     const struct peer_request * req )
410{
411    tr_peerIo       * io  = msgs->peer->io;
412    struct evbuffer * out = msgs->outMessages;
413
414    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
415    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
416    tr_peerIoWriteUint32( io, out, req->index );
417    tr_peerIoWriteUint32( io, out, req->offset );
418    tr_peerIoWriteUint32( io, out, req->length );
419
420    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
421    dbgOutMessageLen( msgs );
422    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
423}
424
425static void
426protocolSendCancel( tr_peermsgs *               msgs,
427                    const struct peer_request * req )
428{
429    tr_peerIo       * io  = msgs->peer->io;
430    struct evbuffer * out = msgs->outMessages;
431
432    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
433    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
434    tr_peerIoWriteUint32( io, out, req->index );
435    tr_peerIoWriteUint32( io, out, req->offset );
436    tr_peerIoWriteUint32( io, out, req->length );
437
438    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
439    dbgOutMessageLen( msgs );
440    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
441}
442
443static void
444protocolSendHave( tr_peermsgs * msgs,
445                  uint32_t      index )
446{
447    tr_peerIo       * io  = msgs->peer->io;
448    struct evbuffer * out = msgs->outMessages;
449
450    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
451    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
452    tr_peerIoWriteUint32( io, out, index );
453
454    dbgmsg( msgs, "sending Have %u...", index );
455    dbgOutMessageLen( msgs );
456    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
457}
458
459#if 0
460static void
461protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
462{
463    tr_peerIo       * io  = msgs->peer->io;
464    struct evbuffer * out = msgs->outMessages;
465
466    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
467
468    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
469    tr_peerIoWriteUint8 ( io, out, BT_FEXT_ALLOWED_FAST );
470    tr_peerIoWriteUint32( io, out, pieceIndex );
471
472    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
473    dbgOutMessageLen( msgs );
474}
475#endif
476
477static void
478protocolSendChoke( tr_peermsgs * msgs,
479                   int           choke )
480{
481    tr_peerIo       * io  = msgs->peer->io;
482    struct evbuffer * out = msgs->outMessages;
483
484    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
485    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
486
487    dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
488    dbgOutMessageLen( msgs );
489    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
490}
491
492static void
493protocolSendHaveAll( tr_peermsgs * msgs )
494{
495    tr_peerIo       * io  = msgs->peer->io;
496    struct evbuffer * out = msgs->outMessages;
497
498    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
499
500    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
501    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_ALL );
502
503    dbgmsg( msgs, "sending HAVE_ALL..." );
504    dbgOutMessageLen( msgs );
505    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
506}
507
508static void
509protocolSendHaveNone( tr_peermsgs * msgs )
510{
511    tr_peerIo       * io  = msgs->peer->io;
512    struct evbuffer * out = msgs->outMessages;
513
514    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
515
516    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
517    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_NONE );
518
519    dbgmsg( msgs, "sending HAVE_NONE..." );
520    dbgOutMessageLen( msgs );
521    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
522}
523
524/**
525***  EVENTS
526**/
527
528static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0, 0 };
529
530static void
531publish( tr_peermsgs * msgs, tr_peer_event * e )
532{
533    assert( msgs->peer );
534    assert( msgs->peer->msgs == msgs );
535
536    tr_publisherPublish( msgs->publisher, msgs->peer, e );
537}
538
539static void
540fireError( tr_peermsgs * msgs, int err )
541{
542    tr_peer_event e = blankEvent;
543    e.eventType = TR_PEER_ERROR;
544    e.err = err;
545    publish( msgs, &e );
546}
547
548static void
549fireUploadOnly( tr_peermsgs * msgs, tr_bool uploadOnly )
550{
551    tr_peer_event e = blankEvent;
552    e.eventType = TR_PEER_UPLOAD_ONLY;
553    e.uploadOnly = uploadOnly;
554    publish( msgs, &e );
555}
556
557static void
558fireNeedReq( tr_peermsgs * msgs )
559{
560    tr_peer_event e = blankEvent;
561    e.eventType = TR_PEER_NEED_REQ;
562    publish( msgs, &e );
563}
564
565static void
566firePeerProgress( tr_peermsgs * msgs )
567{
568    tr_peer_event e = blankEvent;
569    e.eventType = TR_PEER_PEER_PROGRESS;
570    e.progress = msgs->peer->progress;
571    publish( msgs, &e );
572}
573
574static void
575fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
576{
577    tr_peer_event e = blankEvent;
578    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
579    e.pieceIndex = req->index;
580    e.offset = req->offset;
581    e.length = req->length;
582    publish( msgs, &e );
583}
584
585static void
586fireClientGotData( tr_peermsgs * msgs,
587                   uint32_t      length,
588                   int           wasPieceData )
589{
590    tr_peer_event e = blankEvent;
591
592    e.length = length;
593    e.eventType = TR_PEER_CLIENT_GOT_DATA;
594    e.wasPieceData = wasPieceData;
595    publish( msgs, &e );
596}
597
598static void
599fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
600{
601    tr_peer_event e = blankEvent;
602    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
603    e.pieceIndex = pieceIndex;
604    publish( msgs, &e );
605}
606
607static void
608fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
609{
610    tr_peer_event e = blankEvent;
611    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
612    e.pieceIndex = pieceIndex;
613    publish( msgs, &e );
614}
615
616static void
617firePeerGotData( tr_peermsgs  * msgs,
618                 uint32_t       length,
619                 int            wasPieceData )
620{
621    tr_peer_event e = blankEvent;
622
623    e.length = length;
624    e.eventType = TR_PEER_PEER_GOT_DATA;
625    e.wasPieceData = wasPieceData;
626
627    publish( msgs, &e );
628}
629
630static void
631fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
632{
633    tr_peer_event e = blankEvent;
634    e.eventType = TR_PEER_CANCEL;
635    e.pieceIndex = req->index;
636    e.offset = req->offset;
637    e.length = req->length;
638    publish( msgs, &e );
639}
640
641/**
642***  ALLOWED FAST SET
643***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
644**/
645
646size_t
647tr_generateAllowedSet( tr_piece_index_t * setmePieces,
648                       size_t             desiredSetSize,
649                       size_t             pieceCount,
650                       const uint8_t    * infohash,
651                       const tr_address * addr )
652{
653    size_t setSize = 0;
654
655    assert( setmePieces );
656    assert( desiredSetSize <= pieceCount );
657    assert( desiredSetSize );
658    assert( pieceCount );
659    assert( infohash );
660    assert( addr );
661
662    if( addr->type == TR_AF_INET )
663    {
664        uint8_t w[SHA_DIGEST_LENGTH + 4];
665        uint8_t x[SHA_DIGEST_LENGTH];
666
667        *(uint32_t*)w = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 );   /* (1) */
668        memcpy( w + 4, infohash, SHA_DIGEST_LENGTH );                /* (2) */
669        tr_sha1( x, w, sizeof( w ), NULL );                          /* (3) */
670
671        while( setSize<desiredSetSize )
672        {
673            int i;
674            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
675            {
676                size_t k;
677                uint32_t j = i * 4;                                  /* (5) */
678                uint32_t y = ntohl( *( uint32_t* )( x + j ) );       /* (6) */
679                uint32_t index = y % pieceCount;                     /* (7) */
680
681                for( k=0; k<setSize; ++k )                           /* (8) */
682                    if( setmePieces[k] == index )
683                        break;
684
685                if( k == setSize )
686                    setmePieces[setSize++] = index;                  /* (9) */
687            }
688
689            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
690        }
691    }
692
693    return setSize;
694}
695
696static void
697updateFastSet( tr_peermsgs * msgs UNUSED )
698{
699#if 0
700    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
701    const int peerIsNeedy = msgs->peer->progress < 0.10;
702
703    if( fext && peerIsNeedy && !msgs->haveFastSet )
704    {
705        size_t i;
706        const struct tr_address * addr = tr_peerIoGetAddress( msgs->peer->io, NULL );
707        const tr_info * inf = &msgs->torrent->info;
708        const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
709
710        /* build the fast set */
711        msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
712        msgs->haveFastSet = 1;
713
714        /* send it to the peer */
715        for( i=0; i<msgs->fastsetSize; ++i )
716            protocolSendAllowedFast( msgs, msgs->fastset[i] );
717    }
718#endif
719}
720
721/**
722***  INTEREST
723**/
724
725static int
726isPieceInteresting( const tr_peermsgs * msgs,
727                    tr_piece_index_t    piece )
728{
729    const tr_torrent * torrent = msgs->torrent;
730
731    return ( !torrent->info.pieces[piece].dnd )                 /* we want it */
732          && ( !tr_cpPieceIsComplete( torrent->completion, piece ) ) /* !have */
733          && ( tr_bitfieldHas( msgs->peer->have, piece ) );    /* peer has it */
734}
735
736/* "interested" means we'll ask for piece data if they unchoke us */
737static int
738isPeerInteresting( const tr_peermsgs * msgs )
739{
740    tr_piece_index_t    i;
741    const tr_torrent *  torrent;
742    const tr_bitfield * bitfield;
743    const int           clientIsSeed = tr_torrentIsSeed( msgs->torrent );
744
745    if( clientIsSeed )
746        return FALSE;
747
748    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
749        return FALSE;
750
751    torrent = msgs->torrent;
752    bitfield = tr_cpPieceBitfield( torrent->completion );
753
754    if( !msgs->peer->have )
755        return TRUE;
756
757    assert( bitfield->byteCount == msgs->peer->have->byteCount );
758
759    for( i = 0; i < torrent->info.pieceCount; ++i )
760        if( isPieceInteresting( msgs, i ) )
761            return TRUE;
762
763    return FALSE;
764}
765
766static void
767sendInterest( tr_peermsgs * msgs,
768              int           weAreInterested )
769{
770    struct evbuffer * out = msgs->outMessages;
771
772    assert( msgs );
773    assert( weAreInterested == 0 || weAreInterested == 1 );
774
775    msgs->peer->clientIsInterested = weAreInterested;
776    dbgmsg( msgs, "Sending %s", weAreInterested ? "Interested" : "Not Interested" );
777    tr_peerIoWriteUint32( msgs->peer->io, out, sizeof( uint8_t ) );
778    tr_peerIoWriteUint8 ( msgs->peer->io, out, weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
779    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
780    dbgOutMessageLen( msgs );
781}
782
783static void
784updateInterest( tr_peermsgs * msgs )
785{
786    const int i = isPeerInteresting( msgs );
787
788    if( i != msgs->peer->clientIsInterested )
789        sendInterest( msgs, i );
790    if( i )
791        fireNeedReq( msgs );
792}
793
794static int
795popNextRequest( tr_peermsgs *         msgs,
796                struct peer_request * setme )
797{
798    return reqListPop( &msgs->peerAskedFor, setme );
799}
800
801static void
802cancelAllRequestsToClient( tr_peermsgs * msgs )
803{
804    struct peer_request req;
805    const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->peer->io );
806
807    while( popNextRequest( msgs, &req ) )
808        if( mustSendCancel )
809            protocolSendReject( msgs, &req );
810}
811
812void
813tr_peerMsgsSetChoke( tr_peermsgs * msgs,
814                     int           choke )
815{
816    const time_t now = time( NULL );
817    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
818
819    assert( msgs );
820    assert( msgs->peer );
821    assert( choke == 0 || choke == 1 );
822
823    if( msgs->peer->chokeChangedAt > fibrillationTime )
824    {
825        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
826    }
827    else if( msgs->peer->peerIsChoked != choke )
828    {
829        msgs->peer->peerIsChoked = choke;
830        if( choke )
831            cancelAllRequestsToClient( msgs );
832        protocolSendChoke( msgs, choke );
833        msgs->peer->chokeChangedAt = now;
834    }
835}
836
837/**
838***
839**/
840
841void
842tr_peerMsgsHave( tr_peermsgs * msgs,
843                 uint32_t      index )
844{
845    protocolSendHave( msgs, index );
846
847    /* since we have more pieces now, we might not be interested in this peer */
848    updateInterest( msgs );
849}
850
851/**
852***
853**/
854
855static int
856reqIsValid( const tr_peermsgs * peer,
857            uint32_t            index,
858            uint32_t            offset,
859            uint32_t            length )
860{
861    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
862}
863
864static int
865requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
866{
867    return reqIsValid( msgs, req->index, req->offset, req->length );
868}
869
870static void
871expireOldRequests( tr_peermsgs * msgs, const time_t now  )
872{
873    int i;
874    time_t oldestAllowed;
875    struct request_list tmp = REQUEST_LIST_INIT;
876    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
877
878    /* cancel requests that have been queued for too long */
879    oldestAllowed = now - QUEUED_REQUEST_TTL_SECS;
880    reqListCopy( &tmp, &msgs->clientWillAskFor );
881    for( i = 0; i < tmp.count; ++i ) {
882        const struct peer_request * req = &tmp.requests[i];
883        if( req->time_requested < oldestAllowed )
884            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
885    }
886    reqListClear( &tmp );
887
888    /* if the peer doesn't support "Reject Request",
889     * cancel requests that were sent too long ago. */
890    if( !fext ) {
891        oldestAllowed = now - SENT_REQUEST_TTL_SECS;
892        reqListCopy( &tmp, &msgs->clientAskedFor );
893        for( i=0; i<tmp.count; ++i ) {
894            const struct peer_request * req = &tmp.requests[i];
895            if( req->time_requested < oldestAllowed )
896                tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
897        }
898        reqListClear( &tmp );
899    }
900}
901
902static void
903pumpRequestQueue( tr_peermsgs * msgs, const time_t now )
904{
905    const int           max = msgs->maxActiveRequests;
906    const int           min = msgs->minActiveRequests;
907    int                 sent = 0;
908    int                 count = msgs->clientAskedFor.count;
909    struct peer_request req;
910
911    if( count > min )
912        return;
913    if( msgs->peer->clientIsChoked )
914        return;
915    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
916        return;
917
918    while( ( count < max ) && reqListPop( &msgs->clientWillAskFor, &req ) )
919    {
920        const tr_block_index_t block = _tr_block( msgs->torrent, req.index, req.offset );
921
922        assert( requestIsValid( msgs, &req ) );
923        assert( tr_bitfieldHas( msgs->peer->have, req.index ) );
924
925        /* don't ask for it if we've already got it... this block may have
926         * come in from a different peer after we cancelled a request for it */
927        if( !tr_cpBlockIsComplete( msgs->torrent->completion, block ) )
928        {
929            protocolSendRequest( msgs, &req );
930            req.time_requested = now;
931            reqListAppend( &msgs->clientAskedFor, &req );
932
933            ++count;
934            ++sent;
935        }
936    }
937
938    if( sent )
939        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
940                sent,
941                msgs->clientAskedFor.count,
942                msgs->clientWillAskFor.count );
943
944    if( count < max )
945        fireNeedReq( msgs );
946}
947
948static int
949requestQueueIsFull( const tr_peermsgs * msgs )
950{
951    const int req_max = msgs->maxActiveRequests;
952    return msgs->clientWillAskFor.count >= req_max;
953}
954
955tr_addreq_t
956tr_peerMsgsAddRequest( tr_peermsgs *    msgs,
957                       uint32_t         index,
958                       uint32_t         offset,
959                       uint32_t         length,
960                       int              doForce )
961{
962    struct peer_request req;
963
964    assert( msgs );
965    assert( msgs->torrent );
966    assert( reqIsValid( msgs, index, offset, length ) );
967
968    /**
969    ***  Reasons to decline the request
970    **/
971
972    /* don't send requests to choked clients */
973    if( !doForce && msgs->peer->clientIsChoked ) {
974        dbgmsg( msgs, "declining request because they're choking us" );
975        return TR_ADDREQ_CLIENT_CHOKED;
976    }
977
978    /* peer doesn't have this piece */
979    if( !doForce && !tr_bitfieldHas( msgs->peer->have, index ) )
980        return TR_ADDREQ_MISSING;
981
982    /* peer's queue is full */
983    if( !doForce && requestQueueIsFull( msgs ) ) {
984        dbgmsg( msgs, "declining request because we're full" );
985        return TR_ADDREQ_FULL;
986    }
987
988    /* have we already asked for this piece? */
989    req.index = index;
990    req.offset = offset;
991    req.length = length;
992    if( !doForce ) {
993        if( reqListFind( &msgs->clientAskedFor, &req ) != -1 ) {
994            dbgmsg( msgs, "declining because it's a duplicate" );
995            return TR_ADDREQ_DUPLICATE;
996        }
997        if( reqListFind( &msgs->clientWillAskFor, &req ) != -1 ) {
998            dbgmsg( msgs, "declining because it's a duplicate" );
999            return TR_ADDREQ_DUPLICATE;
1000        }
1001    }
1002
1003    /**
1004    ***  Accept this request
1005    **/
1006
1007    dbgmsg( msgs, "adding req for %"PRIu32":%"PRIu32"->%"PRIu32" to our `will request' list",
1008            index, offset, length );
1009    req.time_requested = time( NULL );
1010    reqListAppend( &msgs->clientWillAskFor, &req );
1011    return TR_ADDREQ_OK;
1012}
1013
1014static void
1015cancelAllRequestsToPeer( tr_peermsgs * msgs, tr_bool sendCancel )
1016{
1017    int i;
1018    struct request_list a = msgs->clientWillAskFor;
1019    struct request_list b = msgs->clientAskedFor;
1020    dbgmsg( msgs, "cancelling all requests to peer" );
1021
1022    msgs->clientAskedFor = REQUEST_LIST_INIT;
1023    msgs->clientWillAskFor = REQUEST_LIST_INIT;
1024
1025    for( i=0; i<a.count; ++i )
1026        fireCancelledReq( msgs, &a.requests[i] );
1027
1028    for( i = 0; i < b.count; ++i ) {
1029        fireCancelledReq( msgs, &b.requests[i] );
1030        if( sendCancel )
1031            protocolSendCancel( msgs, &b.requests[i] );
1032    }
1033
1034    reqListClear( &a );
1035    reqListClear( &b );
1036}
1037
1038void
1039tr_peerMsgsCancel( tr_peermsgs * msgs,
1040                   uint32_t      pieceIndex,
1041                   uint32_t      offset,
1042                   uint32_t      length )
1043{
1044    struct peer_request req;
1045
1046    assert( msgs != NULL );
1047    assert( length > 0 );
1048
1049
1050    /* have we asked the peer for this piece? */
1051    req.index = pieceIndex;
1052    req.offset = offset;
1053    req.length = length;
1054
1055    /* if it's only in the queue and hasn't been sent yet, free it */
1056    if( reqListRemove( &msgs->clientWillAskFor, &req ) ) {
1057        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32"\n", pieceIndex, offset, length );
1058        fireCancelledReq( msgs, &req );
1059    }
1060
1061    /* if it's already been sent, send a cancel message too */
1062    if( reqListRemove( &msgs->clientAskedFor, &req ) ) {
1063        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32"\n", pieceIndex, offset, length );
1064        protocolSendCancel( msgs, &req );
1065        fireCancelledReq( msgs, &req );
1066    }
1067}
1068
1069
1070/**
1071***
1072**/
1073
1074static void
1075sendLtepHandshake( tr_peermsgs * msgs )
1076{
1077    tr_benc val, *m;
1078    char * buf;
1079    int len;
1080    int pex;
1081    struct evbuffer * out = msgs->outMessages;
1082
1083    if( msgs->clientSentLtepHandshake )
1084        return;
1085
1086    dbgmsg( msgs, "sending an ltep handshake" );
1087    msgs->clientSentLtepHandshake = 1;
1088
1089    /* decide if we want to advertise pex support */
1090    if( !tr_torrentAllowsPex( msgs->torrent ) )
1091        pex = 0;
1092    else if( msgs->peerSentLtepHandshake )
1093        pex = msgs->peerSupportsPex ? 1 : 0;
1094    else
1095        pex = 1;
1096
1097    tr_bencInitDict( &val, 5 );
1098    tr_bencDictAddInt( &val, "e", msgs->session->encryptionMode != TR_CLEAR_PREFERRED );
1099    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( msgs->session ) );
1100    tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
1101    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
1102    m  = tr_bencDictAddDict( &val, "m", 1 );
1103    if( pex )
1104        tr_bencDictAddInt( m, "ut_pex", TR_LTEP_PEX );
1105    buf = tr_bencSave( &val, &len );
1106
1107    tr_peerIoWriteUint32( msgs->peer->io, out, 2 * sizeof( uint8_t ) + len );
1108    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_LTEP );
1109    tr_peerIoWriteUint8 ( msgs->peer->io, out, LTEP_HANDSHAKE );
1110    tr_peerIoWriteBytes ( msgs->peer->io, out, buf, len );
1111    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1112    dbgOutMessageLen( msgs );
1113
1114    /* cleanup */
1115    tr_bencFree( &val );
1116    tr_free( buf );
1117}
1118
1119static void
1120parseLtepHandshake( tr_peermsgs *     msgs,
1121                    int               len,
1122                    struct evbuffer * inbuf )
1123{
1124    int64_t   i;
1125    tr_benc   val, * sub;
1126    uint8_t * tmp = tr_new( uint8_t, len );
1127
1128    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, len );
1129    msgs->peerSentLtepHandshake = 1;
1130
1131    if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) )
1132    {
1133        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
1134        tr_free( tmp );
1135        return;
1136    }
1137
1138    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
1139
1140    /* does the peer prefer encrypted connections? */
1141    if( tr_bencDictFindInt( &val, "e", &i ) )
1142        msgs->peer->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
1143                                              : ENCRYPTION_PREFERENCE_NO;
1144
1145    /* check supported messages for utorrent pex */
1146    msgs->peerSupportsPex = 0;
1147    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
1148        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
1149            msgs->ut_pex_id = (uint8_t) i;
1150            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
1151            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
1152        }
1153    }
1154
1155    /* look for upload_only (BEP 21) */
1156    if( tr_bencDictFindInt( &val, "upload_only", &i ) )
1157        fireUploadOnly( msgs, i!=0 );
1158
1159    /* get peer's listening port */
1160    if( tr_bencDictFindInt( &val, "p", &i ) ) {
1161        msgs->peer->port = htons( (uint16_t)i );
1162        dbgmsg( msgs, "msgs->port is now %hu", msgs->peer->port );
1163    }
1164
1165    /* get peer's maximum request queue size */
1166    if( tr_bencDictFindInt( &val, "reqq", &i ) )
1167        msgs->reqq = i;
1168
1169    tr_bencFree( &val );
1170    tr_free( tmp );
1171}
1172
1173static void
1174parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1175{
1176    int loaded = 0;
1177    uint8_t * tmp = tr_new( uint8_t, msglen );
1178    tr_benc val;
1179    const tr_torrent * tor = msgs->torrent;
1180    const uint8_t * added;
1181    size_t added_len;
1182
1183    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1184
1185    if( tr_torrentAllowsPex( tor )
1186      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) ) )
1187    {
1188        if( tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1189        {
1190            const uint8_t * added_f = NULL;
1191            tr_pex *        pex;
1192            size_t          i, n;
1193            size_t          added_f_len = 0;
1194            tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1195            pex =
1196                tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len,
1197                                        &n );
1198            for( i = 0; i < n; ++i )
1199                tr_peerMgrAddPex( msgs->session->peerMgr, tor->info.hash,
1200                                  TR_PEER_FROM_PEX, pex + i );
1201            tr_free( pex );
1202        }
1203       
1204        if( tr_bencDictFindRaw( &val, "added6", &added, &added_len ) )
1205        {
1206            const uint8_t * added_f = NULL;
1207            tr_pex *        pex;
1208            size_t          i, n;
1209            size_t          added_f_len = 0;
1210            tr_bencDictFindRaw( &val, "added6.f", &added_f, &added_f_len );
1211            pex =
1212                tr_peerMgrCompact6ToPex( added, added_len, added_f, added_f_len,
1213                                         &n );
1214            for( i = 0; i < n; ++i )
1215                tr_peerMgrAddPex( msgs->session->peerMgr, tor->info.hash,
1216                                  TR_PEER_FROM_PEX, pex + i );
1217            tr_free( pex );
1218        }
1219       
1220    }
1221
1222    if( loaded )
1223        tr_bencFree( &val );
1224    tr_free( tmp );
1225}
1226
1227static void sendPex( tr_peermsgs * msgs );
1228
1229static void
1230parseLtep( tr_peermsgs *     msgs,
1231           int               msglen,
1232           struct evbuffer * inbuf )
1233{
1234    uint8_t ltep_msgid;
1235
1236    tr_peerIoReadUint8( msgs->peer->io, inbuf, &ltep_msgid );
1237    msglen--;
1238
1239    if( ltep_msgid == LTEP_HANDSHAKE )
1240    {
1241        dbgmsg( msgs, "got ltep handshake" );
1242        parseLtepHandshake( msgs, msglen, inbuf );
1243        if( tr_peerIoSupportsLTEP( msgs->peer->io ) )
1244        {
1245            sendLtepHandshake( msgs );
1246            sendPex( msgs );
1247        }
1248    }
1249    else if( ltep_msgid == TR_LTEP_PEX )
1250    {
1251        dbgmsg( msgs, "got ut pex" );
1252        msgs->peerSupportsPex = 1;
1253        parseUtPex( msgs, msglen, inbuf );
1254    }
1255    else
1256    {
1257        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1258        evbuffer_drain( inbuf, msglen );
1259    }
1260}
1261
1262static int
1263readBtLength( tr_peermsgs *     msgs,
1264              struct evbuffer * inbuf,
1265              size_t            inlen )
1266{
1267    uint32_t len;
1268
1269    if( inlen < sizeof( len ) )
1270        return READ_LATER;
1271
1272    tr_peerIoReadUint32( msgs->peer->io, inbuf, &len );
1273
1274    if( len == 0 ) /* peer sent us a keepalive message */
1275        dbgmsg( msgs, "got KeepAlive" );
1276    else
1277    {
1278        msgs->incoming.length = len;
1279        msgs->state = AWAITING_BT_ID;
1280    }
1281
1282    return READ_NOW;
1283}
1284
1285static int readBtMessage( tr_peermsgs *     msgs,
1286                          struct evbuffer * inbuf,
1287                          size_t            inlen );
1288
1289static int
1290readBtId( tr_peermsgs *     msgs,
1291          struct evbuffer * inbuf,
1292          size_t            inlen )
1293{
1294    uint8_t id;
1295
1296    if( inlen < sizeof( uint8_t ) )
1297        return READ_LATER;
1298
1299    tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
1300    msgs->incoming.id = id;
1301
1302    if( id == BT_PIECE )
1303    {
1304        msgs->state = AWAITING_BT_PIECE;
1305        return READ_NOW;
1306    }
1307    else if( msgs->incoming.length != 1 )
1308    {
1309        msgs->state = AWAITING_BT_MESSAGE;
1310        return READ_NOW;
1311    }
1312    else return readBtMessage( msgs, inbuf, inlen - 1 );
1313}
1314
1315static void
1316updatePeerProgress( tr_peermsgs * msgs )
1317{
1318    msgs->peer->progress = tr_bitfieldCountTrueBits( msgs->peer->have ) / (float)msgs->torrent->info.pieceCount;
1319    dbgmsg( msgs, "peer progress is %f", msgs->peer->progress );
1320    updateFastSet( msgs );
1321    updateInterest( msgs );
1322    firePeerProgress( msgs );
1323}
1324
1325static void
1326peerMadeRequest( tr_peermsgs *               msgs,
1327                 const struct peer_request * req )
1328{
1329    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1330    const int reqIsValid = requestIsValid( msgs, req );
1331    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( msgs->torrent->completion, req->index );
1332    const int peerIsChoked = msgs->peer->peerIsChoked;
1333
1334    int allow = FALSE;
1335
1336    if( !reqIsValid )
1337        dbgmsg( msgs, "rejecting an invalid request." );
1338    else if( !clientHasPiece )
1339        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1340    else if( peerIsChoked )
1341        dbgmsg( msgs, "rejecting request from choked peer" );
1342    else
1343        allow = TRUE;
1344
1345    if( allow )
1346        reqListAppend( &msgs->peerAskedFor, req );
1347    else if( fext )
1348        protocolSendReject( msgs, req );
1349}
1350
1351static int
1352messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1353{
1354    switch( id )
1355    {
1356        case BT_CHOKE:
1357        case BT_UNCHOKE:
1358        case BT_INTERESTED:
1359        case BT_NOT_INTERESTED:
1360        case BT_FEXT_HAVE_ALL:
1361        case BT_FEXT_HAVE_NONE:
1362            return len == 1;
1363
1364        case BT_HAVE:
1365        case BT_FEXT_SUGGEST:
1366        case BT_FEXT_ALLOWED_FAST:
1367            return len == 5;
1368
1369        case BT_BITFIELD:
1370            return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1371
1372        case BT_REQUEST:
1373        case BT_CANCEL:
1374        case BT_FEXT_REJECT:
1375            return len == 13;
1376
1377        case BT_PIECE:
1378            return len > 9 && len <= 16393;
1379
1380        case BT_PORT:
1381            return len == 3;
1382
1383        case BT_LTEP:
1384            return len >= 2;
1385
1386        default:
1387            return FALSE;
1388    }
1389}
1390
1391static int clientGotBlock( tr_peermsgs *               msgs,
1392                           const uint8_t *             block,
1393                           const struct peer_request * req );
1394
1395static int
1396readBtPiece( tr_peermsgs      * msgs,
1397             struct evbuffer  * inbuf,
1398             size_t             inlen,
1399             size_t           * setme_piece_bytes_read )
1400{
1401    struct peer_request * req = &msgs->incoming.blockReq;
1402
1403    assert( EVBUFFER_LENGTH( inbuf ) >= inlen );
1404    dbgmsg( msgs, "In readBtPiece" );
1405
1406    if( !req->length )
1407    {
1408        if( inlen < 8 )
1409            return READ_LATER;
1410
1411        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index );
1412        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset );
1413        req->length = msgs->incoming.length - 9;
1414        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1415        return READ_NOW;
1416    }
1417    else
1418    {
1419        int err;
1420
1421        /* read in another chunk of data */
1422        const size_t nLeft = req->length - EVBUFFER_LENGTH( msgs->incoming.block );
1423        size_t n = MIN( nLeft, inlen );
1424        uint8_t * buf = tr_new( uint8_t, n );
1425        assert( EVBUFFER_LENGTH( inbuf ) >= n );
1426        tr_peerIoReadBytes( msgs->peer->io, inbuf, buf, n );
1427        evbuffer_add( msgs->incoming.block, buf, n );
1428        fireClientGotData( msgs, n, TRUE );
1429        *setme_piece_bytes_read += n;
1430        tr_free( buf );
1431        dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1432               n, req->index, req->offset, req->length,
1433               (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) );
1434        if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length )
1435            return READ_LATER;
1436
1437        /* we've got the whole block ... process it */
1438        err = clientGotBlock( msgs, EVBUFFER_DATA( msgs->incoming.block ), req );
1439
1440        /* cleanup */
1441        evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH( msgs->incoming.block ) );
1442        req->length = 0;
1443        msgs->state = AWAITING_BT_LENGTH;
1444        if( !err )
1445            return READ_NOW;
1446        else {
1447            fireError( msgs, err );
1448            return READ_ERR;
1449        }
1450    }
1451}
1452
1453static int
1454readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1455{
1456    uint32_t      ui32;
1457    uint32_t      msglen = msgs->incoming.length;
1458    const uint8_t id = msgs->incoming.id;
1459    const size_t  startBufLen = EVBUFFER_LENGTH( inbuf );
1460    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1461
1462    --msglen; /* id length */
1463
1464    if( inlen < msglen )
1465        return READ_LATER;
1466
1467    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
1468
1469    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1470    {
1471        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1472        fireError( msgs, EMSGSIZE );
1473        return READ_ERR;
1474    }
1475
1476    switch( id )
1477    {
1478        case BT_CHOKE:
1479            dbgmsg( msgs, "got Choke" );
1480            msgs->peer->clientIsChoked = 1;
1481            if( !fext )
1482                cancelAllRequestsToPeer( msgs, FALSE );
1483            break;
1484
1485        case BT_UNCHOKE:
1486            dbgmsg( msgs, "got Unchoke" );
1487            msgs->peer->clientIsChoked = 0;
1488            fireNeedReq( msgs );
1489            break;
1490
1491        case BT_INTERESTED:
1492            dbgmsg( msgs, "got Interested" );
1493            msgs->peer->peerIsInterested = 1;
1494            break;
1495
1496        case BT_NOT_INTERESTED:
1497            dbgmsg( msgs, "got Not Interested" );
1498            msgs->peer->peerIsInterested = 0;
1499            break;
1500
1501        case BT_HAVE:
1502            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1503            dbgmsg( msgs, "got Have: %u", ui32 );
1504            if( tr_bitfieldAdd( msgs->peer->have, ui32 ) ) {
1505                fireError( msgs, ERANGE );
1506                return READ_ERR;
1507            }
1508            updatePeerProgress( msgs );
1509            tr_rcTransferred( msgs->torrent->swarmSpeed,
1510                              msgs->torrent->info.pieceSize );
1511            break;
1512
1513        case BT_BITFIELD:
1514        {
1515            dbgmsg( msgs, "got a bitfield" );
1516            msgs->peerSentBitfield = 1;
1517            tr_peerIoReadBytes( msgs->peer->io, inbuf, msgs->peer->have->bits, msglen );
1518            updatePeerProgress( msgs );
1519            fireNeedReq( msgs );
1520            break;
1521        }
1522
1523        case BT_REQUEST:
1524        {
1525            struct peer_request r;
1526            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1527            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1528            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1529            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1530            peerMadeRequest( msgs, &r );
1531            break;
1532        }
1533
1534        case BT_CANCEL:
1535        {
1536            struct peer_request r;
1537            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1538            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1539            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1540            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1541            if( reqListRemove( &msgs->peerAskedFor, &r ) && fext )
1542                protocolSendReject( msgs, &r );
1543            break;
1544        }
1545
1546        case BT_PIECE:
1547            assert( 0 ); /* handled elsewhere! */
1548            break;
1549
1550        case BT_PORT:
1551            dbgmsg( msgs, "Got a BT_PORT" );
1552            tr_peerIoReadUint16( msgs->peer->io, inbuf, &msgs->peer->port );
1553            break;
1554
1555        case BT_FEXT_SUGGEST:
1556            dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1557            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1558            if( fext )
1559                fireClientGotSuggest( msgs, ui32 );
1560            else {
1561                fireError( msgs, EMSGSIZE );
1562                return READ_ERR;
1563            }
1564            break;
1565
1566        case BT_FEXT_ALLOWED_FAST:
1567            dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1568            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1569            if( fext )
1570                fireClientGotAllowedFast( msgs, ui32 );
1571            else {
1572                fireError( msgs, EMSGSIZE );
1573                return READ_ERR;
1574            }
1575            break;
1576
1577        case BT_FEXT_HAVE_ALL:
1578            dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1579            if( fext ) {
1580                tr_bitfieldAddRange( msgs->peer->have, 0, msgs->torrent->info.pieceCount );
1581                updatePeerProgress( msgs );
1582            } else {
1583                fireError( msgs, EMSGSIZE );
1584                return READ_ERR;
1585            }
1586            break;
1587
1588
1589        case BT_FEXT_HAVE_NONE:
1590            dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1591            if( fext ) {
1592                tr_bitfieldClear( msgs->peer->have );
1593                updatePeerProgress( msgs );
1594            } else {
1595                fireError( msgs, EMSGSIZE );
1596                return READ_ERR;
1597            }
1598            break;
1599
1600        case BT_FEXT_REJECT:
1601        {
1602            struct peer_request r;
1603            dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1604            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1605            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1606            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1607            if( fext )
1608                reqListRemove( &msgs->clientAskedFor, &r );
1609            else {
1610                fireError( msgs, EMSGSIZE );
1611                return READ_ERR;
1612            }
1613            break;
1614        }
1615
1616        case BT_LTEP:
1617            dbgmsg( msgs, "Got a BT_LTEP" );
1618            parseLtep( msgs, msglen, inbuf );
1619            break;
1620
1621        default:
1622            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1623            tr_peerIoDrain( msgs->peer->io, inbuf, msglen );
1624            break;
1625    }
1626
1627    assert( msglen + 1 == msgs->incoming.length );
1628    assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen );
1629
1630    msgs->state = AWAITING_BT_LENGTH;
1631    return READ_NOW;
1632}
1633
1634static void
1635decrementDownloadedCount( tr_peermsgs * msgs, uint32_t byteCount )
1636{
1637    tr_torrent * tor = msgs->torrent;
1638
1639    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1640}
1641
1642static void
1643clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1644{
1645    decrementDownloadedCount( msgs, req->length );
1646}
1647
1648static void
1649addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1650{
1651    if( !msgs->peer->blame )
1652         msgs->peer->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1653    tr_bitfieldAdd( msgs->peer->blame, index );
1654}
1655
1656/* returns 0 on success, or an errno on failure */
1657static int
1658clientGotBlock( tr_peermsgs *               msgs,
1659                const uint8_t *             data,
1660                const struct peer_request * req )
1661{
1662    int err;
1663    tr_torrent * tor = msgs->torrent;
1664    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1665
1666    assert( msgs );
1667    assert( req );
1668
1669    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) {
1670        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1671                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1672        return EMSGSIZE;
1673    }
1674
1675    /* save the block */
1676    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1677
1678    /**
1679    *** Remove the block from our `we asked for this' list
1680    **/
1681
1682    if( !reqListRemove( &msgs->clientAskedFor, req ) ) {
1683        clientGotUnwantedBlock( msgs, req );
1684        dbgmsg( msgs, "we didn't ask for this message..." );
1685        return 0;
1686    }
1687
1688    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1689            msgs->clientAskedFor.count );
1690
1691    /**
1692    *** Error checks
1693    **/
1694
1695    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
1696        dbgmsg( msgs, "we have this block already..." );
1697        clientGotUnwantedBlock( msgs, req );
1698        return 0;
1699    }
1700
1701    /**
1702    ***  Save the block
1703    **/
1704
1705    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1706        return err;
1707
1708    addPeerToBlamefield( msgs, req->index );
1709    fireGotBlock( msgs, req );
1710    return 0;
1711}
1712
1713static int peerPulse( void * vmsgs );
1714
1715static void
1716didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1717{
1718    tr_peermsgs * msgs = vmsgs;
1719    firePeerGotData( msgs, bytesWritten, wasPieceData );
1720    peerPulse( msgs );
1721}
1722
1723static ReadState
1724canRead( struct tr_iobuf * iobuf, void * vmsgs, size_t * piece )
1725{
1726    ReadState         ret;
1727    tr_peermsgs *     msgs = vmsgs;
1728    struct evbuffer * in = tr_iobuf_input( iobuf );
1729    const size_t      inlen = EVBUFFER_LENGTH( in );
1730
1731    if( !inlen )
1732    {
1733        ret = READ_LATER;
1734    }
1735    else if( msgs->state == AWAITING_BT_PIECE )
1736    {
1737        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1738    }
1739    else switch( msgs->state )
1740    {
1741        case AWAITING_BT_LENGTH:
1742            ret = readBtLength ( msgs, in, inlen ); break;
1743
1744        case AWAITING_BT_ID:
1745            ret = readBtId     ( msgs, in, inlen ); break;
1746
1747        case AWAITING_BT_MESSAGE:
1748            ret = readBtMessage( msgs, in, inlen ); break;
1749
1750        default:
1751            assert( 0 );
1752    }
1753
1754    /* log the raw data that was read */
1755    if( ( ret != READ_ERR ) && ( EVBUFFER_LENGTH( in ) != inlen ) )
1756        fireClientGotData( msgs, inlen - EVBUFFER_LENGTH( in ), FALSE );
1757
1758    return ret;
1759}
1760
1761/**
1762***
1763**/
1764
1765static int
1766ratePulse( void * vmsgs )
1767{
1768    tr_peermsgs * msgs = vmsgs;
1769    const double rateToClient = tr_peerGetPieceSpeed( msgs->peer, TR_PEER_TO_CLIENT );
1770    const int estimatedBlocksInNext30Seconds =
1771                  ( rateToClient * 30 * 1024 ) / msgs->torrent->blockSize;
1772    msgs->minActiveRequests = 8;
1773    msgs->maxActiveRequests = msgs->minActiveRequests + estimatedBlocksInNext30Seconds;
1774    if( msgs->reqq > 0 )
1775        msgs->maxActiveRequests = MIN( msgs->maxActiveRequests, msgs->reqq );
1776    return TRUE;
1777}
1778
1779static size_t
1780fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1781{
1782    size_t bytesWritten = 0;
1783    struct peer_request req;
1784    const int haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1785    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1786
1787    /**
1788    ***  Protocol messages
1789    **/
1790
1791    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1792    {
1793        dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
1794        msgs->outMessagesBatchedAt = now;
1795    }
1796    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1797    {
1798        const size_t len = EVBUFFER_LENGTH( msgs->outMessages );
1799        /* flush the protocol messages */
1800        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len );
1801        tr_peerIoWriteBuf( msgs->peer->io, msgs->outMessages, FALSE );
1802        msgs->clientSentAnythingAt = now;
1803        msgs->outMessagesBatchedAt = 0;
1804        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1805        bytesWritten +=  len;
1806    }
1807
1808    /**
1809    ***  Blocks
1810    **/
1811
1812    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io ) >= msgs->torrent->blockSize )
1813        && popNextRequest( msgs, &req ) )
1814    {
1815        if( requestIsValid( msgs, &req )
1816            && tr_cpPieceIsComplete( msgs->torrent->completion, req.index ) )
1817        {
1818            /* send a block */
1819            uint8_t * buf = tr_new( uint8_t, req.length );
1820            const int err = tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf );
1821            if( err ) {
1822                fireError( msgs, err );
1823                bytesWritten = 0;
1824                msgs = NULL;
1825            } else {
1826                tr_peerIo * io = msgs->peer->io;
1827                struct evbuffer * out = evbuffer_new( );
1828                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1829                tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1830                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1831                tr_peerIoWriteUint32( io, out, req.index );
1832                tr_peerIoWriteUint32( io, out, req.offset );
1833                tr_peerIoWriteBytes ( io, out, buf, req.length );
1834                tr_peerIoWriteBuf( io, out, TRUE );
1835                bytesWritten += EVBUFFER_LENGTH( out );
1836                evbuffer_free( out );
1837                msgs->clientSentAnythingAt = now;
1838            }
1839            tr_free( buf );
1840        }
1841        else if( fext ) /* peer needs a reject message */
1842        {
1843            protocolSendReject( msgs, &req );
1844        }
1845    }
1846
1847    /**
1848    ***  Keepalive
1849    **/
1850
1851    if( ( msgs != NULL )
1852        && ( msgs->clientSentAnythingAt != 0 )
1853        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1854    {
1855        dbgmsg( msgs, "sending a keepalive message" );
1856        tr_peerIoWriteUint32( msgs->peer->io, msgs->outMessages, 0 );
1857        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1858    }
1859
1860    return bytesWritten;
1861}
1862
1863static int
1864peerPulse( void * vmsgs )
1865{
1866    tr_peermsgs * msgs = vmsgs;
1867    const time_t  now = time( NULL );
1868
1869    ratePulse( msgs );
1870
1871    pumpRequestQueue( msgs, now );
1872    expireOldRequests( msgs, now );
1873
1874    for( ;; )
1875        if( fillOutputBuffer( msgs, now ) < 1 )
1876            break;
1877
1878    return TRUE; /* loop forever */
1879}
1880
1881void
1882tr_peerMsgsPulse( tr_peermsgs * msgs )
1883{
1884    if( msgs != NULL )
1885        peerPulse( msgs );
1886}
1887
1888static void
1889gotError( struct tr_iobuf  * iobuf UNUSED,
1890          short              what,
1891          void             * vmsgs )
1892{
1893    if( what & EVBUFFER_TIMEOUT )
1894        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1895    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1896        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1897               what, errno, tr_strerror( errno ) );
1898    fireError( vmsgs, ENOTCONN );
1899}
1900
1901static void
1902sendBitfield( tr_peermsgs * msgs )
1903{
1904    struct evbuffer * out = msgs->outMessages;
1905    tr_bitfield *     field;
1906    tr_piece_index_t  lazyPieces[LAZY_PIECE_COUNT];
1907    size_t            i;
1908    size_t            lazyCount = 0;
1909
1910    field = tr_bitfieldDup( tr_cpPieceBitfield( msgs->torrent->completion ) );
1911
1912    if( tr_sessionIsLazyBitfieldEnabled( msgs->session ) )
1913    {
1914        /** Lazy bitfields aren't a high priority or secure, so I'm opting for
1915            speed over a truly random sample -- let's limit the pool size to
1916            the first 1000 pieces so large torrents don't bog things down */
1917        size_t poolSize;
1918        const size_t maxPoolSize = MIN( msgs->torrent->info.pieceCount, 1000 );
1919        tr_piece_index_t * pool = tr_new( tr_piece_index_t, maxPoolSize );
1920
1921        /* build the pool */
1922        for( i=poolSize=0; i<maxPoolSize; ++i )
1923            if( tr_bitfieldHas( field, i ) )
1924                pool[poolSize++] = i;
1925
1926        /* pull random piece indices from the pool */
1927        while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) )
1928        {
1929            const int pos = tr_cryptoWeakRandInt( poolSize );
1930            const tr_piece_index_t piece = pool[pos];
1931            tr_bitfieldRem( field, piece );
1932            lazyPieces[lazyCount++] = piece;
1933            pool[pos] = pool[--poolSize];
1934        }
1935
1936        /* cleanup */
1937        tr_free( pool );
1938    }
1939
1940    tr_peerIoWriteUint32( msgs->peer->io, out,
1941                          sizeof( uint8_t ) + field->byteCount );
1942    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_BITFIELD );
1943    tr_peerIoWriteBytes ( msgs->peer->io, out, field->bits, field->byteCount );
1944    dbgmsg( msgs, "sending bitfield... outMessage size is now %zu",
1945            EVBUFFER_LENGTH( out ) );
1946    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1947
1948    for( i = 0; i < lazyCount; ++i )
1949        protocolSendHave( msgs, lazyPieces[i] );
1950
1951    tr_bitfieldFree( field );
1952}
1953
1954static void
1955tellPeerWhatWeHave( tr_peermsgs * msgs )
1956{
1957    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1958
1959    if( fext && ( tr_cpGetStatus( msgs->torrent->completion ) == TR_SEED ) )
1960    {
1961        protocolSendHaveAll( msgs );
1962    }
1963    else if( fext && ( tr_cpHaveValid( msgs->torrent->completion ) == 0 ) )
1964    {
1965        protocolSendHaveNone( msgs );
1966    }
1967    else
1968    {
1969        sendBitfield( msgs );
1970    }
1971}
1972
1973/**
1974***
1975**/
1976
1977/* some peers give us error messages if we send
1978   more than this many peers in a single pex message
1979   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
1980#define MAX_PEX_ADDED 50
1981#define MAX_PEX_DROPPED 50
1982
1983typedef struct
1984{
1985    tr_pex *  added;
1986    tr_pex *  dropped;
1987    tr_pex *  elements;
1988    int       addedCount;
1989    int       droppedCount;
1990    int       elementCount;
1991}
1992PexDiffs;
1993
1994static void
1995pexAddedCb( void * vpex,
1996            void * userData )
1997{
1998    PexDiffs * diffs = userData;
1999    tr_pex *   pex = vpex;
2000
2001    if( diffs->addedCount < MAX_PEX_ADDED )
2002    {
2003        diffs->added[diffs->addedCount++] = *pex;
2004        diffs->elements[diffs->elementCount++] = *pex;
2005    }
2006}
2007
2008static void
2009pexDroppedCb( void * vpex,
2010              void * userData )
2011{
2012    PexDiffs * diffs = userData;
2013    tr_pex *   pex = vpex;
2014
2015    if( diffs->droppedCount < MAX_PEX_DROPPED )
2016    {
2017        diffs->dropped[diffs->droppedCount++] = *pex;
2018    }
2019}
2020
2021static void
2022pexElementCb( void * vpex,
2023              void * userData )
2024{
2025    PexDiffs * diffs = userData;
2026    tr_pex *   pex = vpex;
2027
2028    diffs->elements[diffs->elementCount++] = *pex;
2029}
2030
2031static void
2032sendPex( tr_peermsgs * msgs )
2033{
2034    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
2035    {
2036        PexDiffs diffs;
2037        PexDiffs diffs6;
2038        tr_pex * newPex = NULL;
2039        tr_pex * newPex6 = NULL;
2040        const int newCount = tr_peerMgrGetPeers( msgs->session->peerMgr,
2041                                                 msgs->torrent->info.hash,
2042                                                 &newPex, TR_AF_INET );
2043        const int newCount6 = tr_peerMgrGetPeers( msgs->session->peerMgr,
2044                                                  msgs->torrent->info.hash,
2045                                                  &newPex6, TR_AF_INET6 );
2046
2047        /* build the diffs */
2048        diffs.added = tr_new( tr_pex, newCount );
2049        diffs.addedCount = 0;
2050        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
2051        diffs.droppedCount = 0;
2052        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
2053        diffs.elementCount = 0;
2054        tr_set_compare( msgs->pex, msgs->pexCount,
2055                        newPex, newCount,
2056                        tr_pexCompare, sizeof( tr_pex ),
2057                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
2058        diffs6.added = tr_new( tr_pex, newCount6 );
2059        diffs6.addedCount = 0;
2060        diffs6.dropped = tr_new( tr_pex, msgs->pexCount6 );
2061        diffs6.droppedCount = 0;
2062        diffs6.elements = tr_new( tr_pex, newCount6 + msgs->pexCount6 );
2063        diffs6.elementCount = 0;
2064        tr_set_compare( msgs->pex6, msgs->pexCount6,
2065                        newPex6, newCount6,
2066                        tr_pexCompare, sizeof( tr_pex ),
2067                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6 );
2068        dbgmsg(
2069            msgs,
2070            "pex: old peer count %d, new peer count %d, added %d, removed %d",
2071            msgs->pexCount, newCount + newCount6,
2072            diffs.addedCount + diffs6.addedCount,
2073            diffs.droppedCount + diffs6.droppedCount );
2074
2075        if( !diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
2076            !diffs6.droppedCount )
2077        {
2078            tr_free( diffs.elements );
2079            tr_free( diffs6.elements );
2080        }
2081        else
2082        {
2083            int  i;
2084            tr_benc val;
2085            char * benc;
2086            int bencLen;
2087            uint8_t * tmp, *walk;
2088            struct evbuffer * out = msgs->outMessages;
2089
2090            /* update peer */
2091            tr_free( msgs->pex );
2092            msgs->pex = diffs.elements;
2093            msgs->pexCount = diffs.elementCount;
2094            tr_free( msgs->pex6 );
2095            msgs->pex6 = diffs6.elements;
2096            msgs->pexCount6 = diffs6.elementCount;
2097
2098            /* build the pex payload */
2099            tr_bencInitDict( &val, 3 ); /* ipv6 support: left as 3:
2100                                         * speed vs. likelihood? */
2101
2102            /* "added" */
2103            tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
2104            for( i = 0; i < diffs.addedCount; ++i )
2105            {
2106                memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
2107                memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
2108            }
2109            assert( ( walk - tmp ) == diffs.addedCount * 6 );
2110            tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
2111            tr_free( tmp );
2112
2113            /* "added.f" */
2114            tmp = walk = tr_new( uint8_t, diffs.addedCount );
2115            for( i = 0; i < diffs.addedCount; ++i )
2116                *walk++ = diffs.added[i].flags;
2117            assert( ( walk - tmp ) == diffs.addedCount );
2118            tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2119            tr_free( tmp );
2120
2121            /* "dropped" */
2122            tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2123            for( i = 0; i < diffs.droppedCount; ++i )
2124            {
2125                memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
2126                memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2127            }
2128            assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2129            tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2130            tr_free( tmp );
2131           
2132            /* "added6" */
2133            tmp = walk = tr_new( uint8_t, diffs6.addedCount * 18 );
2134            for( i = 0; i < diffs6.addedCount; ++i )
2135            {
2136                memcpy( walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16 );
2137                walk += 16;
2138                memcpy( walk, &diffs6.added[i].port, 2 );
2139                walk += 2;
2140            }
2141            assert( ( walk - tmp ) == diffs6.addedCount * 18 );
2142            tr_bencDictAddRaw( &val, "added6", tmp, walk - tmp );
2143            tr_free( tmp );
2144           
2145            /* "added6.f" */
2146            tmp = walk = tr_new( uint8_t, diffs6.addedCount );
2147            for( i = 0; i < diffs6.addedCount; ++i )
2148                *walk++ = diffs6.added[i].flags;
2149            assert( ( walk - tmp ) == diffs6.addedCount );
2150            tr_bencDictAddRaw( &val, "added6.f", tmp, walk - tmp );
2151            tr_free( tmp );
2152           
2153            /* "dropped6" */
2154            tmp = walk = tr_new( uint8_t, diffs6.droppedCount * 18 );
2155            for( i = 0; i < diffs6.droppedCount; ++i )
2156            {
2157                memcpy( walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16 );
2158                walk += 16;
2159                memcpy( walk, &diffs6.dropped[i].port, 2 );
2160                walk += 2;
2161            }
2162            assert( ( walk - tmp ) == diffs6.droppedCount * 18);
2163            tr_bencDictAddRaw( &val, "dropped6", tmp, walk - tmp );
2164            tr_free( tmp );
2165
2166            /* write the pex message */
2167            benc = tr_bencSave( &val, &bencLen );
2168            tr_peerIoWriteUint32( msgs->peer->io, out, 2 * sizeof( uint8_t ) + bencLen );
2169            tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_LTEP );
2170            tr_peerIoWriteUint8 ( msgs->peer->io, out, msgs->ut_pex_id );
2171            tr_peerIoWriteBytes ( msgs->peer->io, out, benc, bencLen );
2172            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2173            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", EVBUFFER_LENGTH( out ) );
2174
2175            tr_free( benc );
2176            tr_bencFree( &val );
2177        }
2178
2179        /* cleanup */
2180        tr_free( diffs.added );
2181        tr_free( diffs.dropped );
2182        tr_free( newPex );
2183        tr_free( diffs6.added );
2184        tr_free( diffs6.dropped );
2185        tr_free( newPex6 );
2186
2187        msgs->clientSentPexAt = time( NULL );
2188    }
2189}
2190
2191static int
2192pexPulse( void * vpeer )
2193{
2194    sendPex( vpeer );
2195    return TRUE;
2196}
2197
2198/**
2199***
2200**/
2201
2202tr_peermsgs*
2203tr_peerMsgsNew( struct tr_torrent * torrent,
2204                struct tr_peer *    peer,
2205                tr_delivery_func    func,
2206                void *              userData,
2207                tr_publisher_tag *  setme )
2208{
2209    tr_peermsgs * m;
2210
2211    assert( peer );
2212    assert( peer->io );
2213
2214    m = tr_new0( tr_peermsgs, 1 );
2215    m->publisher = tr_publisherNew( );
2216    m->peer = peer;
2217    m->session = torrent->session;
2218    m->torrent = torrent;
2219    m->peer->clientIsChoked = 1;
2220    m->peer->peerIsChoked = 1;
2221    m->peer->clientIsInterested = 0;
2222    m->peer->peerIsInterested = 0;
2223    m->peer->have = tr_bitfieldNew( torrent->info.pieceCount );
2224    m->state = AWAITING_BT_LENGTH;
2225    m->pexTimer = tr_timerNew( m->session, pexPulse, m, PEX_INTERVAL );
2226    m->outMessages = evbuffer_new( );
2227    m->outMessagesBatchedAt = 0;
2228    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2229    m->incoming.block = evbuffer_new( );
2230    m->peerAskedFor = REQUEST_LIST_INIT;
2231    m->clientAskedFor = REQUEST_LIST_INIT;
2232    m->clientWillAskFor = REQUEST_LIST_INIT;
2233    peer->msgs = m;
2234
2235    *setme = tr_publisherSubscribe( m->publisher, func, userData );
2236
2237    if( tr_peerIoSupportsLTEP( peer->io ) )
2238        sendLtepHandshake( m );
2239
2240    tellPeerWhatWeHave( m );
2241
2242    tr_peerIoSetTimeoutSecs( m->peer->io, 150 ); /* timeout after N seconds of inactivity */
2243    tr_peerIoSetIOFuncs( m->peer->io, canRead, didWrite, gotError, m );
2244    ratePulse( m );
2245
2246    return m;
2247}
2248
2249void
2250tr_peerMsgsFree( tr_peermsgs* msgs )
2251{
2252    if( msgs )
2253    {
2254        tr_timerFree( &msgs->pexTimer );
2255        tr_publisherFree( &msgs->publisher );
2256        reqListClear( &msgs->clientWillAskFor );
2257        reqListClear( &msgs->clientAskedFor );
2258        reqListClear( &msgs->peerAskedFor );
2259
2260        evbuffer_free( msgs->incoming.block );
2261        evbuffer_free( msgs->outMessages );
2262        tr_free( msgs->pex6 );
2263        tr_free( msgs->pex );
2264
2265        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2266        tr_free( msgs );
2267    }
2268}
2269
2270void
2271tr_peerMsgsUnsubscribe( tr_peermsgs *    peer,
2272                        tr_publisher_tag tag )
2273{
2274    tr_publisherUnsubscribe( peer->publisher, tag );
2275}
2276
Note: See TracBrowser for help on using the repository browser.