source: trunk/libtransmission/peer-msgs.c @ 7147

Last change on this file since 7147 was 7147, checked in by charles, 12 years ago

(libT) #1468: another stab at getting the peer transfer speeds both fast and a little more consistent.

  • Property svn:keywords set to Date Rev Author Id
File size: 59.8 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 7147 2008-11-24 04:21:23Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19
20#include <event.h>
21
22#include "transmission.h"
23#include "bencode.h"
24#include "completion.h"
25#include "crypto.h"
26#include "inout.h"
27#ifdef WIN32
28#include "net.h" /* for ECONN */
29#endif
30#include "peer-io.h"
31#include "peer-mgr.h"
32#include "peer-mgr-private.h"
33#include "peer-msgs.h"
34#include "ratecontrol.h"
35#include "stats.h"
36#include "torrent.h"
37#include "trevent.h"
38#include "utils.h"
39
40/**
41***
42**/
43
44enum
45{
46    BT_CHOKE                = 0,
47    BT_UNCHOKE              = 1,
48    BT_INTERESTED           = 2,
49    BT_NOT_INTERESTED       = 3,
50    BT_HAVE                 = 4,
51    BT_BITFIELD             = 5,
52    BT_REQUEST              = 6,
53    BT_PIECE                = 7,
54    BT_CANCEL               = 8,
55    BT_PORT                 = 9,
56    BT_SUGGEST              = 13,
57    BT_HAVE_ALL             = 14,
58    BT_HAVE_NONE            = 15,
59    BT_REJECT               = 16,
60    BT_ALLOWED_FAST         = 17,
61    BT_LTEP                 = 20,
62
63    LTEP_HANDSHAKE          = 0,
64
65    TR_LTEP_PEX             = 1,
66
67    MIN_CHOKE_PERIOD_SEC    = ( 10 ),
68
69    /* idle seconds before we send a keepalive */
70    KEEPALIVE_INTERVAL_SECS = 100,
71
72    PEX_INTERVAL            = ( 90 * 1000 ), /* msec between sendPex() calls */
73    PEER_PULSE_INTERVAL     = ( 250 ),       /* msec between peerPulse() calls
74                                               */
75
76    MAX_QUEUE_SIZE          = ( 100 ),
77
78    /* (fast peers) max number of pieces we fast-allow to another peer */
79    MAX_FAST_ALLOWED_COUNT   = 10,
80
81    /* (fast peers) max threshold for allowing fast-pieces requests */
82    MAX_FAST_ALLOWED_THRESHOLD = 10,
83
84    /* how long an unsent request can stay queued before it's returned
85       back to the peer-mgr's pool of requests */
86    QUEUED_REQUEST_TTL_SECS = 20,
87
88    /* how long a sent request can stay queued before it's returned
89       back to the peer-mgr's pool of requests */
90    SENT_REQUEST_TTL_SECS = 240,
91
92    /* used in lowering the outMessages queue period */
93    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
94    HIGH_PRIORITY_INTERVAL_SECS = 2,
95    LOW_PRIORITY_INTERVAL_SECS = 20,
96
97    /* number of pieces to remove from the bitfield when
98     * lazy bitfields are turned on */
99    LAZY_PIECE_COUNT = 26
100};
101
102/**
103***  REQUEST MANAGEMENT
104**/
105
106enum
107{
108    AWAITING_BT_LENGTH,
109    AWAITING_BT_ID,
110    AWAITING_BT_MESSAGE,
111    AWAITING_BT_PIECE
112};
113
114struct peer_request
115{
116    uint32_t    index;
117    uint32_t    offset;
118    uint32_t    length;
119    time_t      time_requested;
120};
121
122static int
123compareRequest( const void * va,
124                const void * vb )
125{
126    const struct peer_request * a = va;
127    const struct peer_request * b = vb;
128
129    if( a->index != b->index )
130        return a->index < b->index ? -1 : 1;
131
132    if( a->offset != b->offset )
133        return a->offset < b->offset ? -1 : 1;
134
135    if( a->length != b->length )
136        return a->length < b->length ? -1 : 1;
137
138    return 0;
139}
140
141struct request_list
142{
143    uint16_t               count;
144    uint16_t               max;
145    struct peer_request *  requests;
146};
147
148static const struct request_list REQUEST_LIST_INIT = { 0, 0, NULL };
149
150static void
151reqListReserve( struct request_list * list,
152                uint16_t              max )
153{
154    if( list->max < max )
155    {
156        list->max = max;
157        list->requests = tr_renew( struct peer_request,
158                                   list->requests,
159                                   list->max );
160    }
161}
162
163static void
164reqListClear( struct request_list * list )
165{
166    tr_free( list->requests );
167    *list = REQUEST_LIST_INIT;
168}
169
170static void
171reqListCopy( struct request_list *       dest,
172             const struct request_list * src )
173{
174    dest->count = dest->max = src->count;
175    dest->requests =
176        tr_memdup( src->requests, dest->count * sizeof( struct peer_request ) );
177}
178
179static void
180reqListRemoveOne( struct request_list * list,
181                  int                   i )
182{
183    assert( 0 <= i && i < list->count );
184
185    memmove( &list->requests[i],
186            &list->requests[i + 1],
187            sizeof( struct peer_request ) * ( --list->count - i ) );
188}
189
190static void
191reqListAppend( struct request_list *       list,
192               const struct peer_request * req )
193{
194    if( ++list->count >= list->max )
195        reqListReserve( list, list->max + 8 );
196
197    list->requests[list->count - 1] = *req;
198}
199
200static int
201reqListPop( struct request_list * list,
202            struct peer_request * setme )
203{
204    int success;
205
206    if( !list->count )
207        success = FALSE;
208    else {
209        *setme = list->requests[0];
210        reqListRemoveOne( list, 0 );
211        success = TRUE;
212    }
213
214    return success;
215}
216
217static int
218reqListFind( struct request_list *       list,
219             const struct peer_request * key )
220{
221    uint16_t i;
222
223    for( i = 0; i < list->count; ++i )
224        if( !compareRequest( key, list->requests + i ) )
225            return i;
226
227    return -1;
228}
229
230static int
231reqListRemove( struct request_list *       list,
232               const struct peer_request * key )
233{
234    int success;
235    const int i = reqListFind( list, key );
236
237    if( i < 0 )
238        success = FALSE;
239    else {
240        reqListRemoveOne( list, i );
241        success = TRUE;
242    }
243
244    return success;
245}
246
247/**
248***
249**/
250
251/* this is raw, unchanged data from the peer regarding
252 * the current message that it's sending us. */
253struct tr_incoming
254{
255    uint8_t                id;
256    uint32_t               length; /* includes the +1 for id length */
257    struct peer_request    blockReq; /* metadata for incoming blocks */
258    struct evbuffer *      block; /* piece data for incoming blocks */
259};
260
261struct tr_peermsgs
262{
263    unsigned int    peerSentBitfield        : 1;
264    unsigned int    peerSupportsPex         : 1;
265    unsigned int    clientSentLtepHandshake : 1;
266    unsigned int    peerSentLtepHandshake   : 1;
267    unsigned int    sendingBlock            : 1;
268
269    uint8_t         state;
270    uint8_t         ut_pex_id;
271    uint16_t        pexCount;
272    uint16_t        minActiveRequests;
273    uint16_t        maxActiveRequests;
274
275    /* how long the outMessages batch should be allowed to grow before
276     * it's flushed -- some messages (like requests >:) should be sent
277     * very quickly; others aren't as urgent. */
278    int                    outMessagesBatchPeriod;
279
280    tr_peer *              info;
281
282    tr_session *           session;
283    tr_torrent *           torrent;
284    tr_peerIo *            io;
285
286    tr_publisher_t *       publisher;
287
288    struct evbuffer *      outBlock; /* buffer of the current piece message */
289    struct evbuffer *      outMessages; /* all the non-piece messages */
290
291    struct request_list    peerAskedFor;
292    struct request_list    peerAskedForFast;
293    struct request_list    clientAskedFor;
294    struct request_list    clientWillAskFor;
295
296    tr_timer *             pexTimer;
297
298    time_t                 clientSentPexAt;
299    time_t                 clientSentAnythingAt;
300
301    /* when we started batching the outMessages */
302    time_t                outMessagesBatchedAt;
303
304    tr_bitfield *         peerAllowedPieces;
305
306    struct tr_incoming    incoming;
307
308    tr_pex *              pex;
309};
310
311/**
312***
313**/
314
315static void
316myDebug( const char *               file,
317         int                        line,
318         const struct tr_peermsgs * msgs,
319         const char *               fmt,
320         ... )
321{
322    FILE * fp = tr_getLog( );
323
324    if( fp )
325    {
326        va_list           args;
327        char              timestr[64];
328        struct evbuffer * buf = evbuffer_new( );
329        char *            base = tr_basename( file );
330
331        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
332                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
333                             msgs->torrent->info.name,
334                             tr_peerIoGetAddrStr( msgs->io ),
335                             msgs->info->client );
336        va_start( args, fmt );
337        evbuffer_add_vprintf( buf, fmt, args );
338        va_end( args );
339        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
340        fwrite( EVBUFFER_DATA( buf ), 1, EVBUFFER_LENGTH( buf ), fp );
341
342        tr_free( base );
343        evbuffer_free( buf );
344    }
345}
346
347#define dbgmsg( msgs, ... ) \
348    do { \
349        if( tr_deepLoggingIsActive( ) ) \
350            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
351    } while( 0 )
352
353/**
354***
355**/
356
357static void
358pokeBatchPeriod( tr_peermsgs * msgs,
359                 int           interval )
360{
361    if( msgs->outMessagesBatchPeriod > interval )
362    {
363        msgs->outMessagesBatchPeriod = interval;
364        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
365    }
366}
367
368static void
369protocolSendRequest( tr_peermsgs *               msgs,
370                     const struct peer_request * req )
371{
372    tr_peerIo *       io = msgs->io;
373    struct evbuffer * out = msgs->outMessages;
374
375    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
376    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
377    tr_peerIoWriteUint32( io, out, req->index );
378    tr_peerIoWriteUint32( io, out, req->offset );
379    tr_peerIoWriteUint32( io, out, req->length );
380    dbgmsg( msgs, "requesting %u:%u->%u... outMessage size is now %d",
381           req->index, req->offset, req->length, (int)EVBUFFER_LENGTH( out ) );
382    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
383}
384
385static void
386protocolSendCancel( tr_peermsgs *               msgs,
387                    const struct peer_request * req )
388{
389    tr_peerIo *       io = msgs->io;
390    struct evbuffer * out = msgs->outMessages;
391
392    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
393    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
394    tr_peerIoWriteUint32( io, out, req->index );
395    tr_peerIoWriteUint32( io, out, req->offset );
396    tr_peerIoWriteUint32( io, out, req->length );
397    dbgmsg( msgs, "cancelling %u:%u->%u... outMessage size is now %d",
398           req->index, req->offset, req->length, (int)EVBUFFER_LENGTH( out ) );
399    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
400}
401
402static void
403protocolSendHave( tr_peermsgs * msgs,
404                  uint32_t      index )
405{
406    tr_peerIo *       io = msgs->io;
407    struct evbuffer * out = msgs->outMessages;
408
409    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + sizeof( uint32_t ) );
410    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
411    tr_peerIoWriteUint32( io, out, index );
412    dbgmsg( msgs, "sending Have %u.. outMessage size is now %d",
413           index, (int)EVBUFFER_LENGTH( out ) );
414    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
415}
416
417static void
418protocolSendChoke( tr_peermsgs * msgs,
419                   int           choke )
420{
421    tr_peerIo *       io = msgs->io;
422    struct evbuffer * out = msgs->outMessages;
423
424    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
425    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
426    dbgmsg( msgs, "sending %s... outMessage size is now %d",
427           ( choke ? "Choke" : "Unchoke" ),
428           (int)EVBUFFER_LENGTH( out ) );
429    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
430}
431
432/**
433***  EVENTS
434**/
435
436static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0 };
437
438static void
439publish( tr_peermsgs *   msgs,
440         tr_peer_event * e )
441{
442    tr_publisherPublish( msgs->publisher, msgs->info, e );
443}
444
445static void
446fireError( tr_peermsgs * msgs,
447           int           err )
448{
449    tr_peer_event e = blankEvent;
450
451    e.eventType = TR_PEER_ERROR;
452    e.err = err;
453    publish( msgs, &e );
454}
455
456static void
457fireNeedReq( tr_peermsgs * msgs )
458{
459    tr_peer_event e = blankEvent;
460
461    e.eventType = TR_PEER_NEED_REQ;
462    publish( msgs, &e );
463}
464
465static void
466firePeerProgress( tr_peermsgs * msgs )
467{
468    tr_peer_event e = blankEvent;
469
470    e.eventType = TR_PEER_PEER_PROGRESS;
471    e.progress = msgs->info->progress;
472    publish( msgs, &e );
473}
474
475static void
476fireGotBlock( tr_peermsgs *               msgs,
477              const struct peer_request * req )
478{
479    tr_peer_event e = blankEvent;
480
481    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
482    e.pieceIndex = req->index;
483    e.offset = req->offset;
484    e.length = req->length;
485    publish( msgs, &e );
486}
487
488static void
489fireClientGotData( tr_peermsgs * msgs,
490                   uint32_t      length,
491                   int           wasPieceData )
492{
493    tr_peer_event e = blankEvent;
494
495    e.length = length;
496    e.eventType = TR_PEER_CLIENT_GOT_DATA;
497    e.wasPieceData = wasPieceData;
498    publish( msgs, &e );
499}
500
501static void
502firePeerGotData( tr_peermsgs  * msgs,
503                 uint32_t       length,
504                 int            wasPieceData )
505{
506    tr_peer_event e = blankEvent;
507
508    e.length = length;
509    e.eventType = TR_PEER_PEER_GOT_DATA;
510    e.wasPieceData = wasPieceData;
511
512    publish( msgs, &e );
513}
514
515static void
516fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
517{
518    tr_peer_event e = blankEvent;
519    e.eventType = TR_PEER_CANCEL;
520    e.pieceIndex = req->index;
521    e.offset = req->offset;
522    e.length = req->length;
523    publish( msgs, &e );
524}
525
526/**
527***  INTEREST
528**/
529
530static int
531isPieceInteresting( const tr_peermsgs * peer,
532                    tr_piece_index_t    piece )
533{
534    const tr_torrent * torrent = peer->torrent;
535
536    return ( !torrent->info.pieces[piece].dnd )                 /* we want it */
537           && ( !tr_cpPieceIsComplete( torrent->completion, piece ) ) /* !have
538                                                                        */
539           && ( tr_bitfieldHas( peer->info->have, piece ) );   /* peer has it */
540}
541
542/* "interested" means we'll ask for piece data if they unchoke us */
543static int
544isPeerInteresting( const tr_peermsgs * msgs )
545{
546    tr_piece_index_t    i;
547    const tr_torrent *  torrent;
548    const tr_bitfield * bitfield;
549    const int           clientIsSeed = tr_torrentIsSeed( msgs->torrent );
550
551    if( clientIsSeed )
552        return FALSE;
553
554    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
555        return FALSE;
556
557    torrent = msgs->torrent;
558    bitfield = tr_cpPieceBitfield( torrent->completion );
559
560    if( !msgs->info->have )
561        return TRUE;
562
563    assert( bitfield->byteCount == msgs->info->have->byteCount );
564
565    for( i = 0; i < torrent->info.pieceCount; ++i )
566        if( isPieceInteresting( msgs, i ) )
567            return TRUE;
568
569    return FALSE;
570}
571
572static void
573sendInterest( tr_peermsgs * msgs,
574              int           weAreInterested )
575{
576    struct evbuffer * out = msgs->outMessages;
577
578    assert( msgs );
579    assert( weAreInterested == 0 || weAreInterested == 1 );
580
581    msgs->info->clientIsInterested = weAreInterested;
582    dbgmsg( msgs, "Sending %s",
583            weAreInterested ? "Interested" : "Not Interested" );
584    tr_peerIoWriteUint32( msgs->io, out, sizeof( uint8_t ) );
585    tr_peerIoWriteUint8 (
586        msgs->io, out, weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
587    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
588    dbgmsg( msgs, "outMessage size is now %d", (int)EVBUFFER_LENGTH( out ) );
589}
590
591static void
592updateInterest( tr_peermsgs * msgs )
593{
594    const int i = isPeerInteresting( msgs );
595
596    if( i != msgs->info->clientIsInterested )
597        sendInterest( msgs, i );
598    if( i )
599        fireNeedReq( msgs );
600}
601
602static void
603cancelAllRequestsToClientExceptFast( tr_peermsgs * msgs )
604{
605    reqListClear( &msgs->peerAskedFor );
606}
607
608void
609tr_peerMsgsSetChoke( tr_peermsgs * msgs,
610                     int           choke )
611{
612    const time_t now = time( NULL );
613    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
614
615    assert( msgs );
616    assert( msgs->info );
617    assert( choke == 0 || choke == 1 );
618
619    if( msgs->info->chokeChangedAt > fibrillationTime )
620    {
621        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation",
622                choke );
623    }
624    else if( msgs->info->peerIsChoked != choke )
625    {
626        msgs->info->peerIsChoked = choke;
627        if( choke )
628            cancelAllRequestsToClientExceptFast( msgs );
629        protocolSendChoke( msgs, choke );
630        msgs->info->chokeChangedAt = now;
631    }
632}
633
634/**
635***
636**/
637
638void
639tr_peerMsgsHave( tr_peermsgs * msgs,
640                 uint32_t      index )
641{
642    protocolSendHave( msgs, index );
643
644    /* since we have more pieces now, we might not be interested in this peer */
645    updateInterest( msgs );
646}
647
648#if 0
649static void
650sendFastSuggest( tr_peermsgs * msgs,
651                 uint32_t      pieceIndex )
652{
653    assert( msgs );
654
655    if( tr_peerIoSupportsFEXT( msgs->io ) )
656    {
657        tr_peerIoWriteUint32( msgs->io, msgs->outMessages,
658                             sizeof( uint8_t ) + sizeof( uint32_t ) );
659        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_SUGGEST );
660        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
661    }
662}
663
664static void
665sendFastHave( tr_peermsgs * msgs,
666              int           all )
667{
668    assert( msgs );
669
670    if( tr_peerIoSupportsFEXT( msgs->io ) )
671    {
672        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof( uint8_t ) );
673        tr_peerIoWriteUint8( msgs->io, msgs->outMessages,
674                            ( all ? BT_HAVE_ALL
675                              : BT_HAVE_NONE ) );
676        updateInterest( msgs );
677    }
678}
679
680#endif
681
682static void
683sendFastReject( tr_peermsgs * msgs,
684                uint32_t      pieceIndex,
685                uint32_t      offset,
686                uint32_t      length )
687{
688    assert( msgs );
689
690    if( tr_peerIoSupportsFEXT( msgs->io ) )
691    {
692        struct evbuffer * out = msgs->outMessages;
693        const uint32_t    len = sizeof( uint8_t ) + 3 * sizeof( uint32_t );
694        dbgmsg( msgs, "sending fast reject %u:%u->%u", pieceIndex, offset,
695                length );
696        tr_peerIoWriteUint32( msgs->io, out, len );
697        tr_peerIoWriteUint8( msgs->io, out, BT_REJECT );
698        tr_peerIoWriteUint32( msgs->io, out, pieceIndex );
699        tr_peerIoWriteUint32( msgs->io, out, offset );
700        tr_peerIoWriteUint32( msgs->io, out, length );
701        pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
702        dbgmsg( msgs, "outMessage size is now %d",
703               (int)EVBUFFER_LENGTH( out ) );
704    }
705}
706
707static tr_bitfield*
708getPeerAllowedPieces( tr_peermsgs * msgs )
709{
710    if( !msgs->peerAllowedPieces && tr_peerIoSupportsFEXT( msgs->io ) )
711    {
712        msgs->peerAllowedPieces = tr_peerMgrGenerateAllowedSet(
713            MAX_FAST_ALLOWED_COUNT,
714            msgs->torrent->info.pieceCount,
715            msgs->torrent->info.hash,
716            tr_peerIoGetAddress( msgs->io, NULL ) );
717    }
718
719    return msgs->peerAllowedPieces;
720}
721
722static void
723sendFastAllowed( tr_peermsgs * msgs,
724                 uint32_t      pieceIndex )
725{
726    assert( msgs );
727
728    if( tr_peerIoSupportsFEXT( msgs->io ) )
729    {
730        struct evbuffer * out = msgs->outMessages;
731        dbgmsg( msgs, "sending fast allowed" );
732        tr_peerIoWriteUint32( msgs->io, out,  sizeof( uint8_t ) +
733                             sizeof( uint32_t ) );
734        tr_peerIoWriteUint8( msgs->io, out, BT_ALLOWED_FAST );
735        tr_peerIoWriteUint32( msgs->io, out, pieceIndex );
736        pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
737        dbgmsg( msgs, "outMessage size is now %d",
738               (int)EVBUFFER_LENGTH( out ) );
739    }
740}
741
742static void
743sendFastAllowedSet( tr_peermsgs * msgs )
744{
745    tr_piece_index_t i = 0;
746
747    while( i <= msgs->torrent->info.pieceCount )
748    {
749        if( tr_bitfieldHas( getPeerAllowedPieces( msgs ), i ) )
750            sendFastAllowed( msgs, i );
751        i++;
752    }
753}
754
755static void
756maybeSendFastAllowedSet( tr_peermsgs * msgs )
757{
758    if( tr_bitfieldCountTrueBits( msgs->info->have ) <=
759        MAX_FAST_ALLOWED_THRESHOLD )
760        sendFastAllowedSet( msgs );
761}
762
763/**
764***
765**/
766
767static int
768reqIsValid( const tr_peermsgs * peer,
769            uint32_t            index,
770            uint32_t            offset,
771            uint32_t            length )
772{
773    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
774}
775
776static int
777requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
778{
779    return reqIsValid( msgs, req->index, req->offset, req->length );
780}
781
782static void
783expireOldRequests( tr_peermsgs * msgs, const time_t now  )
784{
785    int                 i;
786    time_t              oldestAllowed;
787    struct request_list tmp = REQUEST_LIST_INIT;
788
789    /* cancel requests that have been queued for too long */
790    oldestAllowed = now - QUEUED_REQUEST_TTL_SECS;
791    reqListCopy( &tmp, &msgs->clientWillAskFor );
792    for( i = 0; i < tmp.count; ++i )
793    {
794        const struct peer_request * req = &tmp.requests[i];
795        if( req->time_requested < oldestAllowed )
796            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
797    }
798    reqListClear( &tmp );
799
800    /* cancel requests that were sent too long ago */
801    oldestAllowed = now - SENT_REQUEST_TTL_SECS;
802    reqListCopy( &tmp, &msgs->clientAskedFor );
803    for( i = 0; i < tmp.count; ++i )
804    {
805        const struct peer_request * req = &tmp.requests[i];
806        if( req->time_requested < oldestAllowed )
807            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
808    }
809    reqListClear( &tmp );
810}
811
812static void
813pumpRequestQueue( tr_peermsgs * msgs, const time_t now )
814{
815    const int           max = msgs->maxActiveRequests;
816    const int           min = msgs->minActiveRequests;
817    int                 sent = 0;
818    int                 count = msgs->clientAskedFor.count;
819    struct peer_request req;
820
821    if( count > min )
822        return;
823    if( msgs->info->clientIsChoked )
824        return;
825    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
826        return;
827
828    while( ( count < max ) && reqListPop( &msgs->clientWillAskFor, &req ) )
829    {
830        const tr_block_index_t block =
831            _tr_block( msgs->torrent, req.index, req.offset );
832
833        assert( requestIsValid( msgs, &req ) );
834        assert( tr_bitfieldHas( msgs->info->have, req.index ) );
835
836        /* don't ask for it if we've already got it... this block may have
837         * come in from a different peer after we cancelled a request for it */
838        if( !tr_cpBlockIsComplete( msgs->torrent->completion, block ) )
839        {
840            protocolSendRequest( msgs, &req );
841            req.time_requested = now;
842            reqListAppend( &msgs->clientAskedFor, &req );
843
844            ++count;
845            ++sent;
846        }
847    }
848
849    if( sent )
850        dbgmsg( msgs,
851                "pump sent %d requests, now have %d active and %d queued",
852                sent,
853                msgs->clientAskedFor.count,
854                msgs->clientWillAskFor.count );
855
856    if( count < max )
857        fireNeedReq( msgs );
858}
859
860static int
861requestQueueIsFull( const tr_peermsgs * msgs )
862{
863    const int req_max = msgs->maxActiveRequests;
864    return msgs->clientWillAskFor.count >= req_max;
865}
866
867tr_addreq_t
868tr_peerMsgsAddRequest( tr_peermsgs *    msgs,
869                       uint32_t         index,
870                       uint32_t         offset,
871                       uint32_t         length )
872{
873    struct peer_request req;
874
875    assert( msgs );
876    assert( msgs->torrent );
877    assert( reqIsValid( msgs, index, offset, length ) );
878
879    /**
880    ***  Reasons to decline the request
881    **/
882
883    /* don't send requests to choked clients */
884    if( msgs->info->clientIsChoked )
885    {
886        dbgmsg( msgs, "declining request because they're choking us" );
887        return TR_ADDREQ_CLIENT_CHOKED;
888    }
889
890    /* peer doesn't have this piece */
891    if( !tr_bitfieldHas( msgs->info->have, index ) )
892        return TR_ADDREQ_MISSING;
893
894    /* peer's queue is full */
895    if( requestQueueIsFull( msgs ) ) {
896        dbgmsg( msgs, "declining request because we're full" );
897        return TR_ADDREQ_FULL;
898    }
899
900    /* have we already asked for this piece? */
901    req.index = index;
902    req.offset = offset;
903    req.length = length;
904    if( reqListFind( &msgs->clientAskedFor, &req ) != -1 ) {
905        dbgmsg( msgs, "declining because it's a duplicate" );
906        return TR_ADDREQ_DUPLICATE;
907    }
908    if( reqListFind( &msgs->clientWillAskFor, &req ) != -1 ) {
909        dbgmsg( msgs, "declining because it's a duplicate" );
910        return TR_ADDREQ_DUPLICATE;
911    }
912
913    /**
914    ***  Accept this request
915    **/
916
917    dbgmsg( msgs, "adding req for %"PRIu32":%"PRIu32"->%"PRIu32" to our `will request' list",
918            index, offset, length );
919    req.time_requested = time( NULL );
920    reqListAppend( &msgs->clientWillAskFor, &req );
921    return TR_ADDREQ_OK;
922}
923
924static void
925cancelAllRequestsToPeer( tr_peermsgs * msgs )
926{
927    int                 i;
928    struct request_list a = msgs->clientWillAskFor;
929    struct request_list b = msgs->clientAskedFor;
930    dbgmsg( msgs, "cancelling all requests to peer" );
931
932    msgs->clientAskedFor = REQUEST_LIST_INIT;
933    msgs->clientWillAskFor = REQUEST_LIST_INIT;
934
935    for( i=0; i<a.count; ++i )
936        fireCancelledReq( msgs, &a.requests[i] );
937
938    for( i = 0; i < b.count; ++i ) {
939        fireCancelledReq( msgs, &b.requests[i] );
940        protocolSendCancel( msgs, &b.requests[i] );
941    }
942
943    reqListClear( &a );
944    reqListClear( &b );
945}
946
947void
948tr_peerMsgsCancel( tr_peermsgs * msgs,
949                   uint32_t      pieceIndex,
950                   uint32_t      offset,
951                   uint32_t      length )
952{
953    struct peer_request req;
954
955    assert( msgs != NULL );
956    assert( length > 0 );
957
958
959    /* have we asked the peer for this piece? */
960    req.index = pieceIndex;
961    req.offset = offset;
962    req.length = length;
963
964    /* if it's only in the queue and hasn't been sent yet, free it */
965    if( reqListRemove( &msgs->clientWillAskFor, &req ) ) {
966        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32"\n", pieceIndex, offset, length );
967        fireCancelledReq( msgs, &req );
968    }
969
970    /* if it's already been sent, send a cancel message too */
971    if( reqListRemove( &msgs->clientAskedFor, &req ) ) {
972        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32"\n", pieceIndex, offset, length );
973        protocolSendCancel( msgs, &req );
974        fireCancelledReq( msgs, &req );
975    }
976}
977
978/**
979***
980**/
981
982static void
983sendLtepHandshake( tr_peermsgs * msgs )
984{
985    tr_benc           val, *m;
986    char *            buf;
987    int               len;
988    int               pex;
989    struct evbuffer * out = msgs->outMessages;
990
991    if( msgs->clientSentLtepHandshake )
992        return;
993
994    dbgmsg( msgs, "sending an ltep handshake" );
995    msgs->clientSentLtepHandshake = 1;
996
997    /* decide if we want to advertise pex support */
998    if( !tr_torrentAllowsPex( msgs->torrent ) )
999        pex = 0;
1000    else if( msgs->peerSentLtepHandshake )
1001        pex = msgs->peerSupportsPex ? 1 : 0;
1002    else
1003        pex = 1;
1004
1005    tr_bencInitDict( &val, 4 );
1006    tr_bencDictAddInt( &val, "e",
1007                       msgs->session->encryptionMode != TR_CLEAR_PREFERRED );
1008    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( msgs->session ) );
1009    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
1010    m  = tr_bencDictAddDict( &val, "m", 1 );
1011    if( pex )
1012        tr_bencDictAddInt( m, "ut_pex", TR_LTEP_PEX );
1013    buf = tr_bencSave( &val, &len );
1014
1015    tr_peerIoWriteUint32( msgs->io, out, 2 * sizeof( uint8_t ) + len );
1016    tr_peerIoWriteUint8 ( msgs->io, out, BT_LTEP );
1017    tr_peerIoWriteUint8 ( msgs->io, out, LTEP_HANDSHAKE );
1018    tr_peerIoWriteBytes ( msgs->io, out, buf, len );
1019    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1020    dbgmsg( msgs, "outMessage size is now %d", (int)EVBUFFER_LENGTH( out ) );
1021
1022    /* cleanup */
1023    tr_bencFree( &val );
1024    tr_free( buf );
1025}
1026
1027static void
1028parseLtepHandshake( tr_peermsgs *     msgs,
1029                    int               len,
1030                    struct evbuffer * inbuf )
1031{
1032    int64_t   i;
1033    tr_benc   val, * sub;
1034    uint8_t * tmp = tr_new( uint8_t, len );
1035
1036    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
1037    msgs->peerSentLtepHandshake = 1;
1038
1039    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type != TYPE_DICT )
1040    {
1041        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
1042        tr_free( tmp );
1043        return;
1044    }
1045
1046    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
1047
1048    /* does the peer prefer encrypted connections? */
1049    if( tr_bencDictFindInt( &val, "e", &i ) )
1050        msgs->info->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
1051                                            : ENCRYPTION_PREFERENCE_NO;
1052
1053    /* check supported messages for utorrent pex */
1054    msgs->peerSupportsPex = 0;
1055    if( tr_bencDictFindDict( &val, "m", &sub ) )
1056    {
1057        if( tr_bencDictFindInt( sub, "ut_pex", &i ) )
1058        {
1059            msgs->ut_pex_id = (uint8_t) i;
1060            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
1061            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
1062        }
1063    }
1064
1065    /* get peer's listening port */
1066    if( tr_bencDictFindInt( &val, "p", &i ) )
1067    {
1068        msgs->info->port = htons( (uint16_t)i );
1069        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
1070    }
1071
1072    tr_bencFree( &val );
1073    tr_free( tmp );
1074}
1075
1076static void
1077parseUtPex( tr_peermsgs *     msgs,
1078            int               msglen,
1079            struct evbuffer * inbuf )
1080{
1081    int                loaded = 0;
1082    uint8_t *          tmp = tr_new( uint8_t, msglen );
1083    tr_benc            val;
1084    const tr_torrent * tor = msgs->torrent;
1085    const uint8_t *    added;
1086    size_t             added_len;
1087
1088    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
1089
1090    if( tr_torrentAllowsPex( tor )
1091      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) )
1092      && tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1093    {
1094        const uint8_t * added_f = NULL;
1095        tr_pex *        pex;
1096        size_t          i, n;
1097        size_t          added_f_len = 0;
1098        tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1099        pex =
1100            tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len,
1101                                    &n );
1102        for( i = 0; i < n; ++i )
1103            tr_peerMgrAddPex( msgs->session->peerMgr, tor->info.hash,
1104                              TR_PEER_FROM_PEX, pex + i );
1105        tr_free( pex );
1106    }
1107
1108    if( loaded )
1109        tr_bencFree( &val );
1110    tr_free( tmp );
1111}
1112
1113static void sendPex( tr_peermsgs * msgs );
1114
1115static void
1116parseLtep( tr_peermsgs *     msgs,
1117           int               msglen,
1118           struct evbuffer * inbuf )
1119{
1120    uint8_t ltep_msgid;
1121
1122    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
1123    msglen--;
1124
1125    if( ltep_msgid == LTEP_HANDSHAKE )
1126    {
1127        dbgmsg( msgs, "got ltep handshake" );
1128        parseLtepHandshake( msgs, msglen, inbuf );
1129        if( tr_peerIoSupportsLTEP( msgs->io ) )
1130        {
1131            sendLtepHandshake( msgs );
1132            sendPex( msgs );
1133        }
1134    }
1135    else if( ltep_msgid == TR_LTEP_PEX )
1136    {
1137        dbgmsg( msgs, "got ut pex" );
1138        msgs->peerSupportsPex = 1;
1139        parseUtPex( msgs, msglen, inbuf );
1140    }
1141    else
1142    {
1143        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1144        evbuffer_drain( inbuf, msglen );
1145    }
1146}
1147
1148static int
1149readBtLength( tr_peermsgs *     msgs,
1150              struct evbuffer * inbuf,
1151              size_t            inlen )
1152{
1153    uint32_t len;
1154
1155    if( inlen < sizeof( len ) )
1156        return READ_LATER;
1157
1158    tr_peerIoReadUint32( msgs->io, inbuf, &len );
1159
1160    if( len == 0 ) /* peer sent us a keepalive message */
1161        dbgmsg( msgs, "got KeepAlive" );
1162    else
1163    {
1164        msgs->incoming.length = len;
1165        msgs->state = AWAITING_BT_ID;
1166    }
1167
1168    return READ_NOW;
1169}
1170
1171static int readBtMessage( tr_peermsgs *     msgs,
1172                          struct evbuffer * inbuf,
1173                          size_t            inlen );
1174
1175static int
1176readBtId( tr_peermsgs *     msgs,
1177          struct evbuffer * inbuf,
1178          size_t            inlen )
1179{
1180    uint8_t id;
1181
1182    if( inlen < sizeof( uint8_t ) )
1183        return READ_LATER;
1184
1185    tr_peerIoReadUint8( msgs->io, inbuf, &id );
1186    msgs->incoming.id = id;
1187
1188    if( id == BT_PIECE )
1189    {
1190        msgs->state = AWAITING_BT_PIECE;
1191        return READ_NOW;
1192    }
1193    else if( msgs->incoming.length != 1 )
1194    {
1195        msgs->state = AWAITING_BT_MESSAGE;
1196        return READ_NOW;
1197    }
1198    else return readBtMessage( msgs, inbuf, inlen - 1 );
1199}
1200
1201static void
1202updatePeerProgress( tr_peermsgs * msgs )
1203{
1204    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have )
1205                           / (float)msgs->torrent->info.pieceCount;
1206    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
1207    updateInterest( msgs );
1208    firePeerProgress( msgs );
1209}
1210
1211static int
1212clientCanSendFastBlock( const tr_peermsgs * msgs UNUSED )
1213{
1214    /* don't send a fast piece if peer has MAX_FAST_ALLOWED_THRESHOLD pieces */
1215    if( tr_bitfieldCountTrueBits( msgs->info->have ) >
1216        MAX_FAST_ALLOWED_THRESHOLD )
1217        return FALSE;
1218
1219    /* ...or if we don't have ourself enough pieces */
1220    if( tr_bitfieldCountTrueBits( tr_cpPieceBitfield( msgs->torrent->
1221                                                      completion ) ) <
1222        MAX_FAST_ALLOWED_THRESHOLD )
1223        return FALSE;
1224
1225    /* Maybe a bandwidth limit ? */
1226    return TRUE;
1227}
1228
1229static void
1230peerMadeRequest( tr_peermsgs *               msgs,
1231                 const struct peer_request * req )
1232{
1233    const int reqIsValid = requestIsValid( msgs, req );
1234    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete(
1235        msgs->torrent->completion, req->index );
1236    const int peerIsChoked = msgs->info->peerIsChoked;
1237    const int peerIsFast = tr_peerIoSupportsFEXT( msgs->io );
1238    const int pieceIsFast = reqIsValid && tr_bitfieldHas(
1239        getPeerAllowedPieces( msgs ), req->index );
1240    const int canSendFast = clientCanSendFastBlock( msgs );
1241
1242    if( !reqIsValid ) /* bad request */
1243    {
1244        dbgmsg( msgs, "rejecting an invalid request." );
1245        sendFastReject( msgs, req->index, req->offset, req->length );
1246    }
1247    else if( !clientHasPiece ) /* we don't have it */
1248    {
1249        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1250        sendFastReject( msgs, req->index, req->offset, req->length );
1251    }
1252    else if( peerIsChoked && !peerIsFast ) /* doesn't he know he's choked? */
1253    {
1254        tr_peerMsgsSetChoke( msgs, 1 );
1255        sendFastReject( msgs, req->index, req->offset, req->length );
1256    }
1257    else if( peerIsChoked && peerIsFast && ( !pieceIsFast || !canSendFast ) )
1258    {
1259        sendFastReject( msgs, req->index, req->offset, req->length );
1260    }
1261    else /* YAY */
1262    {
1263        if( peerIsFast && pieceIsFast )
1264            reqListAppend( &msgs->peerAskedForFast, req );
1265        else
1266            reqListAppend( &msgs->peerAskedFor, req );
1267    }
1268}
1269
1270static int
1271messageLengthIsCorrect( const tr_peermsgs * msg,
1272                        uint8_t             id,
1273                        uint32_t            len )
1274{
1275    switch( id )
1276    {
1277        case BT_CHOKE:
1278        case BT_UNCHOKE:
1279        case BT_INTERESTED:
1280        case BT_NOT_INTERESTED:
1281        case BT_HAVE_ALL:
1282        case BT_HAVE_NONE:
1283            return len == 1;
1284
1285        case BT_HAVE:
1286        case BT_SUGGEST:
1287        case BT_ALLOWED_FAST:
1288            return len == 5;
1289
1290        case BT_BITFIELD:
1291            return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1292
1293        case BT_REQUEST:
1294        case BT_CANCEL:
1295        case BT_REJECT:
1296            return len == 13;
1297
1298        case BT_PIECE:
1299            return len > 9 && len <= 16393;
1300
1301        case BT_PORT:
1302            return len == 3;
1303
1304        case BT_LTEP:
1305            return len >= 2;
1306
1307        default:
1308            return FALSE;
1309    }
1310}
1311
1312static int clientGotBlock( tr_peermsgs *               msgs,
1313                           const uint8_t *             block,
1314                           const struct peer_request * req );
1315
1316static int
1317readBtPiece( tr_peermsgs *     msgs,
1318             struct evbuffer * inbuf,
1319             size_t            inlen )
1320{
1321    struct peer_request * req = &msgs->incoming.blockReq;
1322
1323    assert( EVBUFFER_LENGTH( inbuf ) >= inlen );
1324    dbgmsg( msgs, "In readBtPiece" );
1325
1326    if( !req->length )
1327    {
1328        if( inlen < 8 )
1329            return READ_LATER;
1330
1331        tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
1332        tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
1333        req->length = msgs->incoming.length - 9;
1334        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index,
1335                req->offset,
1336                req->length );
1337        return READ_NOW;
1338    }
1339    else
1340    {
1341        int          err;
1342
1343        /* read in another chunk of data */
1344        const size_t nLeft = req->length - EVBUFFER_LENGTH(
1345            msgs->incoming.block );
1346        size_t       n = MIN( nLeft, inlen );
1347        uint8_t *    buf = tr_new( uint8_t, n );
1348        assert( EVBUFFER_LENGTH( inbuf ) >= n );
1349        tr_peerIoReadBytes( msgs->io, inbuf, buf, n );
1350        evbuffer_add( msgs->incoming.block, buf, n );
1351        fireClientGotData( msgs, n, TRUE );
1352        tr_free( buf );
1353        dbgmsg( msgs, "got %d bytes for block %u:%u->%u ... %d remain",
1354               (int)n, req->index, req->offset, req->length,
1355               (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) );
1356        if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length )
1357            return READ_LATER;
1358
1359        /* we've got the whole block ... process it */
1360        err = clientGotBlock( msgs, EVBUFFER_DATA(
1361                                  msgs->incoming.block ), req );
1362
1363        /* cleanup */
1364        evbuffer_drain( msgs->incoming.block,
1365                       EVBUFFER_LENGTH( msgs->incoming.block ) );
1366        req->length = 0;
1367        msgs->state = AWAITING_BT_LENGTH;
1368        if( !err )
1369            return READ_NOW;
1370        else
1371        {
1372            fireError( msgs, err );
1373            return READ_ERR;
1374        }
1375    }
1376}
1377
1378static int
1379readBtMessage( tr_peermsgs *     msgs,
1380               struct evbuffer * inbuf,
1381               size_t            inlen )
1382{
1383    uint32_t      ui32;
1384    uint32_t      msglen = msgs->incoming.length;
1385    const uint8_t id = msgs->incoming.id;
1386    const size_t  startBufLen = EVBUFFER_LENGTH( inbuf );
1387
1388    --msglen; /* id length */
1389
1390    if( inlen < msglen )
1391        return READ_LATER;
1392
1393    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %d", (int)id,
1394            (int)msglen,
1395            (int)inlen );
1396
1397    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1398    {
1399        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d",
1400                (int)id, (int)msglen );
1401        fireError( msgs, EMSGSIZE );
1402        return READ_ERR;
1403    }
1404
1405    switch( id )
1406    {
1407        case BT_CHOKE:
1408            dbgmsg( msgs, "got Choke" );
1409            msgs->info->clientIsChoked = 1;
1410            cancelAllRequestsToPeer( msgs );
1411            cancelAllRequestsToClientExceptFast( msgs );
1412            break;
1413
1414        case BT_UNCHOKE:
1415            dbgmsg( msgs, "got Unchoke" );
1416            msgs->info->clientIsChoked = 0;
1417            fireNeedReq( msgs );
1418            break;
1419
1420        case BT_INTERESTED:
1421            dbgmsg( msgs, "got Interested" );
1422            msgs->info->peerIsInterested = 1;
1423            break;
1424
1425        case BT_NOT_INTERESTED:
1426            dbgmsg( msgs, "got Not Interested" );
1427            msgs->info->peerIsInterested = 0;
1428            break;
1429
1430        case BT_HAVE:
1431            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1432            dbgmsg( msgs, "got Have: %u", ui32 );
1433            if( tr_bitfieldAdd( msgs->info->have, ui32 ) )
1434                fireError( msgs, ERANGE );
1435            updatePeerProgress( msgs );
1436            tr_rcTransferred( msgs->torrent->swarmSpeed,
1437                              msgs->torrent->info.pieceSize );
1438            break;
1439
1440        case BT_BITFIELD:
1441        {
1442            dbgmsg( msgs, "got a bitfield" );
1443            msgs->peerSentBitfield = 1;
1444            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits,
1445                                msglen );
1446            updatePeerProgress( msgs );
1447            maybeSendFastAllowedSet( msgs );
1448            fireNeedReq( msgs );
1449            break;
1450        }
1451
1452        case BT_REQUEST:
1453        {
1454            struct peer_request r;
1455            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1456            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1457            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1458            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset,
1459                    r.length );
1460            peerMadeRequest( msgs, &r );
1461            break;
1462        }
1463
1464        case BT_CANCEL:
1465        {
1466            struct peer_request r;
1467            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1468            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1469            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1470            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset,
1471                    r.length );
1472            reqListRemove( &msgs->peerAskedForFast, &r );
1473            reqListRemove( &msgs->peerAskedFor, &r );
1474            break;
1475        }
1476
1477        case BT_PIECE:
1478            assert( 0 ); /* handled elsewhere! */
1479            break;
1480
1481        case BT_PORT:
1482            dbgmsg( msgs, "Got a BT_PORT" );
1483            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1484            break;
1485
1486        case BT_SUGGEST:
1487        {
1488            dbgmsg( msgs, "Got a BT_SUGGEST" );
1489            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1490            /* we don't do anything with this yet */
1491            break;
1492        }
1493
1494        case BT_HAVE_ALL:
1495            dbgmsg( msgs, "Got a BT_HAVE_ALL" );
1496            tr_bitfieldAddRange( msgs->info->have, 0,
1497                                 msgs->torrent->info.pieceCount );
1498            updatePeerProgress( msgs );
1499            maybeSendFastAllowedSet( msgs );
1500            break;
1501
1502
1503        case BT_HAVE_NONE:
1504            dbgmsg( msgs, "Got a BT_HAVE_NONE" );
1505            tr_bitfieldClear( msgs->info->have );
1506            updatePeerProgress( msgs );
1507            maybeSendFastAllowedSet( msgs );
1508            break;
1509
1510        case BT_REJECT:
1511        {
1512            struct peer_request r;
1513            dbgmsg( msgs, "Got a BT_REJECT" );
1514            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1515            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1516            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1517            reqListRemove( &msgs->clientAskedFor, &r );
1518            break;
1519        }
1520
1521        case BT_ALLOWED_FAST:
1522        {
1523            dbgmsg( msgs, "Got a BT_ALLOWED_FAST" );
1524            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1525            /* we don't do anything with this yet */
1526            break;
1527        }
1528
1529        case BT_LTEP:
1530            dbgmsg( msgs, "Got a BT_LTEP" );
1531            parseLtep( msgs, msglen, inbuf );
1532            break;
1533
1534        default:
1535            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1536            tr_peerIoDrain( msgs->io, inbuf, msglen );
1537            break;
1538    }
1539
1540    assert( msglen + 1 == msgs->incoming.length );
1541    assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen );
1542
1543    msgs->state = AWAITING_BT_LENGTH;
1544    return READ_NOW;
1545}
1546
1547static void
1548decrementDownloadedCount( tr_peermsgs * msgs,
1549                          uint32_t      byteCount )
1550{
1551    tr_torrent * tor = msgs->torrent;
1552
1553    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1554}
1555
1556static void
1557clientGotUnwantedBlock( tr_peermsgs *               msgs,
1558                        const struct peer_request * req )
1559{
1560    decrementDownloadedCount( msgs, req->length );
1561}
1562
1563static void
1564addPeerToBlamefield( tr_peermsgs * msgs,
1565                     uint32_t      index )
1566{
1567    if( !msgs->info->blame )
1568        msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1569    tr_bitfieldAdd( msgs->info->blame, index );
1570}
1571
1572/* returns 0 on success, or an errno on failure */
1573static int
1574clientGotBlock( tr_peermsgs *               msgs,
1575                const uint8_t *             data,
1576                const struct peer_request * req )
1577{
1578    int                    err;
1579    tr_torrent *           tor = msgs->torrent;
1580    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1581
1582    assert( msgs );
1583    assert( req );
1584
1585    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) )
1586    {
1587        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1588                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1589        return EMSGSIZE;
1590    }
1591
1592    /* save the block */
1593    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset,
1594            req->length );
1595
1596    /**
1597    *** Remove the block from our `we asked for this' list
1598    **/
1599
1600    if( !reqListRemove( &msgs->clientAskedFor, req ) )
1601    {
1602        clientGotUnwantedBlock( msgs, req );
1603        dbgmsg( msgs, "we didn't ask for this message..." );
1604        return 0;
1605    }
1606
1607    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1608            msgs->clientAskedFor.count );
1609
1610    /**
1611    *** Error checks
1612    **/
1613
1614    if( tr_cpBlockIsComplete( tor->completion, block ) )
1615    {
1616        dbgmsg( msgs, "we have this block already..." );
1617        clientGotUnwantedBlock( msgs, req );
1618        return 0;
1619    }
1620
1621    /**
1622    ***  Save the block
1623    **/
1624
1625    msgs->info->peerSentPieceDataAt = time( NULL );
1626    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1627        return err;
1628
1629    addPeerToBlamefield( msgs, req->index );
1630    fireGotBlock( msgs, req );
1631    return 0;
1632}
1633
1634static int peerPulse( void * vmsgs );
1635
1636static void
1637didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1638{
1639    tr_peermsgs * msgs = vmsgs;
1640    firePeerGotData( msgs, bytesWritten, wasPieceData );
1641    peerPulse( msgs );
1642}
1643
1644static ReadState
1645canRead( struct bufferevent * evin,
1646         void *               vmsgs )
1647{
1648    ReadState         ret;
1649    tr_peermsgs *     msgs = vmsgs;
1650    struct evbuffer * in = EVBUFFER_INPUT ( evin );
1651    const size_t      inlen = EVBUFFER_LENGTH( in );
1652
1653    if( !inlen )
1654    {
1655        ret = READ_LATER;
1656    }
1657    else if( msgs->state == AWAITING_BT_PIECE )
1658    {
1659        ret = inlen ? readBtPiece( msgs, in, inlen ) : READ_LATER;
1660    }
1661    else switch( msgs->state )
1662    {
1663        case AWAITING_BT_LENGTH:
1664            ret = readBtLength ( msgs, in, inlen ); break;
1665
1666        case AWAITING_BT_ID:
1667            ret = readBtId     ( msgs, in, inlen ); break;
1668
1669        case AWAITING_BT_MESSAGE:
1670            ret = readBtMessage( msgs, in, inlen ); break;
1671
1672        default:
1673            assert( 0 );
1674    }
1675
1676    /* log the raw data that was read */
1677    if( EVBUFFER_LENGTH( in ) != inlen )
1678        fireClientGotData( msgs, inlen - EVBUFFER_LENGTH( in ), FALSE );
1679
1680    return ret;
1681}
1682
1683static void
1684sendKeepalive( tr_peermsgs * msgs )
1685{
1686    dbgmsg( msgs, "sending a keepalive message" );
1687    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1688    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1689}
1690
1691/**
1692***
1693**/
1694
1695static int
1696ratePulse( void * vpeer )
1697{
1698    tr_peermsgs * peer = vpeer;
1699    const double rateToClient = tr_peerGetPieceSpeed( peer->info,
1700                                                      TR_PEER_TO_CLIENT );
1701    const int estimatedBlocksInNext30Seconds =
1702                  ( rateToClient * 30 * 1024 ) / peer->torrent->blockSize;
1703
1704    peer->minActiveRequests = 8;
1705    peer->maxActiveRequests = peer->minActiveRequests + estimatedBlocksInNext30Seconds;
1706    return TRUE;
1707}
1708
1709static int
1710popNextRequest( tr_peermsgs *         msgs,
1711                struct peer_request * setme )
1712{
1713    return reqListPop( &msgs->peerAskedForFast, setme )
1714        || reqListPop( &msgs->peerAskedFor, setme );
1715}
1716
1717static int
1718peerPulse( void * vmsgs )
1719{
1720    tr_peermsgs * msgs = vmsgs;
1721    const time_t  now = time( NULL );
1722
1723    ratePulse( msgs );
1724
1725    /*tr_peerIoTryRead( msgs->io );*/
1726    pumpRequestQueue( msgs, now );
1727    expireOldRequests( msgs, now );
1728
1729    if( msgs->sendingBlock )
1730    {
1731        const size_t uploadMax = tr_peerIoGetWriteBufferSpace( msgs->io );
1732        size_t       len = EVBUFFER_LENGTH( msgs->outBlock );
1733        const size_t outlen = MIN( len, uploadMax );
1734
1735        assert( len );
1736
1737        if( outlen )
1738        {
1739            tr_peerIoWrite( msgs->io, EVBUFFER_DATA( msgs->outBlock ), outlen, TRUE );
1740            evbuffer_drain( msgs->outBlock, outlen );
1741
1742            len -= outlen;
1743            msgs->clientSentAnythingAt = now;
1744            msgs->sendingBlock = len != 0;
1745
1746            dbgmsg( msgs, "wrote %zu bytes; %zu left in block", outlen, len );
1747        }
1748        else dbgmsg( msgs, "stalled writing block... uploadMax %lu, outlen %lu", uploadMax, outlen );
1749    }
1750
1751    if( !msgs->sendingBlock )
1752    {
1753        struct peer_request req;
1754        const int haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1755
1756        if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1757        {
1758            dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
1759            msgs->outMessagesBatchedAt = now;
1760        }
1761        else if( ( haveMessages ) &&
1762                 ( ( now - msgs->outMessagesBatchedAt ) > msgs->outMessagesBatchPeriod ) )
1763        {
1764            dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->io, EVBUFFER_LENGTH( msgs->outMessages ) );
1765            tr_peerIoWriteBuf( msgs->io, msgs->outMessages, FALSE );
1766            msgs->clientSentAnythingAt = now;
1767            msgs->outMessagesBatchedAt = 0;
1768            msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1769        }
1770        else if( !EVBUFFER_LENGTH( msgs->outBlock )
1771               && popNextRequest( msgs, &req )
1772               && requestIsValid( msgs, &req )
1773               && tr_cpPieceIsComplete( msgs->torrent->completion,
1774                                        req.index ) )
1775        {
1776            uint8_t * buf = tr_new( uint8_t, req.length );
1777            const int err = tr_ioRead( msgs->torrent,
1778                                       req.index, req.offset, req.length,
1779                                       buf );
1780            if( err )
1781            {
1782                fireError( msgs, err );
1783            }
1784            else
1785            {
1786                tr_peerIo *       io = msgs->io;
1787                struct evbuffer * out = msgs->outBlock;
1788
1789                dbgmsg( msgs, "sending block %u:%u->%u", req.index,
1790                        req.offset,
1791                        req.length );
1792                tr_peerIoWriteUint32(
1793                    io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) +
1794                    req.length );
1795                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1796                tr_peerIoWriteUint32( io, out, req.index );
1797                tr_peerIoWriteUint32( io, out, req.offset );
1798                tr_peerIoWriteBytes ( io, out, buf, req.length );
1799                msgs->sendingBlock = 1;
1800            }
1801
1802            tr_free( buf );
1803        }
1804        else if( ( !haveMessages )
1805               && ( now - msgs->clientSentAnythingAt ) >
1806                KEEPALIVE_INTERVAL_SECS )
1807        {
1808            sendKeepalive( msgs );
1809        }
1810    }
1811
1812    return TRUE; /* loop forever */
1813}
1814
1815void
1816tr_peerMsgsPulse( tr_peermsgs * msgs )
1817{
1818    if( msgs != NULL )
1819        peerPulse( msgs );
1820}
1821
1822static void
1823gotError( struct bufferevent * evbuf UNUSED,
1824          short                      what,
1825          void *                     vmsgs )
1826{
1827    if( what & EVBUFFER_TIMEOUT )
1828        dbgmsg( vmsgs, "libevent got a timeout, what=%hd, secs=%d", what,
1829                evbuf->timeout_read );
1830    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1831        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1832               what, errno, tr_strerror( errno ) );
1833    fireError( vmsgs, ENOTCONN );
1834}
1835
1836static void
1837sendBitfield( tr_peermsgs * msgs )
1838{
1839    struct evbuffer * out = msgs->outMessages;
1840    tr_bitfield *     field;
1841    tr_piece_index_t  lazyPieces[LAZY_PIECE_COUNT];
1842    size_t            i;
1843    size_t            lazyCount = 0;
1844
1845    field = tr_bitfieldDup( tr_cpPieceBitfield( msgs->torrent->completion ) );
1846
1847#if 0
1848    if( tr_sessionIsLazyBitfieldEnabled( msgs->session ) )
1849    {
1850        /** Lazy bitfields aren't a high priority or secure, so I'm opting for
1851            speed over a truly random sample -- let's limit the pool size to
1852            the first 1000 pieces so large torrents don't bog things down */
1853        size_t poolSize;
1854        const size_t maxPoolSize = MIN( msgs->torrent->info.pieceCount, 1000 );
1855        tr_piece_index_t * pool = tr_new( tr_piece_index_t, maxPoolSize );
1856
1857        /* build the pool */
1858        for( i=poolSize=0; i<maxPoolSize; ++i )
1859            if( tr_bitfieldHas( field, i ) )
1860                pool[poolSize++] = i;
1861
1862        /* pull random piece indices from the pool */
1863        while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) )
1864        {
1865            const int pos = tr_cryptoWeakRandInt( poolSize );
1866            const tr_piece_index_t piece = pool[pos];
1867            tr_bitfieldRem( field, piece );
1868            lazyPieces[lazyCount++] = piece;
1869            pool[pos] = pool[--poolSize];
1870        }
1871
1872        /* cleanup */
1873        tr_free( pool );
1874    }
1875#endif
1876
1877    tr_peerIoWriteUint32( msgs->io, out,
1878                          sizeof( uint8_t ) + field->byteCount );
1879    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1880    tr_peerIoWriteBytes ( msgs->io, out, field->bits, field->byteCount );
1881    dbgmsg( msgs, "sending bitfield... outMessage size is now %d",
1882           (int)EVBUFFER_LENGTH( out ) );
1883    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1884
1885    for( i = 0; i < lazyCount; ++i )
1886        protocolSendHave( msgs, lazyPieces[i] );
1887
1888    tr_bitfieldFree( field );
1889}
1890
1891/**
1892***
1893**/
1894
1895/* some peers give us error messages if we send
1896   more than this many peers in a single pex message
1897   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
1898#define MAX_PEX_ADDED 50
1899#define MAX_PEX_DROPPED 50
1900
1901typedef struct
1902{
1903    tr_pex *  added;
1904    tr_pex *  dropped;
1905    tr_pex *  elements;
1906    int       addedCount;
1907    int       droppedCount;
1908    int       elementCount;
1909}
1910PexDiffs;
1911
1912static void
1913pexAddedCb( void * vpex,
1914            void * userData )
1915{
1916    PexDiffs * diffs = userData;
1917    tr_pex *   pex = vpex;
1918
1919    if( diffs->addedCount < MAX_PEX_ADDED )
1920    {
1921        diffs->added[diffs->addedCount++] = *pex;
1922        diffs->elements[diffs->elementCount++] = *pex;
1923    }
1924}
1925
1926static void
1927pexDroppedCb( void * vpex,
1928              void * userData )
1929{
1930    PexDiffs * diffs = userData;
1931    tr_pex *   pex = vpex;
1932
1933    if( diffs->droppedCount < MAX_PEX_DROPPED )
1934    {
1935        diffs->dropped[diffs->droppedCount++] = *pex;
1936    }
1937}
1938
1939static void
1940pexElementCb( void * vpex,
1941              void * userData )
1942{
1943    PexDiffs * diffs = userData;
1944    tr_pex *   pex = vpex;
1945
1946    diffs->elements[diffs->elementCount++] = *pex;
1947}
1948
1949static void
1950sendPex( tr_peermsgs * msgs )
1951{
1952    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
1953    {
1954        int               i;
1955        tr_pex *          newPex = NULL;
1956        const int         newCount = tr_peerMgrGetPeers(
1957            msgs->session->peerMgr, msgs->torrent->info.hash, &newPex );
1958        PexDiffs          diffs;
1959        tr_benc           val;
1960        uint8_t *         tmp, *walk;
1961        char *            benc;
1962        int               bencLen;
1963        struct evbuffer * out = msgs->outMessages;
1964
1965        /* build the diffs */
1966        diffs.added = tr_new( tr_pex, newCount );
1967        diffs.addedCount = 0;
1968        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1969        diffs.droppedCount = 0;
1970        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1971        diffs.elementCount = 0;
1972        tr_set_compare( msgs->pex, msgs->pexCount,
1973                        newPex, newCount,
1974                        tr_pexCompare, sizeof( tr_pex ),
1975                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
1976        dbgmsg(
1977            msgs,
1978            "pex: old peer count %d, new peer count %d, added %d, removed %d",
1979            msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1980
1981        /* update peer */
1982        tr_free( msgs->pex );
1983        msgs->pex = diffs.elements;
1984        msgs->pexCount = diffs.elementCount;
1985
1986        /* build the pex payload */
1987        tr_bencInitDict( &val, 3 );
1988
1989        /* "added" */
1990        tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1991        for( i = 0; i < diffs.addedCount; ++i )
1992        {
1993            memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
1994            memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1995        }
1996        assert( ( walk - tmp ) == diffs.addedCount * 6 );
1997        tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
1998        tr_free( tmp );
1999
2000        /* "added.f" */
2001        tmp = walk = tr_new( uint8_t, diffs.addedCount );
2002        for( i = 0; i < diffs.addedCount; ++i )
2003            *walk++ = diffs.added[i].flags;
2004        assert( ( walk - tmp ) == diffs.addedCount );
2005        tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2006        tr_free( tmp );
2007
2008        /* "dropped" */
2009        tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2010        for( i = 0; i < diffs.droppedCount; ++i )
2011        {
2012            memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
2013            memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2014        }
2015        assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2016        tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2017        tr_free( tmp );
2018
2019        /* write the pex message */
2020        benc = tr_bencSave( &val, &bencLen );
2021        tr_peerIoWriteUint32( msgs->io, out,
2022                              2 * sizeof( uint8_t ) + bencLen );
2023        tr_peerIoWriteUint8 ( msgs->io, out, BT_LTEP );
2024        tr_peerIoWriteUint8 ( msgs->io, out, msgs->ut_pex_id );
2025        tr_peerIoWriteBytes ( msgs->io, out, benc, bencLen );
2026        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
2027        dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", EVBUFFER_LENGTH( out ) );
2028
2029        /* cleanup */
2030        tr_free( benc );
2031        tr_bencFree( &val );
2032        tr_free( diffs.added );
2033        tr_free( diffs.dropped );
2034        tr_free( newPex );
2035
2036        msgs->clientSentPexAt = time( NULL );
2037    }
2038}
2039
2040static int
2041pexPulse( void * vpeer )
2042{
2043    sendPex( vpeer );
2044    return TRUE;
2045}
2046
2047/**
2048***
2049**/
2050
2051tr_peermsgs*
2052tr_peerMsgsNew( struct tr_torrent * torrent,
2053                struct tr_peer *    info,
2054                tr_delivery_func    func,
2055                void *              userData,
2056                tr_publisher_tag *  setme )
2057{
2058    tr_peermsgs * m;
2059
2060    assert( info );
2061    assert( info->io );
2062
2063    m = tr_new0( tr_peermsgs, 1 );
2064    m->publisher = tr_publisherNew( );
2065    m->info = info;
2066    m->session = torrent->session;
2067    m->torrent = torrent;
2068    m->io = info->io;
2069    m->info->clientIsChoked = 1;
2070    m->info->peerIsChoked = 1;
2071    m->info->clientIsInterested = 0;
2072    m->info->peerIsInterested = 0;
2073    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
2074    m->state = AWAITING_BT_LENGTH;
2075    m->pexTimer = tr_timerNew( m->session, pexPulse, m, PEX_INTERVAL );
2076    m->outMessages = evbuffer_new( );
2077    m->outMessagesBatchedAt = 0;
2078    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2079    m->incoming.block = evbuffer_new( );
2080    m->outBlock = evbuffer_new( );
2081    m->peerAllowedPieces = NULL;
2082    m->peerAskedFor = REQUEST_LIST_INIT;
2083    m->peerAskedForFast = REQUEST_LIST_INIT;
2084    m->clientAskedFor = REQUEST_LIST_INIT;
2085    m->clientWillAskFor = REQUEST_LIST_INIT;
2086    *setme = tr_publisherSubscribe( m->publisher, func, userData );
2087
2088    if( tr_peerIoSupportsLTEP( m->io ) )
2089        sendLtepHandshake( m );
2090
2091    sendBitfield( m );
2092
2093    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of
2094                                             inactivity */
2095    tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
2096    ratePulse( m );
2097
2098    return m;
2099}
2100
2101void
2102tr_peerMsgsFree( tr_peermsgs* msgs )
2103{
2104    if( msgs )
2105    {
2106        tr_timerFree( &msgs->pexTimer );
2107        tr_publisherFree( &msgs->publisher );
2108        reqListClear( &msgs->clientWillAskFor );
2109        reqListClear( &msgs->clientAskedFor );
2110        reqListClear( &msgs->peerAskedForFast );
2111        reqListClear( &msgs->peerAskedFor );
2112        tr_bitfieldFree( msgs->peerAllowedPieces );
2113        evbuffer_free( msgs->incoming.block );
2114        evbuffer_free( msgs->outMessages );
2115        evbuffer_free( msgs->outBlock );
2116        tr_free( msgs->pex );
2117
2118        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2119        tr_free( msgs );
2120    }
2121}
2122
2123void
2124tr_peerMsgsUnsubscribe( tr_peermsgs *    peer,
2125                        tr_publisher_tag tag )
2126{
2127    tr_publisherUnsubscribe( peer->publisher, tag );
2128}
2129
Note: See TracBrowser for help on using the repository browser.