source: trunk/libtransmission/peer-msgs.c @ 7171

Last change on this file since 7171 was 7171, checked in by charles, 13 years ago

(libT) make peer-io's output buffer size more flexible based on the peer's speed

  • Property svn:keywords set to Date Rev Author Id
File size: 59.1 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 7171 2008-11-28 16:00:29Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19
20#include <event.h>
21
22#include "transmission.h"
23#include "bencode.h"
24#include "completion.h"
25#include "crypto.h"
26#include "inout.h"
27#include "iobuf.h"
28#ifdef WIN32
29#include "net.h" /* for ECONN */
30#endif
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-mgr-private.h"
34#include "peer-msgs.h"
35#include "ratecontrol.h"
36#include "stats.h"
37#include "torrent.h"
38#include "trevent.h"
39#include "utils.h"
40
41/**
42***
43**/
44
45enum
46{
47    BT_CHOKE                = 0,
48    BT_UNCHOKE              = 1,
49    BT_INTERESTED           = 2,
50    BT_NOT_INTERESTED       = 3,
51    BT_HAVE                 = 4,
52    BT_BITFIELD             = 5,
53    BT_REQUEST              = 6,
54    BT_PIECE                = 7,
55    BT_CANCEL               = 8,
56    BT_PORT                 = 9,
57    BT_SUGGEST              = 13,
58    BT_HAVE_ALL             = 14,
59    BT_HAVE_NONE            = 15,
60    BT_REJECT               = 16,
61    BT_ALLOWED_FAST         = 17,
62    BT_LTEP                 = 20,
63
64    LTEP_HANDSHAKE          = 0,
65
66    TR_LTEP_PEX             = 1,
67
68    MIN_CHOKE_PERIOD_SEC    = ( 10 ),
69
70    /* idle seconds before we send a keepalive */
71    KEEPALIVE_INTERVAL_SECS = 100,
72
73    PEX_INTERVAL            = ( 90 * 1000 ), /* msec between sendPex() calls */
74    PEER_PULSE_INTERVAL     = ( 250 ),       /* msec between peerPulse() calls
75                                               */
76
77    MAX_QUEUE_SIZE          = ( 100 ),
78
79    /* (fast peers) max number of pieces we fast-allow to another peer */
80    MAX_FAST_ALLOWED_COUNT   = 10,
81
82    /* (fast peers) max threshold for allowing fast-pieces requests */
83    MAX_FAST_ALLOWED_THRESHOLD = 10,
84
85    /* how long an unsent request can stay queued before it's returned
86       back to the peer-mgr's pool of requests */
87    QUEUED_REQUEST_TTL_SECS = 20,
88
89    /* how long a sent request can stay queued before it's returned
90       back to the peer-mgr's pool of requests */
91    SENT_REQUEST_TTL_SECS = 240,
92
93    /* used in lowering the outMessages queue period */
94    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
95    HIGH_PRIORITY_INTERVAL_SECS = 2,
96    LOW_PRIORITY_INTERVAL_SECS = 20,
97
98    /* number of pieces to remove from the bitfield when
99     * lazy bitfields are turned on */
100    LAZY_PIECE_COUNT = 26
101};
102
103/**
104***  REQUEST MANAGEMENT
105**/
106
107enum
108{
109    AWAITING_BT_LENGTH,
110    AWAITING_BT_ID,
111    AWAITING_BT_MESSAGE,
112    AWAITING_BT_PIECE
113};
114
115struct peer_request
116{
117    uint32_t    index;
118    uint32_t    offset;
119    uint32_t    length;
120    time_t      time_requested;
121};
122
123static int
124compareRequest( const void * va,
125                const void * vb )
126{
127    const struct peer_request * a = va;
128    const struct peer_request * b = vb;
129
130    if( a->index != b->index )
131        return a->index < b->index ? -1 : 1;
132
133    if( a->offset != b->offset )
134        return a->offset < b->offset ? -1 : 1;
135
136    if( a->length != b->length )
137        return a->length < b->length ? -1 : 1;
138
139    return 0;
140}
141
142struct request_list
143{
144    uint16_t               count;
145    uint16_t               max;
146    struct peer_request *  requests;
147};
148
149static const struct request_list REQUEST_LIST_INIT = { 0, 0, NULL };
150
151static void
152reqListReserve( struct request_list * list,
153                uint16_t              max )
154{
155    if( list->max < max )
156    {
157        list->max = max;
158        list->requests = tr_renew( struct peer_request,
159                                   list->requests,
160                                   list->max );
161    }
162}
163
164static void
165reqListClear( struct request_list * list )
166{
167    tr_free( list->requests );
168    *list = REQUEST_LIST_INIT;
169}
170
171static void
172reqListCopy( struct request_list *       dest,
173             const struct request_list * src )
174{
175    dest->count = dest->max = src->count;
176    dest->requests =
177        tr_memdup( src->requests, dest->count * sizeof( struct peer_request ) );
178}
179
180static void
181reqListRemoveOne( struct request_list * list,
182                  int                   i )
183{
184    assert( 0 <= i && i < list->count );
185
186    memmove( &list->requests[i],
187            &list->requests[i + 1],
188            sizeof( struct peer_request ) * ( --list->count - i ) );
189}
190
191static void
192reqListAppend( struct request_list *       list,
193               const struct peer_request * req )
194{
195    if( ++list->count >= list->max )
196        reqListReserve( list, list->max + 8 );
197
198    list->requests[list->count - 1] = *req;
199}
200
201static int
202reqListPop( struct request_list * list,
203            struct peer_request * setme )
204{
205    int success;
206
207    if( !list->count )
208        success = FALSE;
209    else {
210        *setme = list->requests[0];
211        reqListRemoveOne( list, 0 );
212        success = TRUE;
213    }
214
215    return success;
216}
217
218static int
219reqListFind( struct request_list *       list,
220             const struct peer_request * key )
221{
222    uint16_t i;
223
224    for( i = 0; i < list->count; ++i )
225        if( !compareRequest( key, list->requests + i ) )
226            return i;
227
228    return -1;
229}
230
231static int
232reqListRemove( struct request_list *       list,
233               const struct peer_request * key )
234{
235    int success;
236    const int i = reqListFind( list, key );
237
238    if( i < 0 )
239        success = FALSE;
240    else {
241        reqListRemoveOne( list, i );
242        success = TRUE;
243    }
244
245    return success;
246}
247
248/**
249***
250**/
251
252/* this is raw, unchanged data from the peer regarding
253 * the current message that it's sending us. */
254struct tr_incoming
255{
256    uint8_t                id;
257    uint32_t               length; /* includes the +1 for id length */
258    struct peer_request    blockReq; /* metadata for incoming blocks */
259    struct evbuffer *      block; /* piece data for incoming blocks */
260};
261
262struct tr_peermsgs
263{
264    unsigned int    peerSentBitfield        : 1;
265    unsigned int    peerSupportsPex         : 1;
266    unsigned int    clientSentLtepHandshake : 1;
267    unsigned int    peerSentLtepHandshake   : 1;
268
269    uint8_t         state;
270    uint8_t         ut_pex_id;
271    uint16_t        pexCount;
272    uint16_t        minActiveRequests;
273    uint16_t        maxActiveRequests;
274
275    /* how long the outMessages batch should be allowed to grow before
276     * it's flushed -- some messages (like requests >:) should be sent
277     * very quickly; others aren't as urgent. */
278    int                    outMessagesBatchPeriod;
279
280    tr_peer *              info;
281
282    tr_session *           session;
283    tr_torrent *           torrent;
284    tr_peerIo *            io;
285
286    tr_publisher_t *       publisher;
287
288    struct evbuffer *      outMessages; /* all the non-piece messages */
289
290    struct request_list    peerAskedFor;
291    struct request_list    peerAskedForFast;
292    struct request_list    clientAskedFor;
293    struct request_list    clientWillAskFor;
294
295    tr_timer *             pexTimer;
296
297    time_t                 clientSentPexAt;
298    time_t                 clientSentAnythingAt;
299
300    /* when we started batching the outMessages */
301    time_t                outMessagesBatchedAt;
302
303    tr_bitfield *         peerAllowedPieces;
304
305    struct tr_incoming    incoming;
306
307    tr_pex *              pex;
308};
309
310/**
311***
312**/
313
314static void
315myDebug( const char *               file,
316         int                        line,
317         const struct tr_peermsgs * msgs,
318         const char *               fmt,
319         ... )
320{
321    FILE * fp = tr_getLog( );
322
323    if( fp )
324    {
325        va_list           args;
326        char              timestr[64];
327        struct evbuffer * buf = evbuffer_new( );
328        char *            base = tr_basename( file );
329
330        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
331                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
332                             msgs->torrent->info.name,
333                             tr_peerIoGetAddrStr( msgs->io ),
334                             msgs->info->client );
335        va_start( args, fmt );
336        evbuffer_add_vprintf( buf, fmt, args );
337        va_end( args );
338        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
339        fwrite( EVBUFFER_DATA( buf ), 1, EVBUFFER_LENGTH( buf ), fp );
340
341        tr_free( base );
342        evbuffer_free( buf );
343    }
344}
345
346#define dbgmsg( msgs, ... ) \
347    do { \
348        if( tr_deepLoggingIsActive( ) ) \
349            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
350    } while( 0 )
351
352/**
353***
354**/
355
356static void
357pokeBatchPeriod( tr_peermsgs * msgs,
358                 int           interval )
359{
360    if( msgs->outMessagesBatchPeriod > interval )
361    {
362        msgs->outMessagesBatchPeriod = interval;
363        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
364    }
365}
366
367static void
368protocolSendRequest( tr_peermsgs *               msgs,
369                     const struct peer_request * req )
370{
371    tr_peerIo *       io = msgs->io;
372    struct evbuffer * out = msgs->outMessages;
373
374    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
375    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
376    tr_peerIoWriteUint32( io, out, req->index );
377    tr_peerIoWriteUint32( io, out, req->offset );
378    tr_peerIoWriteUint32( io, out, req->length );
379    dbgmsg( msgs, "requesting %u:%u->%u... outMessage size is now %d",
380           req->index, req->offset, req->length, (int)EVBUFFER_LENGTH( out ) );
381    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
382}
383
384static void
385protocolSendCancel( tr_peermsgs *               msgs,
386                    const struct peer_request * req )
387{
388    tr_peerIo *       io = msgs->io;
389    struct evbuffer * out = msgs->outMessages;
390
391    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
392    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
393    tr_peerIoWriteUint32( io, out, req->index );
394    tr_peerIoWriteUint32( io, out, req->offset );
395    tr_peerIoWriteUint32( io, out, req->length );
396    dbgmsg( msgs, "cancelling %u:%u->%u... outMessage size is now %d",
397           req->index, req->offset, req->length, (int)EVBUFFER_LENGTH( out ) );
398    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
399}
400
401static void
402protocolSendHave( tr_peermsgs * msgs,
403                  uint32_t      index )
404{
405    tr_peerIo *       io = msgs->io;
406    struct evbuffer * out = msgs->outMessages;
407
408    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + sizeof( uint32_t ) );
409    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
410    tr_peerIoWriteUint32( io, out, index );
411    dbgmsg( msgs, "sending Have %u.. outMessage size is now %d",
412           index, (int)EVBUFFER_LENGTH( out ) );
413    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
414}
415
416static void
417protocolSendChoke( tr_peermsgs * msgs,
418                   int           choke )
419{
420    tr_peerIo *       io = msgs->io;
421    struct evbuffer * out = msgs->outMessages;
422
423    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
424    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
425    dbgmsg( msgs, "sending %s... outMessage size is now %d",
426           ( choke ? "Choke" : "Unchoke" ),
427           (int)EVBUFFER_LENGTH( out ) );
428    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
429}
430
431/**
432***  EVENTS
433**/
434
435static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0 };
436
437static void
438publish( tr_peermsgs *   msgs,
439         tr_peer_event * e )
440{
441    tr_publisherPublish( msgs->publisher, msgs->info, e );
442}
443
444static void
445fireError( tr_peermsgs * msgs,
446           int           err )
447{
448    tr_peer_event e = blankEvent;
449
450    e.eventType = TR_PEER_ERROR;
451    e.err = err;
452    publish( msgs, &e );
453}
454
455static void
456fireNeedReq( tr_peermsgs * msgs )
457{
458    tr_peer_event e = blankEvent;
459
460    e.eventType = TR_PEER_NEED_REQ;
461    publish( msgs, &e );
462}
463
464static void
465firePeerProgress( tr_peermsgs * msgs )
466{
467    tr_peer_event e = blankEvent;
468
469    e.eventType = TR_PEER_PEER_PROGRESS;
470    e.progress = msgs->info->progress;
471    publish( msgs, &e );
472}
473
474static void
475fireGotBlock( tr_peermsgs *               msgs,
476              const struct peer_request * req )
477{
478    tr_peer_event e = blankEvent;
479
480    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
481    e.pieceIndex = req->index;
482    e.offset = req->offset;
483    e.length = req->length;
484    publish( msgs, &e );
485}
486
487static void
488fireClientGotData( tr_peermsgs * msgs,
489                   uint32_t      length,
490                   int           wasPieceData )
491{
492    tr_peer_event e = blankEvent;
493
494    e.length = length;
495    e.eventType = TR_PEER_CLIENT_GOT_DATA;
496    e.wasPieceData = wasPieceData;
497    publish( msgs, &e );
498}
499
500static void
501firePeerGotData( tr_peermsgs  * msgs,
502                 uint32_t       length,
503                 int            wasPieceData )
504{
505    tr_peer_event e = blankEvent;
506
507    e.length = length;
508    e.eventType = TR_PEER_PEER_GOT_DATA;
509    e.wasPieceData = wasPieceData;
510
511    publish( msgs, &e );
512}
513
514static void
515fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
516{
517    tr_peer_event e = blankEvent;
518    e.eventType = TR_PEER_CANCEL;
519    e.pieceIndex = req->index;
520    e.offset = req->offset;
521    e.length = req->length;
522    publish( msgs, &e );
523}
524
525/**
526***  INTEREST
527**/
528
529static int
530isPieceInteresting( const tr_peermsgs * peer,
531                    tr_piece_index_t    piece )
532{
533    const tr_torrent * torrent = peer->torrent;
534
535    return ( !torrent->info.pieces[piece].dnd )                 /* we want it */
536           && ( !tr_cpPieceIsComplete( torrent->completion, piece ) ) /* !have
537                                                                        */
538           && ( tr_bitfieldHas( peer->info->have, piece ) );   /* peer has it */
539}
540
541/* "interested" means we'll ask for piece data if they unchoke us */
542static int
543isPeerInteresting( const tr_peermsgs * msgs )
544{
545    tr_piece_index_t    i;
546    const tr_torrent *  torrent;
547    const tr_bitfield * bitfield;
548    const int           clientIsSeed = tr_torrentIsSeed( msgs->torrent );
549
550    if( clientIsSeed )
551        return FALSE;
552
553    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
554        return FALSE;
555
556    torrent = msgs->torrent;
557    bitfield = tr_cpPieceBitfield( torrent->completion );
558
559    if( !msgs->info->have )
560        return TRUE;
561
562    assert( bitfield->byteCount == msgs->info->have->byteCount );
563
564    for( i = 0; i < torrent->info.pieceCount; ++i )
565        if( isPieceInteresting( msgs, i ) )
566            return TRUE;
567
568    return FALSE;
569}
570
571static void
572sendInterest( tr_peermsgs * msgs,
573              int           weAreInterested )
574{
575    struct evbuffer * out = msgs->outMessages;
576
577    assert( msgs );
578    assert( weAreInterested == 0 || weAreInterested == 1 );
579
580    msgs->info->clientIsInterested = weAreInterested;
581    dbgmsg( msgs, "Sending %s",
582            weAreInterested ? "Interested" : "Not Interested" );
583    tr_peerIoWriteUint32( msgs->io, out, sizeof( uint8_t ) );
584    tr_peerIoWriteUint8 (
585        msgs->io, out, weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
586    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
587    dbgmsg( msgs, "outMessage size is now %d", (int)EVBUFFER_LENGTH( out ) );
588}
589
590static void
591updateInterest( tr_peermsgs * msgs )
592{
593    const int i = isPeerInteresting( msgs );
594
595    if( i != msgs->info->clientIsInterested )
596        sendInterest( msgs, i );
597    if( i )
598        fireNeedReq( msgs );
599}
600
601static void
602cancelAllRequestsToClientExceptFast( tr_peermsgs * msgs )
603{
604    reqListClear( &msgs->peerAskedFor );
605}
606
607void
608tr_peerMsgsSetChoke( tr_peermsgs * msgs,
609                     int           choke )
610{
611    const time_t now = time( NULL );
612    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
613
614    assert( msgs );
615    assert( msgs->info );
616    assert( choke == 0 || choke == 1 );
617
618    if( msgs->info->chokeChangedAt > fibrillationTime )
619    {
620        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation",
621                choke );
622    }
623    else if( msgs->info->peerIsChoked != choke )
624    {
625        msgs->info->peerIsChoked = choke;
626        if( choke )
627            cancelAllRequestsToClientExceptFast( msgs );
628        protocolSendChoke( msgs, choke );
629        msgs->info->chokeChangedAt = now;
630    }
631}
632
633/**
634***
635**/
636
637void
638tr_peerMsgsHave( tr_peermsgs * msgs,
639                 uint32_t      index )
640{
641    protocolSendHave( msgs, index );
642
643    /* since we have more pieces now, we might not be interested in this peer */
644    updateInterest( msgs );
645}
646
647#if 0
648static void
649sendFastSuggest( tr_peermsgs * msgs,
650                 uint32_t      pieceIndex )
651{
652    assert( msgs );
653
654    if( tr_peerIoSupportsFEXT( msgs->io ) )
655    {
656        tr_peerIoWriteUint32( msgs->io, msgs->outMessages,
657                             sizeof( uint8_t ) + sizeof( uint32_t ) );
658        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_SUGGEST );
659        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
660    }
661}
662
663static void
664sendFastHave( tr_peermsgs * msgs,
665              int           all )
666{
667    assert( msgs );
668
669    if( tr_peerIoSupportsFEXT( msgs->io ) )
670    {
671        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof( uint8_t ) );
672        tr_peerIoWriteUint8( msgs->io, msgs->outMessages,
673                            ( all ? BT_HAVE_ALL
674                              : BT_HAVE_NONE ) );
675        updateInterest( msgs );
676    }
677}
678
679#endif
680
681static void
682sendFastReject( tr_peermsgs * msgs,
683                uint32_t      pieceIndex,
684                uint32_t      offset,
685                uint32_t      length )
686{
687    assert( msgs );
688
689    if( tr_peerIoSupportsFEXT( msgs->io ) )
690    {
691        struct evbuffer * out = msgs->outMessages;
692        const uint32_t    len = sizeof( uint8_t ) + 3 * sizeof( uint32_t );
693        dbgmsg( msgs, "sending fast reject %u:%u->%u", pieceIndex, offset,
694                length );
695        tr_peerIoWriteUint32( msgs->io, out, len );
696        tr_peerIoWriteUint8( msgs->io, out, BT_REJECT );
697        tr_peerIoWriteUint32( msgs->io, out, pieceIndex );
698        tr_peerIoWriteUint32( msgs->io, out, offset );
699        tr_peerIoWriteUint32( msgs->io, out, length );
700        pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
701        dbgmsg( msgs, "outMessage size is now %d",
702               (int)EVBUFFER_LENGTH( out ) );
703    }
704}
705
706static tr_bitfield*
707getPeerAllowedPieces( tr_peermsgs * msgs )
708{
709    if( !msgs->peerAllowedPieces && tr_peerIoSupportsFEXT( msgs->io ) )
710    {
711        msgs->peerAllowedPieces = tr_peerMgrGenerateAllowedSet(
712            MAX_FAST_ALLOWED_COUNT,
713            msgs->torrent->info.pieceCount,
714            msgs->torrent->info.hash,
715            tr_peerIoGetAddress( msgs->io, NULL ) );
716    }
717
718    return msgs->peerAllowedPieces;
719}
720
721static void
722sendFastAllowed( tr_peermsgs * msgs,
723                 uint32_t      pieceIndex )
724{
725    assert( msgs );
726
727    if( tr_peerIoSupportsFEXT( msgs->io ) )
728    {
729        struct evbuffer * out = msgs->outMessages;
730        dbgmsg( msgs, "sending fast allowed" );
731        tr_peerIoWriteUint32( msgs->io, out,  sizeof( uint8_t ) +
732                             sizeof( uint32_t ) );
733        tr_peerIoWriteUint8( msgs->io, out, BT_ALLOWED_FAST );
734        tr_peerIoWriteUint32( msgs->io, out, pieceIndex );
735        pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
736        dbgmsg( msgs, "outMessage size is now %d",
737               (int)EVBUFFER_LENGTH( out ) );
738    }
739}
740
741static void
742sendFastAllowedSet( tr_peermsgs * msgs )
743{
744    tr_piece_index_t i = 0;
745
746    while( i <= msgs->torrent->info.pieceCount )
747    {
748        if( tr_bitfieldHas( getPeerAllowedPieces( msgs ), i ) )
749            sendFastAllowed( msgs, i );
750        i++;
751    }
752}
753
754static void
755maybeSendFastAllowedSet( tr_peermsgs * msgs )
756{
757    if( tr_bitfieldCountTrueBits( msgs->info->have ) <=
758        MAX_FAST_ALLOWED_THRESHOLD )
759        sendFastAllowedSet( msgs );
760}
761
762/**
763***
764**/
765
766static int
767reqIsValid( const tr_peermsgs * peer,
768            uint32_t            index,
769            uint32_t            offset,
770            uint32_t            length )
771{
772    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
773}
774
775static int
776requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
777{
778    return reqIsValid( msgs, req->index, req->offset, req->length );
779}
780
781static void
782expireOldRequests( tr_peermsgs * msgs, const time_t now  )
783{
784    int                 i;
785    time_t              oldestAllowed;
786    struct request_list tmp = REQUEST_LIST_INIT;
787
788    /* cancel requests that have been queued for too long */
789    oldestAllowed = now - QUEUED_REQUEST_TTL_SECS;
790    reqListCopy( &tmp, &msgs->clientWillAskFor );
791    for( i = 0; i < tmp.count; ++i )
792    {
793        const struct peer_request * req = &tmp.requests[i];
794        if( req->time_requested < oldestAllowed )
795            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
796    }
797    reqListClear( &tmp );
798
799    /* cancel requests that were sent too long ago */
800    oldestAllowed = now - SENT_REQUEST_TTL_SECS;
801    reqListCopy( &tmp, &msgs->clientAskedFor );
802    for( i = 0; i < tmp.count; ++i )
803    {
804        const struct peer_request * req = &tmp.requests[i];
805        if( req->time_requested < oldestAllowed )
806            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
807    }
808    reqListClear( &tmp );
809}
810
811static void
812pumpRequestQueue( tr_peermsgs * msgs, const time_t now )
813{
814    const int           max = msgs->maxActiveRequests;
815    const int           min = msgs->minActiveRequests;
816    int                 sent = 0;
817    int                 count = msgs->clientAskedFor.count;
818    struct peer_request req;
819
820    if( count > min )
821        return;
822    if( msgs->info->clientIsChoked )
823        return;
824    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
825        return;
826
827    while( ( count < max ) && reqListPop( &msgs->clientWillAskFor, &req ) )
828    {
829        const tr_block_index_t block =
830            _tr_block( msgs->torrent, req.index, req.offset );
831
832        assert( requestIsValid( msgs, &req ) );
833        assert( tr_bitfieldHas( msgs->info->have, req.index ) );
834
835        /* don't ask for it if we've already got it... this block may have
836         * come in from a different peer after we cancelled a request for it */
837        if( !tr_cpBlockIsComplete( msgs->torrent->completion, block ) )
838        {
839            protocolSendRequest( msgs, &req );
840            req.time_requested = now;
841            reqListAppend( &msgs->clientAskedFor, &req );
842
843            ++count;
844            ++sent;
845        }
846    }
847
848    if( sent )
849        dbgmsg( msgs,
850                "pump sent %d requests, now have %d active and %d queued",
851                sent,
852                msgs->clientAskedFor.count,
853                msgs->clientWillAskFor.count );
854
855    if( count < max )
856        fireNeedReq( msgs );
857}
858
859static int
860requestQueueIsFull( const tr_peermsgs * msgs )
861{
862    const int req_max = msgs->maxActiveRequests;
863    return msgs->clientWillAskFor.count >= req_max;
864}
865
866tr_addreq_t
867tr_peerMsgsAddRequest( tr_peermsgs *    msgs,
868                       uint32_t         index,
869                       uint32_t         offset,
870                       uint32_t         length )
871{
872    struct peer_request req;
873
874    assert( msgs );
875    assert( msgs->torrent );
876    assert( reqIsValid( msgs, index, offset, length ) );
877
878    /**
879    ***  Reasons to decline the request
880    **/
881
882    /* don't send requests to choked clients */
883    if( msgs->info->clientIsChoked )
884    {
885        dbgmsg( msgs, "declining request because they're choking us" );
886        return TR_ADDREQ_CLIENT_CHOKED;
887    }
888
889    /* peer doesn't have this piece */
890    if( !tr_bitfieldHas( msgs->info->have, index ) )
891        return TR_ADDREQ_MISSING;
892
893    /* peer's queue is full */
894    if( requestQueueIsFull( msgs ) ) {
895        dbgmsg( msgs, "declining request because we're full" );
896        return TR_ADDREQ_FULL;
897    }
898
899    /* have we already asked for this piece? */
900    req.index = index;
901    req.offset = offset;
902    req.length = length;
903    if( reqListFind( &msgs->clientAskedFor, &req ) != -1 ) {
904        dbgmsg( msgs, "declining because it's a duplicate" );
905        return TR_ADDREQ_DUPLICATE;
906    }
907    if( reqListFind( &msgs->clientWillAskFor, &req ) != -1 ) {
908        dbgmsg( msgs, "declining because it's a duplicate" );
909        return TR_ADDREQ_DUPLICATE;
910    }
911
912    /**
913    ***  Accept this request
914    **/
915
916    dbgmsg( msgs, "adding req for %"PRIu32":%"PRIu32"->%"PRIu32" to our `will request' list",
917            index, offset, length );
918    req.time_requested = time( NULL );
919    reqListAppend( &msgs->clientWillAskFor, &req );
920    return TR_ADDREQ_OK;
921}
922
923static void
924cancelAllRequestsToPeer( tr_peermsgs * msgs )
925{
926    int                 i;
927    struct request_list a = msgs->clientWillAskFor;
928    struct request_list b = msgs->clientAskedFor;
929    dbgmsg( msgs, "cancelling all requests to peer" );
930
931    msgs->clientAskedFor = REQUEST_LIST_INIT;
932    msgs->clientWillAskFor = REQUEST_LIST_INIT;
933
934    for( i=0; i<a.count; ++i )
935        fireCancelledReq( msgs, &a.requests[i] );
936
937    for( i = 0; i < b.count; ++i ) {
938        fireCancelledReq( msgs, &b.requests[i] );
939        protocolSendCancel( msgs, &b.requests[i] );
940    }
941
942    reqListClear( &a );
943    reqListClear( &b );
944}
945
946void
947tr_peerMsgsCancel( tr_peermsgs * msgs,
948                   uint32_t      pieceIndex,
949                   uint32_t      offset,
950                   uint32_t      length )
951{
952    struct peer_request req;
953
954    assert( msgs != NULL );
955    assert( length > 0 );
956
957
958    /* have we asked the peer for this piece? */
959    req.index = pieceIndex;
960    req.offset = offset;
961    req.length = length;
962
963    /* if it's only in the queue and hasn't been sent yet, free it */
964    if( reqListRemove( &msgs->clientWillAskFor, &req ) ) {
965        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32"\n", pieceIndex, offset, length );
966        fireCancelledReq( msgs, &req );
967    }
968
969    /* if it's already been sent, send a cancel message too */
970    if( reqListRemove( &msgs->clientAskedFor, &req ) ) {
971        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32"\n", pieceIndex, offset, length );
972        protocolSendCancel( msgs, &req );
973        fireCancelledReq( msgs, &req );
974    }
975}
976
977/**
978***
979**/
980
981static void
982sendLtepHandshake( tr_peermsgs * msgs )
983{
984    tr_benc           val, *m;
985    char *            buf;
986    int               len;
987    int               pex;
988    struct evbuffer * out = msgs->outMessages;
989
990    if( msgs->clientSentLtepHandshake )
991        return;
992
993    dbgmsg( msgs, "sending an ltep handshake" );
994    msgs->clientSentLtepHandshake = 1;
995
996    /* decide if we want to advertise pex support */
997    if( !tr_torrentAllowsPex( msgs->torrent ) )
998        pex = 0;
999    else if( msgs->peerSentLtepHandshake )
1000        pex = msgs->peerSupportsPex ? 1 : 0;
1001    else
1002        pex = 1;
1003
1004    tr_bencInitDict( &val, 4 );
1005    tr_bencDictAddInt( &val, "e",
1006                       msgs->session->encryptionMode != TR_CLEAR_PREFERRED );
1007    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( msgs->session ) );
1008    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
1009    m  = tr_bencDictAddDict( &val, "m", 1 );
1010    if( pex )
1011        tr_bencDictAddInt( m, "ut_pex", TR_LTEP_PEX );
1012    buf = tr_bencSave( &val, &len );
1013
1014    tr_peerIoWriteUint32( msgs->io, out, 2 * sizeof( uint8_t ) + len );
1015    tr_peerIoWriteUint8 ( msgs->io, out, BT_LTEP );
1016    tr_peerIoWriteUint8 ( msgs->io, out, LTEP_HANDSHAKE );
1017    tr_peerIoWriteBytes ( msgs->io, out, buf, len );
1018    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1019    dbgmsg( msgs, "outMessage size is now %d", (int)EVBUFFER_LENGTH( out ) );
1020
1021    /* cleanup */
1022    tr_bencFree( &val );
1023    tr_free( buf );
1024}
1025
1026static void
1027parseLtepHandshake( tr_peermsgs *     msgs,
1028                    int               len,
1029                    struct evbuffer * inbuf )
1030{
1031    int64_t   i;
1032    tr_benc   val, * sub;
1033    uint8_t * tmp = tr_new( uint8_t, len );
1034
1035    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
1036    msgs->peerSentLtepHandshake = 1;
1037
1038    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type != TYPE_DICT )
1039    {
1040        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
1041        tr_free( tmp );
1042        return;
1043    }
1044
1045    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
1046
1047    /* does the peer prefer encrypted connections? */
1048    if( tr_bencDictFindInt( &val, "e", &i ) )
1049        msgs->info->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
1050                                            : ENCRYPTION_PREFERENCE_NO;
1051
1052    /* check supported messages for utorrent pex */
1053    msgs->peerSupportsPex = 0;
1054    if( tr_bencDictFindDict( &val, "m", &sub ) )
1055    {
1056        if( tr_bencDictFindInt( sub, "ut_pex", &i ) )
1057        {
1058            msgs->ut_pex_id = (uint8_t) i;
1059            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
1060            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
1061        }
1062    }
1063
1064    /* get peer's listening port */
1065    if( tr_bencDictFindInt( &val, "p", &i ) )
1066    {
1067        msgs->info->port = htons( (uint16_t)i );
1068        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
1069    }
1070
1071    tr_bencFree( &val );
1072    tr_free( tmp );
1073}
1074
1075static void
1076parseUtPex( tr_peermsgs *     msgs,
1077            int               msglen,
1078            struct evbuffer * inbuf )
1079{
1080    int                loaded = 0;
1081    uint8_t *          tmp = tr_new( uint8_t, msglen );
1082    tr_benc            val;
1083    const tr_torrent * tor = msgs->torrent;
1084    const uint8_t *    added;
1085    size_t             added_len;
1086
1087    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
1088
1089    if( tr_torrentAllowsPex( tor )
1090      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) )
1091      && tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1092    {
1093        const uint8_t * added_f = NULL;
1094        tr_pex *        pex;
1095        size_t          i, n;
1096        size_t          added_f_len = 0;
1097        tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1098        pex =
1099            tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len,
1100                                    &n );
1101        for( i = 0; i < n; ++i )
1102            tr_peerMgrAddPex( msgs->session->peerMgr, tor->info.hash,
1103                              TR_PEER_FROM_PEX, pex + i );
1104        tr_free( pex );
1105    }
1106
1107    if( loaded )
1108        tr_bencFree( &val );
1109    tr_free( tmp );
1110}
1111
1112static void sendPex( tr_peermsgs * msgs );
1113
1114static void
1115parseLtep( tr_peermsgs *     msgs,
1116           int               msglen,
1117           struct evbuffer * inbuf )
1118{
1119    uint8_t ltep_msgid;
1120
1121    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
1122    msglen--;
1123
1124    if( ltep_msgid == LTEP_HANDSHAKE )
1125    {
1126        dbgmsg( msgs, "got ltep handshake" );
1127        parseLtepHandshake( msgs, msglen, inbuf );
1128        if( tr_peerIoSupportsLTEP( msgs->io ) )
1129        {
1130            sendLtepHandshake( msgs );
1131            sendPex( msgs );
1132        }
1133    }
1134    else if( ltep_msgid == TR_LTEP_PEX )
1135    {
1136        dbgmsg( msgs, "got ut pex" );
1137        msgs->peerSupportsPex = 1;
1138        parseUtPex( msgs, msglen, inbuf );
1139    }
1140    else
1141    {
1142        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1143        evbuffer_drain( inbuf, msglen );
1144    }
1145}
1146
1147static int
1148readBtLength( tr_peermsgs *     msgs,
1149              struct evbuffer * inbuf,
1150              size_t            inlen )
1151{
1152    uint32_t len;
1153
1154    if( inlen < sizeof( len ) )
1155        return READ_LATER;
1156
1157    tr_peerIoReadUint32( msgs->io, inbuf, &len );
1158
1159    if( len == 0 ) /* peer sent us a keepalive message */
1160        dbgmsg( msgs, "got KeepAlive" );
1161    else
1162    {
1163        msgs->incoming.length = len;
1164        msgs->state = AWAITING_BT_ID;
1165    }
1166
1167    return READ_NOW;
1168}
1169
1170static int readBtMessage( tr_peermsgs *     msgs,
1171                          struct evbuffer * inbuf,
1172                          size_t            inlen );
1173
1174static int
1175readBtId( tr_peermsgs *     msgs,
1176          struct evbuffer * inbuf,
1177          size_t            inlen )
1178{
1179    uint8_t id;
1180
1181    if( inlen < sizeof( uint8_t ) )
1182        return READ_LATER;
1183
1184    tr_peerIoReadUint8( msgs->io, inbuf, &id );
1185    msgs->incoming.id = id;
1186
1187    if( id == BT_PIECE )
1188    {
1189        msgs->state = AWAITING_BT_PIECE;
1190        return READ_NOW;
1191    }
1192    else if( msgs->incoming.length != 1 )
1193    {
1194        msgs->state = AWAITING_BT_MESSAGE;
1195        return READ_NOW;
1196    }
1197    else return readBtMessage( msgs, inbuf, inlen - 1 );
1198}
1199
1200static void
1201updatePeerProgress( tr_peermsgs * msgs )
1202{
1203    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have )
1204                           / (float)msgs->torrent->info.pieceCount;
1205    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
1206    updateInterest( msgs );
1207    firePeerProgress( msgs );
1208}
1209
1210static int
1211clientCanSendFastBlock( const tr_peermsgs * msgs UNUSED )
1212{
1213    /* don't send a fast piece if peer has MAX_FAST_ALLOWED_THRESHOLD pieces */
1214    if( tr_bitfieldCountTrueBits( msgs->info->have ) >
1215        MAX_FAST_ALLOWED_THRESHOLD )
1216        return FALSE;
1217
1218    /* ...or if we don't have ourself enough pieces */
1219    if( tr_bitfieldCountTrueBits( tr_cpPieceBitfield( msgs->torrent->
1220                                                      completion ) ) <
1221        MAX_FAST_ALLOWED_THRESHOLD )
1222        return FALSE;
1223
1224    /* Maybe a bandwidth limit ? */
1225    return TRUE;
1226}
1227
1228static void
1229peerMadeRequest( tr_peermsgs *               msgs,
1230                 const struct peer_request * req )
1231{
1232    const int reqIsValid = requestIsValid( msgs, req );
1233    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete(
1234        msgs->torrent->completion, req->index );
1235    const int peerIsChoked = msgs->info->peerIsChoked;
1236    const int peerIsFast = tr_peerIoSupportsFEXT( msgs->io );
1237    const int pieceIsFast = reqIsValid && tr_bitfieldHas(
1238        getPeerAllowedPieces( msgs ), req->index );
1239    const int canSendFast = clientCanSendFastBlock( msgs );
1240
1241    if( !reqIsValid ) /* bad request */
1242    {
1243        dbgmsg( msgs, "rejecting an invalid request." );
1244        sendFastReject( msgs, req->index, req->offset, req->length );
1245    }
1246    else if( !clientHasPiece ) /* we don't have it */
1247    {
1248        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1249        sendFastReject( msgs, req->index, req->offset, req->length );
1250    }
1251    else if( peerIsChoked && !peerIsFast ) /* doesn't he know he's choked? */
1252    {
1253        tr_peerMsgsSetChoke( msgs, 1 );
1254        sendFastReject( msgs, req->index, req->offset, req->length );
1255    }
1256    else if( peerIsChoked && peerIsFast && ( !pieceIsFast || !canSendFast ) )
1257    {
1258        sendFastReject( msgs, req->index, req->offset, req->length );
1259    }
1260    else /* YAY */
1261    {
1262        if( peerIsFast && pieceIsFast )
1263            reqListAppend( &msgs->peerAskedForFast, req );
1264        else
1265            reqListAppend( &msgs->peerAskedFor, req );
1266    }
1267}
1268
1269static int
1270messageLengthIsCorrect( const tr_peermsgs * msg,
1271                        uint8_t             id,
1272                        uint32_t            len )
1273{
1274    switch( id )
1275    {
1276        case BT_CHOKE:
1277        case BT_UNCHOKE:
1278        case BT_INTERESTED:
1279        case BT_NOT_INTERESTED:
1280        case BT_HAVE_ALL:
1281        case BT_HAVE_NONE:
1282            return len == 1;
1283
1284        case BT_HAVE:
1285        case BT_SUGGEST:
1286        case BT_ALLOWED_FAST:
1287            return len == 5;
1288
1289        case BT_BITFIELD:
1290            return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1291
1292        case BT_REQUEST:
1293        case BT_CANCEL:
1294        case BT_REJECT:
1295            return len == 13;
1296
1297        case BT_PIECE:
1298            return len > 9 && len <= 16393;
1299
1300        case BT_PORT:
1301            return len == 3;
1302
1303        case BT_LTEP:
1304            return len >= 2;
1305
1306        default:
1307            return FALSE;
1308    }
1309}
1310
1311static int clientGotBlock( tr_peermsgs *               msgs,
1312                           const uint8_t *             block,
1313                           const struct peer_request * req );
1314
1315static int
1316readBtPiece( tr_peermsgs      * msgs,
1317             struct evbuffer  * inbuf,
1318             size_t             inlen,
1319             size_t           * setme_piece_bytes_read )
1320{
1321    struct peer_request * req = &msgs->incoming.blockReq;
1322
1323    assert( EVBUFFER_LENGTH( inbuf ) >= inlen );
1324    dbgmsg( msgs, "In readBtPiece" );
1325
1326    if( !req->length )
1327    {
1328        if( inlen < 8 )
1329            return READ_LATER;
1330
1331        tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
1332        tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
1333        req->length = msgs->incoming.length - 9;
1334        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index,
1335                req->offset,
1336                req->length );
1337        return READ_NOW;
1338    }
1339    else
1340    {
1341        int          err;
1342
1343        /* read in another chunk of data */
1344        const size_t nLeft = req->length - EVBUFFER_LENGTH(
1345            msgs->incoming.block );
1346        size_t       n = MIN( nLeft, inlen );
1347        uint8_t *    buf = tr_new( uint8_t, n );
1348        assert( EVBUFFER_LENGTH( inbuf ) >= n );
1349        tr_peerIoReadBytes( msgs->io, inbuf, buf, n );
1350        evbuffer_add( msgs->incoming.block, buf, n );
1351        fireClientGotData( msgs, n, TRUE );
1352        *setme_piece_bytes_read += n;
1353        tr_free( buf );
1354        dbgmsg( msgs, "got %d bytes for block %u:%u->%u ... %d remain",
1355               (int)n, req->index, req->offset, req->length,
1356               (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) );
1357        if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length )
1358            return READ_LATER;
1359
1360        /* we've got the whole block ... process it */
1361        err = clientGotBlock( msgs, EVBUFFER_DATA(
1362                                  msgs->incoming.block ), req );
1363
1364        /* cleanup */
1365        evbuffer_drain( msgs->incoming.block,
1366                       EVBUFFER_LENGTH( msgs->incoming.block ) );
1367        req->length = 0;
1368        msgs->state = AWAITING_BT_LENGTH;
1369        if( !err )
1370            return READ_NOW;
1371        else
1372        {
1373            fireError( msgs, err );
1374            return READ_ERR;
1375        }
1376    }
1377}
1378
1379static int
1380readBtMessage( tr_peermsgs *     msgs,
1381               struct evbuffer * inbuf,
1382               size_t            inlen )
1383{
1384    uint32_t      ui32;
1385    uint32_t      msglen = msgs->incoming.length;
1386    const uint8_t id = msgs->incoming.id;
1387    const size_t  startBufLen = EVBUFFER_LENGTH( inbuf );
1388
1389    --msglen; /* id length */
1390
1391    if( inlen < msglen )
1392        return READ_LATER;
1393
1394    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %d", (int)id,
1395            (int)msglen,
1396            (int)inlen );
1397
1398    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1399    {
1400        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d",
1401                (int)id, (int)msglen );
1402        fireError( msgs, EMSGSIZE );
1403        return READ_ERR;
1404    }
1405
1406    switch( id )
1407    {
1408        case BT_CHOKE:
1409            dbgmsg( msgs, "got Choke" );
1410            msgs->info->clientIsChoked = 1;
1411            cancelAllRequestsToPeer( msgs );
1412            cancelAllRequestsToClientExceptFast( msgs );
1413            break;
1414
1415        case BT_UNCHOKE:
1416            dbgmsg( msgs, "got Unchoke" );
1417            msgs->info->clientIsChoked = 0;
1418            fireNeedReq( msgs );
1419            break;
1420
1421        case BT_INTERESTED:
1422            dbgmsg( msgs, "got Interested" );
1423            msgs->info->peerIsInterested = 1;
1424            break;
1425
1426        case BT_NOT_INTERESTED:
1427            dbgmsg( msgs, "got Not Interested" );
1428            msgs->info->peerIsInterested = 0;
1429            break;
1430
1431        case BT_HAVE:
1432            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1433            dbgmsg( msgs, "got Have: %u", ui32 );
1434            if( tr_bitfieldAdd( msgs->info->have, ui32 ) )
1435                fireError( msgs, ERANGE );
1436            updatePeerProgress( msgs );
1437            tr_rcTransferred( msgs->torrent->swarmSpeed,
1438                              msgs->torrent->info.pieceSize );
1439            break;
1440
1441        case BT_BITFIELD:
1442        {
1443            dbgmsg( msgs, "got a bitfield" );
1444            msgs->peerSentBitfield = 1;
1445            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits,
1446                                msglen );
1447            updatePeerProgress( msgs );
1448            maybeSendFastAllowedSet( msgs );
1449            fireNeedReq( msgs );
1450            break;
1451        }
1452
1453        case BT_REQUEST:
1454        {
1455            struct peer_request r;
1456            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1457            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1458            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1459            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset,
1460                    r.length );
1461            peerMadeRequest( msgs, &r );
1462            break;
1463        }
1464
1465        case BT_CANCEL:
1466        {
1467            struct peer_request r;
1468            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1469            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1470            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1471            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset,
1472                    r.length );
1473            reqListRemove( &msgs->peerAskedForFast, &r );
1474            reqListRemove( &msgs->peerAskedFor, &r );
1475            break;
1476        }
1477
1478        case BT_PIECE:
1479            assert( 0 ); /* handled elsewhere! */
1480            break;
1481
1482        case BT_PORT:
1483            dbgmsg( msgs, "Got a BT_PORT" );
1484            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1485            break;
1486
1487        case BT_SUGGEST:
1488        {
1489            dbgmsg( msgs, "Got a BT_SUGGEST" );
1490            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1491            /* we don't do anything with this yet */
1492            break;
1493        }
1494
1495        case BT_HAVE_ALL:
1496            dbgmsg( msgs, "Got a BT_HAVE_ALL" );
1497            tr_bitfieldAddRange( msgs->info->have, 0,
1498                                 msgs->torrent->info.pieceCount );
1499            updatePeerProgress( msgs );
1500            maybeSendFastAllowedSet( msgs );
1501            break;
1502
1503
1504        case BT_HAVE_NONE:
1505            dbgmsg( msgs, "Got a BT_HAVE_NONE" );
1506            tr_bitfieldClear( msgs->info->have );
1507            updatePeerProgress( msgs );
1508            maybeSendFastAllowedSet( msgs );
1509            break;
1510
1511        case BT_REJECT:
1512        {
1513            struct peer_request r;
1514            dbgmsg( msgs, "Got a BT_REJECT" );
1515            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1516            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1517            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1518            reqListRemove( &msgs->clientAskedFor, &r );
1519            break;
1520        }
1521
1522        case BT_ALLOWED_FAST:
1523        {
1524            dbgmsg( msgs, "Got a BT_ALLOWED_FAST" );
1525            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1526            /* we don't do anything with this yet */
1527            break;
1528        }
1529
1530        case BT_LTEP:
1531            dbgmsg( msgs, "Got a BT_LTEP" );
1532            parseLtep( msgs, msglen, inbuf );
1533            break;
1534
1535        default:
1536            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1537            tr_peerIoDrain( msgs->io, inbuf, msglen );
1538            break;
1539    }
1540
1541    assert( msglen + 1 == msgs->incoming.length );
1542    assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen );
1543
1544    msgs->state = AWAITING_BT_LENGTH;
1545    return READ_NOW;
1546}
1547
1548static void
1549decrementDownloadedCount( tr_peermsgs * msgs,
1550                          uint32_t      byteCount )
1551{
1552    tr_torrent * tor = msgs->torrent;
1553
1554    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1555}
1556
1557static void
1558clientGotUnwantedBlock( tr_peermsgs *               msgs,
1559                        const struct peer_request * req )
1560{
1561    decrementDownloadedCount( msgs, req->length );
1562}
1563
1564static void
1565addPeerToBlamefield( tr_peermsgs * msgs,
1566                     uint32_t      index )
1567{
1568    if( !msgs->info->blame )
1569        msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1570    tr_bitfieldAdd( msgs->info->blame, index );
1571}
1572
1573/* returns 0 on success, or an errno on failure */
1574static int
1575clientGotBlock( tr_peermsgs *               msgs,
1576                const uint8_t *             data,
1577                const struct peer_request * req )
1578{
1579    int                    err;
1580    tr_torrent *           tor = msgs->torrent;
1581    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1582
1583    assert( msgs );
1584    assert( req );
1585
1586    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) )
1587    {
1588        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1589                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1590        return EMSGSIZE;
1591    }
1592
1593    /* save the block */
1594    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset,
1595            req->length );
1596
1597    /**
1598    *** Remove the block from our `we asked for this' list
1599    **/
1600
1601    if( !reqListRemove( &msgs->clientAskedFor, req ) )
1602    {
1603        clientGotUnwantedBlock( msgs, req );
1604        dbgmsg( msgs, "we didn't ask for this message..." );
1605        return 0;
1606    }
1607
1608    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1609            msgs->clientAskedFor.count );
1610
1611    /**
1612    *** Error checks
1613    **/
1614
1615    if( tr_cpBlockIsComplete( tor->completion, block ) )
1616    {
1617        dbgmsg( msgs, "we have this block already..." );
1618        clientGotUnwantedBlock( msgs, req );
1619        return 0;
1620    }
1621
1622    /**
1623    ***  Save the block
1624    **/
1625
1626    msgs->info->peerSentPieceDataAt = time( NULL );
1627    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1628        return err;
1629
1630    addPeerToBlamefield( msgs, req->index );
1631    fireGotBlock( msgs, req );
1632    return 0;
1633}
1634
1635static int peerPulse( void * vmsgs );
1636
1637static void
1638didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1639{
1640    tr_peermsgs * msgs = vmsgs;
1641    firePeerGotData( msgs, bytesWritten, wasPieceData );
1642    peerPulse( msgs );
1643}
1644
1645static ReadState
1646canRead( struct tr_iobuf * iobuf, void * vmsgs, size_t * piece )
1647{
1648    ReadState         ret;
1649    tr_peermsgs *     msgs = vmsgs;
1650    struct evbuffer * in = tr_iobuf_input( iobuf );
1651    const size_t      inlen = EVBUFFER_LENGTH( in );
1652
1653    if( !inlen )
1654    {
1655        ret = READ_LATER;
1656    }
1657    else if( msgs->state == AWAITING_BT_PIECE )
1658    {
1659        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1660    }
1661    else switch( msgs->state )
1662    {
1663        case AWAITING_BT_LENGTH:
1664            ret = readBtLength ( msgs, in, inlen ); break;
1665
1666        case AWAITING_BT_ID:
1667            ret = readBtId     ( msgs, in, inlen ); break;
1668
1669        case AWAITING_BT_MESSAGE:
1670            ret = readBtMessage( msgs, in, inlen ); break;
1671
1672        default:
1673            assert( 0 );
1674    }
1675
1676    /* log the raw data that was read */
1677    if( EVBUFFER_LENGTH( in ) != inlen )
1678        fireClientGotData( msgs, inlen - EVBUFFER_LENGTH( in ), FALSE );
1679
1680    return ret;
1681}
1682
1683/**
1684***
1685**/
1686
1687static int
1688ratePulse( void * vpeer )
1689{
1690    tr_peermsgs * peer = vpeer;
1691    const double rateToClient = tr_peerGetPieceSpeed( peer->info,
1692                                                      TR_PEER_TO_CLIENT );
1693    const int estimatedBlocksInNext30Seconds =
1694                  ( rateToClient * 30 * 1024 ) / peer->torrent->blockSize;
1695
1696    peer->minActiveRequests = 8;
1697    peer->maxActiveRequests = peer->minActiveRequests + estimatedBlocksInNext30Seconds;
1698    return TRUE;
1699}
1700
1701static int
1702popNextRequest( tr_peermsgs *         msgs,
1703                struct peer_request * setme )
1704{
1705    return reqListPop( &msgs->peerAskedForFast, setme )
1706        || reqListPop( &msgs->peerAskedFor, setme );
1707}
1708
1709static size_t
1710fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1711{
1712    size_t bytesWritten = 0;
1713    struct peer_request req;
1714    const int haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1715
1716    /**
1717    ***  Protocol messages
1718    **/
1719
1720    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1721    {
1722        dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
1723        msgs->outMessagesBatchedAt = now;
1724    }
1725    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1726    {
1727        const size_t len = EVBUFFER_LENGTH( msgs->outMessages );
1728        /* flush the protocol messages */
1729        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->io, len );
1730        tr_peerIoWriteBuf( msgs->io, msgs->outMessages, FALSE );
1731        msgs->clientSentAnythingAt = now;
1732        msgs->outMessagesBatchedAt = 0;
1733        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1734        bytesWritten +=  len;
1735    }
1736
1737    /**
1738    ***  Blocks
1739    **/
1740
1741    if( ( tr_peerIoGetWriteBufferSpace( msgs->io ) >= msgs->torrent->blockSize )
1742        && popNextRequest( msgs, &req )
1743        && requestIsValid( msgs, &req )
1744        && tr_cpPieceIsComplete( msgs->torrent->completion, req.index ) )
1745    {
1746        /* send a block */
1747        uint8_t * buf = tr_new( uint8_t, req.length );
1748        const int err = tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf );
1749        if( err ) {
1750            fireError( msgs, err );
1751        } else {
1752            tr_peerIo * io = msgs->io;
1753            struct evbuffer * out = evbuffer_new( );
1754            dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1755            tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1756            tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1757            tr_peerIoWriteUint32( io, out, req.index );
1758            tr_peerIoWriteUint32( io, out, req.offset );
1759            tr_peerIoWriteBytes ( io, out, buf, req.length );
1760            tr_peerIoWriteBuf( io, out, TRUE );
1761            bytesWritten += EVBUFFER_LENGTH( out );
1762            evbuffer_free( out );
1763            msgs->clientSentAnythingAt = now;
1764        }
1765        tr_free( buf );
1766    }
1767
1768    /**
1769    ***  Keepalive
1770    **/
1771
1772    if( msgs->clientSentAnythingAt
1773        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1774    {
1775        dbgmsg( msgs, "sending a keepalive message" );
1776        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1777        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1778    }
1779
1780    return bytesWritten;
1781}
1782
1783static int
1784peerPulse( void * vmsgs )
1785{
1786    tr_peermsgs * msgs = vmsgs;
1787    const time_t  now = time( NULL );
1788
1789    ratePulse( msgs );
1790
1791    pumpRequestQueue( msgs, now );
1792    expireOldRequests( msgs, now );
1793
1794    for( ;; )
1795        if( fillOutputBuffer( msgs, now ) < 1 )
1796            break;
1797
1798    return TRUE; /* loop forever */
1799}
1800
1801void
1802tr_peerMsgsPulse( tr_peermsgs * msgs )
1803{
1804    if( msgs != NULL )
1805        peerPulse( msgs );
1806}
1807
1808static void
1809gotError( struct tr_iobuf  * iobuf UNUSED,
1810          short              what,
1811          void             * vmsgs )
1812{
1813    if( what & EVBUFFER_TIMEOUT )
1814        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1815    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1816        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1817               what, errno, tr_strerror( errno ) );
1818    fireError( vmsgs, ENOTCONN );
1819}
1820
1821static void
1822sendBitfield( tr_peermsgs * msgs )
1823{
1824    struct evbuffer * out = msgs->outMessages;
1825    tr_bitfield *     field;
1826    tr_piece_index_t  lazyPieces[LAZY_PIECE_COUNT];
1827    size_t            i;
1828    size_t            lazyCount = 0;
1829
1830    field = tr_bitfieldDup( tr_cpPieceBitfield( msgs->torrent->completion ) );
1831
1832#if 0
1833    if( tr_sessionIsLazyBitfieldEnabled( msgs->session ) )
1834    {
1835        /** Lazy bitfields aren't a high priority or secure, so I'm opting for
1836            speed over a truly random sample -- let's limit the pool size to
1837            the first 1000 pieces so large torrents don't bog things down */
1838        size_t poolSize;
1839        const size_t maxPoolSize = MIN( msgs->torrent->info.pieceCount, 1000 );
1840        tr_piece_index_t * pool = tr_new( tr_piece_index_t, maxPoolSize );
1841
1842        /* build the pool */
1843        for( i=poolSize=0; i<maxPoolSize; ++i )
1844            if( tr_bitfieldHas( field, i ) )
1845                pool[poolSize++] = i;
1846
1847        /* pull random piece indices from the pool */
1848        while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) )
1849        {
1850            const int pos = tr_cryptoWeakRandInt( poolSize );
1851            const tr_piece_index_t piece = pool[pos];
1852            tr_bitfieldRem( field, piece );
1853            lazyPieces[lazyCount++] = piece;
1854            pool[pos] = pool[--poolSize];
1855        }
1856
1857        /* cleanup */
1858        tr_free( pool );
1859    }
1860#endif
1861
1862    tr_peerIoWriteUint32( msgs->io, out,
1863                          sizeof( uint8_t ) + field->byteCount );
1864    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1865    tr_peerIoWriteBytes ( msgs->io, out, field->bits, field->byteCount );
1866    dbgmsg( msgs, "sending bitfield... outMessage size is now %d",
1867           (int)EVBUFFER_LENGTH( out ) );
1868    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1869
1870    for( i = 0; i < lazyCount; ++i )
1871        protocolSendHave( msgs, lazyPieces[i] );
1872
1873    tr_bitfieldFree( field );
1874}
1875
1876/**
1877***
1878**/
1879
1880/* some peers give us error messages if we send
1881   more than this many peers in a single pex message
1882   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
1883#define MAX_PEX_ADDED 50
1884#define MAX_PEX_DROPPED 50
1885
1886typedef struct
1887{
1888    tr_pex *  added;
1889    tr_pex *  dropped;
1890    tr_pex *  elements;
1891    int       addedCount;
1892    int       droppedCount;
1893    int       elementCount;
1894}
1895PexDiffs;
1896
1897static void
1898pexAddedCb( void * vpex,
1899            void * userData )
1900{
1901    PexDiffs * diffs = userData;
1902    tr_pex *   pex = vpex;
1903
1904    if( diffs->addedCount < MAX_PEX_ADDED )
1905    {
1906        diffs->added[diffs->addedCount++] = *pex;
1907        diffs->elements[diffs->elementCount++] = *pex;
1908    }
1909}
1910
1911static void
1912pexDroppedCb( void * vpex,
1913              void * userData )
1914{
1915    PexDiffs * diffs = userData;
1916    tr_pex *   pex = vpex;
1917
1918    if( diffs->droppedCount < MAX_PEX_DROPPED )
1919    {
1920        diffs->dropped[diffs->droppedCount++] = *pex;
1921    }
1922}
1923
1924static void
1925pexElementCb( void * vpex,
1926              void * userData )
1927{
1928    PexDiffs * diffs = userData;
1929    tr_pex *   pex = vpex;
1930
1931    diffs->elements[diffs->elementCount++] = *pex;
1932}
1933
1934static void
1935sendPex( tr_peermsgs * msgs )
1936{
1937    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
1938    {
1939        PexDiffs diffs;
1940        tr_pex * newPex = NULL;
1941        const int newCount = tr_peerMgrGetPeers( msgs->session->peerMgr,
1942                                                 msgs->torrent->info.hash,
1943                                                 &newPex );
1944
1945        /* build the diffs */
1946        diffs.added = tr_new( tr_pex, newCount );
1947        diffs.addedCount = 0;
1948        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1949        diffs.droppedCount = 0;
1950        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1951        diffs.elementCount = 0;
1952        tr_set_compare( msgs->pex, msgs->pexCount,
1953                        newPex, newCount,
1954                        tr_pexCompare, sizeof( tr_pex ),
1955                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
1956        dbgmsg(
1957            msgs,
1958            "pex: old peer count %d, new peer count %d, added %d, removed %d",
1959            msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1960
1961        if( diffs.addedCount || diffs.droppedCount )
1962        {
1963            int  i;
1964            tr_benc val;
1965            char * benc;
1966            int bencLen;
1967            uint8_t * tmp, *walk;
1968            struct evbuffer * out = msgs->outMessages;
1969
1970            /* update peer */
1971            tr_free( msgs->pex );
1972            msgs->pex = diffs.elements;
1973            msgs->pexCount = diffs.elementCount;
1974
1975            /* build the pex payload */
1976            tr_bencInitDict( &val, 3 );
1977
1978            /* "added" */
1979            tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1980            for( i = 0; i < diffs.addedCount; ++i ) {
1981                memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
1982                memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1983            }
1984            assert( ( walk - tmp ) == diffs.addedCount * 6 );
1985            tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
1986            tr_free( tmp );
1987
1988            /* "added.f" */
1989            tmp = walk = tr_new( uint8_t, diffs.addedCount );
1990            for( i = 0; i < diffs.addedCount; ++i )
1991                *walk++ = diffs.added[i].flags;
1992            assert( ( walk - tmp ) == diffs.addedCount );
1993            tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
1994            tr_free( tmp );
1995
1996            /* "dropped" */
1997            tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
1998            for( i = 0; i < diffs.droppedCount; ++i ) {
1999                memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
2000                memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2001            }
2002            assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2003            tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2004            tr_free( tmp );
2005
2006            /* write the pex message */
2007            benc = tr_bencSave( &val, &bencLen );
2008            tr_peerIoWriteUint32( msgs->io, out, 2 * sizeof( uint8_t ) + bencLen );
2009            tr_peerIoWriteUint8 ( msgs->io, out, BT_LTEP );
2010            tr_peerIoWriteUint8 ( msgs->io, out, msgs->ut_pex_id );
2011            tr_peerIoWriteBytes ( msgs->io, out, benc, bencLen );
2012            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2013            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", EVBUFFER_LENGTH( out ) );
2014
2015            tr_free( benc );
2016            tr_bencFree( &val );
2017        }
2018
2019        /* cleanup */
2020        tr_free( diffs.added );
2021        tr_free( diffs.dropped );
2022        tr_free( newPex );
2023
2024        msgs->clientSentPexAt = time( NULL );
2025    }
2026}
2027
2028static int
2029pexPulse( void * vpeer )
2030{
2031    sendPex( vpeer );
2032    return TRUE;
2033}
2034
2035/**
2036***
2037**/
2038
2039tr_peermsgs*
2040tr_peerMsgsNew( struct tr_torrent * torrent,
2041                struct tr_peer *    info,
2042                tr_delivery_func    func,
2043                void *              userData,
2044                tr_publisher_tag *  setme )
2045{
2046    tr_peermsgs * m;
2047
2048    assert( info );
2049    assert( info->io );
2050
2051    m = tr_new0( tr_peermsgs, 1 );
2052    m->publisher = tr_publisherNew( );
2053    m->info = info;
2054    m->session = torrent->session;
2055    m->torrent = torrent;
2056    m->io = info->io;
2057    m->info->clientIsChoked = 1;
2058    m->info->peerIsChoked = 1;
2059    m->info->clientIsInterested = 0;
2060    m->info->peerIsInterested = 0;
2061    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
2062    m->state = AWAITING_BT_LENGTH;
2063    m->pexTimer = tr_timerNew( m->session, pexPulse, m, PEX_INTERVAL );
2064    m->outMessages = evbuffer_new( );
2065    m->outMessagesBatchedAt = 0;
2066    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2067    m->incoming.block = evbuffer_new( );
2068    m->peerAllowedPieces = NULL;
2069    m->peerAskedFor = REQUEST_LIST_INIT;
2070    m->peerAskedForFast = REQUEST_LIST_INIT;
2071    m->clientAskedFor = REQUEST_LIST_INIT;
2072    m->clientWillAskFor = REQUEST_LIST_INIT;
2073    *setme = tr_publisherSubscribe( m->publisher, func, userData );
2074
2075    if( tr_peerIoSupportsLTEP( m->io ) )
2076        sendLtepHandshake( m );
2077
2078    sendBitfield( m );
2079
2080    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of
2081                                             inactivity */
2082    tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
2083    ratePulse( m );
2084
2085    return m;
2086}
2087
2088void
2089tr_peerMsgsFree( tr_peermsgs* msgs )
2090{
2091    if( msgs )
2092    {
2093        tr_timerFree( &msgs->pexTimer );
2094        tr_publisherFree( &msgs->publisher );
2095        reqListClear( &msgs->clientWillAskFor );
2096        reqListClear( &msgs->clientAskedFor );
2097        reqListClear( &msgs->peerAskedForFast );
2098        reqListClear( &msgs->peerAskedFor );
2099        tr_bitfieldFree( msgs->peerAllowedPieces );
2100        evbuffer_free( msgs->incoming.block );
2101        evbuffer_free( msgs->outMessages );
2102        tr_free( msgs->pex );
2103
2104        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2105        tr_free( msgs );
2106    }
2107}
2108
2109void
2110tr_peerMsgsUnsubscribe( tr_peermsgs *    peer,
2111                        tr_publisher_tag tag )
2112{
2113    tr_publisherUnsubscribe( peer->publisher, tag );
2114}
2115
Note: See TracBrowser for help on using the repository browser.