source: trunk/libtransmission/peer-msgs.c @ 7119

Last change on this file since 7119 was 7119, checked in by charles, 12 years ago

(libT) #1474: lazy bitfields don't work quite right in 1.40

  • Property svn:keywords set to Date Rev Author Id
File size: 59.4 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 7119 2008-11-16 08:56:18Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19
20#include <event.h>
21
22#include "transmission.h"
23#include "bencode.h"
24#include "completion.h"
25#include "crypto.h"
26#include "inout.h"
27#ifdef WIN32
28#include "net.h" /* for ECONN */
29#endif
30#include "peer-io.h"
31#include "peer-mgr.h"
32#include "peer-mgr-private.h"
33#include "peer-msgs.h"
34#include "ratecontrol.h"
35#include "stats.h"
36#include "torrent.h"
37#include "trevent.h"
38#include "utils.h"
39
40/**
41***
42**/
43
44enum
45{
46    BT_CHOKE                = 0,
47    BT_UNCHOKE              = 1,
48    BT_INTERESTED           = 2,
49    BT_NOT_INTERESTED       = 3,
50    BT_HAVE                 = 4,
51    BT_BITFIELD             = 5,
52    BT_REQUEST              = 6,
53    BT_PIECE                = 7,
54    BT_CANCEL               = 8,
55    BT_PORT                 = 9,
56    BT_SUGGEST              = 13,
57    BT_HAVE_ALL             = 14,
58    BT_HAVE_NONE            = 15,
59    BT_REJECT               = 16,
60    BT_ALLOWED_FAST         = 17,
61    BT_LTEP                 = 20,
62
63    LTEP_HANDSHAKE          = 0,
64
65    TR_LTEP_PEX             = 1,
66
67    MIN_CHOKE_PERIOD_SEC    = ( 10 ),
68
69    /* idle seconds before we send a keepalive */
70    KEEPALIVE_INTERVAL_SECS = 100,
71
72    PEX_INTERVAL            = ( 90 * 1000 ), /* msec between sendPex() calls */
73    PEER_PULSE_INTERVAL     = ( 250 ),       /* msec between peerPulse() calls
74                                               */
75
76    MAX_QUEUE_SIZE          = ( 100 ),
77
78    /* (fast peers) max number of pieces we fast-allow to another peer */
79    MAX_FAST_ALLOWED_COUNT   = 10,
80
81    /* (fast peers) max threshold for allowing fast-pieces requests */
82    MAX_FAST_ALLOWED_THRESHOLD = 10,
83
84    /* how long an unsent request can stay queued before it's returned
85       back to the peer-mgr's pool of requests */
86    QUEUED_REQUEST_TTL_SECS = 20,
87
88    /* how long a sent request can stay queued before it's returned
89       back to the peer-mgr's pool of requests */
90    SENT_REQUEST_TTL_SECS = 240,
91
92    /* used in lowering the outMessages queue period */
93    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
94    HIGH_PRIORITY_INTERVAL_SECS = 2,
95    LOW_PRIORITY_INTERVAL_SECS = 20,
96
97    /* number of pieces to remove from the bitfield when
98     * lazy bitfields are turned on */
99    LAZY_PIECE_COUNT = 26
100};
101
102/**
103***  REQUEST MANAGEMENT
104**/
105
106enum
107{
108    AWAITING_BT_LENGTH,
109    AWAITING_BT_ID,
110    AWAITING_BT_MESSAGE,
111    AWAITING_BT_PIECE
112};
113
114struct peer_request
115{
116    uint32_t    index;
117    uint32_t    offset;
118    uint32_t    length;
119    time_t      time_requested;
120};
121
122static int
123compareRequest( const void * va,
124                const void * vb )
125{
126    const struct peer_request * a = va;
127    const struct peer_request * b = vb;
128
129    if( a->index != b->index )
130        return a->index < b->index ? -1 : 1;
131
132    if( a->offset != b->offset )
133        return a->offset < b->offset ? -1 : 1;
134
135    if( a->length != b->length )
136        return a->length < b->length ? -1 : 1;
137
138    return 0;
139}
140
141struct request_list
142{
143    uint16_t               count;
144    uint16_t               max;
145    struct peer_request *  requests;
146};
147
148static const struct request_list REQUEST_LIST_INIT = { 0, 0, NULL };
149
150static void
151reqListReserve( struct request_list * list,
152                uint16_t              max )
153{
154    if( list->max < max )
155    {
156        list->max = max;
157        list->requests = tr_renew( struct peer_request,
158                                   list->requests,
159                                   list->max );
160    }
161}
162
163static void
164reqListClear( struct request_list * list )
165{
166    tr_free( list->requests );
167    *list = REQUEST_LIST_INIT;
168}
169
170static void
171reqListCopy( struct request_list *       dest,
172             const struct request_list * src )
173{
174    dest->count = dest->max = src->count;
175    dest->requests =
176        tr_memdup( src->requests, dest->count * sizeof( struct peer_request ) );
177}
178
179static void
180reqListRemoveOne( struct request_list * list,
181                  int                   i )
182{
183    assert( 0 <= i && i < list->count );
184
185    memmove( &list->requests[i],
186            &list->requests[i + 1],
187            sizeof( struct peer_request ) * ( --list->count - i ) );
188}
189
190static void
191reqListAppend( struct request_list *       list,
192               const struct peer_request * req )
193{
194    if( ++list->count >= list->max )
195        reqListReserve( list, list->max + 8 );
196
197    list->requests[list->count - 1] = *req;
198}
199
200static int
201reqListPop( struct request_list * list,
202            struct peer_request * setme )
203{
204    int success;
205
206    if( !list->count )
207        success = FALSE;
208    else {
209        *setme = list->requests[0];
210        reqListRemoveOne( list, 0 );
211        success = TRUE;
212    }
213
214    return success;
215}
216
217static int
218reqListFind( struct request_list *       list,
219             const struct peer_request * key )
220{
221    uint16_t i;
222
223    for( i = 0; i < list->count; ++i )
224        if( !compareRequest( key, list->requests + i ) )
225            return i;
226
227    return -1;
228}
229
230static int
231reqListRemove( struct request_list *       list,
232               const struct peer_request * key )
233{
234    int success;
235    const int i = reqListFind( list, key );
236
237    if( i < 0 )
238        success = FALSE;
239    else {
240        reqListRemoveOne( list, i );
241        success = TRUE;
242    }
243
244    return success;
245}
246
247/**
248***
249**/
250
251/* this is raw, unchanged data from the peer regarding
252 * the current message that it's sending us. */
253struct tr_incoming
254{
255    uint8_t                id;
256    uint32_t               length; /* includes the +1 for id length */
257    struct peer_request    blockReq; /* metadata for incoming blocks */
258    struct evbuffer *      block; /* piece data for incoming blocks */
259};
260
261struct tr_peermsgs
262{
263    unsigned int    peerSentBitfield        : 1;
264    unsigned int    peerSupportsPex         : 1;
265    unsigned int    clientSentLtepHandshake : 1;
266    unsigned int    peerSentLtepHandshake   : 1;
267    unsigned int    sendingBlock            : 1;
268
269    uint8_t         state;
270    uint8_t         ut_pex_id;
271    uint16_t        pexCount;
272    uint16_t        minActiveRequests;
273    uint16_t        maxActiveRequests;
274
275    /* how long the outMessages batch should be allowed to grow before
276     * it's flushed -- some messages (like requests >:) should be sent
277     * very quickly; others aren't as urgent. */
278    int                    outMessagesBatchPeriod;
279
280    tr_peer *              info;
281
282    tr_session *           session;
283    tr_torrent *           torrent;
284    tr_peerIo *            io;
285
286    tr_publisher_t *       publisher;
287
288    struct evbuffer *      outBlock; /* buffer of the current piece message */
289    struct evbuffer *      outMessages; /* all the non-piece messages */
290
291    struct request_list    peerAskedFor;
292    struct request_list    peerAskedForFast;
293    struct request_list    clientAskedFor;
294    struct request_list    clientWillAskFor;
295
296    tr_timer *             pexTimer;
297
298    time_t                 clientSentPexAt;
299    time_t                 clientSentAnythingAt;
300
301    /* when we started batching the outMessages */
302    time_t                outMessagesBatchedAt;
303
304    tr_bitfield *         peerAllowedPieces;
305
306    struct tr_incoming    incoming;
307
308    tr_pex *              pex;
309};
310
311/**
312***
313**/
314
315static void
316myDebug( const char *               file,
317         int                        line,
318         const struct tr_peermsgs * msgs,
319         const char *               fmt,
320         ... )
321{
322    FILE * fp = tr_getLog( );
323
324    if( fp )
325    {
326        va_list           args;
327        char              timestr[64];
328        struct evbuffer * buf = evbuffer_new( );
329        char *            base = tr_basename( file );
330
331        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
332                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
333                             msgs->torrent->info.name,
334                             tr_peerIoGetAddrStr( msgs->io ),
335                             msgs->info->client );
336        va_start( args, fmt );
337        evbuffer_add_vprintf( buf, fmt, args );
338        va_end( args );
339        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
340        fwrite( EVBUFFER_DATA( buf ), 1, EVBUFFER_LENGTH( buf ), fp );
341
342        tr_free( base );
343        evbuffer_free( buf );
344    }
345}
346
347#define dbgmsg( msgs, ... ) myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ )
348
349/**
350***
351**/
352
353static void
354pokeBatchPeriod( tr_peermsgs * msgs,
355                 int           interval )
356{
357    if( msgs->outMessagesBatchPeriod > interval )
358    {
359        msgs->outMessagesBatchPeriod = interval;
360        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
361    }
362}
363
364static void
365protocolSendRequest( tr_peermsgs *               msgs,
366                     const struct peer_request * req )
367{
368    tr_peerIo *       io = msgs->io;
369    struct evbuffer * out = msgs->outMessages;
370
371    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
372    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
373    tr_peerIoWriteUint32( io, out, req->index );
374    tr_peerIoWriteUint32( io, out, req->offset );
375    tr_peerIoWriteUint32( io, out, req->length );
376    dbgmsg( msgs, "requesting %u:%u->%u... outMessage size is now %d",
377           req->index, req->offset, req->length, (int)EVBUFFER_LENGTH( out ) );
378    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
379}
380
381static void
382protocolSendCancel( tr_peermsgs *               msgs,
383                    const struct peer_request * req )
384{
385    tr_peerIo *       io = msgs->io;
386    struct evbuffer * out = msgs->outMessages;
387
388    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
389    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
390    tr_peerIoWriteUint32( io, out, req->index );
391    tr_peerIoWriteUint32( io, out, req->offset );
392    tr_peerIoWriteUint32( io, out, req->length );
393    dbgmsg( msgs, "cancelling %u:%u->%u... outMessage size is now %d",
394           req->index, req->offset, req->length, (int)EVBUFFER_LENGTH( out ) );
395    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
396}
397
398static void
399protocolSendHave( tr_peermsgs * msgs,
400                  uint32_t      index )
401{
402    tr_peerIo *       io = msgs->io;
403    struct evbuffer * out = msgs->outMessages;
404
405    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + sizeof( uint32_t ) );
406    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
407    tr_peerIoWriteUint32( io, out, index );
408    dbgmsg( msgs, "sending Have %u.. outMessage size is now %d",
409           index, (int)EVBUFFER_LENGTH( out ) );
410    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
411}
412
413static void
414protocolSendChoke( tr_peermsgs * msgs,
415                   int           choke )
416{
417    tr_peerIo *       io = msgs->io;
418    struct evbuffer * out = msgs->outMessages;
419
420    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
421    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
422    dbgmsg( msgs, "sending %s... outMessage size is now %d",
423           ( choke ? "Choke" : "Unchoke" ),
424           (int)EVBUFFER_LENGTH( out ) );
425    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
426}
427
428/**
429***  EVENTS
430**/
431
432static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0 };
433
434static void
435publish( tr_peermsgs *   msgs,
436         tr_peer_event * e )
437{
438    tr_publisherPublish( msgs->publisher, msgs->info, e );
439}
440
441static void
442fireError( tr_peermsgs * msgs,
443           int           err )
444{
445    tr_peer_event e = blankEvent;
446
447    e.eventType = TR_PEER_ERROR;
448    e.err = err;
449    publish( msgs, &e );
450}
451
452static void
453fireNeedReq( tr_peermsgs * msgs )
454{
455    tr_peer_event e = blankEvent;
456
457    e.eventType = TR_PEER_NEED_REQ;
458    publish( msgs, &e );
459}
460
461static void
462firePeerProgress( tr_peermsgs * msgs )
463{
464    tr_peer_event e = blankEvent;
465
466    e.eventType = TR_PEER_PEER_PROGRESS;
467    e.progress = msgs->info->progress;
468    publish( msgs, &e );
469}
470
471static void
472fireGotBlock( tr_peermsgs *               msgs,
473              const struct peer_request * req )
474{
475    tr_peer_event e = blankEvent;
476
477    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
478    e.pieceIndex = req->index;
479    e.offset = req->offset;
480    e.length = req->length;
481    publish( msgs, &e );
482}
483
484static void
485fireClientGotData( tr_peermsgs * msgs,
486                   uint32_t      length )
487{
488    tr_peer_event e = blankEvent;
489
490    e.length = length;
491    e.eventType = TR_PEER_CLIENT_GOT_DATA;
492    publish( msgs, &e );
493}
494
495static void
496firePeerGotData( tr_peermsgs * msgs,
497                 uint32_t      length )
498{
499    tr_peer_event e = blankEvent;
500
501    e.length = length;
502    e.eventType = TR_PEER_PEER_GOT_DATA;
503    publish( msgs, &e );
504}
505
506static void
507fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
508{
509    tr_peer_event e = blankEvent;
510    e.eventType = TR_PEER_CANCEL;
511    e.pieceIndex = req->index;
512    e.offset = req->offset;
513    e.length = req->length;
514    publish( msgs, &e );
515}
516
517/**
518***  INTEREST
519**/
520
521static int
522isPieceInteresting( const tr_peermsgs * peer,
523                    tr_piece_index_t    piece )
524{
525    const tr_torrent * torrent = peer->torrent;
526
527    return ( !torrent->info.pieces[piece].dnd )                 /* we want it */
528           && ( !tr_cpPieceIsComplete( torrent->completion, piece ) ) /* !have
529                                                                        */
530           && ( tr_bitfieldHas( peer->info->have, piece ) );   /* peer has it */
531}
532
533/* "interested" means we'll ask for piece data if they unchoke us */
534static int
535isPeerInteresting( const tr_peermsgs * msgs )
536{
537    tr_piece_index_t    i;
538    const tr_torrent *  torrent;
539    const tr_bitfield * bitfield;
540    const int           clientIsSeed = tr_torrentIsSeed( msgs->torrent );
541
542    if( clientIsSeed )
543        return FALSE;
544
545    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
546        return FALSE;
547
548    torrent = msgs->torrent;
549    bitfield = tr_cpPieceBitfield( torrent->completion );
550
551    if( !msgs->info->have )
552        return TRUE;
553
554    assert( bitfield->byteCount == msgs->info->have->byteCount );
555
556    for( i = 0; i < torrent->info.pieceCount; ++i )
557        if( isPieceInteresting( msgs, i ) )
558            return TRUE;
559
560    return FALSE;
561}
562
563static void
564sendInterest( tr_peermsgs * msgs,
565              int           weAreInterested )
566{
567    struct evbuffer * out = msgs->outMessages;
568
569    assert( msgs );
570    assert( weAreInterested == 0 || weAreInterested == 1 );
571
572    msgs->info->clientIsInterested = weAreInterested;
573    dbgmsg( msgs, "Sending %s",
574            weAreInterested ? "Interested" : "Not Interested" );
575    tr_peerIoWriteUint32( msgs->io, out, sizeof( uint8_t ) );
576    tr_peerIoWriteUint8 (
577        msgs->io, out, weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
578    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
579    dbgmsg( msgs, "outMessage size is now %d", (int)EVBUFFER_LENGTH( out ) );
580}
581
582static void
583updateInterest( tr_peermsgs * msgs )
584{
585    const int i = isPeerInteresting( msgs );
586
587    if( i != msgs->info->clientIsInterested )
588        sendInterest( msgs, i );
589    if( i )
590        fireNeedReq( msgs );
591}
592
593static void
594cancelAllRequestsToClientExceptFast( tr_peermsgs * msgs )
595{
596    reqListClear( &msgs->peerAskedFor );
597}
598
599void
600tr_peerMsgsSetChoke( tr_peermsgs * msgs,
601                     int           choke )
602{
603    const time_t now = time( NULL );
604    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
605
606    assert( msgs );
607    assert( msgs->info );
608    assert( choke == 0 || choke == 1 );
609
610    if( msgs->info->chokeChangedAt > fibrillationTime )
611    {
612        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation",
613                choke );
614    }
615    else if( msgs->info->peerIsChoked != choke )
616    {
617        msgs->info->peerIsChoked = choke;
618        if( choke )
619            cancelAllRequestsToClientExceptFast( msgs );
620        protocolSendChoke( msgs, choke );
621        msgs->info->chokeChangedAt = now;
622    }
623}
624
625/**
626***
627**/
628
629void
630tr_peerMsgsHave( tr_peermsgs * msgs,
631                 uint32_t      index )
632{
633    protocolSendHave( msgs, index );
634
635    /* since we have more pieces now, we might not be interested in this peer */
636    updateInterest( msgs );
637}
638
639#if 0
640static void
641sendFastSuggest( tr_peermsgs * msgs,
642                 uint32_t      pieceIndex )
643{
644    assert( msgs );
645
646    if( tr_peerIoSupportsFEXT( msgs->io ) )
647    {
648        tr_peerIoWriteUint32( msgs->io, msgs->outMessages,
649                             sizeof( uint8_t ) + sizeof( uint32_t ) );
650        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_SUGGEST );
651        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
652    }
653}
654
655static void
656sendFastHave( tr_peermsgs * msgs,
657              int           all )
658{
659    assert( msgs );
660
661    if( tr_peerIoSupportsFEXT( msgs->io ) )
662    {
663        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof( uint8_t ) );
664        tr_peerIoWriteUint8( msgs->io, msgs->outMessages,
665                            ( all ? BT_HAVE_ALL
666                              : BT_HAVE_NONE ) );
667        updateInterest( msgs );
668    }
669}
670
671#endif
672
673static void
674sendFastReject( tr_peermsgs * msgs,
675                uint32_t      pieceIndex,
676                uint32_t      offset,
677                uint32_t      length )
678{
679    assert( msgs );
680
681    if( tr_peerIoSupportsFEXT( msgs->io ) )
682    {
683        struct evbuffer * out = msgs->outMessages;
684        const uint32_t    len = sizeof( uint8_t ) + 3 * sizeof( uint32_t );
685        dbgmsg( msgs, "sending fast reject %u:%u->%u", pieceIndex, offset,
686                length );
687        tr_peerIoWriteUint32( msgs->io, out, len );
688        tr_peerIoWriteUint8( msgs->io, out, BT_REJECT );
689        tr_peerIoWriteUint32( msgs->io, out, pieceIndex );
690        tr_peerIoWriteUint32( msgs->io, out, offset );
691        tr_peerIoWriteUint32( msgs->io, out, length );
692        pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
693        dbgmsg( msgs, "outMessage size is now %d",
694               (int)EVBUFFER_LENGTH( out ) );
695    }
696}
697
698static tr_bitfield*
699getPeerAllowedPieces( tr_peermsgs * msgs )
700{
701    if( !msgs->peerAllowedPieces && tr_peerIoSupportsFEXT( msgs->io ) )
702    {
703        msgs->peerAllowedPieces = tr_peerMgrGenerateAllowedSet(
704            MAX_FAST_ALLOWED_COUNT,
705            msgs->torrent->info.pieceCount,
706            msgs->torrent->info.hash,
707            tr_peerIoGetAddress( msgs->io, NULL ) );
708    }
709
710    return msgs->peerAllowedPieces;
711}
712
713static void
714sendFastAllowed( tr_peermsgs * msgs,
715                 uint32_t      pieceIndex )
716{
717    assert( msgs );
718
719    if( tr_peerIoSupportsFEXT( msgs->io ) )
720    {
721        struct evbuffer * out = msgs->outMessages;
722        dbgmsg( msgs, "sending fast allowed" );
723        tr_peerIoWriteUint32( msgs->io, out,  sizeof( uint8_t ) +
724                             sizeof( uint32_t ) );
725        tr_peerIoWriteUint8( msgs->io, out, BT_ALLOWED_FAST );
726        tr_peerIoWriteUint32( msgs->io, out, pieceIndex );
727        pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
728        dbgmsg( msgs, "outMessage size is now %d",
729               (int)EVBUFFER_LENGTH( out ) );
730    }
731}
732
733static void
734sendFastAllowedSet( tr_peermsgs * msgs )
735{
736    tr_piece_index_t i = 0;
737
738    while( i <= msgs->torrent->info.pieceCount )
739    {
740        if( tr_bitfieldHas( getPeerAllowedPieces( msgs ), i ) )
741            sendFastAllowed( msgs, i );
742        i++;
743    }
744}
745
746static void
747maybeSendFastAllowedSet( tr_peermsgs * msgs )
748{
749    if( tr_bitfieldCountTrueBits( msgs->info->have ) <=
750        MAX_FAST_ALLOWED_THRESHOLD )
751        sendFastAllowedSet( msgs );
752}
753
754/**
755***
756**/
757
758static int
759reqIsValid( const tr_peermsgs * peer,
760            uint32_t            index,
761            uint32_t            offset,
762            uint32_t            length )
763{
764    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
765}
766
767static int
768requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
769{
770    return reqIsValid( msgs, req->index, req->offset, req->length );
771}
772
773static void
774expireOldRequests( tr_peermsgs * msgs, const time_t now  )
775{
776    int                 i;
777    time_t              oldestAllowed;
778    struct request_list tmp = REQUEST_LIST_INIT;
779
780    /* cancel requests that have been queued for too long */
781    oldestAllowed = now - QUEUED_REQUEST_TTL_SECS;
782    reqListCopy( &tmp, &msgs->clientWillAskFor );
783    for( i = 0; i < tmp.count; ++i )
784    {
785        const struct peer_request * req = &tmp.requests[i];
786        if( req->time_requested < oldestAllowed )
787            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
788    }
789    reqListClear( &tmp );
790
791    /* cancel requests that were sent too long ago */
792    oldestAllowed = now - SENT_REQUEST_TTL_SECS;
793    reqListCopy( &tmp, &msgs->clientAskedFor );
794    for( i = 0; i < tmp.count; ++i )
795    {
796        const struct peer_request * req = &tmp.requests[i];
797        if( req->time_requested < oldestAllowed )
798            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
799    }
800    reqListClear( &tmp );
801}
802
803static void
804pumpRequestQueue( tr_peermsgs * msgs, const time_t now )
805{
806    const int           max = msgs->maxActiveRequests;
807    const int           min = msgs->minActiveRequests;
808    int                 sent = 0;
809    int                 count = msgs->clientAskedFor.count;
810    struct peer_request req;
811
812    if( count > min )
813        return;
814    if( msgs->info->clientIsChoked )
815        return;
816    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
817        return;
818
819    while( ( count < max ) && reqListPop( &msgs->clientWillAskFor, &req ) )
820    {
821        const tr_block_index_t block =
822            _tr_block( msgs->torrent, req.index, req.offset );
823
824        assert( requestIsValid( msgs, &req ) );
825        assert( tr_bitfieldHas( msgs->info->have, req.index ) );
826
827        /* don't ask for it if we've already got it... this block may have
828         * come in from a different peer after we cancelled a request for it */
829        if( !tr_cpBlockIsComplete( msgs->torrent->completion, block ) )
830        {
831            protocolSendRequest( msgs, &req );
832            req.time_requested = now;
833            reqListAppend( &msgs->clientAskedFor, &req );
834
835            ++count;
836            ++sent;
837        }
838    }
839
840    if( sent )
841        dbgmsg( msgs,
842                "pump sent %d requests, now have %d active and %d queued",
843                sent,
844                msgs->clientAskedFor.count,
845                msgs->clientWillAskFor.count );
846
847    if( count < max )
848        fireNeedReq( msgs );
849}
850
851static int
852requestQueueIsFull( const tr_peermsgs * msgs )
853{
854    const int req_max = msgs->maxActiveRequests;
855    return msgs->clientWillAskFor.count >= req_max;
856}
857
858tr_addreq_t
859tr_peerMsgsAddRequest( tr_peermsgs *    msgs,
860                       uint32_t         index,
861                       uint32_t         offset,
862                       uint32_t         length )
863{
864    struct peer_request req;
865
866    assert( msgs );
867    assert( msgs->torrent );
868    assert( reqIsValid( msgs, index, offset, length ) );
869
870    /**
871    ***  Reasons to decline the request
872    **/
873
874    /* don't send requests to choked clients */
875    if( msgs->info->clientIsChoked )
876    {
877        dbgmsg( msgs, "declining request because they're choking us" );
878        return TR_ADDREQ_CLIENT_CHOKED;
879    }
880
881    /* peer doesn't have this piece */
882    if( !tr_bitfieldHas( msgs->info->have, index ) )
883        return TR_ADDREQ_MISSING;
884
885    /* peer's queue is full */
886    if( requestQueueIsFull( msgs ) ) {
887        dbgmsg( msgs, "declining request because we're full" );
888        return TR_ADDREQ_FULL;
889    }
890
891    /* have we already asked for this piece? */
892    req.index = index;
893    req.offset = offset;
894    req.length = length;
895    if( reqListFind( &msgs->clientAskedFor, &req ) != -1 ) {
896        dbgmsg( msgs, "declining because it's a duplicate" );
897        return TR_ADDREQ_DUPLICATE;
898    }
899    if( reqListFind( &msgs->clientWillAskFor, &req ) != -1 ) {
900        dbgmsg( msgs, "declining because it's a duplicate" );
901        return TR_ADDREQ_DUPLICATE;
902    }
903
904    /**
905    ***  Accept this request
906    **/
907
908    dbgmsg( msgs, "added req for piece %lu", (unsigned long)index );
909    req.time_requested = time( NULL );
910    reqListAppend( &msgs->clientWillAskFor, &req );
911    return TR_ADDREQ_OK;
912}
913
914static void
915cancelAllRequestsToPeer( tr_peermsgs * msgs )
916{
917    int                 i;
918    struct request_list a = msgs->clientWillAskFor;
919    struct request_list b = msgs->clientAskedFor;
920
921    msgs->clientAskedFor = REQUEST_LIST_INIT;
922    msgs->clientWillAskFor = REQUEST_LIST_INIT;
923
924    for( i=0; i<a.count; ++i )
925        fireCancelledReq( msgs, &a.requests[i] );
926
927    for( i = 0; i < b.count; ++i ) {
928        fireCancelledReq( msgs, &b.requests[i] );
929        protocolSendCancel( msgs, &b.requests[i] );
930    }
931
932    reqListClear( &a );
933    reqListClear( &b );
934}
935
936void
937tr_peerMsgsCancel( tr_peermsgs * msgs,
938                   uint32_t      pieceIndex,
939                   uint32_t      offset,
940                   uint32_t      length )
941{
942    struct peer_request req;
943
944    assert( msgs != NULL );
945    assert( length > 0 );
946
947    /* have we asked the peer for this piece? */
948    req.index = pieceIndex;
949    req.offset = offset;
950    req.length = length;
951
952    /* if it's only in the queue and hasn't been sent yet, free it */
953    if( reqListRemove( &msgs->clientWillAskFor, &req ) )
954        fireCancelledReq( msgs, &req );
955
956    /* if it's already been sent, send a cancel message too */
957    if( reqListRemove( &msgs->clientAskedFor, &req ) ) {
958        protocolSendCancel( msgs, &req );
959        fireCancelledReq( msgs, &req );
960    }
961}
962
963/**
964***
965**/
966
967static void
968sendLtepHandshake( tr_peermsgs * msgs )
969{
970    tr_benc           val, *m;
971    char *            buf;
972    int               len;
973    int               pex;
974    struct evbuffer * out = msgs->outMessages;
975
976    if( msgs->clientSentLtepHandshake )
977        return;
978
979    dbgmsg( msgs, "sending an ltep handshake" );
980    msgs->clientSentLtepHandshake = 1;
981
982    /* decide if we want to advertise pex support */
983    if( !tr_torrentAllowsPex( msgs->torrent ) )
984        pex = 0;
985    else if( msgs->peerSentLtepHandshake )
986        pex = msgs->peerSupportsPex ? 1 : 0;
987    else
988        pex = 1;
989
990    tr_bencInitDict( &val, 4 );
991    tr_bencDictAddInt( &val, "e",
992                       msgs->session->encryptionMode != TR_CLEAR_PREFERRED );
993    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( msgs->session ) );
994    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
995    m  = tr_bencDictAddDict( &val, "m", 1 );
996    if( pex )
997        tr_bencDictAddInt( m, "ut_pex", TR_LTEP_PEX );
998    buf = tr_bencSave( &val, &len );
999
1000    tr_peerIoWriteUint32( msgs->io, out, 2 * sizeof( uint8_t ) + len );
1001    tr_peerIoWriteUint8 ( msgs->io, out, BT_LTEP );
1002    tr_peerIoWriteUint8 ( msgs->io, out, LTEP_HANDSHAKE );
1003    tr_peerIoWriteBytes ( msgs->io, out, buf, len );
1004    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1005    dbgmsg( msgs, "outMessage size is now %d", (int)EVBUFFER_LENGTH( out ) );
1006
1007    /* cleanup */
1008    tr_bencFree( &val );
1009    tr_free( buf );
1010}
1011
1012static void
1013parseLtepHandshake( tr_peermsgs *     msgs,
1014                    int               len,
1015                    struct evbuffer * inbuf )
1016{
1017    int64_t   i;
1018    tr_benc   val, * sub;
1019    uint8_t * tmp = tr_new( uint8_t, len );
1020
1021    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
1022    msgs->peerSentLtepHandshake = 1;
1023
1024    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type != TYPE_DICT )
1025    {
1026        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
1027        tr_free( tmp );
1028        return;
1029    }
1030
1031    dbgmsg( msgs, "here is the ltep handshake we got [%*.*s]", len, len,
1032            tmp );
1033
1034    /* does the peer prefer encrypted connections? */
1035    if( tr_bencDictFindInt( &val, "e", &i ) )
1036        msgs->info->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
1037                                            : ENCRYPTION_PREFERENCE_NO;
1038
1039    /* check supported messages for utorrent pex */
1040    msgs->peerSupportsPex = 0;
1041    if( tr_bencDictFindDict( &val, "m", &sub ) )
1042    {
1043        if( tr_bencDictFindInt( sub, "ut_pex", &i ) )
1044        {
1045            msgs->ut_pex_id = (uint8_t) i;
1046            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
1047            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
1048        }
1049    }
1050
1051    /* get peer's listening port */
1052    if( tr_bencDictFindInt( &val, "p", &i ) )
1053    {
1054        msgs->info->port = htons( (uint16_t)i );
1055        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
1056    }
1057
1058    tr_bencFree( &val );
1059    tr_free( tmp );
1060}
1061
1062static void
1063parseUtPex( tr_peermsgs *     msgs,
1064            int               msglen,
1065            struct evbuffer * inbuf )
1066{
1067    int                loaded = 0;
1068    uint8_t *          tmp = tr_new( uint8_t, msglen );
1069    tr_benc            val;
1070    const tr_torrent * tor = msgs->torrent;
1071    const uint8_t *    added;
1072    size_t             added_len;
1073
1074    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
1075
1076    if( tr_torrentAllowsPex( tor )
1077      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) )
1078      && tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1079    {
1080        const uint8_t * added_f = NULL;
1081        tr_pex *        pex;
1082        size_t          i, n;
1083        size_t          added_f_len = 0;
1084        tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1085        pex =
1086            tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len,
1087                                    &n );
1088        for( i = 0; i < n; ++i )
1089            tr_peerMgrAddPex( msgs->session->peerMgr, tor->info.hash,
1090                              TR_PEER_FROM_PEX, pex + i );
1091        tr_free( pex );
1092    }
1093
1094    if( loaded )
1095        tr_bencFree( &val );
1096    tr_free( tmp );
1097}
1098
1099static void sendPex( tr_peermsgs * msgs );
1100
1101static void
1102parseLtep( tr_peermsgs *     msgs,
1103           int               msglen,
1104           struct evbuffer * inbuf )
1105{
1106    uint8_t ltep_msgid;
1107
1108    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
1109    msglen--;
1110
1111    if( ltep_msgid == LTEP_HANDSHAKE )
1112    {
1113        dbgmsg( msgs, "got ltep handshake" );
1114        parseLtepHandshake( msgs, msglen, inbuf );
1115        if( tr_peerIoSupportsLTEP( msgs->io ) )
1116        {
1117            sendLtepHandshake( msgs );
1118            sendPex( msgs );
1119        }
1120    }
1121    else if( ltep_msgid == TR_LTEP_PEX )
1122    {
1123        dbgmsg( msgs, "got ut pex" );
1124        msgs->peerSupportsPex = 1;
1125        parseUtPex( msgs, msglen, inbuf );
1126    }
1127    else
1128    {
1129        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1130        evbuffer_drain( inbuf, msglen );
1131    }
1132}
1133
1134static int
1135readBtLength( tr_peermsgs *     msgs,
1136              struct evbuffer * inbuf,
1137              size_t            inlen )
1138{
1139    uint32_t len;
1140
1141    if( inlen < sizeof( len ) )
1142        return READ_LATER;
1143
1144    tr_peerIoReadUint32( msgs->io, inbuf, &len );
1145
1146    if( len == 0 ) /* peer sent us a keepalive message */
1147        dbgmsg( msgs, "got KeepAlive" );
1148    else
1149    {
1150        msgs->incoming.length = len;
1151        msgs->state = AWAITING_BT_ID;
1152    }
1153
1154    return READ_NOW;
1155}
1156
1157static int readBtMessage( tr_peermsgs *     msgs,
1158                          struct evbuffer * inbuf,
1159                          size_t            inlen );
1160
1161static int
1162readBtId( tr_peermsgs *     msgs,
1163          struct evbuffer * inbuf,
1164          size_t            inlen )
1165{
1166    uint8_t id;
1167
1168    if( inlen < sizeof( uint8_t ) )
1169        return READ_LATER;
1170
1171    tr_peerIoReadUint8( msgs->io, inbuf, &id );
1172    msgs->incoming.id = id;
1173
1174    if( id == BT_PIECE )
1175    {
1176        msgs->state = AWAITING_BT_PIECE;
1177        return READ_NOW;
1178    }
1179    else if( msgs->incoming.length != 1 )
1180    {
1181        msgs->state = AWAITING_BT_MESSAGE;
1182        return READ_NOW;
1183    }
1184    else return readBtMessage( msgs, inbuf, inlen - 1 );
1185}
1186
1187static void
1188updatePeerProgress( tr_peermsgs * msgs )
1189{
1190    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have )
1191                           / (float)msgs->torrent->info.pieceCount;
1192    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
1193    updateInterest( msgs );
1194    firePeerProgress( msgs );
1195}
1196
1197static int
1198clientCanSendFastBlock( const tr_peermsgs * msgs UNUSED )
1199{
1200    /* don't send a fast piece if peer has MAX_FAST_ALLOWED_THRESHOLD pieces */
1201    if( tr_bitfieldCountTrueBits( msgs->info->have ) >
1202        MAX_FAST_ALLOWED_THRESHOLD )
1203        return FALSE;
1204
1205    /* ...or if we don't have ourself enough pieces */
1206    if( tr_bitfieldCountTrueBits( tr_cpPieceBitfield( msgs->torrent->
1207                                                      completion ) ) <
1208        MAX_FAST_ALLOWED_THRESHOLD )
1209        return FALSE;
1210
1211    /* Maybe a bandwidth limit ? */
1212    return TRUE;
1213}
1214
1215static void
1216peerMadeRequest( tr_peermsgs *               msgs,
1217                 const struct peer_request * req )
1218{
1219    const int reqIsValid = requestIsValid( msgs, req );
1220    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete(
1221        msgs->torrent->completion, req->index );
1222    const int peerIsChoked = msgs->info->peerIsChoked;
1223    const int peerIsFast = tr_peerIoSupportsFEXT( msgs->io );
1224    const int pieceIsFast = reqIsValid && tr_bitfieldHas(
1225        getPeerAllowedPieces( msgs ), req->index );
1226    const int canSendFast = clientCanSendFastBlock( msgs );
1227
1228    if( !reqIsValid ) /* bad request */
1229    {
1230        dbgmsg( msgs, "rejecting an invalid request." );
1231        sendFastReject( msgs, req->index, req->offset, req->length );
1232    }
1233    else if( !clientHasPiece ) /* we don't have it */
1234    {
1235        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1236        sendFastReject( msgs, req->index, req->offset, req->length );
1237    }
1238    else if( peerIsChoked && !peerIsFast ) /* doesn't he know he's choked? */
1239    {
1240        tr_peerMsgsSetChoke( msgs, 1 );
1241        sendFastReject( msgs, req->index, req->offset, req->length );
1242    }
1243    else if( peerIsChoked && peerIsFast && ( !pieceIsFast || !canSendFast ) )
1244    {
1245        sendFastReject( msgs, req->index, req->offset, req->length );
1246    }
1247    else /* YAY */
1248    {
1249        if( peerIsFast && pieceIsFast )
1250            reqListAppend( &msgs->peerAskedForFast, req );
1251        else
1252            reqListAppend( &msgs->peerAskedFor, req );
1253    }
1254}
1255
1256static int
1257messageLengthIsCorrect( const tr_peermsgs * msg,
1258                        uint8_t             id,
1259                        uint32_t            len )
1260{
1261    switch( id )
1262    {
1263        case BT_CHOKE:
1264        case BT_UNCHOKE:
1265        case BT_INTERESTED:
1266        case BT_NOT_INTERESTED:
1267        case BT_HAVE_ALL:
1268        case BT_HAVE_NONE:
1269            return len == 1;
1270
1271        case BT_HAVE:
1272        case BT_SUGGEST:
1273        case BT_ALLOWED_FAST:
1274            return len == 5;
1275
1276        case BT_BITFIELD:
1277            return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1278
1279        case BT_REQUEST:
1280        case BT_CANCEL:
1281        case BT_REJECT:
1282            return len == 13;
1283
1284        case BT_PIECE:
1285            return len > 9 && len <= 16393;
1286
1287        case BT_PORT:
1288            return len == 3;
1289
1290        case BT_LTEP:
1291            return len >= 2;
1292
1293        default:
1294            return FALSE;
1295    }
1296}
1297
1298static int clientGotBlock( tr_peermsgs *               msgs,
1299                           const uint8_t *             block,
1300                           const struct peer_request * req );
1301
1302static void
1303clientGotBytes( tr_peermsgs * msgs,
1304                uint32_t      byteCount )
1305{
1306    msgs->info->pieceDataActivityDate = time( NULL );
1307    fireClientGotData( msgs, byteCount );
1308}
1309
1310static int
1311readBtPiece( tr_peermsgs *     msgs,
1312             struct evbuffer * inbuf,
1313             size_t            inlen )
1314{
1315    struct peer_request * req = &msgs->incoming.blockReq;
1316
1317    assert( EVBUFFER_LENGTH( inbuf ) >= inlen );
1318    dbgmsg( msgs, "In readBtPiece" );
1319
1320    if( !req->length )
1321    {
1322        if( inlen < 8 )
1323            return READ_LATER;
1324
1325        tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
1326        tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
1327        req->length = msgs->incoming.length - 9;
1328        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index,
1329                req->offset,
1330                req->length );
1331        return READ_NOW;
1332    }
1333    else
1334    {
1335        int          err;
1336
1337        /* read in another chunk of data */
1338        const size_t nLeft = req->length - EVBUFFER_LENGTH(
1339            msgs->incoming.block );
1340        size_t       n = MIN( nLeft, inlen );
1341        uint8_t *    buf = tr_new( uint8_t, n );
1342        assert( EVBUFFER_LENGTH( inbuf ) >= n );
1343        tr_peerIoReadBytes( msgs->io, inbuf, buf, n );
1344        evbuffer_add( msgs->incoming.block, buf, n );
1345        clientGotBytes( msgs, n );
1346        tr_free( buf );
1347        dbgmsg( msgs, "got %d bytes for block %u:%u->%u ... %d remain",
1348               (int)n, req->index, req->offset, req->length,
1349               (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) );
1350        if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length )
1351            return READ_LATER;
1352
1353        /* we've got the whole block ... process it */
1354        err = clientGotBlock( msgs, EVBUFFER_DATA(
1355                                  msgs->incoming.block ), req );
1356
1357        /* cleanup */
1358        evbuffer_drain( msgs->incoming.block,
1359                       EVBUFFER_LENGTH( msgs->incoming.block ) );
1360        req->length = 0;
1361        msgs->state = AWAITING_BT_LENGTH;
1362        if( !err )
1363            return READ_NOW;
1364        else
1365        {
1366            fireError( msgs, err );
1367            return READ_ERR;
1368        }
1369    }
1370}
1371
1372static int
1373readBtMessage( tr_peermsgs *     msgs,
1374               struct evbuffer * inbuf,
1375               size_t            inlen )
1376{
1377    uint32_t      ui32;
1378    uint32_t      msglen = msgs->incoming.length;
1379    const uint8_t id = msgs->incoming.id;
1380    const size_t  startBufLen = EVBUFFER_LENGTH( inbuf );
1381
1382    --msglen; /* id length */
1383
1384    if( inlen < msglen )
1385        return READ_LATER;
1386
1387    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %d", (int)id,
1388            (int)msglen,
1389            (int)inlen );
1390
1391    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1392    {
1393        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d",
1394                (int)id, (int)msglen );
1395        fireError( msgs, EMSGSIZE );
1396        return READ_ERR;
1397    }
1398
1399    switch( id )
1400    {
1401        case BT_CHOKE:
1402            dbgmsg( msgs, "got Choke" );
1403            msgs->info->clientIsChoked = 1;
1404            cancelAllRequestsToPeer( msgs );
1405            cancelAllRequestsToClientExceptFast( msgs );
1406            break;
1407
1408        case BT_UNCHOKE:
1409            dbgmsg( msgs, "got Unchoke" );
1410            msgs->info->clientIsChoked = 0;
1411            fireNeedReq( msgs );
1412            break;
1413
1414        case BT_INTERESTED:
1415            dbgmsg( msgs, "got Interested" );
1416            msgs->info->peerIsInterested = 1;
1417            break;
1418
1419        case BT_NOT_INTERESTED:
1420            dbgmsg( msgs, "got Not Interested" );
1421            msgs->info->peerIsInterested = 0;
1422            break;
1423
1424        case BT_HAVE:
1425            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1426            dbgmsg( msgs, "got Have: %u", ui32 );
1427            if( tr_bitfieldAdd( msgs->info->have, ui32 ) )
1428                fireError( msgs, ERANGE );
1429            updatePeerProgress( msgs );
1430            tr_rcTransferred( msgs->torrent->swarmSpeed,
1431                              msgs->torrent->info.pieceSize );
1432            break;
1433
1434        case BT_BITFIELD:
1435        {
1436            dbgmsg( msgs, "got a bitfield" );
1437            msgs->peerSentBitfield = 1;
1438            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits,
1439                                msglen );
1440            updatePeerProgress( msgs );
1441            maybeSendFastAllowedSet( msgs );
1442            fireNeedReq( msgs );
1443            break;
1444        }
1445
1446        case BT_REQUEST:
1447        {
1448            struct peer_request r;
1449            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1450            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1451            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1452            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset,
1453                    r.length );
1454            peerMadeRequest( msgs, &r );
1455            break;
1456        }
1457
1458        case BT_CANCEL:
1459        {
1460            struct peer_request r;
1461            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1462            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1463            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1464            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset,
1465                    r.length );
1466            reqListRemove( &msgs->peerAskedForFast, &r );
1467            reqListRemove( &msgs->peerAskedFor, &r );
1468            break;
1469        }
1470
1471        case BT_PIECE:
1472            assert( 0 ); /* handled elsewhere! */
1473            break;
1474
1475        case BT_PORT:
1476            dbgmsg( msgs, "Got a BT_PORT" );
1477            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1478            break;
1479
1480        case BT_SUGGEST:
1481        {
1482            dbgmsg( msgs, "Got a BT_SUGGEST" );
1483            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1484            /* we don't do anything with this yet */
1485            break;
1486        }
1487
1488        case BT_HAVE_ALL:
1489            dbgmsg( msgs, "Got a BT_HAVE_ALL" );
1490            tr_bitfieldAddRange( msgs->info->have, 0,
1491                                 msgs->torrent->info.pieceCount );
1492            updatePeerProgress( msgs );
1493            maybeSendFastAllowedSet( msgs );
1494            break;
1495
1496
1497        case BT_HAVE_NONE:
1498            dbgmsg( msgs, "Got a BT_HAVE_NONE" );
1499            tr_bitfieldClear( msgs->info->have );
1500            updatePeerProgress( msgs );
1501            maybeSendFastAllowedSet( msgs );
1502            break;
1503
1504        case BT_REJECT:
1505        {
1506            struct peer_request r;
1507            dbgmsg( msgs, "Got a BT_REJECT" );
1508            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1509            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1510            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1511            reqListRemove( &msgs->clientAskedFor, &r );
1512            break;
1513        }
1514
1515        case BT_ALLOWED_FAST:
1516        {
1517            dbgmsg( msgs, "Got a BT_ALLOWED_FAST" );
1518            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1519            /* we don't do anything with this yet */
1520            break;
1521        }
1522
1523        case BT_LTEP:
1524            dbgmsg( msgs, "Got a BT_LTEP" );
1525            parseLtep( msgs, msglen, inbuf );
1526            break;
1527
1528        default:
1529            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1530            tr_peerIoDrain( msgs->io, inbuf, msglen );
1531            break;
1532    }
1533
1534    assert( msglen + 1 == msgs->incoming.length );
1535    assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen );
1536
1537    msgs->state = AWAITING_BT_LENGTH;
1538    return READ_NOW;
1539}
1540
1541static void
1542peerGotBytes( tr_peermsgs * msgs,
1543              uint32_t      byteCount,
1544              const time_t  now )
1545{
1546    msgs->info->pieceDataActivityDate = now;
1547    firePeerGotData( msgs, byteCount );
1548}
1549
1550static void
1551decrementDownloadedCount( tr_peermsgs * msgs,
1552                          uint32_t      byteCount )
1553{
1554    tr_torrent * tor = msgs->torrent;
1555
1556    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1557}
1558
1559static void
1560clientGotUnwantedBlock( tr_peermsgs *               msgs,
1561                        const struct peer_request * req )
1562{
1563    decrementDownloadedCount( msgs, req->length );
1564}
1565
1566static void
1567addPeerToBlamefield( tr_peermsgs * msgs,
1568                     uint32_t      index )
1569{
1570    if( !msgs->info->blame )
1571        msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1572    tr_bitfieldAdd( msgs->info->blame, index );
1573}
1574
1575/* returns 0 on success, or an errno on failure */
1576static int
1577clientGotBlock( tr_peermsgs *               msgs,
1578                const uint8_t *             data,
1579                const struct peer_request * req )
1580{
1581    int                    err;
1582    tr_torrent *           tor = msgs->torrent;
1583    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1584
1585    assert( msgs );
1586    assert( req );
1587
1588    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) )
1589    {
1590        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1591                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1592        return EMSGSIZE;
1593    }
1594
1595    /* save the block */
1596    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset,
1597            req->length );
1598
1599    /**
1600    *** Remove the block from our `we asked for this' list
1601    **/
1602
1603    if( !reqListRemove( &msgs->clientAskedFor, req ) )
1604    {
1605        clientGotUnwantedBlock( msgs, req );
1606        dbgmsg( msgs, "we didn't ask for this message..." );
1607        return 0;
1608    }
1609
1610    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1611            msgs->clientAskedFor.count );
1612
1613    /**
1614    *** Error checks
1615    **/
1616
1617    if( tr_cpBlockIsComplete( tor->completion, block ) )
1618    {
1619        dbgmsg( msgs, "we have this block already..." );
1620        clientGotUnwantedBlock( msgs, req );
1621        return 0;
1622    }
1623
1624    /**
1625    ***  Save the block
1626    **/
1627
1628    msgs->info->peerSentPieceDataAt = time( NULL );
1629    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1630        return err;
1631
1632    addPeerToBlamefield( msgs, req->index );
1633    fireGotBlock( msgs, req );
1634    return 0;
1635}
1636
1637static ReadState
1638canRead( struct bufferevent * evin,
1639         void *               vmsgs )
1640{
1641    ReadState         ret;
1642    tr_peermsgs *     msgs = vmsgs;
1643    struct evbuffer * in = EVBUFFER_INPUT ( evin );
1644    const size_t      inlen = EVBUFFER_LENGTH( in );
1645
1646    if( !inlen )
1647    {
1648        ret = READ_LATER;
1649    }
1650    else if( msgs->state == AWAITING_BT_PIECE )
1651    {
1652        ret = inlen ? readBtPiece( msgs, in, inlen ) : READ_LATER;
1653    }
1654    else switch( msgs->state )
1655        {
1656            case AWAITING_BT_LENGTH:
1657                ret = readBtLength ( msgs, in, inlen ); break;
1658
1659            case AWAITING_BT_ID:
1660                ret = readBtId     ( msgs, in, inlen ); break;
1661
1662            case AWAITING_BT_MESSAGE:
1663                ret = readBtMessage( msgs, in, inlen ); break;
1664
1665            default:
1666                assert( 0 );
1667        }
1668
1669    return ret;
1670}
1671
1672static void
1673sendKeepalive( tr_peermsgs * msgs )
1674{
1675    dbgmsg( msgs, "sending a keepalive message" );
1676    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1677    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1678}
1679
1680/**
1681***
1682**/
1683
1684static int
1685ratePulse( void * vpeer )
1686{
1687    tr_peermsgs * peer = vpeer;
1688    const double rateToClient = tr_peerGetPieceSpeed( peer->info,
1689                                                      TR_PEER_TO_CLIENT );
1690    const int estimatedBlocksInNext30Seconds =
1691                  ( rateToClient * 30 * 1024 ) / peer->torrent->blockSize;
1692
1693    peer->minActiveRequests = 4;
1694    peer->maxActiveRequests = peer->minActiveRequests +
1695                              estimatedBlocksInNext30Seconds;
1696    return TRUE;
1697}
1698
1699static int
1700popNextRequest( tr_peermsgs *         msgs,
1701                struct peer_request * setme )
1702{
1703    return reqListPop( &msgs->peerAskedForFast, setme )
1704        || reqListPop( &msgs->peerAskedFor, setme );
1705}
1706
1707static int
1708peerPulse( void * vmsgs )
1709{
1710    tr_peermsgs * msgs = vmsgs;
1711    const time_t  now = time( NULL );
1712
1713    ratePulse( msgs );
1714
1715    /*tr_peerIoTryRead( msgs->io );*/
1716    pumpRequestQueue( msgs, now );
1717    expireOldRequests( msgs, now );
1718
1719    if( msgs->sendingBlock )
1720    {
1721        const size_t uploadMax = tr_peerIoGetWriteBufferSpace( msgs->io );
1722        size_t       len = EVBUFFER_LENGTH( msgs->outBlock );
1723        const size_t outlen = MIN( len, uploadMax );
1724
1725        assert( len );
1726
1727        if( outlen )
1728        {
1729            tr_peerIoWrite( msgs->io, EVBUFFER_DATA( msgs->outBlock ), outlen );
1730            evbuffer_drain( msgs->outBlock, outlen );
1731            peerGotBytes( msgs, outlen, now );
1732
1733            len -= outlen;
1734            msgs->clientSentAnythingAt = now;
1735            msgs->sendingBlock = len != 0;
1736
1737            dbgmsg( msgs, "wrote %d bytes; %d left in block", (int)outlen,
1738                    (int)len );
1739        }
1740        else dbgmsg( msgs,
1741                     "stalled writing block... uploadMax %lu, outlen %lu",
1742                     uploadMax, outlen );
1743    }
1744
1745    if( !msgs->sendingBlock )
1746    {
1747        struct peer_request req;
1748        const int haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1749
1750        if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1751        {
1752            dbgmsg( msgs, "started an outMessages batch (length is %d)",
1753                   (int)EVBUFFER_LENGTH( msgs->outMessages ) );
1754            msgs->outMessagesBatchedAt = now;
1755        }
1756        else if( haveMessages
1757               && ( ( now - msgs->outMessagesBatchedAt ) >
1758                   msgs->outMessagesBatchPeriod ) )
1759        {
1760            dbgmsg( msgs, "flushing outMessages... (length is %d)",
1761                   (int)EVBUFFER_LENGTH(
1762                       msgs->outMessages ) );
1763            tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
1764            msgs->clientSentAnythingAt = now;
1765            msgs->outMessagesBatchedAt = 0;
1766            msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1767        }
1768        else if( !EVBUFFER_LENGTH( msgs->outBlock )
1769               && popNextRequest( msgs, &req )
1770               && requestIsValid( msgs, &req )
1771               && tr_cpPieceIsComplete( msgs->torrent->completion,
1772                                        req.index ) )
1773        {
1774            uint8_t * buf = tr_new( uint8_t, req.length );
1775            const int err = tr_ioRead( msgs->torrent,
1776                                       req.index, req.offset, req.length,
1777                                       buf );
1778            if( err )
1779            {
1780                fireError( msgs, err );
1781            }
1782            else
1783            {
1784                tr_peerIo *       io = msgs->io;
1785                struct evbuffer * out = msgs->outBlock;
1786
1787                dbgmsg( msgs, "sending block %u:%u->%u", req.index,
1788                        req.offset,
1789                        req.length );
1790                tr_peerIoWriteUint32(
1791                    io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) +
1792                    req.length );
1793                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1794                tr_peerIoWriteUint32( io, out, req.index );
1795                tr_peerIoWriteUint32( io, out, req.offset );
1796                tr_peerIoWriteBytes ( io, out, buf, req.length );
1797                msgs->sendingBlock = 1;
1798            }
1799
1800            tr_free( buf );
1801        }
1802        else if( ( !haveMessages )
1803               && ( now - msgs->clientSentAnythingAt ) >
1804                KEEPALIVE_INTERVAL_SECS )
1805        {
1806            sendKeepalive( msgs );
1807        }
1808    }
1809
1810    return TRUE; /* loop forever */
1811}
1812
1813void
1814tr_peerMsgsPulse( tr_peermsgs * msgs )
1815{
1816    if( msgs != NULL )
1817        peerPulse( msgs );
1818}
1819
1820static void
1821gotError( struct bufferevent * evbuf UNUSED,
1822          short                      what,
1823          void *                     vmsgs )
1824{
1825    if( what & EVBUFFER_TIMEOUT )
1826        dbgmsg( vmsgs, "libevent got a timeout, what=%hd, secs=%d", what,
1827                evbuf->timeout_read );
1828    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1829        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1830               what, errno, tr_strerror( errno ) );
1831    fireError( vmsgs, ENOTCONN );
1832}
1833
1834static void
1835sendBitfield( tr_peermsgs * msgs )
1836{
1837    struct evbuffer * out = msgs->outMessages;
1838    tr_bitfield *     field;
1839    tr_piece_index_t  lazyPieces[LAZY_PIECE_COUNT];
1840    size_t            i;
1841    size_t            lazyCount = 0;
1842
1843    field = tr_bitfieldDup( tr_cpPieceBitfield( msgs->torrent->completion ) );
1844
1845    if( tr_sessionIsLazyBitfieldEnabled( msgs->session ) )
1846    {
1847        /** Lazy bitfields aren't a high priority or secure, so I'm opting for
1848            speed over a truly random sample -- let's limit the pool size to
1849            the first 1000 pieces so large torrents don't bog things down */
1850        size_t poolSize;
1851        const size_t maxPoolSize = MIN( msgs->torrent->info.pieceCount, 1000 );
1852        tr_piece_index_t * pool = tr_new( tr_piece_index_t, maxPoolSize );
1853
1854        /* build the pool */
1855        for( i=poolSize=0; i<maxPoolSize; ++i )
1856            if( tr_bitfieldHas( field, i ) )
1857                pool[poolSize++] = i;
1858
1859        /* pull random piece indices from the pool */
1860        while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) )
1861        {
1862            const int pos = tr_cryptoWeakRandInt( poolSize );
1863            const tr_piece_index_t piece = pool[pos];
1864            tr_bitfieldRem( field, piece );
1865            lazyPieces[lazyCount++] = piece;
1866            pool[pos] = pool[--poolSize];
1867        }
1868
1869        /* cleanup */
1870        tr_free( pool );
1871    }
1872
1873    tr_peerIoWriteUint32( msgs->io, out,
1874                          sizeof( uint8_t ) + field->byteCount );
1875    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1876    tr_peerIoWriteBytes ( msgs->io, out, field->bits, field->byteCount );
1877    dbgmsg( msgs, "sending bitfield... outMessage size is now %d",
1878           (int)EVBUFFER_LENGTH( out ) );
1879    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1880
1881    for( i = 0; i < lazyCount; ++i )
1882        protocolSendHave( msgs, lazyPieces[i] );
1883
1884    tr_bitfieldFree( field );
1885}
1886
1887/**
1888***
1889**/
1890
1891/* some peers give us error messages if we send
1892   more than this many peers in a single pex message
1893   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
1894#define MAX_PEX_ADDED 50
1895#define MAX_PEX_DROPPED 50
1896
1897typedef struct
1898{
1899    tr_pex *  added;
1900    tr_pex *  dropped;
1901    tr_pex *  elements;
1902    int       addedCount;
1903    int       droppedCount;
1904    int       elementCount;
1905}
1906PexDiffs;
1907
1908static void
1909pexAddedCb( void * vpex,
1910            void * userData )
1911{
1912    PexDiffs * diffs = userData;
1913    tr_pex *   pex = vpex;
1914
1915    if( diffs->addedCount < MAX_PEX_ADDED )
1916    {
1917        diffs->added[diffs->addedCount++] = *pex;
1918        diffs->elements[diffs->elementCount++] = *pex;
1919    }
1920}
1921
1922static void
1923pexDroppedCb( void * vpex,
1924              void * userData )
1925{
1926    PexDiffs * diffs = userData;
1927    tr_pex *   pex = vpex;
1928
1929    if( diffs->droppedCount < MAX_PEX_DROPPED )
1930    {
1931        diffs->dropped[diffs->droppedCount++] = *pex;
1932    }
1933}
1934
1935static void
1936pexElementCb( void * vpex,
1937              void * userData )
1938{
1939    PexDiffs * diffs = userData;
1940    tr_pex *   pex = vpex;
1941
1942    diffs->elements[diffs->elementCount++] = *pex;
1943}
1944
1945static void
1946sendPex( tr_peermsgs * msgs )
1947{
1948    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
1949    {
1950        int               i;
1951        tr_pex *          newPex = NULL;
1952        const int         newCount = tr_peerMgrGetPeers(
1953            msgs->session->peerMgr, msgs->torrent->info.hash, &newPex );
1954        PexDiffs          diffs;
1955        tr_benc           val;
1956        uint8_t *         tmp, *walk;
1957        char *            benc;
1958        int               bencLen;
1959        struct evbuffer * out = msgs->outMessages;
1960
1961        /* build the diffs */
1962        diffs.added = tr_new( tr_pex, newCount );
1963        diffs.addedCount = 0;
1964        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1965        diffs.droppedCount = 0;
1966        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1967        diffs.elementCount = 0;
1968        tr_set_compare( msgs->pex, msgs->pexCount,
1969                        newPex, newCount,
1970                        tr_pexCompare, sizeof( tr_pex ),
1971                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
1972        dbgmsg(
1973            msgs,
1974            "pex: old peer count %d, new peer count %d, added %d, removed %d",
1975            msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1976
1977        /* update peer */
1978        tr_free( msgs->pex );
1979        msgs->pex = diffs.elements;
1980        msgs->pexCount = diffs.elementCount;
1981
1982        /* build the pex payload */
1983        tr_bencInitDict( &val, 3 );
1984
1985        /* "added" */
1986        tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1987        for( i = 0; i < diffs.addedCount; ++i )
1988        {
1989            memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
1990            memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1991        }
1992        assert( ( walk - tmp ) == diffs.addedCount * 6 );
1993        tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
1994        tr_free( tmp );
1995
1996        /* "added.f" */
1997        tmp = walk = tr_new( uint8_t, diffs.addedCount );
1998        for( i = 0; i < diffs.addedCount; ++i )
1999            *walk++ = diffs.added[i].flags;
2000        assert( ( walk - tmp ) == diffs.addedCount );
2001        tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2002        tr_free( tmp );
2003
2004        /* "dropped" */
2005        tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2006        for( i = 0; i < diffs.droppedCount; ++i )
2007        {
2008            memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
2009            memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2010        }
2011        assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2012        tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2013        tr_free( tmp );
2014
2015        /* write the pex message */
2016        benc = tr_bencSave( &val, &bencLen );
2017        tr_peerIoWriteUint32( msgs->io, out,
2018                              2 * sizeof( uint8_t ) + bencLen );
2019        tr_peerIoWriteUint8 ( msgs->io, out, BT_LTEP );
2020        tr_peerIoWriteUint8 ( msgs->io, out, msgs->ut_pex_id );
2021        tr_peerIoWriteBytes ( msgs->io, out, benc, bencLen );
2022        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
2023        dbgmsg( msgs, "outMessage size is now %d",
2024               (int)EVBUFFER_LENGTH( out ) );
2025
2026        /* cleanup */
2027        tr_free( benc );
2028        tr_bencFree( &val );
2029        tr_free( diffs.added );
2030        tr_free( diffs.dropped );
2031        tr_free( newPex );
2032
2033        msgs->clientSentPexAt = time( NULL );
2034    }
2035}
2036
2037static int
2038pexPulse( void * vpeer )
2039{
2040    sendPex( vpeer );
2041    return TRUE;
2042}
2043
2044/**
2045***
2046**/
2047
2048tr_peermsgs*
2049tr_peerMsgsNew( struct tr_torrent * torrent,
2050                struct tr_peer *    info,
2051                tr_delivery_func    func,
2052                void *              userData,
2053                tr_publisher_tag *  setme )
2054{
2055    tr_peermsgs * m;
2056
2057    assert( info );
2058    assert( info->io );
2059
2060    m = tr_new0( tr_peermsgs, 1 );
2061    m->publisher = tr_publisherNew( );
2062    m->info = info;
2063    m->session = torrent->session;
2064    m->torrent = torrent;
2065    m->io = info->io;
2066    m->info->clientIsChoked = 1;
2067    m->info->peerIsChoked = 1;
2068    m->info->clientIsInterested = 0;
2069    m->info->peerIsInterested = 0;
2070    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
2071    m->state = AWAITING_BT_LENGTH;
2072    m->pexTimer = tr_timerNew( m->session, pexPulse, m, PEX_INTERVAL );
2073    m->outMessages = evbuffer_new( );
2074    m->outMessagesBatchedAt = 0;
2075    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2076    m->incoming.block = evbuffer_new( );
2077    m->outBlock = evbuffer_new( );
2078    m->peerAllowedPieces = NULL;
2079    m->peerAskedFor = REQUEST_LIST_INIT;
2080    m->peerAskedForFast = REQUEST_LIST_INIT;
2081    m->clientAskedFor = REQUEST_LIST_INIT;
2082    m->clientWillAskFor = REQUEST_LIST_INIT;
2083    *setme = tr_publisherSubscribe( m->publisher, func, userData );
2084
2085    if( tr_peerIoSupportsLTEP( m->io ) )
2086        sendLtepHandshake( m );
2087
2088    sendBitfield( m );
2089
2090    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of
2091                                             inactivity */
2092    tr_peerIoSetIOFuncs( m->io, canRead, NULL, gotError, m );
2093    ratePulse( m );
2094
2095    return m;
2096}
2097
2098void
2099tr_peerMsgsFree( tr_peermsgs* msgs )
2100{
2101    if( msgs )
2102    {
2103        tr_timerFree( &msgs->pexTimer );
2104        tr_publisherFree( &msgs->publisher );
2105        reqListClear( &msgs->clientWillAskFor );
2106        reqListClear( &msgs->clientAskedFor );
2107        reqListClear( &msgs->peerAskedForFast );
2108        reqListClear( &msgs->peerAskedFor );
2109        tr_bitfieldFree( msgs->peerAllowedPieces );
2110        evbuffer_free( msgs->incoming.block );
2111        evbuffer_free( msgs->outMessages );
2112        evbuffer_free( msgs->outBlock );
2113        tr_free( msgs->pex );
2114
2115        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2116        tr_free( msgs );
2117    }
2118}
2119
2120void
2121tr_peerMsgsUnsubscribe( tr_peermsgs *    peer,
2122                        tr_publisher_tag tag )
2123{
2124    tr_publisherUnsubscribe( peer->publisher, tag );
2125}
2126
Note: See TracBrowser for help on using the repository browser.