source: trunk/libtransmission/peer-msgs.c @ 7540

Last change on this file since 7540 was 7540, checked in by charles, 12 years ago

(trunk libT) fix r7535 bug reported by Rolcol

  • Property svn:keywords set to Date Rev Author Id
File size: 65.1 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 7540 2008-12-30 02:42:45Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19
20#include <event.h>
21
22#include "transmission.h"
23#include "session.h"
24#include "bencode.h"
25#include "completion.h"
26#include "crypto.h"
27#include "inout.h"
28#ifdef WIN32
29#include "net.h" /* for ECONN */
30#endif
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-mgr-private.h"
34#include "peer-msgs.h"
35#include "platform.h" /* MAX_STACK_ARRAY_SIZE */
36#include "ratecontrol.h"
37#include "stats.h"
38#include "torrent.h"
39#include "trevent.h"
40#include "utils.h"
41
42/**
43***
44**/
45
46enum
47{
48    BT_CHOKE                = 0,
49    BT_UNCHOKE              = 1,
50    BT_INTERESTED           = 2,
51    BT_NOT_INTERESTED       = 3,
52    BT_HAVE                 = 4,
53    BT_BITFIELD             = 5,
54    BT_REQUEST              = 6,
55    BT_PIECE                = 7,
56    BT_CANCEL               = 8,
57    BT_PORT                 = 9,
58
59    BT_FEXT_SUGGEST         = 13,
60    BT_FEXT_HAVE_ALL        = 14,
61    BT_FEXT_HAVE_NONE       = 15,
62    BT_FEXT_REJECT          = 16,
63    BT_FEXT_ALLOWED_FAST    = 17,
64
65    BT_LTEP                 = 20,
66
67    LTEP_HANDSHAKE          = 0,
68
69    TR_LTEP_PEX             = 1,
70
71
72
73    MIN_CHOKE_PERIOD_SEC    = ( 10 ),
74
75    /* idle seconds before we send a keepalive */
76    KEEPALIVE_INTERVAL_SECS = 100,
77
78    PEX_INTERVAL            = ( 90 * 1000 ), /* msec between sendPex() calls */
79
80    MAX_QUEUE_SIZE          = ( 100 ),
81
82    /* how long an unsent request can stay queued before it's returned
83       back to the peer-mgr's pool of requests */
84    QUEUED_REQUEST_TTL_SECS = 20,
85
86    /* how long a sent request can stay queued before it's returned
87       back to the peer-mgr's pool of requests */
88    SENT_REQUEST_TTL_SECS = 240,
89
90    /* used in lowering the outMessages queue period */
91    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
92    HIGH_PRIORITY_INTERVAL_SECS = 2,
93    LOW_PRIORITY_INTERVAL_SECS = 20,
94
95    /* number of pieces to remove from the bitfield when
96     * lazy bitfields are turned on */
97    LAZY_PIECE_COUNT = 26,
98
99    /* number of pieces we'll allow in our fast set */
100    MAX_FAST_SET_SIZE = 3
101};
102
103/**
104***  REQUEST MANAGEMENT
105**/
106
107enum
108{
109    AWAITING_BT_LENGTH,
110    AWAITING_BT_ID,
111    AWAITING_BT_MESSAGE,
112    AWAITING_BT_PIECE
113};
114
115struct peer_request
116{
117    uint32_t    index;
118    uint32_t    offset;
119    uint32_t    length;
120    time_t      time_requested;
121};
122
123static int
124compareRequest( const void * va, const void * vb )
125{
126    const struct peer_request * a = va;
127    const struct peer_request * b = vb;
128
129    if( a->index != b->index )
130        return a->index < b->index ? -1 : 1;
131
132    if( a->offset != b->offset )
133        return a->offset < b->offset ? -1 : 1;
134
135    if( a->length != b->length )
136        return a->length < b->length ? -1 : 1;
137
138    return 0;
139}
140
141struct request_list
142{
143    uint16_t               count;
144    uint16_t               max;
145    struct peer_request *  requests;
146};
147
148static const struct request_list REQUEST_LIST_INIT = { 0, 0, NULL };
149
150static void
151reqListReserve( struct request_list * list,
152                uint16_t              max )
153{
154    if( list->max < max )
155    {
156        list->max = max;
157        list->requests = tr_renew( struct peer_request,
158                                   list->requests,
159                                   list->max );
160    }
161}
162
163static void
164reqListClear( struct request_list * list )
165{
166    tr_free( list->requests );
167    *list = REQUEST_LIST_INIT;
168}
169
170static void
171reqListCopy( struct request_list * dest, const struct request_list * src )
172{
173    dest->count = dest->max = src->count;
174    dest->requests = tr_memdup( src->requests, dest->count * sizeof( struct peer_request ) );
175}
176
177static void
178reqListRemoveOne( struct request_list * list,
179                  int                   i )
180{
181    assert( 0 <= i && i < list->count );
182
183    memmove( &list->requests[i],
184             &list->requests[i + 1],
185             sizeof( struct peer_request ) * ( --list->count - i ) );
186}
187
188static void
189reqListAppend( struct request_list *       list,
190               const struct peer_request * req )
191{
192    if( ++list->count >= list->max )
193        reqListReserve( list, list->max + 8 );
194
195    list->requests[list->count - 1] = *req;
196}
197
198static int
199reqListPop( struct request_list * list,
200            struct peer_request * setme )
201{
202    int success;
203
204    if( !list->count )
205        success = FALSE;
206    else {
207        *setme = list->requests[0];
208        reqListRemoveOne( list, 0 );
209        success = TRUE;
210    }
211
212    return success;
213}
214
215static int
216reqListFind( struct request_list *       list,
217             const struct peer_request * key )
218{
219    uint16_t i;
220
221    for( i = 0; i < list->count; ++i )
222        if( !compareRequest( key, list->requests + i ) )
223            return i;
224
225    return -1;
226}
227
228static int
229reqListRemove( struct request_list *       list,
230               const struct peer_request * key )
231{
232    int success;
233    const int i = reqListFind( list, key );
234
235    if( i < 0 )
236        success = FALSE;
237    else {
238        reqListRemoveOne( list, i );
239        success = TRUE;
240    }
241
242    return success;
243}
244
245/**
246***
247**/
248
249/* this is raw, unchanged data from the peer regarding
250 * the current message that it's sending us. */
251struct tr_incoming
252{
253    uint8_t                id;
254    uint32_t               length; /* includes the +1 for id length */
255    struct peer_request    blockReq; /* metadata for incoming blocks */
256    struct evbuffer *      block; /* piece data for incoming blocks */
257};
258
259/**
260 * Low-level communication state information about a connected peer.
261 *
262 * This structure remembers the low-level protocol states that we're
263 * in with this peer, such as active requests, pex messages, and so on.
264 * Its fields are all private to peer-msgs.c.
265 *
266 * Data not directly involved with sending & receiving messages is
267 * stored in tr_peer, where it can be accessed by both peermsgs and
268 * the peer manager.
269 *
270 * @see struct peer_atom
271 * @see tr_peer
272 */
273struct tr_peermsgs
274{
275    tr_bool         peerSentBitfield;
276    tr_bool         peerSupportsPex;
277    tr_bool         clientSentLtepHandshake;
278    tr_bool         peerSentLtepHandshake;
279    tr_bool         haveFastSet;
280
281    uint8_t         state;
282    uint8_t         ut_pex_id;
283    uint16_t        pexCount;
284    uint16_t        pexCount6;
285    uint16_t        maxActiveRequests;
286
287    size_t                 fastsetSize;
288    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
289
290    /* how long the outMessages batch should be allowed to grow before
291     * it's flushed -- some messages (like requests >:) should be sent
292     * very quickly; others aren't as urgent. */
293    int                    outMessagesBatchPeriod;
294
295    tr_peer *              peer;
296
297    tr_session *           session;
298    tr_torrent *           torrent;
299
300    tr_publisher           publisher;
301
302    struct evbuffer *      outMessages; /* all the non-piece messages */
303
304    struct request_list    peerAskedFor;
305    struct request_list    clientAskedFor;
306    struct request_list    clientWillAskFor;
307
308    tr_timer             * pexTimer;
309    tr_pex               * pex;
310    tr_pex               * pex6;
311
312    time_t                 clientSentPexAt;
313    time_t                 clientSentAnythingAt;
314
315    /* when we started batching the outMessages */
316    time_t                outMessagesBatchedAt;
317
318    struct tr_incoming    incoming;
319
320    /* if the peer supports the Extension Protocol in BEP 10 and
321       supplied a reqq argument, it's stored here.  otherwise the
322       value is zero and should be ignored. */
323    int64_t               reqq;
324};
325
326/**
327***
328**/
329
330static void
331myDebug( const char * file, int line,
332         const struct tr_peermsgs * msgs,
333         const char * fmt, ... )
334{
335    FILE * fp = tr_getLog( );
336
337    if( fp )
338    {
339        va_list           args;
340        char              timestr[64];
341        struct evbuffer * buf = evbuffer_new( );
342        char *            base = tr_basename( file );
343
344        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
345                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
346                             msgs->torrent->info.name,
347                             tr_peerIoGetAddrStr( msgs->peer->io ),
348                             msgs->peer->client );
349        va_start( args, fmt );
350        evbuffer_add_vprintf( buf, fmt, args );
351        va_end( args );
352        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
353        fwrite( EVBUFFER_DATA( buf ), 1, EVBUFFER_LENGTH( buf ), fp );
354
355        tr_free( base );
356        evbuffer_free( buf );
357    }
358}
359
360#define dbgmsg( msgs, ... ) \
361    do { \
362        if( tr_deepLoggingIsActive( ) ) \
363            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
364    } while( 0 )
365
366/**
367***
368**/
369
370static void
371pokeBatchPeriod( tr_peermsgs * msgs,
372                 int           interval )
373{
374    if( msgs->outMessagesBatchPeriod > interval )
375    {
376        msgs->outMessagesBatchPeriod = interval;
377        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
378    }
379}
380
381static void
382dbgOutMessageLen( tr_peermsgs * msgs )
383{
384    dbgmsg( msgs, "outMessage size is now %zu", EVBUFFER_LENGTH( msgs->outMessages ) );
385}
386
387static void
388protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
389{
390    tr_peerIo       * io  = msgs->peer->io;
391    struct evbuffer * out = msgs->outMessages;
392
393    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
394
395    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
396    tr_peerIoWriteUint8 ( io, out, BT_FEXT_REJECT );
397    tr_peerIoWriteUint32( io, out, req->index );
398    tr_peerIoWriteUint32( io, out, req->offset );
399    tr_peerIoWriteUint32( io, out, req->length );
400
401    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
402    dbgOutMessageLen( msgs );
403}
404
405static void
406protocolSendRequest( tr_peermsgs               * msgs,
407                     const struct peer_request * req )
408{
409    tr_peerIo       * io  = msgs->peer->io;
410    struct evbuffer * out = msgs->outMessages;
411
412    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
413    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
414    tr_peerIoWriteUint32( io, out, req->index );
415    tr_peerIoWriteUint32( io, out, req->offset );
416    tr_peerIoWriteUint32( io, out, req->length );
417
418    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
419    dbgOutMessageLen( msgs );
420    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
421}
422
423static void
424protocolSendCancel( tr_peermsgs               * msgs,
425                    const struct peer_request * req )
426{
427    tr_peerIo       * io  = msgs->peer->io;
428    struct evbuffer * out = msgs->outMessages;
429
430    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
431    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
432    tr_peerIoWriteUint32( io, out, req->index );
433    tr_peerIoWriteUint32( io, out, req->offset );
434    tr_peerIoWriteUint32( io, out, req->length );
435
436    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
437    dbgOutMessageLen( msgs );
438    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
439}
440
441static void
442protocolSendHave( tr_peermsgs * msgs,
443                  uint32_t      index )
444{
445    tr_peerIo       * io  = msgs->peer->io;
446    struct evbuffer * out = msgs->outMessages;
447
448    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
449    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
450    tr_peerIoWriteUint32( io, out, index );
451
452    dbgmsg( msgs, "sending Have %u", index );
453    dbgOutMessageLen( msgs );
454    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
455}
456
457#if 0
458static void
459protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
460{
461    tr_peerIo       * io  = msgs->peer->io;
462    struct evbuffer * out = msgs->outMessages;
463
464    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
465
466    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
467    tr_peerIoWriteUint8 ( io, out, BT_FEXT_ALLOWED_FAST );
468    tr_peerIoWriteUint32( io, out, pieceIndex );
469
470    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
471    dbgOutMessageLen( msgs );
472}
473#endif
474
475static void
476protocolSendChoke( tr_peermsgs * msgs,
477                   int           choke )
478{
479    tr_peerIo       * io  = msgs->peer->io;
480    struct evbuffer * out = msgs->outMessages;
481
482    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
483    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
484
485    dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
486    dbgOutMessageLen( msgs );
487    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
488}
489
490static void
491protocolSendHaveAll( tr_peermsgs * msgs )
492{
493    tr_peerIo       * io  = msgs->peer->io;
494    struct evbuffer * out = msgs->outMessages;
495
496    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
497
498    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
499    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_ALL );
500
501    dbgmsg( msgs, "sending HAVE_ALL..." );
502    dbgOutMessageLen( msgs );
503    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
504}
505
506static void
507protocolSendHaveNone( tr_peermsgs * msgs )
508{
509    tr_peerIo       * io  = msgs->peer->io;
510    struct evbuffer * out = msgs->outMessages;
511
512    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
513
514    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
515    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_NONE );
516
517    dbgmsg( msgs, "sending HAVE_NONE..." );
518    dbgOutMessageLen( msgs );
519    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
520}
521
522/**
523***  EVENTS
524**/
525
526static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0, 0 };
527
528static void
529publish( tr_peermsgs * msgs, tr_peer_event * e )
530{
531    assert( msgs->peer );
532    assert( msgs->peer->msgs == msgs );
533
534    tr_publisherPublish( &msgs->publisher, msgs->peer, e );
535}
536
537static void
538fireError( tr_peermsgs * msgs, int err )
539{
540    tr_peer_event e = blankEvent;
541    e.eventType = TR_PEER_ERROR;
542    e.err = err;
543    publish( msgs, &e );
544}
545
546static void
547fireUploadOnly( tr_peermsgs * msgs, tr_bool uploadOnly )
548{
549    tr_peer_event e = blankEvent;
550    e.eventType = TR_PEER_UPLOAD_ONLY;
551    e.uploadOnly = uploadOnly;
552    publish( msgs, &e );
553}
554
555static void
556fireNeedReq( tr_peermsgs * msgs )
557{
558    tr_peer_event e = blankEvent;
559    e.eventType = TR_PEER_NEED_REQ;
560    publish( msgs, &e );
561}
562
563static void
564firePeerProgress( tr_peermsgs * msgs )
565{
566    tr_peer_event e = blankEvent;
567    e.eventType = TR_PEER_PEER_PROGRESS;
568    e.progress = msgs->peer->progress;
569    publish( msgs, &e );
570}
571
572static void
573fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
574{
575    tr_peer_event e = blankEvent;
576    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
577    e.pieceIndex = req->index;
578    e.offset = req->offset;
579    e.length = req->length;
580    publish( msgs, &e );
581}
582
583static void
584fireClientGotData( tr_peermsgs * msgs,
585                   uint32_t      length,
586                   int           wasPieceData )
587{
588    tr_peer_event e = blankEvent;
589
590    e.length = length;
591    e.eventType = TR_PEER_CLIENT_GOT_DATA;
592    e.wasPieceData = wasPieceData;
593    publish( msgs, &e );
594}
595
596static void
597fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
598{
599    tr_peer_event e = blankEvent;
600    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
601    e.pieceIndex = pieceIndex;
602    publish( msgs, &e );
603}
604
605static void
606fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
607{
608    tr_peer_event e = blankEvent;
609    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
610    e.pieceIndex = pieceIndex;
611    publish( msgs, &e );
612}
613
614static void
615firePeerGotData( tr_peermsgs  * msgs,
616                 uint32_t       length,
617                 int            wasPieceData )
618{
619    tr_peer_event e = blankEvent;
620
621    e.length = length;
622    e.eventType = TR_PEER_PEER_GOT_DATA;
623    e.wasPieceData = wasPieceData;
624
625    publish( msgs, &e );
626}
627
628static void
629fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
630{
631    tr_peer_event e = blankEvent;
632    e.eventType = TR_PEER_CANCEL;
633    e.pieceIndex = req->index;
634    e.offset = req->offset;
635    e.length = req->length;
636    publish( msgs, &e );
637}
638
639/**
640***  ALLOWED FAST SET
641***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
642**/
643
644size_t
645tr_generateAllowedSet( tr_piece_index_t * setmePieces,
646                       size_t             desiredSetSize,
647                       size_t             pieceCount,
648                       const uint8_t    * infohash,
649                       const tr_address * addr )
650{
651    size_t setSize = 0;
652
653    assert( setmePieces );
654    assert( desiredSetSize <= pieceCount );
655    assert( desiredSetSize );
656    assert( pieceCount );
657    assert( infohash );
658    assert( addr );
659
660    if( addr->type == TR_AF_INET )
661    {
662        uint8_t w[SHA_DIGEST_LENGTH + 4];
663        uint8_t x[SHA_DIGEST_LENGTH];
664
665        *(uint32_t*)w = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 );   /* (1) */
666        memcpy( w + 4, infohash, SHA_DIGEST_LENGTH );                /* (2) */
667        tr_sha1( x, w, sizeof( w ), NULL );                          /* (3) */
668
669        while( setSize<desiredSetSize )
670        {
671            int i;
672            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
673            {
674                size_t k;
675                uint32_t j = i * 4;                                  /* (5) */
676                uint32_t y = ntohl( *( uint32_t* )( x + j ) );       /* (6) */
677                uint32_t index = y % pieceCount;                     /* (7) */
678
679                for( k=0; k<setSize; ++k )                           /* (8) */
680                    if( setmePieces[k] == index )
681                        break;
682
683                if( k == setSize )
684                    setmePieces[setSize++] = index;                  /* (9) */
685            }
686
687            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
688        }
689    }
690
691    return setSize;
692}
693
694static void
695updateFastSet( tr_peermsgs * msgs UNUSED )
696{
697#if 0
698    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
699    const int peerIsNeedy = msgs->peer->progress < 0.10;
700
701    if( fext && peerIsNeedy && !msgs->haveFastSet )
702    {
703        size_t i;
704        const struct tr_address * addr = tr_peerIoGetAddress( msgs->peer->io, NULL );
705        const tr_info * inf = &msgs->torrent->info;
706        const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
707
708        /* build the fast set */
709        msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
710        msgs->haveFastSet = 1;
711
712        /* send it to the peer */
713        for( i=0; i<msgs->fastsetSize; ++i )
714            protocolSendAllowedFast( msgs, msgs->fastset[i] );
715    }
716#endif
717}
718
719/**
720***  INTEREST
721**/
722
723static int
724isPieceInteresting( const tr_peermsgs * msgs,
725                    tr_piece_index_t    piece )
726{
727    const tr_torrent * torrent = msgs->torrent;
728
729    return ( !torrent->info.pieces[piece].dnd )                 /* we want it */
730          && ( !tr_cpPieceIsComplete( torrent->completion, piece ) ) /* !have */
731          && ( tr_bitfieldHas( msgs->peer->have, piece ) );    /* peer has it */
732}
733
734/* "interested" means we'll ask for piece data if they unchoke us */
735static int
736isPeerInteresting( const tr_peermsgs * msgs )
737{
738    tr_piece_index_t    i;
739    const tr_torrent *  torrent;
740    const tr_bitfield * bitfield;
741    const int           clientIsSeed = tr_torrentIsSeed( msgs->torrent );
742
743    if( clientIsSeed )
744        return FALSE;
745
746    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
747        return FALSE;
748
749    torrent = msgs->torrent;
750    bitfield = tr_cpPieceBitfield( torrent->completion );
751
752    if( !msgs->peer->have )
753        return TRUE;
754
755    assert( bitfield->byteCount == msgs->peer->have->byteCount );
756
757    for( i = 0; i < torrent->info.pieceCount; ++i )
758        if( isPieceInteresting( msgs, i ) )
759            return TRUE;
760
761    return FALSE;
762}
763
764static void
765sendInterest( tr_peermsgs * msgs,
766              int           weAreInterested )
767{
768    struct evbuffer * out = msgs->outMessages;
769
770    assert( msgs );
771    assert( weAreInterested == 0 || weAreInterested == 1 );
772
773    msgs->peer->clientIsInterested = weAreInterested;
774    dbgmsg( msgs, "Sending %s", weAreInterested ? "Interested" : "Not Interested" );
775    tr_peerIoWriteUint32( msgs->peer->io, out, sizeof( uint8_t ) );
776    tr_peerIoWriteUint8 ( msgs->peer->io, out, weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
777
778    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
779    dbgOutMessageLen( msgs );
780}
781
782static void
783updateInterest( tr_peermsgs * msgs )
784{
785    const int i = isPeerInteresting( msgs );
786
787    if( i != msgs->peer->clientIsInterested )
788        sendInterest( msgs, i );
789    if( i )
790        fireNeedReq( msgs );
791}
792
793static int
794popNextRequest( tr_peermsgs *         msgs,
795                struct peer_request * setme )
796{
797    return reqListPop( &msgs->peerAskedFor, setme );
798}
799
800static void
801cancelAllRequestsToClient( tr_peermsgs * msgs )
802{
803    struct peer_request req;
804    const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->peer->io );
805
806    while( popNextRequest( msgs, &req ) )
807        if( mustSendCancel )
808            protocolSendReject( msgs, &req );
809}
810
811void
812tr_peerMsgsSetChoke( tr_peermsgs * msgs,
813                     int           choke )
814{
815    const time_t now = time( NULL );
816    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
817
818    assert( msgs );
819    assert( msgs->peer );
820    assert( choke == 0 || choke == 1 );
821
822    if( msgs->peer->chokeChangedAt > fibrillationTime )
823    {
824        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
825    }
826    else if( msgs->peer->peerIsChoked != choke )
827    {
828        msgs->peer->peerIsChoked = choke;
829        if( choke )
830            cancelAllRequestsToClient( msgs );
831        protocolSendChoke( msgs, choke );
832        msgs->peer->chokeChangedAt = now;
833    }
834}
835
836/**
837***
838**/
839
840void
841tr_peerMsgsHave( tr_peermsgs * msgs,
842                 uint32_t      index )
843{
844    protocolSendHave( msgs, index );
845
846    /* since we have more pieces now, we might not be interested in this peer */
847    updateInterest( msgs );
848}
849
850/**
851***
852**/
853
854static int
855reqIsValid( const tr_peermsgs * peer,
856            uint32_t            index,
857            uint32_t            offset,
858            uint32_t            length )
859{
860    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
861}
862
863static int
864requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
865{
866    return reqIsValid( msgs, req->index, req->offset, req->length );
867}
868
869static void
870expireOldRequests( tr_peermsgs * msgs, const time_t now  )
871{
872    int i;
873    time_t oldestAllowed;
874    struct request_list tmp = REQUEST_LIST_INIT;
875    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
876    dbgmsg( msgs, "entering `expire old requests' block" );
877
878    /* cancel requests that have been queued for too long */
879    oldestAllowed = now - QUEUED_REQUEST_TTL_SECS;
880    reqListCopy( &tmp, &msgs->clientWillAskFor );
881    for( i=0; i<tmp.count; ++i ) {
882        const struct peer_request * req = &tmp.requests[i];
883        if( req->time_requested < oldestAllowed )
884            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
885    }
886    reqListClear( &tmp );
887
888    /* if the peer doesn't support "Reject Request",
889     * cancel requests that were sent too long ago. */
890    if( !fext ) {
891        oldestAllowed = now - SENT_REQUEST_TTL_SECS;
892        reqListCopy( &tmp, &msgs->clientAskedFor );
893        for( i=0; i<tmp.count; ++i ) {
894            const struct peer_request * req = &tmp.requests[i];
895            if( req->time_requested < oldestAllowed )
896                tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
897        }
898        reqListClear( &tmp );
899    }
900
901    dbgmsg( msgs, "leaving `expire old requests' block" );
902}
903
904static void
905pumpRequestQueue( tr_peermsgs * msgs, const time_t now )
906{
907    const int           max = msgs->maxActiveRequests;
908    int                 sent = 0;
909    int                 count = msgs->clientAskedFor.count;
910    struct peer_request req;
911
912    if( msgs->peer->clientIsChoked )
913        return;
914    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
915        return;
916
917    while( ( count < max ) && reqListPop( &msgs->clientWillAskFor, &req ) )
918    {
919        const tr_block_index_t block = _tr_block( msgs->torrent, req.index, req.offset );
920
921        assert( requestIsValid( msgs, &req ) );
922        assert( tr_bitfieldHas( msgs->peer->have, req.index ) );
923
924        /* don't ask for it if we've already got it... this block may have
925         * come in from a different peer after we cancelled a request for it */
926        if( !tr_cpBlockIsComplete( msgs->torrent->completion, block ) )
927        {
928            protocolSendRequest( msgs, &req );
929            req.time_requested = now;
930            reqListAppend( &msgs->clientAskedFor, &req );
931
932            ++count;
933            ++sent;
934        }
935    }
936
937    if( sent )
938        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
939                sent, msgs->clientAskedFor.count, msgs->clientWillAskFor.count );
940
941    if( count < max )
942        fireNeedReq( msgs );
943}
944
945static int
946requestQueueIsFull( const tr_peermsgs * msgs )
947{
948    const int req_max = msgs->maxActiveRequests;
949    return msgs->clientWillAskFor.count >= req_max;
950}
951
952tr_addreq_t
953tr_peerMsgsAddRequest( tr_peermsgs *    msgs,
954                       uint32_t         index,
955                       uint32_t         offset,
956                       uint32_t         length )
957{
958    struct peer_request req;
959
960    assert( msgs );
961    assert( msgs->torrent );
962    assert( reqIsValid( msgs, index, offset, length ) );
963
964    /**
965    ***  Reasons to decline the request
966    **/
967
968    /* don't send requests to choked clients */
969    if( msgs->peer->clientIsChoked ) {
970        dbgmsg( msgs, "declining request because they're choking us" );
971        return TR_ADDREQ_CLIENT_CHOKED;
972    }
973
974    /* peer doesn't have this piece */
975    if( !tr_bitfieldHas( msgs->peer->have, index ) )
976        return TR_ADDREQ_MISSING;
977
978    /* peer's queue is full */
979    if( requestQueueIsFull( msgs ) ) {
980        dbgmsg( msgs, "declining request because we're full" );
981        return TR_ADDREQ_FULL;
982    }
983
984    /* have we already asked for this piece? */
985    req.index = index;
986    req.offset = offset;
987    req.length = length;
988    if( reqListFind( &msgs->clientAskedFor, &req ) != -1 ) {
989        dbgmsg( msgs, "declining because it's a duplicate" );
990        return TR_ADDREQ_DUPLICATE;
991    }
992    if( reqListFind( &msgs->clientWillAskFor, &req ) != -1 ) {
993        dbgmsg( msgs, "declining because it's a duplicate" );
994        return TR_ADDREQ_DUPLICATE;
995    }
996
997    /**
998    ***  Accept this request
999    **/
1000
1001    dbgmsg( msgs, "adding req for %"PRIu32":%"PRIu32"->%"PRIu32" to our `will request' list",
1002            index, offset, length );
1003    req.time_requested = time( NULL );
1004    reqListAppend( &msgs->clientWillAskFor, &req );
1005    return TR_ADDREQ_OK;
1006}
1007
1008static void
1009cancelAllRequestsToPeer( tr_peermsgs * msgs, tr_bool sendCancel )
1010{
1011    int i;
1012    struct request_list a = msgs->clientWillAskFor;
1013    struct request_list b = msgs->clientAskedFor;
1014    dbgmsg( msgs, "cancelling all requests to peer" );
1015
1016    msgs->clientAskedFor = REQUEST_LIST_INIT;
1017    msgs->clientWillAskFor = REQUEST_LIST_INIT;
1018
1019    for( i=0; i<a.count; ++i )
1020        fireCancelledReq( msgs, &a.requests[i] );
1021
1022    for( i = 0; i < b.count; ++i ) {
1023        fireCancelledReq( msgs, &b.requests[i] );
1024        if( sendCancel )
1025            protocolSendCancel( msgs, &b.requests[i] );
1026    }
1027
1028    reqListClear( &a );
1029    reqListClear( &b );
1030}
1031
1032void
1033tr_peerMsgsCancel( tr_peermsgs * msgs,
1034                   uint32_t      pieceIndex,
1035                   uint32_t      offset,
1036                   uint32_t      length )
1037{
1038    struct peer_request req;
1039
1040    assert( msgs != NULL );
1041    assert( length > 0 );
1042
1043
1044    /* have we asked the peer for this piece? */
1045    req.index = pieceIndex;
1046    req.offset = offset;
1047    req.length = length;
1048
1049    /* if it's only in the queue and hasn't been sent yet, free it */
1050    if( reqListRemove( &msgs->clientWillAskFor, &req ) ) {
1051        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32, pieceIndex, offset, length );
1052        fireCancelledReq( msgs, &req );
1053    }
1054
1055    /* if it's already been sent, send a cancel message too */
1056    if( reqListRemove( &msgs->clientAskedFor, &req ) ) {
1057        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32, pieceIndex, offset, length );
1058        protocolSendCancel( msgs, &req );
1059        fireCancelledReq( msgs, &req );
1060    }
1061}
1062
1063
1064/**
1065***
1066**/
1067
1068static void
1069sendLtepHandshake( tr_peermsgs * msgs )
1070{
1071    tr_benc val, *m;
1072    char * buf;
1073    int len;
1074    int pex;
1075    struct evbuffer * out = msgs->outMessages;
1076
1077    if( msgs->clientSentLtepHandshake )
1078        return;
1079
1080    dbgmsg( msgs, "sending an ltep handshake" );
1081    msgs->clientSentLtepHandshake = 1;
1082
1083    /* decide if we want to advertise pex support */
1084    if( !tr_torrentAllowsPex( msgs->torrent ) )
1085        pex = 0;
1086    else if( msgs->peerSentLtepHandshake )
1087        pex = msgs->peerSupportsPex ? 1 : 0;
1088    else
1089        pex = 1;
1090
1091    tr_bencInitDict( &val, 5 );
1092    tr_bencDictAddInt( &val, "e", msgs->session->encryptionMode != TR_CLEAR_PREFERRED );
1093    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( msgs->session ) );
1094    tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
1095    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
1096    m  = tr_bencDictAddDict( &val, "m", 1 );
1097    if( pex )
1098        tr_bencDictAddInt( m, "ut_pex", TR_LTEP_PEX );
1099    buf = tr_bencSave( &val, &len );
1100
1101    tr_peerIoWriteUint32( msgs->peer->io, out, 2 * sizeof( uint8_t ) + len );
1102    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_LTEP );
1103    tr_peerIoWriteUint8 ( msgs->peer->io, out, LTEP_HANDSHAKE );
1104    tr_peerIoWriteBytes ( msgs->peer->io, out, buf, len );
1105    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1106    dbgOutMessageLen( msgs );
1107
1108    /* cleanup */
1109    tr_bencFree( &val );
1110    tr_free( buf );
1111}
1112
1113static void
1114parseLtepHandshake( tr_peermsgs *     msgs,
1115                    int               len,
1116                    struct evbuffer * inbuf )
1117{
1118    int64_t   i;
1119    tr_benc   val, * sub;
1120    uint8_t * tmp = tr_new( uint8_t, len );
1121
1122    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, len );
1123    msgs->peerSentLtepHandshake = 1;
1124
1125    if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) )
1126    {
1127        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
1128        tr_free( tmp );
1129        return;
1130    }
1131
1132    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
1133
1134    /* does the peer prefer encrypted connections? */
1135    if( tr_bencDictFindInt( &val, "e", &i ) )
1136        msgs->peer->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
1137                                              : ENCRYPTION_PREFERENCE_NO;
1138
1139    /* check supported messages for utorrent pex */
1140    msgs->peerSupportsPex = 0;
1141    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
1142        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
1143            msgs->ut_pex_id = (uint8_t) i;
1144            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
1145            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
1146        }
1147    }
1148
1149    /* look for upload_only (BEP 21) */
1150    if( tr_bencDictFindInt( &val, "upload_only", &i ) )
1151        fireUploadOnly( msgs, i!=0 );
1152
1153    /* get peer's listening port */
1154    if( tr_bencDictFindInt( &val, "p", &i ) ) {
1155        msgs->peer->port = htons( (uint16_t)i );
1156        dbgmsg( msgs, "msgs->port is now %hu", msgs->peer->port );
1157    }
1158
1159    /* get peer's maximum request queue size */
1160    if( tr_bencDictFindInt( &val, "reqq", &i ) )
1161        msgs->reqq = i;
1162
1163    tr_bencFree( &val );
1164    tr_free( tmp );
1165}
1166
1167static void
1168parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1169{
1170    int loaded = 0;
1171    uint8_t * tmp = tr_new( uint8_t, msglen );
1172    tr_benc val;
1173    const tr_torrent * tor = msgs->torrent;
1174    const uint8_t * added;
1175    size_t added_len;
1176
1177    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1178
1179    if( tr_torrentAllowsPex( tor )
1180      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) ) )
1181    {
1182        if( tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1183        {
1184            const uint8_t * added_f = NULL;
1185            tr_pex *        pex;
1186            size_t          i, n;
1187            size_t          added_f_len = 0;
1188            tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1189            pex =
1190                tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len,
1191                                        &n );
1192            for( i = 0; i < n; ++i )
1193                tr_peerMgrAddPex( msgs->session->peerMgr, tor->info.hash,
1194                                  TR_PEER_FROM_PEX, pex + i );
1195            tr_free( pex );
1196        }
1197       
1198        if( tr_bencDictFindRaw( &val, "added6", &added, &added_len ) )
1199        {
1200            const uint8_t * added_f = NULL;
1201            tr_pex *        pex;
1202            size_t          i, n;
1203            size_t          added_f_len = 0;
1204            tr_bencDictFindRaw( &val, "added6.f", &added_f, &added_f_len );
1205            pex =
1206                tr_peerMgrCompact6ToPex( added, added_len, added_f, added_f_len,
1207                                         &n );
1208            for( i = 0; i < n; ++i )
1209                tr_peerMgrAddPex( msgs->session->peerMgr, tor->info.hash,
1210                                  TR_PEER_FROM_PEX, pex + i );
1211            tr_free( pex );
1212        }
1213       
1214    }
1215
1216    if( loaded )
1217        tr_bencFree( &val );
1218    tr_free( tmp );
1219}
1220
1221static void sendPex( tr_peermsgs * msgs );
1222
1223static void
1224parseLtep( tr_peermsgs *     msgs,
1225           int               msglen,
1226           struct evbuffer * inbuf )
1227{
1228    uint8_t ltep_msgid;
1229
1230    tr_peerIoReadUint8( msgs->peer->io, inbuf, &ltep_msgid );
1231    msglen--;
1232
1233    if( ltep_msgid == LTEP_HANDSHAKE )
1234    {
1235        dbgmsg( msgs, "got ltep handshake" );
1236        parseLtepHandshake( msgs, msglen, inbuf );
1237        if( tr_peerIoSupportsLTEP( msgs->peer->io ) )
1238        {
1239            sendLtepHandshake( msgs );
1240            sendPex( msgs );
1241        }
1242    }
1243    else if( ltep_msgid == TR_LTEP_PEX )
1244    {
1245        dbgmsg( msgs, "got ut pex" );
1246        msgs->peerSupportsPex = 1;
1247        parseUtPex( msgs, msglen, inbuf );
1248    }
1249    else
1250    {
1251        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1252        evbuffer_drain( inbuf, msglen );
1253    }
1254}
1255
1256static int
1257readBtLength( tr_peermsgs *     msgs,
1258              struct evbuffer * inbuf,
1259              size_t            inlen )
1260{
1261    uint32_t len;
1262
1263    if( inlen < sizeof( len ) )
1264        return READ_LATER;
1265
1266    tr_peerIoReadUint32( msgs->peer->io, inbuf, &len );
1267
1268    if( len == 0 ) /* peer sent us a keepalive message */
1269        dbgmsg( msgs, "got KeepAlive" );
1270    else
1271    {
1272        msgs->incoming.length = len;
1273        msgs->state = AWAITING_BT_ID;
1274    }
1275
1276    return READ_NOW;
1277}
1278
1279static int readBtMessage( tr_peermsgs *     msgs,
1280                          struct evbuffer * inbuf,
1281                          size_t            inlen );
1282
1283static int
1284readBtId( tr_peermsgs *     msgs,
1285          struct evbuffer * inbuf,
1286          size_t            inlen )
1287{
1288    uint8_t id;
1289
1290    if( inlen < sizeof( uint8_t ) )
1291        return READ_LATER;
1292
1293    tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
1294    msgs->incoming.id = id;
1295
1296    if( id == BT_PIECE )
1297    {
1298        msgs->state = AWAITING_BT_PIECE;
1299        return READ_NOW;
1300    }
1301    else if( msgs->incoming.length != 1 )
1302    {
1303        msgs->state = AWAITING_BT_MESSAGE;
1304        return READ_NOW;
1305    }
1306    else return readBtMessage( msgs, inbuf, inlen - 1 );
1307}
1308
1309static void
1310updatePeerProgress( tr_peermsgs * msgs )
1311{
1312    msgs->peer->progress = tr_bitfieldCountTrueBits( msgs->peer->have ) / (float)msgs->torrent->info.pieceCount;
1313    dbgmsg( msgs, "peer progress is %f", msgs->peer->progress );
1314    updateFastSet( msgs );
1315    updateInterest( msgs );
1316    firePeerProgress( msgs );
1317}
1318
1319static void
1320peerMadeRequest( tr_peermsgs *               msgs,
1321                 const struct peer_request * req )
1322{
1323    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1324    const int reqIsValid = requestIsValid( msgs, req );
1325    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( msgs->torrent->completion, req->index );
1326    const int peerIsChoked = msgs->peer->peerIsChoked;
1327
1328    int allow = FALSE;
1329
1330    if( !reqIsValid )
1331        dbgmsg( msgs, "rejecting an invalid request." );
1332    else if( !clientHasPiece )
1333        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1334    else if( peerIsChoked )
1335        dbgmsg( msgs, "rejecting request from choked peer" );
1336    else
1337        allow = TRUE;
1338
1339    if( allow )
1340        reqListAppend( &msgs->peerAskedFor, req );
1341    else if( fext )
1342        protocolSendReject( msgs, req );
1343}
1344
1345static int
1346messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1347{
1348    switch( id )
1349    {
1350        case BT_CHOKE:
1351        case BT_UNCHOKE:
1352        case BT_INTERESTED:
1353        case BT_NOT_INTERESTED:
1354        case BT_FEXT_HAVE_ALL:
1355        case BT_FEXT_HAVE_NONE:
1356            return len == 1;
1357
1358        case BT_HAVE:
1359        case BT_FEXT_SUGGEST:
1360        case BT_FEXT_ALLOWED_FAST:
1361            return len == 5;
1362
1363        case BT_BITFIELD:
1364            return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1365
1366        case BT_REQUEST:
1367        case BT_CANCEL:
1368        case BT_FEXT_REJECT:
1369            return len == 13;
1370
1371        case BT_PIECE:
1372            return len > 9 && len <= 16393;
1373
1374        case BT_PORT:
1375            return len == 3;
1376
1377        case BT_LTEP:
1378            return len >= 2;
1379
1380        default:
1381            return FALSE;
1382    }
1383}
1384
1385static int clientGotBlock( tr_peermsgs *               msgs,
1386                           const uint8_t *             block,
1387                           const struct peer_request * req );
1388
1389static int
1390readBtPiece( tr_peermsgs      * msgs,
1391             struct evbuffer  * inbuf,
1392             size_t             inlen,
1393             size_t           * setme_piece_bytes_read )
1394{
1395    struct peer_request * req = &msgs->incoming.blockReq;
1396
1397    assert( EVBUFFER_LENGTH( inbuf ) >= inlen );
1398    dbgmsg( msgs, "In readBtPiece" );
1399
1400    if( !req->length )
1401    {
1402        if( inlen < 8 )
1403            return READ_LATER;
1404
1405        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index );
1406        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset );
1407        req->length = msgs->incoming.length - 9;
1408        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1409        return READ_NOW;
1410    }
1411    else
1412    {
1413        int err;
1414
1415        /* read in another chunk of data */
1416        const size_t nLeft = req->length - EVBUFFER_LENGTH( msgs->incoming.block );
1417        size_t n = MIN( nLeft, inlen );
1418        size_t i = n;
1419
1420        while( i > 0 )
1421        {
1422            uint8_t buf[MAX_STACK_ARRAY_SIZE];
1423            const size_t thisPass = MIN( i, sizeof( buf ) );
1424            tr_peerIoReadBytes( msgs->peer->io, inbuf, buf, thisPass );
1425            evbuffer_add( msgs->incoming.block, buf, thisPass );
1426            i -= thisPass;
1427        }
1428
1429        fireClientGotData( msgs, n, TRUE );
1430        *setme_piece_bytes_read += n;
1431        dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1432               n, req->index, req->offset, req->length,
1433               (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) );
1434        if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length )
1435            return READ_LATER;
1436
1437        /* we've got the whole block ... process it */
1438        err = clientGotBlock( msgs, EVBUFFER_DATA( msgs->incoming.block ), req );
1439
1440        /* cleanup */
1441        evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH( msgs->incoming.block ) );
1442        req->length = 0;
1443        msgs->state = AWAITING_BT_LENGTH;
1444        if( !err )
1445            return READ_NOW;
1446        else {
1447            fireError( msgs, err );
1448            return READ_ERR;
1449        }
1450    }
1451}
1452
1453static int
1454readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1455{
1456    uint32_t      ui32;
1457    uint32_t      msglen = msgs->incoming.length;
1458    const uint8_t id = msgs->incoming.id;
1459    const size_t  startBufLen = EVBUFFER_LENGTH( inbuf );
1460    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1461
1462    --msglen; /* id length */
1463
1464    if( inlen < msglen )
1465        return READ_LATER;
1466
1467    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
1468
1469    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1470    {
1471        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1472        fireError( msgs, EMSGSIZE );
1473        return READ_ERR;
1474    }
1475
1476    switch( id )
1477    {
1478        case BT_CHOKE:
1479            dbgmsg( msgs, "got Choke" );
1480            msgs->peer->clientIsChoked = 1;
1481            if( !fext )
1482                cancelAllRequestsToPeer( msgs, FALSE );
1483            break;
1484
1485        case BT_UNCHOKE:
1486            dbgmsg( msgs, "got Unchoke" );
1487            msgs->peer->clientIsChoked = 0;
1488            fireNeedReq( msgs );
1489            break;
1490
1491        case BT_INTERESTED:
1492            dbgmsg( msgs, "got Interested" );
1493            msgs->peer->peerIsInterested = 1;
1494            break;
1495
1496        case BT_NOT_INTERESTED:
1497            dbgmsg( msgs, "got Not Interested" );
1498            msgs->peer->peerIsInterested = 0;
1499            break;
1500
1501        case BT_HAVE:
1502            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1503            dbgmsg( msgs, "got Have: %u", ui32 );
1504            if( tr_bitfieldAdd( msgs->peer->have, ui32 ) ) {
1505                fireError( msgs, ERANGE );
1506                return READ_ERR;
1507            }
1508            updatePeerProgress( msgs );
1509            tr_rcTransferred( msgs->torrent->swarmSpeed,
1510                              msgs->torrent->info.pieceSize );
1511            break;
1512
1513        case BT_BITFIELD:
1514        {
1515            dbgmsg( msgs, "got a bitfield" );
1516            msgs->peerSentBitfield = 1;
1517            tr_peerIoReadBytes( msgs->peer->io, inbuf, msgs->peer->have->bits, msglen );
1518            updatePeerProgress( msgs );
1519            fireNeedReq( msgs );
1520            break;
1521        }
1522
1523        case BT_REQUEST:
1524        {
1525            struct peer_request r;
1526            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1527            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1528            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1529            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1530            peerMadeRequest( msgs, &r );
1531            break;
1532        }
1533
1534        case BT_CANCEL:
1535        {
1536            struct peer_request r;
1537            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1538            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1539            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1540            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1541            if( reqListRemove( &msgs->peerAskedFor, &r ) && fext )
1542                protocolSendReject( msgs, &r );
1543            break;
1544        }
1545
1546        case BT_PIECE:
1547            assert( 0 ); /* handled elsewhere! */
1548            break;
1549
1550        case BT_PORT:
1551            dbgmsg( msgs, "Got a BT_PORT" );
1552            tr_peerIoReadUint16( msgs->peer->io, inbuf, &msgs->peer->port );
1553            break;
1554
1555        case BT_FEXT_SUGGEST:
1556            dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1557            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1558            if( fext )
1559                fireClientGotSuggest( msgs, ui32 );
1560            else {
1561                fireError( msgs, EMSGSIZE );
1562                return READ_ERR;
1563            }
1564            break;
1565
1566        case BT_FEXT_ALLOWED_FAST:
1567            dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1568            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1569            if( fext )
1570                fireClientGotAllowedFast( msgs, ui32 );
1571            else {
1572                fireError( msgs, EMSGSIZE );
1573                return READ_ERR;
1574            }
1575            break;
1576
1577        case BT_FEXT_HAVE_ALL:
1578            dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1579            if( fext ) {
1580                tr_bitfieldAddRange( msgs->peer->have, 0, msgs->torrent->info.pieceCount );
1581                updatePeerProgress( msgs );
1582            } else {
1583                fireError( msgs, EMSGSIZE );
1584                return READ_ERR;
1585            }
1586            break;
1587
1588        case BT_FEXT_HAVE_NONE:
1589            dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1590            if( fext ) {
1591                tr_bitfieldClear( msgs->peer->have );
1592                updatePeerProgress( msgs );
1593            } else {
1594                fireError( msgs, EMSGSIZE );
1595                return READ_ERR;
1596            }
1597            break;
1598
1599        case BT_FEXT_REJECT:
1600        {
1601            struct peer_request r;
1602            dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1603            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1604            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1605            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1606            if( fext )
1607                reqListRemove( &msgs->clientAskedFor, &r );
1608            else {
1609                fireError( msgs, EMSGSIZE );
1610                return READ_ERR;
1611            }
1612            break;
1613        }
1614
1615        case BT_LTEP:
1616            dbgmsg( msgs, "Got a BT_LTEP" );
1617            parseLtep( msgs, msglen, inbuf );
1618            break;
1619
1620        default:
1621            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1622            tr_peerIoDrain( msgs->peer->io, inbuf, msglen );
1623            break;
1624    }
1625
1626    assert( msglen + 1 == msgs->incoming.length );
1627    assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen );
1628
1629    msgs->state = AWAITING_BT_LENGTH;
1630    return READ_NOW;
1631}
1632
1633static void
1634decrementDownloadedCount( tr_peermsgs * msgs, uint32_t byteCount )
1635{
1636    tr_torrent * tor = msgs->torrent;
1637
1638    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1639}
1640
1641static void
1642clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1643{
1644    decrementDownloadedCount( msgs, req->length );
1645}
1646
1647static void
1648addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1649{
1650    if( !msgs->peer->blame )
1651         msgs->peer->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1652    tr_bitfieldAdd( msgs->peer->blame, index );
1653}
1654
1655/* returns 0 on success, or an errno on failure */
1656static int
1657clientGotBlock( tr_peermsgs *               msgs,
1658                const uint8_t *             data,
1659                const struct peer_request * req )
1660{
1661    int err;
1662    tr_torrent * tor = msgs->torrent;
1663    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1664
1665    assert( msgs );
1666    assert( req );
1667
1668    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) {
1669        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1670                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1671        return EMSGSIZE;
1672    }
1673
1674    /* save the block */
1675    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1676
1677    /**
1678    *** Remove the block from our `we asked for this' list
1679    **/
1680
1681    if( !reqListRemove( &msgs->clientAskedFor, req ) ) {
1682        clientGotUnwantedBlock( msgs, req );
1683        dbgmsg( msgs, "we didn't ask for this message..." );
1684        return 0;
1685    }
1686
1687    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1688            msgs->clientAskedFor.count );
1689
1690    /**
1691    *** Error checks
1692    **/
1693
1694    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
1695        dbgmsg( msgs, "we have this block already..." );
1696        clientGotUnwantedBlock( msgs, req );
1697        return 0;
1698    }
1699
1700    /**
1701    ***  Save the block
1702    **/
1703
1704    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1705        return err;
1706
1707    addPeerToBlamefield( msgs, req->index );
1708    fireGotBlock( msgs, req );
1709    return 0;
1710}
1711
1712static int peerPulse( void * vmsgs );
1713
1714static void
1715didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1716{
1717    tr_peermsgs * msgs = vmsgs;
1718    firePeerGotData( msgs, bytesWritten, wasPieceData );
1719    peerPulse( msgs );
1720}
1721
1722static ReadState
1723canRead( tr_peerIo * io, void * vmsgs, size_t * piece )
1724{
1725    ReadState         ret;
1726    tr_peermsgs *     msgs = vmsgs;
1727    struct evbuffer * in = tr_peerIoGetReadBuffer( io );
1728    const size_t      inlen = EVBUFFER_LENGTH( in );
1729
1730    if( !inlen )
1731    {
1732        ret = READ_LATER;
1733    }
1734    else if( msgs->state == AWAITING_BT_PIECE )
1735    {
1736        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1737    }
1738    else switch( msgs->state )
1739    {
1740        case AWAITING_BT_LENGTH:
1741            ret = readBtLength ( msgs, in, inlen ); break;
1742
1743        case AWAITING_BT_ID:
1744            ret = readBtId     ( msgs, in, inlen ); break;
1745
1746        case AWAITING_BT_MESSAGE:
1747            ret = readBtMessage( msgs, in, inlen ); break;
1748
1749        default:
1750            assert( 0 );
1751    }
1752
1753    /* log the raw data that was read */
1754    if( ( ret != READ_ERR ) && ( EVBUFFER_LENGTH( in ) != inlen ) )
1755        fireClientGotData( msgs, inlen - EVBUFFER_LENGTH( in ), FALSE );
1756
1757    return ret;
1758}
1759
1760/**
1761***
1762**/
1763
1764static int
1765ratePulse( void * vmsgs )
1766{
1767    tr_peermsgs * msgs = vmsgs;
1768    const double rateToClient = tr_peerGetPieceSpeed( msgs->peer, TR_PEER_TO_CLIENT );
1769    const int seconds = 10;
1770    const int floor = 8;
1771    const int estimatedBlocksInPeriod = ( rateToClient * seconds * 1024 ) / msgs->torrent->blockSize;
1772
1773    msgs->maxActiveRequests = floor + estimatedBlocksInPeriod;
1774
1775    if( msgs->reqq > 0 )
1776        msgs->maxActiveRequests = MIN( msgs->maxActiveRequests, msgs->reqq );
1777
1778    return TRUE;
1779}
1780
1781static size_t
1782fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1783{
1784    size_t bytesWritten = 0;
1785    struct peer_request req;
1786    const tr_bool haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1787    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1788
1789    /**
1790    ***  Protocol messages
1791    **/
1792
1793    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1794    {
1795        dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
1796        msgs->outMessagesBatchedAt = now;
1797    }
1798    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1799    {
1800        const size_t len = EVBUFFER_LENGTH( msgs->outMessages );
1801        /* flush the protocol messages */
1802        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len );
1803        tr_peerIoWriteBuf( msgs->peer->io, msgs->outMessages, FALSE );
1804        msgs->clientSentAnythingAt = now;
1805        msgs->outMessagesBatchedAt = 0;
1806        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1807        bytesWritten +=  len;
1808    }
1809
1810    /**
1811    ***  Blocks
1812    **/
1813
1814    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io ) >= msgs->torrent->blockSize )
1815        && popNextRequest( msgs, &req ) )
1816    {
1817        if( requestIsValid( msgs, &req )
1818            && tr_cpPieceIsComplete( msgs->torrent->completion, req.index ) )
1819        {
1820            /* send a block */
1821            uint8_t * buf = tr_new( uint8_t, req.length );
1822            const int err = tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf );
1823            if( err ) {
1824                fireError( msgs, err );
1825                bytesWritten = 0;
1826                msgs = NULL;
1827            } else {
1828                tr_peerIo * io = msgs->peer->io;
1829                struct evbuffer * out = evbuffer_new( );
1830                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1831                tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1832                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1833                tr_peerIoWriteUint32( io, out, req.index );
1834                tr_peerIoWriteUint32( io, out, req.offset );
1835                tr_peerIoWriteBytes ( io, out, buf, req.length );
1836                tr_peerIoWriteBuf( io, out, TRUE );
1837                bytesWritten += EVBUFFER_LENGTH( out );
1838                evbuffer_free( out );
1839                msgs->clientSentAnythingAt = now;
1840            }
1841            tr_free( buf );
1842        }
1843        else if( fext ) /* peer needs a reject message */
1844        {
1845            protocolSendReject( msgs, &req );
1846        }
1847    }
1848
1849    /**
1850    ***  Keepalive
1851    **/
1852
1853    if( ( msgs != NULL )
1854        && ( msgs->clientSentAnythingAt != 0 )
1855        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1856    {
1857        dbgmsg( msgs, "sending a keepalive message" );
1858        tr_peerIoWriteUint32( msgs->peer->io, msgs->outMessages, 0 );
1859        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1860    }
1861
1862    return bytesWritten;
1863}
1864
1865static int
1866peerPulse( void * vmsgs )
1867{
1868    tr_peermsgs * msgs = vmsgs;
1869    const time_t  now = time( NULL );
1870
1871    ratePulse( msgs );
1872
1873    pumpRequestQueue( msgs, now );
1874    expireOldRequests( msgs, now );
1875
1876    for( ;; )
1877        if( fillOutputBuffer( msgs, now ) < 1 )
1878            break;
1879
1880    return TRUE; /* loop forever */
1881}
1882
1883void
1884tr_peerMsgsPulse( tr_peermsgs * msgs )
1885{
1886    if( msgs != NULL )
1887        peerPulse( msgs );
1888}
1889
1890static void
1891gotError( tr_peerIo  * io UNUSED,
1892          short        what,
1893          void       * vmsgs )
1894{
1895    if( what & EVBUFFER_TIMEOUT )
1896        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1897    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1898        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1899               what, errno, tr_strerror( errno ) );
1900    fireError( vmsgs, ENOTCONN );
1901}
1902
1903static void
1904sendBitfield( tr_peermsgs * msgs )
1905{
1906    struct evbuffer * out = msgs->outMessages;
1907    tr_bitfield *     field;
1908    tr_piece_index_t  lazyPieces[LAZY_PIECE_COUNT];
1909    size_t            i;
1910    size_t            lazyCount = 0;
1911
1912    field = tr_bitfieldDup( tr_cpPieceBitfield( msgs->torrent->completion ) );
1913
1914    if( tr_sessionIsLazyBitfieldEnabled( msgs->session ) )
1915    {
1916        /** Lazy bitfields aren't a high priority or secure, so I'm opting for
1917            speed over a truly random sample -- let's limit the pool size to
1918            the first 1000 pieces so large torrents don't bog things down */
1919        size_t poolSize;
1920        const size_t maxPoolSize = MIN( msgs->torrent->info.pieceCount, 1000 );
1921        tr_piece_index_t * pool = tr_new( tr_piece_index_t, maxPoolSize );
1922
1923        /* build the pool */
1924        for( i=poolSize=0; i<maxPoolSize; ++i )
1925            if( tr_bitfieldHas( field, i ) )
1926                pool[poolSize++] = i;
1927
1928        /* pull random piece indices from the pool */
1929        while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) )
1930        {
1931            const int pos = tr_cryptoWeakRandInt( poolSize );
1932            const tr_piece_index_t piece = pool[pos];
1933            tr_bitfieldRem( field, piece );
1934            lazyPieces[lazyCount++] = piece;
1935            pool[pos] = pool[--poolSize];
1936        }
1937
1938        /* cleanup */
1939        tr_free( pool );
1940    }
1941
1942    tr_peerIoWriteUint32( msgs->peer->io, out,
1943                          sizeof( uint8_t ) + field->byteCount );
1944    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_BITFIELD );
1945    tr_peerIoWriteBytes ( msgs->peer->io, out, field->bits, field->byteCount );
1946    dbgmsg( msgs, "sending bitfield... outMessage size is now %zu",
1947            EVBUFFER_LENGTH( out ) );
1948    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1949
1950    for( i = 0; i < lazyCount; ++i )
1951        protocolSendHave( msgs, lazyPieces[i] );
1952
1953    tr_bitfieldFree( field );
1954}
1955
1956static void
1957tellPeerWhatWeHave( tr_peermsgs * msgs )
1958{
1959    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1960
1961    if( fext && ( tr_cpGetStatus( msgs->torrent->completion ) == TR_SEED ) )
1962    {
1963        protocolSendHaveAll( msgs );
1964    }
1965    else if( fext && ( tr_cpHaveValid( msgs->torrent->completion ) == 0 ) )
1966    {
1967        protocolSendHaveNone( msgs );
1968    }
1969    else
1970    {
1971        sendBitfield( msgs );
1972    }
1973}
1974
1975/**
1976***
1977**/
1978
1979/* some peers give us error messages if we send
1980   more than this many peers in a single pex message
1981   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
1982#define MAX_PEX_ADDED 50
1983#define MAX_PEX_DROPPED 50
1984
1985typedef struct
1986{
1987    tr_pex *  added;
1988    tr_pex *  dropped;
1989    tr_pex *  elements;
1990    int       addedCount;
1991    int       droppedCount;
1992    int       elementCount;
1993}
1994PexDiffs;
1995
1996static void
1997pexAddedCb( void * vpex,
1998            void * userData )
1999{
2000    PexDiffs * diffs = userData;
2001    tr_pex *   pex = vpex;
2002
2003    if( diffs->addedCount < MAX_PEX_ADDED )
2004    {
2005        diffs->added[diffs->addedCount++] = *pex;
2006        diffs->elements[diffs->elementCount++] = *pex;
2007    }
2008}
2009
2010static void
2011pexDroppedCb( void * vpex,
2012              void * userData )
2013{
2014    PexDiffs * diffs = userData;
2015    tr_pex *   pex = vpex;
2016
2017    if( diffs->droppedCount < MAX_PEX_DROPPED )
2018    {
2019        diffs->dropped[diffs->droppedCount++] = *pex;
2020    }
2021}
2022
2023static void
2024pexElementCb( void * vpex,
2025              void * userData )
2026{
2027    PexDiffs * diffs = userData;
2028    tr_pex *   pex = vpex;
2029
2030    diffs->elements[diffs->elementCount++] = *pex;
2031}
2032
2033static void
2034sendPex( tr_peermsgs * msgs )
2035{
2036    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
2037    {
2038        PexDiffs diffs;
2039        PexDiffs diffs6;
2040        tr_pex * newPex = NULL;
2041        tr_pex * newPex6 = NULL;
2042        const int newCount = tr_peerMgrGetPeers( msgs->session->peerMgr,
2043                                                 msgs->torrent->info.hash,
2044                                                 &newPex, TR_AF_INET );
2045        const int newCount6 = tr_peerMgrGetPeers( msgs->session->peerMgr,
2046                                                  msgs->torrent->info.hash,
2047                                                  &newPex6, TR_AF_INET6 );
2048
2049        /* build the diffs */
2050        diffs.added = tr_new( tr_pex, newCount );
2051        diffs.addedCount = 0;
2052        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
2053        diffs.droppedCount = 0;
2054        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
2055        diffs.elementCount = 0;
2056        tr_set_compare( msgs->pex, msgs->pexCount,
2057                        newPex, newCount,
2058                        tr_pexCompare, sizeof( tr_pex ),
2059                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
2060        diffs6.added = tr_new( tr_pex, newCount6 );
2061        diffs6.addedCount = 0;
2062        diffs6.dropped = tr_new( tr_pex, msgs->pexCount6 );
2063        diffs6.droppedCount = 0;
2064        diffs6.elements = tr_new( tr_pex, newCount6 + msgs->pexCount6 );
2065        diffs6.elementCount = 0;
2066        tr_set_compare( msgs->pex6, msgs->pexCount6,
2067                        newPex6, newCount6,
2068                        tr_pexCompare, sizeof( tr_pex ),
2069                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6 );
2070        dbgmsg(
2071            msgs,
2072            "pex: old peer count %d, new peer count %d, added %d, removed %d",
2073            msgs->pexCount, newCount + newCount6,
2074            diffs.addedCount + diffs6.addedCount,
2075            diffs.droppedCount + diffs6.droppedCount );
2076
2077        if( !diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
2078            !diffs6.droppedCount )
2079        {
2080            tr_free( diffs.elements );
2081            tr_free( diffs6.elements );
2082        }
2083        else
2084        {
2085            int  i;
2086            tr_benc val;
2087            char * benc;
2088            int bencLen;
2089            uint8_t * tmp, *walk;
2090            struct evbuffer * out = msgs->outMessages;
2091
2092            /* update peer */
2093            tr_free( msgs->pex );
2094            msgs->pex = diffs.elements;
2095            msgs->pexCount = diffs.elementCount;
2096            tr_free( msgs->pex6 );
2097            msgs->pex6 = diffs6.elements;
2098            msgs->pexCount6 = diffs6.elementCount;
2099
2100            /* build the pex payload */
2101            tr_bencInitDict( &val, 3 ); /* ipv6 support: left as 3:
2102                                         * speed vs. likelihood? */
2103
2104            /* "added" */
2105            tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
2106            for( i = 0; i < diffs.addedCount; ++i )
2107            {
2108                tr_suspectAddress( &diffs.added[i].addr, "pex" );
2109                memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
2110                memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
2111            }
2112            assert( ( walk - tmp ) == diffs.addedCount * 6 );
2113            tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
2114            tr_free( tmp );
2115
2116            /* "added.f" */
2117            tmp = walk = tr_new( uint8_t, diffs.addedCount );
2118            for( i = 0; i < diffs.addedCount; ++i )
2119                *walk++ = diffs.added[i].flags;
2120            assert( ( walk - tmp ) == diffs.addedCount );
2121            tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2122            tr_free( tmp );
2123
2124            /* "dropped" */
2125            tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2126            for( i = 0; i < diffs.droppedCount; ++i )
2127            {
2128                memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
2129                memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2130            }
2131            assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2132            tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2133            tr_free( tmp );
2134           
2135            /* "added6" */
2136            tmp = walk = tr_new( uint8_t, diffs6.addedCount * 18 );
2137            for( i = 0; i < diffs6.addedCount; ++i )
2138            {
2139                tr_suspectAddress( &diffs6.added[i].addr, "pex6" );
2140                memcpy( walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16 );
2141                walk += 16;
2142                memcpy( walk, &diffs6.added[i].port, 2 );
2143                walk += 2;
2144            }
2145            assert( ( walk - tmp ) == diffs6.addedCount * 18 );
2146            tr_bencDictAddRaw( &val, "added6", tmp, walk - tmp );
2147            tr_free( tmp );
2148           
2149            /* "added6.f" */
2150            tmp = walk = tr_new( uint8_t, diffs6.addedCount );
2151            for( i = 0; i < diffs6.addedCount; ++i )
2152                *walk++ = diffs6.added[i].flags;
2153            assert( ( walk - tmp ) == diffs6.addedCount );
2154            tr_bencDictAddRaw( &val, "added6.f", tmp, walk - tmp );
2155            tr_free( tmp );
2156           
2157            /* "dropped6" */
2158            tmp = walk = tr_new( uint8_t, diffs6.droppedCount * 18 );
2159            for( i = 0; i < diffs6.droppedCount; ++i )
2160            {
2161                memcpy( walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16 );
2162                walk += 16;
2163                memcpy( walk, &diffs6.dropped[i].port, 2 );
2164                walk += 2;
2165            }
2166            assert( ( walk - tmp ) == diffs6.droppedCount * 18);
2167            tr_bencDictAddRaw( &val, "dropped6", tmp, walk - tmp );
2168            tr_free( tmp );
2169
2170            /* write the pex message */
2171            benc = tr_bencSave( &val, &bencLen );
2172            tr_peerIoWriteUint32( msgs->peer->io, out, 2 * sizeof( uint8_t ) + bencLen );
2173            tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_LTEP );
2174            tr_peerIoWriteUint8 ( msgs->peer->io, out, msgs->ut_pex_id );
2175            tr_peerIoWriteBytes ( msgs->peer->io, out, benc, bencLen );
2176            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2177            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", EVBUFFER_LENGTH( out ) );
2178
2179            tr_free( benc );
2180            tr_bencFree( &val );
2181        }
2182
2183        /* cleanup */
2184        tr_free( diffs.added );
2185        tr_free( diffs.dropped );
2186        tr_free( newPex );
2187        tr_free( diffs6.added );
2188        tr_free( diffs6.dropped );
2189        tr_free( newPex6 );
2190
2191        msgs->clientSentPexAt = time( NULL );
2192    }
2193}
2194
2195static int
2196pexPulse( void * vpeer )
2197{
2198    sendPex( vpeer );
2199    return TRUE;
2200}
2201
2202/**
2203***
2204**/
2205
2206tr_peermsgs*
2207tr_peerMsgsNew( struct tr_torrent * torrent,
2208                struct tr_peer    * peer,
2209                tr_delivery_func    func,
2210                void              * userData,
2211                tr_publisher_tag  * setme )
2212{
2213    tr_peermsgs * m;
2214
2215    assert( peer );
2216    assert( peer->io );
2217
2218    m = tr_new0( tr_peermsgs, 1 );
2219    m->publisher = TR_PUBLISHER_INIT;
2220    m->peer = peer;
2221    m->session = torrent->session;
2222    m->torrent = torrent;
2223    m->peer->clientIsChoked = 1;
2224    m->peer->peerIsChoked = 1;
2225    m->peer->clientIsInterested = 0;
2226    m->peer->peerIsInterested = 0;
2227    m->peer->have = tr_bitfieldNew( torrent->info.pieceCount );
2228    m->state = AWAITING_BT_LENGTH;
2229    m->pexTimer = tr_timerNew( m->session, pexPulse, m, PEX_INTERVAL );
2230    m->outMessages = evbuffer_new( );
2231    m->outMessagesBatchedAt = 0;
2232    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2233    m->incoming.block = evbuffer_new( );
2234    m->peerAskedFor = REQUEST_LIST_INIT;
2235    m->clientAskedFor = REQUEST_LIST_INIT;
2236    m->clientWillAskFor = REQUEST_LIST_INIT;
2237    peer->msgs = m;
2238
2239    *setme = tr_publisherSubscribe( &m->publisher, func, userData );
2240
2241    if( tr_peerIoSupportsLTEP( peer->io ) )
2242        sendLtepHandshake( m );
2243
2244    tellPeerWhatWeHave( m );
2245
2246    tr_peerIoSetIOFuncs( m->peer->io, canRead, didWrite, gotError, m );
2247    ratePulse( m );
2248
2249    return m;
2250}
2251
2252void
2253tr_peerMsgsFree( tr_peermsgs* msgs )
2254{
2255    if( msgs )
2256    {
2257        tr_timerFree( &msgs->pexTimer );
2258        tr_publisherDestruct( &msgs->publisher );
2259        reqListClear( &msgs->clientWillAskFor );
2260        reqListClear( &msgs->clientAskedFor );
2261        reqListClear( &msgs->peerAskedFor );
2262
2263        evbuffer_free( msgs->incoming.block );
2264        evbuffer_free( msgs->outMessages );
2265        tr_free( msgs->pex6 );
2266        tr_free( msgs->pex );
2267
2268        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2269        tr_free( msgs );
2270    }
2271}
2272
2273void
2274tr_peerMsgsUnsubscribe( tr_peermsgs *    peer,
2275                        tr_publisher_tag tag )
2276{
2277    tr_publisherUnsubscribe( &peer->publisher, tag );
2278}
2279
Note: See TracBrowser for help on using the repository browser.