source: trunk/libtransmission/peer-msgs.c @ 7476

Last change on this file since 7476 was 7476, checked in by charles, 12 years ago

(trunk libT) #include "session.h" cleanup from wereHamster

  • Property svn:keywords set to Date Rev Author Id
File size: 64.9 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 7476 2008-12-23 17:27:15Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19
20#include <event.h>
21
22#include "transmission.h"
23#include "session.h"
24#include "bencode.h"
25#include "completion.h"
26#include "crypto.h"
27#include "inout.h"
28#ifdef WIN32
29#include "net.h" /* for ECONN */
30#endif
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-mgr-private.h"
34#include "peer-msgs.h"
35#include "ratecontrol.h"
36#include "stats.h"
37#include "torrent.h"
38#include "trevent.h"
39#include "utils.h"
40
41/**
42***
43**/
44
45enum
46{
47    BT_CHOKE                = 0,
48    BT_UNCHOKE              = 1,
49    BT_INTERESTED           = 2,
50    BT_NOT_INTERESTED       = 3,
51    BT_HAVE                 = 4,
52    BT_BITFIELD             = 5,
53    BT_REQUEST              = 6,
54    BT_PIECE                = 7,
55    BT_CANCEL               = 8,
56    BT_PORT                 = 9,
57
58    BT_FEXT_SUGGEST         = 13,
59    BT_FEXT_HAVE_ALL        = 14,
60    BT_FEXT_HAVE_NONE       = 15,
61    BT_FEXT_REJECT          = 16,
62    BT_FEXT_ALLOWED_FAST    = 17,
63
64    BT_LTEP                 = 20,
65
66    LTEP_HANDSHAKE          = 0,
67
68    TR_LTEP_PEX             = 1,
69
70
71
72    MIN_CHOKE_PERIOD_SEC    = ( 10 ),
73
74    /* idle seconds before we send a keepalive */
75    KEEPALIVE_INTERVAL_SECS = 100,
76
77    PEX_INTERVAL            = ( 90 * 1000 ), /* msec between sendPex() calls */
78
79    MAX_QUEUE_SIZE          = ( 100 ),
80
81    /* how long an unsent request can stay queued before it's returned
82       back to the peer-mgr's pool of requests */
83    QUEUED_REQUEST_TTL_SECS = 20,
84
85    /* how long a sent request can stay queued before it's returned
86       back to the peer-mgr's pool of requests */
87    SENT_REQUEST_TTL_SECS = 240,
88
89    /* used in lowering the outMessages queue period */
90    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
91    HIGH_PRIORITY_INTERVAL_SECS = 2,
92    LOW_PRIORITY_INTERVAL_SECS = 20,
93
94    /* number of pieces to remove from the bitfield when
95     * lazy bitfields are turned on */
96    LAZY_PIECE_COUNT = 26,
97
98    /* number of pieces we'll allow in our fast set */
99    MAX_FAST_SET_SIZE = 3
100};
101
102/**
103***  REQUEST MANAGEMENT
104**/
105
106enum
107{
108    AWAITING_BT_LENGTH,
109    AWAITING_BT_ID,
110    AWAITING_BT_MESSAGE,
111    AWAITING_BT_PIECE
112};
113
114struct peer_request
115{
116    uint32_t    index;
117    uint32_t    offset;
118    uint32_t    length;
119    time_t      time_requested;
120};
121
122static int
123compareRequest( const void * va, const void * vb )
124{
125    const struct peer_request * a = va;
126    const struct peer_request * b = vb;
127
128    if( a->index != b->index )
129        return a->index < b->index ? -1 : 1;
130
131    if( a->offset != b->offset )
132        return a->offset < b->offset ? -1 : 1;
133
134    if( a->length != b->length )
135        return a->length < b->length ? -1 : 1;
136
137    return 0;
138}
139
140struct request_list
141{
142    uint16_t               count;
143    uint16_t               max;
144    struct peer_request *  requests;
145};
146
147static const struct request_list REQUEST_LIST_INIT = { 0, 0, NULL };
148
149static void
150reqListReserve( struct request_list * list,
151                uint16_t              max )
152{
153    if( list->max < max )
154    {
155        list->max = max;
156        list->requests = tr_renew( struct peer_request,
157                                   list->requests,
158                                   list->max );
159    }
160}
161
162static void
163reqListClear( struct request_list * list )
164{
165    tr_free( list->requests );
166    *list = REQUEST_LIST_INIT;
167}
168
169static void
170reqListCopy( struct request_list * dest, const struct request_list * src )
171{
172    dest->count = dest->max = src->count;
173    dest->requests = tr_memdup( src->requests, dest->count * sizeof( struct peer_request ) );
174}
175
176static void
177reqListRemoveOne( struct request_list * list,
178                  int                   i )
179{
180    assert( 0 <= i && i < list->count );
181
182    memmove( &list->requests[i],
183             &list->requests[i + 1],
184             sizeof( struct peer_request ) * ( --list->count - i ) );
185}
186
187static void
188reqListAppend( struct request_list *       list,
189               const struct peer_request * req )
190{
191    if( ++list->count >= list->max )
192        reqListReserve( list, list->max + 8 );
193
194    list->requests[list->count - 1] = *req;
195}
196
197static int
198reqListPop( struct request_list * list,
199            struct peer_request * setme )
200{
201    int success;
202
203    if( !list->count )
204        success = FALSE;
205    else {
206        *setme = list->requests[0];
207        reqListRemoveOne( list, 0 );
208        success = TRUE;
209    }
210
211    return success;
212}
213
214static int
215reqListFind( struct request_list *       list,
216             const struct peer_request * key )
217{
218    uint16_t i;
219
220    for( i = 0; i < list->count; ++i )
221        if( !compareRequest( key, list->requests + i ) )
222            return i;
223
224    return -1;
225}
226
227static int
228reqListRemove( struct request_list *       list,
229               const struct peer_request * key )
230{
231    int success;
232    const int i = reqListFind( list, key );
233
234    if( i < 0 )
235        success = FALSE;
236    else {
237        reqListRemoveOne( list, i );
238        success = TRUE;
239    }
240
241    return success;
242}
243
244/**
245***
246**/
247
248/* this is raw, unchanged data from the peer regarding
249 * the current message that it's sending us. */
250struct tr_incoming
251{
252    uint8_t                id;
253    uint32_t               length; /* includes the +1 for id length */
254    struct peer_request    blockReq; /* metadata for incoming blocks */
255    struct evbuffer *      block; /* piece data for incoming blocks */
256};
257
258/**
259 * Low-level communication state information about a connected peer.
260 *
261 * This structure remembers the low-level protocol states that we're
262 * in with this peer, such as active requests, pex messages, and so on.
263 * Its fields are all private to peer-msgs.c.
264 *
265 * Data not directly involved with sending & receiving messages is
266 * stored in tr_peer, where it can be accessed by both peermsgs and
267 * the peer manager.
268 *
269 * @see struct peer_atom
270 * @see tr_peer
271 */
272struct tr_peermsgs
273{
274    tr_bool         peerSentBitfield;
275    tr_bool         peerSupportsPex;
276    tr_bool         clientSentLtepHandshake;
277    tr_bool         peerSentLtepHandshake;
278    tr_bool         haveFastSet;
279
280    uint8_t         state;
281    uint8_t         ut_pex_id;
282    uint16_t        pexCount;
283    uint16_t        pexCount6;
284    uint16_t        maxActiveRequests;
285
286    size_t                 fastsetSize;
287    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
288
289    /* how long the outMessages batch should be allowed to grow before
290     * it's flushed -- some messages (like requests >:) should be sent
291     * very quickly; others aren't as urgent. */
292    int                    outMessagesBatchPeriod;
293
294    tr_peer *              peer;
295
296    tr_session *           session;
297    tr_torrent *           torrent;
298
299    tr_publisher_t *       publisher;
300
301    struct evbuffer *      outMessages; /* all the non-piece messages */
302
303    struct request_list    peerAskedFor;
304    struct request_list    clientAskedFor;
305    struct request_list    clientWillAskFor;
306
307    tr_timer             * pexTimer;
308    tr_pex               * pex;
309    tr_pex               * pex6;
310
311    time_t                 clientSentPexAt;
312    time_t                 clientSentAnythingAt;
313
314    /* when we started batching the outMessages */
315    time_t                outMessagesBatchedAt;
316
317    struct tr_incoming    incoming;
318
319    /* if the peer supports the Extension Protocol in BEP 10 and
320       supplied a reqq argument, it's stored here.  otherwise the
321       value is zero and should be ignored. */
322    int64_t               reqq;
323};
324
325/**
326***
327**/
328
329static void
330myDebug( const char * file, int line,
331         const struct tr_peermsgs * msgs,
332         const char * fmt, ... )
333{
334    FILE * fp = tr_getLog( );
335
336    if( fp )
337    {
338        va_list           args;
339        char              timestr[64];
340        struct evbuffer * buf = evbuffer_new( );
341        char *            base = tr_basename( file );
342
343        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
344                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
345                             msgs->torrent->info.name,
346                             tr_peerIoGetAddrStr( msgs->peer->io ),
347                             msgs->peer->client );
348        va_start( args, fmt );
349        evbuffer_add_vprintf( buf, fmt, args );
350        va_end( args );
351        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
352        fwrite( EVBUFFER_DATA( buf ), 1, EVBUFFER_LENGTH( buf ), fp );
353
354        tr_free( base );
355        evbuffer_free( buf );
356    }
357}
358
359#define dbgmsg( msgs, ... ) \
360    do { \
361        if( tr_deepLoggingIsActive( ) ) \
362            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
363    } while( 0 )
364
365/**
366***
367**/
368
369static void
370pokeBatchPeriod( tr_peermsgs * msgs,
371                 int           interval )
372{
373    if( msgs->outMessagesBatchPeriod > interval )
374    {
375        msgs->outMessagesBatchPeriod = interval;
376        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
377    }
378}
379
380static void
381dbgOutMessageLen( tr_peermsgs * msgs )
382{
383    dbgmsg( msgs, "outMessage size is now %zu", EVBUFFER_LENGTH( msgs->outMessages ) );
384}
385
386static void
387protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
388{
389    tr_peerIo       * io  = msgs->peer->io;
390    struct evbuffer * out = msgs->outMessages;
391
392    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
393
394    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
395    tr_peerIoWriteUint8 ( io, out, BT_FEXT_REJECT );
396    tr_peerIoWriteUint32( io, out, req->index );
397    tr_peerIoWriteUint32( io, out, req->offset );
398    tr_peerIoWriteUint32( io, out, req->length );
399
400    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
401    dbgOutMessageLen( msgs );
402}
403
404static void
405protocolSendRequest( tr_peermsgs               * msgs,
406                     const struct peer_request * req )
407{
408    tr_peerIo       * io  = msgs->peer->io;
409    struct evbuffer * out = msgs->outMessages;
410
411    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
412    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
413    tr_peerIoWriteUint32( io, out, req->index );
414    tr_peerIoWriteUint32( io, out, req->offset );
415    tr_peerIoWriteUint32( io, out, req->length );
416
417    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
418    dbgOutMessageLen( msgs );
419    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
420}
421
422static void
423protocolSendCancel( tr_peermsgs               * msgs,
424                    const struct peer_request * req )
425{
426    tr_peerIo       * io  = msgs->peer->io;
427    struct evbuffer * out = msgs->outMessages;
428
429    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
430    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
431    tr_peerIoWriteUint32( io, out, req->index );
432    tr_peerIoWriteUint32( io, out, req->offset );
433    tr_peerIoWriteUint32( io, out, req->length );
434
435    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
436    dbgOutMessageLen( msgs );
437    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
438}
439
440static void
441protocolSendHave( tr_peermsgs * msgs,
442                  uint32_t      index )
443{
444    tr_peerIo       * io  = msgs->peer->io;
445    struct evbuffer * out = msgs->outMessages;
446
447    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
448    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
449    tr_peerIoWriteUint32( io, out, index );
450
451    dbgmsg( msgs, "sending Have %u", index );
452    dbgOutMessageLen( msgs );
453    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
454}
455
456#if 0
457static void
458protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
459{
460    tr_peerIo       * io  = msgs->peer->io;
461    struct evbuffer * out = msgs->outMessages;
462
463    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
464
465    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
466    tr_peerIoWriteUint8 ( io, out, BT_FEXT_ALLOWED_FAST );
467    tr_peerIoWriteUint32( io, out, pieceIndex );
468
469    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
470    dbgOutMessageLen( msgs );
471}
472#endif
473
474static void
475protocolSendChoke( tr_peermsgs * msgs,
476                   int           choke )
477{
478    tr_peerIo       * io  = msgs->peer->io;
479    struct evbuffer * out = msgs->outMessages;
480
481    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
482    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
483
484    dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
485    dbgOutMessageLen( msgs );
486    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
487}
488
489static void
490protocolSendHaveAll( tr_peermsgs * msgs )
491{
492    tr_peerIo       * io  = msgs->peer->io;
493    struct evbuffer * out = msgs->outMessages;
494
495    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
496
497    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
498    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_ALL );
499
500    dbgmsg( msgs, "sending HAVE_ALL..." );
501    dbgOutMessageLen( msgs );
502    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
503}
504
505static void
506protocolSendHaveNone( tr_peermsgs * msgs )
507{
508    tr_peerIo       * io  = msgs->peer->io;
509    struct evbuffer * out = msgs->outMessages;
510
511    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
512
513    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
514    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_NONE );
515
516    dbgmsg( msgs, "sending HAVE_NONE..." );
517    dbgOutMessageLen( msgs );
518    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
519}
520
521/**
522***  EVENTS
523**/
524
525static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0, 0 };
526
527static void
528publish( tr_peermsgs * msgs, tr_peer_event * e )
529{
530    assert( msgs->peer );
531    assert( msgs->peer->msgs == msgs );
532
533    tr_publisherPublish( msgs->publisher, msgs->peer, e );
534}
535
536static void
537fireError( tr_peermsgs * msgs, int err )
538{
539    tr_peer_event e = blankEvent;
540    e.eventType = TR_PEER_ERROR;
541    e.err = err;
542    publish( msgs, &e );
543}
544
545static void
546fireUploadOnly( tr_peermsgs * msgs, tr_bool uploadOnly )
547{
548    tr_peer_event e = blankEvent;
549    e.eventType = TR_PEER_UPLOAD_ONLY;
550    e.uploadOnly = uploadOnly;
551    publish( msgs, &e );
552}
553
554static void
555fireNeedReq( tr_peermsgs * msgs )
556{
557    tr_peer_event e = blankEvent;
558    e.eventType = TR_PEER_NEED_REQ;
559    publish( msgs, &e );
560}
561
562static void
563firePeerProgress( tr_peermsgs * msgs )
564{
565    tr_peer_event e = blankEvent;
566    e.eventType = TR_PEER_PEER_PROGRESS;
567    e.progress = msgs->peer->progress;
568    publish( msgs, &e );
569}
570
571static void
572fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
573{
574    tr_peer_event e = blankEvent;
575    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
576    e.pieceIndex = req->index;
577    e.offset = req->offset;
578    e.length = req->length;
579    publish( msgs, &e );
580}
581
582static void
583fireClientGotData( tr_peermsgs * msgs,
584                   uint32_t      length,
585                   int           wasPieceData )
586{
587    tr_peer_event e = blankEvent;
588
589    e.length = length;
590    e.eventType = TR_PEER_CLIENT_GOT_DATA;
591    e.wasPieceData = wasPieceData;
592    publish( msgs, &e );
593}
594
595static void
596fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
597{
598    tr_peer_event e = blankEvent;
599    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
600    e.pieceIndex = pieceIndex;
601    publish( msgs, &e );
602}
603
604static void
605fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
606{
607    tr_peer_event e = blankEvent;
608    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
609    e.pieceIndex = pieceIndex;
610    publish( msgs, &e );
611}
612
613static void
614firePeerGotData( tr_peermsgs  * msgs,
615                 uint32_t       length,
616                 int            wasPieceData )
617{
618    tr_peer_event e = blankEvent;
619
620    e.length = length;
621    e.eventType = TR_PEER_PEER_GOT_DATA;
622    e.wasPieceData = wasPieceData;
623
624    publish( msgs, &e );
625}
626
627static void
628fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
629{
630    tr_peer_event e = blankEvent;
631    e.eventType = TR_PEER_CANCEL;
632    e.pieceIndex = req->index;
633    e.offset = req->offset;
634    e.length = req->length;
635    publish( msgs, &e );
636}
637
638/**
639***  ALLOWED FAST SET
640***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
641**/
642
643size_t
644tr_generateAllowedSet( tr_piece_index_t * setmePieces,
645                       size_t             desiredSetSize,
646                       size_t             pieceCount,
647                       const uint8_t    * infohash,
648                       const tr_address * addr )
649{
650    size_t setSize = 0;
651
652    assert( setmePieces );
653    assert( desiredSetSize <= pieceCount );
654    assert( desiredSetSize );
655    assert( pieceCount );
656    assert( infohash );
657    assert( addr );
658
659    if( addr->type == TR_AF_INET )
660    {
661        uint8_t w[SHA_DIGEST_LENGTH + 4];
662        uint8_t x[SHA_DIGEST_LENGTH];
663
664        *(uint32_t*)w = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 );   /* (1) */
665        memcpy( w + 4, infohash, SHA_DIGEST_LENGTH );                /* (2) */
666        tr_sha1( x, w, sizeof( w ), NULL );                          /* (3) */
667
668        while( setSize<desiredSetSize )
669        {
670            int i;
671            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
672            {
673                size_t k;
674                uint32_t j = i * 4;                                  /* (5) */
675                uint32_t y = ntohl( *( uint32_t* )( x + j ) );       /* (6) */
676                uint32_t index = y % pieceCount;                     /* (7) */
677
678                for( k=0; k<setSize; ++k )                           /* (8) */
679                    if( setmePieces[k] == index )
680                        break;
681
682                if( k == setSize )
683                    setmePieces[setSize++] = index;                  /* (9) */
684            }
685
686            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
687        }
688    }
689
690    return setSize;
691}
692
693static void
694updateFastSet( tr_peermsgs * msgs UNUSED )
695{
696#if 0
697    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
698    const int peerIsNeedy = msgs->peer->progress < 0.10;
699
700    if( fext && peerIsNeedy && !msgs->haveFastSet )
701    {
702        size_t i;
703        const struct tr_address * addr = tr_peerIoGetAddress( msgs->peer->io, NULL );
704        const tr_info * inf = &msgs->torrent->info;
705        const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
706
707        /* build the fast set */
708        msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
709        msgs->haveFastSet = 1;
710
711        /* send it to the peer */
712        for( i=0; i<msgs->fastsetSize; ++i )
713            protocolSendAllowedFast( msgs, msgs->fastset[i] );
714    }
715#endif
716}
717
718/**
719***  INTEREST
720**/
721
722static int
723isPieceInteresting( const tr_peermsgs * msgs,
724                    tr_piece_index_t    piece )
725{
726    const tr_torrent * torrent = msgs->torrent;
727
728    return ( !torrent->info.pieces[piece].dnd )                 /* we want it */
729          && ( !tr_cpPieceIsComplete( torrent->completion, piece ) ) /* !have */
730          && ( tr_bitfieldHas( msgs->peer->have, piece ) );    /* peer has it */
731}
732
733/* "interested" means we'll ask for piece data if they unchoke us */
734static int
735isPeerInteresting( const tr_peermsgs * msgs )
736{
737    tr_piece_index_t    i;
738    const tr_torrent *  torrent;
739    const tr_bitfield * bitfield;
740    const int           clientIsSeed = tr_torrentIsSeed( msgs->torrent );
741
742    if( clientIsSeed )
743        return FALSE;
744
745    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
746        return FALSE;
747
748    torrent = msgs->torrent;
749    bitfield = tr_cpPieceBitfield( torrent->completion );
750
751    if( !msgs->peer->have )
752        return TRUE;
753
754    assert( bitfield->byteCount == msgs->peer->have->byteCount );
755
756    for( i = 0; i < torrent->info.pieceCount; ++i )
757        if( isPieceInteresting( msgs, i ) )
758            return TRUE;
759
760    return FALSE;
761}
762
763static void
764sendInterest( tr_peermsgs * msgs,
765              int           weAreInterested )
766{
767    struct evbuffer * out = msgs->outMessages;
768
769    assert( msgs );
770    assert( weAreInterested == 0 || weAreInterested == 1 );
771
772    msgs->peer->clientIsInterested = weAreInterested;
773    dbgmsg( msgs, "Sending %s", weAreInterested ? "Interested" : "Not Interested" );
774    tr_peerIoWriteUint32( msgs->peer->io, out, sizeof( uint8_t ) );
775    tr_peerIoWriteUint8 ( msgs->peer->io, out, weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
776
777    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
778    dbgOutMessageLen( msgs );
779}
780
781static void
782updateInterest( tr_peermsgs * msgs )
783{
784    const int i = isPeerInteresting( msgs );
785
786    if( i != msgs->peer->clientIsInterested )
787        sendInterest( msgs, i );
788    if( i )
789        fireNeedReq( msgs );
790}
791
792static int
793popNextRequest( tr_peermsgs *         msgs,
794                struct peer_request * setme )
795{
796    return reqListPop( &msgs->peerAskedFor, setme );
797}
798
799static void
800cancelAllRequestsToClient( tr_peermsgs * msgs )
801{
802    struct peer_request req;
803    const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->peer->io );
804
805    while( popNextRequest( msgs, &req ) )
806        if( mustSendCancel )
807            protocolSendReject( msgs, &req );
808}
809
810void
811tr_peerMsgsSetChoke( tr_peermsgs * msgs,
812                     int           choke )
813{
814    const time_t now = time( NULL );
815    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
816
817    assert( msgs );
818    assert( msgs->peer );
819    assert( choke == 0 || choke == 1 );
820
821    if( msgs->peer->chokeChangedAt > fibrillationTime )
822    {
823        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
824    }
825    else if( msgs->peer->peerIsChoked != choke )
826    {
827        msgs->peer->peerIsChoked = choke;
828        if( choke )
829            cancelAllRequestsToClient( msgs );
830        protocolSendChoke( msgs, choke );
831        msgs->peer->chokeChangedAt = now;
832    }
833}
834
835/**
836***
837**/
838
839void
840tr_peerMsgsHave( tr_peermsgs * msgs,
841                 uint32_t      index )
842{
843    protocolSendHave( msgs, index );
844
845    /* since we have more pieces now, we might not be interested in this peer */
846    updateInterest( msgs );
847}
848
849/**
850***
851**/
852
853static int
854reqIsValid( const tr_peermsgs * peer,
855            uint32_t            index,
856            uint32_t            offset,
857            uint32_t            length )
858{
859    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
860}
861
862static int
863requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
864{
865    return reqIsValid( msgs, req->index, req->offset, req->length );
866}
867
868static void
869expireOldRequests( tr_peermsgs * msgs, const time_t now  )
870{
871    int i;
872    time_t oldestAllowed;
873    struct request_list tmp = REQUEST_LIST_INIT;
874    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
875    dbgmsg( msgs, "entering `expire old requests' block" );
876
877    /* cancel requests that have been queued for too long */
878    oldestAllowed = now - QUEUED_REQUEST_TTL_SECS;
879    reqListCopy( &tmp, &msgs->clientWillAskFor );
880    for( i=0; i<tmp.count; ++i ) {
881        const struct peer_request * req = &tmp.requests[i];
882        if( req->time_requested < oldestAllowed )
883            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
884    }
885    reqListClear( &tmp );
886
887    /* if the peer doesn't support "Reject Request",
888     * cancel requests that were sent too long ago. */
889    if( !fext ) {
890        oldestAllowed = now - SENT_REQUEST_TTL_SECS;
891        reqListCopy( &tmp, &msgs->clientAskedFor );
892        for( i=0; i<tmp.count; ++i ) {
893            const struct peer_request * req = &tmp.requests[i];
894            if( req->time_requested < oldestAllowed )
895                tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
896        }
897        reqListClear( &tmp );
898    }
899
900    dbgmsg( msgs, "leaving `expire old requests' block" );
901}
902
903static void
904pumpRequestQueue( tr_peermsgs * msgs, const time_t now )
905{
906    const int           max = msgs->maxActiveRequests;
907    int                 sent = 0;
908    int                 count = msgs->clientAskedFor.count;
909    struct peer_request req;
910
911    if( msgs->peer->clientIsChoked )
912        return;
913    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
914        return;
915
916    while( ( count < max ) && reqListPop( &msgs->clientWillAskFor, &req ) )
917    {
918        const tr_block_index_t block = _tr_block( msgs->torrent, req.index, req.offset );
919
920        assert( requestIsValid( msgs, &req ) );
921        assert( tr_bitfieldHas( msgs->peer->have, req.index ) );
922
923        /* don't ask for it if we've already got it... this block may have
924         * come in from a different peer after we cancelled a request for it */
925        if( !tr_cpBlockIsComplete( msgs->torrent->completion, block ) )
926        {
927            protocolSendRequest( msgs, &req );
928            req.time_requested = now;
929            reqListAppend( &msgs->clientAskedFor, &req );
930
931            ++count;
932            ++sent;
933        }
934    }
935
936    if( sent )
937        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
938                sent, msgs->clientAskedFor.count, msgs->clientWillAskFor.count );
939
940    if( count < max )
941        fireNeedReq( msgs );
942}
943
944static int
945requestQueueIsFull( const tr_peermsgs * msgs )
946{
947    const int req_max = msgs->maxActiveRequests;
948    return msgs->clientWillAskFor.count >= req_max;
949}
950
951tr_addreq_t
952tr_peerMsgsAddRequest( tr_peermsgs *    msgs,
953                       uint32_t         index,
954                       uint32_t         offset,
955                       uint32_t         length )
956{
957    struct peer_request req;
958
959    assert( msgs );
960    assert( msgs->torrent );
961    assert( reqIsValid( msgs, index, offset, length ) );
962
963    /**
964    ***  Reasons to decline the request
965    **/
966
967    /* don't send requests to choked clients */
968    if( msgs->peer->clientIsChoked ) {
969        dbgmsg( msgs, "declining request because they're choking us" );
970        return TR_ADDREQ_CLIENT_CHOKED;
971    }
972
973    /* peer doesn't have this piece */
974    if( !tr_bitfieldHas( msgs->peer->have, index ) )
975        return TR_ADDREQ_MISSING;
976
977    /* peer's queue is full */
978    if( requestQueueIsFull( msgs ) ) {
979        dbgmsg( msgs, "declining request because we're full" );
980        return TR_ADDREQ_FULL;
981    }
982
983    /* have we already asked for this piece? */
984    req.index = index;
985    req.offset = offset;
986    req.length = length;
987    if( reqListFind( &msgs->clientAskedFor, &req ) != -1 ) {
988        dbgmsg( msgs, "declining because it's a duplicate" );
989        return TR_ADDREQ_DUPLICATE;
990    }
991    if( reqListFind( &msgs->clientWillAskFor, &req ) != -1 ) {
992        dbgmsg( msgs, "declining because it's a duplicate" );
993        return TR_ADDREQ_DUPLICATE;
994    }
995
996    /**
997    ***  Accept this request
998    **/
999
1000    dbgmsg( msgs, "adding req for %"PRIu32":%"PRIu32"->%"PRIu32" to our `will request' list",
1001            index, offset, length );
1002    req.time_requested = time( NULL );
1003    reqListAppend( &msgs->clientWillAskFor, &req );
1004    return TR_ADDREQ_OK;
1005}
1006
1007static void
1008cancelAllRequestsToPeer( tr_peermsgs * msgs, tr_bool sendCancel )
1009{
1010    int i;
1011    struct request_list a = msgs->clientWillAskFor;
1012    struct request_list b = msgs->clientAskedFor;
1013    dbgmsg( msgs, "cancelling all requests to peer" );
1014
1015    msgs->clientAskedFor = REQUEST_LIST_INIT;
1016    msgs->clientWillAskFor = REQUEST_LIST_INIT;
1017
1018    for( i=0; i<a.count; ++i )
1019        fireCancelledReq( msgs, &a.requests[i] );
1020
1021    for( i = 0; i < b.count; ++i ) {
1022        fireCancelledReq( msgs, &b.requests[i] );
1023        if( sendCancel )
1024            protocolSendCancel( msgs, &b.requests[i] );
1025    }
1026
1027    reqListClear( &a );
1028    reqListClear( &b );
1029}
1030
1031void
1032tr_peerMsgsCancel( tr_peermsgs * msgs,
1033                   uint32_t      pieceIndex,
1034                   uint32_t      offset,
1035                   uint32_t      length )
1036{
1037    struct peer_request req;
1038
1039    assert( msgs != NULL );
1040    assert( length > 0 );
1041
1042
1043    /* have we asked the peer for this piece? */
1044    req.index = pieceIndex;
1045    req.offset = offset;
1046    req.length = length;
1047
1048    /* if it's only in the queue and hasn't been sent yet, free it */
1049    if( reqListRemove( &msgs->clientWillAskFor, &req ) ) {
1050        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32, pieceIndex, offset, length );
1051        fireCancelledReq( msgs, &req );
1052    }
1053
1054    /* if it's already been sent, send a cancel message too */
1055    if( reqListRemove( &msgs->clientAskedFor, &req ) ) {
1056        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32, pieceIndex, offset, length );
1057        protocolSendCancel( msgs, &req );
1058        fireCancelledReq( msgs, &req );
1059    }
1060}
1061
1062
1063/**
1064***
1065**/
1066
1067static void
1068sendLtepHandshake( tr_peermsgs * msgs )
1069{
1070    tr_benc val, *m;
1071    char * buf;
1072    int len;
1073    int pex;
1074    struct evbuffer * out = msgs->outMessages;
1075
1076    if( msgs->clientSentLtepHandshake )
1077        return;
1078
1079    dbgmsg( msgs, "sending an ltep handshake" );
1080    msgs->clientSentLtepHandshake = 1;
1081
1082    /* decide if we want to advertise pex support */
1083    if( !tr_torrentAllowsPex( msgs->torrent ) )
1084        pex = 0;
1085    else if( msgs->peerSentLtepHandshake )
1086        pex = msgs->peerSupportsPex ? 1 : 0;
1087    else
1088        pex = 1;
1089
1090    tr_bencInitDict( &val, 5 );
1091    tr_bencDictAddInt( &val, "e", msgs->session->encryptionMode != TR_CLEAR_PREFERRED );
1092    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( msgs->session ) );
1093    tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
1094    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
1095    m  = tr_bencDictAddDict( &val, "m", 1 );
1096    if( pex )
1097        tr_bencDictAddInt( m, "ut_pex", TR_LTEP_PEX );
1098    buf = tr_bencSave( &val, &len );
1099
1100    tr_peerIoWriteUint32( msgs->peer->io, out, 2 * sizeof( uint8_t ) + len );
1101    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_LTEP );
1102    tr_peerIoWriteUint8 ( msgs->peer->io, out, LTEP_HANDSHAKE );
1103    tr_peerIoWriteBytes ( msgs->peer->io, out, buf, len );
1104    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1105    dbgOutMessageLen( msgs );
1106
1107    /* cleanup */
1108    tr_bencFree( &val );
1109    tr_free( buf );
1110}
1111
1112static void
1113parseLtepHandshake( tr_peermsgs *     msgs,
1114                    int               len,
1115                    struct evbuffer * inbuf )
1116{
1117    int64_t   i;
1118    tr_benc   val, * sub;
1119    uint8_t * tmp = tr_new( uint8_t, len );
1120
1121    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, len );
1122    msgs->peerSentLtepHandshake = 1;
1123
1124    if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) )
1125    {
1126        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
1127        tr_free( tmp );
1128        return;
1129    }
1130
1131    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
1132
1133    /* does the peer prefer encrypted connections? */
1134    if( tr_bencDictFindInt( &val, "e", &i ) )
1135        msgs->peer->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
1136                                              : ENCRYPTION_PREFERENCE_NO;
1137
1138    /* check supported messages for utorrent pex */
1139    msgs->peerSupportsPex = 0;
1140    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
1141        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
1142            msgs->ut_pex_id = (uint8_t) i;
1143            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
1144            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
1145        }
1146    }
1147
1148    /* look for upload_only (BEP 21) */
1149    if( tr_bencDictFindInt( &val, "upload_only", &i ) )
1150        fireUploadOnly( msgs, i!=0 );
1151
1152    /* get peer's listening port */
1153    if( tr_bencDictFindInt( &val, "p", &i ) ) {
1154        msgs->peer->port = htons( (uint16_t)i );
1155        dbgmsg( msgs, "msgs->port is now %hu", msgs->peer->port );
1156    }
1157
1158    /* get peer's maximum request queue size */
1159    if( tr_bencDictFindInt( &val, "reqq", &i ) )
1160        msgs->reqq = i;
1161
1162    tr_bencFree( &val );
1163    tr_free( tmp );
1164}
1165
1166static void
1167parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1168{
1169    int loaded = 0;
1170    uint8_t * tmp = tr_new( uint8_t, msglen );
1171    tr_benc val;
1172    const tr_torrent * tor = msgs->torrent;
1173    const uint8_t * added;
1174    size_t added_len;
1175
1176    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1177
1178    if( tr_torrentAllowsPex( tor )
1179      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) ) )
1180    {
1181        if( tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1182        {
1183            const uint8_t * added_f = NULL;
1184            tr_pex *        pex;
1185            size_t          i, n;
1186            size_t          added_f_len = 0;
1187            tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1188            pex =
1189                tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len,
1190                                        &n );
1191            for( i = 0; i < n; ++i )
1192                tr_peerMgrAddPex( msgs->session->peerMgr, tor->info.hash,
1193                                  TR_PEER_FROM_PEX, pex + i );
1194            tr_free( pex );
1195        }
1196       
1197        if( tr_bencDictFindRaw( &val, "added6", &added, &added_len ) )
1198        {
1199            const uint8_t * added_f = NULL;
1200            tr_pex *        pex;
1201            size_t          i, n;
1202            size_t          added_f_len = 0;
1203            tr_bencDictFindRaw( &val, "added6.f", &added_f, &added_f_len );
1204            pex =
1205                tr_peerMgrCompact6ToPex( added, added_len, added_f, added_f_len,
1206                                         &n );
1207            for( i = 0; i < n; ++i )
1208                tr_peerMgrAddPex( msgs->session->peerMgr, tor->info.hash,
1209                                  TR_PEER_FROM_PEX, pex + i );
1210            tr_free( pex );
1211        }
1212       
1213    }
1214
1215    if( loaded )
1216        tr_bencFree( &val );
1217    tr_free( tmp );
1218}
1219
1220static void sendPex( tr_peermsgs * msgs );
1221
1222static void
1223parseLtep( tr_peermsgs *     msgs,
1224           int               msglen,
1225           struct evbuffer * inbuf )
1226{
1227    uint8_t ltep_msgid;
1228
1229    tr_peerIoReadUint8( msgs->peer->io, inbuf, &ltep_msgid );
1230    msglen--;
1231
1232    if( ltep_msgid == LTEP_HANDSHAKE )
1233    {
1234        dbgmsg( msgs, "got ltep handshake" );
1235        parseLtepHandshake( msgs, msglen, inbuf );
1236        if( tr_peerIoSupportsLTEP( msgs->peer->io ) )
1237        {
1238            sendLtepHandshake( msgs );
1239            sendPex( msgs );
1240        }
1241    }
1242    else if( ltep_msgid == TR_LTEP_PEX )
1243    {
1244        dbgmsg( msgs, "got ut pex" );
1245        msgs->peerSupportsPex = 1;
1246        parseUtPex( msgs, msglen, inbuf );
1247    }
1248    else
1249    {
1250        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1251        evbuffer_drain( inbuf, msglen );
1252    }
1253}
1254
1255static int
1256readBtLength( tr_peermsgs *     msgs,
1257              struct evbuffer * inbuf,
1258              size_t            inlen )
1259{
1260    uint32_t len;
1261
1262    if( inlen < sizeof( len ) )
1263        return READ_LATER;
1264
1265    tr_peerIoReadUint32( msgs->peer->io, inbuf, &len );
1266
1267    if( len == 0 ) /* peer sent us a keepalive message */
1268        dbgmsg( msgs, "got KeepAlive" );
1269    else
1270    {
1271        msgs->incoming.length = len;
1272        msgs->state = AWAITING_BT_ID;
1273    }
1274
1275    return READ_NOW;
1276}
1277
1278static int readBtMessage( tr_peermsgs *     msgs,
1279                          struct evbuffer * inbuf,
1280                          size_t            inlen );
1281
1282static int
1283readBtId( tr_peermsgs *     msgs,
1284          struct evbuffer * inbuf,
1285          size_t            inlen )
1286{
1287    uint8_t id;
1288
1289    if( inlen < sizeof( uint8_t ) )
1290        return READ_LATER;
1291
1292    tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
1293    msgs->incoming.id = id;
1294
1295    if( id == BT_PIECE )
1296    {
1297        msgs->state = AWAITING_BT_PIECE;
1298        return READ_NOW;
1299    }
1300    else if( msgs->incoming.length != 1 )
1301    {
1302        msgs->state = AWAITING_BT_MESSAGE;
1303        return READ_NOW;
1304    }
1305    else return readBtMessage( msgs, inbuf, inlen - 1 );
1306}
1307
1308static void
1309updatePeerProgress( tr_peermsgs * msgs )
1310{
1311    msgs->peer->progress = tr_bitfieldCountTrueBits( msgs->peer->have ) / (float)msgs->torrent->info.pieceCount;
1312    dbgmsg( msgs, "peer progress is %f", msgs->peer->progress );
1313    updateFastSet( msgs );
1314    updateInterest( msgs );
1315    firePeerProgress( msgs );
1316}
1317
1318static void
1319peerMadeRequest( tr_peermsgs *               msgs,
1320                 const struct peer_request * req )
1321{
1322    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1323    const int reqIsValid = requestIsValid( msgs, req );
1324    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( msgs->torrent->completion, req->index );
1325    const int peerIsChoked = msgs->peer->peerIsChoked;
1326
1327    int allow = FALSE;
1328
1329    if( !reqIsValid )
1330        dbgmsg( msgs, "rejecting an invalid request." );
1331    else if( !clientHasPiece )
1332        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1333    else if( peerIsChoked )
1334        dbgmsg( msgs, "rejecting request from choked peer" );
1335    else
1336        allow = TRUE;
1337
1338    if( allow )
1339        reqListAppend( &msgs->peerAskedFor, req );
1340    else if( fext )
1341        protocolSendReject( msgs, req );
1342}
1343
1344static int
1345messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1346{
1347    switch( id )
1348    {
1349        case BT_CHOKE:
1350        case BT_UNCHOKE:
1351        case BT_INTERESTED:
1352        case BT_NOT_INTERESTED:
1353        case BT_FEXT_HAVE_ALL:
1354        case BT_FEXT_HAVE_NONE:
1355            return len == 1;
1356
1357        case BT_HAVE:
1358        case BT_FEXT_SUGGEST:
1359        case BT_FEXT_ALLOWED_FAST:
1360            return len == 5;
1361
1362        case BT_BITFIELD:
1363            return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1364
1365        case BT_REQUEST:
1366        case BT_CANCEL:
1367        case BT_FEXT_REJECT:
1368            return len == 13;
1369
1370        case BT_PIECE:
1371            return len > 9 && len <= 16393;
1372
1373        case BT_PORT:
1374            return len == 3;
1375
1376        case BT_LTEP:
1377            return len >= 2;
1378
1379        default:
1380            return FALSE;
1381    }
1382}
1383
1384static int clientGotBlock( tr_peermsgs *               msgs,
1385                           const uint8_t *             block,
1386                           const struct peer_request * req );
1387
1388static int
1389readBtPiece( tr_peermsgs      * msgs,
1390             struct evbuffer  * inbuf,
1391             size_t             inlen,
1392             size_t           * setme_piece_bytes_read )
1393{
1394    struct peer_request * req = &msgs->incoming.blockReq;
1395
1396    assert( EVBUFFER_LENGTH( inbuf ) >= inlen );
1397    dbgmsg( msgs, "In readBtPiece" );
1398
1399    if( !req->length )
1400    {
1401        if( inlen < 8 )
1402            return READ_LATER;
1403
1404        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index );
1405        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset );
1406        req->length = msgs->incoming.length - 9;
1407        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1408        return READ_NOW;
1409    }
1410    else
1411    {
1412        int err;
1413
1414        /* read in another chunk of data */
1415        const size_t nLeft = req->length - EVBUFFER_LENGTH( msgs->incoming.block );
1416        size_t n = MIN( nLeft, inlen );
1417        uint8_t * buf = tr_new( uint8_t, n );
1418        assert( EVBUFFER_LENGTH( inbuf ) >= n );
1419        tr_peerIoReadBytes( msgs->peer->io, inbuf, buf, n );
1420        evbuffer_add( msgs->incoming.block, buf, n );
1421        fireClientGotData( msgs, n, TRUE );
1422        *setme_piece_bytes_read += n;
1423        tr_free( buf );
1424        dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1425               n, req->index, req->offset, req->length,
1426               (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) );
1427        if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length )
1428            return READ_LATER;
1429
1430        /* we've got the whole block ... process it */
1431        err = clientGotBlock( msgs, EVBUFFER_DATA( msgs->incoming.block ), req );
1432
1433        /* cleanup */
1434        evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH( msgs->incoming.block ) );
1435        req->length = 0;
1436        msgs->state = AWAITING_BT_LENGTH;
1437        if( !err )
1438            return READ_NOW;
1439        else {
1440            fireError( msgs, err );
1441            return READ_ERR;
1442        }
1443    }
1444}
1445
1446static int
1447readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1448{
1449    uint32_t      ui32;
1450    uint32_t      msglen = msgs->incoming.length;
1451    const uint8_t id = msgs->incoming.id;
1452    const size_t  startBufLen = EVBUFFER_LENGTH( inbuf );
1453    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1454
1455    --msglen; /* id length */
1456
1457    if( inlen < msglen )
1458        return READ_LATER;
1459
1460    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
1461
1462    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1463    {
1464        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1465        fireError( msgs, EMSGSIZE );
1466        return READ_ERR;
1467    }
1468
1469    switch( id )
1470    {
1471        case BT_CHOKE:
1472            dbgmsg( msgs, "got Choke" );
1473            msgs->peer->clientIsChoked = 1;
1474            if( !fext )
1475                cancelAllRequestsToPeer( msgs, FALSE );
1476            break;
1477
1478        case BT_UNCHOKE:
1479            dbgmsg( msgs, "got Unchoke" );
1480            msgs->peer->clientIsChoked = 0;
1481            fireNeedReq( msgs );
1482            break;
1483
1484        case BT_INTERESTED:
1485            dbgmsg( msgs, "got Interested" );
1486            msgs->peer->peerIsInterested = 1;
1487            break;
1488
1489        case BT_NOT_INTERESTED:
1490            dbgmsg( msgs, "got Not Interested" );
1491            msgs->peer->peerIsInterested = 0;
1492            break;
1493
1494        case BT_HAVE:
1495            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1496            dbgmsg( msgs, "got Have: %u", ui32 );
1497            if( tr_bitfieldAdd( msgs->peer->have, ui32 ) ) {
1498                fireError( msgs, ERANGE );
1499                return READ_ERR;
1500            }
1501            updatePeerProgress( msgs );
1502            tr_rcTransferred( msgs->torrent->swarmSpeed,
1503                              msgs->torrent->info.pieceSize );
1504            break;
1505
1506        case BT_BITFIELD:
1507        {
1508            dbgmsg( msgs, "got a bitfield" );
1509            msgs->peerSentBitfield = 1;
1510            tr_peerIoReadBytes( msgs->peer->io, inbuf, msgs->peer->have->bits, msglen );
1511            updatePeerProgress( msgs );
1512            fireNeedReq( msgs );
1513            break;
1514        }
1515
1516        case BT_REQUEST:
1517        {
1518            struct peer_request r;
1519            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1520            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1521            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1522            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1523            peerMadeRequest( msgs, &r );
1524            break;
1525        }
1526
1527        case BT_CANCEL:
1528        {
1529            struct peer_request r;
1530            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1531            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1532            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1533            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1534            if( reqListRemove( &msgs->peerAskedFor, &r ) && fext )
1535                protocolSendReject( msgs, &r );
1536            break;
1537        }
1538
1539        case BT_PIECE:
1540            assert( 0 ); /* handled elsewhere! */
1541            break;
1542
1543        case BT_PORT:
1544            dbgmsg( msgs, "Got a BT_PORT" );
1545            tr_peerIoReadUint16( msgs->peer->io, inbuf, &msgs->peer->port );
1546            break;
1547
1548        case BT_FEXT_SUGGEST:
1549            dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1550            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1551            if( fext )
1552                fireClientGotSuggest( msgs, ui32 );
1553            else {
1554                fireError( msgs, EMSGSIZE );
1555                return READ_ERR;
1556            }
1557            break;
1558
1559        case BT_FEXT_ALLOWED_FAST:
1560            dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1561            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1562            if( fext )
1563                fireClientGotAllowedFast( msgs, ui32 );
1564            else {
1565                fireError( msgs, EMSGSIZE );
1566                return READ_ERR;
1567            }
1568            break;
1569
1570        case BT_FEXT_HAVE_ALL:
1571            dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1572            if( fext ) {
1573                tr_bitfieldAddRange( msgs->peer->have, 0, msgs->torrent->info.pieceCount );
1574                updatePeerProgress( msgs );
1575            } else {
1576                fireError( msgs, EMSGSIZE );
1577                return READ_ERR;
1578            }
1579            break;
1580
1581        case BT_FEXT_HAVE_NONE:
1582            dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1583            if( fext ) {
1584                tr_bitfieldClear( msgs->peer->have );
1585                updatePeerProgress( msgs );
1586            } else {
1587                fireError( msgs, EMSGSIZE );
1588                return READ_ERR;
1589            }
1590            break;
1591
1592        case BT_FEXT_REJECT:
1593        {
1594            struct peer_request r;
1595            dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1596            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1597            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1598            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1599            if( fext )
1600                reqListRemove( &msgs->clientAskedFor, &r );
1601            else {
1602                fireError( msgs, EMSGSIZE );
1603                return READ_ERR;
1604            }
1605            break;
1606        }
1607
1608        case BT_LTEP:
1609            dbgmsg( msgs, "Got a BT_LTEP" );
1610            parseLtep( msgs, msglen, inbuf );
1611            break;
1612
1613        default:
1614            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1615            tr_peerIoDrain( msgs->peer->io, inbuf, msglen );
1616            break;
1617    }
1618
1619    assert( msglen + 1 == msgs->incoming.length );
1620    assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen );
1621
1622    msgs->state = AWAITING_BT_LENGTH;
1623    return READ_NOW;
1624}
1625
1626static void
1627decrementDownloadedCount( tr_peermsgs * msgs, uint32_t byteCount )
1628{
1629    tr_torrent * tor = msgs->torrent;
1630
1631    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1632}
1633
1634static void
1635clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1636{
1637    decrementDownloadedCount( msgs, req->length );
1638}
1639
1640static void
1641addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1642{
1643    if( !msgs->peer->blame )
1644         msgs->peer->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1645    tr_bitfieldAdd( msgs->peer->blame, index );
1646}
1647
1648/* returns 0 on success, or an errno on failure */
1649static int
1650clientGotBlock( tr_peermsgs *               msgs,
1651                const uint8_t *             data,
1652                const struct peer_request * req )
1653{
1654    int err;
1655    tr_torrent * tor = msgs->torrent;
1656    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1657
1658    assert( msgs );
1659    assert( req );
1660
1661    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) {
1662        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1663                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1664        return EMSGSIZE;
1665    }
1666
1667    /* save the block */
1668    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1669
1670    /**
1671    *** Remove the block from our `we asked for this' list
1672    **/
1673
1674    if( !reqListRemove( &msgs->clientAskedFor, req ) ) {
1675        clientGotUnwantedBlock( msgs, req );
1676        dbgmsg( msgs, "we didn't ask for this message..." );
1677        return 0;
1678    }
1679
1680    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1681            msgs->clientAskedFor.count );
1682
1683    /**
1684    *** Error checks
1685    **/
1686
1687    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
1688        dbgmsg( msgs, "we have this block already..." );
1689        clientGotUnwantedBlock( msgs, req );
1690        return 0;
1691    }
1692
1693    /**
1694    ***  Save the block
1695    **/
1696
1697    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1698        return err;
1699
1700    addPeerToBlamefield( msgs, req->index );
1701    fireGotBlock( msgs, req );
1702    return 0;
1703}
1704
1705static int peerPulse( void * vmsgs );
1706
1707static void
1708didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1709{
1710    tr_peermsgs * msgs = vmsgs;
1711    firePeerGotData( msgs, bytesWritten, wasPieceData );
1712    peerPulse( msgs );
1713}
1714
1715static ReadState
1716canRead( tr_peerIo * io, void * vmsgs, size_t * piece )
1717{
1718    ReadState         ret;
1719    tr_peermsgs *     msgs = vmsgs;
1720    struct evbuffer * in = tr_peerIoGetReadBuffer( io );
1721    const size_t      inlen = EVBUFFER_LENGTH( in );
1722
1723    if( !inlen )
1724    {
1725        ret = READ_LATER;
1726    }
1727    else if( msgs->state == AWAITING_BT_PIECE )
1728    {
1729        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1730    }
1731    else switch( msgs->state )
1732    {
1733        case AWAITING_BT_LENGTH:
1734            ret = readBtLength ( msgs, in, inlen ); break;
1735
1736        case AWAITING_BT_ID:
1737            ret = readBtId     ( msgs, in, inlen ); break;
1738
1739        case AWAITING_BT_MESSAGE:
1740            ret = readBtMessage( msgs, in, inlen ); break;
1741
1742        default:
1743            assert( 0 );
1744    }
1745
1746    /* log the raw data that was read */
1747    if( ( ret != READ_ERR ) && ( EVBUFFER_LENGTH( in ) != inlen ) )
1748        fireClientGotData( msgs, inlen - EVBUFFER_LENGTH( in ), FALSE );
1749
1750    return ret;
1751}
1752
1753/**
1754***
1755**/
1756
1757static int
1758ratePulse( void * vmsgs )
1759{
1760    tr_peermsgs * msgs = vmsgs;
1761    const double rateToClient = tr_peerGetPieceSpeed( msgs->peer, TR_PEER_TO_CLIENT );
1762    const int seconds = 10;
1763    const int floor = 8;
1764    const int estimatedBlocksInPeriod = ( rateToClient * seconds * 1024 ) / msgs->torrent->blockSize;
1765
1766    msgs->maxActiveRequests = floor + estimatedBlocksInPeriod;
1767
1768    if( msgs->reqq > 0 )
1769        msgs->maxActiveRequests = MIN( msgs->maxActiveRequests, msgs->reqq );
1770
1771    return TRUE;
1772}
1773
1774static size_t
1775fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1776{
1777    size_t bytesWritten = 0;
1778    struct peer_request req;
1779    const tr_bool haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1780    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1781
1782    /**
1783    ***  Protocol messages
1784    **/
1785
1786    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1787    {
1788        dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
1789        msgs->outMessagesBatchedAt = now;
1790    }
1791    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1792    {
1793        const size_t len = EVBUFFER_LENGTH( msgs->outMessages );
1794        /* flush the protocol messages */
1795        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len );
1796        tr_peerIoWriteBuf( msgs->peer->io, msgs->outMessages, FALSE );
1797        msgs->clientSentAnythingAt = now;
1798        msgs->outMessagesBatchedAt = 0;
1799        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1800        bytesWritten +=  len;
1801    }
1802
1803    /**
1804    ***  Blocks
1805    **/
1806
1807    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io ) >= msgs->torrent->blockSize )
1808        && popNextRequest( msgs, &req ) )
1809    {
1810        if( requestIsValid( msgs, &req )
1811            && tr_cpPieceIsComplete( msgs->torrent->completion, req.index ) )
1812        {
1813            /* send a block */
1814            uint8_t * buf = tr_new( uint8_t, req.length );
1815            const int err = tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf );
1816            if( err ) {
1817                fireError( msgs, err );
1818                bytesWritten = 0;
1819                msgs = NULL;
1820            } else {
1821                tr_peerIo * io = msgs->peer->io;
1822                struct evbuffer * out = evbuffer_new( );
1823                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1824                tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1825                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1826                tr_peerIoWriteUint32( io, out, req.index );
1827                tr_peerIoWriteUint32( io, out, req.offset );
1828                tr_peerIoWriteBytes ( io, out, buf, req.length );
1829                tr_peerIoWriteBuf( io, out, TRUE );
1830                bytesWritten += EVBUFFER_LENGTH( out );
1831                evbuffer_free( out );
1832                msgs->clientSentAnythingAt = now;
1833            }
1834            tr_free( buf );
1835        }
1836        else if( fext ) /* peer needs a reject message */
1837        {
1838            protocolSendReject( msgs, &req );
1839        }
1840    }
1841
1842    /**
1843    ***  Keepalive
1844    **/
1845
1846    if( ( msgs != NULL )
1847        && ( msgs->clientSentAnythingAt != 0 )
1848        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1849    {
1850        dbgmsg( msgs, "sending a keepalive message" );
1851        tr_peerIoWriteUint32( msgs->peer->io, msgs->outMessages, 0 );
1852        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1853    }
1854
1855    return bytesWritten;
1856}
1857
1858static int
1859peerPulse( void * vmsgs )
1860{
1861    tr_peermsgs * msgs = vmsgs;
1862    const time_t  now = time( NULL );
1863
1864    ratePulse( msgs );
1865
1866    pumpRequestQueue( msgs, now );
1867    expireOldRequests( msgs, now );
1868
1869    for( ;; )
1870        if( fillOutputBuffer( msgs, now ) < 1 )
1871            break;
1872
1873    return TRUE; /* loop forever */
1874}
1875
1876void
1877tr_peerMsgsPulse( tr_peermsgs * msgs )
1878{
1879    if( msgs != NULL )
1880        peerPulse( msgs );
1881}
1882
1883static void
1884gotError( tr_peerIo  * io UNUSED,
1885          short        what,
1886          void       * vmsgs )
1887{
1888    if( what & EVBUFFER_TIMEOUT )
1889        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1890    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1891        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1892               what, errno, tr_strerror( errno ) );
1893    fireError( vmsgs, ENOTCONN );
1894}
1895
1896static void
1897sendBitfield( tr_peermsgs * msgs )
1898{
1899    struct evbuffer * out = msgs->outMessages;
1900    tr_bitfield *     field;
1901    tr_piece_index_t  lazyPieces[LAZY_PIECE_COUNT];
1902    size_t            i;
1903    size_t            lazyCount = 0;
1904
1905    field = tr_bitfieldDup( tr_cpPieceBitfield( msgs->torrent->completion ) );
1906
1907    if( tr_sessionIsLazyBitfieldEnabled( msgs->session ) )
1908    {
1909        /** Lazy bitfields aren't a high priority or secure, so I'm opting for
1910            speed over a truly random sample -- let's limit the pool size to
1911            the first 1000 pieces so large torrents don't bog things down */
1912        size_t poolSize;
1913        const size_t maxPoolSize = MIN( msgs->torrent->info.pieceCount, 1000 );
1914        tr_piece_index_t * pool = tr_new( tr_piece_index_t, maxPoolSize );
1915
1916        /* build the pool */
1917        for( i=poolSize=0; i<maxPoolSize; ++i )
1918            if( tr_bitfieldHas( field, i ) )
1919                pool[poolSize++] = i;
1920
1921        /* pull random piece indices from the pool */
1922        while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) )
1923        {
1924            const int pos = tr_cryptoWeakRandInt( poolSize );
1925            const tr_piece_index_t piece = pool[pos];
1926            tr_bitfieldRem( field, piece );
1927            lazyPieces[lazyCount++] = piece;
1928            pool[pos] = pool[--poolSize];
1929        }
1930
1931        /* cleanup */
1932        tr_free( pool );
1933    }
1934
1935    tr_peerIoWriteUint32( msgs->peer->io, out,
1936                          sizeof( uint8_t ) + field->byteCount );
1937    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_BITFIELD );
1938    tr_peerIoWriteBytes ( msgs->peer->io, out, field->bits, field->byteCount );
1939    dbgmsg( msgs, "sending bitfield... outMessage size is now %zu",
1940            EVBUFFER_LENGTH( out ) );
1941    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1942
1943    for( i = 0; i < lazyCount; ++i )
1944        protocolSendHave( msgs, lazyPieces[i] );
1945
1946    tr_bitfieldFree( field );
1947}
1948
1949static void
1950tellPeerWhatWeHave( tr_peermsgs * msgs )
1951{
1952    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1953
1954    if( fext && ( tr_cpGetStatus( msgs->torrent->completion ) == TR_SEED ) )
1955    {
1956        protocolSendHaveAll( msgs );
1957    }
1958    else if( fext && ( tr_cpHaveValid( msgs->torrent->completion ) == 0 ) )
1959    {
1960        protocolSendHaveNone( msgs );
1961    }
1962    else
1963    {
1964        sendBitfield( msgs );
1965    }
1966}
1967
1968/**
1969***
1970**/
1971
1972/* some peers give us error messages if we send
1973   more than this many peers in a single pex message
1974   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
1975#define MAX_PEX_ADDED 50
1976#define MAX_PEX_DROPPED 50
1977
1978typedef struct
1979{
1980    tr_pex *  added;
1981    tr_pex *  dropped;
1982    tr_pex *  elements;
1983    int       addedCount;
1984    int       droppedCount;
1985    int       elementCount;
1986}
1987PexDiffs;
1988
1989static void
1990pexAddedCb( void * vpex,
1991            void * userData )
1992{
1993    PexDiffs * diffs = userData;
1994    tr_pex *   pex = vpex;
1995
1996    if( diffs->addedCount < MAX_PEX_ADDED )
1997    {
1998        diffs->added[diffs->addedCount++] = *pex;
1999        diffs->elements[diffs->elementCount++] = *pex;
2000    }
2001}
2002
2003static void
2004pexDroppedCb( void * vpex,
2005              void * userData )
2006{
2007    PexDiffs * diffs = userData;
2008    tr_pex *   pex = vpex;
2009
2010    if( diffs->droppedCount < MAX_PEX_DROPPED )
2011    {
2012        diffs->dropped[diffs->droppedCount++] = *pex;
2013    }
2014}
2015
2016static void
2017pexElementCb( void * vpex,
2018              void * userData )
2019{
2020    PexDiffs * diffs = userData;
2021    tr_pex *   pex = vpex;
2022
2023    diffs->elements[diffs->elementCount++] = *pex;
2024}
2025
2026static void
2027sendPex( tr_peermsgs * msgs )
2028{
2029    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
2030    {
2031        PexDiffs diffs;
2032        PexDiffs diffs6;
2033        tr_pex * newPex = NULL;
2034        tr_pex * newPex6 = NULL;
2035        const int newCount = tr_peerMgrGetPeers( msgs->session->peerMgr,
2036                                                 msgs->torrent->info.hash,
2037                                                 &newPex, TR_AF_INET );
2038        const int newCount6 = tr_peerMgrGetPeers( msgs->session->peerMgr,
2039                                                  msgs->torrent->info.hash,
2040                                                  &newPex6, TR_AF_INET6 );
2041
2042        /* build the diffs */
2043        diffs.added = tr_new( tr_pex, newCount );
2044        diffs.addedCount = 0;
2045        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
2046        diffs.droppedCount = 0;
2047        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
2048        diffs.elementCount = 0;
2049        tr_set_compare( msgs->pex, msgs->pexCount,
2050                        newPex, newCount,
2051                        tr_pexCompare, sizeof( tr_pex ),
2052                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
2053        diffs6.added = tr_new( tr_pex, newCount6 );
2054        diffs6.addedCount = 0;
2055        diffs6.dropped = tr_new( tr_pex, msgs->pexCount6 );
2056        diffs6.droppedCount = 0;
2057        diffs6.elements = tr_new( tr_pex, newCount6 + msgs->pexCount6 );
2058        diffs6.elementCount = 0;
2059        tr_set_compare( msgs->pex6, msgs->pexCount6,
2060                        newPex6, newCount6,
2061                        tr_pexCompare, sizeof( tr_pex ),
2062                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6 );
2063        dbgmsg(
2064            msgs,
2065            "pex: old peer count %d, new peer count %d, added %d, removed %d",
2066            msgs->pexCount, newCount + newCount6,
2067            diffs.addedCount + diffs6.addedCount,
2068            diffs.droppedCount + diffs6.droppedCount );
2069
2070        if( !diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
2071            !diffs6.droppedCount )
2072        {
2073            tr_free( diffs.elements );
2074            tr_free( diffs6.elements );
2075        }
2076        else
2077        {
2078            int  i;
2079            tr_benc val;
2080            char * benc;
2081            int bencLen;
2082            uint8_t * tmp, *walk;
2083            struct evbuffer * out = msgs->outMessages;
2084
2085            /* update peer */
2086            tr_free( msgs->pex );
2087            msgs->pex = diffs.elements;
2088            msgs->pexCount = diffs.elementCount;
2089            tr_free( msgs->pex6 );
2090            msgs->pex6 = diffs6.elements;
2091            msgs->pexCount6 = diffs6.elementCount;
2092
2093            /* build the pex payload */
2094            tr_bencInitDict( &val, 3 ); /* ipv6 support: left as 3:
2095                                         * speed vs. likelihood? */
2096
2097            /* "added" */
2098            tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
2099            for( i = 0; i < diffs.addedCount; ++i )
2100            {
2101                tr_suspectAddress( &diffs.added[i].addr, "pex" );
2102                memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
2103                memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
2104            }
2105            assert( ( walk - tmp ) == diffs.addedCount * 6 );
2106            tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
2107            tr_free( tmp );
2108
2109            /* "added.f" */
2110            tmp = walk = tr_new( uint8_t, diffs.addedCount );
2111            for( i = 0; i < diffs.addedCount; ++i )
2112                *walk++ = diffs.added[i].flags;
2113            assert( ( walk - tmp ) == diffs.addedCount );
2114            tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2115            tr_free( tmp );
2116
2117            /* "dropped" */
2118            tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2119            for( i = 0; i < diffs.droppedCount; ++i )
2120            {
2121                memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
2122                memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2123            }
2124            assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2125            tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2126            tr_free( tmp );
2127           
2128            /* "added6" */
2129            tmp = walk = tr_new( uint8_t, diffs6.addedCount * 18 );
2130            for( i = 0; i < diffs6.addedCount; ++i )
2131            {
2132                tr_suspectAddress( &diffs6.added[i].addr, "pex6" );
2133                memcpy( walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16 );
2134                walk += 16;
2135                memcpy( walk, &diffs6.added[i].port, 2 );
2136                walk += 2;
2137            }
2138            assert( ( walk - tmp ) == diffs6.addedCount * 18 );
2139            tr_bencDictAddRaw( &val, "added6", tmp, walk - tmp );
2140            tr_free( tmp );
2141           
2142            /* "added6.f" */
2143            tmp = walk = tr_new( uint8_t, diffs6.addedCount );
2144            for( i = 0; i < diffs6.addedCount; ++i )
2145                *walk++ = diffs6.added[i].flags;
2146            assert( ( walk - tmp ) == diffs6.addedCount );
2147            tr_bencDictAddRaw( &val, "added6.f", tmp, walk - tmp );
2148            tr_free( tmp );
2149           
2150            /* "dropped6" */
2151            tmp = walk = tr_new( uint8_t, diffs6.droppedCount * 18 );
2152            for( i = 0; i < diffs6.droppedCount; ++i )
2153            {
2154                memcpy( walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16 );
2155                walk += 16;
2156                memcpy( walk, &diffs6.dropped[i].port, 2 );
2157                walk += 2;
2158            }
2159            assert( ( walk - tmp ) == diffs6.droppedCount * 18);
2160            tr_bencDictAddRaw( &val, "dropped6", tmp, walk - tmp );
2161            tr_free( tmp );
2162
2163            /* write the pex message */
2164            benc = tr_bencSave( &val, &bencLen );
2165            tr_peerIoWriteUint32( msgs->peer->io, out, 2 * sizeof( uint8_t ) + bencLen );
2166            tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_LTEP );
2167            tr_peerIoWriteUint8 ( msgs->peer->io, out, msgs->ut_pex_id );
2168            tr_peerIoWriteBytes ( msgs->peer->io, out, benc, bencLen );
2169            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2170            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", EVBUFFER_LENGTH( out ) );
2171
2172            tr_free( benc );
2173            tr_bencFree( &val );
2174        }
2175
2176        /* cleanup */
2177        tr_free( diffs.added );
2178        tr_free( diffs.dropped );
2179        tr_free( newPex );
2180        tr_free( diffs6.added );
2181        tr_free( diffs6.dropped );
2182        tr_free( newPex6 );
2183
2184        msgs->clientSentPexAt = time( NULL );
2185    }
2186}
2187
2188static int
2189pexPulse( void * vpeer )
2190{
2191    sendPex( vpeer );
2192    return TRUE;
2193}
2194
2195/**
2196***
2197**/
2198
2199tr_peermsgs*
2200tr_peerMsgsNew( struct tr_torrent * torrent,
2201                struct tr_peer    * peer,
2202                tr_delivery_func    func,
2203                void              * userData,
2204                tr_publisher_tag  * setme )
2205{
2206    tr_peermsgs * m;
2207
2208    assert( peer );
2209    assert( peer->io );
2210
2211    m = tr_new0( tr_peermsgs, 1 );
2212    m->publisher = tr_publisherNew( );
2213    m->peer = peer;
2214    m->session = torrent->session;
2215    m->torrent = torrent;
2216    m->peer->clientIsChoked = 1;
2217    m->peer->peerIsChoked = 1;
2218    m->peer->clientIsInterested = 0;
2219    m->peer->peerIsInterested = 0;
2220    m->peer->have = tr_bitfieldNew( torrent->info.pieceCount );
2221    m->state = AWAITING_BT_LENGTH;
2222    m->pexTimer = tr_timerNew( m->session, pexPulse, m, PEX_INTERVAL );
2223    m->outMessages = evbuffer_new( );
2224    m->outMessagesBatchedAt = 0;
2225    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2226    m->incoming.block = evbuffer_new( );
2227    m->peerAskedFor = REQUEST_LIST_INIT;
2228    m->clientAskedFor = REQUEST_LIST_INIT;
2229    m->clientWillAskFor = REQUEST_LIST_INIT;
2230    peer->msgs = m;
2231
2232    *setme = tr_publisherSubscribe( m->publisher, func, userData );
2233
2234    if( tr_peerIoSupportsLTEP( peer->io ) )
2235        sendLtepHandshake( m );
2236
2237    tellPeerWhatWeHave( m );
2238
2239    tr_peerIoSetIOFuncs( m->peer->io, canRead, didWrite, gotError, m );
2240    ratePulse( m );
2241
2242    return m;
2243}
2244
2245void
2246tr_peerMsgsFree( tr_peermsgs* msgs )
2247{
2248    if( msgs )
2249    {
2250        tr_timerFree( &msgs->pexTimer );
2251        tr_publisherFree( &msgs->publisher );
2252        reqListClear( &msgs->clientWillAskFor );
2253        reqListClear( &msgs->clientAskedFor );
2254        reqListClear( &msgs->peerAskedFor );
2255
2256        evbuffer_free( msgs->incoming.block );
2257        evbuffer_free( msgs->outMessages );
2258        tr_free( msgs->pex6 );
2259        tr_free( msgs->pex );
2260
2261        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2262        tr_free( msgs );
2263    }
2264}
2265
2266void
2267tr_peerMsgsUnsubscribe( tr_peermsgs *    peer,
2268                        tr_publisher_tag tag )
2269{
2270    tr_publisherUnsubscribe( peer->publisher, tag );
2271}
2272
Note: See TracBrowser for help on using the repository browser.