source: branches/1.4x/libtransmission/peer-msgs.c @ 7354

Last change on this file since 7354 was 7354, checked in by charles, 12 years ago

(1.4x libT) fix bug which caused libtransmission to hold onto onto nonproductive peers for longer than it should've

  • Property svn:keywords set to Date Rev Author Id
File size: 59.7 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 7354 2008-12-11 07:11:23Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19
20#include <event.h>
21
22#include "transmission.h"
23#include "bencode.h"
24#include "completion.h"
25#include "crypto.h"
26#include "inout.h"
27#include "iobuf.h"
28#ifdef WIN32
29#include "net.h" /* for ECONN */
30#endif
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-mgr-private.h"
34#include "peer-msgs.h"
35#include "ratecontrol.h"
36#include "stats.h"
37#include "torrent.h"
38#include "trevent.h"
39#include "utils.h"
40
41/**
42***
43**/
44
45enum
46{
47    BT_CHOKE                = 0,
48    BT_UNCHOKE              = 1,
49    BT_INTERESTED           = 2,
50    BT_NOT_INTERESTED       = 3,
51    BT_HAVE                 = 4,
52    BT_BITFIELD             = 5,
53    BT_REQUEST              = 6,
54    BT_PIECE                = 7,
55    BT_CANCEL               = 8,
56    BT_PORT                 = 9,
57    BT_SUGGEST              = 13,
58    BT_HAVE_ALL             = 14,
59    BT_HAVE_NONE            = 15,
60    BT_REJECT               = 16,
61    BT_ALLOWED_FAST         = 17,
62    BT_LTEP                 = 20,
63
64    LTEP_HANDSHAKE          = 0,
65
66    TR_LTEP_PEX             = 1,
67
68    MIN_CHOKE_PERIOD_SEC    = ( 10 ),
69
70    /* idle seconds before we send a keepalive */
71    KEEPALIVE_INTERVAL_SECS = 100,
72
73    PEX_INTERVAL            = ( 90 * 1000 ), /* msec between sendPex() calls */
74    PEER_PULSE_INTERVAL     = ( 250 ),       /* msec between peerPulse() calls
75                                               */
76
77    MAX_QUEUE_SIZE          = ( 100 ),
78
79    /* (fast peers) max number of pieces we fast-allow to another peer */
80    MAX_FAST_ALLOWED_COUNT   = 10,
81
82    /* (fast peers) max threshold for allowing fast-pieces requests */
83    MAX_FAST_ALLOWED_THRESHOLD = 10,
84
85    /* how long an unsent request can stay queued before it's returned
86       back to the peer-mgr's pool of requests */
87    QUEUED_REQUEST_TTL_SECS = 20,
88
89    /* how long a sent request can stay queued before it's returned
90       back to the peer-mgr's pool of requests */
91    SENT_REQUEST_TTL_SECS = 240,
92
93    /* used in lowering the outMessages queue period */
94    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
95    HIGH_PRIORITY_INTERVAL_SECS = 2,
96    LOW_PRIORITY_INTERVAL_SECS = 20,
97
98    /* number of pieces to remove from the bitfield when
99     * lazy bitfields are turned on */
100    LAZY_PIECE_COUNT = 26
101};
102
103/**
104***  REQUEST MANAGEMENT
105**/
106
107enum
108{
109    AWAITING_BT_LENGTH,
110    AWAITING_BT_ID,
111    AWAITING_BT_MESSAGE,
112    AWAITING_BT_PIECE
113};
114
115struct peer_request
116{
117    uint32_t    index;
118    uint32_t    offset;
119    uint32_t    length;
120    time_t      time_requested;
121};
122
123static int
124compareRequest( const void * va,
125                const void * vb )
126{
127    const struct peer_request * a = va;
128    const struct peer_request * b = vb;
129
130    if( a->index != b->index )
131        return a->index < b->index ? -1 : 1;
132
133    if( a->offset != b->offset )
134        return a->offset < b->offset ? -1 : 1;
135
136    if( a->length != b->length )
137        return a->length < b->length ? -1 : 1;
138
139    return 0;
140}
141
142struct request_list
143{
144    uint16_t               count;
145    uint16_t               max;
146    struct peer_request *  requests;
147};
148
149static const struct request_list REQUEST_LIST_INIT = { 0, 0, NULL };
150
151static void
152reqListReserve( struct request_list * list,
153                uint16_t              max )
154{
155    if( list->max < max )
156    {
157        list->max = max;
158        list->requests = tr_renew( struct peer_request,
159                                   list->requests,
160                                   list->max );
161    }
162}
163
164static void
165reqListClear( struct request_list * list )
166{
167    tr_free( list->requests );
168    *list = REQUEST_LIST_INIT;
169}
170
171static void
172reqListCopy( struct request_list *       dest,
173             const struct request_list * src )
174{
175    dest->count = dest->max = src->count;
176    dest->requests =
177        tr_memdup( src->requests, dest->count * sizeof( struct peer_request ) );
178}
179
180static void
181reqListRemoveOne( struct request_list * list,
182                  int                   i )
183{
184    assert( 0 <= i && i < list->count );
185
186    memmove( &list->requests[i],
187            &list->requests[i + 1],
188            sizeof( struct peer_request ) * ( --list->count - i ) );
189}
190
191static void
192reqListAppend( struct request_list *       list,
193               const struct peer_request * req )
194{
195    if( ++list->count >= list->max )
196        reqListReserve( list, list->max + 8 );
197
198    list->requests[list->count - 1] = *req;
199}
200
201static int
202reqListPop( struct request_list * list,
203            struct peer_request * setme )
204{
205    int success;
206
207    if( !list->count )
208        success = FALSE;
209    else {
210        *setme = list->requests[0];
211        reqListRemoveOne( list, 0 );
212        success = TRUE;
213    }
214
215    return success;
216}
217
218static int
219reqListFind( struct request_list *       list,
220             const struct peer_request * key )
221{
222    uint16_t i;
223
224    for( i = 0; i < list->count; ++i )
225        if( !compareRequest( key, list->requests + i ) )
226            return i;
227
228    return -1;
229}
230
231static int
232reqListRemove( struct request_list *       list,
233               const struct peer_request * key )
234{
235    int success;
236    const int i = reqListFind( list, key );
237
238    if( i < 0 )
239        success = FALSE;
240    else {
241        reqListRemoveOne( list, i );
242        success = TRUE;
243    }
244
245    return success;
246}
247
248/**
249***
250**/
251
252/* this is raw, unchanged data from the peer regarding
253 * the current message that it's sending us. */
254struct tr_incoming
255{
256    uint8_t                id;
257    uint32_t               length; /* includes the +1 for id length */
258    struct peer_request    blockReq; /* metadata for incoming blocks */
259    struct evbuffer *      block; /* piece data for incoming blocks */
260};
261
262struct tr_peermsgs
263{
264    tr_bool         peerSentBitfield;
265    tr_bool         peerSupportsPex;
266    tr_bool         clientSentLtepHandshake;
267    tr_bool         peerSentLtepHandshake;
268
269    uint8_t         state;
270    uint8_t         ut_pex_id;
271    uint16_t        pexCount;
272    uint16_t        minActiveRequests;
273    uint16_t        maxActiveRequests;
274
275    /* how long the outMessages batch should be allowed to grow before
276     * it's flushed -- some messages (like requests >:) should be sent
277     * very quickly; others aren't as urgent. */
278    int                    outMessagesBatchPeriod;
279
280    tr_peer *              info;
281
282    tr_session *           session;
283    tr_torrent *           torrent;
284    tr_peerIo *            io;
285
286    tr_publisher_t *       publisher;
287
288    struct evbuffer *      outMessages; /* all the non-piece messages */
289
290    struct request_list    peerAskedFor;
291    struct request_list    peerAskedForFast;
292    struct request_list    clientAskedFor;
293    struct request_list    clientWillAskFor;
294
295    tr_timer *             pexTimer;
296
297    time_t                 clientSentPexAt;
298    time_t                 clientSentAnythingAt;
299
300    /* when we started batching the outMessages */
301    time_t                outMessagesBatchedAt;
302
303    tr_bitfield *         peerAllowedPieces;
304
305    struct tr_incoming    incoming;
306
307    tr_pex *              pex;
308
309    /* if the peer supports the Extension Protocol in BEP 10 and
310       supplied a reqq argument, it's stored here.  otherwise the
311       value is zero and should be ignored. */
312    int64_t               reqq;
313};
314
315/**
316***
317**/
318
319static void
320myDebug( const char *               file,
321         int                        line,
322         const struct tr_peermsgs * msgs,
323         const char *               fmt,
324         ... )
325{
326    FILE * fp = tr_getLog( );
327
328    if( fp )
329    {
330        va_list           args;
331        char              timestr[64];
332        struct evbuffer * buf = evbuffer_new( );
333        char *            base = tr_basename( file );
334
335        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
336                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
337                             msgs->torrent->info.name,
338                             tr_peerIoGetAddrStr( msgs->io ),
339                             msgs->info->client );
340        va_start( args, fmt );
341        evbuffer_add_vprintf( buf, fmt, args );
342        va_end( args );
343        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
344        fwrite( EVBUFFER_DATA( buf ), 1, EVBUFFER_LENGTH( buf ), fp );
345
346        tr_free( base );
347        evbuffer_free( buf );
348    }
349}
350
351#define dbgmsg( msgs, ... ) \
352    do { \
353        if( tr_deepLoggingIsActive( ) ) \
354            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
355    } while( 0 )
356
357/**
358***
359**/
360
361static void
362pokeBatchPeriod( tr_peermsgs * msgs,
363                 int           interval )
364{
365    if( msgs->outMessagesBatchPeriod > interval )
366    {
367        msgs->outMessagesBatchPeriod = interval;
368        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
369    }
370}
371
372static void
373protocolSendRequest( tr_peermsgs *               msgs,
374                     const struct peer_request * req )
375{
376    tr_peerIo *       io = msgs->io;
377    struct evbuffer * out = msgs->outMessages;
378
379    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
380    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
381    tr_peerIoWriteUint32( io, out, req->index );
382    tr_peerIoWriteUint32( io, out, req->offset );
383    tr_peerIoWriteUint32( io, out, req->length );
384    dbgmsg( msgs, "requesting %u:%u->%u... outMessage size is now %d",
385           req->index, req->offset, req->length, (int)EVBUFFER_LENGTH( out ) );
386    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
387}
388
389static void
390protocolSendCancel( tr_peermsgs *               msgs,
391                    const struct peer_request * req )
392{
393    tr_peerIo *       io = msgs->io;
394    struct evbuffer * out = msgs->outMessages;
395
396    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
397    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
398    tr_peerIoWriteUint32( io, out, req->index );
399    tr_peerIoWriteUint32( io, out, req->offset );
400    tr_peerIoWriteUint32( io, out, req->length );
401    dbgmsg( msgs, "cancelling %u:%u->%u... outMessage size is now %d",
402           req->index, req->offset, req->length, (int)EVBUFFER_LENGTH( out ) );
403    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
404}
405
406static void
407protocolSendHave( tr_peermsgs * msgs,
408                  uint32_t      index )
409{
410    tr_peerIo *       io = msgs->io;
411    struct evbuffer * out = msgs->outMessages;
412
413    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + sizeof( uint32_t ) );
414    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
415    tr_peerIoWriteUint32( io, out, index );
416    dbgmsg( msgs, "sending Have %u.. outMessage size is now %d",
417           index, (int)EVBUFFER_LENGTH( out ) );
418    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
419}
420
421static void
422protocolSendChoke( tr_peermsgs * msgs,
423                   int           choke )
424{
425    tr_peerIo *       io = msgs->io;
426    struct evbuffer * out = msgs->outMessages;
427
428    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
429    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
430    dbgmsg( msgs, "sending %s... outMessage size is now %d",
431           ( choke ? "Choke" : "Unchoke" ),
432           (int)EVBUFFER_LENGTH( out ) );
433    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
434}
435
436/**
437***  EVENTS
438**/
439
440static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0 };
441
442static void
443publish( tr_peermsgs *   msgs,
444         tr_peer_event * e )
445{
446    tr_publisherPublish( msgs->publisher, msgs->info, e );
447}
448
449static void
450fireError( tr_peermsgs * msgs,
451           int           err )
452{
453    tr_peer_event e = blankEvent;
454
455    e.eventType = TR_PEER_ERROR;
456    e.err = err;
457    publish( msgs, &e );
458}
459
460static void
461fireNeedReq( tr_peermsgs * msgs )
462{
463    tr_peer_event e = blankEvent;
464
465    e.eventType = TR_PEER_NEED_REQ;
466    publish( msgs, &e );
467}
468
469static void
470firePeerProgress( tr_peermsgs * msgs )
471{
472    tr_peer_event e = blankEvent;
473
474    e.eventType = TR_PEER_PEER_PROGRESS;
475    e.progress = msgs->info->progress;
476    publish( msgs, &e );
477}
478
479static void
480fireGotBlock( tr_peermsgs *               msgs,
481              const struct peer_request * req )
482{
483    tr_peer_event e = blankEvent;
484
485    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
486    e.pieceIndex = req->index;
487    e.offset = req->offset;
488    e.length = req->length;
489    publish( msgs, &e );
490}
491
492static void
493fireClientGotData( tr_peermsgs * msgs,
494                   uint32_t      length,
495                   int           wasPieceData )
496{
497    tr_peer_event e = blankEvent;
498
499    e.length = length;
500    e.eventType = TR_PEER_CLIENT_GOT_DATA;
501    e.wasPieceData = wasPieceData;
502    publish( msgs, &e );
503}
504
505static void
506firePeerGotData( tr_peermsgs  * msgs,
507                 uint32_t       length,
508                 int            wasPieceData )
509{
510    tr_peer_event e = blankEvent;
511
512    e.length = length;
513    e.eventType = TR_PEER_PEER_GOT_DATA;
514    e.wasPieceData = wasPieceData;
515
516    publish( msgs, &e );
517}
518
519static void
520fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
521{
522    tr_peer_event e = blankEvent;
523    e.eventType = TR_PEER_CANCEL;
524    e.pieceIndex = req->index;
525    e.offset = req->offset;
526    e.length = req->length;
527    publish( msgs, &e );
528}
529
530/**
531***  INTEREST
532**/
533
534static int
535isPieceInteresting( const tr_peermsgs * peer,
536                    tr_piece_index_t    piece )
537{
538    const tr_torrent * torrent = peer->torrent;
539
540    return ( !torrent->info.pieces[piece].dnd )                 /* we want it */
541           && ( !tr_cpPieceIsComplete( torrent->completion, piece ) ) /* !have
542                                                                        */
543           && ( tr_bitfieldHas( peer->info->have, piece ) );   /* peer has it */
544}
545
546/* "interested" means we'll ask for piece data if they unchoke us */
547static int
548isPeerInteresting( const tr_peermsgs * msgs )
549{
550    tr_piece_index_t    i;
551    const tr_torrent *  torrent;
552    const tr_bitfield * bitfield;
553    const int           clientIsSeed = tr_torrentIsSeed( msgs->torrent );
554
555    if( clientIsSeed )
556        return FALSE;
557
558    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
559        return FALSE;
560
561    torrent = msgs->torrent;
562    bitfield = tr_cpPieceBitfield( torrent->completion );
563
564    if( !msgs->info->have )
565        return TRUE;
566
567    assert( bitfield->byteCount == msgs->info->have->byteCount );
568
569    for( i = 0; i < torrent->info.pieceCount; ++i )
570        if( isPieceInteresting( msgs, i ) )
571            return TRUE;
572
573    return FALSE;
574}
575
576static void
577sendInterest( tr_peermsgs * msgs,
578              int           weAreInterested )
579{
580    struct evbuffer * out = msgs->outMessages;
581
582    assert( msgs );
583    assert( weAreInterested == 0 || weAreInterested == 1 );
584
585    msgs->info->clientIsInterested = weAreInterested;
586    dbgmsg( msgs, "Sending %s",
587            weAreInterested ? "Interested" : "Not Interested" );
588    tr_peerIoWriteUint32( msgs->io, out, sizeof( uint8_t ) );
589    tr_peerIoWriteUint8 (
590        msgs->io, out, weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
591    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
592    dbgmsg( msgs, "outMessage size is now %d", (int)EVBUFFER_LENGTH( out ) );
593}
594
595static void
596updateInterest( tr_peermsgs * msgs )
597{
598    const int i = isPeerInteresting( msgs );
599
600    if( i != msgs->info->clientIsInterested )
601        sendInterest( msgs, i );
602    if( i )
603        fireNeedReq( msgs );
604}
605
606static void
607cancelAllRequestsToClientExceptFast( tr_peermsgs * msgs )
608{
609    reqListClear( &msgs->peerAskedFor );
610}
611
612void
613tr_peerMsgsSetChoke( tr_peermsgs * msgs,
614                     int           choke )
615{
616    const time_t now = time( NULL );
617    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
618
619    assert( msgs );
620    assert( msgs->info );
621    assert( choke == 0 || choke == 1 );
622
623    if( msgs->info->chokeChangedAt > fibrillationTime )
624    {
625        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation",
626                choke );
627    }
628    else if( msgs->info->peerIsChoked != choke )
629    {
630        msgs->info->peerIsChoked = choke;
631        if( choke )
632            cancelAllRequestsToClientExceptFast( msgs );
633        protocolSendChoke( msgs, choke );
634        msgs->info->chokeChangedAt = now;
635    }
636}
637
638/**
639***
640**/
641
642void
643tr_peerMsgsHave( tr_peermsgs * msgs,
644                 uint32_t      index )
645{
646    protocolSendHave( msgs, index );
647
648    /* since we have more pieces now, we might not be interested in this peer */
649    updateInterest( msgs );
650}
651
652#if 0
653static void
654sendFastSuggest( tr_peermsgs * msgs,
655                 uint32_t      pieceIndex )
656{
657    assert( msgs );
658
659    if( tr_peerIoSupportsFEXT( msgs->io ) )
660    {
661        tr_peerIoWriteUint32( msgs->io, msgs->outMessages,
662                             sizeof( uint8_t ) + sizeof( uint32_t ) );
663        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_SUGGEST );
664        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
665    }
666}
667
668static void
669sendFastHave( tr_peermsgs * msgs,
670              int           all )
671{
672    assert( msgs );
673
674    if( tr_peerIoSupportsFEXT( msgs->io ) )
675    {
676        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof( uint8_t ) );
677        tr_peerIoWriteUint8( msgs->io, msgs->outMessages,
678                            ( all ? BT_HAVE_ALL
679                              : BT_HAVE_NONE ) );
680        updateInterest( msgs );
681    }
682}
683
684#endif
685
686static void
687sendFastReject( tr_peermsgs * msgs,
688                uint32_t      pieceIndex,
689                uint32_t      offset,
690                uint32_t      length )
691{
692    assert( msgs );
693
694    if( tr_peerIoSupportsFEXT( msgs->io ) )
695    {
696        struct evbuffer * out = msgs->outMessages;
697        const uint32_t    len = sizeof( uint8_t ) + 3 * sizeof( uint32_t );
698        dbgmsg( msgs, "sending fast reject %u:%u->%u", pieceIndex, offset,
699                length );
700        tr_peerIoWriteUint32( msgs->io, out, len );
701        tr_peerIoWriteUint8( msgs->io, out, BT_REJECT );
702        tr_peerIoWriteUint32( msgs->io, out, pieceIndex );
703        tr_peerIoWriteUint32( msgs->io, out, offset );
704        tr_peerIoWriteUint32( msgs->io, out, length );
705        pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
706        dbgmsg( msgs, "outMessage size is now %d",
707               (int)EVBUFFER_LENGTH( out ) );
708    }
709}
710
711static tr_bitfield*
712getPeerAllowedPieces( tr_peermsgs * msgs )
713{
714    if( !msgs->peerAllowedPieces && tr_peerIoSupportsFEXT( msgs->io ) )
715    {
716        msgs->peerAllowedPieces = tr_peerMgrGenerateAllowedSet(
717            MAX_FAST_ALLOWED_COUNT,
718            msgs->torrent->info.pieceCount,
719            msgs->torrent->info.hash,
720            tr_peerIoGetAddress( msgs->io, NULL ) );
721    }
722
723    return msgs->peerAllowedPieces;
724}
725
726static void
727sendFastAllowed( tr_peermsgs * msgs,
728                 uint32_t      pieceIndex )
729{
730    assert( msgs );
731
732    if( tr_peerIoSupportsFEXT( msgs->io ) )
733    {
734        struct evbuffer * out = msgs->outMessages;
735        dbgmsg( msgs, "sending fast allowed" );
736        tr_peerIoWriteUint32( msgs->io, out,  sizeof( uint8_t ) +
737                             sizeof( uint32_t ) );
738        tr_peerIoWriteUint8( msgs->io, out, BT_ALLOWED_FAST );
739        tr_peerIoWriteUint32( msgs->io, out, pieceIndex );
740        pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
741        dbgmsg( msgs, "outMessage size is now %d",
742               (int)EVBUFFER_LENGTH( out ) );
743    }
744}
745
746static void
747sendFastAllowedSet( tr_peermsgs * msgs )
748{
749    tr_piece_index_t i = 0;
750
751    while( i <= msgs->torrent->info.pieceCount )
752    {
753        if( tr_bitfieldHas( getPeerAllowedPieces( msgs ), i ) )
754            sendFastAllowed( msgs, i );
755        i++;
756    }
757}
758
759static void
760maybeSendFastAllowedSet( tr_peermsgs * msgs )
761{
762    if( tr_bitfieldCountTrueBits( msgs->info->have ) <=
763        MAX_FAST_ALLOWED_THRESHOLD )
764        sendFastAllowedSet( msgs );
765}
766
767/**
768***
769**/
770
771static int
772reqIsValid( const tr_peermsgs * peer,
773            uint32_t            index,
774            uint32_t            offset,
775            uint32_t            length )
776{
777    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
778}
779
780static int
781requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
782{
783    return reqIsValid( msgs, req->index, req->offset, req->length );
784}
785
786static void
787expireOldRequests( tr_peermsgs * msgs, const time_t now  )
788{
789    int                 i;
790    time_t              oldestAllowed;
791    struct request_list tmp = REQUEST_LIST_INIT;
792
793    /* cancel requests that have been queued for too long */
794    oldestAllowed = now - QUEUED_REQUEST_TTL_SECS;
795    reqListCopy( &tmp, &msgs->clientWillAskFor );
796    for( i = 0; i < tmp.count; ++i )
797    {
798        const struct peer_request * req = &tmp.requests[i];
799        if( req->time_requested < oldestAllowed )
800            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
801    }
802    reqListClear( &tmp );
803
804    /* cancel requests that were sent too long ago */
805    oldestAllowed = now - SENT_REQUEST_TTL_SECS;
806    reqListCopy( &tmp, &msgs->clientAskedFor );
807    for( i = 0; i < tmp.count; ++i )
808    {
809        const struct peer_request * req = &tmp.requests[i];
810        if( req->time_requested < oldestAllowed )
811            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
812    }
813    reqListClear( &tmp );
814}
815
816static void
817pumpRequestQueue( tr_peermsgs * msgs, const time_t now )
818{
819    const int           max = msgs->maxActiveRequests;
820    const int           min = msgs->minActiveRequests;
821    int                 sent = 0;
822    int                 count = msgs->clientAskedFor.count;
823    struct peer_request req;
824
825    if( count > min )
826        return;
827    if( msgs->info->clientIsChoked )
828        return;
829    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
830        return;
831
832    while( ( count < max ) && reqListPop( &msgs->clientWillAskFor, &req ) )
833    {
834        const tr_block_index_t block =
835            _tr_block( msgs->torrent, req.index, req.offset );
836
837        assert( requestIsValid( msgs, &req ) );
838        assert( tr_bitfieldHas( msgs->info->have, req.index ) );
839
840        /* don't ask for it if we've already got it... this block may have
841         * come in from a different peer after we cancelled a request for it */
842        if( !tr_cpBlockIsComplete( msgs->torrent->completion, block ) )
843        {
844            protocolSendRequest( msgs, &req );
845            req.time_requested = now;
846            reqListAppend( &msgs->clientAskedFor, &req );
847
848            ++count;
849            ++sent;
850        }
851    }
852
853    if( sent )
854        dbgmsg( msgs,
855                "pump sent %d requests, now have %d active and %d queued",
856                sent,
857                msgs->clientAskedFor.count,
858                msgs->clientWillAskFor.count );
859
860    if( count < max )
861        fireNeedReq( msgs );
862}
863
864static int
865requestQueueIsFull( const tr_peermsgs * msgs )
866{
867    const int req_max = msgs->maxActiveRequests;
868    return msgs->clientWillAskFor.count >= req_max;
869}
870
871tr_addreq_t
872tr_peerMsgsAddRequest( tr_peermsgs *    msgs,
873                       uint32_t         index,
874                       uint32_t         offset,
875                       uint32_t         length )
876{
877    struct peer_request req;
878
879    assert( msgs );
880    assert( msgs->torrent );
881    assert( reqIsValid( msgs, index, offset, length ) );
882
883    /**
884    ***  Reasons to decline the request
885    **/
886
887    /* don't send requests to choked clients */
888    if( msgs->info->clientIsChoked )
889    {
890        dbgmsg( msgs, "declining request because they're choking us" );
891        return TR_ADDREQ_CLIENT_CHOKED;
892    }
893
894    /* peer doesn't have this piece */
895    if( !tr_bitfieldHas( msgs->info->have, index ) )
896        return TR_ADDREQ_MISSING;
897
898    /* peer's queue is full */
899    if( requestQueueIsFull( msgs ) ) {
900        dbgmsg( msgs, "declining request because we're full" );
901        return TR_ADDREQ_FULL;
902    }
903
904    /* have we already asked for this piece? */
905    req.index = index;
906    req.offset = offset;
907    req.length = length;
908    if( reqListFind( &msgs->clientAskedFor, &req ) != -1 ) {
909        dbgmsg( msgs, "declining because it's a duplicate" );
910        return TR_ADDREQ_DUPLICATE;
911    }
912    if( reqListFind( &msgs->clientWillAskFor, &req ) != -1 ) {
913        dbgmsg( msgs, "declining because it's a duplicate" );
914        return TR_ADDREQ_DUPLICATE;
915    }
916
917    /**
918    ***  Accept this request
919    **/
920
921    dbgmsg( msgs, "adding req for %"PRIu32":%"PRIu32"->%"PRIu32" to our `will request' list",
922            index, offset, length );
923    req.time_requested = time( NULL );
924    reqListAppend( &msgs->clientWillAskFor, &req );
925    return TR_ADDREQ_OK;
926}
927
928static void
929cancelAllRequestsToPeer( tr_peermsgs * msgs, tr_bool sendCancel )
930{
931    int                 i;
932    struct request_list a = msgs->clientWillAskFor;
933    struct request_list b = msgs->clientAskedFor;
934    dbgmsg( msgs, "cancelling all requests to peer" );
935
936    msgs->clientAskedFor = REQUEST_LIST_INIT;
937    msgs->clientWillAskFor = REQUEST_LIST_INIT;
938
939    for( i=0; i<a.count; ++i )
940        fireCancelledReq( msgs, &a.requests[i] );
941
942    for( i = 0; i < b.count; ++i ) {
943        fireCancelledReq( msgs, &b.requests[i] );
944        if( sendCancel )
945            protocolSendCancel( msgs, &b.requests[i] );
946    }
947
948    reqListClear( &a );
949    reqListClear( &b );
950}
951
952void
953tr_peerMsgsCancel( tr_peermsgs * msgs,
954                   uint32_t      pieceIndex,
955                   uint32_t      offset,
956                   uint32_t      length )
957{
958    struct peer_request req;
959
960    assert( msgs != NULL );
961    assert( length > 0 );
962
963
964    /* have we asked the peer for this piece? */
965    req.index = pieceIndex;
966    req.offset = offset;
967    req.length = length;
968
969    /* if it's only in the queue and hasn't been sent yet, free it */
970    if( reqListRemove( &msgs->clientWillAskFor, &req ) ) {
971        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32"\n", pieceIndex, offset, length );
972        fireCancelledReq( msgs, &req );
973    }
974
975    /* if it's already been sent, send a cancel message too */
976    if( reqListRemove( &msgs->clientAskedFor, &req ) ) {
977        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32"\n", pieceIndex, offset, length );
978        protocolSendCancel( msgs, &req );
979        fireCancelledReq( msgs, &req );
980    }
981}
982
983/**
984***
985**/
986
987static void
988sendLtepHandshake( tr_peermsgs * msgs )
989{
990    tr_benc           val, *m;
991    char *            buf;
992    int               len;
993    int               pex;
994    struct evbuffer * out = msgs->outMessages;
995
996    if( msgs->clientSentLtepHandshake )
997        return;
998
999    dbgmsg( msgs, "sending an ltep handshake" );
1000    msgs->clientSentLtepHandshake = 1;
1001
1002    /* decide if we want to advertise pex support */
1003    if( !tr_torrentAllowsPex( msgs->torrent ) )
1004        pex = 0;
1005    else if( msgs->peerSentLtepHandshake )
1006        pex = msgs->peerSupportsPex ? 1 : 0;
1007    else
1008        pex = 1;
1009
1010    tr_bencInitDict( &val, 4 );
1011    tr_bencDictAddInt( &val, "e",
1012                       msgs->session->encryptionMode != TR_CLEAR_PREFERRED );
1013    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( msgs->session ) );
1014    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
1015    m  = tr_bencDictAddDict( &val, "m", 1 );
1016    if( pex )
1017        tr_bencDictAddInt( m, "ut_pex", TR_LTEP_PEX );
1018    buf = tr_bencSave( &val, &len );
1019
1020    tr_peerIoWriteUint32( msgs->io, out, 2 * sizeof( uint8_t ) + len );
1021    tr_peerIoWriteUint8 ( msgs->io, out, BT_LTEP );
1022    tr_peerIoWriteUint8 ( msgs->io, out, LTEP_HANDSHAKE );
1023    tr_peerIoWriteBytes ( msgs->io, out, buf, len );
1024    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1025    dbgmsg( msgs, "outMessage size is now %d", (int)EVBUFFER_LENGTH( out ) );
1026
1027    /* cleanup */
1028    tr_bencFree( &val );
1029    tr_free( buf );
1030}
1031
1032static void
1033parseLtepHandshake( tr_peermsgs *     msgs,
1034                    int               len,
1035                    struct evbuffer * inbuf )
1036{
1037    int64_t   i;
1038    tr_benc   val, * sub;
1039    uint8_t * tmp = tr_new( uint8_t, len );
1040
1041    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
1042    msgs->peerSentLtepHandshake = 1;
1043
1044    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type != TYPE_DICT )
1045    {
1046        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
1047        tr_free( tmp );
1048        return;
1049    }
1050
1051    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
1052
1053    /* does the peer prefer encrypted connections? */
1054    if( tr_bencDictFindInt( &val, "e", &i ) )
1055        msgs->info->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
1056                                            : ENCRYPTION_PREFERENCE_NO;
1057
1058    /* check supported messages for utorrent pex */
1059    msgs->peerSupportsPex = 0;
1060    if( tr_bencDictFindDict( &val, "m", &sub ) )
1061    {
1062        if( tr_bencDictFindInt( sub, "ut_pex", &i ) )
1063        {
1064            msgs->ut_pex_id = (uint8_t) i;
1065            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
1066            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
1067        }
1068    }
1069
1070    /* get peer's listening port */
1071    if( tr_bencDictFindInt( &val, "p", &i ) )
1072    {
1073        msgs->info->port = htons( (uint16_t)i );
1074        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
1075    }
1076
1077    /* get peer's maximum request queue size */
1078    if( tr_bencDictFindInt( &val, "reqq", &i ) )
1079        msgs->reqq = i;
1080
1081    tr_bencFree( &val );
1082    tr_free( tmp );
1083}
1084
1085static void
1086parseUtPex( tr_peermsgs *     msgs,
1087            int               msglen,
1088            struct evbuffer * inbuf )
1089{
1090    int                loaded = 0;
1091    uint8_t *          tmp = tr_new( uint8_t, msglen );
1092    tr_benc            val;
1093    const tr_torrent * tor = msgs->torrent;
1094    const uint8_t *    added;
1095    size_t             added_len;
1096
1097    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
1098
1099    if( tr_torrentAllowsPex( tor )
1100      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) )
1101      && tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1102    {
1103        const uint8_t * added_f = NULL;
1104        tr_pex *        pex;
1105        size_t          i, n;
1106        size_t          added_f_len = 0;
1107        tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1108        pex =
1109            tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len,
1110                                    &n );
1111        for( i = 0; i < n; ++i )
1112            tr_peerMgrAddPex( msgs->session->peerMgr, tor->info.hash,
1113                              TR_PEER_FROM_PEX, pex + i );
1114        tr_free( pex );
1115    }
1116
1117    if( loaded )
1118        tr_bencFree( &val );
1119    tr_free( tmp );
1120}
1121
1122static void sendPex( tr_peermsgs * msgs );
1123
1124static void
1125parseLtep( tr_peermsgs *     msgs,
1126           int               msglen,
1127           struct evbuffer * inbuf )
1128{
1129    uint8_t ltep_msgid;
1130
1131    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
1132    msglen--;
1133
1134    if( ltep_msgid == LTEP_HANDSHAKE )
1135    {
1136        dbgmsg( msgs, "got ltep handshake" );
1137        parseLtepHandshake( msgs, msglen, inbuf );
1138        if( tr_peerIoSupportsLTEP( msgs->io ) )
1139        {
1140            sendLtepHandshake( msgs );
1141            sendPex( msgs );
1142        }
1143    }
1144    else if( ltep_msgid == TR_LTEP_PEX )
1145    {
1146        dbgmsg( msgs, "got ut pex" );
1147        msgs->peerSupportsPex = 1;
1148        parseUtPex( msgs, msglen, inbuf );
1149    }
1150    else
1151    {
1152        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1153        evbuffer_drain( inbuf, msglen );
1154    }
1155}
1156
1157static int
1158readBtLength( tr_peermsgs *     msgs,
1159              struct evbuffer * inbuf,
1160              size_t            inlen )
1161{
1162    uint32_t len;
1163
1164    if( inlen < sizeof( len ) )
1165        return READ_LATER;
1166
1167    tr_peerIoReadUint32( msgs->io, inbuf, &len );
1168
1169    if( len == 0 ) /* peer sent us a keepalive message */
1170        dbgmsg( msgs, "got KeepAlive" );
1171    else
1172    {
1173        msgs->incoming.length = len;
1174        msgs->state = AWAITING_BT_ID;
1175    }
1176
1177    return READ_NOW;
1178}
1179
1180static int readBtMessage( tr_peermsgs *     msgs,
1181                          struct evbuffer * inbuf,
1182                          size_t            inlen );
1183
1184static int
1185readBtId( tr_peermsgs *     msgs,
1186          struct evbuffer * inbuf,
1187          size_t            inlen )
1188{
1189    uint8_t id;
1190
1191    if( inlen < sizeof( uint8_t ) )
1192        return READ_LATER;
1193
1194    tr_peerIoReadUint8( msgs->io, inbuf, &id );
1195    msgs->incoming.id = id;
1196
1197    if( id == BT_PIECE )
1198    {
1199        msgs->state = AWAITING_BT_PIECE;
1200        return READ_NOW;
1201    }
1202    else if( msgs->incoming.length != 1 )
1203    {
1204        msgs->state = AWAITING_BT_MESSAGE;
1205        return READ_NOW;
1206    }
1207    else return readBtMessage( msgs, inbuf, inlen - 1 );
1208}
1209
1210static void
1211updatePeerProgress( tr_peermsgs * msgs )
1212{
1213    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have )
1214                           / (float)msgs->torrent->info.pieceCount;
1215    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
1216    updateInterest( msgs );
1217    firePeerProgress( msgs );
1218}
1219
1220static int
1221clientCanSendFastBlock( const tr_peermsgs * msgs UNUSED )
1222{
1223    /* don't send a fast piece if peer has MAX_FAST_ALLOWED_THRESHOLD pieces */
1224    if( tr_bitfieldCountTrueBits( msgs->info->have ) >
1225        MAX_FAST_ALLOWED_THRESHOLD )
1226        return FALSE;
1227
1228    /* ...or if we don't have ourself enough pieces */
1229    if( tr_bitfieldCountTrueBits( tr_cpPieceBitfield( msgs->torrent->
1230                                                      completion ) ) <
1231        MAX_FAST_ALLOWED_THRESHOLD )
1232        return FALSE;
1233
1234    /* Maybe a bandwidth limit ? */
1235    return TRUE;
1236}
1237
1238static void
1239peerMadeRequest( tr_peermsgs *               msgs,
1240                 const struct peer_request * req )
1241{
1242    const int reqIsValid = requestIsValid( msgs, req );
1243    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete(
1244        msgs->torrent->completion, req->index );
1245    const int peerIsChoked = msgs->info->peerIsChoked;
1246    const int peerIsFast = tr_peerIoSupportsFEXT( msgs->io );
1247    const int pieceIsFast = reqIsValid && tr_bitfieldHas(
1248        getPeerAllowedPieces( msgs ), req->index );
1249    const int canSendFast = clientCanSendFastBlock( msgs );
1250
1251    if( !reqIsValid ) /* bad request */
1252    {
1253        dbgmsg( msgs, "rejecting an invalid request." );
1254        sendFastReject( msgs, req->index, req->offset, req->length );
1255    }
1256    else if( !clientHasPiece ) /* we don't have it */
1257    {
1258        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1259        sendFastReject( msgs, req->index, req->offset, req->length );
1260    }
1261    else if( peerIsChoked && !peerIsFast ) /* doesn't he know he's choked? */
1262    {
1263        tr_peerMsgsSetChoke( msgs, 1 );
1264        sendFastReject( msgs, req->index, req->offset, req->length );
1265    }
1266    else if( peerIsChoked && peerIsFast && ( !pieceIsFast || !canSendFast ) )
1267    {
1268        sendFastReject( msgs, req->index, req->offset, req->length );
1269    }
1270    else /* YAY */
1271    {
1272        if( peerIsFast && pieceIsFast )
1273            reqListAppend( &msgs->peerAskedForFast, req );
1274        else
1275            reqListAppend( &msgs->peerAskedFor, req );
1276    }
1277}
1278
1279static int
1280messageLengthIsCorrect( const tr_peermsgs * msg,
1281                        uint8_t             id,
1282                        uint32_t            len )
1283{
1284    switch( id )
1285    {
1286        case BT_CHOKE:
1287        case BT_UNCHOKE:
1288        case BT_INTERESTED:
1289        case BT_NOT_INTERESTED:
1290        case BT_HAVE_ALL:
1291        case BT_HAVE_NONE:
1292            return len == 1;
1293
1294        case BT_HAVE:
1295        case BT_SUGGEST:
1296        case BT_ALLOWED_FAST:
1297            return len == 5;
1298
1299        case BT_BITFIELD:
1300            return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1301
1302        case BT_REQUEST:
1303        case BT_CANCEL:
1304        case BT_REJECT:
1305            return len == 13;
1306
1307        case BT_PIECE:
1308            return len > 9 && len <= 16393;
1309
1310        case BT_PORT:
1311            return len == 3;
1312
1313        case BT_LTEP:
1314            return len >= 2;
1315
1316        default:
1317            return FALSE;
1318    }
1319}
1320
1321static int clientGotBlock( tr_peermsgs *               msgs,
1322                           const uint8_t *             block,
1323                           const struct peer_request * req );
1324
1325static int
1326readBtPiece( tr_peermsgs      * msgs,
1327             struct evbuffer  * inbuf,
1328             size_t             inlen,
1329             size_t           * setme_piece_bytes_read )
1330{
1331    struct peer_request * req = &msgs->incoming.blockReq;
1332
1333    assert( EVBUFFER_LENGTH( inbuf ) >= inlen );
1334    dbgmsg( msgs, "In readBtPiece" );
1335
1336    if( !req->length )
1337    {
1338        if( inlen < 8 )
1339            return READ_LATER;
1340
1341        tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
1342        tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
1343        req->length = msgs->incoming.length - 9;
1344        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index,
1345                req->offset,
1346                req->length );
1347        return READ_NOW;
1348    }
1349    else
1350    {
1351        int          err;
1352
1353        /* read in another chunk of data */
1354        const size_t nLeft = req->length - EVBUFFER_LENGTH(
1355            msgs->incoming.block );
1356        size_t       n = MIN( nLeft, inlen );
1357        uint8_t *    buf = tr_new( uint8_t, n );
1358        assert( EVBUFFER_LENGTH( inbuf ) >= n );
1359        tr_peerIoReadBytes( msgs->io, inbuf, buf, n );
1360        evbuffer_add( msgs->incoming.block, buf, n );
1361        fireClientGotData( msgs, n, TRUE );
1362        *setme_piece_bytes_read += n;
1363        tr_free( buf );
1364        dbgmsg( msgs, "got %d bytes for block %u:%u->%u ... %d remain",
1365               (int)n, req->index, req->offset, req->length,
1366               (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) );
1367        if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length )
1368            return READ_LATER;
1369
1370        /* we've got the whole block ... process it */
1371        err = clientGotBlock( msgs, EVBUFFER_DATA(
1372                                  msgs->incoming.block ), req );
1373
1374        /* cleanup */
1375        evbuffer_drain( msgs->incoming.block,
1376                       EVBUFFER_LENGTH( msgs->incoming.block ) );
1377        req->length = 0;
1378        msgs->state = AWAITING_BT_LENGTH;
1379        if( !err )
1380            return READ_NOW;
1381        else
1382        {
1383            fireError( msgs, err );
1384            return READ_ERR;
1385        }
1386    }
1387}
1388
1389static int
1390readBtMessage( tr_peermsgs *     msgs,
1391               struct evbuffer * inbuf,
1392               size_t            inlen )
1393{
1394    uint32_t      ui32;
1395    uint32_t      msglen = msgs->incoming.length;
1396    const uint8_t id = msgs->incoming.id;
1397    const size_t  startBufLen = EVBUFFER_LENGTH( inbuf );
1398
1399    --msglen; /* id length */
1400
1401    if( inlen < msglen )
1402        return READ_LATER;
1403
1404    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %d", (int)id,
1405            (int)msglen,
1406            (int)inlen );
1407
1408    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1409    {
1410        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d",
1411                (int)id, (int)msglen );
1412        fireError( msgs, EMSGSIZE );
1413        return READ_ERR;
1414    }
1415
1416    switch( id )
1417    {
1418        case BT_CHOKE:
1419            dbgmsg( msgs, "got Choke" );
1420            msgs->info->clientIsChoked = 1;
1421            cancelAllRequestsToPeer( msgs, FALSE );
1422            break;
1423
1424        case BT_UNCHOKE:
1425            dbgmsg( msgs, "got Unchoke" );
1426            msgs->info->clientIsChoked = 0;
1427            fireNeedReq( msgs );
1428            break;
1429
1430        case BT_INTERESTED:
1431            dbgmsg( msgs, "got Interested" );
1432            msgs->info->peerIsInterested = 1;
1433            break;
1434
1435        case BT_NOT_INTERESTED:
1436            dbgmsg( msgs, "got Not Interested" );
1437            msgs->info->peerIsInterested = 0;
1438            break;
1439
1440        case BT_HAVE:
1441            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1442            dbgmsg( msgs, "got Have: %u", ui32 );
1443            if( tr_bitfieldAdd( msgs->info->have, ui32 ) )
1444            {
1445                fireError( msgs, ERANGE );
1446                return READ_ERR;
1447            }
1448            updatePeerProgress( msgs );
1449            tr_rcTransferred( msgs->torrent->swarmSpeed,
1450                              msgs->torrent->info.pieceSize );
1451            break;
1452
1453        case BT_BITFIELD:
1454        {
1455            dbgmsg( msgs, "got a bitfield" );
1456            msgs->peerSentBitfield = 1;
1457            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits,
1458                                msglen );
1459            updatePeerProgress( msgs );
1460            maybeSendFastAllowedSet( msgs );
1461            fireNeedReq( msgs );
1462            break;
1463        }
1464
1465        case BT_REQUEST:
1466        {
1467            struct peer_request r;
1468            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1469            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1470            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1471            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset,
1472                    r.length );
1473            peerMadeRequest( msgs, &r );
1474            break;
1475        }
1476
1477        case BT_CANCEL:
1478        {
1479            struct peer_request r;
1480            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1481            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1482            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1483            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset,
1484                    r.length );
1485            reqListRemove( &msgs->peerAskedForFast, &r );
1486            reqListRemove( &msgs->peerAskedFor, &r );
1487            break;
1488        }
1489
1490        case BT_PIECE:
1491            assert( 0 ); /* handled elsewhere! */
1492            break;
1493
1494        case BT_PORT:
1495            dbgmsg( msgs, "Got a BT_PORT" );
1496            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1497            break;
1498
1499        case BT_SUGGEST:
1500        {
1501            dbgmsg( msgs, "Got a BT_SUGGEST" );
1502            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1503            /* we don't do anything with this yet */
1504            break;
1505        }
1506
1507        case BT_HAVE_ALL:
1508            dbgmsg( msgs, "Got a BT_HAVE_ALL" );
1509            tr_bitfieldAddRange( msgs->info->have, 0,
1510                                 msgs->torrent->info.pieceCount );
1511            updatePeerProgress( msgs );
1512            maybeSendFastAllowedSet( msgs );
1513            break;
1514
1515
1516        case BT_HAVE_NONE:
1517            dbgmsg( msgs, "Got a BT_HAVE_NONE" );
1518            tr_bitfieldClear( msgs->info->have );
1519            updatePeerProgress( msgs );
1520            maybeSendFastAllowedSet( msgs );
1521            break;
1522
1523        case BT_REJECT:
1524        {
1525            struct peer_request r;
1526            dbgmsg( msgs, "Got a BT_REJECT" );
1527            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1528            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1529            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1530            reqListRemove( &msgs->clientAskedFor, &r );
1531            break;
1532        }
1533
1534        case BT_ALLOWED_FAST:
1535        {
1536            dbgmsg( msgs, "Got a BT_ALLOWED_FAST" );
1537            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1538            /* we don't do anything with this yet */
1539            break;
1540        }
1541
1542        case BT_LTEP:
1543            dbgmsg( msgs, "Got a BT_LTEP" );
1544            parseLtep( msgs, msglen, inbuf );
1545            break;
1546
1547        default:
1548            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1549            tr_peerIoDrain( msgs->io, inbuf, msglen );
1550            break;
1551    }
1552
1553    assert( msglen + 1 == msgs->incoming.length );
1554    assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen );
1555
1556    msgs->state = AWAITING_BT_LENGTH;
1557    return READ_NOW;
1558}
1559
1560static void
1561decrementDownloadedCount( tr_peermsgs * msgs,
1562                          uint32_t      byteCount )
1563{
1564    tr_torrent * tor = msgs->torrent;
1565
1566    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1567}
1568
1569static void
1570clientGotUnwantedBlock( tr_peermsgs *               msgs,
1571                        const struct peer_request * req )
1572{
1573    decrementDownloadedCount( msgs, req->length );
1574}
1575
1576static void
1577addPeerToBlamefield( tr_peermsgs * msgs,
1578                     uint32_t      index )
1579{
1580    if( !msgs->info->blame )
1581        msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1582    tr_bitfieldAdd( msgs->info->blame, index );
1583}
1584
1585/* returns 0 on success, or an errno on failure */
1586static int
1587clientGotBlock( tr_peermsgs *               msgs,
1588                const uint8_t *             data,
1589                const struct peer_request * req )
1590{
1591    int                    err;
1592    tr_torrent *           tor = msgs->torrent;
1593    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1594
1595    assert( msgs );
1596    assert( req );
1597
1598    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) )
1599    {
1600        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1601                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1602        return EMSGSIZE;
1603    }
1604
1605    /* save the block */
1606    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset,
1607            req->length );
1608
1609    /**
1610    *** Remove the block from our `we asked for this' list
1611    **/
1612
1613    if( !reqListRemove( &msgs->clientAskedFor, req ) )
1614    {
1615        clientGotUnwantedBlock( msgs, req );
1616        dbgmsg( msgs, "we didn't ask for this message..." );
1617        return 0;
1618    }
1619
1620    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1621            msgs->clientAskedFor.count );
1622
1623    /**
1624    *** Error checks
1625    **/
1626
1627    if( tr_cpBlockIsComplete( tor->completion, block ) )
1628    {
1629        dbgmsg( msgs, "we have this block already..." );
1630        clientGotUnwantedBlock( msgs, req );
1631        return 0;
1632    }
1633
1634    /**
1635    ***  Save the block
1636    **/
1637
1638    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1639        return err;
1640
1641    addPeerToBlamefield( msgs, req->index );
1642    fireGotBlock( msgs, req );
1643    return 0;
1644}
1645
1646static int peerPulse( void * vmsgs );
1647
1648static void
1649didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1650{
1651    tr_peermsgs * msgs = vmsgs;
1652    firePeerGotData( msgs, bytesWritten, wasPieceData );
1653    peerPulse( msgs );
1654}
1655
1656static ReadState
1657canRead( struct tr_iobuf * iobuf, void * vmsgs, size_t * piece )
1658{
1659    ReadState         ret;
1660    tr_peermsgs *     msgs = vmsgs;
1661    struct evbuffer * in = tr_iobuf_input( iobuf );
1662    const size_t      inlen = EVBUFFER_LENGTH( in );
1663
1664    if( !inlen )
1665    {
1666        ret = READ_LATER;
1667    }
1668    else if( msgs->state == AWAITING_BT_PIECE )
1669    {
1670        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1671    }
1672    else switch( msgs->state )
1673    {
1674        case AWAITING_BT_LENGTH:
1675            ret = readBtLength ( msgs, in, inlen ); break;
1676
1677        case AWAITING_BT_ID:
1678            ret = readBtId     ( msgs, in, inlen ); break;
1679
1680        case AWAITING_BT_MESSAGE:
1681            ret = readBtMessage( msgs, in, inlen ); break;
1682
1683        default:
1684            assert( 0 );
1685    }
1686
1687    /* log the raw data that was read */
1688    if( ( ret != READ_ERR ) && ( EVBUFFER_LENGTH( in ) != inlen ) )
1689        fireClientGotData( msgs, inlen - EVBUFFER_LENGTH( in ), FALSE );
1690
1691    return ret;
1692}
1693
1694/**
1695***
1696**/
1697
1698static int
1699ratePulse( void * vmsgs )
1700{
1701    tr_peermsgs * msgs = vmsgs;
1702    const double rateToClient = tr_peerGetPieceSpeed( msgs->info, TR_PEER_TO_CLIENT );
1703    const int estimatedBlocksInNext30Seconds =
1704                  ( rateToClient * 30 * 1024 ) / msgs->torrent->blockSize;
1705    msgs->minActiveRequests = 8;
1706    msgs->maxActiveRequests = msgs->minActiveRequests + estimatedBlocksInNext30Seconds;
1707    if( msgs->reqq > 0 )
1708        msgs->maxActiveRequests = MIN( msgs->maxActiveRequests, msgs->reqq );
1709    return TRUE;
1710}
1711
1712static int
1713popNextRequest( tr_peermsgs *         msgs,
1714                struct peer_request * setme )
1715{
1716    return reqListPop( &msgs->peerAskedForFast, setme )
1717        || reqListPop( &msgs->peerAskedFor, setme );
1718}
1719
1720static size_t
1721fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1722{
1723    size_t bytesWritten = 0;
1724    struct peer_request req;
1725    const int haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1726
1727    /**
1728    ***  Protocol messages
1729    **/
1730
1731    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1732    {
1733        dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
1734        msgs->outMessagesBatchedAt = now;
1735    }
1736    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1737    {
1738        const size_t len = EVBUFFER_LENGTH( msgs->outMessages );
1739        /* flush the protocol messages */
1740        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->io, len );
1741        tr_peerIoWriteBuf( msgs->io, msgs->outMessages, FALSE );
1742        msgs->clientSentAnythingAt = now;
1743        msgs->outMessagesBatchedAt = 0;
1744        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1745        bytesWritten +=  len;
1746    }
1747
1748    /**
1749    ***  Blocks
1750    **/
1751
1752    if( ( tr_peerIoGetWriteBufferSpace( msgs->io ) >= msgs->torrent->blockSize )
1753        && popNextRequest( msgs, &req )
1754        && requestIsValid( msgs, &req )
1755        && tr_cpPieceIsComplete( msgs->torrent->completion, req.index ) )
1756    {
1757        /* send a block */
1758        uint8_t * buf = tr_new( uint8_t, req.length );
1759        const int err = tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf );
1760        if( err ) {
1761            fireError( msgs, err );
1762            bytesWritten = 0;
1763            msgs = NULL;
1764        } else {
1765            tr_peerIo * io = msgs->io;
1766            struct evbuffer * out = evbuffer_new( );
1767            dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1768            tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1769            tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1770            tr_peerIoWriteUint32( io, out, req.index );
1771            tr_peerIoWriteUint32( io, out, req.offset );
1772            tr_peerIoWriteBytes ( io, out, buf, req.length );
1773            tr_peerIoWriteBuf( io, out, TRUE );
1774            bytesWritten += EVBUFFER_LENGTH( out );
1775            evbuffer_free( out );
1776            msgs->clientSentAnythingAt = now;
1777        }
1778        tr_free( buf );
1779    }
1780
1781    /**
1782    ***  Keepalive
1783    **/
1784
1785    if( ( msgs != NULL )
1786        && ( msgs->clientSentAnythingAt != 0 )
1787        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1788    {
1789        dbgmsg( msgs, "sending a keepalive message" );
1790        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1791        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1792    }
1793
1794    return bytesWritten;
1795}
1796
1797static int
1798peerPulse( void * vmsgs )
1799{
1800    tr_peermsgs * msgs = vmsgs;
1801    const time_t  now = time( NULL );
1802
1803    ratePulse( msgs );
1804
1805    pumpRequestQueue( msgs, now );
1806    expireOldRequests( msgs, now );
1807
1808    for( ;; )
1809        if( fillOutputBuffer( msgs, now ) < 1 )
1810            break;
1811
1812    return TRUE; /* loop forever */
1813}
1814
1815void
1816tr_peerMsgsPulse( tr_peermsgs * msgs )
1817{
1818    if( msgs != NULL )
1819        peerPulse( msgs );
1820}
1821
1822static void
1823gotError( struct tr_iobuf  * iobuf UNUSED,
1824          short              what,
1825          void             * vmsgs )
1826{
1827    if( what & EVBUFFER_TIMEOUT )
1828        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1829    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1830        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1831               what, errno, tr_strerror( errno ) );
1832    fireError( vmsgs, ENOTCONN );
1833}
1834
1835static void
1836sendBitfield( tr_peermsgs * msgs )
1837{
1838    struct evbuffer * out = msgs->outMessages;
1839    tr_bitfield *     field;
1840    tr_piece_index_t  lazyPieces[LAZY_PIECE_COUNT];
1841    size_t            i;
1842    size_t            lazyCount = 0;
1843
1844    field = tr_bitfieldDup( tr_cpPieceBitfield( msgs->torrent->completion ) );
1845
1846    if( tr_sessionIsLazyBitfieldEnabled( msgs->session ) )
1847    {
1848        /** Lazy bitfields aren't a high priority or secure, so I'm opting for
1849            speed over a truly random sample -- let's limit the pool size to
1850            the first 1000 pieces so large torrents don't bog things down */
1851        size_t poolSize;
1852        const size_t maxPoolSize = MIN( msgs->torrent->info.pieceCount, 1000 );
1853        tr_piece_index_t * pool = tr_new( tr_piece_index_t, maxPoolSize );
1854
1855        /* build the pool */
1856        for( i=poolSize=0; i<maxPoolSize; ++i )
1857            if( tr_bitfieldHas( field, i ) )
1858                pool[poolSize++] = i;
1859
1860        /* pull random piece indices from the pool */
1861        while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) )
1862        {
1863            const int pos = tr_cryptoWeakRandInt( poolSize );
1864            const tr_piece_index_t piece = pool[pos];
1865            tr_bitfieldRem( field, piece );
1866            lazyPieces[lazyCount++] = piece;
1867            pool[pos] = pool[--poolSize];
1868        }
1869
1870        /* cleanup */
1871        tr_free( pool );
1872    }
1873
1874    tr_peerIoWriteUint32( msgs->io, out,
1875                          sizeof( uint8_t ) + field->byteCount );
1876    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1877    tr_peerIoWriteBytes ( msgs->io, out, field->bits, field->byteCount );
1878    dbgmsg( msgs, "sending bitfield... outMessage size is now %d",
1879           (int)EVBUFFER_LENGTH( out ) );
1880    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1881
1882    for( i = 0; i < lazyCount; ++i )
1883        protocolSendHave( msgs, lazyPieces[i] );
1884
1885    tr_bitfieldFree( field );
1886}
1887
1888/**
1889***
1890**/
1891
1892/* some peers give us error messages if we send
1893   more than this many peers in a single pex message
1894   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
1895#define MAX_PEX_ADDED 50
1896#define MAX_PEX_DROPPED 50
1897
1898typedef struct
1899{
1900    tr_pex *  added;
1901    tr_pex *  dropped;
1902    tr_pex *  elements;
1903    int       addedCount;
1904    int       droppedCount;
1905    int       elementCount;
1906}
1907PexDiffs;
1908
1909static void
1910pexAddedCb( void * vpex,
1911            void * userData )
1912{
1913    PexDiffs * diffs = userData;
1914    tr_pex *   pex = vpex;
1915
1916    if( diffs->addedCount < MAX_PEX_ADDED )
1917    {
1918        diffs->added[diffs->addedCount++] = *pex;
1919        diffs->elements[diffs->elementCount++] = *pex;
1920    }
1921}
1922
1923static void
1924pexDroppedCb( void * vpex,
1925              void * userData )
1926{
1927    PexDiffs * diffs = userData;
1928    tr_pex *   pex = vpex;
1929
1930    if( diffs->droppedCount < MAX_PEX_DROPPED )
1931    {
1932        diffs->dropped[diffs->droppedCount++] = *pex;
1933    }
1934}
1935
1936static void
1937pexElementCb( void * vpex,
1938              void * userData )
1939{
1940    PexDiffs * diffs = userData;
1941    tr_pex *   pex = vpex;
1942
1943    diffs->elements[diffs->elementCount++] = *pex;
1944}
1945
1946static void
1947sendPex( tr_peermsgs * msgs )
1948{
1949    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
1950    {
1951        PexDiffs diffs;
1952        tr_pex * newPex = NULL;
1953        const int newCount = tr_peerMgrGetPeers( msgs->session->peerMgr,
1954                                                 msgs->torrent->info.hash,
1955                                                 &newPex );
1956
1957        /* build the diffs */
1958        diffs.added = tr_new( tr_pex, newCount );
1959        diffs.addedCount = 0;
1960        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1961        diffs.droppedCount = 0;
1962        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1963        diffs.elementCount = 0;
1964        tr_set_compare( msgs->pex, msgs->pexCount,
1965                        newPex, newCount,
1966                        tr_pexCompare, sizeof( tr_pex ),
1967                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
1968        dbgmsg(
1969            msgs,
1970            "pex: old peer count %d, new peer count %d, added %d, removed %d",
1971            msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1972
1973        if( !diffs.addedCount && !diffs.droppedCount )
1974        {
1975            tr_free( diffs.elements );
1976        }
1977        else
1978        {
1979            int  i;
1980            tr_benc val;
1981            char * benc;
1982            int bencLen;
1983            uint8_t * tmp, *walk;
1984            struct evbuffer * out = msgs->outMessages;
1985
1986            /* update peer */
1987            tr_free( msgs->pex );
1988            msgs->pex = diffs.elements;
1989            msgs->pexCount = diffs.elementCount;
1990
1991            /* build the pex payload */
1992            tr_bencInitDict( &val, 3 );
1993
1994            /* "added" */
1995            tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1996            for( i = 0; i < diffs.addedCount; ++i ) {
1997                memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
1998                memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1999            }
2000            assert( ( walk - tmp ) == diffs.addedCount * 6 );
2001            tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
2002            tr_free( tmp );
2003
2004            /* "added.f" */
2005            tmp = walk = tr_new( uint8_t, diffs.addedCount );
2006            for( i = 0; i < diffs.addedCount; ++i )
2007                *walk++ = diffs.added[i].flags;
2008            assert( ( walk - tmp ) == diffs.addedCount );
2009            tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2010            tr_free( tmp );
2011
2012            /* "dropped" */
2013            tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2014            for( i = 0; i < diffs.droppedCount; ++i ) {
2015                memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
2016                memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2017            }
2018            assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2019            tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2020            tr_free( tmp );
2021
2022            /* write the pex message */
2023            benc = tr_bencSave( &val, &bencLen );
2024            tr_peerIoWriteUint32( msgs->io, out, 2 * sizeof( uint8_t ) + bencLen );
2025            tr_peerIoWriteUint8 ( msgs->io, out, BT_LTEP );
2026            tr_peerIoWriteUint8 ( msgs->io, out, msgs->ut_pex_id );
2027            tr_peerIoWriteBytes ( msgs->io, out, benc, bencLen );
2028            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2029            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", EVBUFFER_LENGTH( out ) );
2030
2031            tr_free( benc );
2032            tr_bencFree( &val );
2033        }
2034
2035        /* cleanup */
2036        tr_free( diffs.added );
2037        tr_free( diffs.dropped );
2038        tr_free( newPex );
2039
2040        msgs->clientSentPexAt = time( NULL );
2041    }
2042}
2043
2044static int
2045pexPulse( void * vpeer )
2046{
2047    sendPex( vpeer );
2048    return TRUE;
2049}
2050
2051/**
2052***
2053**/
2054
2055tr_peermsgs*
2056tr_peerMsgsNew( struct tr_torrent * torrent,
2057                struct tr_peer *    info,
2058                tr_delivery_func    func,
2059                void *              userData,
2060                tr_publisher_tag *  setme )
2061{
2062    tr_peermsgs * m;
2063
2064    assert( info );
2065    assert( info->io );
2066
2067    m = tr_new0( tr_peermsgs, 1 );
2068    m->publisher = tr_publisherNew( );
2069    m->info = info;
2070    m->session = torrent->session;
2071    m->torrent = torrent;
2072    m->io = info->io;
2073    m->info->clientIsChoked = 1;
2074    m->info->peerIsChoked = 1;
2075    m->info->clientIsInterested = 0;
2076    m->info->peerIsInterested = 0;
2077    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
2078    m->state = AWAITING_BT_LENGTH;
2079    m->pexTimer = tr_timerNew( m->session, pexPulse, m, PEX_INTERVAL );
2080    m->outMessages = evbuffer_new( );
2081    m->outMessagesBatchedAt = 0;
2082    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2083    m->incoming.block = evbuffer_new( );
2084    m->peerAllowedPieces = NULL;
2085    m->peerAskedFor = REQUEST_LIST_INIT;
2086    m->peerAskedForFast = REQUEST_LIST_INIT;
2087    m->clientAskedFor = REQUEST_LIST_INIT;
2088    m->clientWillAskFor = REQUEST_LIST_INIT;
2089    *setme = tr_publisherSubscribe( m->publisher, func, userData );
2090
2091    if( tr_peerIoSupportsLTEP( m->io ) )
2092        sendLtepHandshake( m );
2093
2094    sendBitfield( m );
2095
2096    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of
2097                                             inactivity */
2098    tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
2099    ratePulse( m );
2100
2101    return m;
2102}
2103
2104void
2105tr_peerMsgsFree( tr_peermsgs* msgs )
2106{
2107    if( msgs )
2108    {
2109        tr_timerFree( &msgs->pexTimer );
2110        tr_publisherFree( &msgs->publisher );
2111        reqListClear( &msgs->clientWillAskFor );
2112        reqListClear( &msgs->clientAskedFor );
2113        reqListClear( &msgs->peerAskedForFast );
2114        reqListClear( &msgs->peerAskedFor );
2115        tr_bitfieldFree( msgs->peerAllowedPieces );
2116        evbuffer_free( msgs->incoming.block );
2117        evbuffer_free( msgs->outMessages );
2118        tr_free( msgs->pex );
2119
2120        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2121        tr_free( msgs );
2122    }
2123}
2124
2125void
2126tr_peerMsgsUnsubscribe( tr_peermsgs *    peer,
2127                        tr_publisher_tag tag )
2128{
2129    tr_publisherUnsubscribe( peer->publisher, tag );
2130}
2131
Note: See TracBrowser for help on using the repository browser.