source: trunk/libtransmission/peer-msgs.c @ 7169

Last change on this file since 7169 was 7169, checked in by charles, 13 years ago

(libT) #1527: don't send pex messages if we don't have any new information to send

  • Property svn:keywords set to Date Rev Author Id
File size: 60.0 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 7169 2008-11-28 05:48:17Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19
20#include <event.h>
21
22#include "transmission.h"
23#include "bencode.h"
24#include "completion.h"
25#include "crypto.h"
26#include "inout.h"
27#include "iobuf.h"
28#ifdef WIN32
29#include "net.h" /* for ECONN */
30#endif
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-mgr-private.h"
34#include "peer-msgs.h"
35#include "ratecontrol.h"
36#include "stats.h"
37#include "torrent.h"
38#include "trevent.h"
39#include "utils.h"
40
41/**
42***
43**/
44
45enum
46{
47    BT_CHOKE                = 0,
48    BT_UNCHOKE              = 1,
49    BT_INTERESTED           = 2,
50    BT_NOT_INTERESTED       = 3,
51    BT_HAVE                 = 4,
52    BT_BITFIELD             = 5,
53    BT_REQUEST              = 6,
54    BT_PIECE                = 7,
55    BT_CANCEL               = 8,
56    BT_PORT                 = 9,
57    BT_SUGGEST              = 13,
58    BT_HAVE_ALL             = 14,
59    BT_HAVE_NONE            = 15,
60    BT_REJECT               = 16,
61    BT_ALLOWED_FAST         = 17,
62    BT_LTEP                 = 20,
63
64    LTEP_HANDSHAKE          = 0,
65
66    TR_LTEP_PEX             = 1,
67
68    MIN_CHOKE_PERIOD_SEC    = ( 10 ),
69
70    /* idle seconds before we send a keepalive */
71    KEEPALIVE_INTERVAL_SECS = 100,
72
73    PEX_INTERVAL            = ( 90 * 1000 ), /* msec between sendPex() calls */
74    PEER_PULSE_INTERVAL     = ( 250 ),       /* msec between peerPulse() calls
75                                               */
76
77    MAX_QUEUE_SIZE          = ( 100 ),
78
79    /* (fast peers) max number of pieces we fast-allow to another peer */
80    MAX_FAST_ALLOWED_COUNT   = 10,
81
82    /* (fast peers) max threshold for allowing fast-pieces requests */
83    MAX_FAST_ALLOWED_THRESHOLD = 10,
84
85    /* how long an unsent request can stay queued before it's returned
86       back to the peer-mgr's pool of requests */
87    QUEUED_REQUEST_TTL_SECS = 20,
88
89    /* how long a sent request can stay queued before it's returned
90       back to the peer-mgr's pool of requests */
91    SENT_REQUEST_TTL_SECS = 240,
92
93    /* used in lowering the outMessages queue period */
94    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
95    HIGH_PRIORITY_INTERVAL_SECS = 2,
96    LOW_PRIORITY_INTERVAL_SECS = 20,
97
98    /* number of pieces to remove from the bitfield when
99     * lazy bitfields are turned on */
100    LAZY_PIECE_COUNT = 26
101};
102
103/**
104***  REQUEST MANAGEMENT
105**/
106
107enum
108{
109    AWAITING_BT_LENGTH,
110    AWAITING_BT_ID,
111    AWAITING_BT_MESSAGE,
112    AWAITING_BT_PIECE
113};
114
115struct peer_request
116{
117    uint32_t    index;
118    uint32_t    offset;
119    uint32_t    length;
120    time_t      time_requested;
121};
122
123static int
124compareRequest( const void * va,
125                const void * vb )
126{
127    const struct peer_request * a = va;
128    const struct peer_request * b = vb;
129
130    if( a->index != b->index )
131        return a->index < b->index ? -1 : 1;
132
133    if( a->offset != b->offset )
134        return a->offset < b->offset ? -1 : 1;
135
136    if( a->length != b->length )
137        return a->length < b->length ? -1 : 1;
138
139    return 0;
140}
141
142struct request_list
143{
144    uint16_t               count;
145    uint16_t               max;
146    struct peer_request *  requests;
147};
148
149static const struct request_list REQUEST_LIST_INIT = { 0, 0, NULL };
150
151static void
152reqListReserve( struct request_list * list,
153                uint16_t              max )
154{
155    if( list->max < max )
156    {
157        list->max = max;
158        list->requests = tr_renew( struct peer_request,
159                                   list->requests,
160                                   list->max );
161    }
162}
163
164static void
165reqListClear( struct request_list * list )
166{
167    tr_free( list->requests );
168    *list = REQUEST_LIST_INIT;
169}
170
171static void
172reqListCopy( struct request_list *       dest,
173             const struct request_list * src )
174{
175    dest->count = dest->max = src->count;
176    dest->requests =
177        tr_memdup( src->requests, dest->count * sizeof( struct peer_request ) );
178}
179
180static void
181reqListRemoveOne( struct request_list * list,
182                  int                   i )
183{
184    assert( 0 <= i && i < list->count );
185
186    memmove( &list->requests[i],
187            &list->requests[i + 1],
188            sizeof( struct peer_request ) * ( --list->count - i ) );
189}
190
191static void
192reqListAppend( struct request_list *       list,
193               const struct peer_request * req )
194{
195    if( ++list->count >= list->max )
196        reqListReserve( list, list->max + 8 );
197
198    list->requests[list->count - 1] = *req;
199}
200
201static int
202reqListPop( struct request_list * list,
203            struct peer_request * setme )
204{
205    int success;
206
207    if( !list->count )
208        success = FALSE;
209    else {
210        *setme = list->requests[0];
211        reqListRemoveOne( list, 0 );
212        success = TRUE;
213    }
214
215    return success;
216}
217
218static int
219reqListFind( struct request_list *       list,
220             const struct peer_request * key )
221{
222    uint16_t i;
223
224    for( i = 0; i < list->count; ++i )
225        if( !compareRequest( key, list->requests + i ) )
226            return i;
227
228    return -1;
229}
230
231static int
232reqListRemove( struct request_list *       list,
233               const struct peer_request * key )
234{
235    int success;
236    const int i = reqListFind( list, key );
237
238    if( i < 0 )
239        success = FALSE;
240    else {
241        reqListRemoveOne( list, i );
242        success = TRUE;
243    }
244
245    return success;
246}
247
248/**
249***
250**/
251
252/* this is raw, unchanged data from the peer regarding
253 * the current message that it's sending us. */
254struct tr_incoming
255{
256    uint8_t                id;
257    uint32_t               length; /* includes the +1 for id length */
258    struct peer_request    blockReq; /* metadata for incoming blocks */
259    struct evbuffer *      block; /* piece data for incoming blocks */
260};
261
262struct tr_peermsgs
263{
264    unsigned int    peerSentBitfield        : 1;
265    unsigned int    peerSupportsPex         : 1;
266    unsigned int    clientSentLtepHandshake : 1;
267    unsigned int    peerSentLtepHandshake   : 1;
268    unsigned int    sendingBlock            : 1;
269
270    uint8_t         state;
271    uint8_t         ut_pex_id;
272    uint16_t        pexCount;
273    uint16_t        minActiveRequests;
274    uint16_t        maxActiveRequests;
275
276    /* how long the outMessages batch should be allowed to grow before
277     * it's flushed -- some messages (like requests >:) should be sent
278     * very quickly; others aren't as urgent. */
279    int                    outMessagesBatchPeriod;
280
281    tr_peer *              info;
282
283    tr_session *           session;
284    tr_torrent *           torrent;
285    tr_peerIo *            io;
286
287    tr_publisher_t *       publisher;
288
289    struct evbuffer *      outBlock; /* buffer of the current piece message */
290    struct evbuffer *      outMessages; /* all the non-piece messages */
291
292    struct request_list    peerAskedFor;
293    struct request_list    peerAskedForFast;
294    struct request_list    clientAskedFor;
295    struct request_list    clientWillAskFor;
296
297    tr_timer *             pexTimer;
298
299    time_t                 clientSentPexAt;
300    time_t                 clientSentAnythingAt;
301
302    /* when we started batching the outMessages */
303    time_t                outMessagesBatchedAt;
304
305    tr_bitfield *         peerAllowedPieces;
306
307    struct tr_incoming    incoming;
308
309    tr_pex *              pex;
310};
311
312/**
313***
314**/
315
316static void
317myDebug( const char *               file,
318         int                        line,
319         const struct tr_peermsgs * msgs,
320         const char *               fmt,
321         ... )
322{
323    FILE * fp = tr_getLog( );
324
325    if( fp )
326    {
327        va_list           args;
328        char              timestr[64];
329        struct evbuffer * buf = evbuffer_new( );
330        char *            base = tr_basename( file );
331
332        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
333                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
334                             msgs->torrent->info.name,
335                             tr_peerIoGetAddrStr( msgs->io ),
336                             msgs->info->client );
337        va_start( args, fmt );
338        evbuffer_add_vprintf( buf, fmt, args );
339        va_end( args );
340        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
341        fwrite( EVBUFFER_DATA( buf ), 1, EVBUFFER_LENGTH( buf ), fp );
342
343        tr_free( base );
344        evbuffer_free( buf );
345    }
346}
347
348#define dbgmsg( msgs, ... ) \
349    do { \
350        if( tr_deepLoggingIsActive( ) ) \
351            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
352    } while( 0 )
353
354/**
355***
356**/
357
358static void
359pokeBatchPeriod( tr_peermsgs * msgs,
360                 int           interval )
361{
362    if( msgs->outMessagesBatchPeriod > interval )
363    {
364        msgs->outMessagesBatchPeriod = interval;
365        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
366    }
367}
368
369static void
370protocolSendRequest( tr_peermsgs *               msgs,
371                     const struct peer_request * req )
372{
373    tr_peerIo *       io = msgs->io;
374    struct evbuffer * out = msgs->outMessages;
375
376    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
377    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
378    tr_peerIoWriteUint32( io, out, req->index );
379    tr_peerIoWriteUint32( io, out, req->offset );
380    tr_peerIoWriteUint32( io, out, req->length );
381    dbgmsg( msgs, "requesting %u:%u->%u... outMessage size is now %d",
382           req->index, req->offset, req->length, (int)EVBUFFER_LENGTH( out ) );
383    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
384}
385
386static void
387protocolSendCancel( tr_peermsgs *               msgs,
388                    const struct peer_request * req )
389{
390    tr_peerIo *       io = msgs->io;
391    struct evbuffer * out = msgs->outMessages;
392
393    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
394    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
395    tr_peerIoWriteUint32( io, out, req->index );
396    tr_peerIoWriteUint32( io, out, req->offset );
397    tr_peerIoWriteUint32( io, out, req->length );
398    dbgmsg( msgs, "cancelling %u:%u->%u... outMessage size is now %d",
399           req->index, req->offset, req->length, (int)EVBUFFER_LENGTH( out ) );
400    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
401}
402
403static void
404protocolSendHave( tr_peermsgs * msgs,
405                  uint32_t      index )
406{
407    tr_peerIo *       io = msgs->io;
408    struct evbuffer * out = msgs->outMessages;
409
410    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + sizeof( uint32_t ) );
411    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
412    tr_peerIoWriteUint32( io, out, index );
413    dbgmsg( msgs, "sending Have %u.. outMessage size is now %d",
414           index, (int)EVBUFFER_LENGTH( out ) );
415    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
416}
417
418static void
419protocolSendChoke( tr_peermsgs * msgs,
420                   int           choke )
421{
422    tr_peerIo *       io = msgs->io;
423    struct evbuffer * out = msgs->outMessages;
424
425    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
426    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
427    dbgmsg( msgs, "sending %s... outMessage size is now %d",
428           ( choke ? "Choke" : "Unchoke" ),
429           (int)EVBUFFER_LENGTH( out ) );
430    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
431}
432
433/**
434***  EVENTS
435**/
436
437static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0 };
438
439static void
440publish( tr_peermsgs *   msgs,
441         tr_peer_event * e )
442{
443    tr_publisherPublish( msgs->publisher, msgs->info, e );
444}
445
446static void
447fireError( tr_peermsgs * msgs,
448           int           err )
449{
450    tr_peer_event e = blankEvent;
451
452    e.eventType = TR_PEER_ERROR;
453    e.err = err;
454    publish( msgs, &e );
455}
456
457static void
458fireNeedReq( tr_peermsgs * msgs )
459{
460    tr_peer_event e = blankEvent;
461
462    e.eventType = TR_PEER_NEED_REQ;
463    publish( msgs, &e );
464}
465
466static void
467firePeerProgress( tr_peermsgs * msgs )
468{
469    tr_peer_event e = blankEvent;
470
471    e.eventType = TR_PEER_PEER_PROGRESS;
472    e.progress = msgs->info->progress;
473    publish( msgs, &e );
474}
475
476static void
477fireGotBlock( tr_peermsgs *               msgs,
478              const struct peer_request * req )
479{
480    tr_peer_event e = blankEvent;
481
482    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
483    e.pieceIndex = req->index;
484    e.offset = req->offset;
485    e.length = req->length;
486    publish( msgs, &e );
487}
488
489static void
490fireClientGotData( tr_peermsgs * msgs,
491                   uint32_t      length,
492                   int           wasPieceData )
493{
494    tr_peer_event e = blankEvent;
495
496    e.length = length;
497    e.eventType = TR_PEER_CLIENT_GOT_DATA;
498    e.wasPieceData = wasPieceData;
499    publish( msgs, &e );
500}
501
502static void
503firePeerGotData( tr_peermsgs  * msgs,
504                 uint32_t       length,
505                 int            wasPieceData )
506{
507    tr_peer_event e = blankEvent;
508
509    e.length = length;
510    e.eventType = TR_PEER_PEER_GOT_DATA;
511    e.wasPieceData = wasPieceData;
512
513    publish( msgs, &e );
514}
515
516static void
517fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
518{
519    tr_peer_event e = blankEvent;
520    e.eventType = TR_PEER_CANCEL;
521    e.pieceIndex = req->index;
522    e.offset = req->offset;
523    e.length = req->length;
524    publish( msgs, &e );
525}
526
527/**
528***  INTEREST
529**/
530
531static int
532isPieceInteresting( const tr_peermsgs * peer,
533                    tr_piece_index_t    piece )
534{
535    const tr_torrent * torrent = peer->torrent;
536
537    return ( !torrent->info.pieces[piece].dnd )                 /* we want it */
538           && ( !tr_cpPieceIsComplete( torrent->completion, piece ) ) /* !have
539                                                                        */
540           && ( tr_bitfieldHas( peer->info->have, piece ) );   /* peer has it */
541}
542
543/* "interested" means we'll ask for piece data if they unchoke us */
544static int
545isPeerInteresting( const tr_peermsgs * msgs )
546{
547    tr_piece_index_t    i;
548    const tr_torrent *  torrent;
549    const tr_bitfield * bitfield;
550    const int           clientIsSeed = tr_torrentIsSeed( msgs->torrent );
551
552    if( clientIsSeed )
553        return FALSE;
554
555    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
556        return FALSE;
557
558    torrent = msgs->torrent;
559    bitfield = tr_cpPieceBitfield( torrent->completion );
560
561    if( !msgs->info->have )
562        return TRUE;
563
564    assert( bitfield->byteCount == msgs->info->have->byteCount );
565
566    for( i = 0; i < torrent->info.pieceCount; ++i )
567        if( isPieceInteresting( msgs, i ) )
568            return TRUE;
569
570    return FALSE;
571}
572
573static void
574sendInterest( tr_peermsgs * msgs,
575              int           weAreInterested )
576{
577    struct evbuffer * out = msgs->outMessages;
578
579    assert( msgs );
580    assert( weAreInterested == 0 || weAreInterested == 1 );
581
582    msgs->info->clientIsInterested = weAreInterested;
583    dbgmsg( msgs, "Sending %s",
584            weAreInterested ? "Interested" : "Not Interested" );
585    tr_peerIoWriteUint32( msgs->io, out, sizeof( uint8_t ) );
586    tr_peerIoWriteUint8 (
587        msgs->io, out, weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
588    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
589    dbgmsg( msgs, "outMessage size is now %d", (int)EVBUFFER_LENGTH( out ) );
590}
591
592static void
593updateInterest( tr_peermsgs * msgs )
594{
595    const int i = isPeerInteresting( msgs );
596
597    if( i != msgs->info->clientIsInterested )
598        sendInterest( msgs, i );
599    if( i )
600        fireNeedReq( msgs );
601}
602
603static void
604cancelAllRequestsToClientExceptFast( tr_peermsgs * msgs )
605{
606    reqListClear( &msgs->peerAskedFor );
607}
608
609void
610tr_peerMsgsSetChoke( tr_peermsgs * msgs,
611                     int           choke )
612{
613    const time_t now = time( NULL );
614    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
615
616    assert( msgs );
617    assert( msgs->info );
618    assert( choke == 0 || choke == 1 );
619
620    if( msgs->info->chokeChangedAt > fibrillationTime )
621    {
622        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation",
623                choke );
624    }
625    else if( msgs->info->peerIsChoked != choke )
626    {
627        msgs->info->peerIsChoked = choke;
628        if( choke )
629            cancelAllRequestsToClientExceptFast( msgs );
630        protocolSendChoke( msgs, choke );
631        msgs->info->chokeChangedAt = now;
632    }
633}
634
635/**
636***
637**/
638
639void
640tr_peerMsgsHave( tr_peermsgs * msgs,
641                 uint32_t      index )
642{
643    protocolSendHave( msgs, index );
644
645    /* since we have more pieces now, we might not be interested in this peer */
646    updateInterest( msgs );
647}
648
649#if 0
650static void
651sendFastSuggest( tr_peermsgs * msgs,
652                 uint32_t      pieceIndex )
653{
654    assert( msgs );
655
656    if( tr_peerIoSupportsFEXT( msgs->io ) )
657    {
658        tr_peerIoWriteUint32( msgs->io, msgs->outMessages,
659                             sizeof( uint8_t ) + sizeof( uint32_t ) );
660        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_SUGGEST );
661        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
662    }
663}
664
665static void
666sendFastHave( tr_peermsgs * msgs,
667              int           all )
668{
669    assert( msgs );
670
671    if( tr_peerIoSupportsFEXT( msgs->io ) )
672    {
673        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof( uint8_t ) );
674        tr_peerIoWriteUint8( msgs->io, msgs->outMessages,
675                            ( all ? BT_HAVE_ALL
676                              : BT_HAVE_NONE ) );
677        updateInterest( msgs );
678    }
679}
680
681#endif
682
683static void
684sendFastReject( tr_peermsgs * msgs,
685                uint32_t      pieceIndex,
686                uint32_t      offset,
687                uint32_t      length )
688{
689    assert( msgs );
690
691    if( tr_peerIoSupportsFEXT( msgs->io ) )
692    {
693        struct evbuffer * out = msgs->outMessages;
694        const uint32_t    len = sizeof( uint8_t ) + 3 * sizeof( uint32_t );
695        dbgmsg( msgs, "sending fast reject %u:%u->%u", pieceIndex, offset,
696                length );
697        tr_peerIoWriteUint32( msgs->io, out, len );
698        tr_peerIoWriteUint8( msgs->io, out, BT_REJECT );
699        tr_peerIoWriteUint32( msgs->io, out, pieceIndex );
700        tr_peerIoWriteUint32( msgs->io, out, offset );
701        tr_peerIoWriteUint32( msgs->io, out, length );
702        pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
703        dbgmsg( msgs, "outMessage size is now %d",
704               (int)EVBUFFER_LENGTH( out ) );
705    }
706}
707
708static tr_bitfield*
709getPeerAllowedPieces( tr_peermsgs * msgs )
710{
711    if( !msgs->peerAllowedPieces && tr_peerIoSupportsFEXT( msgs->io ) )
712    {
713        msgs->peerAllowedPieces = tr_peerMgrGenerateAllowedSet(
714            MAX_FAST_ALLOWED_COUNT,
715            msgs->torrent->info.pieceCount,
716            msgs->torrent->info.hash,
717            tr_peerIoGetAddress( msgs->io, NULL ) );
718    }
719
720    return msgs->peerAllowedPieces;
721}
722
723static void
724sendFastAllowed( tr_peermsgs * msgs,
725                 uint32_t      pieceIndex )
726{
727    assert( msgs );
728
729    if( tr_peerIoSupportsFEXT( msgs->io ) )
730    {
731        struct evbuffer * out = msgs->outMessages;
732        dbgmsg( msgs, "sending fast allowed" );
733        tr_peerIoWriteUint32( msgs->io, out,  sizeof( uint8_t ) +
734                             sizeof( uint32_t ) );
735        tr_peerIoWriteUint8( msgs->io, out, BT_ALLOWED_FAST );
736        tr_peerIoWriteUint32( msgs->io, out, pieceIndex );
737        pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
738        dbgmsg( msgs, "outMessage size is now %d",
739               (int)EVBUFFER_LENGTH( out ) );
740    }
741}
742
743static void
744sendFastAllowedSet( tr_peermsgs * msgs )
745{
746    tr_piece_index_t i = 0;
747
748    while( i <= msgs->torrent->info.pieceCount )
749    {
750        if( tr_bitfieldHas( getPeerAllowedPieces( msgs ), i ) )
751            sendFastAllowed( msgs, i );
752        i++;
753    }
754}
755
756static void
757maybeSendFastAllowedSet( tr_peermsgs * msgs )
758{
759    if( tr_bitfieldCountTrueBits( msgs->info->have ) <=
760        MAX_FAST_ALLOWED_THRESHOLD )
761        sendFastAllowedSet( msgs );
762}
763
764/**
765***
766**/
767
768static int
769reqIsValid( const tr_peermsgs * peer,
770            uint32_t            index,
771            uint32_t            offset,
772            uint32_t            length )
773{
774    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
775}
776
777static int
778requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
779{
780    return reqIsValid( msgs, req->index, req->offset, req->length );
781}
782
783static void
784expireOldRequests( tr_peermsgs * msgs, const time_t now  )
785{
786    int                 i;
787    time_t              oldestAllowed;
788    struct request_list tmp = REQUEST_LIST_INIT;
789
790    /* cancel requests that have been queued for too long */
791    oldestAllowed = now - QUEUED_REQUEST_TTL_SECS;
792    reqListCopy( &tmp, &msgs->clientWillAskFor );
793    for( i = 0; i < tmp.count; ++i )
794    {
795        const struct peer_request * req = &tmp.requests[i];
796        if( req->time_requested < oldestAllowed )
797            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
798    }
799    reqListClear( &tmp );
800
801    /* cancel requests that were sent too long ago */
802    oldestAllowed = now - SENT_REQUEST_TTL_SECS;
803    reqListCopy( &tmp, &msgs->clientAskedFor );
804    for( i = 0; i < tmp.count; ++i )
805    {
806        const struct peer_request * req = &tmp.requests[i];
807        if( req->time_requested < oldestAllowed )
808            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
809    }
810    reqListClear( &tmp );
811}
812
813static void
814pumpRequestQueue( tr_peermsgs * msgs, const time_t now )
815{
816    const int           max = msgs->maxActiveRequests;
817    const int           min = msgs->minActiveRequests;
818    int                 sent = 0;
819    int                 count = msgs->clientAskedFor.count;
820    struct peer_request req;
821
822    if( count > min )
823        return;
824    if( msgs->info->clientIsChoked )
825        return;
826    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
827        return;
828
829    while( ( count < max ) && reqListPop( &msgs->clientWillAskFor, &req ) )
830    {
831        const tr_block_index_t block =
832            _tr_block( msgs->torrent, req.index, req.offset );
833
834        assert( requestIsValid( msgs, &req ) );
835        assert( tr_bitfieldHas( msgs->info->have, req.index ) );
836
837        /* don't ask for it if we've already got it... this block may have
838         * come in from a different peer after we cancelled a request for it */
839        if( !tr_cpBlockIsComplete( msgs->torrent->completion, block ) )
840        {
841            protocolSendRequest( msgs, &req );
842            req.time_requested = now;
843            reqListAppend( &msgs->clientAskedFor, &req );
844
845            ++count;
846            ++sent;
847        }
848    }
849
850    if( sent )
851        dbgmsg( msgs,
852                "pump sent %d requests, now have %d active and %d queued",
853                sent,
854                msgs->clientAskedFor.count,
855                msgs->clientWillAskFor.count );
856
857    if( count < max )
858        fireNeedReq( msgs );
859}
860
861static int
862requestQueueIsFull( const tr_peermsgs * msgs )
863{
864    const int req_max = msgs->maxActiveRequests;
865    return msgs->clientWillAskFor.count >= req_max;
866}
867
868tr_addreq_t
869tr_peerMsgsAddRequest( tr_peermsgs *    msgs,
870                       uint32_t         index,
871                       uint32_t         offset,
872                       uint32_t         length )
873{
874    struct peer_request req;
875
876    assert( msgs );
877    assert( msgs->torrent );
878    assert( reqIsValid( msgs, index, offset, length ) );
879
880    /**
881    ***  Reasons to decline the request
882    **/
883
884    /* don't send requests to choked clients */
885    if( msgs->info->clientIsChoked )
886    {
887        dbgmsg( msgs, "declining request because they're choking us" );
888        return TR_ADDREQ_CLIENT_CHOKED;
889    }
890
891    /* peer doesn't have this piece */
892    if( !tr_bitfieldHas( msgs->info->have, index ) )
893        return TR_ADDREQ_MISSING;
894
895    /* peer's queue is full */
896    if( requestQueueIsFull( msgs ) ) {
897        dbgmsg( msgs, "declining request because we're full" );
898        return TR_ADDREQ_FULL;
899    }
900
901    /* have we already asked for this piece? */
902    req.index = index;
903    req.offset = offset;
904    req.length = length;
905    if( reqListFind( &msgs->clientAskedFor, &req ) != -1 ) {
906        dbgmsg( msgs, "declining because it's a duplicate" );
907        return TR_ADDREQ_DUPLICATE;
908    }
909    if( reqListFind( &msgs->clientWillAskFor, &req ) != -1 ) {
910        dbgmsg( msgs, "declining because it's a duplicate" );
911        return TR_ADDREQ_DUPLICATE;
912    }
913
914    /**
915    ***  Accept this request
916    **/
917
918    dbgmsg( msgs, "adding req for %"PRIu32":%"PRIu32"->%"PRIu32" to our `will request' list",
919            index, offset, length );
920    req.time_requested = time( NULL );
921    reqListAppend( &msgs->clientWillAskFor, &req );
922    return TR_ADDREQ_OK;
923}
924
925static void
926cancelAllRequestsToPeer( tr_peermsgs * msgs )
927{
928    int                 i;
929    struct request_list a = msgs->clientWillAskFor;
930    struct request_list b = msgs->clientAskedFor;
931    dbgmsg( msgs, "cancelling all requests to peer" );
932
933    msgs->clientAskedFor = REQUEST_LIST_INIT;
934    msgs->clientWillAskFor = REQUEST_LIST_INIT;
935
936    for( i=0; i<a.count; ++i )
937        fireCancelledReq( msgs, &a.requests[i] );
938
939    for( i = 0; i < b.count; ++i ) {
940        fireCancelledReq( msgs, &b.requests[i] );
941        protocolSendCancel( msgs, &b.requests[i] );
942    }
943
944    reqListClear( &a );
945    reqListClear( &b );
946}
947
948void
949tr_peerMsgsCancel( tr_peermsgs * msgs,
950                   uint32_t      pieceIndex,
951                   uint32_t      offset,
952                   uint32_t      length )
953{
954    struct peer_request req;
955
956    assert( msgs != NULL );
957    assert( length > 0 );
958
959
960    /* have we asked the peer for this piece? */
961    req.index = pieceIndex;
962    req.offset = offset;
963    req.length = length;
964
965    /* if it's only in the queue and hasn't been sent yet, free it */
966    if( reqListRemove( &msgs->clientWillAskFor, &req ) ) {
967        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32"\n", pieceIndex, offset, length );
968        fireCancelledReq( msgs, &req );
969    }
970
971    /* if it's already been sent, send a cancel message too */
972    if( reqListRemove( &msgs->clientAskedFor, &req ) ) {
973        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32"\n", pieceIndex, offset, length );
974        protocolSendCancel( msgs, &req );
975        fireCancelledReq( msgs, &req );
976    }
977}
978
979/**
980***
981**/
982
983static void
984sendLtepHandshake( tr_peermsgs * msgs )
985{
986    tr_benc           val, *m;
987    char *            buf;
988    int               len;
989    int               pex;
990    struct evbuffer * out = msgs->outMessages;
991
992    if( msgs->clientSentLtepHandshake )
993        return;
994
995    dbgmsg( msgs, "sending an ltep handshake" );
996    msgs->clientSentLtepHandshake = 1;
997
998    /* decide if we want to advertise pex support */
999    if( !tr_torrentAllowsPex( msgs->torrent ) )
1000        pex = 0;
1001    else if( msgs->peerSentLtepHandshake )
1002        pex = msgs->peerSupportsPex ? 1 : 0;
1003    else
1004        pex = 1;
1005
1006    tr_bencInitDict( &val, 4 );
1007    tr_bencDictAddInt( &val, "e",
1008                       msgs->session->encryptionMode != TR_CLEAR_PREFERRED );
1009    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( msgs->session ) );
1010    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
1011    m  = tr_bencDictAddDict( &val, "m", 1 );
1012    if( pex )
1013        tr_bencDictAddInt( m, "ut_pex", TR_LTEP_PEX );
1014    buf = tr_bencSave( &val, &len );
1015
1016    tr_peerIoWriteUint32( msgs->io, out, 2 * sizeof( uint8_t ) + len );
1017    tr_peerIoWriteUint8 ( msgs->io, out, BT_LTEP );
1018    tr_peerIoWriteUint8 ( msgs->io, out, LTEP_HANDSHAKE );
1019    tr_peerIoWriteBytes ( msgs->io, out, buf, len );
1020    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1021    dbgmsg( msgs, "outMessage size is now %d", (int)EVBUFFER_LENGTH( out ) );
1022
1023    /* cleanup */
1024    tr_bencFree( &val );
1025    tr_free( buf );
1026}
1027
1028static void
1029parseLtepHandshake( tr_peermsgs *     msgs,
1030                    int               len,
1031                    struct evbuffer * inbuf )
1032{
1033    int64_t   i;
1034    tr_benc   val, * sub;
1035    uint8_t * tmp = tr_new( uint8_t, len );
1036
1037    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
1038    msgs->peerSentLtepHandshake = 1;
1039
1040    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type != TYPE_DICT )
1041    {
1042        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
1043        tr_free( tmp );
1044        return;
1045    }
1046
1047    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
1048
1049    /* does the peer prefer encrypted connections? */
1050    if( tr_bencDictFindInt( &val, "e", &i ) )
1051        msgs->info->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
1052                                            : ENCRYPTION_PREFERENCE_NO;
1053
1054    /* check supported messages for utorrent pex */
1055    msgs->peerSupportsPex = 0;
1056    if( tr_bencDictFindDict( &val, "m", &sub ) )
1057    {
1058        if( tr_bencDictFindInt( sub, "ut_pex", &i ) )
1059        {
1060            msgs->ut_pex_id = (uint8_t) i;
1061            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
1062            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
1063        }
1064    }
1065
1066    /* get peer's listening port */
1067    if( tr_bencDictFindInt( &val, "p", &i ) )
1068    {
1069        msgs->info->port = htons( (uint16_t)i );
1070        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
1071    }
1072
1073    tr_bencFree( &val );
1074    tr_free( tmp );
1075}
1076
1077static void
1078parseUtPex( tr_peermsgs *     msgs,
1079            int               msglen,
1080            struct evbuffer * inbuf )
1081{
1082    int                loaded = 0;
1083    uint8_t *          tmp = tr_new( uint8_t, msglen );
1084    tr_benc            val;
1085    const tr_torrent * tor = msgs->torrent;
1086    const uint8_t *    added;
1087    size_t             added_len;
1088
1089    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
1090
1091    if( tr_torrentAllowsPex( tor )
1092      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) )
1093      && tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1094    {
1095        const uint8_t * added_f = NULL;
1096        tr_pex *        pex;
1097        size_t          i, n;
1098        size_t          added_f_len = 0;
1099        tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1100        pex =
1101            tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len,
1102                                    &n );
1103        for( i = 0; i < n; ++i )
1104            tr_peerMgrAddPex( msgs->session->peerMgr, tor->info.hash,
1105                              TR_PEER_FROM_PEX, pex + i );
1106        tr_free( pex );
1107    }
1108
1109    if( loaded )
1110        tr_bencFree( &val );
1111    tr_free( tmp );
1112}
1113
1114static void sendPex( tr_peermsgs * msgs );
1115
1116static void
1117parseLtep( tr_peermsgs *     msgs,
1118           int               msglen,
1119           struct evbuffer * inbuf )
1120{
1121    uint8_t ltep_msgid;
1122
1123    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
1124    msglen--;
1125
1126    if( ltep_msgid == LTEP_HANDSHAKE )
1127    {
1128        dbgmsg( msgs, "got ltep handshake" );
1129        parseLtepHandshake( msgs, msglen, inbuf );
1130        if( tr_peerIoSupportsLTEP( msgs->io ) )
1131        {
1132            sendLtepHandshake( msgs );
1133            sendPex( msgs );
1134        }
1135    }
1136    else if( ltep_msgid == TR_LTEP_PEX )
1137    {
1138        dbgmsg( msgs, "got ut pex" );
1139        msgs->peerSupportsPex = 1;
1140        parseUtPex( msgs, msglen, inbuf );
1141    }
1142    else
1143    {
1144        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1145        evbuffer_drain( inbuf, msglen );
1146    }
1147}
1148
1149static int
1150readBtLength( tr_peermsgs *     msgs,
1151              struct evbuffer * inbuf,
1152              size_t            inlen )
1153{
1154    uint32_t len;
1155
1156    if( inlen < sizeof( len ) )
1157        return READ_LATER;
1158
1159    tr_peerIoReadUint32( msgs->io, inbuf, &len );
1160
1161    if( len == 0 ) /* peer sent us a keepalive message */
1162        dbgmsg( msgs, "got KeepAlive" );
1163    else
1164    {
1165        msgs->incoming.length = len;
1166        msgs->state = AWAITING_BT_ID;
1167    }
1168
1169    return READ_NOW;
1170}
1171
1172static int readBtMessage( tr_peermsgs *     msgs,
1173                          struct evbuffer * inbuf,
1174                          size_t            inlen );
1175
1176static int
1177readBtId( tr_peermsgs *     msgs,
1178          struct evbuffer * inbuf,
1179          size_t            inlen )
1180{
1181    uint8_t id;
1182
1183    if( inlen < sizeof( uint8_t ) )
1184        return READ_LATER;
1185
1186    tr_peerIoReadUint8( msgs->io, inbuf, &id );
1187    msgs->incoming.id = id;
1188
1189    if( id == BT_PIECE )
1190    {
1191        msgs->state = AWAITING_BT_PIECE;
1192        return READ_NOW;
1193    }
1194    else if( msgs->incoming.length != 1 )
1195    {
1196        msgs->state = AWAITING_BT_MESSAGE;
1197        return READ_NOW;
1198    }
1199    else return readBtMessage( msgs, inbuf, inlen - 1 );
1200}
1201
1202static void
1203updatePeerProgress( tr_peermsgs * msgs )
1204{
1205    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have )
1206                           / (float)msgs->torrent->info.pieceCount;
1207    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
1208    updateInterest( msgs );
1209    firePeerProgress( msgs );
1210}
1211
1212static int
1213clientCanSendFastBlock( const tr_peermsgs * msgs UNUSED )
1214{
1215    /* don't send a fast piece if peer has MAX_FAST_ALLOWED_THRESHOLD pieces */
1216    if( tr_bitfieldCountTrueBits( msgs->info->have ) >
1217        MAX_FAST_ALLOWED_THRESHOLD )
1218        return FALSE;
1219
1220    /* ...or if we don't have ourself enough pieces */
1221    if( tr_bitfieldCountTrueBits( tr_cpPieceBitfield( msgs->torrent->
1222                                                      completion ) ) <
1223        MAX_FAST_ALLOWED_THRESHOLD )
1224        return FALSE;
1225
1226    /* Maybe a bandwidth limit ? */
1227    return TRUE;
1228}
1229
1230static void
1231peerMadeRequest( tr_peermsgs *               msgs,
1232                 const struct peer_request * req )
1233{
1234    const int reqIsValid = requestIsValid( msgs, req );
1235    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete(
1236        msgs->torrent->completion, req->index );
1237    const int peerIsChoked = msgs->info->peerIsChoked;
1238    const int peerIsFast = tr_peerIoSupportsFEXT( msgs->io );
1239    const int pieceIsFast = reqIsValid && tr_bitfieldHas(
1240        getPeerAllowedPieces( msgs ), req->index );
1241    const int canSendFast = clientCanSendFastBlock( msgs );
1242
1243    if( !reqIsValid ) /* bad request */
1244    {
1245        dbgmsg( msgs, "rejecting an invalid request." );
1246        sendFastReject( msgs, req->index, req->offset, req->length );
1247    }
1248    else if( !clientHasPiece ) /* we don't have it */
1249    {
1250        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1251        sendFastReject( msgs, req->index, req->offset, req->length );
1252    }
1253    else if( peerIsChoked && !peerIsFast ) /* doesn't he know he's choked? */
1254    {
1255        tr_peerMsgsSetChoke( msgs, 1 );
1256        sendFastReject( msgs, req->index, req->offset, req->length );
1257    }
1258    else if( peerIsChoked && peerIsFast && ( !pieceIsFast || !canSendFast ) )
1259    {
1260        sendFastReject( msgs, req->index, req->offset, req->length );
1261    }
1262    else /* YAY */
1263    {
1264        if( peerIsFast && pieceIsFast )
1265            reqListAppend( &msgs->peerAskedForFast, req );
1266        else
1267            reqListAppend( &msgs->peerAskedFor, req );
1268    }
1269}
1270
1271static int
1272messageLengthIsCorrect( const tr_peermsgs * msg,
1273                        uint8_t             id,
1274                        uint32_t            len )
1275{
1276    switch( id )
1277    {
1278        case BT_CHOKE:
1279        case BT_UNCHOKE:
1280        case BT_INTERESTED:
1281        case BT_NOT_INTERESTED:
1282        case BT_HAVE_ALL:
1283        case BT_HAVE_NONE:
1284            return len == 1;
1285
1286        case BT_HAVE:
1287        case BT_SUGGEST:
1288        case BT_ALLOWED_FAST:
1289            return len == 5;
1290
1291        case BT_BITFIELD:
1292            return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1293
1294        case BT_REQUEST:
1295        case BT_CANCEL:
1296        case BT_REJECT:
1297            return len == 13;
1298
1299        case BT_PIECE:
1300            return len > 9 && len <= 16393;
1301
1302        case BT_PORT:
1303            return len == 3;
1304
1305        case BT_LTEP:
1306            return len >= 2;
1307
1308        default:
1309            return FALSE;
1310    }
1311}
1312
1313static int clientGotBlock( tr_peermsgs *               msgs,
1314                           const uint8_t *             block,
1315                           const struct peer_request * req );
1316
1317static int
1318readBtPiece( tr_peermsgs      * msgs,
1319             struct evbuffer  * inbuf,
1320             size_t             inlen,
1321             size_t           * setme_piece_bytes_read )
1322{
1323    struct peer_request * req = &msgs->incoming.blockReq;
1324
1325    assert( EVBUFFER_LENGTH( inbuf ) >= inlen );
1326    dbgmsg( msgs, "In readBtPiece" );
1327
1328    if( !req->length )
1329    {
1330        if( inlen < 8 )
1331            return READ_LATER;
1332
1333        tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
1334        tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
1335        req->length = msgs->incoming.length - 9;
1336        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index,
1337                req->offset,
1338                req->length );
1339        return READ_NOW;
1340    }
1341    else
1342    {
1343        int          err;
1344
1345        /* read in another chunk of data */
1346        const size_t nLeft = req->length - EVBUFFER_LENGTH(
1347            msgs->incoming.block );
1348        size_t       n = MIN( nLeft, inlen );
1349        uint8_t *    buf = tr_new( uint8_t, n );
1350        assert( EVBUFFER_LENGTH( inbuf ) >= n );
1351        tr_peerIoReadBytes( msgs->io, inbuf, buf, n );
1352        evbuffer_add( msgs->incoming.block, buf, n );
1353        fireClientGotData( msgs, n, TRUE );
1354        *setme_piece_bytes_read += n;
1355        tr_free( buf );
1356        dbgmsg( msgs, "got %d bytes for block %u:%u->%u ... %d remain",
1357               (int)n, req->index, req->offset, req->length,
1358               (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) );
1359        if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length )
1360            return READ_LATER;
1361
1362        /* we've got the whole block ... process it */
1363        err = clientGotBlock( msgs, EVBUFFER_DATA(
1364                                  msgs->incoming.block ), req );
1365
1366        /* cleanup */
1367        evbuffer_drain( msgs->incoming.block,
1368                       EVBUFFER_LENGTH( msgs->incoming.block ) );
1369        req->length = 0;
1370        msgs->state = AWAITING_BT_LENGTH;
1371        if( !err )
1372            return READ_NOW;
1373        else
1374        {
1375            fireError( msgs, err );
1376            return READ_ERR;
1377        }
1378    }
1379}
1380
1381static int
1382readBtMessage( tr_peermsgs *     msgs,
1383               struct evbuffer * inbuf,
1384               size_t            inlen )
1385{
1386    uint32_t      ui32;
1387    uint32_t      msglen = msgs->incoming.length;
1388    const uint8_t id = msgs->incoming.id;
1389    const size_t  startBufLen = EVBUFFER_LENGTH( inbuf );
1390
1391    --msglen; /* id length */
1392
1393    if( inlen < msglen )
1394        return READ_LATER;
1395
1396    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %d", (int)id,
1397            (int)msglen,
1398            (int)inlen );
1399
1400    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1401    {
1402        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d",
1403                (int)id, (int)msglen );
1404        fireError( msgs, EMSGSIZE );
1405        return READ_ERR;
1406    }
1407
1408    switch( id )
1409    {
1410        case BT_CHOKE:
1411            dbgmsg( msgs, "got Choke" );
1412            msgs->info->clientIsChoked = 1;
1413            cancelAllRequestsToPeer( msgs );
1414            cancelAllRequestsToClientExceptFast( msgs );
1415            break;
1416
1417        case BT_UNCHOKE:
1418            dbgmsg( msgs, "got Unchoke" );
1419            msgs->info->clientIsChoked = 0;
1420            fireNeedReq( msgs );
1421            break;
1422
1423        case BT_INTERESTED:
1424            dbgmsg( msgs, "got Interested" );
1425            msgs->info->peerIsInterested = 1;
1426            break;
1427
1428        case BT_NOT_INTERESTED:
1429            dbgmsg( msgs, "got Not Interested" );
1430            msgs->info->peerIsInterested = 0;
1431            break;
1432
1433        case BT_HAVE:
1434            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1435            dbgmsg( msgs, "got Have: %u", ui32 );
1436            if( tr_bitfieldAdd( msgs->info->have, ui32 ) )
1437                fireError( msgs, ERANGE );
1438            updatePeerProgress( msgs );
1439            tr_rcTransferred( msgs->torrent->swarmSpeed,
1440                              msgs->torrent->info.pieceSize );
1441            break;
1442
1443        case BT_BITFIELD:
1444        {
1445            dbgmsg( msgs, "got a bitfield" );
1446            msgs->peerSentBitfield = 1;
1447            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits,
1448                                msglen );
1449            updatePeerProgress( msgs );
1450            maybeSendFastAllowedSet( msgs );
1451            fireNeedReq( msgs );
1452            break;
1453        }
1454
1455        case BT_REQUEST:
1456        {
1457            struct peer_request r;
1458            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1459            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1460            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1461            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset,
1462                    r.length );
1463            peerMadeRequest( msgs, &r );
1464            break;
1465        }
1466
1467        case BT_CANCEL:
1468        {
1469            struct peer_request r;
1470            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1471            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1472            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1473            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset,
1474                    r.length );
1475            reqListRemove( &msgs->peerAskedForFast, &r );
1476            reqListRemove( &msgs->peerAskedFor, &r );
1477            break;
1478        }
1479
1480        case BT_PIECE:
1481            assert( 0 ); /* handled elsewhere! */
1482            break;
1483
1484        case BT_PORT:
1485            dbgmsg( msgs, "Got a BT_PORT" );
1486            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1487            break;
1488
1489        case BT_SUGGEST:
1490        {
1491            dbgmsg( msgs, "Got a BT_SUGGEST" );
1492            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1493            /* we don't do anything with this yet */
1494            break;
1495        }
1496
1497        case BT_HAVE_ALL:
1498            dbgmsg( msgs, "Got a BT_HAVE_ALL" );
1499            tr_bitfieldAddRange( msgs->info->have, 0,
1500                                 msgs->torrent->info.pieceCount );
1501            updatePeerProgress( msgs );
1502            maybeSendFastAllowedSet( msgs );
1503            break;
1504
1505
1506        case BT_HAVE_NONE:
1507            dbgmsg( msgs, "Got a BT_HAVE_NONE" );
1508            tr_bitfieldClear( msgs->info->have );
1509            updatePeerProgress( msgs );
1510            maybeSendFastAllowedSet( msgs );
1511            break;
1512
1513        case BT_REJECT:
1514        {
1515            struct peer_request r;
1516            dbgmsg( msgs, "Got a BT_REJECT" );
1517            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1518            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1519            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1520            reqListRemove( &msgs->clientAskedFor, &r );
1521            break;
1522        }
1523
1524        case BT_ALLOWED_FAST:
1525        {
1526            dbgmsg( msgs, "Got a BT_ALLOWED_FAST" );
1527            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1528            /* we don't do anything with this yet */
1529            break;
1530        }
1531
1532        case BT_LTEP:
1533            dbgmsg( msgs, "Got a BT_LTEP" );
1534            parseLtep( msgs, msglen, inbuf );
1535            break;
1536
1537        default:
1538            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1539            tr_peerIoDrain( msgs->io, inbuf, msglen );
1540            break;
1541    }
1542
1543    assert( msglen + 1 == msgs->incoming.length );
1544    assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen );
1545
1546    msgs->state = AWAITING_BT_LENGTH;
1547    return READ_NOW;
1548}
1549
1550static void
1551decrementDownloadedCount( tr_peermsgs * msgs,
1552                          uint32_t      byteCount )
1553{
1554    tr_torrent * tor = msgs->torrent;
1555
1556    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1557}
1558
1559static void
1560clientGotUnwantedBlock( tr_peermsgs *               msgs,
1561                        const struct peer_request * req )
1562{
1563    decrementDownloadedCount( msgs, req->length );
1564}
1565
1566static void
1567addPeerToBlamefield( tr_peermsgs * msgs,
1568                     uint32_t      index )
1569{
1570    if( !msgs->info->blame )
1571        msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1572    tr_bitfieldAdd( msgs->info->blame, index );
1573}
1574
1575/* returns 0 on success, or an errno on failure */
1576static int
1577clientGotBlock( tr_peermsgs *               msgs,
1578                const uint8_t *             data,
1579                const struct peer_request * req )
1580{
1581    int                    err;
1582    tr_torrent *           tor = msgs->torrent;
1583    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1584
1585    assert( msgs );
1586    assert( req );
1587
1588    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) )
1589    {
1590        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1591                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1592        return EMSGSIZE;
1593    }
1594
1595    /* save the block */
1596    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset,
1597            req->length );
1598
1599    /**
1600    *** Remove the block from our `we asked for this' list
1601    **/
1602
1603    if( !reqListRemove( &msgs->clientAskedFor, req ) )
1604    {
1605        clientGotUnwantedBlock( msgs, req );
1606        dbgmsg( msgs, "we didn't ask for this message..." );
1607        return 0;
1608    }
1609
1610    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1611            msgs->clientAskedFor.count );
1612
1613    /**
1614    *** Error checks
1615    **/
1616
1617    if( tr_cpBlockIsComplete( tor->completion, block ) )
1618    {
1619        dbgmsg( msgs, "we have this block already..." );
1620        clientGotUnwantedBlock( msgs, req );
1621        return 0;
1622    }
1623
1624    /**
1625    ***  Save the block
1626    **/
1627
1628    msgs->info->peerSentPieceDataAt = time( NULL );
1629    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1630        return err;
1631
1632    addPeerToBlamefield( msgs, req->index );
1633    fireGotBlock( msgs, req );
1634    return 0;
1635}
1636
1637static int peerPulse( void * vmsgs );
1638
1639static void
1640didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1641{
1642    tr_peermsgs * msgs = vmsgs;
1643    firePeerGotData( msgs, bytesWritten, wasPieceData );
1644    peerPulse( msgs );
1645}
1646
1647static ReadState
1648canRead( struct tr_iobuf * iobuf, void * vmsgs, size_t * piece )
1649{
1650    ReadState         ret;
1651    tr_peermsgs *     msgs = vmsgs;
1652    struct evbuffer * in = tr_iobuf_input( iobuf );
1653    const size_t      inlen = EVBUFFER_LENGTH( in );
1654
1655    if( !inlen )
1656    {
1657        ret = READ_LATER;
1658    }
1659    else if( msgs->state == AWAITING_BT_PIECE )
1660    {
1661        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1662    }
1663    else switch( msgs->state )
1664    {
1665        case AWAITING_BT_LENGTH:
1666            ret = readBtLength ( msgs, in, inlen ); break;
1667
1668        case AWAITING_BT_ID:
1669            ret = readBtId     ( msgs, in, inlen ); break;
1670
1671        case AWAITING_BT_MESSAGE:
1672            ret = readBtMessage( msgs, in, inlen ); break;
1673
1674        default:
1675            assert( 0 );
1676    }
1677
1678    /* log the raw data that was read */
1679    if( EVBUFFER_LENGTH( in ) != inlen )
1680        fireClientGotData( msgs, inlen - EVBUFFER_LENGTH( in ), FALSE );
1681
1682    return ret;
1683}
1684
1685static void
1686sendKeepalive( tr_peermsgs * msgs )
1687{
1688    dbgmsg( msgs, "sending a keepalive message" );
1689    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1690    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1691}
1692
1693/**
1694***
1695**/
1696
1697static int
1698ratePulse( void * vpeer )
1699{
1700    tr_peermsgs * peer = vpeer;
1701    const double rateToClient = tr_peerGetPieceSpeed( peer->info,
1702                                                      TR_PEER_TO_CLIENT );
1703    const int estimatedBlocksInNext30Seconds =
1704                  ( rateToClient * 30 * 1024 ) / peer->torrent->blockSize;
1705
1706    peer->minActiveRequests = 8;
1707    peer->maxActiveRequests = peer->minActiveRequests + estimatedBlocksInNext30Seconds;
1708    return TRUE;
1709}
1710
1711static int
1712popNextRequest( tr_peermsgs *         msgs,
1713                struct peer_request * setme )
1714{
1715    return reqListPop( &msgs->peerAskedForFast, setme )
1716        || reqListPop( &msgs->peerAskedFor, setme );
1717}
1718
1719static int
1720peerPulse( void * vmsgs )
1721{
1722    tr_peermsgs * msgs = vmsgs;
1723    const time_t  now = time( NULL );
1724
1725    ratePulse( msgs );
1726
1727    /*tr_peerIoTryRead( msgs->io );*/
1728    pumpRequestQueue( msgs, now );
1729    expireOldRequests( msgs, now );
1730
1731    if( msgs->sendingBlock )
1732    {
1733        const size_t uploadMax = tr_peerIoGetWriteBufferSpace( msgs->io );
1734        size_t       len = EVBUFFER_LENGTH( msgs->outBlock );
1735        const size_t outlen = MIN( len, uploadMax );
1736
1737        assert( len );
1738
1739        if( outlen )
1740        {
1741            tr_peerIoWrite( msgs->io, EVBUFFER_DATA( msgs->outBlock ), outlen, TRUE );
1742            evbuffer_drain( msgs->outBlock, outlen );
1743
1744            len -= outlen;
1745            msgs->clientSentAnythingAt = now;
1746            msgs->sendingBlock = len != 0;
1747
1748            dbgmsg( msgs, "wrote %zu bytes; %zu left in block", outlen, len );
1749        }
1750        else dbgmsg( msgs, "stalled writing block... uploadMax %lu, outlen %lu", uploadMax, outlen );
1751    }
1752
1753    if( !msgs->sendingBlock )
1754    {
1755        struct peer_request req;
1756        const int haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1757
1758        if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1759        {
1760            dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
1761            msgs->outMessagesBatchedAt = now;
1762        }
1763        else if( ( haveMessages ) &&
1764                 ( ( now - msgs->outMessagesBatchedAt ) > msgs->outMessagesBatchPeriod ) )
1765        {
1766            dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->io, EVBUFFER_LENGTH( msgs->outMessages ) );
1767            tr_peerIoWriteBuf( msgs->io, msgs->outMessages, FALSE );
1768            msgs->clientSentAnythingAt = now;
1769            msgs->outMessagesBatchedAt = 0;
1770            msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1771        }
1772        else if( !EVBUFFER_LENGTH( msgs->outBlock )
1773               && popNextRequest( msgs, &req )
1774               && requestIsValid( msgs, &req )
1775               && tr_cpPieceIsComplete( msgs->torrent->completion,
1776                                        req.index ) )
1777        {
1778            uint8_t * buf = tr_new( uint8_t, req.length );
1779            const int err = tr_ioRead( msgs->torrent,
1780                                       req.index, req.offset, req.length,
1781                                       buf );
1782            if( err )
1783            {
1784                fireError( msgs, err );
1785            }
1786            else
1787            {
1788                tr_peerIo *       io = msgs->io;
1789                struct evbuffer * out = msgs->outBlock;
1790
1791                dbgmsg( msgs, "sending block %u:%u->%u", req.index,
1792                        req.offset,
1793                        req.length );
1794                tr_peerIoWriteUint32(
1795                    io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) +
1796                    req.length );
1797                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1798                tr_peerIoWriteUint32( io, out, req.index );
1799                tr_peerIoWriteUint32( io, out, req.offset );
1800                tr_peerIoWriteBytes ( io, out, buf, req.length );
1801                msgs->sendingBlock = 1;
1802            }
1803
1804            tr_free( buf );
1805        }
1806        else if( ( !haveMessages )
1807               && ( now - msgs->clientSentAnythingAt ) >
1808                KEEPALIVE_INTERVAL_SECS )
1809        {
1810            sendKeepalive( msgs );
1811        }
1812    }
1813
1814    return TRUE; /* loop forever */
1815}
1816
1817void
1818tr_peerMsgsPulse( tr_peermsgs * msgs )
1819{
1820    if( msgs != NULL )
1821        peerPulse( msgs );
1822}
1823
1824static void
1825gotError( struct tr_iobuf  * iobuf UNUSED,
1826          short              what,
1827          void             * vmsgs )
1828{
1829    if( what & EVBUFFER_TIMEOUT )
1830        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1831    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1832        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1833               what, errno, tr_strerror( errno ) );
1834    fireError( vmsgs, ENOTCONN );
1835}
1836
1837static void
1838sendBitfield( tr_peermsgs * msgs )
1839{
1840    struct evbuffer * out = msgs->outMessages;
1841    tr_bitfield *     field;
1842    tr_piece_index_t  lazyPieces[LAZY_PIECE_COUNT];
1843    size_t            i;
1844    size_t            lazyCount = 0;
1845
1846    field = tr_bitfieldDup( tr_cpPieceBitfield( msgs->torrent->completion ) );
1847
1848#if 0
1849    if( tr_sessionIsLazyBitfieldEnabled( msgs->session ) )
1850    {
1851        /** Lazy bitfields aren't a high priority or secure, so I'm opting for
1852            speed over a truly random sample -- let's limit the pool size to
1853            the first 1000 pieces so large torrents don't bog things down */
1854        size_t poolSize;
1855        const size_t maxPoolSize = MIN( msgs->torrent->info.pieceCount, 1000 );
1856        tr_piece_index_t * pool = tr_new( tr_piece_index_t, maxPoolSize );
1857
1858        /* build the pool */
1859        for( i=poolSize=0; i<maxPoolSize; ++i )
1860            if( tr_bitfieldHas( field, i ) )
1861                pool[poolSize++] = i;
1862
1863        /* pull random piece indices from the pool */
1864        while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) )
1865        {
1866            const int pos = tr_cryptoWeakRandInt( poolSize );
1867            const tr_piece_index_t piece = pool[pos];
1868            tr_bitfieldRem( field, piece );
1869            lazyPieces[lazyCount++] = piece;
1870            pool[pos] = pool[--poolSize];
1871        }
1872
1873        /* cleanup */
1874        tr_free( pool );
1875    }
1876#endif
1877
1878    tr_peerIoWriteUint32( msgs->io, out,
1879                          sizeof( uint8_t ) + field->byteCount );
1880    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1881    tr_peerIoWriteBytes ( msgs->io, out, field->bits, field->byteCount );
1882    dbgmsg( msgs, "sending bitfield... outMessage size is now %d",
1883           (int)EVBUFFER_LENGTH( out ) );
1884    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1885
1886    for( i = 0; i < lazyCount; ++i )
1887        protocolSendHave( msgs, lazyPieces[i] );
1888
1889    tr_bitfieldFree( field );
1890}
1891
1892/**
1893***
1894**/
1895
1896/* some peers give us error messages if we send
1897   more than this many peers in a single pex message
1898   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
1899#define MAX_PEX_ADDED 50
1900#define MAX_PEX_DROPPED 50
1901
1902typedef struct
1903{
1904    tr_pex *  added;
1905    tr_pex *  dropped;
1906    tr_pex *  elements;
1907    int       addedCount;
1908    int       droppedCount;
1909    int       elementCount;
1910}
1911PexDiffs;
1912
1913static void
1914pexAddedCb( void * vpex,
1915            void * userData )
1916{
1917    PexDiffs * diffs = userData;
1918    tr_pex *   pex = vpex;
1919
1920    if( diffs->addedCount < MAX_PEX_ADDED )
1921    {
1922        diffs->added[diffs->addedCount++] = *pex;
1923        diffs->elements[diffs->elementCount++] = *pex;
1924    }
1925}
1926
1927static void
1928pexDroppedCb( void * vpex,
1929              void * userData )
1930{
1931    PexDiffs * diffs = userData;
1932    tr_pex *   pex = vpex;
1933
1934    if( diffs->droppedCount < MAX_PEX_DROPPED )
1935    {
1936        diffs->dropped[diffs->droppedCount++] = *pex;
1937    }
1938}
1939
1940static void
1941pexElementCb( void * vpex,
1942              void * userData )
1943{
1944    PexDiffs * diffs = userData;
1945    tr_pex *   pex = vpex;
1946
1947    diffs->elements[diffs->elementCount++] = *pex;
1948}
1949
1950static void
1951sendPex( tr_peermsgs * msgs )
1952{
1953    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
1954    {
1955        PexDiffs diffs;
1956        tr_pex * newPex = NULL;
1957        const int newCount = tr_peerMgrGetPeers( msgs->session->peerMgr,
1958                                                 msgs->torrent->info.hash,
1959                                                 &newPex );
1960
1961        /* build the diffs */
1962        diffs.added = tr_new( tr_pex, newCount );
1963        diffs.addedCount = 0;
1964        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1965        diffs.droppedCount = 0;
1966        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1967        diffs.elementCount = 0;
1968        tr_set_compare( msgs->pex, msgs->pexCount,
1969                        newPex, newCount,
1970                        tr_pexCompare, sizeof( tr_pex ),
1971                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
1972        dbgmsg(
1973            msgs,
1974            "pex: old peer count %d, new peer count %d, added %d, removed %d",
1975            msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1976
1977        if( diffs.addedCount || diffs.droppedCount )
1978        {
1979            int  i;
1980            tr_benc val;
1981            char * benc;
1982            int bencLen;
1983            uint8_t * tmp, *walk;
1984            struct evbuffer * out = msgs->outMessages;
1985
1986            /* update peer */
1987            tr_free( msgs->pex );
1988            msgs->pex = diffs.elements;
1989            msgs->pexCount = diffs.elementCount;
1990
1991            /* build the pex payload */
1992            tr_bencInitDict( &val, 3 );
1993
1994            /* "added" */
1995            tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1996            for( i = 0; i < diffs.addedCount; ++i ) {
1997                memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
1998                memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1999            }
2000            assert( ( walk - tmp ) == diffs.addedCount * 6 );
2001            tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
2002            tr_free( tmp );
2003
2004            /* "added.f" */
2005            tmp = walk = tr_new( uint8_t, diffs.addedCount );
2006            for( i = 0; i < diffs.addedCount; ++i )
2007                *walk++ = diffs.added[i].flags;
2008            assert( ( walk - tmp ) == diffs.addedCount );
2009            tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2010            tr_free( tmp );
2011
2012            /* "dropped" */
2013            tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2014            for( i = 0; i < diffs.droppedCount; ++i ) {
2015                memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
2016                memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2017            }
2018            assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2019            tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2020            tr_free( tmp );
2021
2022            /* write the pex message */
2023            benc = tr_bencSave( &val, &bencLen );
2024            tr_peerIoWriteUint32( msgs->io, out, 2 * sizeof( uint8_t ) + bencLen );
2025            tr_peerIoWriteUint8 ( msgs->io, out, BT_LTEP );
2026            tr_peerIoWriteUint8 ( msgs->io, out, msgs->ut_pex_id );
2027            tr_peerIoWriteBytes ( msgs->io, out, benc, bencLen );
2028            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2029            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", EVBUFFER_LENGTH( out ) );
2030
2031            tr_free( benc );
2032            tr_bencFree( &val );
2033        }
2034
2035        /* cleanup */
2036        tr_free( diffs.added );
2037        tr_free( diffs.dropped );
2038        tr_free( newPex );
2039
2040        msgs->clientSentPexAt = time( NULL );
2041    }
2042}
2043
2044static int
2045pexPulse( void * vpeer )
2046{
2047    sendPex( vpeer );
2048    return TRUE;
2049}
2050
2051/**
2052***
2053**/
2054
2055tr_peermsgs*
2056tr_peerMsgsNew( struct tr_torrent * torrent,
2057                struct tr_peer *    info,
2058                tr_delivery_func    func,
2059                void *              userData,
2060                tr_publisher_tag *  setme )
2061{
2062    tr_peermsgs * m;
2063
2064    assert( info );
2065    assert( info->io );
2066
2067    m = tr_new0( tr_peermsgs, 1 );
2068    m->publisher = tr_publisherNew( );
2069    m->info = info;
2070    m->session = torrent->session;
2071    m->torrent = torrent;
2072    m->io = info->io;
2073    m->info->clientIsChoked = 1;
2074    m->info->peerIsChoked = 1;
2075    m->info->clientIsInterested = 0;
2076    m->info->peerIsInterested = 0;
2077    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
2078    m->state = AWAITING_BT_LENGTH;
2079    m->pexTimer = tr_timerNew( m->session, pexPulse, m, PEX_INTERVAL );
2080    m->outMessages = evbuffer_new( );
2081    m->outMessagesBatchedAt = 0;
2082    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2083    m->incoming.block = evbuffer_new( );
2084    m->outBlock = evbuffer_new( );
2085    m->peerAllowedPieces = NULL;
2086    m->peerAskedFor = REQUEST_LIST_INIT;
2087    m->peerAskedForFast = REQUEST_LIST_INIT;
2088    m->clientAskedFor = REQUEST_LIST_INIT;
2089    m->clientWillAskFor = REQUEST_LIST_INIT;
2090    *setme = tr_publisherSubscribe( m->publisher, func, userData );
2091
2092    if( tr_peerIoSupportsLTEP( m->io ) )
2093        sendLtepHandshake( m );
2094
2095    sendBitfield( m );
2096
2097    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of
2098                                             inactivity */
2099    tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
2100    ratePulse( m );
2101
2102    return m;
2103}
2104
2105void
2106tr_peerMsgsFree( tr_peermsgs* msgs )
2107{
2108    if( msgs )
2109    {
2110        tr_timerFree( &msgs->pexTimer );
2111        tr_publisherFree( &msgs->publisher );
2112        reqListClear( &msgs->clientWillAskFor );
2113        reqListClear( &msgs->clientAskedFor );
2114        reqListClear( &msgs->peerAskedForFast );
2115        reqListClear( &msgs->peerAskedFor );
2116        tr_bitfieldFree( msgs->peerAllowedPieces );
2117        evbuffer_free( msgs->incoming.block );
2118        evbuffer_free( msgs->outMessages );
2119        evbuffer_free( msgs->outBlock );
2120        tr_free( msgs->pex );
2121
2122        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2123        tr_free( msgs );
2124    }
2125}
2126
2127void
2128tr_peerMsgsUnsubscribe( tr_peermsgs *    peer,
2129                        tr_publisher_tag tag )
2130{
2131    tr_publisherUnsubscribe( peer->publisher, tag );
2132}
2133
Note: See TracBrowser for help on using the repository browser.