source: trunk/libtransmission/peer-msgs.c @ 7125

Last change on this file since 7125 was 7125, checked in by charles, 12 years ago

(libT) better possible fix for #1468: Speed display is very jumpy

  • Property svn:keywords set to Date Rev Author Id
File size: 59.4 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 7125 2008-11-17 04:00:57Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19
20#include <event.h>
21
22#include "transmission.h"
23#include "bencode.h"
24#include "completion.h"
25#include "crypto.h"
26#include "inout.h"
27#ifdef WIN32
28#include "net.h" /* for ECONN */
29#endif
30#include "peer-io.h"
31#include "peer-mgr.h"
32#include "peer-mgr-private.h"
33#include "peer-msgs.h"
34#include "ratecontrol.h"
35#include "stats.h"
36#include "torrent.h"
37#include "trevent.h"
38#include "utils.h"
39
40/**
41***
42**/
43
44enum
45{
46    BT_CHOKE                = 0,
47    BT_UNCHOKE              = 1,
48    BT_INTERESTED           = 2,
49    BT_NOT_INTERESTED       = 3,
50    BT_HAVE                 = 4,
51    BT_BITFIELD             = 5,
52    BT_REQUEST              = 6,
53    BT_PIECE                = 7,
54    BT_CANCEL               = 8,
55    BT_PORT                 = 9,
56    BT_SUGGEST              = 13,
57    BT_HAVE_ALL             = 14,
58    BT_HAVE_NONE            = 15,
59    BT_REJECT               = 16,
60    BT_ALLOWED_FAST         = 17,
61    BT_LTEP                 = 20,
62
63    LTEP_HANDSHAKE          = 0,
64
65    TR_LTEP_PEX             = 1,
66
67    MIN_CHOKE_PERIOD_SEC    = ( 10 ),
68
69    /* idle seconds before we send a keepalive */
70    KEEPALIVE_INTERVAL_SECS = 100,
71
72    PEX_INTERVAL            = ( 90 * 1000 ), /* msec between sendPex() calls */
73    PEER_PULSE_INTERVAL     = ( 250 ),       /* msec between peerPulse() calls
74                                               */
75
76    MAX_QUEUE_SIZE          = ( 100 ),
77
78    /* (fast peers) max number of pieces we fast-allow to another peer */
79    MAX_FAST_ALLOWED_COUNT   = 10,
80
81    /* (fast peers) max threshold for allowing fast-pieces requests */
82    MAX_FAST_ALLOWED_THRESHOLD = 10,
83
84    /* how long an unsent request can stay queued before it's returned
85       back to the peer-mgr's pool of requests */
86    QUEUED_REQUEST_TTL_SECS = 20,
87
88    /* how long a sent request can stay queued before it's returned
89       back to the peer-mgr's pool of requests */
90    SENT_REQUEST_TTL_SECS = 240,
91
92    /* used in lowering the outMessages queue period */
93    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
94    HIGH_PRIORITY_INTERVAL_SECS = 2,
95    LOW_PRIORITY_INTERVAL_SECS = 20,
96
97    /* number of pieces to remove from the bitfield when
98     * lazy bitfields are turned on */
99    LAZY_PIECE_COUNT = 26
100};
101
102/**
103***  REQUEST MANAGEMENT
104**/
105
106enum
107{
108    AWAITING_BT_LENGTH,
109    AWAITING_BT_ID,
110    AWAITING_BT_MESSAGE,
111    AWAITING_BT_PIECE
112};
113
114struct peer_request
115{
116    uint32_t    index;
117    uint32_t    offset;
118    uint32_t    length;
119    time_t      time_requested;
120};
121
122static int
123compareRequest( const void * va,
124                const void * vb )
125{
126    const struct peer_request * a = va;
127    const struct peer_request * b = vb;
128
129    if( a->index != b->index )
130        return a->index < b->index ? -1 : 1;
131
132    if( a->offset != b->offset )
133        return a->offset < b->offset ? -1 : 1;
134
135    if( a->length != b->length )
136        return a->length < b->length ? -1 : 1;
137
138    return 0;
139}
140
141struct request_list
142{
143    uint16_t               count;
144    uint16_t               max;
145    struct peer_request *  requests;
146};
147
148static const struct request_list REQUEST_LIST_INIT = { 0, 0, NULL };
149
150static void
151reqListReserve( struct request_list * list,
152                uint16_t              max )
153{
154    if( list->max < max )
155    {
156        list->max = max;
157        list->requests = tr_renew( struct peer_request,
158                                   list->requests,
159                                   list->max );
160    }
161}
162
163static void
164reqListClear( struct request_list * list )
165{
166    tr_free( list->requests );
167    *list = REQUEST_LIST_INIT;
168}
169
170static void
171reqListCopy( struct request_list *       dest,
172             const struct request_list * src )
173{
174    dest->count = dest->max = src->count;
175    dest->requests =
176        tr_memdup( src->requests, dest->count * sizeof( struct peer_request ) );
177}
178
179static void
180reqListRemoveOne( struct request_list * list,
181                  int                   i )
182{
183    assert( 0 <= i && i < list->count );
184
185    memmove( &list->requests[i],
186            &list->requests[i + 1],
187            sizeof( struct peer_request ) * ( --list->count - i ) );
188}
189
190static void
191reqListAppend( struct request_list *       list,
192               const struct peer_request * req )
193{
194    if( ++list->count >= list->max )
195        reqListReserve( list, list->max + 8 );
196
197    list->requests[list->count - 1] = *req;
198}
199
200static int
201reqListPop( struct request_list * list,
202            struct peer_request * setme )
203{
204    int success;
205
206    if( !list->count )
207        success = FALSE;
208    else {
209        *setme = list->requests[0];
210        reqListRemoveOne( list, 0 );
211        success = TRUE;
212    }
213
214    return success;
215}
216
217static int
218reqListFind( struct request_list *       list,
219             const struct peer_request * key )
220{
221    uint16_t i;
222
223    for( i = 0; i < list->count; ++i )
224        if( !compareRequest( key, list->requests + i ) )
225            return i;
226
227    return -1;
228}
229
230static int
231reqListRemove( struct request_list *       list,
232               const struct peer_request * key )
233{
234    int success;
235    const int i = reqListFind( list, key );
236
237    if( i < 0 )
238        success = FALSE;
239    else {
240        reqListRemoveOne( list, i );
241        success = TRUE;
242    }
243
244    return success;
245}
246
247/**
248***
249**/
250
251/* this is raw, unchanged data from the peer regarding
252 * the current message that it's sending us. */
253struct tr_incoming
254{
255    uint8_t                id;
256    uint32_t               length; /* includes the +1 for id length */
257    struct peer_request    blockReq; /* metadata for incoming blocks */
258    struct evbuffer *      block; /* piece data for incoming blocks */
259};
260
261struct tr_peermsgs
262{
263    unsigned int    peerSentBitfield        : 1;
264    unsigned int    peerSupportsPex         : 1;
265    unsigned int    clientSentLtepHandshake : 1;
266    unsigned int    peerSentLtepHandshake   : 1;
267    unsigned int    sendingBlock            : 1;
268
269    uint8_t         state;
270    uint8_t         ut_pex_id;
271    uint16_t        pexCount;
272    uint16_t        minActiveRequests;
273    uint16_t        maxActiveRequests;
274
275    /* how long the outMessages batch should be allowed to grow before
276     * it's flushed -- some messages (like requests >:) should be sent
277     * very quickly; others aren't as urgent. */
278    int                    outMessagesBatchPeriod;
279
280    tr_peer *              info;
281
282    tr_session *           session;
283    tr_torrent *           torrent;
284    tr_peerIo *            io;
285
286    tr_publisher_t *       publisher;
287
288    struct evbuffer *      outBlock; /* buffer of the current piece message */
289    struct evbuffer *      outMessages; /* all the non-piece messages */
290
291    struct request_list    peerAskedFor;
292    struct request_list    peerAskedForFast;
293    struct request_list    clientAskedFor;
294    struct request_list    clientWillAskFor;
295
296    tr_timer *             pexTimer;
297
298    time_t                 clientSentPexAt;
299    time_t                 clientSentAnythingAt;
300
301    /* when we started batching the outMessages */
302    time_t                outMessagesBatchedAt;
303
304    tr_bitfield *         peerAllowedPieces;
305
306    struct tr_incoming    incoming;
307
308    tr_pex *              pex;
309};
310
311/**
312***
313**/
314
315static void
316myDebug( const char *               file,
317         int                        line,
318         const struct tr_peermsgs * msgs,
319         const char *               fmt,
320         ... )
321{
322    FILE * fp = tr_getLog( );
323
324    if( fp )
325    {
326        va_list           args;
327        char              timestr[64];
328        struct evbuffer * buf = evbuffer_new( );
329        char *            base = tr_basename( file );
330
331        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
332                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
333                             msgs->torrent->info.name,
334                             tr_peerIoGetAddrStr( msgs->io ),
335                             msgs->info->client );
336        va_start( args, fmt );
337        evbuffer_add_vprintf( buf, fmt, args );
338        va_end( args );
339        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
340        fwrite( EVBUFFER_DATA( buf ), 1, EVBUFFER_LENGTH( buf ), fp );
341
342        tr_free( base );
343        evbuffer_free( buf );
344    }
345}
346
347#define dbgmsg( msgs, ... ) myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ )
348
349/**
350***
351**/
352
353static void
354pokeBatchPeriod( tr_peermsgs * msgs,
355                 int           interval )
356{
357    if( msgs->outMessagesBatchPeriod > interval )
358    {
359        msgs->outMessagesBatchPeriod = interval;
360        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
361    }
362}
363
364static void
365protocolSendRequest( tr_peermsgs *               msgs,
366                     const struct peer_request * req )
367{
368    tr_peerIo *       io = msgs->io;
369    struct evbuffer * out = msgs->outMessages;
370
371    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
372    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
373    tr_peerIoWriteUint32( io, out, req->index );
374    tr_peerIoWriteUint32( io, out, req->offset );
375    tr_peerIoWriteUint32( io, out, req->length );
376    dbgmsg( msgs, "requesting %u:%u->%u... outMessage size is now %d",
377           req->index, req->offset, req->length, (int)EVBUFFER_LENGTH( out ) );
378    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
379}
380
381static void
382protocolSendCancel( tr_peermsgs *               msgs,
383                    const struct peer_request * req )
384{
385    tr_peerIo *       io = msgs->io;
386    struct evbuffer * out = msgs->outMessages;
387
388    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
389    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
390    tr_peerIoWriteUint32( io, out, req->index );
391    tr_peerIoWriteUint32( io, out, req->offset );
392    tr_peerIoWriteUint32( io, out, req->length );
393    dbgmsg( msgs, "cancelling %u:%u->%u... outMessage size is now %d",
394           req->index, req->offset, req->length, (int)EVBUFFER_LENGTH( out ) );
395    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
396}
397
398static void
399protocolSendHave( tr_peermsgs * msgs,
400                  uint32_t      index )
401{
402    tr_peerIo *       io = msgs->io;
403    struct evbuffer * out = msgs->outMessages;
404
405    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + sizeof( uint32_t ) );
406    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
407    tr_peerIoWriteUint32( io, out, index );
408    dbgmsg( msgs, "sending Have %u.. outMessage size is now %d",
409           index, (int)EVBUFFER_LENGTH( out ) );
410    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
411}
412
413static void
414protocolSendChoke( tr_peermsgs * msgs,
415                   int           choke )
416{
417    tr_peerIo *       io = msgs->io;
418    struct evbuffer * out = msgs->outMessages;
419
420    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
421    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
422    dbgmsg( msgs, "sending %s... outMessage size is now %d",
423           ( choke ? "Choke" : "Unchoke" ),
424           (int)EVBUFFER_LENGTH( out ) );
425    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
426}
427
428/**
429***  EVENTS
430**/
431
432static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0 };
433
434static void
435publish( tr_peermsgs *   msgs,
436         tr_peer_event * e )
437{
438    tr_publisherPublish( msgs->publisher, msgs->info, e );
439}
440
441static void
442fireError( tr_peermsgs * msgs,
443           int           err )
444{
445    tr_peer_event e = blankEvent;
446
447    e.eventType = TR_PEER_ERROR;
448    e.err = err;
449    publish( msgs, &e );
450}
451
452static void
453fireNeedReq( tr_peermsgs * msgs )
454{
455    tr_peer_event e = blankEvent;
456
457    e.eventType = TR_PEER_NEED_REQ;
458    publish( msgs, &e );
459}
460
461static void
462firePeerProgress( tr_peermsgs * msgs )
463{
464    tr_peer_event e = blankEvent;
465
466    e.eventType = TR_PEER_PEER_PROGRESS;
467    e.progress = msgs->info->progress;
468    publish( msgs, &e );
469}
470
471static void
472fireGotBlock( tr_peermsgs *               msgs,
473              const struct peer_request * req )
474{
475    tr_peer_event e = blankEvent;
476
477    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
478    e.pieceIndex = req->index;
479    e.offset = req->offset;
480    e.length = req->length;
481    publish( msgs, &e );
482}
483
484static void
485fireClientGotData( tr_peermsgs * msgs,
486                   uint32_t      length,
487                   int           wasPieceData )
488{
489    tr_peer_event e = blankEvent;
490
491    e.length = length;
492    e.eventType = TR_PEER_CLIENT_GOT_DATA;
493    e.wasPieceData = wasPieceData;
494    publish( msgs, &e );
495}
496
497static void
498firePeerGotData( tr_peermsgs  * msgs,
499                 uint32_t       length,
500                 int            wasPieceData )
501{
502    tr_peer_event e = blankEvent;
503
504    e.length = length;
505    e.eventType = TR_PEER_PEER_GOT_DATA;
506    e.wasPieceData = wasPieceData;
507
508    publish( msgs, &e );
509}
510
511static void
512fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
513{
514    tr_peer_event e = blankEvent;
515    e.eventType = TR_PEER_CANCEL;
516    e.pieceIndex = req->index;
517    e.offset = req->offset;
518    e.length = req->length;
519    publish( msgs, &e );
520}
521
522/**
523***  INTEREST
524**/
525
526static int
527isPieceInteresting( const tr_peermsgs * peer,
528                    tr_piece_index_t    piece )
529{
530    const tr_torrent * torrent = peer->torrent;
531
532    return ( !torrent->info.pieces[piece].dnd )                 /* we want it */
533           && ( !tr_cpPieceIsComplete( torrent->completion, piece ) ) /* !have
534                                                                        */
535           && ( tr_bitfieldHas( peer->info->have, piece ) );   /* peer has it */
536}
537
538/* "interested" means we'll ask for piece data if they unchoke us */
539static int
540isPeerInteresting( const tr_peermsgs * msgs )
541{
542    tr_piece_index_t    i;
543    const tr_torrent *  torrent;
544    const tr_bitfield * bitfield;
545    const int           clientIsSeed = tr_torrentIsSeed( msgs->torrent );
546
547    if( clientIsSeed )
548        return FALSE;
549
550    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
551        return FALSE;
552
553    torrent = msgs->torrent;
554    bitfield = tr_cpPieceBitfield( torrent->completion );
555
556    if( !msgs->info->have )
557        return TRUE;
558
559    assert( bitfield->byteCount == msgs->info->have->byteCount );
560
561    for( i = 0; i < torrent->info.pieceCount; ++i )
562        if( isPieceInteresting( msgs, i ) )
563            return TRUE;
564
565    return FALSE;
566}
567
568static void
569sendInterest( tr_peermsgs * msgs,
570              int           weAreInterested )
571{
572    struct evbuffer * out = msgs->outMessages;
573
574    assert( msgs );
575    assert( weAreInterested == 0 || weAreInterested == 1 );
576
577    msgs->info->clientIsInterested = weAreInterested;
578    dbgmsg( msgs, "Sending %s",
579            weAreInterested ? "Interested" : "Not Interested" );
580    tr_peerIoWriteUint32( msgs->io, out, sizeof( uint8_t ) );
581    tr_peerIoWriteUint8 (
582        msgs->io, out, weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
583    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
584    dbgmsg( msgs, "outMessage size is now %d", (int)EVBUFFER_LENGTH( out ) );
585}
586
587static void
588updateInterest( tr_peermsgs * msgs )
589{
590    const int i = isPeerInteresting( msgs );
591
592    if( i != msgs->info->clientIsInterested )
593        sendInterest( msgs, i );
594    if( i )
595        fireNeedReq( msgs );
596}
597
598static void
599cancelAllRequestsToClientExceptFast( tr_peermsgs * msgs )
600{
601    reqListClear( &msgs->peerAskedFor );
602}
603
604void
605tr_peerMsgsSetChoke( tr_peermsgs * msgs,
606                     int           choke )
607{
608    const time_t now = time( NULL );
609    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
610
611    assert( msgs );
612    assert( msgs->info );
613    assert( choke == 0 || choke == 1 );
614
615    if( msgs->info->chokeChangedAt > fibrillationTime )
616    {
617        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation",
618                choke );
619    }
620    else if( msgs->info->peerIsChoked != choke )
621    {
622        msgs->info->peerIsChoked = choke;
623        if( choke )
624            cancelAllRequestsToClientExceptFast( msgs );
625        protocolSendChoke( msgs, choke );
626        msgs->info->chokeChangedAt = now;
627    }
628}
629
630/**
631***
632**/
633
634void
635tr_peerMsgsHave( tr_peermsgs * msgs,
636                 uint32_t      index )
637{
638    protocolSendHave( msgs, index );
639
640    /* since we have more pieces now, we might not be interested in this peer */
641    updateInterest( msgs );
642}
643
644#if 0
645static void
646sendFastSuggest( tr_peermsgs * msgs,
647                 uint32_t      pieceIndex )
648{
649    assert( msgs );
650
651    if( tr_peerIoSupportsFEXT( msgs->io ) )
652    {
653        tr_peerIoWriteUint32( msgs->io, msgs->outMessages,
654                             sizeof( uint8_t ) + sizeof( uint32_t ) );
655        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_SUGGEST );
656        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
657    }
658}
659
660static void
661sendFastHave( tr_peermsgs * msgs,
662              int           all )
663{
664    assert( msgs );
665
666    if( tr_peerIoSupportsFEXT( msgs->io ) )
667    {
668        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof( uint8_t ) );
669        tr_peerIoWriteUint8( msgs->io, msgs->outMessages,
670                            ( all ? BT_HAVE_ALL
671                              : BT_HAVE_NONE ) );
672        updateInterest( msgs );
673    }
674}
675
676#endif
677
678static void
679sendFastReject( tr_peermsgs * msgs,
680                uint32_t      pieceIndex,
681                uint32_t      offset,
682                uint32_t      length )
683{
684    assert( msgs );
685
686    if( tr_peerIoSupportsFEXT( msgs->io ) )
687    {
688        struct evbuffer * out = msgs->outMessages;
689        const uint32_t    len = sizeof( uint8_t ) + 3 * sizeof( uint32_t );
690        dbgmsg( msgs, "sending fast reject %u:%u->%u", pieceIndex, offset,
691                length );
692        tr_peerIoWriteUint32( msgs->io, out, len );
693        tr_peerIoWriteUint8( msgs->io, out, BT_REJECT );
694        tr_peerIoWriteUint32( msgs->io, out, pieceIndex );
695        tr_peerIoWriteUint32( msgs->io, out, offset );
696        tr_peerIoWriteUint32( msgs->io, out, length );
697        pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
698        dbgmsg( msgs, "outMessage size is now %d",
699               (int)EVBUFFER_LENGTH( out ) );
700    }
701}
702
703static tr_bitfield*
704getPeerAllowedPieces( tr_peermsgs * msgs )
705{
706    if( !msgs->peerAllowedPieces && tr_peerIoSupportsFEXT( msgs->io ) )
707    {
708        msgs->peerAllowedPieces = tr_peerMgrGenerateAllowedSet(
709            MAX_FAST_ALLOWED_COUNT,
710            msgs->torrent->info.pieceCount,
711            msgs->torrent->info.hash,
712            tr_peerIoGetAddress( msgs->io, NULL ) );
713    }
714
715    return msgs->peerAllowedPieces;
716}
717
718static void
719sendFastAllowed( tr_peermsgs * msgs,
720                 uint32_t      pieceIndex )
721{
722    assert( msgs );
723
724    if( tr_peerIoSupportsFEXT( msgs->io ) )
725    {
726        struct evbuffer * out = msgs->outMessages;
727        dbgmsg( msgs, "sending fast allowed" );
728        tr_peerIoWriteUint32( msgs->io, out,  sizeof( uint8_t ) +
729                             sizeof( uint32_t ) );
730        tr_peerIoWriteUint8( msgs->io, out, BT_ALLOWED_FAST );
731        tr_peerIoWriteUint32( msgs->io, out, pieceIndex );
732        pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
733        dbgmsg( msgs, "outMessage size is now %d",
734               (int)EVBUFFER_LENGTH( out ) );
735    }
736}
737
738static void
739sendFastAllowedSet( tr_peermsgs * msgs )
740{
741    tr_piece_index_t i = 0;
742
743    while( i <= msgs->torrent->info.pieceCount )
744    {
745        if( tr_bitfieldHas( getPeerAllowedPieces( msgs ), i ) )
746            sendFastAllowed( msgs, i );
747        i++;
748    }
749}
750
751static void
752maybeSendFastAllowedSet( tr_peermsgs * msgs )
753{
754    if( tr_bitfieldCountTrueBits( msgs->info->have ) <=
755        MAX_FAST_ALLOWED_THRESHOLD )
756        sendFastAllowedSet( msgs );
757}
758
759/**
760***
761**/
762
763static int
764reqIsValid( const tr_peermsgs * peer,
765            uint32_t            index,
766            uint32_t            offset,
767            uint32_t            length )
768{
769    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
770}
771
772static int
773requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
774{
775    return reqIsValid( msgs, req->index, req->offset, req->length );
776}
777
778static void
779expireOldRequests( tr_peermsgs * msgs, const time_t now  )
780{
781    int                 i;
782    time_t              oldestAllowed;
783    struct request_list tmp = REQUEST_LIST_INIT;
784
785    /* cancel requests that have been queued for too long */
786    oldestAllowed = now - QUEUED_REQUEST_TTL_SECS;
787    reqListCopy( &tmp, &msgs->clientWillAskFor );
788    for( i = 0; i < tmp.count; ++i )
789    {
790        const struct peer_request * req = &tmp.requests[i];
791        if( req->time_requested < oldestAllowed )
792            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
793    }
794    reqListClear( &tmp );
795
796    /* cancel requests that were sent too long ago */
797    oldestAllowed = now - SENT_REQUEST_TTL_SECS;
798    reqListCopy( &tmp, &msgs->clientAskedFor );
799    for( i = 0; i < tmp.count; ++i )
800    {
801        const struct peer_request * req = &tmp.requests[i];
802        if( req->time_requested < oldestAllowed )
803            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
804    }
805    reqListClear( &tmp );
806}
807
808static void
809pumpRequestQueue( tr_peermsgs * msgs, const time_t now )
810{
811    const int           max = msgs->maxActiveRequests;
812    const int           min = msgs->minActiveRequests;
813    int                 sent = 0;
814    int                 count = msgs->clientAskedFor.count;
815    struct peer_request req;
816
817    if( count > min )
818        return;
819    if( msgs->info->clientIsChoked )
820        return;
821    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
822        return;
823
824    while( ( count < max ) && reqListPop( &msgs->clientWillAskFor, &req ) )
825    {
826        const tr_block_index_t block =
827            _tr_block( msgs->torrent, req.index, req.offset );
828
829        assert( requestIsValid( msgs, &req ) );
830        assert( tr_bitfieldHas( msgs->info->have, req.index ) );
831
832        /* don't ask for it if we've already got it... this block may have
833         * come in from a different peer after we cancelled a request for it */
834        if( !tr_cpBlockIsComplete( msgs->torrent->completion, block ) )
835        {
836            protocolSendRequest( msgs, &req );
837            req.time_requested = now;
838            reqListAppend( &msgs->clientAskedFor, &req );
839
840            ++count;
841            ++sent;
842        }
843    }
844
845    if( sent )
846        dbgmsg( msgs,
847                "pump sent %d requests, now have %d active and %d queued",
848                sent,
849                msgs->clientAskedFor.count,
850                msgs->clientWillAskFor.count );
851
852    if( count < max )
853        fireNeedReq( msgs );
854}
855
856static int
857requestQueueIsFull( const tr_peermsgs * msgs )
858{
859    const int req_max = msgs->maxActiveRequests;
860    return msgs->clientWillAskFor.count >= req_max;
861}
862
863tr_addreq_t
864tr_peerMsgsAddRequest( tr_peermsgs *    msgs,
865                       uint32_t         index,
866                       uint32_t         offset,
867                       uint32_t         length )
868{
869    struct peer_request req;
870
871    assert( msgs );
872    assert( msgs->torrent );
873    assert( reqIsValid( msgs, index, offset, length ) );
874
875    /**
876    ***  Reasons to decline the request
877    **/
878
879    /* don't send requests to choked clients */
880    if( msgs->info->clientIsChoked )
881    {
882        dbgmsg( msgs, "declining request because they're choking us" );
883        return TR_ADDREQ_CLIENT_CHOKED;
884    }
885
886    /* peer doesn't have this piece */
887    if( !tr_bitfieldHas( msgs->info->have, index ) )
888        return TR_ADDREQ_MISSING;
889
890    /* peer's queue is full */
891    if( requestQueueIsFull( msgs ) ) {
892        dbgmsg( msgs, "declining request because we're full" );
893        return TR_ADDREQ_FULL;
894    }
895
896    /* have we already asked for this piece? */
897    req.index = index;
898    req.offset = offset;
899    req.length = length;
900    if( reqListFind( &msgs->clientAskedFor, &req ) != -1 ) {
901        dbgmsg( msgs, "declining because it's a duplicate" );
902        return TR_ADDREQ_DUPLICATE;
903    }
904    if( reqListFind( &msgs->clientWillAskFor, &req ) != -1 ) {
905        dbgmsg( msgs, "declining because it's a duplicate" );
906        return TR_ADDREQ_DUPLICATE;
907    }
908
909    /**
910    ***  Accept this request
911    **/
912
913    dbgmsg( msgs, "added req for piece %lu", (unsigned long)index );
914    req.time_requested = time( NULL );
915    reqListAppend( &msgs->clientWillAskFor, &req );
916    return TR_ADDREQ_OK;
917}
918
919static void
920cancelAllRequestsToPeer( tr_peermsgs * msgs )
921{
922    int                 i;
923    struct request_list a = msgs->clientWillAskFor;
924    struct request_list b = msgs->clientAskedFor;
925
926    msgs->clientAskedFor = REQUEST_LIST_INIT;
927    msgs->clientWillAskFor = REQUEST_LIST_INIT;
928
929    for( i=0; i<a.count; ++i )
930        fireCancelledReq( msgs, &a.requests[i] );
931
932    for( i = 0; i < b.count; ++i ) {
933        fireCancelledReq( msgs, &b.requests[i] );
934        protocolSendCancel( msgs, &b.requests[i] );
935    }
936
937    reqListClear( &a );
938    reqListClear( &b );
939}
940
941void
942tr_peerMsgsCancel( tr_peermsgs * msgs,
943                   uint32_t      pieceIndex,
944                   uint32_t      offset,
945                   uint32_t      length )
946{
947    struct peer_request req;
948
949    assert( msgs != NULL );
950    assert( length > 0 );
951
952    /* have we asked the peer for this piece? */
953    req.index = pieceIndex;
954    req.offset = offset;
955    req.length = length;
956
957    /* if it's only in the queue and hasn't been sent yet, free it */
958    if( reqListRemove( &msgs->clientWillAskFor, &req ) )
959        fireCancelledReq( msgs, &req );
960
961    /* if it's already been sent, send a cancel message too */
962    if( reqListRemove( &msgs->clientAskedFor, &req ) ) {
963        protocolSendCancel( msgs, &req );
964        fireCancelledReq( msgs, &req );
965    }
966}
967
968/**
969***
970**/
971
972static void
973sendLtepHandshake( tr_peermsgs * msgs )
974{
975    tr_benc           val, *m;
976    char *            buf;
977    int               len;
978    int               pex;
979    struct evbuffer * out = msgs->outMessages;
980
981    if( msgs->clientSentLtepHandshake )
982        return;
983
984    dbgmsg( msgs, "sending an ltep handshake" );
985    msgs->clientSentLtepHandshake = 1;
986
987    /* decide if we want to advertise pex support */
988    if( !tr_torrentAllowsPex( msgs->torrent ) )
989        pex = 0;
990    else if( msgs->peerSentLtepHandshake )
991        pex = msgs->peerSupportsPex ? 1 : 0;
992    else
993        pex = 1;
994
995    tr_bencInitDict( &val, 4 );
996    tr_bencDictAddInt( &val, "e",
997                       msgs->session->encryptionMode != TR_CLEAR_PREFERRED );
998    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( msgs->session ) );
999    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
1000    m  = tr_bencDictAddDict( &val, "m", 1 );
1001    if( pex )
1002        tr_bencDictAddInt( m, "ut_pex", TR_LTEP_PEX );
1003    buf = tr_bencSave( &val, &len );
1004
1005    tr_peerIoWriteUint32( msgs->io, out, 2 * sizeof( uint8_t ) + len );
1006    tr_peerIoWriteUint8 ( msgs->io, out, BT_LTEP );
1007    tr_peerIoWriteUint8 ( msgs->io, out, LTEP_HANDSHAKE );
1008    tr_peerIoWriteBytes ( msgs->io, out, buf, len );
1009    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1010    dbgmsg( msgs, "outMessage size is now %d", (int)EVBUFFER_LENGTH( out ) );
1011
1012    /* cleanup */
1013    tr_bencFree( &val );
1014    tr_free( buf );
1015}
1016
1017static void
1018parseLtepHandshake( tr_peermsgs *     msgs,
1019                    int               len,
1020                    struct evbuffer * inbuf )
1021{
1022    int64_t   i;
1023    tr_benc   val, * sub;
1024    uint8_t * tmp = tr_new( uint8_t, len );
1025
1026    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
1027    msgs->peerSentLtepHandshake = 1;
1028
1029    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type != TYPE_DICT )
1030    {
1031        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
1032        tr_free( tmp );
1033        return;
1034    }
1035
1036    dbgmsg( msgs, "here is the ltep handshake we got [%*.*s]", len, len,
1037            tmp );
1038
1039    /* does the peer prefer encrypted connections? */
1040    if( tr_bencDictFindInt( &val, "e", &i ) )
1041        msgs->info->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
1042                                            : ENCRYPTION_PREFERENCE_NO;
1043
1044    /* check supported messages for utorrent pex */
1045    msgs->peerSupportsPex = 0;
1046    if( tr_bencDictFindDict( &val, "m", &sub ) )
1047    {
1048        if( tr_bencDictFindInt( sub, "ut_pex", &i ) )
1049        {
1050            msgs->ut_pex_id = (uint8_t) i;
1051            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
1052            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
1053        }
1054    }
1055
1056    /* get peer's listening port */
1057    if( tr_bencDictFindInt( &val, "p", &i ) )
1058    {
1059        msgs->info->port = htons( (uint16_t)i );
1060        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
1061    }
1062
1063    tr_bencFree( &val );
1064    tr_free( tmp );
1065}
1066
1067static void
1068parseUtPex( tr_peermsgs *     msgs,
1069            int               msglen,
1070            struct evbuffer * inbuf )
1071{
1072    int                loaded = 0;
1073    uint8_t *          tmp = tr_new( uint8_t, msglen );
1074    tr_benc            val;
1075    const tr_torrent * tor = msgs->torrent;
1076    const uint8_t *    added;
1077    size_t             added_len;
1078
1079    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
1080
1081    if( tr_torrentAllowsPex( tor )
1082      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) )
1083      && tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1084    {
1085        const uint8_t * added_f = NULL;
1086        tr_pex *        pex;
1087        size_t          i, n;
1088        size_t          added_f_len = 0;
1089        tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1090        pex =
1091            tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len,
1092                                    &n );
1093        for( i = 0; i < n; ++i )
1094            tr_peerMgrAddPex( msgs->session->peerMgr, tor->info.hash,
1095                              TR_PEER_FROM_PEX, pex + i );
1096        tr_free( pex );
1097    }
1098
1099    if( loaded )
1100        tr_bencFree( &val );
1101    tr_free( tmp );
1102}
1103
1104static void sendPex( tr_peermsgs * msgs );
1105
1106static void
1107parseLtep( tr_peermsgs *     msgs,
1108           int               msglen,
1109           struct evbuffer * inbuf )
1110{
1111    uint8_t ltep_msgid;
1112
1113    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
1114    msglen--;
1115
1116    if( ltep_msgid == LTEP_HANDSHAKE )
1117    {
1118        dbgmsg( msgs, "got ltep handshake" );
1119        parseLtepHandshake( msgs, msglen, inbuf );
1120        if( tr_peerIoSupportsLTEP( msgs->io ) )
1121        {
1122            sendLtepHandshake( msgs );
1123            sendPex( msgs );
1124        }
1125    }
1126    else if( ltep_msgid == TR_LTEP_PEX )
1127    {
1128        dbgmsg( msgs, "got ut pex" );
1129        msgs->peerSupportsPex = 1;
1130        parseUtPex( msgs, msglen, inbuf );
1131    }
1132    else
1133    {
1134        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1135        evbuffer_drain( inbuf, msglen );
1136    }
1137}
1138
1139static int
1140readBtLength( tr_peermsgs *     msgs,
1141              struct evbuffer * inbuf,
1142              size_t            inlen )
1143{
1144    uint32_t len;
1145
1146    if( inlen < sizeof( len ) )
1147        return READ_LATER;
1148
1149    tr_peerIoReadUint32( msgs->io, inbuf, &len );
1150
1151    if( len == 0 ) /* peer sent us a keepalive message */
1152        dbgmsg( msgs, "got KeepAlive" );
1153    else
1154    {
1155        msgs->incoming.length = len;
1156        msgs->state = AWAITING_BT_ID;
1157    }
1158
1159    return READ_NOW;
1160}
1161
1162static int readBtMessage( tr_peermsgs *     msgs,
1163                          struct evbuffer * inbuf,
1164                          size_t            inlen );
1165
1166static int
1167readBtId( tr_peermsgs *     msgs,
1168          struct evbuffer * inbuf,
1169          size_t            inlen )
1170{
1171    uint8_t id;
1172
1173    if( inlen < sizeof( uint8_t ) )
1174        return READ_LATER;
1175
1176    tr_peerIoReadUint8( msgs->io, inbuf, &id );
1177    msgs->incoming.id = id;
1178
1179    if( id == BT_PIECE )
1180    {
1181        msgs->state = AWAITING_BT_PIECE;
1182        return READ_NOW;
1183    }
1184    else if( msgs->incoming.length != 1 )
1185    {
1186        msgs->state = AWAITING_BT_MESSAGE;
1187        return READ_NOW;
1188    }
1189    else return readBtMessage( msgs, inbuf, inlen - 1 );
1190}
1191
1192static void
1193updatePeerProgress( tr_peermsgs * msgs )
1194{
1195    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have )
1196                           / (float)msgs->torrent->info.pieceCount;
1197    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
1198    updateInterest( msgs );
1199    firePeerProgress( msgs );
1200}
1201
1202static int
1203clientCanSendFastBlock( const tr_peermsgs * msgs UNUSED )
1204{
1205    /* don't send a fast piece if peer has MAX_FAST_ALLOWED_THRESHOLD pieces */
1206    if( tr_bitfieldCountTrueBits( msgs->info->have ) >
1207        MAX_FAST_ALLOWED_THRESHOLD )
1208        return FALSE;
1209
1210    /* ...or if we don't have ourself enough pieces */
1211    if( tr_bitfieldCountTrueBits( tr_cpPieceBitfield( msgs->torrent->
1212                                                      completion ) ) <
1213        MAX_FAST_ALLOWED_THRESHOLD )
1214        return FALSE;
1215
1216    /* Maybe a bandwidth limit ? */
1217    return TRUE;
1218}
1219
1220static void
1221peerMadeRequest( tr_peermsgs *               msgs,
1222                 const struct peer_request * req )
1223{
1224    const int reqIsValid = requestIsValid( msgs, req );
1225    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete(
1226        msgs->torrent->completion, req->index );
1227    const int peerIsChoked = msgs->info->peerIsChoked;
1228    const int peerIsFast = tr_peerIoSupportsFEXT( msgs->io );
1229    const int pieceIsFast = reqIsValid && tr_bitfieldHas(
1230        getPeerAllowedPieces( msgs ), req->index );
1231    const int canSendFast = clientCanSendFastBlock( msgs );
1232
1233    if( !reqIsValid ) /* bad request */
1234    {
1235        dbgmsg( msgs, "rejecting an invalid request." );
1236        sendFastReject( msgs, req->index, req->offset, req->length );
1237    }
1238    else if( !clientHasPiece ) /* we don't have it */
1239    {
1240        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1241        sendFastReject( msgs, req->index, req->offset, req->length );
1242    }
1243    else if( peerIsChoked && !peerIsFast ) /* doesn't he know he's choked? */
1244    {
1245        tr_peerMsgsSetChoke( msgs, 1 );
1246        sendFastReject( msgs, req->index, req->offset, req->length );
1247    }
1248    else if( peerIsChoked && peerIsFast && ( !pieceIsFast || !canSendFast ) )
1249    {
1250        sendFastReject( msgs, req->index, req->offset, req->length );
1251    }
1252    else /* YAY */
1253    {
1254        if( peerIsFast && pieceIsFast )
1255            reqListAppend( &msgs->peerAskedForFast, req );
1256        else
1257            reqListAppend( &msgs->peerAskedFor, req );
1258    }
1259}
1260
1261static int
1262messageLengthIsCorrect( const tr_peermsgs * msg,
1263                        uint8_t             id,
1264                        uint32_t            len )
1265{
1266    switch( id )
1267    {
1268        case BT_CHOKE:
1269        case BT_UNCHOKE:
1270        case BT_INTERESTED:
1271        case BT_NOT_INTERESTED:
1272        case BT_HAVE_ALL:
1273        case BT_HAVE_NONE:
1274            return len == 1;
1275
1276        case BT_HAVE:
1277        case BT_SUGGEST:
1278        case BT_ALLOWED_FAST:
1279            return len == 5;
1280
1281        case BT_BITFIELD:
1282            return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1283
1284        case BT_REQUEST:
1285        case BT_CANCEL:
1286        case BT_REJECT:
1287            return len == 13;
1288
1289        case BT_PIECE:
1290            return len > 9 && len <= 16393;
1291
1292        case BT_PORT:
1293            return len == 3;
1294
1295        case BT_LTEP:
1296            return len >= 2;
1297
1298        default:
1299            return FALSE;
1300    }
1301}
1302
1303static int clientGotBlock( tr_peermsgs *               msgs,
1304                           const uint8_t *             block,
1305                           const struct peer_request * req );
1306
1307static int
1308readBtPiece( tr_peermsgs *     msgs,
1309             struct evbuffer * inbuf,
1310             size_t            inlen )
1311{
1312    struct peer_request * req = &msgs->incoming.blockReq;
1313
1314    assert( EVBUFFER_LENGTH( inbuf ) >= inlen );
1315    dbgmsg( msgs, "In readBtPiece" );
1316
1317    if( !req->length )
1318    {
1319        if( inlen < 8 )
1320            return READ_LATER;
1321
1322        tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
1323        tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
1324        req->length = msgs->incoming.length - 9;
1325        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index,
1326                req->offset,
1327                req->length );
1328        return READ_NOW;
1329    }
1330    else
1331    {
1332        int          err;
1333
1334        /* read in another chunk of data */
1335        const size_t nLeft = req->length - EVBUFFER_LENGTH(
1336            msgs->incoming.block );
1337        size_t       n = MIN( nLeft, inlen );
1338        uint8_t *    buf = tr_new( uint8_t, n );
1339        assert( EVBUFFER_LENGTH( inbuf ) >= n );
1340        tr_peerIoReadBytes( msgs->io, inbuf, buf, n );
1341        evbuffer_add( msgs->incoming.block, buf, n );
1342        fireClientGotData( msgs, n, TRUE );
1343        tr_free( buf );
1344        dbgmsg( msgs, "got %d bytes for block %u:%u->%u ... %d remain",
1345               (int)n, req->index, req->offset, req->length,
1346               (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) );
1347        if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length )
1348            return READ_LATER;
1349
1350        /* we've got the whole block ... process it */
1351        err = clientGotBlock( msgs, EVBUFFER_DATA(
1352                                  msgs->incoming.block ), req );
1353
1354        /* cleanup */
1355        evbuffer_drain( msgs->incoming.block,
1356                       EVBUFFER_LENGTH( msgs->incoming.block ) );
1357        req->length = 0;
1358        msgs->state = AWAITING_BT_LENGTH;
1359        if( !err )
1360            return READ_NOW;
1361        else
1362        {
1363            fireError( msgs, err );
1364            return READ_ERR;
1365        }
1366    }
1367}
1368
1369static int
1370readBtMessage( tr_peermsgs *     msgs,
1371               struct evbuffer * inbuf,
1372               size_t            inlen )
1373{
1374    uint32_t      ui32;
1375    uint32_t      msglen = msgs->incoming.length;
1376    const uint8_t id = msgs->incoming.id;
1377    const size_t  startBufLen = EVBUFFER_LENGTH( inbuf );
1378
1379    --msglen; /* id length */
1380
1381    if( inlen < msglen )
1382        return READ_LATER;
1383
1384    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %d", (int)id,
1385            (int)msglen,
1386            (int)inlen );
1387
1388    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1389    {
1390        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d",
1391                (int)id, (int)msglen );
1392        fireError( msgs, EMSGSIZE );
1393        return READ_ERR;
1394    }
1395
1396    switch( id )
1397    {
1398        case BT_CHOKE:
1399            dbgmsg( msgs, "got Choke" );
1400            msgs->info->clientIsChoked = 1;
1401            cancelAllRequestsToPeer( msgs );
1402            cancelAllRequestsToClientExceptFast( msgs );
1403            break;
1404
1405        case BT_UNCHOKE:
1406            dbgmsg( msgs, "got Unchoke" );
1407            msgs->info->clientIsChoked = 0;
1408            fireNeedReq( msgs );
1409            break;
1410
1411        case BT_INTERESTED:
1412            dbgmsg( msgs, "got Interested" );
1413            msgs->info->peerIsInterested = 1;
1414            break;
1415
1416        case BT_NOT_INTERESTED:
1417            dbgmsg( msgs, "got Not Interested" );
1418            msgs->info->peerIsInterested = 0;
1419            break;
1420
1421        case BT_HAVE:
1422            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1423            dbgmsg( msgs, "got Have: %u", ui32 );
1424            if( tr_bitfieldAdd( msgs->info->have, ui32 ) )
1425                fireError( msgs, ERANGE );
1426            updatePeerProgress( msgs );
1427            tr_rcTransferred( msgs->torrent->swarmSpeed,
1428                              msgs->torrent->info.pieceSize );
1429            break;
1430
1431        case BT_BITFIELD:
1432        {
1433            dbgmsg( msgs, "got a bitfield" );
1434            msgs->peerSentBitfield = 1;
1435            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits,
1436                                msglen );
1437            updatePeerProgress( msgs );
1438            maybeSendFastAllowedSet( msgs );
1439            fireNeedReq( msgs );
1440            break;
1441        }
1442
1443        case BT_REQUEST:
1444        {
1445            struct peer_request r;
1446            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1447            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1448            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1449            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset,
1450                    r.length );
1451            peerMadeRequest( msgs, &r );
1452            break;
1453        }
1454
1455        case BT_CANCEL:
1456        {
1457            struct peer_request r;
1458            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1459            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1460            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1461            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset,
1462                    r.length );
1463            reqListRemove( &msgs->peerAskedForFast, &r );
1464            reqListRemove( &msgs->peerAskedFor, &r );
1465            break;
1466        }
1467
1468        case BT_PIECE:
1469            assert( 0 ); /* handled elsewhere! */
1470            break;
1471
1472        case BT_PORT:
1473            dbgmsg( msgs, "Got a BT_PORT" );
1474            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1475            break;
1476
1477        case BT_SUGGEST:
1478        {
1479            dbgmsg( msgs, "Got a BT_SUGGEST" );
1480            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1481            /* we don't do anything with this yet */
1482            break;
1483        }
1484
1485        case BT_HAVE_ALL:
1486            dbgmsg( msgs, "Got a BT_HAVE_ALL" );
1487            tr_bitfieldAddRange( msgs->info->have, 0,
1488                                 msgs->torrent->info.pieceCount );
1489            updatePeerProgress( msgs );
1490            maybeSendFastAllowedSet( msgs );
1491            break;
1492
1493
1494        case BT_HAVE_NONE:
1495            dbgmsg( msgs, "Got a BT_HAVE_NONE" );
1496            tr_bitfieldClear( msgs->info->have );
1497            updatePeerProgress( msgs );
1498            maybeSendFastAllowedSet( msgs );
1499            break;
1500
1501        case BT_REJECT:
1502        {
1503            struct peer_request r;
1504            dbgmsg( msgs, "Got a BT_REJECT" );
1505            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1506            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1507            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1508            reqListRemove( &msgs->clientAskedFor, &r );
1509            break;
1510        }
1511
1512        case BT_ALLOWED_FAST:
1513        {
1514            dbgmsg( msgs, "Got a BT_ALLOWED_FAST" );
1515            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1516            /* we don't do anything with this yet */
1517            break;
1518        }
1519
1520        case BT_LTEP:
1521            dbgmsg( msgs, "Got a BT_LTEP" );
1522            parseLtep( msgs, msglen, inbuf );
1523            break;
1524
1525        default:
1526            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1527            tr_peerIoDrain( msgs->io, inbuf, msglen );
1528            break;
1529    }
1530
1531    assert( msglen + 1 == msgs->incoming.length );
1532    assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen );
1533
1534    msgs->state = AWAITING_BT_LENGTH;
1535    return READ_NOW;
1536}
1537
1538static void
1539decrementDownloadedCount( tr_peermsgs * msgs,
1540                          uint32_t      byteCount )
1541{
1542    tr_torrent * tor = msgs->torrent;
1543
1544    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1545}
1546
1547static void
1548clientGotUnwantedBlock( tr_peermsgs *               msgs,
1549                        const struct peer_request * req )
1550{
1551    decrementDownloadedCount( msgs, req->length );
1552}
1553
1554static void
1555addPeerToBlamefield( tr_peermsgs * msgs,
1556                     uint32_t      index )
1557{
1558    if( !msgs->info->blame )
1559        msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1560    tr_bitfieldAdd( msgs->info->blame, index );
1561}
1562
1563/* returns 0 on success, or an errno on failure */
1564static int
1565clientGotBlock( tr_peermsgs *               msgs,
1566                const uint8_t *             data,
1567                const struct peer_request * req )
1568{
1569    int                    err;
1570    tr_torrent *           tor = msgs->torrent;
1571    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1572
1573    assert( msgs );
1574    assert( req );
1575
1576    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) )
1577    {
1578        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1579                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1580        return EMSGSIZE;
1581    }
1582
1583    /* save the block */
1584    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset,
1585            req->length );
1586
1587    /**
1588    *** Remove the block from our `we asked for this' list
1589    **/
1590
1591    if( !reqListRemove( &msgs->clientAskedFor, req ) )
1592    {
1593        clientGotUnwantedBlock( msgs, req );
1594        dbgmsg( msgs, "we didn't ask for this message..." );
1595        return 0;
1596    }
1597
1598    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1599            msgs->clientAskedFor.count );
1600
1601    /**
1602    *** Error checks
1603    **/
1604
1605    if( tr_cpBlockIsComplete( tor->completion, block ) )
1606    {
1607        dbgmsg( msgs, "we have this block already..." );
1608        clientGotUnwantedBlock( msgs, req );
1609        return 0;
1610    }
1611
1612    /**
1613    ***  Save the block
1614    **/
1615
1616    msgs->info->peerSentPieceDataAt = time( NULL );
1617    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1618        return err;
1619
1620    addPeerToBlamefield( msgs, req->index );
1621    fireGotBlock( msgs, req );
1622    return 0;
1623}
1624
1625static void
1626didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1627{
1628    tr_peermsgs * msgs = vmsgs;
1629    firePeerGotData( msgs, bytesWritten, wasPieceData );
1630}
1631
1632static ReadState
1633canRead( struct bufferevent * evin,
1634         void *               vmsgs )
1635{
1636    ReadState         ret;
1637    tr_peermsgs *     msgs = vmsgs;
1638    struct evbuffer * in = EVBUFFER_INPUT ( evin );
1639    const size_t      inlen = EVBUFFER_LENGTH( in );
1640
1641    if( !inlen )
1642    {
1643        ret = READ_LATER;
1644    }
1645    else if( msgs->state == AWAITING_BT_PIECE )
1646    {
1647        ret = inlen ? readBtPiece( msgs, in, inlen ) : READ_LATER;
1648    }
1649    else switch( msgs->state )
1650    {
1651        case AWAITING_BT_LENGTH:
1652            ret = readBtLength ( msgs, in, inlen ); break;
1653
1654        case AWAITING_BT_ID:
1655            ret = readBtId     ( msgs, in, inlen ); break;
1656
1657        case AWAITING_BT_MESSAGE:
1658            ret = readBtMessage( msgs, in, inlen ); break;
1659
1660        default:
1661            assert( 0 );
1662    }
1663
1664    /* log the raw data that was read */
1665    if( EVBUFFER_LENGTH( in ) != inlen )
1666        fireClientGotData( msgs, inlen - EVBUFFER_LENGTH( in ), FALSE );
1667
1668    return ret;
1669}
1670
1671static void
1672sendKeepalive( tr_peermsgs * msgs )
1673{
1674    dbgmsg( msgs, "sending a keepalive message" );
1675    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1676    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1677}
1678
1679/**
1680***
1681**/
1682
1683static int
1684ratePulse( void * vpeer )
1685{
1686    tr_peermsgs * peer = vpeer;
1687    const double rateToClient = tr_peerGetPieceSpeed( peer->info,
1688                                                      TR_PEER_TO_CLIENT );
1689    const int estimatedBlocksInNext30Seconds =
1690                  ( rateToClient * 30 * 1024 ) / peer->torrent->blockSize;
1691
1692    peer->minActiveRequests = 4;
1693    peer->maxActiveRequests = peer->minActiveRequests +
1694                              estimatedBlocksInNext30Seconds;
1695    return TRUE;
1696}
1697
1698static int
1699popNextRequest( tr_peermsgs *         msgs,
1700                struct peer_request * setme )
1701{
1702    return reqListPop( &msgs->peerAskedForFast, setme )
1703        || reqListPop( &msgs->peerAskedFor, setme );
1704}
1705
1706static int
1707peerPulse( void * vmsgs )
1708{
1709    tr_peermsgs * msgs = vmsgs;
1710    const time_t  now = time( NULL );
1711
1712    ratePulse( msgs );
1713
1714    /*tr_peerIoTryRead( msgs->io );*/
1715    pumpRequestQueue( msgs, now );
1716    expireOldRequests( msgs, now );
1717
1718    if( msgs->sendingBlock )
1719    {
1720        const size_t uploadMax = tr_peerIoGetWriteBufferSpace( msgs->io );
1721        size_t       len = EVBUFFER_LENGTH( msgs->outBlock );
1722        const size_t outlen = MIN( len, uploadMax );
1723
1724        assert( len );
1725
1726        if( outlen )
1727        {
1728            tr_peerIoWrite( msgs->io, EVBUFFER_DATA( msgs->outBlock ), outlen, TRUE );
1729            evbuffer_drain( msgs->outBlock, outlen );
1730
1731            len -= outlen;
1732            msgs->clientSentAnythingAt = now;
1733            msgs->sendingBlock = len != 0;
1734
1735            dbgmsg( msgs, "wrote %zu bytes; %zu left in block", outlen, len );
1736        }
1737        else dbgmsg( msgs, "stalled writing block... uploadMax %lu, outlen %lu", uploadMax, outlen );
1738    }
1739
1740    if( !msgs->sendingBlock )
1741    {
1742        struct peer_request req;
1743        const int haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1744
1745        if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1746        {
1747            dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
1748            msgs->outMessagesBatchedAt = now;
1749        }
1750        else if( haveMessages
1751               && ( ( now - msgs->outMessagesBatchedAt ) >
1752                   msgs->outMessagesBatchPeriod ) )
1753        {
1754            dbgmsg( msgs, "flushing outMessages... (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
1755            tr_peerIoWriteBuf( msgs->io, msgs->outMessages, FALSE );
1756            msgs->clientSentAnythingAt = now;
1757            msgs->outMessagesBatchedAt = 0;
1758            msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1759        }
1760        else if( !EVBUFFER_LENGTH( msgs->outBlock )
1761               && popNextRequest( msgs, &req )
1762               && requestIsValid( msgs, &req )
1763               && tr_cpPieceIsComplete( msgs->torrent->completion,
1764                                        req.index ) )
1765        {
1766            uint8_t * buf = tr_new( uint8_t, req.length );
1767            const int err = tr_ioRead( msgs->torrent,
1768                                       req.index, req.offset, req.length,
1769                                       buf );
1770            if( err )
1771            {
1772                fireError( msgs, err );
1773            }
1774            else
1775            {
1776                tr_peerIo *       io = msgs->io;
1777                struct evbuffer * out = msgs->outBlock;
1778
1779                dbgmsg( msgs, "sending block %u:%u->%u", req.index,
1780                        req.offset,
1781                        req.length );
1782                tr_peerIoWriteUint32(
1783                    io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) +
1784                    req.length );
1785                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1786                tr_peerIoWriteUint32( io, out, req.index );
1787                tr_peerIoWriteUint32( io, out, req.offset );
1788                tr_peerIoWriteBytes ( io, out, buf, req.length );
1789                msgs->sendingBlock = 1;
1790            }
1791
1792            tr_free( buf );
1793        }
1794        else if( ( !haveMessages )
1795               && ( now - msgs->clientSentAnythingAt ) >
1796                KEEPALIVE_INTERVAL_SECS )
1797        {
1798            sendKeepalive( msgs );
1799        }
1800    }
1801
1802    return TRUE; /* loop forever */
1803}
1804
1805void
1806tr_peerMsgsPulse( tr_peermsgs * msgs )
1807{
1808    if( msgs != NULL )
1809        peerPulse( msgs );
1810}
1811
1812static void
1813gotError( struct bufferevent * evbuf UNUSED,
1814          short                      what,
1815          void *                     vmsgs )
1816{
1817    if( what & EVBUFFER_TIMEOUT )
1818        dbgmsg( vmsgs, "libevent got a timeout, what=%hd, secs=%d", what,
1819                evbuf->timeout_read );
1820    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1821        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1822               what, errno, tr_strerror( errno ) );
1823    fireError( vmsgs, ENOTCONN );
1824}
1825
1826static void
1827sendBitfield( tr_peermsgs * msgs )
1828{
1829    struct evbuffer * out = msgs->outMessages;
1830    tr_bitfield *     field;
1831    tr_piece_index_t  lazyPieces[LAZY_PIECE_COUNT];
1832    size_t            i;
1833    size_t            lazyCount = 0;
1834
1835    field = tr_bitfieldDup( tr_cpPieceBitfield( msgs->torrent->completion ) );
1836
1837    if( tr_sessionIsLazyBitfieldEnabled( msgs->session ) )
1838    {
1839        /** Lazy bitfields aren't a high priority or secure, so I'm opting for
1840            speed over a truly random sample -- let's limit the pool size to
1841            the first 1000 pieces so large torrents don't bog things down */
1842        size_t poolSize;
1843        const size_t maxPoolSize = MIN( msgs->torrent->info.pieceCount, 1000 );
1844        tr_piece_index_t * pool = tr_new( tr_piece_index_t, maxPoolSize );
1845
1846        /* build the pool */
1847        for( i=poolSize=0; i<maxPoolSize; ++i )
1848            if( tr_bitfieldHas( field, i ) )
1849                pool[poolSize++] = i;
1850
1851        /* pull random piece indices from the pool */
1852        while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) )
1853        {
1854            const int pos = tr_cryptoWeakRandInt( poolSize );
1855            const tr_piece_index_t piece = pool[pos];
1856            tr_bitfieldRem( field, piece );
1857            lazyPieces[lazyCount++] = piece;
1858            pool[pos] = pool[--poolSize];
1859        }
1860
1861        /* cleanup */
1862        tr_free( pool );
1863    }
1864
1865    tr_peerIoWriteUint32( msgs->io, out,
1866                          sizeof( uint8_t ) + field->byteCount );
1867    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1868    tr_peerIoWriteBytes ( msgs->io, out, field->bits, field->byteCount );
1869    dbgmsg( msgs, "sending bitfield... outMessage size is now %d",
1870           (int)EVBUFFER_LENGTH( out ) );
1871    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1872
1873    for( i = 0; i < lazyCount; ++i )
1874        protocolSendHave( msgs, lazyPieces[i] );
1875
1876    tr_bitfieldFree( field );
1877}
1878
1879/**
1880***
1881**/
1882
1883/* some peers give us error messages if we send
1884   more than this many peers in a single pex message
1885   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
1886#define MAX_PEX_ADDED 50
1887#define MAX_PEX_DROPPED 50
1888
1889typedef struct
1890{
1891    tr_pex *  added;
1892    tr_pex *  dropped;
1893    tr_pex *  elements;
1894    int       addedCount;
1895    int       droppedCount;
1896    int       elementCount;
1897}
1898PexDiffs;
1899
1900static void
1901pexAddedCb( void * vpex,
1902            void * userData )
1903{
1904    PexDiffs * diffs = userData;
1905    tr_pex *   pex = vpex;
1906
1907    if( diffs->addedCount < MAX_PEX_ADDED )
1908    {
1909        diffs->added[diffs->addedCount++] = *pex;
1910        diffs->elements[diffs->elementCount++] = *pex;
1911    }
1912}
1913
1914static void
1915pexDroppedCb( void * vpex,
1916              void * userData )
1917{
1918    PexDiffs * diffs = userData;
1919    tr_pex *   pex = vpex;
1920
1921    if( diffs->droppedCount < MAX_PEX_DROPPED )
1922    {
1923        diffs->dropped[diffs->droppedCount++] = *pex;
1924    }
1925}
1926
1927static void
1928pexElementCb( void * vpex,
1929              void * userData )
1930{
1931    PexDiffs * diffs = userData;
1932    tr_pex *   pex = vpex;
1933
1934    diffs->elements[diffs->elementCount++] = *pex;
1935}
1936
1937static void
1938sendPex( tr_peermsgs * msgs )
1939{
1940    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
1941    {
1942        int               i;
1943        tr_pex *          newPex = NULL;
1944        const int         newCount = tr_peerMgrGetPeers(
1945            msgs->session->peerMgr, msgs->torrent->info.hash, &newPex );
1946        PexDiffs          diffs;
1947        tr_benc           val;
1948        uint8_t *         tmp, *walk;
1949        char *            benc;
1950        int               bencLen;
1951        struct evbuffer * out = msgs->outMessages;
1952
1953        /* build the diffs */
1954        diffs.added = tr_new( tr_pex, newCount );
1955        diffs.addedCount = 0;
1956        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1957        diffs.droppedCount = 0;
1958        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1959        diffs.elementCount = 0;
1960        tr_set_compare( msgs->pex, msgs->pexCount,
1961                        newPex, newCount,
1962                        tr_pexCompare, sizeof( tr_pex ),
1963                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
1964        dbgmsg(
1965            msgs,
1966            "pex: old peer count %d, new peer count %d, added %d, removed %d",
1967            msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1968
1969        /* update peer */
1970        tr_free( msgs->pex );
1971        msgs->pex = diffs.elements;
1972        msgs->pexCount = diffs.elementCount;
1973
1974        /* build the pex payload */
1975        tr_bencInitDict( &val, 3 );
1976
1977        /* "added" */
1978        tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1979        for( i = 0; i < diffs.addedCount; ++i )
1980        {
1981            memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
1982            memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1983        }
1984        assert( ( walk - tmp ) == diffs.addedCount * 6 );
1985        tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
1986        tr_free( tmp );
1987
1988        /* "added.f" */
1989        tmp = walk = tr_new( uint8_t, diffs.addedCount );
1990        for( i = 0; i < diffs.addedCount; ++i )
1991            *walk++ = diffs.added[i].flags;
1992        assert( ( walk - tmp ) == diffs.addedCount );
1993        tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
1994        tr_free( tmp );
1995
1996        /* "dropped" */
1997        tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
1998        for( i = 0; i < diffs.droppedCount; ++i )
1999        {
2000            memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
2001            memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2002        }
2003        assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2004        tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2005        tr_free( tmp );
2006
2007        /* write the pex message */
2008        benc = tr_bencSave( &val, &bencLen );
2009        tr_peerIoWriteUint32( msgs->io, out,
2010                              2 * sizeof( uint8_t ) + bencLen );
2011        tr_peerIoWriteUint8 ( msgs->io, out, BT_LTEP );
2012        tr_peerIoWriteUint8 ( msgs->io, out, msgs->ut_pex_id );
2013        tr_peerIoWriteBytes ( msgs->io, out, benc, bencLen );
2014        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
2015        dbgmsg( msgs, "outMessage size is now %d",
2016               (int)EVBUFFER_LENGTH( out ) );
2017
2018        /* cleanup */
2019        tr_free( benc );
2020        tr_bencFree( &val );
2021        tr_free( diffs.added );
2022        tr_free( diffs.dropped );
2023        tr_free( newPex );
2024
2025        msgs->clientSentPexAt = time( NULL );
2026    }
2027}
2028
2029static int
2030pexPulse( void * vpeer )
2031{
2032    sendPex( vpeer );
2033    return TRUE;
2034}
2035
2036/**
2037***
2038**/
2039
2040tr_peermsgs*
2041tr_peerMsgsNew( struct tr_torrent * torrent,
2042                struct tr_peer *    info,
2043                tr_delivery_func    func,
2044                void *              userData,
2045                tr_publisher_tag *  setme )
2046{
2047    tr_peermsgs * m;
2048
2049    assert( info );
2050    assert( info->io );
2051
2052    m = tr_new0( tr_peermsgs, 1 );
2053    m->publisher = tr_publisherNew( );
2054    m->info = info;
2055    m->session = torrent->session;
2056    m->torrent = torrent;
2057    m->io = info->io;
2058    m->info->clientIsChoked = 1;
2059    m->info->peerIsChoked = 1;
2060    m->info->clientIsInterested = 0;
2061    m->info->peerIsInterested = 0;
2062    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
2063    m->state = AWAITING_BT_LENGTH;
2064    m->pexTimer = tr_timerNew( m->session, pexPulse, m, PEX_INTERVAL );
2065    m->outMessages = evbuffer_new( );
2066    m->outMessagesBatchedAt = 0;
2067    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2068    m->incoming.block = evbuffer_new( );
2069    m->outBlock = evbuffer_new( );
2070    m->peerAllowedPieces = NULL;
2071    m->peerAskedFor = REQUEST_LIST_INIT;
2072    m->peerAskedForFast = REQUEST_LIST_INIT;
2073    m->clientAskedFor = REQUEST_LIST_INIT;
2074    m->clientWillAskFor = REQUEST_LIST_INIT;
2075    *setme = tr_publisherSubscribe( m->publisher, func, userData );
2076
2077    if( tr_peerIoSupportsLTEP( m->io ) )
2078        sendLtepHandshake( m );
2079
2080    sendBitfield( m );
2081
2082    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of
2083                                             inactivity */
2084    tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
2085    ratePulse( m );
2086
2087    return m;
2088}
2089
2090void
2091tr_peerMsgsFree( tr_peermsgs* msgs )
2092{
2093    if( msgs )
2094    {
2095        tr_timerFree( &msgs->pexTimer );
2096        tr_publisherFree( &msgs->publisher );
2097        reqListClear( &msgs->clientWillAskFor );
2098        reqListClear( &msgs->clientAskedFor );
2099        reqListClear( &msgs->peerAskedForFast );
2100        reqListClear( &msgs->peerAskedFor );
2101        tr_bitfieldFree( msgs->peerAllowedPieces );
2102        evbuffer_free( msgs->incoming.block );
2103        evbuffer_free( msgs->outMessages );
2104        evbuffer_free( msgs->outBlock );
2105        tr_free( msgs->pex );
2106
2107        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2108        tr_free( msgs );
2109    }
2110}
2111
2112void
2113tr_peerMsgsUnsubscribe( tr_peermsgs *    peer,
2114                        tr_publisher_tag tag )
2115{
2116    tr_publisherUnsubscribe( peer->publisher, tag );
2117}
2118
Note: See TracBrowser for help on using the repository browser.