source: trunk/libtransmission/peer-msgs.c @ 7154

Last change on this file since 7154 was 7154, checked in by charles, 12 years ago

(libT) yet another stab at getting bandwidth management under control. this version may suck less than previous attempts. It also breaks the mac build until someone adds iobuf.[ch] to xcode...

  • Property svn:keywords set to Date Rev Author Id
File size: 59.8 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 7154 2008-11-25 21:35:17Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19
20#include <event.h>
21
22#include "transmission.h"
23#include "bencode.h"
24#include "completion.h"
25#include "crypto.h"
26#include "inout.h"
27#include "iobuf.h"
28#ifdef WIN32
29#include "net.h" /* for ECONN */
30#endif
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-mgr-private.h"
34#include "peer-msgs.h"
35#include "ratecontrol.h"
36#include "stats.h"
37#include "torrent.h"
38#include "trevent.h"
39#include "utils.h"
40
41/**
42***
43**/
44
45enum
46{
47    BT_CHOKE                = 0,
48    BT_UNCHOKE              = 1,
49    BT_INTERESTED           = 2,
50    BT_NOT_INTERESTED       = 3,
51    BT_HAVE                 = 4,
52    BT_BITFIELD             = 5,
53    BT_REQUEST              = 6,
54    BT_PIECE                = 7,
55    BT_CANCEL               = 8,
56    BT_PORT                 = 9,
57    BT_SUGGEST              = 13,
58    BT_HAVE_ALL             = 14,
59    BT_HAVE_NONE            = 15,
60    BT_REJECT               = 16,
61    BT_ALLOWED_FAST         = 17,
62    BT_LTEP                 = 20,
63
64    LTEP_HANDSHAKE          = 0,
65
66    TR_LTEP_PEX             = 1,
67
68    MIN_CHOKE_PERIOD_SEC    = ( 10 ),
69
70    /* idle seconds before we send a keepalive */
71    KEEPALIVE_INTERVAL_SECS = 100,
72
73    PEX_INTERVAL            = ( 90 * 1000 ), /* msec between sendPex() calls */
74    PEER_PULSE_INTERVAL     = ( 250 ),       /* msec between peerPulse() calls
75                                               */
76
77    MAX_QUEUE_SIZE          = ( 100 ),
78
79    /* (fast peers) max number of pieces we fast-allow to another peer */
80    MAX_FAST_ALLOWED_COUNT   = 10,
81
82    /* (fast peers) max threshold for allowing fast-pieces requests */
83    MAX_FAST_ALLOWED_THRESHOLD = 10,
84
85    /* how long an unsent request can stay queued before it's returned
86       back to the peer-mgr's pool of requests */
87    QUEUED_REQUEST_TTL_SECS = 20,
88
89    /* how long a sent request can stay queued before it's returned
90       back to the peer-mgr's pool of requests */
91    SENT_REQUEST_TTL_SECS = 240,
92
93    /* used in lowering the outMessages queue period */
94    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
95    HIGH_PRIORITY_INTERVAL_SECS = 2,
96    LOW_PRIORITY_INTERVAL_SECS = 20,
97
98    /* number of pieces to remove from the bitfield when
99     * lazy bitfields are turned on */
100    LAZY_PIECE_COUNT = 26
101};
102
103/**
104***  REQUEST MANAGEMENT
105**/
106
107enum
108{
109    AWAITING_BT_LENGTH,
110    AWAITING_BT_ID,
111    AWAITING_BT_MESSAGE,
112    AWAITING_BT_PIECE
113};
114
115struct peer_request
116{
117    uint32_t    index;
118    uint32_t    offset;
119    uint32_t    length;
120    time_t      time_requested;
121};
122
123static int
124compareRequest( const void * va,
125                const void * vb )
126{
127    const struct peer_request * a = va;
128    const struct peer_request * b = vb;
129
130    if( a->index != b->index )
131        return a->index < b->index ? -1 : 1;
132
133    if( a->offset != b->offset )
134        return a->offset < b->offset ? -1 : 1;
135
136    if( a->length != b->length )
137        return a->length < b->length ? -1 : 1;
138
139    return 0;
140}
141
142struct request_list
143{
144    uint16_t               count;
145    uint16_t               max;
146    struct peer_request *  requests;
147};
148
149static const struct request_list REQUEST_LIST_INIT = { 0, 0, NULL };
150
151static void
152reqListReserve( struct request_list * list,
153                uint16_t              max )
154{
155    if( list->max < max )
156    {
157        list->max = max;
158        list->requests = tr_renew( struct peer_request,
159                                   list->requests,
160                                   list->max );
161    }
162}
163
164static void
165reqListClear( struct request_list * list )
166{
167    tr_free( list->requests );
168    *list = REQUEST_LIST_INIT;
169}
170
171static void
172reqListCopy( struct request_list *       dest,
173             const struct request_list * src )
174{
175    dest->count = dest->max = src->count;
176    dest->requests =
177        tr_memdup( src->requests, dest->count * sizeof( struct peer_request ) );
178}
179
180static void
181reqListRemoveOne( struct request_list * list,
182                  int                   i )
183{
184    assert( 0 <= i && i < list->count );
185
186    memmove( &list->requests[i],
187            &list->requests[i + 1],
188            sizeof( struct peer_request ) * ( --list->count - i ) );
189}
190
191static void
192reqListAppend( struct request_list *       list,
193               const struct peer_request * req )
194{
195    if( ++list->count >= list->max )
196        reqListReserve( list, list->max + 8 );
197
198    list->requests[list->count - 1] = *req;
199}
200
201static int
202reqListPop( struct request_list * list,
203            struct peer_request * setme )
204{
205    int success;
206
207    if( !list->count )
208        success = FALSE;
209    else {
210        *setme = list->requests[0];
211        reqListRemoveOne( list, 0 );
212        success = TRUE;
213    }
214
215    return success;
216}
217
218static int
219reqListFind( struct request_list *       list,
220             const struct peer_request * key )
221{
222    uint16_t i;
223
224    for( i = 0; i < list->count; ++i )
225        if( !compareRequest( key, list->requests + i ) )
226            return i;
227
228    return -1;
229}
230
231static int
232reqListRemove( struct request_list *       list,
233               const struct peer_request * key )
234{
235    int success;
236    const int i = reqListFind( list, key );
237
238    if( i < 0 )
239        success = FALSE;
240    else {
241        reqListRemoveOne( list, i );
242        success = TRUE;
243    }
244
245    return success;
246}
247
248/**
249***
250**/
251
252/* this is raw, unchanged data from the peer regarding
253 * the current message that it's sending us. */
254struct tr_incoming
255{
256    uint8_t                id;
257    uint32_t               length; /* includes the +1 for id length */
258    struct peer_request    blockReq; /* metadata for incoming blocks */
259    struct evbuffer *      block; /* piece data for incoming blocks */
260};
261
262struct tr_peermsgs
263{
264    unsigned int    peerSentBitfield        : 1;
265    unsigned int    peerSupportsPex         : 1;
266    unsigned int    clientSentLtepHandshake : 1;
267    unsigned int    peerSentLtepHandshake   : 1;
268    unsigned int    sendingBlock            : 1;
269
270    uint8_t         state;
271    uint8_t         ut_pex_id;
272    uint16_t        pexCount;
273    uint16_t        minActiveRequests;
274    uint16_t        maxActiveRequests;
275
276    /* how long the outMessages batch should be allowed to grow before
277     * it's flushed -- some messages (like requests >:) should be sent
278     * very quickly; others aren't as urgent. */
279    int                    outMessagesBatchPeriod;
280
281    tr_peer *              info;
282
283    tr_session *           session;
284    tr_torrent *           torrent;
285    tr_peerIo *            io;
286
287    tr_publisher_t *       publisher;
288
289    struct evbuffer *      outBlock; /* buffer of the current piece message */
290    struct evbuffer *      outMessages; /* all the non-piece messages */
291
292    struct request_list    peerAskedFor;
293    struct request_list    peerAskedForFast;
294    struct request_list    clientAskedFor;
295    struct request_list    clientWillAskFor;
296
297    tr_timer *             pexTimer;
298
299    time_t                 clientSentPexAt;
300    time_t                 clientSentAnythingAt;
301
302    /* when we started batching the outMessages */
303    time_t                outMessagesBatchedAt;
304
305    tr_bitfield *         peerAllowedPieces;
306
307    struct tr_incoming    incoming;
308
309    tr_pex *              pex;
310};
311
312/**
313***
314**/
315
316static void
317myDebug( const char *               file,
318         int                        line,
319         const struct tr_peermsgs * msgs,
320         const char *               fmt,
321         ... )
322{
323    FILE * fp = tr_getLog( );
324
325    if( fp )
326    {
327        va_list           args;
328        char              timestr[64];
329        struct evbuffer * buf = evbuffer_new( );
330        char *            base = tr_basename( file );
331
332        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
333                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
334                             msgs->torrent->info.name,
335                             tr_peerIoGetAddrStr( msgs->io ),
336                             msgs->info->client );
337        va_start( args, fmt );
338        evbuffer_add_vprintf( buf, fmt, args );
339        va_end( args );
340        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
341        fwrite( EVBUFFER_DATA( buf ), 1, EVBUFFER_LENGTH( buf ), fp );
342
343        tr_free( base );
344        evbuffer_free( buf );
345    }
346}
347
348#define dbgmsg( msgs, ... ) \
349    do { \
350        if( tr_deepLoggingIsActive( ) ) \
351            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
352    } while( 0 )
353
354/**
355***
356**/
357
358static void
359pokeBatchPeriod( tr_peermsgs * msgs,
360                 int           interval )
361{
362    if( msgs->outMessagesBatchPeriod > interval )
363    {
364        msgs->outMessagesBatchPeriod = interval;
365        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
366    }
367}
368
369static void
370protocolSendRequest( tr_peermsgs *               msgs,
371                     const struct peer_request * req )
372{
373    tr_peerIo *       io = msgs->io;
374    struct evbuffer * out = msgs->outMessages;
375
376    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
377    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
378    tr_peerIoWriteUint32( io, out, req->index );
379    tr_peerIoWriteUint32( io, out, req->offset );
380    tr_peerIoWriteUint32( io, out, req->length );
381    dbgmsg( msgs, "requesting %u:%u->%u... outMessage size is now %d",
382           req->index, req->offset, req->length, (int)EVBUFFER_LENGTH( out ) );
383    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
384}
385
386static void
387protocolSendCancel( tr_peermsgs *               msgs,
388                    const struct peer_request * req )
389{
390    tr_peerIo *       io = msgs->io;
391    struct evbuffer * out = msgs->outMessages;
392
393    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
394    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
395    tr_peerIoWriteUint32( io, out, req->index );
396    tr_peerIoWriteUint32( io, out, req->offset );
397    tr_peerIoWriteUint32( io, out, req->length );
398    dbgmsg( msgs, "cancelling %u:%u->%u... outMessage size is now %d",
399           req->index, req->offset, req->length, (int)EVBUFFER_LENGTH( out ) );
400    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
401}
402
403static void
404protocolSendHave( tr_peermsgs * msgs,
405                  uint32_t      index )
406{
407    tr_peerIo *       io = msgs->io;
408    struct evbuffer * out = msgs->outMessages;
409
410    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + sizeof( uint32_t ) );
411    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
412    tr_peerIoWriteUint32( io, out, index );
413    dbgmsg( msgs, "sending Have %u.. outMessage size is now %d",
414           index, (int)EVBUFFER_LENGTH( out ) );
415    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
416}
417
418static void
419protocolSendChoke( tr_peermsgs * msgs,
420                   int           choke )
421{
422    tr_peerIo *       io = msgs->io;
423    struct evbuffer * out = msgs->outMessages;
424
425    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
426    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
427    dbgmsg( msgs, "sending %s... outMessage size is now %d",
428           ( choke ? "Choke" : "Unchoke" ),
429           (int)EVBUFFER_LENGTH( out ) );
430    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
431}
432
433/**
434***  EVENTS
435**/
436
437static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0 };
438
439static void
440publish( tr_peermsgs *   msgs,
441         tr_peer_event * e )
442{
443    tr_publisherPublish( msgs->publisher, msgs->info, e );
444}
445
446static void
447fireError( tr_peermsgs * msgs,
448           int           err )
449{
450    tr_peer_event e = blankEvent;
451
452    e.eventType = TR_PEER_ERROR;
453    e.err = err;
454    publish( msgs, &e );
455}
456
457static void
458fireNeedReq( tr_peermsgs * msgs )
459{
460    tr_peer_event e = blankEvent;
461
462    e.eventType = TR_PEER_NEED_REQ;
463    publish( msgs, &e );
464}
465
466static void
467firePeerProgress( tr_peermsgs * msgs )
468{
469    tr_peer_event e = blankEvent;
470
471    e.eventType = TR_PEER_PEER_PROGRESS;
472    e.progress = msgs->info->progress;
473    publish( msgs, &e );
474}
475
476static void
477fireGotBlock( tr_peermsgs *               msgs,
478              const struct peer_request * req )
479{
480    tr_peer_event e = blankEvent;
481
482    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
483    e.pieceIndex = req->index;
484    e.offset = req->offset;
485    e.length = req->length;
486    publish( msgs, &e );
487}
488
489static void
490fireClientGotData( tr_peermsgs * msgs,
491                   uint32_t      length,
492                   int           wasPieceData )
493{
494    tr_peer_event e = blankEvent;
495
496    e.length = length;
497    e.eventType = TR_PEER_CLIENT_GOT_DATA;
498    e.wasPieceData = wasPieceData;
499    publish( msgs, &e );
500}
501
502static void
503firePeerGotData( tr_peermsgs  * msgs,
504                 uint32_t       length,
505                 int            wasPieceData )
506{
507    tr_peer_event e = blankEvent;
508
509    e.length = length;
510    e.eventType = TR_PEER_PEER_GOT_DATA;
511    e.wasPieceData = wasPieceData;
512
513    publish( msgs, &e );
514}
515
516static void
517fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
518{
519    tr_peer_event e = blankEvent;
520    e.eventType = TR_PEER_CANCEL;
521    e.pieceIndex = req->index;
522    e.offset = req->offset;
523    e.length = req->length;
524    publish( msgs, &e );
525}
526
527/**
528***  INTEREST
529**/
530
531static int
532isPieceInteresting( const tr_peermsgs * peer,
533                    tr_piece_index_t    piece )
534{
535    const tr_torrent * torrent = peer->torrent;
536
537    return ( !torrent->info.pieces[piece].dnd )                 /* we want it */
538           && ( !tr_cpPieceIsComplete( torrent->completion, piece ) ) /* !have
539                                                                        */
540           && ( tr_bitfieldHas( peer->info->have, piece ) );   /* peer has it */
541}
542
543/* "interested" means we'll ask for piece data if they unchoke us */
544static int
545isPeerInteresting( const tr_peermsgs * msgs )
546{
547    tr_piece_index_t    i;
548    const tr_torrent *  torrent;
549    const tr_bitfield * bitfield;
550    const int           clientIsSeed = tr_torrentIsSeed( msgs->torrent );
551
552    if( clientIsSeed )
553        return FALSE;
554
555    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
556        return FALSE;
557
558    torrent = msgs->torrent;
559    bitfield = tr_cpPieceBitfield( torrent->completion );
560
561    if( !msgs->info->have )
562        return TRUE;
563
564    assert( bitfield->byteCount == msgs->info->have->byteCount );
565
566    for( i = 0; i < torrent->info.pieceCount; ++i )
567        if( isPieceInteresting( msgs, i ) )
568            return TRUE;
569
570    return FALSE;
571}
572
573static void
574sendInterest( tr_peermsgs * msgs,
575              int           weAreInterested )
576{
577    struct evbuffer * out = msgs->outMessages;
578
579    assert( msgs );
580    assert( weAreInterested == 0 || weAreInterested == 1 );
581
582    msgs->info->clientIsInterested = weAreInterested;
583    dbgmsg( msgs, "Sending %s",
584            weAreInterested ? "Interested" : "Not Interested" );
585    tr_peerIoWriteUint32( msgs->io, out, sizeof( uint8_t ) );
586    tr_peerIoWriteUint8 (
587        msgs->io, out, weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
588    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
589    dbgmsg( msgs, "outMessage size is now %d", (int)EVBUFFER_LENGTH( out ) );
590}
591
592static void
593updateInterest( tr_peermsgs * msgs )
594{
595    const int i = isPeerInteresting( msgs );
596
597    if( i != msgs->info->clientIsInterested )
598        sendInterest( msgs, i );
599    if( i )
600        fireNeedReq( msgs );
601}
602
603static void
604cancelAllRequestsToClientExceptFast( tr_peermsgs * msgs )
605{
606    reqListClear( &msgs->peerAskedFor );
607}
608
609void
610tr_peerMsgsSetChoke( tr_peermsgs * msgs,
611                     int           choke )
612{
613    const time_t now = time( NULL );
614    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
615
616    assert( msgs );
617    assert( msgs->info );
618    assert( choke == 0 || choke == 1 );
619
620    if( msgs->info->chokeChangedAt > fibrillationTime )
621    {
622        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation",
623                choke );
624    }
625    else if( msgs->info->peerIsChoked != choke )
626    {
627        msgs->info->peerIsChoked = choke;
628        if( choke )
629            cancelAllRequestsToClientExceptFast( msgs );
630        protocolSendChoke( msgs, choke );
631        msgs->info->chokeChangedAt = now;
632    }
633}
634
635/**
636***
637**/
638
639void
640tr_peerMsgsHave( tr_peermsgs * msgs,
641                 uint32_t      index )
642{
643    protocolSendHave( msgs, index );
644
645    /* since we have more pieces now, we might not be interested in this peer */
646    updateInterest( msgs );
647}
648
649#if 0
650static void
651sendFastSuggest( tr_peermsgs * msgs,
652                 uint32_t      pieceIndex )
653{
654    assert( msgs );
655
656    if( tr_peerIoSupportsFEXT( msgs->io ) )
657    {
658        tr_peerIoWriteUint32( msgs->io, msgs->outMessages,
659                             sizeof( uint8_t ) + sizeof( uint32_t ) );
660        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_SUGGEST );
661        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
662    }
663}
664
665static void
666sendFastHave( tr_peermsgs * msgs,
667              int           all )
668{
669    assert( msgs );
670
671    if( tr_peerIoSupportsFEXT( msgs->io ) )
672    {
673        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof( uint8_t ) );
674        tr_peerIoWriteUint8( msgs->io, msgs->outMessages,
675                            ( all ? BT_HAVE_ALL
676                              : BT_HAVE_NONE ) );
677        updateInterest( msgs );
678    }
679}
680
681#endif
682
683static void
684sendFastReject( tr_peermsgs * msgs,
685                uint32_t      pieceIndex,
686                uint32_t      offset,
687                uint32_t      length )
688{
689    assert( msgs );
690
691    if( tr_peerIoSupportsFEXT( msgs->io ) )
692    {
693        struct evbuffer * out = msgs->outMessages;
694        const uint32_t    len = sizeof( uint8_t ) + 3 * sizeof( uint32_t );
695        dbgmsg( msgs, "sending fast reject %u:%u->%u", pieceIndex, offset,
696                length );
697        tr_peerIoWriteUint32( msgs->io, out, len );
698        tr_peerIoWriteUint8( msgs->io, out, BT_REJECT );
699        tr_peerIoWriteUint32( msgs->io, out, pieceIndex );
700        tr_peerIoWriteUint32( msgs->io, out, offset );
701        tr_peerIoWriteUint32( msgs->io, out, length );
702        pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
703        dbgmsg( msgs, "outMessage size is now %d",
704               (int)EVBUFFER_LENGTH( out ) );
705    }
706}
707
708static tr_bitfield*
709getPeerAllowedPieces( tr_peermsgs * msgs )
710{
711    if( !msgs->peerAllowedPieces && tr_peerIoSupportsFEXT( msgs->io ) )
712    {
713        msgs->peerAllowedPieces = tr_peerMgrGenerateAllowedSet(
714            MAX_FAST_ALLOWED_COUNT,
715            msgs->torrent->info.pieceCount,
716            msgs->torrent->info.hash,
717            tr_peerIoGetAddress( msgs->io, NULL ) );
718    }
719
720    return msgs->peerAllowedPieces;
721}
722
723static void
724sendFastAllowed( tr_peermsgs * msgs,
725                 uint32_t      pieceIndex )
726{
727    assert( msgs );
728
729    if( tr_peerIoSupportsFEXT( msgs->io ) )
730    {
731        struct evbuffer * out = msgs->outMessages;
732        dbgmsg( msgs, "sending fast allowed" );
733        tr_peerIoWriteUint32( msgs->io, out,  sizeof( uint8_t ) +
734                             sizeof( uint32_t ) );
735        tr_peerIoWriteUint8( msgs->io, out, BT_ALLOWED_FAST );
736        tr_peerIoWriteUint32( msgs->io, out, pieceIndex );
737        pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
738        dbgmsg( msgs, "outMessage size is now %d",
739               (int)EVBUFFER_LENGTH( out ) );
740    }
741}
742
743static void
744sendFastAllowedSet( tr_peermsgs * msgs )
745{
746    tr_piece_index_t i = 0;
747
748    while( i <= msgs->torrent->info.pieceCount )
749    {
750        if( tr_bitfieldHas( getPeerAllowedPieces( msgs ), i ) )
751            sendFastAllowed( msgs, i );
752        i++;
753    }
754}
755
756static void
757maybeSendFastAllowedSet( tr_peermsgs * msgs )
758{
759    if( tr_bitfieldCountTrueBits( msgs->info->have ) <=
760        MAX_FAST_ALLOWED_THRESHOLD )
761        sendFastAllowedSet( msgs );
762}
763
764/**
765***
766**/
767
768static int
769reqIsValid( const tr_peermsgs * peer,
770            uint32_t            index,
771            uint32_t            offset,
772            uint32_t            length )
773{
774    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
775}
776
777static int
778requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
779{
780    return reqIsValid( msgs, req->index, req->offset, req->length );
781}
782
783static void
784expireOldRequests( tr_peermsgs * msgs, const time_t now  )
785{
786    int                 i;
787    time_t              oldestAllowed;
788    struct request_list tmp = REQUEST_LIST_INIT;
789
790    /* cancel requests that have been queued for too long */
791    oldestAllowed = now - QUEUED_REQUEST_TTL_SECS;
792    reqListCopy( &tmp, &msgs->clientWillAskFor );
793    for( i = 0; i < tmp.count; ++i )
794    {
795        const struct peer_request * req = &tmp.requests[i];
796        if( req->time_requested < oldestAllowed )
797            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
798    }
799    reqListClear( &tmp );
800
801    /* cancel requests that were sent too long ago */
802    oldestAllowed = now - SENT_REQUEST_TTL_SECS;
803    reqListCopy( &tmp, &msgs->clientAskedFor );
804    for( i = 0; i < tmp.count; ++i )
805    {
806        const struct peer_request * req = &tmp.requests[i];
807        if( req->time_requested < oldestAllowed )
808            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
809    }
810    reqListClear( &tmp );
811}
812
813static void
814pumpRequestQueue( tr_peermsgs * msgs, const time_t now )
815{
816    const int           max = msgs->maxActiveRequests;
817    const int           min = msgs->minActiveRequests;
818    int                 sent = 0;
819    int                 count = msgs->clientAskedFor.count;
820    struct peer_request req;
821
822    if( count > min )
823        return;
824    if( msgs->info->clientIsChoked )
825        return;
826    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
827        return;
828
829    while( ( count < max ) && reqListPop( &msgs->clientWillAskFor, &req ) )
830    {
831        const tr_block_index_t block =
832            _tr_block( msgs->torrent, req.index, req.offset );
833
834        assert( requestIsValid( msgs, &req ) );
835        assert( tr_bitfieldHas( msgs->info->have, req.index ) );
836
837        /* don't ask for it if we've already got it... this block may have
838         * come in from a different peer after we cancelled a request for it */
839        if( !tr_cpBlockIsComplete( msgs->torrent->completion, block ) )
840        {
841            protocolSendRequest( msgs, &req );
842            req.time_requested = now;
843            reqListAppend( &msgs->clientAskedFor, &req );
844
845            ++count;
846            ++sent;
847        }
848    }
849
850    if( sent )
851        dbgmsg( msgs,
852                "pump sent %d requests, now have %d active and %d queued",
853                sent,
854                msgs->clientAskedFor.count,
855                msgs->clientWillAskFor.count );
856
857    if( count < max )
858        fireNeedReq( msgs );
859}
860
861static int
862requestQueueIsFull( const tr_peermsgs * msgs )
863{
864    const int req_max = msgs->maxActiveRequests;
865    return msgs->clientWillAskFor.count >= req_max;
866}
867
868tr_addreq_t
869tr_peerMsgsAddRequest( tr_peermsgs *    msgs,
870                       uint32_t         index,
871                       uint32_t         offset,
872                       uint32_t         length )
873{
874    struct peer_request req;
875
876    assert( msgs );
877    assert( msgs->torrent );
878    assert( reqIsValid( msgs, index, offset, length ) );
879
880    /**
881    ***  Reasons to decline the request
882    **/
883
884    /* don't send requests to choked clients */
885    if( msgs->info->clientIsChoked )
886    {
887        dbgmsg( msgs, "declining request because they're choking us" );
888        return TR_ADDREQ_CLIENT_CHOKED;
889    }
890
891    /* peer doesn't have this piece */
892    if( !tr_bitfieldHas( msgs->info->have, index ) )
893        return TR_ADDREQ_MISSING;
894
895    /* peer's queue is full */
896    if( requestQueueIsFull( msgs ) ) {
897        dbgmsg( msgs, "declining request because we're full" );
898        return TR_ADDREQ_FULL;
899    }
900
901    /* have we already asked for this piece? */
902    req.index = index;
903    req.offset = offset;
904    req.length = length;
905    if( reqListFind( &msgs->clientAskedFor, &req ) != -1 ) {
906        dbgmsg( msgs, "declining because it's a duplicate" );
907        return TR_ADDREQ_DUPLICATE;
908    }
909    if( reqListFind( &msgs->clientWillAskFor, &req ) != -1 ) {
910        dbgmsg( msgs, "declining because it's a duplicate" );
911        return TR_ADDREQ_DUPLICATE;
912    }
913
914    /**
915    ***  Accept this request
916    **/
917
918    dbgmsg( msgs, "adding req for %"PRIu32":%"PRIu32"->%"PRIu32" to our `will request' list",
919            index, offset, length );
920    req.time_requested = time( NULL );
921    reqListAppend( &msgs->clientWillAskFor, &req );
922    return TR_ADDREQ_OK;
923}
924
925static void
926cancelAllRequestsToPeer( tr_peermsgs * msgs )
927{
928    int                 i;
929    struct request_list a = msgs->clientWillAskFor;
930    struct request_list b = msgs->clientAskedFor;
931    dbgmsg( msgs, "cancelling all requests to peer" );
932
933    msgs->clientAskedFor = REQUEST_LIST_INIT;
934    msgs->clientWillAskFor = REQUEST_LIST_INIT;
935
936    for( i=0; i<a.count; ++i )
937        fireCancelledReq( msgs, &a.requests[i] );
938
939    for( i = 0; i < b.count; ++i ) {
940        fireCancelledReq( msgs, &b.requests[i] );
941        protocolSendCancel( msgs, &b.requests[i] );
942    }
943
944    reqListClear( &a );
945    reqListClear( &b );
946}
947
948void
949tr_peerMsgsCancel( tr_peermsgs * msgs,
950                   uint32_t      pieceIndex,
951                   uint32_t      offset,
952                   uint32_t      length )
953{
954    struct peer_request req;
955
956    assert( msgs != NULL );
957    assert( length > 0 );
958
959
960    /* have we asked the peer for this piece? */
961    req.index = pieceIndex;
962    req.offset = offset;
963    req.length = length;
964
965    /* if it's only in the queue and hasn't been sent yet, free it */
966    if( reqListRemove( &msgs->clientWillAskFor, &req ) ) {
967        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32"\n", pieceIndex, offset, length );
968        fireCancelledReq( msgs, &req );
969    }
970
971    /* if it's already been sent, send a cancel message too */
972    if( reqListRemove( &msgs->clientAskedFor, &req ) ) {
973        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32"\n", pieceIndex, offset, length );
974        protocolSendCancel( msgs, &req );
975        fireCancelledReq( msgs, &req );
976    }
977}
978
979/**
980***
981**/
982
983static void
984sendLtepHandshake( tr_peermsgs * msgs )
985{
986    tr_benc           val, *m;
987    char *            buf;
988    int               len;
989    int               pex;
990    struct evbuffer * out = msgs->outMessages;
991
992    if( msgs->clientSentLtepHandshake )
993        return;
994
995    dbgmsg( msgs, "sending an ltep handshake" );
996    msgs->clientSentLtepHandshake = 1;
997
998    /* decide if we want to advertise pex support */
999    if( !tr_torrentAllowsPex( msgs->torrent ) )
1000        pex = 0;
1001    else if( msgs->peerSentLtepHandshake )
1002        pex = msgs->peerSupportsPex ? 1 : 0;
1003    else
1004        pex = 1;
1005
1006    tr_bencInitDict( &val, 4 );
1007    tr_bencDictAddInt( &val, "e",
1008                       msgs->session->encryptionMode != TR_CLEAR_PREFERRED );
1009    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( msgs->session ) );
1010    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
1011    m  = tr_bencDictAddDict( &val, "m", 1 );
1012    if( pex )
1013        tr_bencDictAddInt( m, "ut_pex", TR_LTEP_PEX );
1014    buf = tr_bencSave( &val, &len );
1015
1016    tr_peerIoWriteUint32( msgs->io, out, 2 * sizeof( uint8_t ) + len );
1017    tr_peerIoWriteUint8 ( msgs->io, out, BT_LTEP );
1018    tr_peerIoWriteUint8 ( msgs->io, out, LTEP_HANDSHAKE );
1019    tr_peerIoWriteBytes ( msgs->io, out, buf, len );
1020    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1021    dbgmsg( msgs, "outMessage size is now %d", (int)EVBUFFER_LENGTH( out ) );
1022
1023    /* cleanup */
1024    tr_bencFree( &val );
1025    tr_free( buf );
1026}
1027
1028static void
1029parseLtepHandshake( tr_peermsgs *     msgs,
1030                    int               len,
1031                    struct evbuffer * inbuf )
1032{
1033    int64_t   i;
1034    tr_benc   val, * sub;
1035    uint8_t * tmp = tr_new( uint8_t, len );
1036
1037    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
1038    msgs->peerSentLtepHandshake = 1;
1039
1040    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type != TYPE_DICT )
1041    {
1042        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
1043        tr_free( tmp );
1044        return;
1045    }
1046
1047    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
1048
1049    /* does the peer prefer encrypted connections? */
1050    if( tr_bencDictFindInt( &val, "e", &i ) )
1051        msgs->info->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
1052                                            : ENCRYPTION_PREFERENCE_NO;
1053
1054    /* check supported messages for utorrent pex */
1055    msgs->peerSupportsPex = 0;
1056    if( tr_bencDictFindDict( &val, "m", &sub ) )
1057    {
1058        if( tr_bencDictFindInt( sub, "ut_pex", &i ) )
1059        {
1060            msgs->ut_pex_id = (uint8_t) i;
1061            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
1062            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
1063        }
1064    }
1065
1066    /* get peer's listening port */
1067    if( tr_bencDictFindInt( &val, "p", &i ) )
1068    {
1069        msgs->info->port = htons( (uint16_t)i );
1070        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
1071    }
1072
1073    tr_bencFree( &val );
1074    tr_free( tmp );
1075}
1076
1077static void
1078parseUtPex( tr_peermsgs *     msgs,
1079            int               msglen,
1080            struct evbuffer * inbuf )
1081{
1082    int                loaded = 0;
1083    uint8_t *          tmp = tr_new( uint8_t, msglen );
1084    tr_benc            val;
1085    const tr_torrent * tor = msgs->torrent;
1086    const uint8_t *    added;
1087    size_t             added_len;
1088
1089    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
1090
1091    if( tr_torrentAllowsPex( tor )
1092      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) )
1093      && tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1094    {
1095        const uint8_t * added_f = NULL;
1096        tr_pex *        pex;
1097        size_t          i, n;
1098        size_t          added_f_len = 0;
1099        tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1100        pex =
1101            tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len,
1102                                    &n );
1103        for( i = 0; i < n; ++i )
1104            tr_peerMgrAddPex( msgs->session->peerMgr, tor->info.hash,
1105                              TR_PEER_FROM_PEX, pex + i );
1106        tr_free( pex );
1107    }
1108
1109    if( loaded )
1110        tr_bencFree( &val );
1111    tr_free( tmp );
1112}
1113
1114static void sendPex( tr_peermsgs * msgs );
1115
1116static void
1117parseLtep( tr_peermsgs *     msgs,
1118           int               msglen,
1119           struct evbuffer * inbuf )
1120{
1121    uint8_t ltep_msgid;
1122
1123    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
1124    msglen--;
1125
1126    if( ltep_msgid == LTEP_HANDSHAKE )
1127    {
1128        dbgmsg( msgs, "got ltep handshake" );
1129        parseLtepHandshake( msgs, msglen, inbuf );
1130        if( tr_peerIoSupportsLTEP( msgs->io ) )
1131        {
1132            sendLtepHandshake( msgs );
1133            sendPex( msgs );
1134        }
1135    }
1136    else if( ltep_msgid == TR_LTEP_PEX )
1137    {
1138        dbgmsg( msgs, "got ut pex" );
1139        msgs->peerSupportsPex = 1;
1140        parseUtPex( msgs, msglen, inbuf );
1141    }
1142    else
1143    {
1144        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1145        evbuffer_drain( inbuf, msglen );
1146    }
1147}
1148
1149static int
1150readBtLength( tr_peermsgs *     msgs,
1151              struct evbuffer * inbuf,
1152              size_t            inlen )
1153{
1154    uint32_t len;
1155
1156    if( inlen < sizeof( len ) )
1157        return READ_LATER;
1158
1159    tr_peerIoReadUint32( msgs->io, inbuf, &len );
1160
1161    if( len == 0 ) /* peer sent us a keepalive message */
1162        dbgmsg( msgs, "got KeepAlive" );
1163    else
1164    {
1165        msgs->incoming.length = len;
1166        msgs->state = AWAITING_BT_ID;
1167    }
1168
1169    return READ_NOW;
1170}
1171
1172static int readBtMessage( tr_peermsgs *     msgs,
1173                          struct evbuffer * inbuf,
1174                          size_t            inlen );
1175
1176static int
1177readBtId( tr_peermsgs *     msgs,
1178          struct evbuffer * inbuf,
1179          size_t            inlen )
1180{
1181    uint8_t id;
1182
1183    if( inlen < sizeof( uint8_t ) )
1184        return READ_LATER;
1185
1186    tr_peerIoReadUint8( msgs->io, inbuf, &id );
1187    msgs->incoming.id = id;
1188
1189    if( id == BT_PIECE )
1190    {
1191        msgs->state = AWAITING_BT_PIECE;
1192        return READ_NOW;
1193    }
1194    else if( msgs->incoming.length != 1 )
1195    {
1196        msgs->state = AWAITING_BT_MESSAGE;
1197        return READ_NOW;
1198    }
1199    else return readBtMessage( msgs, inbuf, inlen - 1 );
1200}
1201
1202static void
1203updatePeerProgress( tr_peermsgs * msgs )
1204{
1205    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have )
1206                           / (float)msgs->torrent->info.pieceCount;
1207    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
1208    updateInterest( msgs );
1209    firePeerProgress( msgs );
1210}
1211
1212static int
1213clientCanSendFastBlock( const tr_peermsgs * msgs UNUSED )
1214{
1215    /* don't send a fast piece if peer has MAX_FAST_ALLOWED_THRESHOLD pieces */
1216    if( tr_bitfieldCountTrueBits( msgs->info->have ) >
1217        MAX_FAST_ALLOWED_THRESHOLD )
1218        return FALSE;
1219
1220    /* ...or if we don't have ourself enough pieces */
1221    if( tr_bitfieldCountTrueBits( tr_cpPieceBitfield( msgs->torrent->
1222                                                      completion ) ) <
1223        MAX_FAST_ALLOWED_THRESHOLD )
1224        return FALSE;
1225
1226    /* Maybe a bandwidth limit ? */
1227    return TRUE;
1228}
1229
1230static void
1231peerMadeRequest( tr_peermsgs *               msgs,
1232                 const struct peer_request * req )
1233{
1234    const int reqIsValid = requestIsValid( msgs, req );
1235    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete(
1236        msgs->torrent->completion, req->index );
1237    const int peerIsChoked = msgs->info->peerIsChoked;
1238    const int peerIsFast = tr_peerIoSupportsFEXT( msgs->io );
1239    const int pieceIsFast = reqIsValid && tr_bitfieldHas(
1240        getPeerAllowedPieces( msgs ), req->index );
1241    const int canSendFast = clientCanSendFastBlock( msgs );
1242
1243    if( !reqIsValid ) /* bad request */
1244    {
1245        dbgmsg( msgs, "rejecting an invalid request." );
1246        sendFastReject( msgs, req->index, req->offset, req->length );
1247    }
1248    else if( !clientHasPiece ) /* we don't have it */
1249    {
1250        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1251        sendFastReject( msgs, req->index, req->offset, req->length );
1252    }
1253    else if( peerIsChoked && !peerIsFast ) /* doesn't he know he's choked? */
1254    {
1255        tr_peerMsgsSetChoke( msgs, 1 );
1256        sendFastReject( msgs, req->index, req->offset, req->length );
1257    }
1258    else if( peerIsChoked && peerIsFast && ( !pieceIsFast || !canSendFast ) )
1259    {
1260        sendFastReject( msgs, req->index, req->offset, req->length );
1261    }
1262    else /* YAY */
1263    {
1264        if( peerIsFast && pieceIsFast )
1265            reqListAppend( &msgs->peerAskedForFast, req );
1266        else
1267            reqListAppend( &msgs->peerAskedFor, req );
1268    }
1269}
1270
1271static int
1272messageLengthIsCorrect( const tr_peermsgs * msg,
1273                        uint8_t             id,
1274                        uint32_t            len )
1275{
1276    switch( id )
1277    {
1278        case BT_CHOKE:
1279        case BT_UNCHOKE:
1280        case BT_INTERESTED:
1281        case BT_NOT_INTERESTED:
1282        case BT_HAVE_ALL:
1283        case BT_HAVE_NONE:
1284            return len == 1;
1285
1286        case BT_HAVE:
1287        case BT_SUGGEST:
1288        case BT_ALLOWED_FAST:
1289            return len == 5;
1290
1291        case BT_BITFIELD:
1292            return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1293
1294        case BT_REQUEST:
1295        case BT_CANCEL:
1296        case BT_REJECT:
1297            return len == 13;
1298
1299        case BT_PIECE:
1300            return len > 9 && len <= 16393;
1301
1302        case BT_PORT:
1303            return len == 3;
1304
1305        case BT_LTEP:
1306            return len >= 2;
1307
1308        default:
1309            return FALSE;
1310    }
1311}
1312
1313static int clientGotBlock( tr_peermsgs *               msgs,
1314                           const uint8_t *             block,
1315                           const struct peer_request * req );
1316
1317static int
1318readBtPiece( tr_peermsgs      * msgs,
1319             struct evbuffer  * inbuf,
1320             size_t             inlen,
1321             size_t           * setme_piece_bytes_read )
1322{
1323    struct peer_request * req = &msgs->incoming.blockReq;
1324
1325    assert( EVBUFFER_LENGTH( inbuf ) >= inlen );
1326    dbgmsg( msgs, "In readBtPiece" );
1327
1328    if( !req->length )
1329    {
1330        if( inlen < 8 )
1331            return READ_LATER;
1332
1333        tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
1334        tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
1335        req->length = msgs->incoming.length - 9;
1336        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index,
1337                req->offset,
1338                req->length );
1339        return READ_NOW;
1340    }
1341    else
1342    {
1343        int          err;
1344
1345        /* read in another chunk of data */
1346        const size_t nLeft = req->length - EVBUFFER_LENGTH(
1347            msgs->incoming.block );
1348        size_t       n = MIN( nLeft, inlen );
1349        uint8_t *    buf = tr_new( uint8_t, n );
1350        assert( EVBUFFER_LENGTH( inbuf ) >= n );
1351        tr_peerIoReadBytes( msgs->io, inbuf, buf, n );
1352        evbuffer_add( msgs->incoming.block, buf, n );
1353        fireClientGotData( msgs, n, TRUE );
1354        *setme_piece_bytes_read += n;
1355        tr_free( buf );
1356        dbgmsg( msgs, "got %d bytes for block %u:%u->%u ... %d remain",
1357               (int)n, req->index, req->offset, req->length,
1358               (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) );
1359        if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length )
1360            return READ_LATER;
1361
1362        /* we've got the whole block ... process it */
1363        err = clientGotBlock( msgs, EVBUFFER_DATA(
1364                                  msgs->incoming.block ), req );
1365
1366        /* cleanup */
1367        evbuffer_drain( msgs->incoming.block,
1368                       EVBUFFER_LENGTH( msgs->incoming.block ) );
1369        req->length = 0;
1370        msgs->state = AWAITING_BT_LENGTH;
1371        if( !err )
1372            return READ_NOW;
1373        else
1374        {
1375            fireError( msgs, err );
1376            return READ_ERR;
1377        }
1378    }
1379}
1380
1381static int
1382readBtMessage( tr_peermsgs *     msgs,
1383               struct evbuffer * inbuf,
1384               size_t            inlen )
1385{
1386    uint32_t      ui32;
1387    uint32_t      msglen = msgs->incoming.length;
1388    const uint8_t id = msgs->incoming.id;
1389    const size_t  startBufLen = EVBUFFER_LENGTH( inbuf );
1390
1391    --msglen; /* id length */
1392
1393    if( inlen < msglen )
1394        return READ_LATER;
1395
1396    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %d", (int)id,
1397            (int)msglen,
1398            (int)inlen );
1399
1400    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1401    {
1402        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d",
1403                (int)id, (int)msglen );
1404        fireError( msgs, EMSGSIZE );
1405        return READ_ERR;
1406    }
1407
1408    switch( id )
1409    {
1410        case BT_CHOKE:
1411            dbgmsg( msgs, "got Choke" );
1412            msgs->info->clientIsChoked = 1;
1413            cancelAllRequestsToPeer( msgs );
1414            cancelAllRequestsToClientExceptFast( msgs );
1415            break;
1416
1417        case BT_UNCHOKE:
1418            dbgmsg( msgs, "got Unchoke" );
1419            msgs->info->clientIsChoked = 0;
1420            fireNeedReq( msgs );
1421            break;
1422
1423        case BT_INTERESTED:
1424            dbgmsg( msgs, "got Interested" );
1425            msgs->info->peerIsInterested = 1;
1426            break;
1427
1428        case BT_NOT_INTERESTED:
1429            dbgmsg( msgs, "got Not Interested" );
1430            msgs->info->peerIsInterested = 0;
1431            break;
1432
1433        case BT_HAVE:
1434            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1435            dbgmsg( msgs, "got Have: %u", ui32 );
1436            if( tr_bitfieldAdd( msgs->info->have, ui32 ) )
1437                fireError( msgs, ERANGE );
1438            updatePeerProgress( msgs );
1439            tr_rcTransferred( msgs->torrent->swarmSpeed,
1440                              msgs->torrent->info.pieceSize );
1441            break;
1442
1443        case BT_BITFIELD:
1444        {
1445            dbgmsg( msgs, "got a bitfield" );
1446            msgs->peerSentBitfield = 1;
1447            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits,
1448                                msglen );
1449            updatePeerProgress( msgs );
1450            maybeSendFastAllowedSet( msgs );
1451            fireNeedReq( msgs );
1452            break;
1453        }
1454
1455        case BT_REQUEST:
1456        {
1457            struct peer_request r;
1458            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1459            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1460            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1461            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset,
1462                    r.length );
1463            peerMadeRequest( msgs, &r );
1464            break;
1465        }
1466
1467        case BT_CANCEL:
1468        {
1469            struct peer_request r;
1470            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1471            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1472            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1473            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset,
1474                    r.length );
1475            reqListRemove( &msgs->peerAskedForFast, &r );
1476            reqListRemove( &msgs->peerAskedFor, &r );
1477            break;
1478        }
1479
1480        case BT_PIECE:
1481            assert( 0 ); /* handled elsewhere! */
1482            break;
1483
1484        case BT_PORT:
1485            dbgmsg( msgs, "Got a BT_PORT" );
1486            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1487            break;
1488
1489        case BT_SUGGEST:
1490        {
1491            dbgmsg( msgs, "Got a BT_SUGGEST" );
1492            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1493            /* we don't do anything with this yet */
1494            break;
1495        }
1496
1497        case BT_HAVE_ALL:
1498            dbgmsg( msgs, "Got a BT_HAVE_ALL" );
1499            tr_bitfieldAddRange( msgs->info->have, 0,
1500                                 msgs->torrent->info.pieceCount );
1501            updatePeerProgress( msgs );
1502            maybeSendFastAllowedSet( msgs );
1503            break;
1504
1505
1506        case BT_HAVE_NONE:
1507            dbgmsg( msgs, "Got a BT_HAVE_NONE" );
1508            tr_bitfieldClear( msgs->info->have );
1509            updatePeerProgress( msgs );
1510            maybeSendFastAllowedSet( msgs );
1511            break;
1512
1513        case BT_REJECT:
1514        {
1515            struct peer_request r;
1516            dbgmsg( msgs, "Got a BT_REJECT" );
1517            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1518            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1519            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1520            reqListRemove( &msgs->clientAskedFor, &r );
1521            break;
1522        }
1523
1524        case BT_ALLOWED_FAST:
1525        {
1526            dbgmsg( msgs, "Got a BT_ALLOWED_FAST" );
1527            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1528            /* we don't do anything with this yet */
1529            break;
1530        }
1531
1532        case BT_LTEP:
1533            dbgmsg( msgs, "Got a BT_LTEP" );
1534            parseLtep( msgs, msglen, inbuf );
1535            break;
1536
1537        default:
1538            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1539            tr_peerIoDrain( msgs->io, inbuf, msglen );
1540            break;
1541    }
1542
1543    assert( msglen + 1 == msgs->incoming.length );
1544    assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen );
1545
1546    msgs->state = AWAITING_BT_LENGTH;
1547    return READ_NOW;
1548}
1549
1550static void
1551decrementDownloadedCount( tr_peermsgs * msgs,
1552                          uint32_t      byteCount )
1553{
1554    tr_torrent * tor = msgs->torrent;
1555
1556    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1557}
1558
1559static void
1560clientGotUnwantedBlock( tr_peermsgs *               msgs,
1561                        const struct peer_request * req )
1562{
1563    decrementDownloadedCount( msgs, req->length );
1564}
1565
1566static void
1567addPeerToBlamefield( tr_peermsgs * msgs,
1568                     uint32_t      index )
1569{
1570    if( !msgs->info->blame )
1571        msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1572    tr_bitfieldAdd( msgs->info->blame, index );
1573}
1574
1575/* returns 0 on success, or an errno on failure */
1576static int
1577clientGotBlock( tr_peermsgs *               msgs,
1578                const uint8_t *             data,
1579                const struct peer_request * req )
1580{
1581    int                    err;
1582    tr_torrent *           tor = msgs->torrent;
1583    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1584
1585    assert( msgs );
1586    assert( req );
1587
1588    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) )
1589    {
1590        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1591                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1592        return EMSGSIZE;
1593    }
1594
1595    /* save the block */
1596    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset,
1597            req->length );
1598
1599    /**
1600    *** Remove the block from our `we asked for this' list
1601    **/
1602
1603    if( !reqListRemove( &msgs->clientAskedFor, req ) )
1604    {
1605        clientGotUnwantedBlock( msgs, req );
1606        dbgmsg( msgs, "we didn't ask for this message..." );
1607        return 0;
1608    }
1609
1610    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1611            msgs->clientAskedFor.count );
1612
1613    /**
1614    *** Error checks
1615    **/
1616
1617    if( tr_cpBlockIsComplete( tor->completion, block ) )
1618    {
1619        dbgmsg( msgs, "we have this block already..." );
1620        clientGotUnwantedBlock( msgs, req );
1621        return 0;
1622    }
1623
1624    /**
1625    ***  Save the block
1626    **/
1627
1628    msgs->info->peerSentPieceDataAt = time( NULL );
1629    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1630        return err;
1631
1632    addPeerToBlamefield( msgs, req->index );
1633    fireGotBlock( msgs, req );
1634    return 0;
1635}
1636
1637static int peerPulse( void * vmsgs );
1638
1639static void
1640didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1641{
1642    tr_peermsgs * msgs = vmsgs;
1643    firePeerGotData( msgs, bytesWritten, wasPieceData );
1644    peerPulse( msgs );
1645}
1646
1647static ReadState
1648canRead( struct tr_iobuf * iobuf, void * vmsgs, size_t * piece )
1649{
1650    ReadState         ret;
1651    tr_peermsgs *     msgs = vmsgs;
1652    struct evbuffer * in = tr_iobuf_input( iobuf );
1653    const size_t      inlen = EVBUFFER_LENGTH( in );
1654
1655    if( !inlen )
1656    {
1657        ret = READ_LATER;
1658    }
1659    else if( msgs->state == AWAITING_BT_PIECE )
1660    {
1661        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1662    }
1663    else switch( msgs->state )
1664    {
1665        case AWAITING_BT_LENGTH:
1666            ret = readBtLength ( msgs, in, inlen ); break;
1667
1668        case AWAITING_BT_ID:
1669            ret = readBtId     ( msgs, in, inlen ); break;
1670
1671        case AWAITING_BT_MESSAGE:
1672            ret = readBtMessage( msgs, in, inlen ); break;
1673
1674        default:
1675            assert( 0 );
1676    }
1677
1678    /* log the raw data that was read */
1679    if( EVBUFFER_LENGTH( in ) != inlen )
1680        fireClientGotData( msgs, inlen - EVBUFFER_LENGTH( in ), FALSE );
1681
1682    return ret;
1683}
1684
1685static void
1686sendKeepalive( tr_peermsgs * msgs )
1687{
1688    dbgmsg( msgs, "sending a keepalive message" );
1689    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1690    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1691}
1692
1693/**
1694***
1695**/
1696
1697static int
1698ratePulse( void * vpeer )
1699{
1700    tr_peermsgs * peer = vpeer;
1701    const double rateToClient = tr_peerGetPieceSpeed( peer->info,
1702                                                      TR_PEER_TO_CLIENT );
1703    const int estimatedBlocksInNext30Seconds =
1704                  ( rateToClient * 30 * 1024 ) / peer->torrent->blockSize;
1705
1706    peer->minActiveRequests = 8;
1707    peer->maxActiveRequests = peer->minActiveRequests + estimatedBlocksInNext30Seconds;
1708    return TRUE;
1709}
1710
1711static int
1712popNextRequest( tr_peermsgs *         msgs,
1713                struct peer_request * setme )
1714{
1715    return reqListPop( &msgs->peerAskedForFast, setme )
1716        || reqListPop( &msgs->peerAskedFor, setme );
1717}
1718
1719static int
1720peerPulse( void * vmsgs )
1721{
1722    tr_peermsgs * msgs = vmsgs;
1723    const time_t  now = time( NULL );
1724
1725    ratePulse( msgs );
1726
1727    /*tr_peerIoTryRead( msgs->io );*/
1728    pumpRequestQueue( msgs, now );
1729    expireOldRequests( msgs, now );
1730
1731    if( msgs->sendingBlock )
1732    {
1733        const size_t uploadMax = tr_peerIoGetWriteBufferSpace( msgs->io );
1734        size_t       len = EVBUFFER_LENGTH( msgs->outBlock );
1735        const size_t outlen = MIN( len, uploadMax );
1736
1737        assert( len );
1738
1739        if( outlen )
1740        {
1741            tr_peerIoWrite( msgs->io, EVBUFFER_DATA( msgs->outBlock ), outlen, TRUE );
1742            evbuffer_drain( msgs->outBlock, outlen );
1743
1744            len -= outlen;
1745            msgs->clientSentAnythingAt = now;
1746            msgs->sendingBlock = len != 0;
1747
1748            dbgmsg( msgs, "wrote %zu bytes; %zu left in block", outlen, len );
1749        }
1750        else dbgmsg( msgs, "stalled writing block... uploadMax %lu, outlen %lu", uploadMax, outlen );
1751    }
1752
1753    if( !msgs->sendingBlock )
1754    {
1755        struct peer_request req;
1756        const int haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1757
1758        if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1759        {
1760            dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
1761            msgs->outMessagesBatchedAt = now;
1762        }
1763        else if( ( haveMessages ) &&
1764                 ( ( now - msgs->outMessagesBatchedAt ) > msgs->outMessagesBatchPeriod ) )
1765        {
1766            dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->io, EVBUFFER_LENGTH( msgs->outMessages ) );
1767            tr_peerIoWriteBuf( msgs->io, msgs->outMessages, FALSE );
1768            msgs->clientSentAnythingAt = now;
1769            msgs->outMessagesBatchedAt = 0;
1770            msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1771        }
1772        else if( !EVBUFFER_LENGTH( msgs->outBlock )
1773               && popNextRequest( msgs, &req )
1774               && requestIsValid( msgs, &req )
1775               && tr_cpPieceIsComplete( msgs->torrent->completion,
1776                                        req.index ) )
1777        {
1778            uint8_t * buf = tr_new( uint8_t, req.length );
1779            const int err = tr_ioRead( msgs->torrent,
1780                                       req.index, req.offset, req.length,
1781                                       buf );
1782            if( err )
1783            {
1784                fireError( msgs, err );
1785            }
1786            else
1787            {
1788                tr_peerIo *       io = msgs->io;
1789                struct evbuffer * out = msgs->outBlock;
1790
1791                dbgmsg( msgs, "sending block %u:%u->%u", req.index,
1792                        req.offset,
1793                        req.length );
1794                tr_peerIoWriteUint32(
1795                    io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) +
1796                    req.length );
1797                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1798                tr_peerIoWriteUint32( io, out, req.index );
1799                tr_peerIoWriteUint32( io, out, req.offset );
1800                tr_peerIoWriteBytes ( io, out, buf, req.length );
1801                msgs->sendingBlock = 1;
1802            }
1803
1804            tr_free( buf );
1805        }
1806        else if( ( !haveMessages )
1807               && ( now - msgs->clientSentAnythingAt ) >
1808                KEEPALIVE_INTERVAL_SECS )
1809        {
1810            sendKeepalive( msgs );
1811        }
1812    }
1813
1814    return TRUE; /* loop forever */
1815}
1816
1817void
1818tr_peerMsgsPulse( tr_peermsgs * msgs )
1819{
1820    if( msgs != NULL )
1821        peerPulse( msgs );
1822}
1823
1824static void
1825gotError( struct tr_iobuf  * iobuf UNUSED,
1826          short              what,
1827          void             * vmsgs )
1828{
1829    if( what & EVBUFFER_TIMEOUT )
1830        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1831    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1832        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1833               what, errno, tr_strerror( errno ) );
1834    fireError( vmsgs, ENOTCONN );
1835}
1836
1837static void
1838sendBitfield( tr_peermsgs * msgs )
1839{
1840    struct evbuffer * out = msgs->outMessages;
1841    tr_bitfield *     field;
1842    tr_piece_index_t  lazyPieces[LAZY_PIECE_COUNT];
1843    size_t            i;
1844    size_t            lazyCount = 0;
1845
1846    field = tr_bitfieldDup( tr_cpPieceBitfield( msgs->torrent->completion ) );
1847
1848#if 0
1849    if( tr_sessionIsLazyBitfieldEnabled( msgs->session ) )
1850    {
1851        /** Lazy bitfields aren't a high priority or secure, so I'm opting for
1852            speed over a truly random sample -- let's limit the pool size to
1853            the first 1000 pieces so large torrents don't bog things down */
1854        size_t poolSize;
1855        const size_t maxPoolSize = MIN( msgs->torrent->info.pieceCount, 1000 );
1856        tr_piece_index_t * pool = tr_new( tr_piece_index_t, maxPoolSize );
1857
1858        /* build the pool */
1859        for( i=poolSize=0; i<maxPoolSize; ++i )
1860            if( tr_bitfieldHas( field, i ) )
1861                pool[poolSize++] = i;
1862
1863        /* pull random piece indices from the pool */
1864        while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) )
1865        {
1866            const int pos = tr_cryptoWeakRandInt( poolSize );
1867            const tr_piece_index_t piece = pool[pos];
1868            tr_bitfieldRem( field, piece );
1869            lazyPieces[lazyCount++] = piece;
1870            pool[pos] = pool[--poolSize];
1871        }
1872
1873        /* cleanup */
1874        tr_free( pool );
1875    }
1876#endif
1877
1878    tr_peerIoWriteUint32( msgs->io, out,
1879                          sizeof( uint8_t ) + field->byteCount );
1880    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1881    tr_peerIoWriteBytes ( msgs->io, out, field->bits, field->byteCount );
1882    dbgmsg( msgs, "sending bitfield... outMessage size is now %d",
1883           (int)EVBUFFER_LENGTH( out ) );
1884    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1885
1886    for( i = 0; i < lazyCount; ++i )
1887        protocolSendHave( msgs, lazyPieces[i] );
1888
1889    tr_bitfieldFree( field );
1890}
1891
1892/**
1893***
1894**/
1895
1896/* some peers give us error messages if we send
1897   more than this many peers in a single pex message
1898   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
1899#define MAX_PEX_ADDED 50
1900#define MAX_PEX_DROPPED 50
1901
1902typedef struct
1903{
1904    tr_pex *  added;
1905    tr_pex *  dropped;
1906    tr_pex *  elements;
1907    int       addedCount;
1908    int       droppedCount;
1909    int       elementCount;
1910}
1911PexDiffs;
1912
1913static void
1914pexAddedCb( void * vpex,
1915            void * userData )
1916{
1917    PexDiffs * diffs = userData;
1918    tr_pex *   pex = vpex;
1919
1920    if( diffs->addedCount < MAX_PEX_ADDED )
1921    {
1922        diffs->added[diffs->addedCount++] = *pex;
1923        diffs->elements[diffs->elementCount++] = *pex;
1924    }
1925}
1926
1927static void
1928pexDroppedCb( void * vpex,
1929              void * userData )
1930{
1931    PexDiffs * diffs = userData;
1932    tr_pex *   pex = vpex;
1933
1934    if( diffs->droppedCount < MAX_PEX_DROPPED )
1935    {
1936        diffs->dropped[diffs->droppedCount++] = *pex;
1937    }
1938}
1939
1940static void
1941pexElementCb( void * vpex,
1942              void * userData )
1943{
1944    PexDiffs * diffs = userData;
1945    tr_pex *   pex = vpex;
1946
1947    diffs->elements[diffs->elementCount++] = *pex;
1948}
1949
1950static void
1951sendPex( tr_peermsgs * msgs )
1952{
1953    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
1954    {
1955        int               i;
1956        tr_pex *          newPex = NULL;
1957        const int         newCount = tr_peerMgrGetPeers(
1958            msgs->session->peerMgr, msgs->torrent->info.hash, &newPex );
1959        PexDiffs          diffs;
1960        tr_benc           val;
1961        uint8_t *         tmp, *walk;
1962        char *            benc;
1963        int               bencLen;
1964        struct evbuffer * out = msgs->outMessages;
1965
1966        /* build the diffs */
1967        diffs.added = tr_new( tr_pex, newCount );
1968        diffs.addedCount = 0;
1969        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1970        diffs.droppedCount = 0;
1971        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1972        diffs.elementCount = 0;
1973        tr_set_compare( msgs->pex, msgs->pexCount,
1974                        newPex, newCount,
1975                        tr_pexCompare, sizeof( tr_pex ),
1976                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
1977        dbgmsg(
1978            msgs,
1979            "pex: old peer count %d, new peer count %d, added %d, removed %d",
1980            msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1981
1982        /* update peer */
1983        tr_free( msgs->pex );
1984        msgs->pex = diffs.elements;
1985        msgs->pexCount = diffs.elementCount;
1986
1987        /* build the pex payload */
1988        tr_bencInitDict( &val, 3 );
1989
1990        /* "added" */
1991        tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1992        for( i = 0; i < diffs.addedCount; ++i )
1993        {
1994            memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
1995            memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1996        }
1997        assert( ( walk - tmp ) == diffs.addedCount * 6 );
1998        tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
1999        tr_free( tmp );
2000
2001        /* "added.f" */
2002        tmp = walk = tr_new( uint8_t, diffs.addedCount );
2003        for( i = 0; i < diffs.addedCount; ++i )
2004            *walk++ = diffs.added[i].flags;
2005        assert( ( walk - tmp ) == diffs.addedCount );
2006        tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2007        tr_free( tmp );
2008
2009        /* "dropped" */
2010        tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2011        for( i = 0; i < diffs.droppedCount; ++i )
2012        {
2013            memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
2014            memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2015        }
2016        assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2017        tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2018        tr_free( tmp );
2019
2020        /* write the pex message */
2021        benc = tr_bencSave( &val, &bencLen );
2022        tr_peerIoWriteUint32( msgs->io, out,
2023                              2 * sizeof( uint8_t ) + bencLen );
2024        tr_peerIoWriteUint8 ( msgs->io, out, BT_LTEP );
2025        tr_peerIoWriteUint8 ( msgs->io, out, msgs->ut_pex_id );
2026        tr_peerIoWriteBytes ( msgs->io, out, benc, bencLen );
2027        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
2028        dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", EVBUFFER_LENGTH( out ) );
2029
2030        /* cleanup */
2031        tr_free( benc );
2032        tr_bencFree( &val );
2033        tr_free( diffs.added );
2034        tr_free( diffs.dropped );
2035        tr_free( newPex );
2036
2037        msgs->clientSentPexAt = time( NULL );
2038    }
2039}
2040
2041static int
2042pexPulse( void * vpeer )
2043{
2044    sendPex( vpeer );
2045    return TRUE;
2046}
2047
2048/**
2049***
2050**/
2051
2052tr_peermsgs*
2053tr_peerMsgsNew( struct tr_torrent * torrent,
2054                struct tr_peer *    info,
2055                tr_delivery_func    func,
2056                void *              userData,
2057                tr_publisher_tag *  setme )
2058{
2059    tr_peermsgs * m;
2060
2061    assert( info );
2062    assert( info->io );
2063
2064    m = tr_new0( tr_peermsgs, 1 );
2065    m->publisher = tr_publisherNew( );
2066    m->info = info;
2067    m->session = torrent->session;
2068    m->torrent = torrent;
2069    m->io = info->io;
2070    m->info->clientIsChoked = 1;
2071    m->info->peerIsChoked = 1;
2072    m->info->clientIsInterested = 0;
2073    m->info->peerIsInterested = 0;
2074    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
2075    m->state = AWAITING_BT_LENGTH;
2076    m->pexTimer = tr_timerNew( m->session, pexPulse, m, PEX_INTERVAL );
2077    m->outMessages = evbuffer_new( );
2078    m->outMessagesBatchedAt = 0;
2079    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2080    m->incoming.block = evbuffer_new( );
2081    m->outBlock = evbuffer_new( );
2082    m->peerAllowedPieces = NULL;
2083    m->peerAskedFor = REQUEST_LIST_INIT;
2084    m->peerAskedForFast = REQUEST_LIST_INIT;
2085    m->clientAskedFor = REQUEST_LIST_INIT;
2086    m->clientWillAskFor = REQUEST_LIST_INIT;
2087    *setme = tr_publisherSubscribe( m->publisher, func, userData );
2088
2089    if( tr_peerIoSupportsLTEP( m->io ) )
2090        sendLtepHandshake( m );
2091
2092    sendBitfield( m );
2093
2094    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of
2095                                             inactivity */
2096    tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
2097    ratePulse( m );
2098
2099    return m;
2100}
2101
2102void
2103tr_peerMsgsFree( tr_peermsgs* msgs )
2104{
2105    if( msgs )
2106    {
2107        tr_timerFree( &msgs->pexTimer );
2108        tr_publisherFree( &msgs->publisher );
2109        reqListClear( &msgs->clientWillAskFor );
2110        reqListClear( &msgs->clientAskedFor );
2111        reqListClear( &msgs->peerAskedForFast );
2112        reqListClear( &msgs->peerAskedFor );
2113        tr_bitfieldFree( msgs->peerAllowedPieces );
2114        evbuffer_free( msgs->incoming.block );
2115        evbuffer_free( msgs->outMessages );
2116        evbuffer_free( msgs->outBlock );
2117        tr_free( msgs->pex );
2118
2119        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2120        tr_free( msgs );
2121    }
2122}
2123
2124void
2125tr_peerMsgsUnsubscribe( tr_peermsgs *    peer,
2126                        tr_publisher_tag tag )
2127{
2128    tr_publisherUnsubscribe( peer->publisher, tag );
2129}
2130
Note: See TracBrowser for help on using the repository browser.