source: trunk/libtransmission/peer-msgs.c @ 7235

Last change on this file since 7235 was 7235, checked in by charles, 13 years ago

(libT) misc cleanup

  • Property svn:keywords set to Date Rev Author Id
File size: 61.2 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 7235 2008-12-02 18:24:26Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19
20#include <event.h>
21
22#include "transmission.h"
23#include "bencode.h"
24#include "completion.h"
25#include "crypto.h"
26#include "inout.h"
27#include "iobuf.h"
28#ifdef WIN32
29#include "net.h" /* for ECONN */
30#endif
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-mgr-private.h"
34#include "peer-msgs.h"
35#include "ratecontrol.h"
36#include "stats.h"
37#include "torrent.h"
38#include "trevent.h"
39#include "utils.h"
40
41/**
42***
43**/
44
45enum
46{
47    BT_CHOKE                = 0,
48    BT_UNCHOKE              = 1,
49    BT_INTERESTED           = 2,
50    BT_NOT_INTERESTED       = 3,
51    BT_HAVE                 = 4,
52    BT_BITFIELD             = 5,
53    BT_REQUEST              = 6,
54    BT_PIECE                = 7,
55    BT_CANCEL               = 8,
56    BT_PORT                 = 9,
57
58    BT_FEXT_SUGGEST         = 13,
59    BT_FEXT_HAVE_ALL        = 14,
60    BT_FEXT_HAVE_NONE       = 15,
61    BT_FEXT_REJECT          = 16,
62    BT_FEXT_ALLOWED_FAST    = 17,
63
64    BT_LTEP                 = 20,
65
66    LTEP_HANDSHAKE          = 0,
67
68    TR_LTEP_PEX             = 1,
69
70
71
72    MIN_CHOKE_PERIOD_SEC    = ( 10 ),
73
74    /* idle seconds before we send a keepalive */
75    KEEPALIVE_INTERVAL_SECS = 100,
76
77    PEX_INTERVAL            = ( 90 * 1000 ), /* msec between sendPex() calls */
78    PEER_PULSE_INTERVAL     = ( 250 ),       /* msec between peerPulse() calls
79                                               */
80
81    MAX_QUEUE_SIZE          = ( 100 ),
82
83    /* how long an unsent request can stay queued before it's returned
84       back to the peer-mgr's pool of requests */
85    QUEUED_REQUEST_TTL_SECS = 20,
86
87    /* how long a sent request can stay queued before it's returned
88       back to the peer-mgr's pool of requests */
89    SENT_REQUEST_TTL_SECS = 240,
90
91    /* used in lowering the outMessages queue period */
92    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
93    HIGH_PRIORITY_INTERVAL_SECS = 2,
94    LOW_PRIORITY_INTERVAL_SECS = 20,
95
96    /* number of pieces to remove from the bitfield when
97     * lazy bitfields are turned on */
98    LAZY_PIECE_COUNT = 26,
99
100    /* number of pieces we'll allow in our fast set */
101    MAX_FAST_SET_SIZE = 3
102};
103
104/**
105***  REQUEST MANAGEMENT
106**/
107
108enum
109{
110    AWAITING_BT_LENGTH,
111    AWAITING_BT_ID,
112    AWAITING_BT_MESSAGE,
113    AWAITING_BT_PIECE
114};
115
116struct peer_request
117{
118    uint32_t    index;
119    uint32_t    offset;
120    uint32_t    length;
121    time_t      time_requested;
122};
123
124static int
125compareRequest( const void * va,
126                const void * vb )
127{
128    const struct peer_request * a = va;
129    const struct peer_request * b = vb;
130
131    if( a->index != b->index )
132        return a->index < b->index ? -1 : 1;
133
134    if( a->offset != b->offset )
135        return a->offset < b->offset ? -1 : 1;
136
137    if( a->length != b->length )
138        return a->length < b->length ? -1 : 1;
139
140    return 0;
141}
142
143struct request_list
144{
145    uint16_t               count;
146    uint16_t               max;
147    struct peer_request *  requests;
148};
149
150static const struct request_list REQUEST_LIST_INIT = { 0, 0, NULL };
151
152static void
153reqListReserve( struct request_list * list,
154                uint16_t              max )
155{
156    if( list->max < max )
157    {
158        list->max = max;
159        list->requests = tr_renew( struct peer_request,
160                                   list->requests,
161                                   list->max );
162    }
163}
164
165static void
166reqListClear( struct request_list * list )
167{
168    tr_free( list->requests );
169    *list = REQUEST_LIST_INIT;
170}
171
172static void
173reqListCopy( struct request_list *       dest,
174             const struct request_list * src )
175{
176    dest->count = dest->max = src->count;
177    dest->requests =
178        tr_memdup( src->requests, dest->count * sizeof( struct peer_request ) );
179}
180
181static void
182reqListRemoveOne( struct request_list * list,
183                  int                   i )
184{
185    assert( 0 <= i && i < list->count );
186
187    memmove( &list->requests[i],
188            &list->requests[i + 1],
189            sizeof( struct peer_request ) * ( --list->count - i ) );
190}
191
192static void
193reqListAppend( struct request_list *       list,
194               const struct peer_request * req )
195{
196    if( ++list->count >= list->max )
197        reqListReserve( list, list->max + 8 );
198
199    list->requests[list->count - 1] = *req;
200}
201
202static int
203reqListPop( struct request_list * list,
204            struct peer_request * setme )
205{
206    int success;
207
208    if( !list->count )
209        success = FALSE;
210    else {
211        *setme = list->requests[0];
212        reqListRemoveOne( list, 0 );
213        success = TRUE;
214    }
215
216    return success;
217}
218
219static int
220reqListFind( struct request_list *       list,
221             const struct peer_request * key )
222{
223    uint16_t i;
224
225    for( i = 0; i < list->count; ++i )
226        if( !compareRequest( key, list->requests + i ) )
227            return i;
228
229    return -1;
230}
231
232static int
233reqListRemove( struct request_list *       list,
234               const struct peer_request * key )
235{
236    int success;
237    const int i = reqListFind( list, key );
238
239    if( i < 0 )
240        success = FALSE;
241    else {
242        reqListRemoveOne( list, i );
243        success = TRUE;
244    }
245
246    return success;
247}
248
249/**
250***
251**/
252
253/* this is raw, unchanged data from the peer regarding
254 * the current message that it's sending us. */
255struct tr_incoming
256{
257    uint8_t                id;
258    uint32_t               length; /* includes the +1 for id length */
259    struct peer_request    blockReq; /* metadata for incoming blocks */
260    struct evbuffer *      block; /* piece data for incoming blocks */
261};
262
263struct tr_peermsgs
264{
265    tr_bool         peerSentBitfield;
266    tr_bool         peerSupportsPex;
267    tr_bool         clientSentLtepHandshake;
268    tr_bool         peerSentLtepHandshake;
269    tr_bool         haveFastSet;
270
271    uint8_t         state;
272    uint8_t         ut_pex_id;
273    uint16_t        pexCount;
274    uint16_t        minActiveRequests;
275    uint16_t        maxActiveRequests;
276
277    size_t                 fastsetSize;
278    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
279
280    /* how long the outMessages batch should be allowed to grow before
281     * it's flushed -- some messages (like requests >:) should be sent
282     * very quickly; others aren't as urgent. */
283    int                    outMessagesBatchPeriod;
284
285    tr_peer *              info;
286
287    tr_session *           session;
288    tr_torrent *           torrent;
289    tr_peerIo *            io;
290
291    tr_publisher_t *       publisher;
292
293    struct evbuffer *      outMessages; /* all the non-piece messages */
294
295    struct request_list    peerAskedFor;
296    struct request_list    clientAskedFor;
297    struct request_list    clientWillAskFor;
298
299    tr_timer *             pexTimer;
300
301    time_t                 clientSentPexAt;
302    time_t                 clientSentAnythingAt;
303
304    /* when we started batching the outMessages */
305    time_t                outMessagesBatchedAt;
306
307    tr_bitfield *         peerAllowedPieces;
308
309    struct tr_incoming    incoming;
310
311    tr_pex *              pex;
312};
313
314/**
315***
316**/
317
318static void
319myDebug( const char *               file,
320         int                        line,
321         const struct tr_peermsgs * msgs,
322         const char *               fmt,
323         ... )
324{
325    FILE * fp = tr_getLog( );
326
327    if( fp )
328    {
329        va_list           args;
330        char              timestr[64];
331        struct evbuffer * buf = evbuffer_new( );
332        char *            base = tr_basename( file );
333
334        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
335                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
336                             msgs->torrent->info.name,
337                             tr_peerIoGetAddrStr( msgs->io ),
338                             msgs->info->client );
339        va_start( args, fmt );
340        evbuffer_add_vprintf( buf, fmt, args );
341        va_end( args );
342        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
343        fwrite( EVBUFFER_DATA( buf ), 1, EVBUFFER_LENGTH( buf ), fp );
344
345        tr_free( base );
346        evbuffer_free( buf );
347    }
348}
349
350#define dbgmsg( msgs, ... ) \
351    do { \
352        if( tr_deepLoggingIsActive( ) ) \
353            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
354    } while( 0 )
355
356/**
357***
358**/
359
360static void
361pokeBatchPeriod( tr_peermsgs * msgs,
362                 int           interval )
363{
364    if( msgs->outMessagesBatchPeriod > interval )
365    {
366        msgs->outMessagesBatchPeriod = interval;
367        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
368    }
369}
370
371static void
372dbgOutMessageLen( tr_peermsgs * msgs )
373{
374    dbgmsg( msgs, "outMessage size is now %zu", EVBUFFER_LENGTH( msgs->outMessages ) );
375}
376
377static void
378protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
379{
380    tr_peerIo       * io  = msgs->io;
381    struct evbuffer * out = msgs->outMessages;
382
383    assert( tr_peerIoSupportsFEXT( msgs->io ) );
384
385    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
386    tr_peerIoWriteUint8 ( io, out, BT_FEXT_REJECT );
387    tr_peerIoWriteUint32( io, out, req->index );
388    tr_peerIoWriteUint32( io, out, req->offset );
389    tr_peerIoWriteUint32( io, out, req->length );
390
391    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
392    dbgOutMessageLen( msgs );
393}
394
395static void
396protocolSendRequest( tr_peermsgs *               msgs,
397                     const struct peer_request * req )
398{
399    tr_peerIo       * io  = msgs->io;
400    struct evbuffer * out = msgs->outMessages;
401
402    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
403    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
404    tr_peerIoWriteUint32( io, out, req->index );
405    tr_peerIoWriteUint32( io, out, req->offset );
406    tr_peerIoWriteUint32( io, out, req->length );
407
408    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
409    dbgOutMessageLen( msgs );
410    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
411}
412
413static void
414protocolSendCancel( tr_peermsgs *               msgs,
415                    const struct peer_request * req )
416{
417    tr_peerIo       * io  = msgs->io;
418    struct evbuffer * out = msgs->outMessages;
419
420    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
421    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
422    tr_peerIoWriteUint32( io, out, req->index );
423    tr_peerIoWriteUint32( io, out, req->offset );
424    tr_peerIoWriteUint32( io, out, req->length );
425
426    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
427    dbgOutMessageLen( msgs );
428    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
429}
430
431static void
432protocolSendHave( tr_peermsgs * msgs,
433                  uint32_t      index )
434{
435    tr_peerIo       * io  = msgs->io;
436    struct evbuffer * out = msgs->outMessages;
437
438    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
439    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
440    tr_peerIoWriteUint32( io, out, index );
441
442    dbgmsg( msgs, "sending Have %u...", index );
443    dbgOutMessageLen( msgs );
444    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
445}
446
447#if 0
448static void
449protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
450{
451    tr_peerIo       * io  = msgs->io;
452    struct evbuffer * out = msgs->outMessages;
453
454    assert( tr_peerIoSupportsFEXT( msgs->io ) );
455
456    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
457    tr_peerIoWriteUint8 ( io, out, BT_FEXT_ALLOWED_FAST );
458    tr_peerIoWriteUint32( io, out, pieceIndex );
459
460    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
461    dbgOutMessageLen( msgs );
462}
463#endif
464
465static void
466protocolSendChoke( tr_peermsgs * msgs,
467                   int           choke )
468{
469    tr_peerIo       * io  = msgs->io;
470    struct evbuffer * out = msgs->outMessages;
471
472    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
473    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
474
475    dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
476    dbgOutMessageLen( msgs );
477    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
478}
479
480static void
481protocolSendHaveAll( tr_peermsgs * msgs )
482{
483    tr_peerIo       * io  = msgs->io;
484    struct evbuffer * out = msgs->outMessages;
485
486    assert( tr_peerIoSupportsFEXT( msgs->io ) );
487
488    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
489    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_ALL );
490
491    dbgmsg( msgs, "sending HAVE_ALL..." );
492    dbgOutMessageLen( msgs );
493    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
494}
495
496static void
497protocolSendHaveNone( tr_peermsgs * msgs )
498{
499    tr_peerIo       * io  = msgs->io;
500    struct evbuffer * out = msgs->outMessages;
501
502    assert( tr_peerIoSupportsFEXT( msgs->io ) );
503
504    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
505    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_NONE );
506
507    dbgmsg( msgs, "sending HAVE_NONE..." );
508    dbgOutMessageLen( msgs );
509    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
510}
511
512/**
513***  EVENTS
514**/
515
516static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0 };
517
518static void
519publish( tr_peermsgs *   msgs,
520         tr_peer_event * e )
521{
522    tr_publisherPublish( msgs->publisher, msgs->info, e );
523}
524
525static void
526fireError( tr_peermsgs * msgs,
527           int           err )
528{
529    tr_peer_event e = blankEvent;
530
531    e.eventType = TR_PEER_ERROR;
532    e.err = err;
533    publish( msgs, &e );
534}
535
536static void
537fireNeedReq( tr_peermsgs * msgs )
538{
539    tr_peer_event e = blankEvent;
540
541    e.eventType = TR_PEER_NEED_REQ;
542    publish( msgs, &e );
543}
544
545static void
546firePeerProgress( tr_peermsgs * msgs )
547{
548    tr_peer_event e = blankEvent;
549
550    e.eventType = TR_PEER_PEER_PROGRESS;
551    e.progress = msgs->info->progress;
552    publish( msgs, &e );
553}
554
555static void
556fireGotBlock( tr_peermsgs *               msgs,
557              const struct peer_request * req )
558{
559    tr_peer_event e = blankEvent;
560
561    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
562    e.pieceIndex = req->index;
563    e.offset = req->offset;
564    e.length = req->length;
565    publish( msgs, &e );
566}
567
568static void
569fireClientGotData( tr_peermsgs * msgs,
570                   uint32_t      length,
571                   int           wasPieceData )
572{
573    tr_peer_event e = blankEvent;
574
575    e.length = length;
576    e.eventType = TR_PEER_CLIENT_GOT_DATA;
577    e.wasPieceData = wasPieceData;
578    publish( msgs, &e );
579}
580
581static void
582fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
583{
584    tr_peer_event e = blankEvent;
585    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
586    e.pieceIndex = pieceIndex;
587    publish( msgs, &e );
588}
589
590static void
591fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
592{
593    tr_peer_event e = blankEvent;
594    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
595    e.pieceIndex = pieceIndex;
596    publish( msgs, &e );
597}
598
599static void
600firePeerGotData( tr_peermsgs  * msgs,
601                 uint32_t       length,
602                 int            wasPieceData )
603{
604    tr_peer_event e = blankEvent;
605
606    e.length = length;
607    e.eventType = TR_PEER_PEER_GOT_DATA;
608    e.wasPieceData = wasPieceData;
609
610    publish( msgs, &e );
611}
612
613static void
614fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
615{
616    tr_peer_event e = blankEvent;
617    e.eventType = TR_PEER_CANCEL;
618    e.pieceIndex = req->index;
619    e.offset = req->offset;
620    e.length = req->length;
621    publish( msgs, &e );
622}
623
624/**
625***  ALLOWED FAST SET
626***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
627**/
628
629size_t
630tr_generateAllowedSet( tr_piece_index_t * setmePieces,
631                       size_t             desiredSetSize,
632                       size_t             pieceCount,
633                       const uint8_t    * infohash,
634                       const tr_address * addr )
635{
636    size_t setSize = 0;
637
638    assert( setmePieces );
639    assert( desiredSetSize <= pieceCount );
640    assert( desiredSetSize );
641    assert( pieceCount );
642    assert( infohash );
643    assert( addr );
644
645    if( addr->type == TR_AF_INET )
646    {
647        uint8_t w[SHA_DIGEST_LENGTH + 4];
648        uint8_t x[SHA_DIGEST_LENGTH];
649
650        *(uint32_t*)w = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 );   /* (1) */
651        memcpy( w + 4, infohash, SHA_DIGEST_LENGTH );                /* (2) */
652        tr_sha1( x, w, sizeof( w ), NULL );                          /* (3) */
653
654        while( setSize<desiredSetSize )
655        {
656            int i;
657            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
658            {
659                size_t k;
660                uint32_t j = i * 4;                                  /* (5) */
661                uint32_t y = ntohl( *( uint32_t* )( x + j ) );       /* (6) */
662                uint32_t index = y % pieceCount;                     /* (7) */
663
664                for( k=0; k<setSize; ++k )                           /* (8) */
665                    if( setmePieces[k] == index )
666                        break;
667
668                if( k == setSize )
669                    setmePieces[setSize++] = index;                  /* (9) */
670            }
671
672            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
673        }
674    }
675
676    return setSize;
677}
678
679static void
680updateFastSet( tr_peermsgs * msgs UNUSED )
681{
682#if 0
683    const int fext = tr_peerIoSupportsFEXT( msgs->io );
684    const int peerIsNeedy = msgs->info->progress < 0.10;
685
686    if( fext && peerIsNeedy && !msgs->haveFastSet )
687    {
688        size_t i;
689        const struct tr_address * addr = tr_peerIoGetAddress( msgs->io, NULL );
690        const tr_info * inf = &msgs->torrent->info;
691        const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
692
693        /* build the fast set */
694        msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
695        msgs->haveFastSet = 1;
696
697        /* send it to the peer */
698        for( i=0; i<msgs->fastsetSize; ++i )
699            protocolSendAllowedFast( msgs, msgs->fastset[i] );
700    }
701#endif
702}
703
704/**
705***  INTEREST
706**/
707
708static int
709isPieceInteresting( const tr_peermsgs * peer,
710                    tr_piece_index_t    piece )
711{
712    const tr_torrent * torrent = peer->torrent;
713
714    return ( !torrent->info.pieces[piece].dnd )                 /* we want it */
715           && ( !tr_cpPieceIsComplete( torrent->completion, piece ) ) /* !have
716                                                                        */
717           && ( tr_bitfieldHas( peer->info->have, piece ) );   /* peer has it */
718}
719
720/* "interested" means we'll ask for piece data if they unchoke us */
721static int
722isPeerInteresting( const tr_peermsgs * msgs )
723{
724    tr_piece_index_t    i;
725    const tr_torrent *  torrent;
726    const tr_bitfield * bitfield;
727    const int           clientIsSeed = tr_torrentIsSeed( msgs->torrent );
728
729    if( clientIsSeed )
730        return FALSE;
731
732    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
733        return FALSE;
734
735    torrent = msgs->torrent;
736    bitfield = tr_cpPieceBitfield( torrent->completion );
737
738    if( !msgs->info->have )
739        return TRUE;
740
741    assert( bitfield->byteCount == msgs->info->have->byteCount );
742
743    for( i = 0; i < torrent->info.pieceCount; ++i )
744        if( isPieceInteresting( msgs, i ) )
745            return TRUE;
746
747    return FALSE;
748}
749
750static void
751sendInterest( tr_peermsgs * msgs,
752              int           weAreInterested )
753{
754    struct evbuffer * out = msgs->outMessages;
755
756    assert( msgs );
757    assert( weAreInterested == 0 || weAreInterested == 1 );
758
759    msgs->info->clientIsInterested = weAreInterested;
760    dbgmsg( msgs, "Sending %s",
761            weAreInterested ? "Interested" : "Not Interested" );
762    tr_peerIoWriteUint32( msgs->io, out, sizeof( uint8_t ) );
763    tr_peerIoWriteUint8 (
764        msgs->io, out, weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
765    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
766    dbgmsg( msgs, "outMessage size is now %d", (int)EVBUFFER_LENGTH( out ) );
767}
768
769static void
770updateInterest( tr_peermsgs * msgs )
771{
772    const int i = isPeerInteresting( msgs );
773
774    if( i != msgs->info->clientIsInterested )
775        sendInterest( msgs, i );
776    if( i )
777        fireNeedReq( msgs );
778}
779
780static int
781popNextRequest( tr_peermsgs *         msgs,
782                struct peer_request * setme )
783{
784    return reqListPop( &msgs->peerAskedFor, setme );
785}
786
787static void
788cancelAllRequestsToClient( tr_peermsgs * msgs )
789{
790    struct peer_request req;
791
792    while( popNextRequest( msgs, &req ) )
793        protocolSendReject( msgs, &req );
794}
795
796void
797tr_peerMsgsSetChoke( tr_peermsgs * msgs,
798                     int           choke )
799{
800    const time_t now = time( NULL );
801    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
802
803    assert( msgs );
804    assert( msgs->info );
805    assert( choke == 0 || choke == 1 );
806
807    if( msgs->info->chokeChangedAt > fibrillationTime )
808    {
809        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation",
810                choke );
811    }
812    else if( msgs->info->peerIsChoked != choke )
813    {
814        msgs->info->peerIsChoked = choke;
815        if( choke )
816            cancelAllRequestsToClient( msgs );
817        protocolSendChoke( msgs, choke );
818        msgs->info->chokeChangedAt = now;
819    }
820}
821
822/**
823***
824**/
825
826void
827tr_peerMsgsHave( tr_peermsgs * msgs,
828                 uint32_t      index )
829{
830    protocolSendHave( msgs, index );
831
832    /* since we have more pieces now, we might not be interested in this peer */
833    updateInterest( msgs );
834}
835
836/**
837***
838**/
839
840static int
841reqIsValid( const tr_peermsgs * peer,
842            uint32_t            index,
843            uint32_t            offset,
844            uint32_t            length )
845{
846    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
847}
848
849static int
850requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
851{
852    return reqIsValid( msgs, req->index, req->offset, req->length );
853}
854
855static void
856expireOldRequests( tr_peermsgs * msgs, const time_t now  )
857{
858    int                 i;
859    time_t              oldestAllowed;
860    struct request_list tmp = REQUEST_LIST_INIT;
861    const int fext = tr_peerIoSupportsFEXT( msgs->io );
862
863    /* cancel requests that have been queued for too long */
864    oldestAllowed = now - QUEUED_REQUEST_TTL_SECS;
865    reqListCopy( &tmp, &msgs->clientWillAskFor );
866    for( i = 0; i < tmp.count; ++i ) {
867        const struct peer_request * req = &tmp.requests[i];
868        if( req->time_requested < oldestAllowed )
869            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
870    }
871    reqListClear( &tmp );
872
873    /* if the peer doesn't support "Reject Request",
874     * cancel requests that were sent too long ago. */
875    if( !fext ) {
876        oldestAllowed = now - SENT_REQUEST_TTL_SECS;
877        reqListCopy( &tmp, &msgs->clientAskedFor );
878        for( i=0; i<tmp.count; ++i ) {
879            const struct peer_request * req = &tmp.requests[i];
880            if( req->time_requested < oldestAllowed )
881                tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
882        }
883        reqListClear( &tmp );
884    }
885}
886
887static void
888pumpRequestQueue( tr_peermsgs * msgs, const time_t now )
889{
890    const int           max = msgs->maxActiveRequests;
891    const int           min = msgs->minActiveRequests;
892    int                 sent = 0;
893    int                 count = msgs->clientAskedFor.count;
894    struct peer_request req;
895
896    if( count > min )
897        return;
898    if( msgs->info->clientIsChoked )
899        return;
900    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
901        return;
902
903    while( ( count < max ) && reqListPop( &msgs->clientWillAskFor, &req ) )
904    {
905        const tr_block_index_t block =
906            _tr_block( msgs->torrent, req.index, req.offset );
907
908        assert( requestIsValid( msgs, &req ) );
909        assert( tr_bitfieldHas( msgs->info->have, req.index ) );
910
911        /* don't ask for it if we've already got it... this block may have
912         * come in from a different peer after we cancelled a request for it */
913        if( !tr_cpBlockIsComplete( msgs->torrent->completion, block ) )
914        {
915            protocolSendRequest( msgs, &req );
916            req.time_requested = now;
917            reqListAppend( &msgs->clientAskedFor, &req );
918
919            ++count;
920            ++sent;
921        }
922    }
923
924    if( sent )
925        dbgmsg( msgs,
926                "pump sent %d requests, now have %d active and %d queued",
927                sent,
928                msgs->clientAskedFor.count,
929                msgs->clientWillAskFor.count );
930
931    if( count < max )
932        fireNeedReq( msgs );
933}
934
935static int
936requestQueueIsFull( const tr_peermsgs * msgs )
937{
938    const int req_max = msgs->maxActiveRequests;
939    return msgs->clientWillAskFor.count >= req_max;
940}
941
942tr_addreq_t
943tr_peerMsgsAddRequest( tr_peermsgs *    msgs,
944                       uint32_t         index,
945                       uint32_t         offset,
946                       uint32_t         length,
947                       int              doForce )
948{
949    struct peer_request req;
950
951    assert( msgs );
952    assert( msgs->torrent );
953    assert( reqIsValid( msgs, index, offset, length ) );
954
955    /**
956    ***  Reasons to decline the request
957    **/
958
959    /* don't send requests to choked clients */
960    if( !doForce && msgs->info->clientIsChoked ) {
961        dbgmsg( msgs, "declining request because they're choking us" );
962        return TR_ADDREQ_CLIENT_CHOKED;
963    }
964
965    /* peer doesn't have this piece */
966    if( !doForce && !tr_bitfieldHas( msgs->info->have, index ) )
967        return TR_ADDREQ_MISSING;
968
969    /* peer's queue is full */
970    if( !doForce && requestQueueIsFull( msgs ) ) {
971        dbgmsg( msgs, "declining request because we're full" );
972        return TR_ADDREQ_FULL;
973    }
974
975    /* have we already asked for this piece? */
976    req.index = index;
977    req.offset = offset;
978    req.length = length;
979    if( !doForce ) {
980        if( reqListFind( &msgs->clientAskedFor, &req ) != -1 ) {
981            dbgmsg( msgs, "declining because it's a duplicate" );
982            return TR_ADDREQ_DUPLICATE;
983        }
984        if( reqListFind( &msgs->clientWillAskFor, &req ) != -1 ) {
985            dbgmsg( msgs, "declining because it's a duplicate" );
986            return TR_ADDREQ_DUPLICATE;
987        }
988    }
989
990    /**
991    ***  Accept this request
992    **/
993
994    dbgmsg( msgs, "adding req for %"PRIu32":%"PRIu32"->%"PRIu32" to our `will request' list",
995            index, offset, length );
996    req.time_requested = time( NULL );
997    reqListAppend( &msgs->clientWillAskFor, &req );
998    return TR_ADDREQ_OK;
999}
1000
1001static void
1002cancelAllRequestsToPeer( tr_peermsgs * msgs )
1003{
1004    int                 i;
1005    struct request_list a = msgs->clientWillAskFor;
1006    struct request_list b = msgs->clientAskedFor;
1007    dbgmsg( msgs, "cancelling all requests to peer" );
1008
1009    msgs->clientAskedFor = REQUEST_LIST_INIT;
1010    msgs->clientWillAskFor = REQUEST_LIST_INIT;
1011
1012    for( i=0; i<a.count; ++i )
1013        fireCancelledReq( msgs, &a.requests[i] );
1014
1015    for( i = 0; i < b.count; ++i ) {
1016        fireCancelledReq( msgs, &b.requests[i] );
1017        protocolSendCancel( msgs, &b.requests[i] );
1018    }
1019
1020    reqListClear( &a );
1021    reqListClear( &b );
1022}
1023
1024void
1025tr_peerMsgsCancel( tr_peermsgs * msgs,
1026                   uint32_t      pieceIndex,
1027                   uint32_t      offset,
1028                   uint32_t      length )
1029{
1030    struct peer_request req;
1031
1032    assert( msgs != NULL );
1033    assert( length > 0 );
1034
1035
1036    /* have we asked the peer for this piece? */
1037    req.index = pieceIndex;
1038    req.offset = offset;
1039    req.length = length;
1040
1041    /* if it's only in the queue and hasn't been sent yet, free it */
1042    if( reqListRemove( &msgs->clientWillAskFor, &req ) ) {
1043        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32"\n", pieceIndex, offset, length );
1044        fireCancelledReq( msgs, &req );
1045    }
1046
1047    /* if it's already been sent, send a cancel message too */
1048    if( reqListRemove( &msgs->clientAskedFor, &req ) ) {
1049        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32"\n", pieceIndex, offset, length );
1050        protocolSendCancel( msgs, &req );
1051        fireCancelledReq( msgs, &req );
1052    }
1053}
1054
1055
1056/**
1057***
1058**/
1059
1060static void
1061sendLtepHandshake( tr_peermsgs * msgs )
1062{
1063    tr_benc           val, *m;
1064    char *            buf;
1065    int               len;
1066    int               pex;
1067    struct evbuffer * out = msgs->outMessages;
1068
1069    if( msgs->clientSentLtepHandshake )
1070        return;
1071
1072    dbgmsg( msgs, "sending an ltep handshake" );
1073    msgs->clientSentLtepHandshake = 1;
1074
1075    /* decide if we want to advertise pex support */
1076    if( !tr_torrentAllowsPex( msgs->torrent ) )
1077        pex = 0;
1078    else if( msgs->peerSentLtepHandshake )
1079        pex = msgs->peerSupportsPex ? 1 : 0;
1080    else
1081        pex = 1;
1082
1083    tr_bencInitDict( &val, 4 );
1084    tr_bencDictAddInt( &val, "e",
1085                       msgs->session->encryptionMode != TR_CLEAR_PREFERRED );
1086    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( msgs->session ) );
1087    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
1088    m  = tr_bencDictAddDict( &val, "m", 1 );
1089    if( pex )
1090        tr_bencDictAddInt( m, "ut_pex", TR_LTEP_PEX );
1091    buf = tr_bencSave( &val, &len );
1092
1093    tr_peerIoWriteUint32( msgs->io, out, 2 * sizeof( uint8_t ) + len );
1094    tr_peerIoWriteUint8 ( msgs->io, out, BT_LTEP );
1095    tr_peerIoWriteUint8 ( msgs->io, out, LTEP_HANDSHAKE );
1096    tr_peerIoWriteBytes ( msgs->io, out, buf, len );
1097    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1098    dbgmsg( msgs, "outMessage size is now %d", (int)EVBUFFER_LENGTH( out ) );
1099
1100    /* cleanup */
1101    tr_bencFree( &val );
1102    tr_free( buf );
1103}
1104
1105static void
1106parseLtepHandshake( tr_peermsgs *     msgs,
1107                    int               len,
1108                    struct evbuffer * inbuf )
1109{
1110    int64_t   i;
1111    tr_benc   val, * sub;
1112    uint8_t * tmp = tr_new( uint8_t, len );
1113
1114    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
1115    msgs->peerSentLtepHandshake = 1;
1116
1117    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type != TYPE_DICT )
1118    {
1119        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
1120        tr_free( tmp );
1121        return;
1122    }
1123
1124    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
1125
1126    /* does the peer prefer encrypted connections? */
1127    if( tr_bencDictFindInt( &val, "e", &i ) )
1128        msgs->info->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
1129                                            : ENCRYPTION_PREFERENCE_NO;
1130
1131    /* check supported messages for utorrent pex */
1132    msgs->peerSupportsPex = 0;
1133    if( tr_bencDictFindDict( &val, "m", &sub ) )
1134    {
1135        if( tr_bencDictFindInt( sub, "ut_pex", &i ) )
1136        {
1137            msgs->ut_pex_id = (uint8_t) i;
1138            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
1139            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
1140        }
1141    }
1142
1143    /* get peer's listening port */
1144    if( tr_bencDictFindInt( &val, "p", &i ) )
1145    {
1146        msgs->info->port = htons( (uint16_t)i );
1147        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
1148    }
1149
1150    tr_bencFree( &val );
1151    tr_free( tmp );
1152}
1153
1154static void
1155parseUtPex( tr_peermsgs *     msgs,
1156            int               msglen,
1157            struct evbuffer * inbuf )
1158{
1159    int                loaded = 0;
1160    uint8_t *          tmp = tr_new( uint8_t, msglen );
1161    tr_benc            val;
1162    const tr_torrent * tor = msgs->torrent;
1163    const uint8_t *    added;
1164    size_t             added_len;
1165
1166    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
1167
1168    if( tr_torrentAllowsPex( tor )
1169      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) )
1170      && tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1171    {
1172        const uint8_t * added_f = NULL;
1173        tr_pex *        pex;
1174        size_t          i, n;
1175        size_t          added_f_len = 0;
1176        tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1177        pex =
1178            tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len,
1179                                    &n );
1180        for( i = 0; i < n; ++i )
1181            tr_peerMgrAddPex( msgs->session->peerMgr, tor->info.hash,
1182                              TR_PEER_FROM_PEX, pex + i );
1183        tr_free( pex );
1184    }
1185
1186    if( loaded )
1187        tr_bencFree( &val );
1188    tr_free( tmp );
1189}
1190
1191static void sendPex( tr_peermsgs * msgs );
1192
1193static void
1194parseLtep( tr_peermsgs *     msgs,
1195           int               msglen,
1196           struct evbuffer * inbuf )
1197{
1198    uint8_t ltep_msgid;
1199
1200    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
1201    msglen--;
1202
1203    if( ltep_msgid == LTEP_HANDSHAKE )
1204    {
1205        dbgmsg( msgs, "got ltep handshake" );
1206        parseLtepHandshake( msgs, msglen, inbuf );
1207        if( tr_peerIoSupportsLTEP( msgs->io ) )
1208        {
1209            sendLtepHandshake( msgs );
1210            sendPex( msgs );
1211        }
1212    }
1213    else if( ltep_msgid == TR_LTEP_PEX )
1214    {
1215        dbgmsg( msgs, "got ut pex" );
1216        msgs->peerSupportsPex = 1;
1217        parseUtPex( msgs, msglen, inbuf );
1218    }
1219    else
1220    {
1221        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1222        evbuffer_drain( inbuf, msglen );
1223    }
1224}
1225
1226static int
1227readBtLength( tr_peermsgs *     msgs,
1228              struct evbuffer * inbuf,
1229              size_t            inlen )
1230{
1231    uint32_t len;
1232
1233    if( inlen < sizeof( len ) )
1234        return READ_LATER;
1235
1236    tr_peerIoReadUint32( msgs->io, inbuf, &len );
1237
1238    if( len == 0 ) /* peer sent us a keepalive message */
1239        dbgmsg( msgs, "got KeepAlive" );
1240    else
1241    {
1242        msgs->incoming.length = len;
1243        msgs->state = AWAITING_BT_ID;
1244    }
1245
1246    return READ_NOW;
1247}
1248
1249static int readBtMessage( tr_peermsgs *     msgs,
1250                          struct evbuffer * inbuf,
1251                          size_t            inlen );
1252
1253static int
1254readBtId( tr_peermsgs *     msgs,
1255          struct evbuffer * inbuf,
1256          size_t            inlen )
1257{
1258    uint8_t id;
1259
1260    if( inlen < sizeof( uint8_t ) )
1261        return READ_LATER;
1262
1263    tr_peerIoReadUint8( msgs->io, inbuf, &id );
1264    msgs->incoming.id = id;
1265
1266    if( id == BT_PIECE )
1267    {
1268        msgs->state = AWAITING_BT_PIECE;
1269        return READ_NOW;
1270    }
1271    else if( msgs->incoming.length != 1 )
1272    {
1273        msgs->state = AWAITING_BT_MESSAGE;
1274        return READ_NOW;
1275    }
1276    else return readBtMessage( msgs, inbuf, inlen - 1 );
1277}
1278
1279static void
1280updatePeerProgress( tr_peermsgs * msgs )
1281{
1282    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have ) / (float)msgs->torrent->info.pieceCount;
1283    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
1284    updateFastSet( msgs );
1285    updateInterest( msgs );
1286    firePeerProgress( msgs );
1287}
1288
1289static void
1290peerMadeRequest( tr_peermsgs *               msgs,
1291                 const struct peer_request * req )
1292{
1293    const int fext = tr_peerIoSupportsFEXT( msgs->io );
1294    const int reqIsValid = requestIsValid( msgs, req );
1295    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( msgs->torrent->completion, req->index );
1296    const int peerIsChoked = msgs->info->peerIsChoked;
1297
1298    int allow = FALSE;
1299
1300    if( !reqIsValid )
1301        dbgmsg( msgs, "rejecting an invalid request." );
1302    else if( !clientHasPiece )
1303        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1304    else if( peerIsChoked )
1305        dbgmsg( msgs, "rejecting request from choked peer" );
1306    else
1307        allow = TRUE;
1308
1309    if( allow )
1310        reqListAppend( &msgs->peerAskedFor, req );
1311    else if( fext )
1312        protocolSendReject( msgs, req );
1313}
1314
1315static int
1316messageLengthIsCorrect( const tr_peermsgs * msg,
1317                        uint8_t             id,
1318                        uint32_t            len )
1319{
1320    switch( id )
1321    {
1322        case BT_CHOKE:
1323        case BT_UNCHOKE:
1324        case BT_INTERESTED:
1325        case BT_NOT_INTERESTED:
1326        case BT_FEXT_HAVE_ALL:
1327        case BT_FEXT_HAVE_NONE:
1328            return len == 1;
1329
1330        case BT_HAVE:
1331        case BT_FEXT_SUGGEST:
1332        case BT_FEXT_ALLOWED_FAST:
1333            return len == 5;
1334
1335        case BT_BITFIELD:
1336            return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1337
1338        case BT_REQUEST:
1339        case BT_CANCEL:
1340        case BT_FEXT_REJECT:
1341            return len == 13;
1342
1343        case BT_PIECE:
1344            return len > 9 && len <= 16393;
1345
1346        case BT_PORT:
1347            return len == 3;
1348
1349        case BT_LTEP:
1350            return len >= 2;
1351
1352        default:
1353            return FALSE;
1354    }
1355}
1356
1357static int clientGotBlock( tr_peermsgs *               msgs,
1358                           const uint8_t *             block,
1359                           const struct peer_request * req );
1360
1361static int
1362readBtPiece( tr_peermsgs      * msgs,
1363             struct evbuffer  * inbuf,
1364             size_t             inlen,
1365             size_t           * setme_piece_bytes_read )
1366{
1367    struct peer_request * req = &msgs->incoming.blockReq;
1368
1369    assert( EVBUFFER_LENGTH( inbuf ) >= inlen );
1370    dbgmsg( msgs, "In readBtPiece" );
1371
1372    if( !req->length )
1373    {
1374        if( inlen < 8 )
1375            return READ_LATER;
1376
1377        tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
1378        tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
1379        req->length = msgs->incoming.length - 9;
1380        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index,
1381                req->offset,
1382                req->length );
1383        return READ_NOW;
1384    }
1385    else
1386    {
1387        int          err;
1388
1389        /* read in another chunk of data */
1390        const size_t nLeft = req->length - EVBUFFER_LENGTH(
1391            msgs->incoming.block );
1392        size_t       n = MIN( nLeft, inlen );
1393        uint8_t *    buf = tr_new( uint8_t, n );
1394        assert( EVBUFFER_LENGTH( inbuf ) >= n );
1395        tr_peerIoReadBytes( msgs->io, inbuf, buf, n );
1396        evbuffer_add( msgs->incoming.block, buf, n );
1397        fireClientGotData( msgs, n, TRUE );
1398        *setme_piece_bytes_read += n;
1399        tr_free( buf );
1400        dbgmsg( msgs, "got %d bytes for block %u:%u->%u ... %d remain",
1401               (int)n, req->index, req->offset, req->length,
1402               (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) );
1403        if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length )
1404            return READ_LATER;
1405
1406        /* we've got the whole block ... process it */
1407        err = clientGotBlock( msgs, EVBUFFER_DATA(
1408                                  msgs->incoming.block ), req );
1409
1410        /* cleanup */
1411        evbuffer_drain( msgs->incoming.block,
1412                       EVBUFFER_LENGTH( msgs->incoming.block ) );
1413        req->length = 0;
1414        msgs->state = AWAITING_BT_LENGTH;
1415        if( !err )
1416            return READ_NOW;
1417        else
1418        {
1419            fireError( msgs, err );
1420            return READ_ERR;
1421        }
1422    }
1423}
1424
1425static int
1426readBtMessage( tr_peermsgs *     msgs,
1427               struct evbuffer * inbuf,
1428               size_t            inlen )
1429{
1430    uint32_t      ui32;
1431    uint32_t      msglen = msgs->incoming.length;
1432    const uint8_t id = msgs->incoming.id;
1433    const size_t  startBufLen = EVBUFFER_LENGTH( inbuf );
1434    const int     fext = tr_peerIoSupportsFEXT( msgs->io );
1435
1436    --msglen; /* id length */
1437
1438    if( inlen < msglen )
1439        return READ_LATER;
1440
1441    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %d", (int)id,
1442            (int)msglen,
1443            (int)inlen );
1444
1445    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1446    {
1447        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d",
1448                (int)id, (int)msglen );
1449        fireError( msgs, EMSGSIZE );
1450        return READ_ERR;
1451    }
1452
1453    switch( id )
1454    {
1455        case BT_CHOKE:
1456            dbgmsg( msgs, "got Choke" );
1457            msgs->info->clientIsChoked = 1;
1458            if( !fext )
1459                cancelAllRequestsToPeer( msgs );
1460            cancelAllRequestsToClient( msgs );
1461            break;
1462
1463        case BT_UNCHOKE:
1464            dbgmsg( msgs, "got Unchoke" );
1465            msgs->info->clientIsChoked = 0;
1466            fireNeedReq( msgs );
1467            break;
1468
1469        case BT_INTERESTED:
1470            dbgmsg( msgs, "got Interested" );
1471            msgs->info->peerIsInterested = 1;
1472            break;
1473
1474        case BT_NOT_INTERESTED:
1475            dbgmsg( msgs, "got Not Interested" );
1476            msgs->info->peerIsInterested = 0;
1477            break;
1478
1479        case BT_HAVE:
1480            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1481            dbgmsg( msgs, "got Have: %u", ui32 );
1482            if( tr_bitfieldAdd( msgs->info->have, ui32 ) )
1483            {
1484                fireError( msgs, ERANGE );
1485                return READ_ERR;
1486            }
1487            updatePeerProgress( msgs );
1488            tr_rcTransferred( msgs->torrent->swarmSpeed,
1489                              msgs->torrent->info.pieceSize );
1490            break;
1491
1492        case BT_BITFIELD:
1493        {
1494            dbgmsg( msgs, "got a bitfield" );
1495            msgs->peerSentBitfield = 1;
1496            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits,
1497                                msglen );
1498            updatePeerProgress( msgs );
1499            fireNeedReq( msgs );
1500            break;
1501        }
1502
1503        case BT_REQUEST:
1504        {
1505            struct peer_request r;
1506            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1507            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1508            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1509            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1510            peerMadeRequest( msgs, &r );
1511            break;
1512        }
1513
1514        case BT_CANCEL:
1515        {
1516            struct peer_request r;
1517            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1518            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1519            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1520            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1521            if( reqListRemove( &msgs->peerAskedFor, &r ) && fext )
1522                protocolSendReject( msgs, &r );
1523            break;
1524        }
1525
1526        case BT_PIECE:
1527            assert( 0 ); /* handled elsewhere! */
1528            break;
1529
1530        case BT_PORT:
1531            dbgmsg( msgs, "Got a BT_PORT" );
1532            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1533            break;
1534
1535        case BT_FEXT_SUGGEST:
1536            dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1537            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1538            if( fext )
1539                fireClientGotSuggest( msgs, ui32 );
1540            else {
1541                fireError( msgs, EPROTO );
1542                return READ_ERR;
1543            }
1544            break;
1545
1546        case BT_FEXT_ALLOWED_FAST:
1547            dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1548            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1549            if( fext )
1550                fireClientGotAllowedFast( msgs, ui32 );
1551            else {
1552                fireError( msgs, EPROTO );
1553                return READ_ERR;
1554            }
1555            break;
1556
1557        case BT_FEXT_HAVE_ALL:
1558            dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1559            if( fext ) {
1560                tr_bitfieldAddRange( msgs->info->have, 0, msgs->torrent->info.pieceCount );
1561                updatePeerProgress( msgs );
1562            } else {
1563                fireError( msgs, EPROTO );
1564                return READ_ERR;
1565            }
1566            break;
1567
1568
1569        case BT_FEXT_HAVE_NONE:
1570            dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1571            if( fext ) {
1572                tr_bitfieldClear( msgs->info->have );
1573                updatePeerProgress( msgs );
1574            } else {
1575                fireError( msgs, EPROTO );
1576                return READ_ERR;
1577            }
1578            break;
1579
1580        case BT_FEXT_REJECT:
1581        {
1582            struct peer_request r;
1583            dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1584            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1585            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1586            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1587            if( fext )
1588                reqListRemove( &msgs->clientAskedFor, &r );
1589            else {
1590                fireError( msgs, EPROTO );
1591                return READ_ERR;
1592            }
1593            break;
1594        }
1595
1596        case BT_LTEP:
1597            dbgmsg( msgs, "Got a BT_LTEP" );
1598            parseLtep( msgs, msglen, inbuf );
1599            break;
1600
1601        default:
1602            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1603            tr_peerIoDrain( msgs->io, inbuf, msglen );
1604            break;
1605    }
1606
1607    assert( msglen + 1 == msgs->incoming.length );
1608    assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen );
1609
1610    msgs->state = AWAITING_BT_LENGTH;
1611    return READ_NOW;
1612}
1613
1614static void
1615decrementDownloadedCount( tr_peermsgs * msgs,
1616                          uint32_t      byteCount )
1617{
1618    tr_torrent * tor = msgs->torrent;
1619
1620    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1621}
1622
1623static void
1624clientGotUnwantedBlock( tr_peermsgs *               msgs,
1625                        const struct peer_request * req )
1626{
1627    decrementDownloadedCount( msgs, req->length );
1628}
1629
1630static void
1631addPeerToBlamefield( tr_peermsgs * msgs,
1632                     uint32_t      index )
1633{
1634    if( !msgs->info->blame )
1635        msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1636    tr_bitfieldAdd( msgs->info->blame, index );
1637}
1638
1639/* returns 0 on success, or an errno on failure */
1640static int
1641clientGotBlock( tr_peermsgs *               msgs,
1642                const uint8_t *             data,
1643                const struct peer_request * req )
1644{
1645    int                    err;
1646    tr_torrent *           tor = msgs->torrent;
1647    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1648
1649    assert( msgs );
1650    assert( req );
1651
1652    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) )
1653    {
1654        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1655                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1656        return EMSGSIZE;
1657    }
1658
1659    /* save the block */
1660    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset,
1661            req->length );
1662
1663    /**
1664    *** Remove the block from our `we asked for this' list
1665    **/
1666
1667    if( !reqListRemove( &msgs->clientAskedFor, req ) )
1668    {
1669        clientGotUnwantedBlock( msgs, req );
1670        dbgmsg( msgs, "we didn't ask for this message..." );
1671        return 0;
1672    }
1673
1674    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1675            msgs->clientAskedFor.count );
1676
1677    /**
1678    *** Error checks
1679    **/
1680
1681    if( tr_cpBlockIsComplete( tor->completion, block ) )
1682    {
1683        dbgmsg( msgs, "we have this block already..." );
1684        clientGotUnwantedBlock( msgs, req );
1685        return 0;
1686    }
1687
1688    /**
1689    ***  Save the block
1690    **/
1691
1692    msgs->info->peerSentPieceDataAt = time( NULL );
1693    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1694        return err;
1695
1696    addPeerToBlamefield( msgs, req->index );
1697    fireGotBlock( msgs, req );
1698    return 0;
1699}
1700
1701static int peerPulse( void * vmsgs );
1702
1703static void
1704didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1705{
1706    tr_peermsgs * msgs = vmsgs;
1707    firePeerGotData( msgs, bytesWritten, wasPieceData );
1708    peerPulse( msgs );
1709}
1710
1711static ReadState
1712canRead( struct tr_iobuf * iobuf, void * vmsgs, size_t * piece )
1713{
1714    ReadState         ret;
1715    tr_peermsgs *     msgs = vmsgs;
1716    struct evbuffer * in = tr_iobuf_input( iobuf );
1717    const size_t      inlen = EVBUFFER_LENGTH( in );
1718
1719    if( !inlen )
1720    {
1721        ret = READ_LATER;
1722    }
1723    else if( msgs->state == AWAITING_BT_PIECE )
1724    {
1725        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1726    }
1727    else switch( msgs->state )
1728    {
1729        case AWAITING_BT_LENGTH:
1730            ret = readBtLength ( msgs, in, inlen ); break;
1731
1732        case AWAITING_BT_ID:
1733            ret = readBtId     ( msgs, in, inlen ); break;
1734
1735        case AWAITING_BT_MESSAGE:
1736            ret = readBtMessage( msgs, in, inlen ); break;
1737
1738        default:
1739            assert( 0 );
1740    }
1741
1742    /* log the raw data that was read */
1743    if( ( ret != READ_ERR ) && ( EVBUFFER_LENGTH( in ) != inlen ) )
1744        fireClientGotData( msgs, inlen - EVBUFFER_LENGTH( in ), FALSE );
1745
1746    return ret;
1747}
1748
1749/**
1750***
1751**/
1752
1753static int
1754ratePulse( void * vpeer )
1755{
1756    tr_peermsgs * peer = vpeer;
1757    const double rateToClient = tr_peerGetPieceSpeed( peer->info,
1758                                                      TR_PEER_TO_CLIENT );
1759    const int estimatedBlocksInNext30Seconds =
1760                  ( rateToClient * 30 * 1024 ) / peer->torrent->blockSize;
1761
1762    peer->minActiveRequests = 8;
1763    peer->maxActiveRequests = peer->minActiveRequests + estimatedBlocksInNext30Seconds;
1764    return TRUE;
1765}
1766
1767static size_t
1768fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1769{
1770    size_t bytesWritten = 0;
1771    struct peer_request req;
1772    const int haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1773    const int fext = tr_peerIoSupportsFEXT( msgs->io );
1774
1775    /**
1776    ***  Protocol messages
1777    **/
1778
1779    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1780    {
1781        dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
1782        msgs->outMessagesBatchedAt = now;
1783    }
1784    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1785    {
1786        const size_t len = EVBUFFER_LENGTH( msgs->outMessages );
1787        /* flush the protocol messages */
1788        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->io, len );
1789        tr_peerIoWriteBuf( msgs->io, msgs->outMessages, FALSE );
1790        msgs->clientSentAnythingAt = now;
1791        msgs->outMessagesBatchedAt = 0;
1792        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1793        bytesWritten +=  len;
1794    }
1795
1796    /**
1797    ***  Blocks
1798    **/
1799
1800    if( ( tr_peerIoGetWriteBufferSpace( msgs->io ) >= msgs->torrent->blockSize )
1801        && popNextRequest( msgs, &req ) )
1802    {
1803        if( requestIsValid( msgs, &req )
1804            && tr_cpPieceIsComplete( msgs->torrent->completion, req.index ) )
1805        {
1806            /* send a block */
1807            uint8_t * buf = tr_new( uint8_t, req.length );
1808            const int err = tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf );
1809            if( err ) {
1810                fireError( msgs, err );
1811                bytesWritten = 0;
1812                msgs = NULL;
1813            } else {
1814                tr_peerIo * io = msgs->io;
1815                struct evbuffer * out = evbuffer_new( );
1816                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1817                tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1818                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1819                tr_peerIoWriteUint32( io, out, req.index );
1820                tr_peerIoWriteUint32( io, out, req.offset );
1821                tr_peerIoWriteBytes ( io, out, buf, req.length );
1822                tr_peerIoWriteBuf( io, out, TRUE );
1823                bytesWritten += EVBUFFER_LENGTH( out );
1824                evbuffer_free( out );
1825                msgs->clientSentAnythingAt = now;
1826            }
1827            tr_free( buf );
1828        }
1829        else if( fext ) /* peer needs a reject message */
1830        {
1831            protocolSendReject( msgs, &req );
1832        }
1833    }
1834
1835    /**
1836    ***  Keepalive
1837    **/
1838
1839    if( ( msgs != NULL )
1840        && ( msgs->clientSentAnythingAt != 0 )
1841        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1842    {
1843        dbgmsg( msgs, "sending a keepalive message" );
1844        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1845        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1846    }
1847
1848    return bytesWritten;
1849}
1850
1851static int
1852peerPulse( void * vmsgs )
1853{
1854    tr_peermsgs * msgs = vmsgs;
1855    const time_t  now = time( NULL );
1856
1857    ratePulse( msgs );
1858
1859    pumpRequestQueue( msgs, now );
1860    expireOldRequests( msgs, now );
1861
1862    for( ;; )
1863        if( fillOutputBuffer( msgs, now ) < 1 )
1864            break;
1865
1866    return TRUE; /* loop forever */
1867}
1868
1869void
1870tr_peerMsgsPulse( tr_peermsgs * msgs )
1871{
1872    if( msgs != NULL )
1873        peerPulse( msgs );
1874}
1875
1876static void
1877gotError( struct tr_iobuf  * iobuf UNUSED,
1878          short              what,
1879          void             * vmsgs )
1880{
1881    if( what & EVBUFFER_TIMEOUT )
1882        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1883    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1884        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1885               what, errno, tr_strerror( errno ) );
1886    fireError( vmsgs, ENOTCONN );
1887}
1888
1889static void
1890sendBitfield( tr_peermsgs * msgs )
1891{
1892    struct evbuffer * out = msgs->outMessages;
1893    tr_bitfield *     field;
1894    tr_piece_index_t  lazyPieces[LAZY_PIECE_COUNT];
1895    size_t            i;
1896    size_t            lazyCount = 0;
1897
1898    field = tr_bitfieldDup( tr_cpPieceBitfield( msgs->torrent->completion ) );
1899
1900    if( tr_sessionIsLazyBitfieldEnabled( msgs->session ) )
1901    {
1902        /** Lazy bitfields aren't a high priority or secure, so I'm opting for
1903            speed over a truly random sample -- let's limit the pool size to
1904            the first 1000 pieces so large torrents don't bog things down */
1905        size_t poolSize;
1906        const size_t maxPoolSize = MIN( msgs->torrent->info.pieceCount, 1000 );
1907        tr_piece_index_t * pool = tr_new( tr_piece_index_t, maxPoolSize );
1908
1909        /* build the pool */
1910        for( i=poolSize=0; i<maxPoolSize; ++i )
1911            if( tr_bitfieldHas( field, i ) )
1912                pool[poolSize++] = i;
1913
1914        /* pull random piece indices from the pool */
1915        while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) )
1916        {
1917            const int pos = tr_cryptoWeakRandInt( poolSize );
1918            const tr_piece_index_t piece = pool[pos];
1919            tr_bitfieldRem( field, piece );
1920            lazyPieces[lazyCount++] = piece;
1921            pool[pos] = pool[--poolSize];
1922        }
1923
1924        /* cleanup */
1925        tr_free( pool );
1926    }
1927
1928    tr_peerIoWriteUint32( msgs->io, out,
1929                          sizeof( uint8_t ) + field->byteCount );
1930    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1931    tr_peerIoWriteBytes ( msgs->io, out, field->bits, field->byteCount );
1932    dbgmsg( msgs, "sending bitfield... outMessage size is now %d",
1933           (int)EVBUFFER_LENGTH( out ) );
1934    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1935
1936    for( i = 0; i < lazyCount; ++i )
1937        protocolSendHave( msgs, lazyPieces[i] );
1938
1939    tr_bitfieldFree( field );
1940}
1941
1942static void
1943tellPeerWhatWeHave( tr_peermsgs * msgs )
1944{
1945    const int fext = tr_peerIoSupportsFEXT( msgs->io );
1946
1947    if( fext && ( tr_cpGetStatus( msgs->torrent->completion ) == TR_SEED ) )
1948    {
1949        protocolSendHaveAll( msgs );
1950    }
1951    else if( fext && ( tr_cpHaveValid( msgs->torrent->completion ) == 0 ) )
1952    {
1953        protocolSendHaveNone( msgs );
1954    }
1955    else
1956    {
1957        sendBitfield( msgs );
1958    }
1959}
1960
1961/**
1962***
1963**/
1964
1965/* some peers give us error messages if we send
1966   more than this many peers in a single pex message
1967   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
1968#define MAX_PEX_ADDED 50
1969#define MAX_PEX_DROPPED 50
1970
1971typedef struct
1972{
1973    tr_pex *  added;
1974    tr_pex *  dropped;
1975    tr_pex *  elements;
1976    int       addedCount;
1977    int       droppedCount;
1978    int       elementCount;
1979}
1980PexDiffs;
1981
1982static void
1983pexAddedCb( void * vpex,
1984            void * userData )
1985{
1986    PexDiffs * diffs = userData;
1987    tr_pex *   pex = vpex;
1988
1989    if( diffs->addedCount < MAX_PEX_ADDED )
1990    {
1991        diffs->added[diffs->addedCount++] = *pex;
1992        diffs->elements[diffs->elementCount++] = *pex;
1993    }
1994}
1995
1996static void
1997pexDroppedCb( void * vpex,
1998              void * userData )
1999{
2000    PexDiffs * diffs = userData;
2001    tr_pex *   pex = vpex;
2002
2003    if( diffs->droppedCount < MAX_PEX_DROPPED )
2004    {
2005        diffs->dropped[diffs->droppedCount++] = *pex;
2006    }
2007}
2008
2009static void
2010pexElementCb( void * vpex,
2011              void * userData )
2012{
2013    PexDiffs * diffs = userData;
2014    tr_pex *   pex = vpex;
2015
2016    diffs->elements[diffs->elementCount++] = *pex;
2017}
2018
2019/* TODO: ipv6 pex */
2020static void
2021sendPex( tr_peermsgs * msgs )
2022{
2023    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
2024    {
2025        PexDiffs diffs;
2026        tr_pex * newPex = NULL;
2027        const int newCount = tr_peerMgrGetPeers( msgs->session->peerMgr,
2028                                                 msgs->torrent->info.hash,
2029                                                 &newPex );
2030
2031        /* build the diffs */
2032        diffs.added = tr_new( tr_pex, newCount );
2033        diffs.addedCount = 0;
2034        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
2035        diffs.droppedCount = 0;
2036        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
2037        diffs.elementCount = 0;
2038        tr_set_compare( msgs->pex, msgs->pexCount,
2039                        newPex, newCount,
2040                        tr_pexCompare, sizeof( tr_pex ),
2041                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
2042        dbgmsg(
2043            msgs,
2044            "pex: old peer count %d, new peer count %d, added %d, removed %d",
2045            msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
2046
2047        if( !diffs.addedCount && !diffs.droppedCount )
2048        {
2049            tr_free( diffs.elements );
2050        }
2051        else
2052        {
2053            int  i;
2054            tr_benc val;
2055            char * benc;
2056            int bencLen;
2057            uint8_t * tmp, *walk;
2058            struct evbuffer * out = msgs->outMessages;
2059
2060            /* update peer */
2061            tr_free( msgs->pex );
2062            msgs->pex = diffs.elements;
2063            msgs->pexCount = diffs.elementCount;
2064
2065            /* build the pex payload */
2066            tr_bencInitDict( &val, 3 );
2067
2068            /* "added" */
2069            tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
2070            for( i = 0; i < diffs.addedCount; ++i ) {
2071                memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
2072                memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
2073            }
2074            assert( ( walk - tmp ) == diffs.addedCount * 6 );
2075            tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
2076            tr_free( tmp );
2077
2078            /* "added.f" */
2079            tmp = walk = tr_new( uint8_t, diffs.addedCount );
2080            for( i = 0; i < diffs.addedCount; ++i )
2081                *walk++ = diffs.added[i].flags;
2082            assert( ( walk - tmp ) == diffs.addedCount );
2083            tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2084            tr_free( tmp );
2085
2086            /* "dropped" */
2087            tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2088            for( i = 0; i < diffs.droppedCount; ++i ) {
2089                memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
2090                memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2091            }
2092            assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2093            tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2094            tr_free( tmp );
2095
2096            /* write the pex message */
2097            benc = tr_bencSave( &val, &bencLen );
2098            tr_peerIoWriteUint32( msgs->io, out, 2 * sizeof( uint8_t ) + bencLen );
2099            tr_peerIoWriteUint8 ( msgs->io, out, BT_LTEP );
2100            tr_peerIoWriteUint8 ( msgs->io, out, msgs->ut_pex_id );
2101            tr_peerIoWriteBytes ( msgs->io, out, benc, bencLen );
2102            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2103            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", EVBUFFER_LENGTH( out ) );
2104
2105            tr_free( benc );
2106            tr_bencFree( &val );
2107        }
2108
2109        /* cleanup */
2110        tr_free( diffs.added );
2111        tr_free( diffs.dropped );
2112        tr_free( newPex );
2113
2114        msgs->clientSentPexAt = time( NULL );
2115    }
2116}
2117
2118static int
2119pexPulse( void * vpeer )
2120{
2121    sendPex( vpeer );
2122    return TRUE;
2123}
2124
2125/**
2126***
2127**/
2128
2129tr_peermsgs*
2130tr_peerMsgsNew( struct tr_torrent * torrent,
2131                struct tr_peer *    info,
2132                tr_delivery_func    func,
2133                void *              userData,
2134                tr_publisher_tag *  setme )
2135{
2136    tr_peermsgs * m;
2137
2138    assert( info );
2139    assert( info->io );
2140
2141    m = tr_new0( tr_peermsgs, 1 );
2142    m->publisher = tr_publisherNew( );
2143    m->info = info;
2144    m->session = torrent->session;
2145    m->torrent = torrent;
2146    m->io = info->io;
2147    m->info->clientIsChoked = 1;
2148    m->info->peerIsChoked = 1;
2149    m->info->clientIsInterested = 0;
2150    m->info->peerIsInterested = 0;
2151    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
2152    m->state = AWAITING_BT_LENGTH;
2153    m->pexTimer = tr_timerNew( m->session, pexPulse, m, PEX_INTERVAL );
2154    m->outMessages = evbuffer_new( );
2155    m->outMessagesBatchedAt = 0;
2156    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2157    m->incoming.block = evbuffer_new( );
2158    m->peerAllowedPieces = NULL;
2159    m->peerAskedFor = REQUEST_LIST_INIT;
2160    m->clientAskedFor = REQUEST_LIST_INIT;
2161    m->clientWillAskFor = REQUEST_LIST_INIT;
2162    *setme = tr_publisherSubscribe( m->publisher, func, userData );
2163
2164    if( tr_peerIoSupportsLTEP( m->io ) )
2165        sendLtepHandshake( m );
2166
2167    tellPeerWhatWeHave( m );
2168
2169    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of
2170                                             inactivity */
2171    tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
2172    ratePulse( m );
2173
2174    return m;
2175}
2176
2177void
2178tr_peerMsgsFree( tr_peermsgs* msgs )
2179{
2180    if( msgs )
2181    {
2182        tr_timerFree( &msgs->pexTimer );
2183        tr_publisherFree( &msgs->publisher );
2184        reqListClear( &msgs->clientWillAskFor );
2185        reqListClear( &msgs->clientAskedFor );
2186        reqListClear( &msgs->peerAskedFor );
2187
2188        tr_bitfieldFree( msgs->peerAllowedPieces );
2189        evbuffer_free( msgs->incoming.block );
2190        evbuffer_free( msgs->outMessages );
2191        tr_free( msgs->pex );
2192
2193        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2194        tr_free( msgs );
2195    }
2196}
2197
2198void
2199tr_peerMsgsUnsubscribe( tr_peermsgs *    peer,
2200                        tr_publisher_tag tag )
2201{
2202    tr_publisherUnsubscribe( peer->publisher, tag );
2203}
2204
Note: See TracBrowser for help on using the repository browser.