source: trunk/libtransmission/peer-msgs.c @ 7448

Last change on this file since 7448 was 7448, checked in by jhujhiti, 12 years ago

add some debugging stuff to track down where some bogus addresses are coming from

  • Property svn:keywords set to Date Rev Author Id
File size: 65.1 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 7448 2008-12-21 19:13:52Z jhujhiti $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19
20#include <event.h>
21
22#include "transmission.h"
23#include "bencode.h"
24#include "completion.h"
25#include "crypto.h"
26#include "inout.h"
27#ifdef WIN32
28#include "net.h" /* for ECONN */
29#endif
30#include "peer-io.h"
31#include "peer-mgr.h"
32#include "peer-mgr-private.h"
33#include "peer-msgs.h"
34#include "ratecontrol.h"
35#include "stats.h"
36#include "torrent.h"
37#include "trevent.h"
38#include "utils.h"
39
40/**
41***
42**/
43
44enum
45{
46    BT_CHOKE                = 0,
47    BT_UNCHOKE              = 1,
48    BT_INTERESTED           = 2,
49    BT_NOT_INTERESTED       = 3,
50    BT_HAVE                 = 4,
51    BT_BITFIELD             = 5,
52    BT_REQUEST              = 6,
53    BT_PIECE                = 7,
54    BT_CANCEL               = 8,
55    BT_PORT                 = 9,
56
57    BT_FEXT_SUGGEST         = 13,
58    BT_FEXT_HAVE_ALL        = 14,
59    BT_FEXT_HAVE_NONE       = 15,
60    BT_FEXT_REJECT          = 16,
61    BT_FEXT_ALLOWED_FAST    = 17,
62
63    BT_LTEP                 = 20,
64
65    LTEP_HANDSHAKE          = 0,
66
67    TR_LTEP_PEX             = 1,
68
69
70
71    MIN_CHOKE_PERIOD_SEC    = ( 10 ),
72
73    /* idle seconds before we send a keepalive */
74    KEEPALIVE_INTERVAL_SECS = 100,
75
76    PEX_INTERVAL            = ( 90 * 1000 ), /* msec between sendPex() calls */
77
78    MAX_QUEUE_SIZE          = ( 100 ),
79
80    /* how long an unsent request can stay queued before it's returned
81       back to the peer-mgr's pool of requests */
82    QUEUED_REQUEST_TTL_SECS = 20,
83
84    /* how long a sent request can stay queued before it's returned
85       back to the peer-mgr's pool of requests */
86    SENT_REQUEST_TTL_SECS = 240,
87
88    /* used in lowering the outMessages queue period */
89    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
90    HIGH_PRIORITY_INTERVAL_SECS = 2,
91    LOW_PRIORITY_INTERVAL_SECS = 20,
92
93    /* number of pieces to remove from the bitfield when
94     * lazy bitfields are turned on */
95    LAZY_PIECE_COUNT = 26,
96
97    /* number of pieces we'll allow in our fast set */
98    MAX_FAST_SET_SIZE = 3
99};
100
101/**
102***  REQUEST MANAGEMENT
103**/
104
105enum
106{
107    AWAITING_BT_LENGTH,
108    AWAITING_BT_ID,
109    AWAITING_BT_MESSAGE,
110    AWAITING_BT_PIECE
111};
112
113struct peer_request
114{
115    uint32_t    index;
116    uint32_t    offset;
117    uint32_t    length;
118    time_t      time_requested;
119};
120
121static int
122compareRequest( const void * va, const void * vb )
123{
124    const struct peer_request * a = va;
125    const struct peer_request * b = vb;
126
127    if( a->index != b->index )
128        return a->index < b->index ? -1 : 1;
129
130    if( a->offset != b->offset )
131        return a->offset < b->offset ? -1 : 1;
132
133    if( a->length != b->length )
134        return a->length < b->length ? -1 : 1;
135
136    return 0;
137}
138
139struct request_list
140{
141    uint16_t               count;
142    uint16_t               max;
143    struct peer_request *  requests;
144};
145
146static const struct request_list REQUEST_LIST_INIT = { 0, 0, NULL };
147
148static void
149reqListReserve( struct request_list * list,
150                uint16_t              max )
151{
152    if( list->max < max )
153    {
154        list->max = max;
155        list->requests = tr_renew( struct peer_request,
156                                   list->requests,
157                                   list->max );
158    }
159}
160
161static void
162reqListClear( struct request_list * list )
163{
164    tr_free( list->requests );
165    *list = REQUEST_LIST_INIT;
166}
167
168static void
169reqListCopy( struct request_list * dest, const struct request_list * src )
170{
171    dest->count = dest->max = src->count;
172    dest->requests = tr_memdup( src->requests, dest->count * sizeof( struct peer_request ) );
173}
174
175static void
176reqListRemoveOne( struct request_list * list,
177                  int                   i )
178{
179    assert( 0 <= i && i < list->count );
180
181    memmove( &list->requests[i],
182             &list->requests[i + 1],
183             sizeof( struct peer_request ) * ( --list->count - i ) );
184}
185
186static void
187reqListAppend( struct request_list *       list,
188               const struct peer_request * req )
189{
190    if( ++list->count >= list->max )
191        reqListReserve( list, list->max + 8 );
192
193    list->requests[list->count - 1] = *req;
194}
195
196static int
197reqListPop( struct request_list * list,
198            struct peer_request * setme )
199{
200    int success;
201
202    if( !list->count )
203        success = FALSE;
204    else {
205        *setme = list->requests[0];
206        reqListRemoveOne( list, 0 );
207        success = TRUE;
208    }
209
210    return success;
211}
212
213static int
214reqListFind( struct request_list *       list,
215             const struct peer_request * key )
216{
217    uint16_t i;
218
219    for( i = 0; i < list->count; ++i )
220        if( !compareRequest( key, list->requests + i ) )
221            return i;
222
223    return -1;
224}
225
226static int
227reqListRemove( struct request_list *       list,
228               const struct peer_request * key )
229{
230    int success;
231    const int i = reqListFind( list, key );
232
233    if( i < 0 )
234        success = FALSE;
235    else {
236        reqListRemoveOne( list, i );
237        success = TRUE;
238    }
239
240    return success;
241}
242
243/**
244***
245**/
246
247/* this is raw, unchanged data from the peer regarding
248 * the current message that it's sending us. */
249struct tr_incoming
250{
251    uint8_t                id;
252    uint32_t               length; /* includes the +1 for id length */
253    struct peer_request    blockReq; /* metadata for incoming blocks */
254    struct evbuffer *      block; /* piece data for incoming blocks */
255};
256
257/**
258 * Low-level communication state information about a connected peer.
259 *
260 * This structure remembers the low-level protocol states that we're
261 * in with this peer, such as active requests, pex messages, and so on.
262 * Its fields are all private to peer-msgs.c.
263 *
264 * Data not directly involved with sending & receiving messages is
265 * stored in tr_peer, where it can be accessed by both peermsgs and
266 * the peer manager.
267 *
268 * @see struct peer_atom
269 * @see tr_peer
270 */
271struct tr_peermsgs
272{
273    tr_bool         peerSentBitfield;
274    tr_bool         peerSupportsPex;
275    tr_bool         clientSentLtepHandshake;
276    tr_bool         peerSentLtepHandshake;
277    tr_bool         haveFastSet;
278
279    uint8_t         state;
280    uint8_t         ut_pex_id;
281    uint16_t        pexCount;
282    uint16_t        pexCount6;
283    uint16_t        maxActiveRequests;
284
285    size_t                 fastsetSize;
286    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
287
288    /* how long the outMessages batch should be allowed to grow before
289     * it's flushed -- some messages (like requests >:) should be sent
290     * very quickly; others aren't as urgent. */
291    int                    outMessagesBatchPeriod;
292
293    tr_peer *              peer;
294
295    tr_session *           session;
296    tr_torrent *           torrent;
297
298    tr_publisher_t *       publisher;
299
300    struct evbuffer *      outMessages; /* all the non-piece messages */
301
302    struct request_list    peerAskedFor;
303    struct request_list    clientAskedFor;
304    struct request_list    clientWillAskFor;
305
306    tr_timer             * pexTimer;
307    tr_pex               * pex;
308    tr_pex               * pex6;
309
310    time_t                 clientSentPexAt;
311    time_t                 clientSentAnythingAt;
312
313    /* when we started batching the outMessages */
314    time_t                outMessagesBatchedAt;
315
316    struct tr_incoming    incoming;
317
318    /* if the peer supports the Extension Protocol in BEP 10 and
319       supplied a reqq argument, it's stored here.  otherwise the
320       value is zero and should be ignored. */
321    int64_t               reqq;
322};
323
324/**
325***
326**/
327
328static void
329myDebug( const char * file, int line,
330         const struct tr_peermsgs * msgs,
331         const char * fmt, ... )
332{
333    FILE * fp = tr_getLog( );
334
335    if( fp )
336    {
337        va_list           args;
338        char              timestr[64];
339        struct evbuffer * buf = evbuffer_new( );
340        char *            base = tr_basename( file );
341
342        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
343                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
344                             msgs->torrent->info.name,
345                             tr_peerIoGetAddrStr( msgs->peer->io ),
346                             msgs->peer->client );
347        va_start( args, fmt );
348        evbuffer_add_vprintf( buf, fmt, args );
349        va_end( args );
350        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
351        fwrite( EVBUFFER_DATA( buf ), 1, EVBUFFER_LENGTH( buf ), fp );
352
353        tr_free( base );
354        evbuffer_free( buf );
355    }
356}
357
358#define dbgmsg( msgs, ... ) \
359    do { \
360        if( tr_deepLoggingIsActive( ) ) \
361            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
362    } while( 0 )
363
364/**
365***
366**/
367
368static void
369pokeBatchPeriod( tr_peermsgs * msgs,
370                 int           interval )
371{
372    if( msgs->outMessagesBatchPeriod > interval )
373    {
374        msgs->outMessagesBatchPeriod = interval;
375        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
376    }
377}
378
379static void
380dbgOutMessageLen( tr_peermsgs * msgs )
381{
382    dbgmsg( msgs, "outMessage size is now %zu", EVBUFFER_LENGTH( msgs->outMessages ) );
383}
384
385static void
386protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
387{
388    tr_peerIo       * io  = msgs->peer->io;
389    struct evbuffer * out = msgs->outMessages;
390
391    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
392
393    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
394    tr_peerIoWriteUint8 ( io, out, BT_FEXT_REJECT );
395    tr_peerIoWriteUint32( io, out, req->index );
396    tr_peerIoWriteUint32( io, out, req->offset );
397    tr_peerIoWriteUint32( io, out, req->length );
398
399    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
400    dbgOutMessageLen( msgs );
401}
402
403static void
404protocolSendRequest( tr_peermsgs *               msgs,
405                     const struct peer_request * req )
406{
407    tr_peerIo       * io  = msgs->peer->io;
408    struct evbuffer * out = msgs->outMessages;
409
410    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
411    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
412    tr_peerIoWriteUint32( io, out, req->index );
413    tr_peerIoWriteUint32( io, out, req->offset );
414    tr_peerIoWriteUint32( io, out, req->length );
415
416    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
417    dbgOutMessageLen( msgs );
418    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
419}
420
421static void
422protocolSendCancel( tr_peermsgs *               msgs,
423                    const struct peer_request * req )
424{
425    tr_peerIo       * io  = msgs->peer->io;
426    struct evbuffer * out = msgs->outMessages;
427
428    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
429    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
430    tr_peerIoWriteUint32( io, out, req->index );
431    tr_peerIoWriteUint32( io, out, req->offset );
432    tr_peerIoWriteUint32( io, out, req->length );
433
434    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
435    dbgOutMessageLen( msgs );
436    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
437}
438
439static void
440protocolSendHave( tr_peermsgs * msgs,
441                  uint32_t      index )
442{
443    tr_peerIo       * io  = msgs->peer->io;
444    struct evbuffer * out = msgs->outMessages;
445
446    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
447    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
448    tr_peerIoWriteUint32( io, out, index );
449
450    dbgmsg( msgs, "sending Have %u...", index );
451    dbgOutMessageLen( msgs );
452    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
453}
454
455#if 0
456static void
457protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
458{
459    tr_peerIo       * io  = msgs->peer->io;
460    struct evbuffer * out = msgs->outMessages;
461
462    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
463
464    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
465    tr_peerIoWriteUint8 ( io, out, BT_FEXT_ALLOWED_FAST );
466    tr_peerIoWriteUint32( io, out, pieceIndex );
467
468    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
469    dbgOutMessageLen( msgs );
470}
471#endif
472
473static void
474protocolSendChoke( tr_peermsgs * msgs,
475                   int           choke )
476{
477    tr_peerIo       * io  = msgs->peer->io;
478    struct evbuffer * out = msgs->outMessages;
479
480    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
481    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
482
483    dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
484    dbgOutMessageLen( msgs );
485    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
486}
487
488static void
489protocolSendHaveAll( tr_peermsgs * msgs )
490{
491    tr_peerIo       * io  = msgs->peer->io;
492    struct evbuffer * out = msgs->outMessages;
493
494    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
495
496    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
497    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_ALL );
498
499    dbgmsg( msgs, "sending HAVE_ALL..." );
500    dbgOutMessageLen( msgs );
501    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
502}
503
504static void
505protocolSendHaveNone( tr_peermsgs * msgs )
506{
507    tr_peerIo       * io  = msgs->peer->io;
508    struct evbuffer * out = msgs->outMessages;
509
510    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
511
512    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
513    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_NONE );
514
515    dbgmsg( msgs, "sending HAVE_NONE..." );
516    dbgOutMessageLen( msgs );
517    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
518}
519
520/**
521***  EVENTS
522**/
523
524static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0, 0 };
525
526static void
527publish( tr_peermsgs * msgs, tr_peer_event * e )
528{
529    assert( msgs->peer );
530    assert( msgs->peer->msgs == msgs );
531
532    tr_publisherPublish( msgs->publisher, msgs->peer, e );
533}
534
535static void
536fireError( tr_peermsgs * msgs, int err )
537{
538    tr_peer_event e = blankEvent;
539    e.eventType = TR_PEER_ERROR;
540    e.err = err;
541    publish( msgs, &e );
542}
543
544static void
545fireUploadOnly( tr_peermsgs * msgs, tr_bool uploadOnly )
546{
547    tr_peer_event e = blankEvent;
548    e.eventType = TR_PEER_UPLOAD_ONLY;
549    e.uploadOnly = uploadOnly;
550    publish( msgs, &e );
551}
552
553static void
554fireNeedReq( tr_peermsgs * msgs )
555{
556    tr_peer_event e = blankEvent;
557    e.eventType = TR_PEER_NEED_REQ;
558    publish( msgs, &e );
559}
560
561static void
562firePeerProgress( tr_peermsgs * msgs )
563{
564    tr_peer_event e = blankEvent;
565    e.eventType = TR_PEER_PEER_PROGRESS;
566    e.progress = msgs->peer->progress;
567    publish( msgs, &e );
568}
569
570static void
571fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
572{
573    tr_peer_event e = blankEvent;
574    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
575    e.pieceIndex = req->index;
576    e.offset = req->offset;
577    e.length = req->length;
578    publish( msgs, &e );
579}
580
581static void
582fireClientGotData( tr_peermsgs * msgs,
583                   uint32_t      length,
584                   int           wasPieceData )
585{
586    tr_peer_event e = blankEvent;
587
588    e.length = length;
589    e.eventType = TR_PEER_CLIENT_GOT_DATA;
590    e.wasPieceData = wasPieceData;
591    publish( msgs, &e );
592}
593
594static void
595fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
596{
597    tr_peer_event e = blankEvent;
598    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
599    e.pieceIndex = pieceIndex;
600    publish( msgs, &e );
601}
602
603static void
604fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
605{
606    tr_peer_event e = blankEvent;
607    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
608    e.pieceIndex = pieceIndex;
609    publish( msgs, &e );
610}
611
612static void
613firePeerGotData( tr_peermsgs  * msgs,
614                 uint32_t       length,
615                 int            wasPieceData )
616{
617    tr_peer_event e = blankEvent;
618
619    e.length = length;
620    e.eventType = TR_PEER_PEER_GOT_DATA;
621    e.wasPieceData = wasPieceData;
622
623    publish( msgs, &e );
624}
625
626static void
627fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
628{
629    tr_peer_event e = blankEvent;
630    e.eventType = TR_PEER_CANCEL;
631    e.pieceIndex = req->index;
632    e.offset = req->offset;
633    e.length = req->length;
634    publish( msgs, &e );
635}
636
637/**
638***  ALLOWED FAST SET
639***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
640**/
641
642size_t
643tr_generateAllowedSet( tr_piece_index_t * setmePieces,
644                       size_t             desiredSetSize,
645                       size_t             pieceCount,
646                       const uint8_t    * infohash,
647                       const tr_address * addr )
648{
649    size_t setSize = 0;
650
651    assert( setmePieces );
652    assert( desiredSetSize <= pieceCount );
653    assert( desiredSetSize );
654    assert( pieceCount );
655    assert( infohash );
656    assert( addr );
657
658    if( addr->type == TR_AF_INET )
659    {
660        uint8_t w[SHA_DIGEST_LENGTH + 4];
661        uint8_t x[SHA_DIGEST_LENGTH];
662
663        *(uint32_t*)w = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 );   /* (1) */
664        memcpy( w + 4, infohash, SHA_DIGEST_LENGTH );                /* (2) */
665        tr_sha1( x, w, sizeof( w ), NULL );                          /* (3) */
666
667        while( setSize<desiredSetSize )
668        {
669            int i;
670            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
671            {
672                size_t k;
673                uint32_t j = i * 4;                                  /* (5) */
674                uint32_t y = ntohl( *( uint32_t* )( x + j ) );       /* (6) */
675                uint32_t index = y % pieceCount;                     /* (7) */
676
677                for( k=0; k<setSize; ++k )                           /* (8) */
678                    if( setmePieces[k] == index )
679                        break;
680
681                if( k == setSize )
682                    setmePieces[setSize++] = index;                  /* (9) */
683            }
684
685            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
686        }
687    }
688
689    return setSize;
690}
691
692static void
693updateFastSet( tr_peermsgs * msgs UNUSED )
694{
695#if 0
696    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
697    const int peerIsNeedy = msgs->peer->progress < 0.10;
698
699    if( fext && peerIsNeedy && !msgs->haveFastSet )
700    {
701        size_t i;
702        const struct tr_address * addr = tr_peerIoGetAddress( msgs->peer->io, NULL );
703        const tr_info * inf = &msgs->torrent->info;
704        const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
705
706        /* build the fast set */
707        msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
708        msgs->haveFastSet = 1;
709
710        /* send it to the peer */
711        for( i=0; i<msgs->fastsetSize; ++i )
712            protocolSendAllowedFast( msgs, msgs->fastset[i] );
713    }
714#endif
715}
716
717/**
718***  INTEREST
719**/
720
721static int
722isPieceInteresting( const tr_peermsgs * msgs,
723                    tr_piece_index_t    piece )
724{
725    const tr_torrent * torrent = msgs->torrent;
726
727    return ( !torrent->info.pieces[piece].dnd )                 /* we want it */
728          && ( !tr_cpPieceIsComplete( torrent->completion, piece ) ) /* !have */
729          && ( tr_bitfieldHas( msgs->peer->have, piece ) );    /* peer has it */
730}
731
732/* "interested" means we'll ask for piece data if they unchoke us */
733static int
734isPeerInteresting( const tr_peermsgs * msgs )
735{
736    tr_piece_index_t    i;
737    const tr_torrent *  torrent;
738    const tr_bitfield * bitfield;
739    const int           clientIsSeed = tr_torrentIsSeed( msgs->torrent );
740
741    if( clientIsSeed )
742        return FALSE;
743
744    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
745        return FALSE;
746
747    torrent = msgs->torrent;
748    bitfield = tr_cpPieceBitfield( torrent->completion );
749
750    if( !msgs->peer->have )
751        return TRUE;
752
753    assert( bitfield->byteCount == msgs->peer->have->byteCount );
754
755    for( i = 0; i < torrent->info.pieceCount; ++i )
756        if( isPieceInteresting( msgs, i ) )
757            return TRUE;
758
759    return FALSE;
760}
761
762static void
763sendInterest( tr_peermsgs * msgs,
764              int           weAreInterested )
765{
766    struct evbuffer * out = msgs->outMessages;
767
768    assert( msgs );
769    assert( weAreInterested == 0 || weAreInterested == 1 );
770
771    msgs->peer->clientIsInterested = weAreInterested;
772    dbgmsg( msgs, "Sending %s", weAreInterested ? "Interested" : "Not Interested" );
773    tr_peerIoWriteUint32( msgs->peer->io, out, sizeof( uint8_t ) );
774    tr_peerIoWriteUint8 ( msgs->peer->io, out, weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
775    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
776    dbgOutMessageLen( msgs );
777}
778
779static void
780updateInterest( tr_peermsgs * msgs )
781{
782    const int i = isPeerInteresting( msgs );
783
784    if( i != msgs->peer->clientIsInterested )
785        sendInterest( msgs, i );
786    if( i )
787        fireNeedReq( msgs );
788}
789
790static int
791popNextRequest( tr_peermsgs *         msgs,
792                struct peer_request * setme )
793{
794    return reqListPop( &msgs->peerAskedFor, setme );
795}
796
797static void
798cancelAllRequestsToClient( tr_peermsgs * msgs )
799{
800    struct peer_request req;
801    const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->peer->io );
802
803    while( popNextRequest( msgs, &req ) )
804        if( mustSendCancel )
805            protocolSendReject( msgs, &req );
806}
807
808void
809tr_peerMsgsSetChoke( tr_peermsgs * msgs,
810                     int           choke )
811{
812    const time_t now = time( NULL );
813    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
814
815    assert( msgs );
816    assert( msgs->peer );
817    assert( choke == 0 || choke == 1 );
818
819    if( msgs->peer->chokeChangedAt > fibrillationTime )
820    {
821        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
822    }
823    else if( msgs->peer->peerIsChoked != choke )
824    {
825        msgs->peer->peerIsChoked = choke;
826        if( choke )
827            cancelAllRequestsToClient( msgs );
828        protocolSendChoke( msgs, choke );
829        msgs->peer->chokeChangedAt = now;
830    }
831}
832
833/**
834***
835**/
836
837void
838tr_peerMsgsHave( tr_peermsgs * msgs,
839                 uint32_t      index )
840{
841    protocolSendHave( msgs, index );
842
843    /* since we have more pieces now, we might not be interested in this peer */
844    updateInterest( msgs );
845}
846
847/**
848***
849**/
850
851static int
852reqIsValid( const tr_peermsgs * peer,
853            uint32_t            index,
854            uint32_t            offset,
855            uint32_t            length )
856{
857    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
858}
859
860static int
861requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
862{
863    return reqIsValid( msgs, req->index, req->offset, req->length );
864}
865
866static void
867expireOldRequests( tr_peermsgs * msgs, const time_t now  )
868{
869    int i;
870    time_t oldestAllowed;
871    struct request_list tmp = REQUEST_LIST_INIT;
872    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
873    dbgmsg( msgs, "entering `expire old requests' block" );
874
875    /* cancel requests that have been queued for too long */
876    oldestAllowed = now - QUEUED_REQUEST_TTL_SECS;
877    reqListCopy( &tmp, &msgs->clientWillAskFor );
878    for( i = 0; i < tmp.count; ++i ) {
879        const struct peer_request * req = &tmp.requests[i];
880        if( req->time_requested < oldestAllowed )
881            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
882    }
883    reqListClear( &tmp );
884
885    /* if the peer doesn't support "Reject Request",
886     * cancel requests that were sent too long ago. */
887    if( !fext ) {
888        oldestAllowed = now - SENT_REQUEST_TTL_SECS;
889        reqListCopy( &tmp, &msgs->clientAskedFor );
890        for( i=0; i<tmp.count; ++i ) {
891            const struct peer_request * req = &tmp.requests[i];
892            if( req->time_requested < oldestAllowed )
893                tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
894        }
895        reqListClear( &tmp );
896    }
897
898    dbgmsg( msgs, "leaving `expire old requests' block" );
899}
900
901static void
902pumpRequestQueue( tr_peermsgs * msgs, const time_t now )
903{
904    const int           max = msgs->maxActiveRequests;
905    int                 sent = 0;
906    int                 count = msgs->clientAskedFor.count;
907    struct peer_request req;
908
909    if( msgs->peer->clientIsChoked )
910        return;
911    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
912        return;
913
914    while( ( count < max ) && reqListPop( &msgs->clientWillAskFor, &req ) )
915    {
916        const tr_block_index_t block = _tr_block( msgs->torrent, req.index, req.offset );
917
918        assert( requestIsValid( msgs, &req ) );
919        assert( tr_bitfieldHas( msgs->peer->have, req.index ) );
920
921        /* don't ask for it if we've already got it... this block may have
922         * come in from a different peer after we cancelled a request for it */
923        if( !tr_cpBlockIsComplete( msgs->torrent->completion, block ) )
924        {
925            protocolSendRequest( msgs, &req );
926            req.time_requested = now;
927            reqListAppend( &msgs->clientAskedFor, &req );
928
929            ++count;
930            ++sent;
931        }
932    }
933
934    if( sent )
935        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
936                sent,
937                msgs->clientAskedFor.count,
938                msgs->clientWillAskFor.count );
939
940    if( count < max )
941        fireNeedReq( msgs );
942}
943
944static int
945requestQueueIsFull( const tr_peermsgs * msgs )
946{
947    const int req_max = msgs->maxActiveRequests;
948    return msgs->clientWillAskFor.count >= req_max;
949}
950
951tr_addreq_t
952tr_peerMsgsAddRequest( tr_peermsgs *    msgs,
953                       uint32_t         index,
954                       uint32_t         offset,
955                       uint32_t         length,
956                       int              doForce )
957{
958    struct peer_request req;
959
960    assert( msgs );
961    assert( msgs->torrent );
962    assert( reqIsValid( msgs, index, offset, length ) );
963
964    /**
965    ***  Reasons to decline the request
966    **/
967
968    /* don't send requests to choked clients */
969    if( !doForce && msgs->peer->clientIsChoked ) {
970        dbgmsg( msgs, "declining request because they're choking us" );
971        return TR_ADDREQ_CLIENT_CHOKED;
972    }
973
974    /* peer doesn't have this piece */
975    if( !doForce && !tr_bitfieldHas( msgs->peer->have, index ) )
976        return TR_ADDREQ_MISSING;
977
978    /* peer's queue is full */
979    if( !doForce && requestQueueIsFull( msgs ) ) {
980        dbgmsg( msgs, "declining request because we're full" );
981        return TR_ADDREQ_FULL;
982    }
983
984    /* have we already asked for this piece? */
985    req.index = index;
986    req.offset = offset;
987    req.length = length;
988    if( !doForce ) {
989        if( reqListFind( &msgs->clientAskedFor, &req ) != -1 ) {
990            dbgmsg( msgs, "declining because it's a duplicate" );
991            return TR_ADDREQ_DUPLICATE;
992        }
993        if( reqListFind( &msgs->clientWillAskFor, &req ) != -1 ) {
994            dbgmsg( msgs, "declining because it's a duplicate" );
995            return TR_ADDREQ_DUPLICATE;
996        }
997    }
998
999    /**
1000    ***  Accept this request
1001    **/
1002
1003    dbgmsg( msgs, "adding req for %"PRIu32":%"PRIu32"->%"PRIu32" to our `will request' list",
1004            index, offset, length );
1005    req.time_requested = time( NULL );
1006    reqListAppend( &msgs->clientWillAskFor, &req );
1007    return TR_ADDREQ_OK;
1008}
1009
1010static void
1011cancelAllRequestsToPeer( tr_peermsgs * msgs, tr_bool sendCancel )
1012{
1013    int i;
1014    struct request_list a = msgs->clientWillAskFor;
1015    struct request_list b = msgs->clientAskedFor;
1016    dbgmsg( msgs, "cancelling all requests to peer" );
1017
1018    msgs->clientAskedFor = REQUEST_LIST_INIT;
1019    msgs->clientWillAskFor = REQUEST_LIST_INIT;
1020
1021    for( i=0; i<a.count; ++i )
1022        fireCancelledReq( msgs, &a.requests[i] );
1023
1024    for( i = 0; i < b.count; ++i ) {
1025        fireCancelledReq( msgs, &b.requests[i] );
1026        if( sendCancel )
1027            protocolSendCancel( msgs, &b.requests[i] );
1028    }
1029
1030    reqListClear( &a );
1031    reqListClear( &b );
1032}
1033
1034void
1035tr_peerMsgsCancel( tr_peermsgs * msgs,
1036                   uint32_t      pieceIndex,
1037                   uint32_t      offset,
1038                   uint32_t      length )
1039{
1040    struct peer_request req;
1041
1042    assert( msgs != NULL );
1043    assert( length > 0 );
1044
1045
1046    /* have we asked the peer for this piece? */
1047    req.index = pieceIndex;
1048    req.offset = offset;
1049    req.length = length;
1050
1051    /* if it's only in the queue and hasn't been sent yet, free it */
1052    if( reqListRemove( &msgs->clientWillAskFor, &req ) ) {
1053        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32, pieceIndex, offset, length );
1054        fireCancelledReq( msgs, &req );
1055    }
1056
1057    /* if it's already been sent, send a cancel message too */
1058    if( reqListRemove( &msgs->clientAskedFor, &req ) ) {
1059        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32, pieceIndex, offset, length );
1060        protocolSendCancel( msgs, &req );
1061        fireCancelledReq( msgs, &req );
1062    }
1063}
1064
1065
1066/**
1067***
1068**/
1069
1070static void
1071sendLtepHandshake( tr_peermsgs * msgs )
1072{
1073    tr_benc val, *m;
1074    char * buf;
1075    int len;
1076    int pex;
1077    struct evbuffer * out = msgs->outMessages;
1078
1079    if( msgs->clientSentLtepHandshake )
1080        return;
1081
1082    dbgmsg( msgs, "sending an ltep handshake" );
1083    msgs->clientSentLtepHandshake = 1;
1084
1085    /* decide if we want to advertise pex support */
1086    if( !tr_torrentAllowsPex( msgs->torrent ) )
1087        pex = 0;
1088    else if( msgs->peerSentLtepHandshake )
1089        pex = msgs->peerSupportsPex ? 1 : 0;
1090    else
1091        pex = 1;
1092
1093    tr_bencInitDict( &val, 5 );
1094    tr_bencDictAddInt( &val, "e", msgs->session->encryptionMode != TR_CLEAR_PREFERRED );
1095    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( msgs->session ) );
1096    tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
1097    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
1098    m  = tr_bencDictAddDict( &val, "m", 1 );
1099    if( pex )
1100        tr_bencDictAddInt( m, "ut_pex", TR_LTEP_PEX );
1101    buf = tr_bencSave( &val, &len );
1102
1103    tr_peerIoWriteUint32( msgs->peer->io, out, 2 * sizeof( uint8_t ) + len );
1104    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_LTEP );
1105    tr_peerIoWriteUint8 ( msgs->peer->io, out, LTEP_HANDSHAKE );
1106    tr_peerIoWriteBytes ( msgs->peer->io, out, buf, len );
1107    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1108    dbgOutMessageLen( msgs );
1109
1110    /* cleanup */
1111    tr_bencFree( &val );
1112    tr_free( buf );
1113}
1114
1115static void
1116parseLtepHandshake( tr_peermsgs *     msgs,
1117                    int               len,
1118                    struct evbuffer * inbuf )
1119{
1120    int64_t   i;
1121    tr_benc   val, * sub;
1122    uint8_t * tmp = tr_new( uint8_t, len );
1123
1124    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, len );
1125    msgs->peerSentLtepHandshake = 1;
1126
1127    if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) )
1128    {
1129        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
1130        tr_free( tmp );
1131        return;
1132    }
1133
1134    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
1135
1136    /* does the peer prefer encrypted connections? */
1137    if( tr_bencDictFindInt( &val, "e", &i ) )
1138        msgs->peer->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
1139                                              : ENCRYPTION_PREFERENCE_NO;
1140
1141    /* check supported messages for utorrent pex */
1142    msgs->peerSupportsPex = 0;
1143    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
1144        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
1145            msgs->ut_pex_id = (uint8_t) i;
1146            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
1147            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
1148        }
1149    }
1150
1151    /* look for upload_only (BEP 21) */
1152    if( tr_bencDictFindInt( &val, "upload_only", &i ) )
1153        fireUploadOnly( msgs, i!=0 );
1154
1155    /* get peer's listening port */
1156    if( tr_bencDictFindInt( &val, "p", &i ) ) {
1157        msgs->peer->port = htons( (uint16_t)i );
1158        dbgmsg( msgs, "msgs->port is now %hu", msgs->peer->port );
1159    }
1160
1161    /* get peer's maximum request queue size */
1162    if( tr_bencDictFindInt( &val, "reqq", &i ) )
1163        msgs->reqq = i;
1164
1165    tr_bencFree( &val );
1166    tr_free( tmp );
1167}
1168
1169static void
1170parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1171{
1172    int loaded = 0;
1173    uint8_t * tmp = tr_new( uint8_t, msglen );
1174    tr_benc val;
1175    const tr_torrent * tor = msgs->torrent;
1176    const uint8_t * added;
1177    size_t added_len;
1178
1179    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1180
1181    if( tr_torrentAllowsPex( tor )
1182      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) ) )
1183    {
1184        if( tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1185        {
1186            const uint8_t * added_f = NULL;
1187            tr_pex *        pex;
1188            size_t          i, n;
1189            size_t          added_f_len = 0;
1190            tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1191            pex =
1192                tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len,
1193                                        &n );
1194            for( i = 0; i < n; ++i )
1195                tr_peerMgrAddPex( msgs->session->peerMgr, tor->info.hash,
1196                                  TR_PEER_FROM_PEX, pex + i );
1197            tr_free( pex );
1198        }
1199       
1200        if( tr_bencDictFindRaw( &val, "added6", &added, &added_len ) )
1201        {
1202            const uint8_t * added_f = NULL;
1203            tr_pex *        pex;
1204            size_t          i, n;
1205            size_t          added_f_len = 0;
1206            tr_bencDictFindRaw( &val, "added6.f", &added_f, &added_f_len );
1207            pex =
1208                tr_peerMgrCompact6ToPex( added, added_len, added_f, added_f_len,
1209                                         &n );
1210            for( i = 0; i < n; ++i )
1211                tr_peerMgrAddPex( msgs->session->peerMgr, tor->info.hash,
1212                                  TR_PEER_FROM_PEX, pex + i );
1213            tr_free( pex );
1214        }
1215       
1216    }
1217
1218    if( loaded )
1219        tr_bencFree( &val );
1220    tr_free( tmp );
1221}
1222
1223static void sendPex( tr_peermsgs * msgs );
1224
1225static void
1226parseLtep( tr_peermsgs *     msgs,
1227           int               msglen,
1228           struct evbuffer * inbuf )
1229{
1230    uint8_t ltep_msgid;
1231
1232    tr_peerIoReadUint8( msgs->peer->io, inbuf, &ltep_msgid );
1233    msglen--;
1234
1235    if( ltep_msgid == LTEP_HANDSHAKE )
1236    {
1237        dbgmsg( msgs, "got ltep handshake" );
1238        parseLtepHandshake( msgs, msglen, inbuf );
1239        if( tr_peerIoSupportsLTEP( msgs->peer->io ) )
1240        {
1241            sendLtepHandshake( msgs );
1242            sendPex( msgs );
1243        }
1244    }
1245    else if( ltep_msgid == TR_LTEP_PEX )
1246    {
1247        dbgmsg( msgs, "got ut pex" );
1248        msgs->peerSupportsPex = 1;
1249        parseUtPex( msgs, msglen, inbuf );
1250    }
1251    else
1252    {
1253        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1254        evbuffer_drain( inbuf, msglen );
1255    }
1256}
1257
1258static int
1259readBtLength( tr_peermsgs *     msgs,
1260              struct evbuffer * inbuf,
1261              size_t            inlen )
1262{
1263    uint32_t len;
1264
1265    if( inlen < sizeof( len ) )
1266        return READ_LATER;
1267
1268    tr_peerIoReadUint32( msgs->peer->io, inbuf, &len );
1269
1270    if( len == 0 ) /* peer sent us a keepalive message */
1271        dbgmsg( msgs, "got KeepAlive" );
1272    else
1273    {
1274        msgs->incoming.length = len;
1275        msgs->state = AWAITING_BT_ID;
1276    }
1277
1278    return READ_NOW;
1279}
1280
1281static int readBtMessage( tr_peermsgs *     msgs,
1282                          struct evbuffer * inbuf,
1283                          size_t            inlen );
1284
1285static int
1286readBtId( tr_peermsgs *     msgs,
1287          struct evbuffer * inbuf,
1288          size_t            inlen )
1289{
1290    uint8_t id;
1291
1292    if( inlen < sizeof( uint8_t ) )
1293        return READ_LATER;
1294
1295    tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
1296    msgs->incoming.id = id;
1297
1298    if( id == BT_PIECE )
1299    {
1300        msgs->state = AWAITING_BT_PIECE;
1301        return READ_NOW;
1302    }
1303    else if( msgs->incoming.length != 1 )
1304    {
1305        msgs->state = AWAITING_BT_MESSAGE;
1306        return READ_NOW;
1307    }
1308    else return readBtMessage( msgs, inbuf, inlen - 1 );
1309}
1310
1311static void
1312updatePeerProgress( tr_peermsgs * msgs )
1313{
1314    msgs->peer->progress = tr_bitfieldCountTrueBits( msgs->peer->have ) / (float)msgs->torrent->info.pieceCount;
1315    dbgmsg( msgs, "peer progress is %f", msgs->peer->progress );
1316    updateFastSet( msgs );
1317    updateInterest( msgs );
1318    firePeerProgress( msgs );
1319}
1320
1321static void
1322peerMadeRequest( tr_peermsgs *               msgs,
1323                 const struct peer_request * req )
1324{
1325    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1326    const int reqIsValid = requestIsValid( msgs, req );
1327    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( msgs->torrent->completion, req->index );
1328    const int peerIsChoked = msgs->peer->peerIsChoked;
1329
1330    int allow = FALSE;
1331
1332    if( !reqIsValid )
1333        dbgmsg( msgs, "rejecting an invalid request." );
1334    else if( !clientHasPiece )
1335        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1336    else if( peerIsChoked )
1337        dbgmsg( msgs, "rejecting request from choked peer" );
1338    else
1339        allow = TRUE;
1340
1341    if( allow )
1342        reqListAppend( &msgs->peerAskedFor, req );
1343    else if( fext )
1344        protocolSendReject( msgs, req );
1345}
1346
1347static int
1348messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1349{
1350    switch( id )
1351    {
1352        case BT_CHOKE:
1353        case BT_UNCHOKE:
1354        case BT_INTERESTED:
1355        case BT_NOT_INTERESTED:
1356        case BT_FEXT_HAVE_ALL:
1357        case BT_FEXT_HAVE_NONE:
1358            return len == 1;
1359
1360        case BT_HAVE:
1361        case BT_FEXT_SUGGEST:
1362        case BT_FEXT_ALLOWED_FAST:
1363            return len == 5;
1364
1365        case BT_BITFIELD:
1366            return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1367
1368        case BT_REQUEST:
1369        case BT_CANCEL:
1370        case BT_FEXT_REJECT:
1371            return len == 13;
1372
1373        case BT_PIECE:
1374            return len > 9 && len <= 16393;
1375
1376        case BT_PORT:
1377            return len == 3;
1378
1379        case BT_LTEP:
1380            return len >= 2;
1381
1382        default:
1383            return FALSE;
1384    }
1385}
1386
1387static int clientGotBlock( tr_peermsgs *               msgs,
1388                           const uint8_t *             block,
1389                           const struct peer_request * req );
1390
1391static int
1392readBtPiece( tr_peermsgs      * msgs,
1393             struct evbuffer  * inbuf,
1394             size_t             inlen,
1395             size_t           * setme_piece_bytes_read )
1396{
1397    struct peer_request * req = &msgs->incoming.blockReq;
1398
1399    assert( EVBUFFER_LENGTH( inbuf ) >= inlen );
1400    dbgmsg( msgs, "In readBtPiece" );
1401
1402    if( !req->length )
1403    {
1404        if( inlen < 8 )
1405            return READ_LATER;
1406
1407        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index );
1408        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset );
1409        req->length = msgs->incoming.length - 9;
1410        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1411        return READ_NOW;
1412    }
1413    else
1414    {
1415        int err;
1416
1417        /* read in another chunk of data */
1418        const size_t nLeft = req->length - EVBUFFER_LENGTH( msgs->incoming.block );
1419        size_t n = MIN( nLeft, inlen );
1420        uint8_t * buf = tr_new( uint8_t, n );
1421        assert( EVBUFFER_LENGTH( inbuf ) >= n );
1422        tr_peerIoReadBytes( msgs->peer->io, inbuf, buf, n );
1423        evbuffer_add( msgs->incoming.block, buf, n );
1424        fireClientGotData( msgs, n, TRUE );
1425        *setme_piece_bytes_read += n;
1426        tr_free( buf );
1427        dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1428               n, req->index, req->offset, req->length,
1429               (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) );
1430        if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length )
1431            return READ_LATER;
1432
1433        /* we've got the whole block ... process it */
1434        err = clientGotBlock( msgs, EVBUFFER_DATA( msgs->incoming.block ), req );
1435
1436        /* cleanup */
1437        evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH( msgs->incoming.block ) );
1438        req->length = 0;
1439        msgs->state = AWAITING_BT_LENGTH;
1440        if( !err )
1441            return READ_NOW;
1442        else {
1443            fireError( msgs, err );
1444            return READ_ERR;
1445        }
1446    }
1447}
1448
1449static int
1450readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1451{
1452    uint32_t      ui32;
1453    uint32_t      msglen = msgs->incoming.length;
1454    const uint8_t id = msgs->incoming.id;
1455    const size_t  startBufLen = EVBUFFER_LENGTH( inbuf );
1456    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1457
1458    --msglen; /* id length */
1459
1460    if( inlen < msglen )
1461        return READ_LATER;
1462
1463    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
1464
1465    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1466    {
1467        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1468        fireError( msgs, EMSGSIZE );
1469        return READ_ERR;
1470    }
1471
1472    switch( id )
1473    {
1474        case BT_CHOKE:
1475            dbgmsg( msgs, "got Choke" );
1476            msgs->peer->clientIsChoked = 1;
1477            if( !fext )
1478                cancelAllRequestsToPeer( msgs, FALSE );
1479            break;
1480
1481        case BT_UNCHOKE:
1482            dbgmsg( msgs, "got Unchoke" );
1483            msgs->peer->clientIsChoked = 0;
1484            fireNeedReq( msgs );
1485            break;
1486
1487        case BT_INTERESTED:
1488            dbgmsg( msgs, "got Interested" );
1489            msgs->peer->peerIsInterested = 1;
1490            break;
1491
1492        case BT_NOT_INTERESTED:
1493            dbgmsg( msgs, "got Not Interested" );
1494            msgs->peer->peerIsInterested = 0;
1495            break;
1496
1497        case BT_HAVE:
1498            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1499            dbgmsg( msgs, "got Have: %u", ui32 );
1500            if( tr_bitfieldAdd( msgs->peer->have, ui32 ) ) {
1501                fireError( msgs, ERANGE );
1502                return READ_ERR;
1503            }
1504            updatePeerProgress( msgs );
1505            tr_rcTransferred( msgs->torrent->swarmSpeed,
1506                              msgs->torrent->info.pieceSize );
1507            break;
1508
1509        case BT_BITFIELD:
1510        {
1511            dbgmsg( msgs, "got a bitfield" );
1512            msgs->peerSentBitfield = 1;
1513            tr_peerIoReadBytes( msgs->peer->io, inbuf, msgs->peer->have->bits, msglen );
1514            updatePeerProgress( msgs );
1515            fireNeedReq( msgs );
1516            break;
1517        }
1518
1519        case BT_REQUEST:
1520        {
1521            struct peer_request r;
1522            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1523            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1524            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1525            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1526            peerMadeRequest( msgs, &r );
1527            break;
1528        }
1529
1530        case BT_CANCEL:
1531        {
1532            struct peer_request r;
1533            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1534            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1535            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1536            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1537            if( reqListRemove( &msgs->peerAskedFor, &r ) && fext )
1538                protocolSendReject( msgs, &r );
1539            break;
1540        }
1541
1542        case BT_PIECE:
1543            assert( 0 ); /* handled elsewhere! */
1544            break;
1545
1546        case BT_PORT:
1547            dbgmsg( msgs, "Got a BT_PORT" );
1548            tr_peerIoReadUint16( msgs->peer->io, inbuf, &msgs->peer->port );
1549            break;
1550
1551        case BT_FEXT_SUGGEST:
1552            dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1553            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1554            if( fext )
1555                fireClientGotSuggest( msgs, ui32 );
1556            else {
1557                fireError( msgs, EMSGSIZE );
1558                return READ_ERR;
1559            }
1560            break;
1561
1562        case BT_FEXT_ALLOWED_FAST:
1563            dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1564            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1565            if( fext )
1566                fireClientGotAllowedFast( msgs, ui32 );
1567            else {
1568                fireError( msgs, EMSGSIZE );
1569                return READ_ERR;
1570            }
1571            break;
1572
1573        case BT_FEXT_HAVE_ALL:
1574            dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1575            if( fext ) {
1576                tr_bitfieldAddRange( msgs->peer->have, 0, msgs->torrent->info.pieceCount );
1577                updatePeerProgress( msgs );
1578            } else {
1579                fireError( msgs, EMSGSIZE );
1580                return READ_ERR;
1581            }
1582            break;
1583
1584
1585        case BT_FEXT_HAVE_NONE:
1586            dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1587            if( fext ) {
1588                tr_bitfieldClear( msgs->peer->have );
1589                updatePeerProgress( msgs );
1590            } else {
1591                fireError( msgs, EMSGSIZE );
1592                return READ_ERR;
1593            }
1594            break;
1595
1596        case BT_FEXT_REJECT:
1597        {
1598            struct peer_request r;
1599            dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1600            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1601            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1602            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1603            if( fext )
1604                reqListRemove( &msgs->clientAskedFor, &r );
1605            else {
1606                fireError( msgs, EMSGSIZE );
1607                return READ_ERR;
1608            }
1609            break;
1610        }
1611
1612        case BT_LTEP:
1613            dbgmsg( msgs, "Got a BT_LTEP" );
1614            parseLtep( msgs, msglen, inbuf );
1615            break;
1616
1617        default:
1618            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1619            tr_peerIoDrain( msgs->peer->io, inbuf, msglen );
1620            break;
1621    }
1622
1623    assert( msglen + 1 == msgs->incoming.length );
1624    assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen );
1625
1626    msgs->state = AWAITING_BT_LENGTH;
1627    return READ_NOW;
1628}
1629
1630static void
1631decrementDownloadedCount( tr_peermsgs * msgs, uint32_t byteCount )
1632{
1633    tr_torrent * tor = msgs->torrent;
1634
1635    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1636}
1637
1638static void
1639clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1640{
1641    decrementDownloadedCount( msgs, req->length );
1642}
1643
1644static void
1645addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1646{
1647    if( !msgs->peer->blame )
1648         msgs->peer->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1649    tr_bitfieldAdd( msgs->peer->blame, index );
1650}
1651
1652/* returns 0 on success, or an errno on failure */
1653static int
1654clientGotBlock( tr_peermsgs *               msgs,
1655                const uint8_t *             data,
1656                const struct peer_request * req )
1657{
1658    int err;
1659    tr_torrent * tor = msgs->torrent;
1660    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1661
1662    assert( msgs );
1663    assert( req );
1664
1665    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) {
1666        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1667                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1668        return EMSGSIZE;
1669    }
1670
1671    /* save the block */
1672    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1673
1674    /**
1675    *** Remove the block from our `we asked for this' list
1676    **/
1677
1678    if( !reqListRemove( &msgs->clientAskedFor, req ) ) {
1679        clientGotUnwantedBlock( msgs, req );
1680        dbgmsg( msgs, "we didn't ask for this message..." );
1681        return 0;
1682    }
1683
1684    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1685            msgs->clientAskedFor.count );
1686
1687    /**
1688    *** Error checks
1689    **/
1690
1691    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
1692        dbgmsg( msgs, "we have this block already..." );
1693        clientGotUnwantedBlock( msgs, req );
1694        return 0;
1695    }
1696
1697    /**
1698    ***  Save the block
1699    **/
1700
1701    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1702        return err;
1703
1704    addPeerToBlamefield( msgs, req->index );
1705    fireGotBlock( msgs, req );
1706    return 0;
1707}
1708
1709static int peerPulse( void * vmsgs );
1710
1711static void
1712didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1713{
1714    tr_peermsgs * msgs = vmsgs;
1715    firePeerGotData( msgs, bytesWritten, wasPieceData );
1716    peerPulse( msgs );
1717}
1718
1719static ReadState
1720canRead( tr_peerIo * io, void * vmsgs, size_t * piece )
1721{
1722    ReadState         ret;
1723    tr_peermsgs *     msgs = vmsgs;
1724    struct evbuffer * in = tr_peerIoGetReadBuffer( io );
1725    const size_t      inlen = EVBUFFER_LENGTH( in );
1726
1727    if( !inlen )
1728    {
1729        ret = READ_LATER;
1730    }
1731    else if( msgs->state == AWAITING_BT_PIECE )
1732    {
1733        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1734    }
1735    else switch( msgs->state )
1736    {
1737        case AWAITING_BT_LENGTH:
1738            ret = readBtLength ( msgs, in, inlen ); break;
1739
1740        case AWAITING_BT_ID:
1741            ret = readBtId     ( msgs, in, inlen ); break;
1742
1743        case AWAITING_BT_MESSAGE:
1744            ret = readBtMessage( msgs, in, inlen ); break;
1745
1746        default:
1747            assert( 0 );
1748    }
1749
1750    /* log the raw data that was read */
1751    if( ( ret != READ_ERR ) && ( EVBUFFER_LENGTH( in ) != inlen ) )
1752        fireClientGotData( msgs, inlen - EVBUFFER_LENGTH( in ), FALSE );
1753
1754    return ret;
1755}
1756
1757/**
1758***
1759**/
1760
1761static int
1762ratePulse( void * vmsgs )
1763{
1764    tr_peermsgs * msgs = vmsgs;
1765    const double rateToClient = tr_peerGetPieceSpeed( msgs->peer, TR_PEER_TO_CLIENT );
1766    const int seconds = 10;
1767    const int floor = 8;
1768    const int estimatedBlocksInPeriod = ( rateToClient * seconds * 1024 ) / msgs->torrent->blockSize;
1769
1770    msgs->maxActiveRequests = floor + estimatedBlocksInPeriod;
1771
1772    if( msgs->reqq > 0 )
1773        msgs->maxActiveRequests = MIN( msgs->maxActiveRequests, msgs->reqq );
1774
1775    return TRUE;
1776}
1777
1778static size_t
1779fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1780{
1781    size_t bytesWritten = 0;
1782    struct peer_request req;
1783    const int haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1784    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1785
1786    /**
1787    ***  Protocol messages
1788    **/
1789
1790    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1791    {
1792        dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
1793        msgs->outMessagesBatchedAt = now;
1794    }
1795    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1796    {
1797        const size_t len = EVBUFFER_LENGTH( msgs->outMessages );
1798        /* flush the protocol messages */
1799        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len );
1800        tr_peerIoWriteBuf( msgs->peer->io, msgs->outMessages, FALSE );
1801        msgs->clientSentAnythingAt = now;
1802        msgs->outMessagesBatchedAt = 0;
1803        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1804        bytesWritten +=  len;
1805    }
1806
1807    /**
1808    ***  Blocks
1809    **/
1810
1811    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io ) >= msgs->torrent->blockSize )
1812        && popNextRequest( msgs, &req ) )
1813    {
1814        if( requestIsValid( msgs, &req )
1815            && tr_cpPieceIsComplete( msgs->torrent->completion, req.index ) )
1816        {
1817            /* send a block */
1818            uint8_t * buf = tr_new( uint8_t, req.length );
1819            const int err = tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf );
1820            if( err ) {
1821                fireError( msgs, err );
1822                bytesWritten = 0;
1823                msgs = NULL;
1824            } else {
1825                tr_peerIo * io = msgs->peer->io;
1826                struct evbuffer * out = evbuffer_new( );
1827                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1828                tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1829                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1830                tr_peerIoWriteUint32( io, out, req.index );
1831                tr_peerIoWriteUint32( io, out, req.offset );
1832                tr_peerIoWriteBytes ( io, out, buf, req.length );
1833                tr_peerIoWriteBuf( io, out, TRUE );
1834                bytesWritten += EVBUFFER_LENGTH( out );
1835                evbuffer_free( out );
1836                msgs->clientSentAnythingAt = now;
1837            }
1838            tr_free( buf );
1839        }
1840        else if( fext ) /* peer needs a reject message */
1841        {
1842            protocolSendReject( msgs, &req );
1843        }
1844    }
1845
1846    /**
1847    ***  Keepalive
1848    **/
1849
1850    if( ( msgs != NULL )
1851        && ( msgs->clientSentAnythingAt != 0 )
1852        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1853    {
1854        dbgmsg( msgs, "sending a keepalive message" );
1855        tr_peerIoWriteUint32( msgs->peer->io, msgs->outMessages, 0 );
1856        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1857    }
1858
1859    return bytesWritten;
1860}
1861
1862static int
1863peerPulse( void * vmsgs )
1864{
1865    tr_peermsgs * msgs = vmsgs;
1866    const time_t  now = time( NULL );
1867
1868    ratePulse( msgs );
1869
1870    pumpRequestQueue( msgs, now );
1871    expireOldRequests( msgs, now );
1872
1873    for( ;; )
1874        if( fillOutputBuffer( msgs, now ) < 1 )
1875            break;
1876
1877    return TRUE; /* loop forever */
1878}
1879
1880void
1881tr_peerMsgsPulse( tr_peermsgs * msgs )
1882{
1883    if( msgs )
1884        peerPulse( msgs );
1885}
1886
1887static void
1888gotError( tr_peerIo  * io UNUSED,
1889          short        what,
1890          void       * vmsgs )
1891{
1892    if( what & EVBUFFER_TIMEOUT )
1893        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1894    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1895        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1896               what, errno, tr_strerror( errno ) );
1897    fireError( vmsgs, ENOTCONN );
1898}
1899
1900static void
1901sendBitfield( tr_peermsgs * msgs )
1902{
1903    struct evbuffer * out = msgs->outMessages;
1904    tr_bitfield *     field;
1905    tr_piece_index_t  lazyPieces[LAZY_PIECE_COUNT];
1906    size_t            i;
1907    size_t            lazyCount = 0;
1908
1909    field = tr_bitfieldDup( tr_cpPieceBitfield( msgs->torrent->completion ) );
1910
1911    if( tr_sessionIsLazyBitfieldEnabled( msgs->session ) )
1912    {
1913        /** Lazy bitfields aren't a high priority or secure, so I'm opting for
1914            speed over a truly random sample -- let's limit the pool size to
1915            the first 1000 pieces so large torrents don't bog things down */
1916        size_t poolSize;
1917        const size_t maxPoolSize = MIN( msgs->torrent->info.pieceCount, 1000 );
1918        tr_piece_index_t * pool = tr_new( tr_piece_index_t, maxPoolSize );
1919
1920        /* build the pool */
1921        for( i=poolSize=0; i<maxPoolSize; ++i )
1922            if( tr_bitfieldHas( field, i ) )
1923                pool[poolSize++] = i;
1924
1925        /* pull random piece indices from the pool */
1926        while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) )
1927        {
1928            const int pos = tr_cryptoWeakRandInt( poolSize );
1929            const tr_piece_index_t piece = pool[pos];
1930            tr_bitfieldRem( field, piece );
1931            lazyPieces[lazyCount++] = piece;
1932            pool[pos] = pool[--poolSize];
1933        }
1934
1935        /* cleanup */
1936        tr_free( pool );
1937    }
1938
1939    tr_peerIoWriteUint32( msgs->peer->io, out,
1940                          sizeof( uint8_t ) + field->byteCount );
1941    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_BITFIELD );
1942    tr_peerIoWriteBytes ( msgs->peer->io, out, field->bits, field->byteCount );
1943    dbgmsg( msgs, "sending bitfield... outMessage size is now %zu",
1944            EVBUFFER_LENGTH( out ) );
1945    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1946
1947    for( i = 0; i < lazyCount; ++i )
1948        protocolSendHave( msgs, lazyPieces[i] );
1949
1950    tr_bitfieldFree( field );
1951}
1952
1953static void
1954tellPeerWhatWeHave( tr_peermsgs * msgs )
1955{
1956    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1957
1958    if( fext && ( tr_cpGetStatus( msgs->torrent->completion ) == TR_SEED ) )
1959    {
1960        protocolSendHaveAll( msgs );
1961    }
1962    else if( fext && ( tr_cpHaveValid( msgs->torrent->completion ) == 0 ) )
1963    {
1964        protocolSendHaveNone( msgs );
1965    }
1966    else
1967    {
1968        sendBitfield( msgs );
1969    }
1970}
1971
1972/**
1973***
1974**/
1975
1976/* some peers give us error messages if we send
1977   more than this many peers in a single pex message
1978   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
1979#define MAX_PEX_ADDED 50
1980#define MAX_PEX_DROPPED 50
1981
1982typedef struct
1983{
1984    tr_pex *  added;
1985    tr_pex *  dropped;
1986    tr_pex *  elements;
1987    int       addedCount;
1988    int       droppedCount;
1989    int       elementCount;
1990}
1991PexDiffs;
1992
1993static void
1994pexAddedCb( void * vpex,
1995            void * userData )
1996{
1997    PexDiffs * diffs = userData;
1998    tr_pex *   pex = vpex;
1999
2000    if( diffs->addedCount < MAX_PEX_ADDED )
2001    {
2002        diffs->added[diffs->addedCount++] = *pex;
2003        diffs->elements[diffs->elementCount++] = *pex;
2004    }
2005}
2006
2007static void
2008pexDroppedCb( void * vpex,
2009              void * userData )
2010{
2011    PexDiffs * diffs = userData;
2012    tr_pex *   pex = vpex;
2013
2014    if( diffs->droppedCount < MAX_PEX_DROPPED )
2015    {
2016        diffs->dropped[diffs->droppedCount++] = *pex;
2017    }
2018}
2019
2020static void
2021pexElementCb( void * vpex,
2022              void * userData )
2023{
2024    PexDiffs * diffs = userData;
2025    tr_pex *   pex = vpex;
2026
2027    diffs->elements[diffs->elementCount++] = *pex;
2028}
2029
2030static void
2031sendPex( tr_peermsgs * msgs )
2032{
2033    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
2034    {
2035        PexDiffs diffs;
2036        PexDiffs diffs6;
2037        tr_pex * newPex = NULL;
2038        tr_pex * newPex6 = NULL;
2039        const int newCount = tr_peerMgrGetPeers( msgs->session->peerMgr,
2040                                                 msgs->torrent->info.hash,
2041                                                 &newPex, TR_AF_INET );
2042        const int newCount6 = tr_peerMgrGetPeers( msgs->session->peerMgr,
2043                                                  msgs->torrent->info.hash,
2044                                                  &newPex6, TR_AF_INET6 );
2045
2046        /* build the diffs */
2047        diffs.added = tr_new( tr_pex, newCount );
2048        diffs.addedCount = 0;
2049        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
2050        diffs.droppedCount = 0;
2051        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
2052        diffs.elementCount = 0;
2053        tr_set_compare( msgs->pex, msgs->pexCount,
2054                        newPex, newCount,
2055                        tr_pexCompare, sizeof( tr_pex ),
2056                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
2057        diffs6.added = tr_new( tr_pex, newCount6 );
2058        diffs6.addedCount = 0;
2059        diffs6.dropped = tr_new( tr_pex, msgs->pexCount6 );
2060        diffs6.droppedCount = 0;
2061        diffs6.elements = tr_new( tr_pex, newCount6 + msgs->pexCount6 );
2062        diffs6.elementCount = 0;
2063        tr_set_compare( msgs->pex6, msgs->pexCount6,
2064                        newPex6, newCount6,
2065                        tr_pexCompare, sizeof( tr_pex ),
2066                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6 );
2067        dbgmsg(
2068            msgs,
2069            "pex: old peer count %d, new peer count %d, added %d, removed %d",
2070            msgs->pexCount, newCount + newCount6,
2071            diffs.addedCount + diffs6.addedCount,
2072            diffs.droppedCount + diffs6.droppedCount );
2073
2074        if( !diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
2075            !diffs6.droppedCount )
2076        {
2077            tr_free( diffs.elements );
2078            tr_free( diffs6.elements );
2079        }
2080        else
2081        {
2082            int  i;
2083            tr_benc val;
2084            char * benc;
2085            int bencLen;
2086            uint8_t * tmp, *walk;
2087            struct evbuffer * out = msgs->outMessages;
2088
2089            /* update peer */
2090            tr_free( msgs->pex );
2091            msgs->pex = diffs.elements;
2092            msgs->pexCount = diffs.elementCount;
2093            tr_free( msgs->pex6 );
2094            msgs->pex6 = diffs6.elements;
2095            msgs->pexCount6 = diffs6.elementCount;
2096
2097            /* build the pex payload */
2098            tr_bencInitDict( &val, 3 ); /* ipv6 support: left as 3:
2099                                         * speed vs. likelihood? */
2100
2101            /* "added" */
2102            tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
2103            for( i = 0; i < diffs.addedCount; ++i )
2104            {
2105                tr_suspectAddress( &diffs.added[i].addr, "pex" );
2106                memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
2107                memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
2108            }
2109            assert( ( walk - tmp ) == diffs.addedCount * 6 );
2110            tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
2111            tr_free( tmp );
2112
2113            /* "added.f" */
2114            tmp = walk = tr_new( uint8_t, diffs.addedCount );
2115            for( i = 0; i < diffs.addedCount; ++i )
2116                *walk++ = diffs.added[i].flags;
2117            assert( ( walk - tmp ) == diffs.addedCount );
2118            tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2119            tr_free( tmp );
2120
2121            /* "dropped" */
2122            tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2123            for( i = 0; i < diffs.droppedCount; ++i )
2124            {
2125                memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
2126                memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2127            }
2128            assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2129            tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2130            tr_free( tmp );
2131           
2132            /* "added6" */
2133            tmp = walk = tr_new( uint8_t, diffs6.addedCount * 18 );
2134            for( i = 0; i < diffs6.addedCount; ++i )
2135            {
2136                tr_suspectAddress( &diffs6.added[i].addr, "pex6" );
2137                memcpy( walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16 );
2138                walk += 16;
2139                memcpy( walk, &diffs6.added[i].port, 2 );
2140                walk += 2;
2141            }
2142            assert( ( walk - tmp ) == diffs6.addedCount * 18 );
2143            tr_bencDictAddRaw( &val, "added6", tmp, walk - tmp );
2144            tr_free( tmp );
2145           
2146            /* "added6.f" */
2147            tmp = walk = tr_new( uint8_t, diffs6.addedCount );
2148            for( i = 0; i < diffs6.addedCount; ++i )
2149                *walk++ = diffs6.added[i].flags;
2150            assert( ( walk - tmp ) == diffs6.addedCount );
2151            tr_bencDictAddRaw( &val, "added6.f", tmp, walk - tmp );
2152            tr_free( tmp );
2153           
2154            /* "dropped6" */
2155            tmp = walk = tr_new( uint8_t, diffs6.droppedCount * 18 );
2156            for( i = 0; i < diffs6.droppedCount; ++i )
2157            {
2158                memcpy( walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16 );
2159                walk += 16;
2160                memcpy( walk, &diffs6.dropped[i].port, 2 );
2161                walk += 2;
2162            }
2163            assert( ( walk - tmp ) == diffs6.droppedCount * 18);
2164            tr_bencDictAddRaw( &val, "dropped6", tmp, walk - tmp );
2165            tr_free( tmp );
2166
2167            /* write the pex message */
2168            benc = tr_bencSave( &val, &bencLen );
2169            tr_peerIoWriteUint32( msgs->peer->io, out, 2 * sizeof( uint8_t ) + bencLen );
2170            tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_LTEP );
2171            tr_peerIoWriteUint8 ( msgs->peer->io, out, msgs->ut_pex_id );
2172            tr_peerIoWriteBytes ( msgs->peer->io, out, benc, bencLen );
2173            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2174            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", EVBUFFER_LENGTH( out ) );
2175
2176            tr_free( benc );
2177            tr_bencFree( &val );
2178        }
2179
2180        /* cleanup */
2181        tr_free( diffs.added );
2182        tr_free( diffs.dropped );
2183        tr_free( newPex );
2184        tr_free( diffs6.added );
2185        tr_free( diffs6.dropped );
2186        tr_free( newPex6 );
2187
2188        msgs->clientSentPexAt = time( NULL );
2189    }
2190}
2191
2192static int
2193pexPulse( void * vpeer )
2194{
2195    sendPex( vpeer );
2196    return TRUE;
2197}
2198
2199/**
2200***
2201**/
2202
2203tr_peermsgs*
2204tr_peerMsgsNew( struct tr_torrent * torrent,
2205                struct tr_peer *    peer,
2206                tr_delivery_func    func,
2207                void *              userData,
2208                tr_publisher_tag *  setme )
2209{
2210    tr_peermsgs * m;
2211
2212    assert( peer );
2213    assert( peer->io );
2214
2215    m = tr_new0( tr_peermsgs, 1 );
2216    m->publisher = tr_publisherNew( );
2217    m->peer = peer;
2218    m->session = torrent->session;
2219    m->torrent = torrent;
2220    m->peer->clientIsChoked = 1;
2221    m->peer->peerIsChoked = 1;
2222    m->peer->clientIsInterested = 0;
2223    m->peer->peerIsInterested = 0;
2224    m->peer->have = tr_bitfieldNew( torrent->info.pieceCount );
2225    m->state = AWAITING_BT_LENGTH;
2226    m->pexTimer = tr_timerNew( m->session, pexPulse, m, PEX_INTERVAL );
2227    m->outMessages = evbuffer_new( );
2228    m->outMessagesBatchedAt = 0;
2229    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2230    m->incoming.block = evbuffer_new( );
2231    m->peerAskedFor = REQUEST_LIST_INIT;
2232    m->clientAskedFor = REQUEST_LIST_INIT;
2233    m->clientWillAskFor = REQUEST_LIST_INIT;
2234    peer->msgs = m;
2235
2236    *setme = tr_publisherSubscribe( m->publisher, func, userData );
2237
2238    if( tr_peerIoSupportsLTEP( peer->io ) )
2239        sendLtepHandshake( m );
2240
2241    tellPeerWhatWeHave( m );
2242
2243    tr_peerIoSetIOFuncs( m->peer->io, canRead, didWrite, gotError, m );
2244    ratePulse( m );
2245
2246    return m;
2247}
2248
2249void
2250tr_peerMsgsFree( tr_peermsgs* msgs )
2251{
2252    if( msgs )
2253    {
2254        tr_timerFree( &msgs->pexTimer );
2255        tr_publisherFree( &msgs->publisher );
2256        reqListClear( &msgs->clientWillAskFor );
2257        reqListClear( &msgs->clientAskedFor );
2258        reqListClear( &msgs->peerAskedFor );
2259
2260        evbuffer_free( msgs->incoming.block );
2261        evbuffer_free( msgs->outMessages );
2262        tr_free( msgs->pex6 );
2263        tr_free( msgs->pex );
2264
2265        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2266        tr_free( msgs );
2267    }
2268}
2269
2270void
2271tr_peerMsgsUnsubscribe( tr_peermsgs *    peer,
2272                        tr_publisher_tag tag )
2273{
2274    tr_publisherUnsubscribe( peer->publisher, tag );
2275}
2276
Note: See TracBrowser for help on using the repository browser.