source: trunk/libtransmission/peer-msgs.c @ 7055

Last change on this file since 7055 was 7055, checked in by charles, 13 years ago

update NEWS

  • Property svn:keywords set to Date Rev Author Id
File size: 59.2 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 7055 2008-11-06 02:56:51Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19
20#include <event.h>
21
22#include "transmission.h"
23#include "bencode.h"
24#include "completion.h"
25#include "crypto.h"
26#include "inout.h"
27#ifdef WIN32
28#include "net.h" /* for ECONN */
29#endif
30#include "peer-io.h"
31#include "peer-mgr.h"
32#include "peer-mgr-private.h"
33#include "peer-msgs.h"
34#include "ratecontrol.h"
35#include "stats.h"
36#include "torrent.h"
37#include "trevent.h"
38#include "utils.h"
39
40/**
41***
42**/
43
44enum
45{
46    BT_CHOKE                = 0,
47    BT_UNCHOKE              = 1,
48    BT_INTERESTED           = 2,
49    BT_NOT_INTERESTED       = 3,
50    BT_HAVE                 = 4,
51    BT_BITFIELD             = 5,
52    BT_REQUEST              = 6,
53    BT_PIECE                = 7,
54    BT_CANCEL               = 8,
55    BT_PORT                 = 9,
56    BT_SUGGEST              = 13,
57    BT_HAVE_ALL             = 14,
58    BT_HAVE_NONE            = 15,
59    BT_REJECT               = 16,
60    BT_ALLOWED_FAST         = 17,
61    BT_LTEP                 = 20,
62
63    LTEP_HANDSHAKE          = 0,
64
65    TR_LTEP_PEX             = 1,
66
67    MIN_CHOKE_PERIOD_SEC    = ( 10 ),
68
69    /* idle seconds before we send a keepalive */
70    KEEPALIVE_INTERVAL_SECS = 100,
71
72    PEX_INTERVAL            = ( 90 * 1000 ), /* msec between sendPex() calls */
73    PEER_PULSE_INTERVAL     = ( 250 ),       /* msec between peerPulse() calls
74                                               */
75
76    MAX_QUEUE_SIZE          = ( 100 ),
77
78    /* (fast peers) max number of pieces we fast-allow to another peer */
79    MAX_FAST_ALLOWED_COUNT   = 10,
80
81    /* (fast peers) max threshold for allowing fast-pieces requests */
82    MAX_FAST_ALLOWED_THRESHOLD = 10,
83
84    /* how long an unsent request can stay queued before it's returned
85       back to the peer-mgr's pool of requests */
86    QUEUED_REQUEST_TTL_SECS = 20,
87
88    /* how long a sent request can stay queued before it's returned
89       back to the peer-mgr's pool of requests */
90    SENT_REQUEST_TTL_SECS = 240,
91
92    /* used in lowering the outMessages queue period */
93    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
94    HIGH_PRIORITY_INTERVAL_SECS = 2,
95    LOW_PRIORITY_INTERVAL_SECS = 20,
96
97    /* number of pieces to remove from the bitfield when
98     * lazy bitfields are turned on */
99    LAZY_PIECE_COUNT = 26
100};
101
102/**
103***  REQUEST MANAGEMENT
104**/
105
106enum
107{
108    AWAITING_BT_LENGTH,
109    AWAITING_BT_ID,
110    AWAITING_BT_MESSAGE,
111    AWAITING_BT_PIECE
112};
113
114struct peer_request
115{
116    uint32_t    index;
117    uint32_t    offset;
118    uint32_t    length;
119    time_t      time_requested;
120};
121
122static int
123compareRequest( const void * va,
124                const void * vb )
125{
126    const struct peer_request * a = va;
127    const struct peer_request * b = vb;
128
129    if( a->index != b->index )
130        return a->index < b->index ? -1 : 1;
131
132    if( a->offset != b->offset )
133        return a->offset < b->offset ? -1 : 1;
134
135    if( a->length != b->length )
136        return a->length < b->length ? -1 : 1;
137
138    return 0;
139}
140
141struct request_list
142{
143    uint16_t               count;
144    uint16_t               max;
145    struct peer_request *  requests;
146};
147
148static const struct request_list REQUEST_LIST_INIT = { 0, 0, NULL };
149
150static void
151reqListReserve( struct request_list * list,
152                uint16_t              max )
153{
154    if( list->max < max )
155    {
156        list->max = max;
157        list->requests = tr_renew( struct peer_request,
158                                   list->requests,
159                                   list->max );
160    }
161}
162
163static void
164reqListClear( struct request_list * list )
165{
166    tr_free( list->requests );
167    *list = REQUEST_LIST_INIT;
168}
169
170static void
171reqListCopy( struct request_list *       dest,
172             const struct request_list * src )
173{
174    dest->count = dest->max = src->count;
175    dest->requests =
176        tr_memdup( src->requests, dest->count * sizeof( struct peer_request ) );
177}
178
179static void
180reqListRemoveOne( struct request_list * list,
181                  int                   i )
182{
183    assert( 0 <= i && i < list->count );
184
185    memmove( &list->requests[i],
186            &list->requests[i + 1],
187            sizeof( struct peer_request ) * ( --list->count - i ) );
188}
189
190static void
191reqListAppend( struct request_list *       list,
192               const struct peer_request * req )
193{
194    if( ++list->count >= list->max )
195        reqListReserve( list, list->max + 8 );
196
197    list->requests[list->count - 1] = *req;
198}
199
200static int
201reqListPop( struct request_list * list,
202            struct peer_request * setme )
203{
204    int success;
205
206    if( !list->count )
207        success = FALSE;
208    else {
209        *setme = list->requests[0];
210        reqListRemoveOne( list, 0 );
211        success = TRUE;
212    }
213
214    return success;
215}
216
217static int
218reqListFind( struct request_list *       list,
219             const struct peer_request * key )
220{
221    uint16_t i;
222
223    for( i = 0; i < list->count; ++i )
224        if( !compareRequest( key, list->requests + i ) )
225            return i;
226
227    return -1;
228}
229
230static int
231reqListRemove( struct request_list *       list,
232               const struct peer_request * key )
233{
234    int success;
235    const int i = reqListFind( list, key );
236
237    if( i < 0 )
238        success = FALSE;
239    else {
240        reqListRemoveOne( list, i );
241        success = TRUE;
242    }
243
244    return success;
245}
246
247/**
248***
249**/
250
251/* this is raw, unchanged data from the peer regarding
252 * the current message that it's sending us. */
253struct tr_incoming
254{
255    uint8_t                id;
256    uint32_t               length; /* includes the +1 for id length */
257    struct peer_request    blockReq; /* metadata for incoming blocks */
258    struct evbuffer *      block; /* piece data for incoming blocks */
259};
260
261struct tr_peermsgs
262{
263    unsigned int    peerSentBitfield        : 1;
264    unsigned int    peerSupportsPex         : 1;
265    unsigned int    clientSentLtepHandshake : 1;
266    unsigned int    peerSentLtepHandshake   : 1;
267    unsigned int    sendingBlock            : 1;
268
269    uint8_t         state;
270    uint8_t         ut_pex_id;
271    uint16_t        pexCount;
272    uint16_t        minActiveRequests;
273    uint16_t        maxActiveRequests;
274
275    /* how long the outMessages batch should be allowed to grow before
276     * it's flushed -- some messages (like requests >:) should be sent
277     * very quickly; others aren't as urgent. */
278    int                    outMessagesBatchPeriod;
279
280    tr_peer *              info;
281
282    tr_session *           session;
283    tr_torrent *           torrent;
284    tr_peerIo *            io;
285
286    tr_publisher_t *       publisher;
287
288    struct evbuffer *      outBlock; /* buffer of the current piece message */
289    struct evbuffer *      outMessages; /* all the non-piece messages */
290
291    struct request_list    peerAskedFor;
292    struct request_list    peerAskedForFast;
293    struct request_list    clientAskedFor;
294    struct request_list    clientWillAskFor;
295
296    tr_timer *             pexTimer;
297
298    time_t                 clientSentPexAt;
299    time_t                 clientSentAnythingAt;
300
301    /* when we started batching the outMessages */
302    time_t                outMessagesBatchedAt;
303
304    tr_bitfield *         peerAllowedPieces;
305
306    struct tr_incoming    incoming;
307
308    tr_pex *              pex;
309};
310
311/**
312***
313**/
314
315static void
316myDebug( const char *               file,
317         int                        line,
318         const struct tr_peermsgs * msgs,
319         const char *               fmt,
320         ... )
321{
322    FILE * fp = tr_getLog( );
323
324    if( fp )
325    {
326        va_list           args;
327        char              timestr[64];
328        struct evbuffer * buf = evbuffer_new( );
329        char *            base = tr_basename( file );
330
331        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
332                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
333                             msgs->torrent->info.name,
334                             tr_peerIoGetAddrStr( msgs->io ),
335                             msgs->info->client );
336        va_start( args, fmt );
337        evbuffer_add_vprintf( buf, fmt, args );
338        va_end( args );
339        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
340        fwrite( EVBUFFER_DATA( buf ), 1, EVBUFFER_LENGTH( buf ), fp );
341
342        tr_free( base );
343        evbuffer_free( buf );
344    }
345}
346
347#define dbgmsg( msgs, ... ) myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ )
348
349/**
350***
351**/
352
353static void
354pokeBatchPeriod( tr_peermsgs * msgs,
355                 int           interval )
356{
357    if( msgs->outMessagesBatchPeriod > interval )
358    {
359        msgs->outMessagesBatchPeriod = interval;
360        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
361    }
362}
363
364static void
365protocolSendRequest( tr_peermsgs *               msgs,
366                     const struct peer_request * req )
367{
368    tr_peerIo *       io = msgs->io;
369    struct evbuffer * out = msgs->outMessages;
370
371    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
372    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
373    tr_peerIoWriteUint32( io, out, req->index );
374    tr_peerIoWriteUint32( io, out, req->offset );
375    tr_peerIoWriteUint32( io, out, req->length );
376    dbgmsg( msgs, "requesting %u:%u->%u... outMessage size is now %d",
377           req->index, req->offset, req->length, (int)EVBUFFER_LENGTH( out ) );
378    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
379}
380
381static void
382protocolSendCancel( tr_peermsgs *               msgs,
383                    const struct peer_request * req )
384{
385    tr_peerIo *       io = msgs->io;
386    struct evbuffer * out = msgs->outMessages;
387
388    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
389    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
390    tr_peerIoWriteUint32( io, out, req->index );
391    tr_peerIoWriteUint32( io, out, req->offset );
392    tr_peerIoWriteUint32( io, out, req->length );
393    dbgmsg( msgs, "cancelling %u:%u->%u... outMessage size is now %d",
394           req->index, req->offset, req->length, (int)EVBUFFER_LENGTH( out ) );
395    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
396}
397
398static void
399protocolSendHave( tr_peermsgs * msgs,
400                  uint32_t      index )
401{
402    tr_peerIo *       io = msgs->io;
403    struct evbuffer * out = msgs->outMessages;
404
405    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + sizeof( uint32_t ) );
406    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
407    tr_peerIoWriteUint32( io, out, index );
408    dbgmsg( msgs, "sending Have %u.. outMessage size is now %d",
409           index, (int)EVBUFFER_LENGTH( out ) );
410    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
411}
412
413static void
414protocolSendChoke( tr_peermsgs * msgs,
415                   int           choke )
416{
417    tr_peerIo *       io = msgs->io;
418    struct evbuffer * out = msgs->outMessages;
419
420    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
421    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
422    dbgmsg( msgs, "sending %s... outMessage size is now %d",
423           ( choke ? "Choke" : "Unchoke" ),
424           (int)EVBUFFER_LENGTH( out ) );
425    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
426}
427
428/**
429***  EVENTS
430**/
431
432static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0 };
433
434static void
435publish( tr_peermsgs *   msgs,
436         tr_peer_event * e )
437{
438    tr_publisherPublish( msgs->publisher, msgs->info, e );
439}
440
441static void
442fireError( tr_peermsgs * msgs,
443           int           err )
444{
445    tr_peer_event e = blankEvent;
446
447    e.eventType = TR_PEER_ERROR;
448    e.err = err;
449    publish( msgs, &e );
450}
451
452static void
453fireNeedReq( tr_peermsgs * msgs )
454{
455    tr_peer_event e = blankEvent;
456
457    e.eventType = TR_PEER_NEED_REQ;
458    publish( msgs, &e );
459}
460
461static void
462firePeerProgress( tr_peermsgs * msgs )
463{
464    tr_peer_event e = blankEvent;
465
466    e.eventType = TR_PEER_PEER_PROGRESS;
467    e.progress = msgs->info->progress;
468    publish( msgs, &e );
469}
470
471static void
472fireGotBlock( tr_peermsgs *               msgs,
473              const struct peer_request * req )
474{
475    tr_peer_event e = blankEvent;
476
477    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
478    e.pieceIndex = req->index;
479    e.offset = req->offset;
480    e.length = req->length;
481    publish( msgs, &e );
482}
483
484static void
485fireClientGotData( tr_peermsgs * msgs,
486                   uint32_t      length )
487{
488    tr_peer_event e = blankEvent;
489
490    e.length = length;
491    e.eventType = TR_PEER_CLIENT_GOT_DATA;
492    publish( msgs, &e );
493}
494
495static void
496firePeerGotData( tr_peermsgs * msgs,
497                 uint32_t      length )
498{
499    tr_peer_event e = blankEvent;
500
501    e.length = length;
502    e.eventType = TR_PEER_PEER_GOT_DATA;
503    publish( msgs, &e );
504}
505
506static void
507fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
508{
509    tr_peer_event e = blankEvent;
510    e.eventType = TR_PEER_CANCEL;
511    e.pieceIndex = req->index;
512    e.offset = req->offset;
513    e.length = req->length;
514    publish( msgs, &e );
515}
516
517/**
518***  INTEREST
519**/
520
521static int
522isPieceInteresting( const tr_peermsgs * peer,
523                    tr_piece_index_t    piece )
524{
525    const tr_torrent * torrent = peer->torrent;
526
527    return ( !torrent->info.pieces[piece].dnd )                 /* we want it */
528           && ( !tr_cpPieceIsComplete( torrent->completion, piece ) ) /* !have
529                                                                        */
530           && ( tr_bitfieldHas( peer->info->have, piece ) );   /* peer has it */
531}
532
533/* "interested" means we'll ask for piece data if they unchoke us */
534static int
535isPeerInteresting( const tr_peermsgs * msgs )
536{
537    tr_piece_index_t    i;
538    const tr_torrent *  torrent;
539    const tr_bitfield * bitfield;
540    const int           clientIsSeed = tr_torrentIsSeed( msgs->torrent );
541
542    if( clientIsSeed )
543        return FALSE;
544
545    torrent = msgs->torrent;
546    bitfield = tr_cpPieceBitfield( torrent->completion );
547
548    if( !msgs->info->have )
549        return TRUE;
550
551    assert( bitfield->byteCount == msgs->info->have->byteCount );
552
553    for( i = 0; i < torrent->info.pieceCount; ++i )
554        if( isPieceInteresting( msgs, i ) )
555            return TRUE;
556
557    return FALSE;
558}
559
560static void
561sendInterest( tr_peermsgs * msgs,
562              int           weAreInterested )
563{
564    struct evbuffer * out = msgs->outMessages;
565
566    assert( msgs );
567    assert( weAreInterested == 0 || weAreInterested == 1 );
568
569    msgs->info->clientIsInterested = weAreInterested;
570    dbgmsg( msgs, "Sending %s",
571            weAreInterested ? "Interested" : "Not Interested" );
572    tr_peerIoWriteUint32( msgs->io, out, sizeof( uint8_t ) );
573    tr_peerIoWriteUint8 (
574        msgs->io, out, weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
575    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
576    dbgmsg( msgs, "outMessage size is now %d", (int)EVBUFFER_LENGTH( out ) );
577}
578
579static void
580updateInterest( tr_peermsgs * msgs )
581{
582    const int i = isPeerInteresting( msgs );
583
584    if( i != msgs->info->clientIsInterested )
585        sendInterest( msgs, i );
586    if( i )
587        fireNeedReq( msgs );
588}
589
590static void
591cancelAllRequestsToClientExceptFast( tr_peermsgs * msgs )
592{
593    reqListClear( &msgs->peerAskedFor );
594}
595
596void
597tr_peerMsgsSetChoke( tr_peermsgs * msgs,
598                     int           choke )
599{
600    const time_t now = time( NULL );
601    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
602
603    assert( msgs );
604    assert( msgs->info );
605    assert( choke == 0 || choke == 1 );
606
607    if( msgs->info->chokeChangedAt > fibrillationTime )
608    {
609        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation",
610                choke );
611    }
612    else if( msgs->info->peerIsChoked != choke )
613    {
614        msgs->info->peerIsChoked = choke;
615        if( choke )
616            cancelAllRequestsToClientExceptFast( msgs );
617        protocolSendChoke( msgs, choke );
618        msgs->info->chokeChangedAt = now;
619    }
620}
621
622/**
623***
624**/
625
626void
627tr_peerMsgsHave( tr_peermsgs * msgs,
628                 uint32_t      index )
629{
630    protocolSendHave( msgs, index );
631
632    /* since we have more pieces now, we might not be interested in this peer */
633    updateInterest( msgs );
634}
635
636#if 0
637static void
638sendFastSuggest( tr_peermsgs * msgs,
639                 uint32_t      pieceIndex )
640{
641    assert( msgs );
642
643    if( tr_peerIoSupportsFEXT( msgs->io ) )
644    {
645        tr_peerIoWriteUint32( msgs->io, msgs->outMessages,
646                             sizeof( uint8_t ) + sizeof( uint32_t ) );
647        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_SUGGEST );
648        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
649    }
650}
651
652static void
653sendFastHave( tr_peermsgs * msgs,
654              int           all )
655{
656    assert( msgs );
657
658    if( tr_peerIoSupportsFEXT( msgs->io ) )
659    {
660        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof( uint8_t ) );
661        tr_peerIoWriteUint8( msgs->io, msgs->outMessages,
662                            ( all ? BT_HAVE_ALL
663                              : BT_HAVE_NONE ) );
664        updateInterest( msgs );
665    }
666}
667
668#endif
669
670static void
671sendFastReject( tr_peermsgs * msgs,
672                uint32_t      pieceIndex,
673                uint32_t      offset,
674                uint32_t      length )
675{
676    assert( msgs );
677
678    if( tr_peerIoSupportsFEXT( msgs->io ) )
679    {
680        struct evbuffer * out = msgs->outMessages;
681        const uint32_t    len = sizeof( uint8_t ) + 3 * sizeof( uint32_t );
682        dbgmsg( msgs, "sending fast reject %u:%u->%u", pieceIndex, offset,
683                length );
684        tr_peerIoWriteUint32( msgs->io, out, len );
685        tr_peerIoWriteUint8( msgs->io, out, BT_REJECT );
686        tr_peerIoWriteUint32( msgs->io, out, pieceIndex );
687        tr_peerIoWriteUint32( msgs->io, out, offset );
688        tr_peerIoWriteUint32( msgs->io, out, length );
689        pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
690        dbgmsg( msgs, "outMessage size is now %d",
691               (int)EVBUFFER_LENGTH( out ) );
692    }
693}
694
695static tr_bitfield*
696getPeerAllowedPieces( tr_peermsgs * msgs )
697{
698    if( !msgs->peerAllowedPieces && tr_peerIoSupportsFEXT( msgs->io ) )
699    {
700        msgs->peerAllowedPieces = tr_peerMgrGenerateAllowedSet(
701            MAX_FAST_ALLOWED_COUNT,
702            msgs->torrent->info.pieceCount,
703            msgs->torrent->info.hash,
704            tr_peerIoGetAddress( msgs->io, NULL ) );
705    }
706
707    return msgs->peerAllowedPieces;
708}
709
710static void
711sendFastAllowed( tr_peermsgs * msgs,
712                 uint32_t      pieceIndex )
713{
714    assert( msgs );
715
716    if( tr_peerIoSupportsFEXT( msgs->io ) )
717    {
718        struct evbuffer * out = msgs->outMessages;
719        dbgmsg( msgs, "sending fast allowed" );
720        tr_peerIoWriteUint32( msgs->io, out,  sizeof( uint8_t ) +
721                             sizeof( uint32_t ) );
722        tr_peerIoWriteUint8( msgs->io, out, BT_ALLOWED_FAST );
723        tr_peerIoWriteUint32( msgs->io, out, pieceIndex );
724        pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
725        dbgmsg( msgs, "outMessage size is now %d",
726               (int)EVBUFFER_LENGTH( out ) );
727    }
728}
729
730static void
731sendFastAllowedSet( tr_peermsgs * msgs )
732{
733    tr_piece_index_t i = 0;
734
735    while( i <= msgs->torrent->info.pieceCount )
736    {
737        if( tr_bitfieldHas( getPeerAllowedPieces( msgs ), i ) )
738            sendFastAllowed( msgs, i );
739        i++;
740    }
741}
742
743static void
744maybeSendFastAllowedSet( tr_peermsgs * msgs )
745{
746    if( tr_bitfieldCountTrueBits( msgs->info->have ) <=
747        MAX_FAST_ALLOWED_THRESHOLD )
748        sendFastAllowedSet( msgs );
749}
750
751/**
752***
753**/
754
755static int
756reqIsValid( const tr_peermsgs * peer,
757            uint32_t            index,
758            uint32_t            offset,
759            uint32_t            length )
760{
761    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
762}
763
764static int
765requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
766{
767    return reqIsValid( msgs, req->index, req->offset, req->length );
768}
769
770static void
771expireOldRequests( tr_peermsgs * msgs, const time_t now  )
772{
773    int                 i;
774    time_t              oldestAllowed;
775    struct request_list tmp = REQUEST_LIST_INIT;
776
777    /* cancel requests that have been queued for too long */
778    oldestAllowed = now - QUEUED_REQUEST_TTL_SECS;
779    reqListCopy( &tmp, &msgs->clientWillAskFor );
780    for( i = 0; i < tmp.count; ++i )
781    {
782        const struct peer_request * req = &tmp.requests[i];
783        if( req->time_requested < oldestAllowed )
784            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
785    }
786    reqListClear( &tmp );
787
788    /* cancel requests that were sent too long ago */
789    oldestAllowed = now - SENT_REQUEST_TTL_SECS;
790    reqListCopy( &tmp, &msgs->clientAskedFor );
791    for( i = 0; i < tmp.count; ++i )
792    {
793        const struct peer_request * req = &tmp.requests[i];
794        if( req->time_requested < oldestAllowed )
795            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
796    }
797    reqListClear( &tmp );
798}
799
800static void
801pumpRequestQueue( tr_peermsgs * msgs, const time_t now )
802{
803    const int           max = msgs->maxActiveRequests;
804    const int           min = msgs->minActiveRequests;
805    int                 sent = 0;
806    int                 count = msgs->clientAskedFor.count;
807    struct peer_request req;
808
809    if( count > min )
810        return;
811    if( msgs->info->clientIsChoked )
812        return;
813
814    while( ( count < max ) && reqListPop( &msgs->clientWillAskFor, &req ) )
815    {
816        const tr_block_index_t block =
817            _tr_block( msgs->torrent, req.index, req.offset );
818
819        assert( requestIsValid( msgs, &req ) );
820        assert( tr_bitfieldHas( msgs->info->have, req.index ) );
821
822        /* don't ask for it if we've already got it... this block may have
823         * come in from a different peer after we cancelled a request for it */
824        if( !tr_cpBlockIsComplete( msgs->torrent->completion, block ) )
825        {
826            protocolSendRequest( msgs, &req );
827            req.time_requested = now;
828            reqListAppend( &msgs->clientAskedFor, &req );
829
830            ++count;
831            ++sent;
832        }
833    }
834
835    if( sent )
836        dbgmsg( msgs,
837                "pump sent %d requests, now have %d active and %d queued",
838                sent,
839                msgs->clientAskedFor.count,
840                msgs->clientWillAskFor.count );
841
842    if( count < max )
843        fireNeedReq( msgs );
844}
845
846static int
847requestQueueIsFull( const tr_peermsgs * msgs )
848{
849    const int req_max = msgs->maxActiveRequests;
850    return msgs->clientWillAskFor.count >= req_max;
851}
852
853tr_addreq_t
854tr_peerMsgsAddRequest( tr_peermsgs *    msgs,
855                       uint32_t         index,
856                       uint32_t         offset,
857                       uint32_t         length )
858{
859    struct peer_request req;
860
861    assert( msgs );
862    assert( msgs->torrent );
863    assert( reqIsValid( msgs, index, offset, length ) );
864
865    /**
866    ***  Reasons to decline the request
867    **/
868
869    /* don't send requests to choked clients */
870    if( msgs->info->clientIsChoked )
871    {
872        dbgmsg( msgs, "declining request because they're choking us" );
873        return TR_ADDREQ_CLIENT_CHOKED;
874    }
875
876    /* peer doesn't have this piece */
877    if( !tr_bitfieldHas( msgs->info->have, index ) )
878        return TR_ADDREQ_MISSING;
879
880    /* peer's queue is full */
881    if( requestQueueIsFull( msgs ) ) {
882        dbgmsg( msgs, "declining request because we're full" );
883        return TR_ADDREQ_FULL;
884    }
885
886    /* have we already asked for this piece? */
887    req.index = index;
888    req.offset = offset;
889    req.length = length;
890    if( reqListFind( &msgs->clientAskedFor, &req ) != -1 ) {
891        dbgmsg( msgs, "declining because it's a duplicate" );
892        return TR_ADDREQ_DUPLICATE;
893    }
894    if( reqListFind( &msgs->clientWillAskFor, &req ) != -1 ) {
895        dbgmsg( msgs, "declining because it's a duplicate" );
896        return TR_ADDREQ_DUPLICATE;
897    }
898
899    /**
900    ***  Accept this request
901    **/
902
903    dbgmsg( msgs, "added req for piece %lu", (unsigned long)index );
904    req.time_requested = time( NULL );
905    reqListAppend( &msgs->clientWillAskFor, &req );
906    return TR_ADDREQ_OK;
907}
908
909static void
910cancelAllRequestsToPeer( tr_peermsgs * msgs )
911{
912    int                 i;
913    struct request_list a = msgs->clientWillAskFor;
914    struct request_list b = msgs->clientAskedFor;
915
916    msgs->clientAskedFor = REQUEST_LIST_INIT;
917    msgs->clientWillAskFor = REQUEST_LIST_INIT;
918
919    for( i=0; i<a.count; ++i )
920        fireCancelledReq( msgs, &a.requests[i] );
921
922    for( i = 0; i < b.count; ++i ) {
923        fireCancelledReq( msgs, &b.requests[i] );
924        protocolSendCancel( msgs, &b.requests[i] );
925    }
926
927    reqListClear( &a );
928    reqListClear( &b );
929}
930
931void
932tr_peerMsgsCancel( tr_peermsgs * msgs,
933                   uint32_t      pieceIndex,
934                   uint32_t      offset,
935                   uint32_t      length )
936{
937    struct peer_request req;
938
939    assert( msgs != NULL );
940    assert( length > 0 );
941
942    /* have we asked the peer for this piece? */
943    req.index = pieceIndex;
944    req.offset = offset;
945    req.length = length;
946
947    /* if it's only in the queue and hasn't been sent yet, free it */
948    if( reqListRemove( &msgs->clientWillAskFor, &req ) )
949        fireCancelledReq( msgs, &req );
950
951    /* if it's already been sent, send a cancel message too */
952    if( reqListRemove( &msgs->clientAskedFor, &req ) ) {
953        protocolSendCancel( msgs, &req );
954        fireCancelledReq( msgs, &req );
955    }
956}
957
958/**
959***
960**/
961
962static void
963sendLtepHandshake( tr_peermsgs * msgs )
964{
965    tr_benc           val, *m;
966    char *            buf;
967    int               len;
968    int               pex;
969    struct evbuffer * out = msgs->outMessages;
970
971    if( msgs->clientSentLtepHandshake )
972        return;
973
974    dbgmsg( msgs, "sending an ltep handshake" );
975    msgs->clientSentLtepHandshake = 1;
976
977    /* decide if we want to advertise pex support */
978    if( !tr_torrentAllowsPex( msgs->torrent ) )
979        pex = 0;
980    else if( msgs->peerSentLtepHandshake )
981        pex = msgs->peerSupportsPex ? 1 : 0;
982    else
983        pex = 1;
984
985    tr_bencInitDict( &val, 4 );
986    tr_bencDictAddInt( &val, "e",
987                       msgs->session->encryptionMode != TR_CLEAR_PREFERRED );
988    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( msgs->session ) );
989    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
990    m  = tr_bencDictAddDict( &val, "m", 1 );
991    if( pex )
992        tr_bencDictAddInt( m, "ut_pex", TR_LTEP_PEX );
993    buf = tr_bencSave( &val, &len );
994
995    tr_peerIoWriteUint32( msgs->io, out, 2 * sizeof( uint8_t ) + len );
996    tr_peerIoWriteUint8 ( msgs->io, out, BT_LTEP );
997    tr_peerIoWriteUint8 ( msgs->io, out, LTEP_HANDSHAKE );
998    tr_peerIoWriteBytes ( msgs->io, out, buf, len );
999    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1000    dbgmsg( msgs, "outMessage size is now %d", (int)EVBUFFER_LENGTH( out ) );
1001
1002    /* cleanup */
1003    tr_bencFree( &val );
1004    tr_free( buf );
1005}
1006
1007static void
1008parseLtepHandshake( tr_peermsgs *     msgs,
1009                    int               len,
1010                    struct evbuffer * inbuf )
1011{
1012    int64_t   i;
1013    tr_benc   val, * sub;
1014    uint8_t * tmp = tr_new( uint8_t, len );
1015
1016    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
1017    msgs->peerSentLtepHandshake = 1;
1018
1019    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type != TYPE_DICT )
1020    {
1021        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
1022        tr_free( tmp );
1023        return;
1024    }
1025
1026    dbgmsg( msgs, "here is the ltep handshake we got [%*.*s]", len, len,
1027            tmp );
1028
1029    /* does the peer prefer encrypted connections? */
1030    if( tr_bencDictFindInt( &val, "e", &i ) )
1031        msgs->info->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
1032                                            : ENCRYPTION_PREFERENCE_NO;
1033
1034    /* check supported messages for utorrent pex */
1035    msgs->peerSupportsPex = 0;
1036    if( tr_bencDictFindDict( &val, "m", &sub ) )
1037    {
1038        if( tr_bencDictFindInt( sub, "ut_pex", &i ) )
1039        {
1040            msgs->ut_pex_id = (uint8_t) i;
1041            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
1042            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
1043        }
1044    }
1045
1046    /* get peer's listening port */
1047    if( tr_bencDictFindInt( &val, "p", &i ) )
1048    {
1049        msgs->info->port = htons( (uint16_t)i );
1050        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
1051    }
1052
1053    tr_bencFree( &val );
1054    tr_free( tmp );
1055}
1056
1057static void
1058parseUtPex( tr_peermsgs *     msgs,
1059            int               msglen,
1060            struct evbuffer * inbuf )
1061{
1062    int                loaded = 0;
1063    uint8_t *          tmp = tr_new( uint8_t, msglen );
1064    tr_benc            val;
1065    const tr_torrent * tor = msgs->torrent;
1066    const uint8_t *    added;
1067    size_t             added_len;
1068
1069    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
1070
1071    if( tr_torrentAllowsPex( tor )
1072      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) )
1073      && tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1074    {
1075        const uint8_t * added_f = NULL;
1076        tr_pex *        pex;
1077        size_t          i, n;
1078        size_t          added_f_len = 0;
1079        tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1080        pex =
1081            tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len,
1082                                    &n );
1083        for( i = 0; i < n; ++i )
1084            tr_peerMgrAddPex( msgs->session->peerMgr, tor->info.hash,
1085                              TR_PEER_FROM_PEX, pex + i );
1086        tr_free( pex );
1087    }
1088
1089    if( loaded )
1090        tr_bencFree( &val );
1091    tr_free( tmp );
1092}
1093
1094static void sendPex( tr_peermsgs * msgs );
1095
1096static void
1097parseLtep( tr_peermsgs *     msgs,
1098           int               msglen,
1099           struct evbuffer * inbuf )
1100{
1101    uint8_t ltep_msgid;
1102
1103    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
1104    msglen--;
1105
1106    if( ltep_msgid == LTEP_HANDSHAKE )
1107    {
1108        dbgmsg( msgs, "got ltep handshake" );
1109        parseLtepHandshake( msgs, msglen, inbuf );
1110        if( tr_peerIoSupportsLTEP( msgs->io ) )
1111        {
1112            sendLtepHandshake( msgs );
1113            sendPex( msgs );
1114        }
1115    }
1116    else if( ltep_msgid == TR_LTEP_PEX )
1117    {
1118        dbgmsg( msgs, "got ut pex" );
1119        msgs->peerSupportsPex = 1;
1120        parseUtPex( msgs, msglen, inbuf );
1121    }
1122    else
1123    {
1124        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1125        evbuffer_drain( inbuf, msglen );
1126    }
1127}
1128
1129static int
1130readBtLength( tr_peermsgs *     msgs,
1131              struct evbuffer * inbuf,
1132              size_t            inlen )
1133{
1134    uint32_t len;
1135
1136    if( inlen < sizeof( len ) )
1137        return READ_LATER;
1138
1139    tr_peerIoReadUint32( msgs->io, inbuf, &len );
1140
1141    if( len == 0 ) /* peer sent us a keepalive message */
1142        dbgmsg( msgs, "got KeepAlive" );
1143    else
1144    {
1145        msgs->incoming.length = len;
1146        msgs->state = AWAITING_BT_ID;
1147    }
1148
1149    return READ_NOW;
1150}
1151
1152static int readBtMessage( tr_peermsgs *     msgs,
1153                          struct evbuffer * inbuf,
1154                          size_t            inlen );
1155
1156static int
1157readBtId( tr_peermsgs *     msgs,
1158          struct evbuffer * inbuf,
1159          size_t            inlen )
1160{
1161    uint8_t id;
1162
1163    if( inlen < sizeof( uint8_t ) )
1164        return READ_LATER;
1165
1166    tr_peerIoReadUint8( msgs->io, inbuf, &id );
1167    msgs->incoming.id = id;
1168
1169    if( id == BT_PIECE )
1170    {
1171        msgs->state = AWAITING_BT_PIECE;
1172        return READ_NOW;
1173    }
1174    else if( msgs->incoming.length != 1 )
1175    {
1176        msgs->state = AWAITING_BT_MESSAGE;
1177        return READ_NOW;
1178    }
1179    else return readBtMessage( msgs, inbuf, inlen - 1 );
1180}
1181
1182static void
1183updatePeerProgress( tr_peermsgs * msgs )
1184{
1185    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have )
1186                           / (float)msgs->torrent->info.pieceCount;
1187    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
1188    updateInterest( msgs );
1189    firePeerProgress( msgs );
1190}
1191
1192static int
1193clientCanSendFastBlock( const tr_peermsgs * msgs UNUSED )
1194{
1195    /* don't send a fast piece if peer has MAX_FAST_ALLOWED_THRESHOLD pieces */
1196    if( tr_bitfieldCountTrueBits( msgs->info->have ) >
1197        MAX_FAST_ALLOWED_THRESHOLD )
1198        return FALSE;
1199
1200    /* ...or if we don't have ourself enough pieces */
1201    if( tr_bitfieldCountTrueBits( tr_cpPieceBitfield( msgs->torrent->
1202                                                      completion ) ) <
1203        MAX_FAST_ALLOWED_THRESHOLD )
1204        return FALSE;
1205
1206    /* Maybe a bandwidth limit ? */
1207    return TRUE;
1208}
1209
1210static void
1211peerMadeRequest( tr_peermsgs *               msgs,
1212                 const struct peer_request * req )
1213{
1214    const int reqIsValid = requestIsValid( msgs, req );
1215    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete(
1216        msgs->torrent->completion, req->index );
1217    const int peerIsChoked = msgs->info->peerIsChoked;
1218    const int peerIsFast = tr_peerIoSupportsFEXT( msgs->io );
1219    const int pieceIsFast = reqIsValid && tr_bitfieldHas(
1220        getPeerAllowedPieces( msgs ), req->index );
1221    const int canSendFast = clientCanSendFastBlock( msgs );
1222
1223    if( !reqIsValid ) /* bad request */
1224    {
1225        dbgmsg( msgs, "rejecting an invalid request." );
1226        sendFastReject( msgs, req->index, req->offset, req->length );
1227    }
1228    else if( !clientHasPiece ) /* we don't have it */
1229    {
1230        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1231        sendFastReject( msgs, req->index, req->offset, req->length );
1232    }
1233    else if( peerIsChoked && !peerIsFast ) /* doesn't he know he's choked? */
1234    {
1235        tr_peerMsgsSetChoke( msgs, 1 );
1236        sendFastReject( msgs, req->index, req->offset, req->length );
1237    }
1238    else if( peerIsChoked && peerIsFast && ( !pieceIsFast || !canSendFast ) )
1239    {
1240        sendFastReject( msgs, req->index, req->offset, req->length );
1241    }
1242    else /* YAY */
1243    {
1244        if( peerIsFast && pieceIsFast )
1245            reqListAppend( &msgs->peerAskedForFast, req );
1246        else
1247            reqListAppend( &msgs->peerAskedFor, req );
1248    }
1249}
1250
1251static int
1252messageLengthIsCorrect( const tr_peermsgs * msg,
1253                        uint8_t             id,
1254                        uint32_t            len )
1255{
1256    switch( id )
1257    {
1258        case BT_CHOKE:
1259        case BT_UNCHOKE:
1260        case BT_INTERESTED:
1261        case BT_NOT_INTERESTED:
1262        case BT_HAVE_ALL:
1263        case BT_HAVE_NONE:
1264            return len == 1;
1265
1266        case BT_HAVE:
1267        case BT_SUGGEST:
1268        case BT_ALLOWED_FAST:
1269            return len == 5;
1270
1271        case BT_BITFIELD:
1272            return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1273
1274        case BT_REQUEST:
1275        case BT_CANCEL:
1276        case BT_REJECT:
1277            return len == 13;
1278
1279        case BT_PIECE:
1280            return len > 9 && len <= 16393;
1281
1282        case BT_PORT:
1283            return len == 3;
1284
1285        case BT_LTEP:
1286            return len >= 2;
1287
1288        default:
1289            return FALSE;
1290    }
1291}
1292
1293static int clientGotBlock( tr_peermsgs *               msgs,
1294                           const uint8_t *             block,
1295                           const struct peer_request * req );
1296
1297static void
1298clientGotBytes( tr_peermsgs * msgs,
1299                uint32_t      byteCount )
1300{
1301    msgs->info->pieceDataActivityDate = time( NULL );
1302    fireClientGotData( msgs, byteCount );
1303}
1304
1305static int
1306readBtPiece( tr_peermsgs *     msgs,
1307             struct evbuffer * inbuf,
1308             size_t            inlen )
1309{
1310    struct peer_request * req = &msgs->incoming.blockReq;
1311
1312    assert( EVBUFFER_LENGTH( inbuf ) >= inlen );
1313    dbgmsg( msgs, "In readBtPiece" );
1314
1315    if( !req->length )
1316    {
1317        if( inlen < 8 )
1318            return READ_LATER;
1319
1320        tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
1321        tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
1322        req->length = msgs->incoming.length - 9;
1323        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index,
1324                req->offset,
1325                req->length );
1326        return READ_NOW;
1327    }
1328    else
1329    {
1330        int          err;
1331
1332        /* read in another chunk of data */
1333        const size_t nLeft = req->length - EVBUFFER_LENGTH(
1334            msgs->incoming.block );
1335        size_t       n = MIN( nLeft, inlen );
1336        uint8_t *    buf = tr_new( uint8_t, n );
1337        assert( EVBUFFER_LENGTH( inbuf ) >= n );
1338        tr_peerIoReadBytes( msgs->io, inbuf, buf, n );
1339        evbuffer_add( msgs->incoming.block, buf, n );
1340        clientGotBytes( msgs, n );
1341        tr_free( buf );
1342        dbgmsg( msgs, "got %d bytes for block %u:%u->%u ... %d remain",
1343               (int)n, req->index, req->offset, req->length,
1344               (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) );
1345        if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length )
1346            return READ_LATER;
1347
1348        /* we've got the whole block ... process it */
1349        err = clientGotBlock( msgs, EVBUFFER_DATA(
1350                                  msgs->incoming.block ), req );
1351
1352        /* cleanup */
1353        evbuffer_drain( msgs->incoming.block,
1354                       EVBUFFER_LENGTH( msgs->incoming.block ) );
1355        req->length = 0;
1356        msgs->state = AWAITING_BT_LENGTH;
1357        if( !err )
1358            return READ_NOW;
1359        else
1360        {
1361            fireError( msgs, err );
1362            return READ_ERR;
1363        }
1364    }
1365}
1366
1367static int
1368readBtMessage( tr_peermsgs *     msgs,
1369               struct evbuffer * inbuf,
1370               size_t            inlen )
1371{
1372    uint32_t      ui32;
1373    uint32_t      msglen = msgs->incoming.length;
1374    const uint8_t id = msgs->incoming.id;
1375    const size_t  startBufLen = EVBUFFER_LENGTH( inbuf );
1376
1377    --msglen; /* id length */
1378
1379    if( inlen < msglen )
1380        return READ_LATER;
1381
1382    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %d", (int)id,
1383            (int)msglen,
1384            (int)inlen );
1385
1386    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1387    {
1388        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d",
1389                (int)id, (int)msglen );
1390        fireError( msgs, EMSGSIZE );
1391        return READ_ERR;
1392    }
1393
1394    switch( id )
1395    {
1396        case BT_CHOKE:
1397            dbgmsg( msgs, "got Choke" );
1398            msgs->info->clientIsChoked = 1;
1399            cancelAllRequestsToPeer( msgs );
1400            cancelAllRequestsToClientExceptFast( msgs );
1401            break;
1402
1403        case BT_UNCHOKE:
1404            dbgmsg( msgs, "got Unchoke" );
1405            msgs->info->clientIsChoked = 0;
1406            fireNeedReq( msgs );
1407            break;
1408
1409        case BT_INTERESTED:
1410            dbgmsg( msgs, "got Interested" );
1411            msgs->info->peerIsInterested = 1;
1412            break;
1413
1414        case BT_NOT_INTERESTED:
1415            dbgmsg( msgs, "got Not Interested" );
1416            msgs->info->peerIsInterested = 0;
1417            break;
1418
1419        case BT_HAVE:
1420            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1421            dbgmsg( msgs, "got Have: %u", ui32 );
1422            if( tr_bitfieldAdd( msgs->info->have, ui32 ) )
1423                fireError( msgs, ERANGE );
1424            updatePeerProgress( msgs );
1425            tr_rcTransferred( msgs->torrent->swarmSpeed,
1426                              msgs->torrent->info.pieceSize );
1427            break;
1428
1429        case BT_BITFIELD:
1430        {
1431            dbgmsg( msgs, "got a bitfield" );
1432            msgs->peerSentBitfield = 1;
1433            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits,
1434                                msglen );
1435            updatePeerProgress( msgs );
1436            maybeSendFastAllowedSet( msgs );
1437            fireNeedReq( msgs );
1438            break;
1439        }
1440
1441        case BT_REQUEST:
1442        {
1443            struct peer_request r;
1444            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1445            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1446            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1447            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset,
1448                    r.length );
1449            peerMadeRequest( msgs, &r );
1450            break;
1451        }
1452
1453        case BT_CANCEL:
1454        {
1455            struct peer_request r;
1456            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1457            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1458            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1459            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset,
1460                    r.length );
1461            reqListRemove( &msgs->peerAskedForFast, &r );
1462            reqListRemove( &msgs->peerAskedFor, &r );
1463            break;
1464        }
1465
1466        case BT_PIECE:
1467            assert( 0 ); /* handled elsewhere! */
1468            break;
1469
1470        case BT_PORT:
1471            dbgmsg( msgs, "Got a BT_PORT" );
1472            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1473            break;
1474
1475        case BT_SUGGEST:
1476        {
1477            dbgmsg( msgs, "Got a BT_SUGGEST" );
1478            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1479            /* we don't do anything with this yet */
1480            break;
1481        }
1482
1483        case BT_HAVE_ALL:
1484            dbgmsg( msgs, "Got a BT_HAVE_ALL" );
1485            tr_bitfieldAddRange( msgs->info->have, 0,
1486                                 msgs->torrent->info.pieceCount );
1487            updatePeerProgress( msgs );
1488            maybeSendFastAllowedSet( msgs );
1489            break;
1490
1491
1492        case BT_HAVE_NONE:
1493            dbgmsg( msgs, "Got a BT_HAVE_NONE" );
1494            tr_bitfieldClear( msgs->info->have );
1495            updatePeerProgress( msgs );
1496            maybeSendFastAllowedSet( msgs );
1497            break;
1498
1499        case BT_REJECT:
1500        {
1501            struct peer_request r;
1502            dbgmsg( msgs, "Got a BT_REJECT" );
1503            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1504            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1505            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1506            reqListRemove( &msgs->clientAskedFor, &r );
1507            break;
1508        }
1509
1510        case BT_ALLOWED_FAST:
1511        {
1512            dbgmsg( msgs, "Got a BT_ALLOWED_FAST" );
1513            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1514            /* we don't do anything with this yet */
1515            break;
1516        }
1517
1518        case BT_LTEP:
1519            dbgmsg( msgs, "Got a BT_LTEP" );
1520            parseLtep( msgs, msglen, inbuf );
1521            break;
1522
1523        default:
1524            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1525            tr_peerIoDrain( msgs->io, inbuf, msglen );
1526            break;
1527    }
1528
1529    assert( msglen + 1 == msgs->incoming.length );
1530    assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen );
1531
1532    msgs->state = AWAITING_BT_LENGTH;
1533    return READ_NOW;
1534}
1535
1536static void
1537peerGotBytes( tr_peermsgs * msgs,
1538              uint32_t      byteCount,
1539              const time_t  now )
1540{
1541    msgs->info->pieceDataActivityDate = now;
1542    firePeerGotData( msgs, byteCount );
1543}
1544
1545static void
1546decrementDownloadedCount( tr_peermsgs * msgs,
1547                          uint32_t      byteCount )
1548{
1549    tr_torrent * tor = msgs->torrent;
1550
1551    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1552}
1553
1554static void
1555clientGotUnwantedBlock( tr_peermsgs *               msgs,
1556                        const struct peer_request * req )
1557{
1558    decrementDownloadedCount( msgs, req->length );
1559}
1560
1561static void
1562addPeerToBlamefield( tr_peermsgs * msgs,
1563                     uint32_t      index )
1564{
1565    if( !msgs->info->blame )
1566        msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1567    tr_bitfieldAdd( msgs->info->blame, index );
1568}
1569
1570/* returns 0 on success, or an errno on failure */
1571static int
1572clientGotBlock( tr_peermsgs *               msgs,
1573                const uint8_t *             data,
1574                const struct peer_request * req )
1575{
1576    int                    err;
1577    tr_torrent *           tor = msgs->torrent;
1578    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1579
1580    assert( msgs );
1581    assert( req );
1582
1583    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) )
1584    {
1585        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1586                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1587        return EMSGSIZE;
1588    }
1589
1590    /* save the block */
1591    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset,
1592            req->length );
1593
1594    /**
1595    *** Remove the block from our `we asked for this' list
1596    **/
1597
1598    if( !reqListRemove( &msgs->clientAskedFor, req ) )
1599    {
1600        clientGotUnwantedBlock( msgs, req );
1601        dbgmsg( msgs, "we didn't ask for this message..." );
1602        return 0;
1603    }
1604
1605    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1606            msgs->clientAskedFor.count );
1607
1608    /**
1609    *** Error checks
1610    **/
1611
1612    if( tr_cpBlockIsComplete( tor->completion, block ) )
1613    {
1614        dbgmsg( msgs, "we have this block already..." );
1615        clientGotUnwantedBlock( msgs, req );
1616        return 0;
1617    }
1618
1619    /**
1620    ***  Save the block
1621    **/
1622
1623    msgs->info->peerSentPieceDataAt = time( NULL );
1624    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1625        return err;
1626
1627    addPeerToBlamefield( msgs, req->index );
1628    fireGotBlock( msgs, req );
1629    return 0;
1630}
1631
1632static ReadState
1633canRead( struct bufferevent * evin,
1634         void *               vmsgs )
1635{
1636    ReadState         ret;
1637    tr_peermsgs *     msgs = vmsgs;
1638    struct evbuffer * in = EVBUFFER_INPUT ( evin );
1639    const size_t      inlen = EVBUFFER_LENGTH( in );
1640
1641    if( !inlen )
1642    {
1643        ret = READ_LATER;
1644    }
1645    else if( msgs->state == AWAITING_BT_PIECE )
1646    {
1647        ret = inlen ? readBtPiece( msgs, in, inlen ) : READ_LATER;
1648    }
1649    else switch( msgs->state )
1650        {
1651            case AWAITING_BT_LENGTH:
1652                ret = readBtLength ( msgs, in, inlen ); break;
1653
1654            case AWAITING_BT_ID:
1655                ret = readBtId     ( msgs, in, inlen ); break;
1656
1657            case AWAITING_BT_MESSAGE:
1658                ret = readBtMessage( msgs, in, inlen ); break;
1659
1660            default:
1661                assert( 0 );
1662        }
1663
1664    return ret;
1665}
1666
1667static void
1668sendKeepalive( tr_peermsgs * msgs )
1669{
1670    dbgmsg( msgs, "sending a keepalive message" );
1671    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1672    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1673}
1674
1675/**
1676***
1677**/
1678
1679static int
1680ratePulse( void * vpeer )
1681{
1682    tr_peermsgs * peer = vpeer;
1683    const double rateToClient = tr_peerGetPieceSpeed( peer->info,
1684                                                      TR_PEER_TO_CLIENT );
1685    const int estimatedBlocksInNext30Seconds =
1686                  ( rateToClient * 30 * 1024 ) / peer->torrent->blockSize;
1687
1688    peer->minActiveRequests = 4;
1689    peer->maxActiveRequests = peer->minActiveRequests +
1690                              estimatedBlocksInNext30Seconds;
1691    return TRUE;
1692}
1693
1694static int
1695popNextRequest( tr_peermsgs *         msgs,
1696                struct peer_request * setme )
1697{
1698    return reqListPop( &msgs->peerAskedForFast, setme )
1699        || reqListPop( &msgs->peerAskedFor, setme );
1700}
1701
1702static int
1703peerPulse( void * vmsgs )
1704{
1705    tr_peermsgs * msgs = vmsgs;
1706    const time_t  now = time( NULL );
1707
1708    ratePulse( msgs );
1709
1710    /*tr_peerIoTryRead( msgs->io );*/
1711    pumpRequestQueue( msgs, now );
1712    expireOldRequests( msgs, now );
1713
1714    if( msgs->sendingBlock )
1715    {
1716        const size_t uploadMax = tr_peerIoGetWriteBufferSpace( msgs->io );
1717        size_t       len = EVBUFFER_LENGTH( msgs->outBlock );
1718        const size_t outlen = MIN( len, uploadMax );
1719
1720        assert( len );
1721
1722        if( outlen )
1723        {
1724            tr_peerIoWrite( msgs->io, EVBUFFER_DATA( msgs->outBlock ), outlen );
1725            evbuffer_drain( msgs->outBlock, outlen );
1726            peerGotBytes( msgs, outlen, now );
1727
1728            len -= outlen;
1729            msgs->clientSentAnythingAt = now;
1730            msgs->sendingBlock = len != 0;
1731
1732            dbgmsg( msgs, "wrote %d bytes; %d left in block", (int)outlen,
1733                    (int)len );
1734        }
1735        else dbgmsg( msgs,
1736                     "stalled writing block... uploadMax %lu, outlen %lu",
1737                     uploadMax, outlen );
1738    }
1739
1740    if( !msgs->sendingBlock )
1741    {
1742        struct peer_request req;
1743        const int haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1744
1745        if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1746        {
1747            dbgmsg( msgs, "started an outMessages batch (length is %d)",
1748                   (int)EVBUFFER_LENGTH( msgs->outMessages ) );
1749            msgs->outMessagesBatchedAt = now;
1750        }
1751        else if( haveMessages
1752               && ( ( now - msgs->outMessagesBatchedAt ) >
1753                   msgs->outMessagesBatchPeriod ) )
1754        {
1755            dbgmsg( msgs, "flushing outMessages... (length is %d)",
1756                   (int)EVBUFFER_LENGTH(
1757                       msgs->outMessages ) );
1758            tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
1759            msgs->clientSentAnythingAt = now;
1760            msgs->outMessagesBatchedAt = 0;
1761            msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1762        }
1763        else if( !EVBUFFER_LENGTH( msgs->outBlock )
1764               && popNextRequest( msgs, &req )
1765               && requestIsValid( msgs, &req )
1766               && tr_cpPieceIsComplete( msgs->torrent->completion,
1767                                        req.index ) )
1768        {
1769            uint8_t * buf = tr_new( uint8_t, req.length );
1770            const int err = tr_ioRead( msgs->torrent,
1771                                       req.index, req.offset, req.length,
1772                                       buf );
1773            if( err )
1774            {
1775                fireError( msgs, err );
1776            }
1777            else
1778            {
1779                tr_peerIo *       io = msgs->io;
1780                struct evbuffer * out = msgs->outBlock;
1781
1782                dbgmsg( msgs, "sending block %u:%u->%u", req.index,
1783                        req.offset,
1784                        req.length );
1785                tr_peerIoWriteUint32(
1786                    io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) +
1787                    req.length );
1788                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1789                tr_peerIoWriteUint32( io, out, req.index );
1790                tr_peerIoWriteUint32( io, out, req.offset );
1791                tr_peerIoWriteBytes ( io, out, buf, req.length );
1792                msgs->sendingBlock = 1;
1793            }
1794
1795            tr_free( buf );
1796        }
1797        else if( ( !haveMessages )
1798               && ( now - msgs->clientSentAnythingAt ) >
1799                KEEPALIVE_INTERVAL_SECS )
1800        {
1801            sendKeepalive( msgs );
1802        }
1803    }
1804
1805    return TRUE; /* loop forever */
1806}
1807
1808void
1809tr_peerMsgsPulse( tr_peermsgs * msgs )
1810{
1811    if( msgs != NULL )
1812        peerPulse( msgs );
1813}
1814
1815static void
1816gotError( struct bufferevent * evbuf UNUSED,
1817          short                      what,
1818          void *                     vmsgs )
1819{
1820    if( what & EVBUFFER_TIMEOUT )
1821        dbgmsg( vmsgs, "libevent got a timeout, what=%hd, secs=%d", what,
1822                evbuf->timeout_read );
1823    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1824        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1825               what, errno, tr_strerror( errno ) );
1826    fireError( vmsgs, ENOTCONN );
1827}
1828
1829static void
1830sendBitfield( tr_peermsgs * msgs )
1831{
1832    struct evbuffer * out = msgs->outMessages;
1833    tr_bitfield *     field;
1834    tr_piece_index_t  lazyPieces[LAZY_PIECE_COUNT];
1835    size_t            i;
1836    size_t            lazyCount = 0;
1837
1838    field = tr_bitfieldDup( tr_cpPieceBitfield( msgs->torrent->completion ) );
1839
1840    if( tr_sessionIsLazyBitfieldEnabled( msgs->session ) )
1841    {
1842        /** Lazy bitfields aren't a high priority or secure, so I'm opting for
1843            speed over a truly random sample -- let's limit the pool size to
1844            the first 1000 pieces so large torrents don't bog things down */
1845        size_t             poolSize = MIN( msgs->torrent->info.pieceCount,
1846                                           1000 );
1847        tr_piece_index_t * pool = tr_new( tr_piece_index_t, poolSize );
1848
1849        /* build the pool */
1850        for( i = 0; i < poolSize; ++i )
1851            pool[i] = i;
1852
1853        /* pull random piece indices from the pool */
1854        while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) )
1855        {
1856            const int              pos = tr_cryptoWeakRandInt( poolSize );
1857            const tr_piece_index_t piece = pool[pos];
1858            tr_bitfieldRem( field, piece );
1859            lazyPieces[lazyCount++] = piece;
1860            pool[pos] = pool[--poolSize];
1861        }
1862
1863        /* cleanup */
1864        tr_free( pool );
1865    }
1866
1867    tr_peerIoWriteUint32( msgs->io, out,
1868                          sizeof( uint8_t ) + field->byteCount );
1869    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1870    tr_peerIoWriteBytes ( msgs->io, out, field->bits, field->byteCount );
1871    dbgmsg( msgs, "sending bitfield... outMessage size is now %d",
1872           (int)EVBUFFER_LENGTH( out ) );
1873    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1874
1875    for( i = 0; i < lazyCount; ++i )
1876        protocolSendHave( msgs, lazyPieces[i] );
1877
1878    tr_bitfieldFree( field );
1879}
1880
1881/**
1882***
1883**/
1884
1885/* some peers give us error messages if we send
1886   more than this many peers in a single pex message
1887   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
1888#define MAX_PEX_ADDED 50
1889#define MAX_PEX_DROPPED 50
1890
1891typedef struct
1892{
1893    tr_pex *  added;
1894    tr_pex *  dropped;
1895    tr_pex *  elements;
1896    int       addedCount;
1897    int       droppedCount;
1898    int       elementCount;
1899}
1900PexDiffs;
1901
1902static void
1903pexAddedCb( void * vpex,
1904            void * userData )
1905{
1906    PexDiffs * diffs = userData;
1907    tr_pex *   pex = vpex;
1908
1909    if( diffs->addedCount < MAX_PEX_ADDED )
1910    {
1911        diffs->added[diffs->addedCount++] = *pex;
1912        diffs->elements[diffs->elementCount++] = *pex;
1913    }
1914}
1915
1916static void
1917pexDroppedCb( void * vpex,
1918              void * userData )
1919{
1920    PexDiffs * diffs = userData;
1921    tr_pex *   pex = vpex;
1922
1923    if( diffs->droppedCount < MAX_PEX_DROPPED )
1924    {
1925        diffs->dropped[diffs->droppedCount++] = *pex;
1926    }
1927}
1928
1929static void
1930pexElementCb( void * vpex,
1931              void * userData )
1932{
1933    PexDiffs * diffs = userData;
1934    tr_pex *   pex = vpex;
1935
1936    diffs->elements[diffs->elementCount++] = *pex;
1937}
1938
1939static void
1940sendPex( tr_peermsgs * msgs )
1941{
1942    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
1943    {
1944        int               i;
1945        tr_pex *          newPex = NULL;
1946        const int         newCount = tr_peerMgrGetPeers(
1947            msgs->session->peerMgr, msgs->torrent->info.hash, &newPex );
1948        PexDiffs          diffs;
1949        tr_benc           val;
1950        uint8_t *         tmp, *walk;
1951        char *            benc;
1952        int               bencLen;
1953        struct evbuffer * out = msgs->outMessages;
1954
1955        /* build the diffs */
1956        diffs.added = tr_new( tr_pex, newCount );
1957        diffs.addedCount = 0;
1958        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1959        diffs.droppedCount = 0;
1960        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1961        diffs.elementCount = 0;
1962        tr_set_compare( msgs->pex, msgs->pexCount,
1963                        newPex, newCount,
1964                        tr_pexCompare, sizeof( tr_pex ),
1965                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
1966        dbgmsg(
1967            msgs,
1968            "pex: old peer count %d, new peer count %d, added %d, removed %d",
1969            msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1970
1971        /* update peer */
1972        tr_free( msgs->pex );
1973        msgs->pex = diffs.elements;
1974        msgs->pexCount = diffs.elementCount;
1975
1976        /* build the pex payload */
1977        tr_bencInitDict( &val, 3 );
1978
1979        /* "added" */
1980        tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1981        for( i = 0; i < diffs.addedCount; ++i )
1982        {
1983            memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
1984            memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1985        }
1986        assert( ( walk - tmp ) == diffs.addedCount * 6 );
1987        tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
1988        tr_free( tmp );
1989
1990        /* "added.f" */
1991        tmp = walk = tr_new( uint8_t, diffs.addedCount );
1992        for( i = 0; i < diffs.addedCount; ++i )
1993            *walk++ = diffs.added[i].flags;
1994        assert( ( walk - tmp ) == diffs.addedCount );
1995        tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
1996        tr_free( tmp );
1997
1998        /* "dropped" */
1999        tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2000        for( i = 0; i < diffs.droppedCount; ++i )
2001        {
2002            memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
2003            memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2004        }
2005        assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2006        tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2007        tr_free( tmp );
2008
2009        /* write the pex message */
2010        benc = tr_bencSave( &val, &bencLen );
2011        tr_peerIoWriteUint32( msgs->io, out,
2012                              2 * sizeof( uint8_t ) + bencLen );
2013        tr_peerIoWriteUint8 ( msgs->io, out, BT_LTEP );
2014        tr_peerIoWriteUint8 ( msgs->io, out, msgs->ut_pex_id );
2015        tr_peerIoWriteBytes ( msgs->io, out, benc, bencLen );
2016        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
2017        dbgmsg( msgs, "outMessage size is now %d",
2018               (int)EVBUFFER_LENGTH( out ) );
2019
2020        /* cleanup */
2021        tr_free( benc );
2022        tr_bencFree( &val );
2023        tr_free( diffs.added );
2024        tr_free( diffs.dropped );
2025        tr_free( newPex );
2026
2027        msgs->clientSentPexAt = time( NULL );
2028    }
2029}
2030
2031static int
2032pexPulse( void * vpeer )
2033{
2034    sendPex( vpeer );
2035    return TRUE;
2036}
2037
2038/**
2039***
2040**/
2041
2042tr_peermsgs*
2043tr_peerMsgsNew( struct tr_torrent * torrent,
2044                struct tr_peer *    info,
2045                tr_delivery_func    func,
2046                void *              userData,
2047                tr_publisher_tag *  setme )
2048{
2049    tr_peermsgs * m;
2050
2051    assert( info );
2052    assert( info->io );
2053
2054    m = tr_new0( tr_peermsgs, 1 );
2055    m->publisher = tr_publisherNew( );
2056    m->info = info;
2057    m->session = torrent->session;
2058    m->torrent = torrent;
2059    m->io = info->io;
2060    m->info->clientIsChoked = 1;
2061    m->info->peerIsChoked = 1;
2062    m->info->clientIsInterested = 0;
2063    m->info->peerIsInterested = 0;
2064    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
2065    m->state = AWAITING_BT_LENGTH;
2066    m->pexTimer = tr_timerNew( m->session, pexPulse, m, PEX_INTERVAL );
2067    m->outMessages = evbuffer_new( );
2068    m->outMessagesBatchedAt = 0;
2069    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2070    m->incoming.block = evbuffer_new( );
2071    m->outBlock = evbuffer_new( );
2072    m->peerAllowedPieces = NULL;
2073    m->peerAskedFor = REQUEST_LIST_INIT;
2074    m->peerAskedForFast = REQUEST_LIST_INIT;
2075    m->clientAskedFor = REQUEST_LIST_INIT;
2076    m->clientWillAskFor = REQUEST_LIST_INIT;
2077    *setme = tr_publisherSubscribe( m->publisher, func, userData );
2078
2079    if( tr_peerIoSupportsLTEP( m->io ) )
2080        sendLtepHandshake( m );
2081
2082    sendBitfield( m );
2083
2084    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of
2085                                             inactivity */
2086    tr_peerIoSetIOFuncs( m->io, canRead, NULL, gotError, m );
2087    ratePulse( m );
2088
2089    return m;
2090}
2091
2092void
2093tr_peerMsgsFree( tr_peermsgs* msgs )
2094{
2095    if( msgs )
2096    {
2097        tr_timerFree( &msgs->pexTimer );
2098        tr_publisherFree( &msgs->publisher );
2099        reqListClear( &msgs->clientWillAskFor );
2100        reqListClear( &msgs->clientAskedFor );
2101        reqListClear( &msgs->peerAskedForFast );
2102        reqListClear( &msgs->peerAskedFor );
2103        tr_bitfieldFree( msgs->peerAllowedPieces );
2104        evbuffer_free( msgs->incoming.block );
2105        evbuffer_free( msgs->outMessages );
2106        evbuffer_free( msgs->outBlock );
2107        tr_free( msgs->pex );
2108
2109        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2110        tr_free( msgs );
2111    }
2112}
2113
2114void
2115tr_peerMsgsUnsubscribe( tr_peermsgs *    peer,
2116                        tr_publisher_tag tag )
2117{
2118    tr_publisherUnsubscribe( peer->publisher, tag );
2119}
2120
Note: See TracBrowser for help on using the repository browser.