source: trunk/libtransmission/peer-msgs.c @ 6616

Last change on this file since 6616 was 6616, checked in by charles, 13 years ago

(libT) remove some dead functions: tr_calloc(), tr_compareUint16(), tr_compareUint32()

  • Property svn:keywords set to Date Rev Author Id
File size: 57.7 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 6616 2008-08-21 19:03:56Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19#include <libgen.h> /* basename */
20
21#include <event.h>
22
23#include "transmission.h"
24#include "bencode.h"
25#include "completion.h"
26#include "crypto.h"
27#include "inout.h"
28#include "peer-io.h"
29#include "peer-mgr.h"
30#include "peer-mgr-private.h"
31#include "peer-msgs.h"
32#include "ratecontrol.h"
33#include "stats.h"
34#include "torrent.h"
35#include "trevent.h"
36#include "utils.h"
37
38/**
39***
40**/
41
42enum
43{
44    BT_CHOKE                = 0,
45    BT_UNCHOKE              = 1,
46    BT_INTERESTED           = 2,
47    BT_NOT_INTERESTED       = 3,
48    BT_HAVE                 = 4,
49    BT_BITFIELD             = 5,
50    BT_REQUEST              = 6,
51    BT_PIECE                = 7,
52    BT_CANCEL               = 8,
53    BT_PORT                 = 9,
54    BT_SUGGEST              = 13,
55    BT_HAVE_ALL             = 14,
56    BT_HAVE_NONE            = 15,
57    BT_REJECT               = 16,
58    BT_ALLOWED_FAST         = 17,
59    BT_LTEP                 = 20,
60
61    LTEP_HANDSHAKE          = 0,
62
63    TR_LTEP_PEX             = 1,
64
65    MIN_CHOKE_PERIOD_SEC    = (10),
66
67    /* idle seconds before we send a keepalive */
68    KEEPALIVE_INTERVAL_SECS = 100,
69
70    PEX_INTERVAL            = (90 * 1000), /* msec between sendPex() calls */
71    PEER_PULSE_INTERVAL     = (50),        /* msec between peerPulse() calls */
72    RATE_PULSE_INTERVAL     = (250),       /* msec between ratePulse() calls */
73
74    MAX_QUEUE_SIZE          = (100),
75     
76    /* (fast peers) max number of pieces we fast-allow to another peer */
77    MAX_FAST_ALLOWED_COUNT   = 10,
78
79    /* (fast peers) max threshold for allowing fast-pieces requests */
80    MAX_FAST_ALLOWED_THRESHOLD = 10,
81
82    /* how long an unsent request can stay queued before it's returned
83       back to the peer-mgr's pool of requests */
84    QUEUED_REQUEST_TTL_SECS = 20,
85
86    /* how long a sent request can stay queued before it's returned
87       back to the peer-mgr's pool of requests */
88    SENT_REQUEST_TTL_SECS = 240,
89
90    /* used in lowering the outMessages queue period */
91    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
92    HIGH_PRIORITY_INTERVAL_SECS = 2,
93    LOW_PRIORITY_INTERVAL_SECS = 20,
94
95    /* number of pieces to remove from the bitfield when
96     * lazy bitfields are turned on */
97    LAZY_PIECE_COUNT = 26
98};
99
100/**
101***  REQUEST MANAGEMENT
102**/
103
104enum
105{
106    AWAITING_BT_LENGTH,
107    AWAITING_BT_ID,
108    AWAITING_BT_MESSAGE,
109    AWAITING_BT_PIECE
110};
111
112struct peer_request
113{
114    uint32_t index;
115    uint32_t offset;
116    uint32_t length;
117    time_t time_requested;
118};
119
120static int
121tr_compareUint32( uint32_t a, uint32_t b )
122{
123    if( a < b ) return -1;
124    if( a > b ) return 1;
125    return 0;
126}
127
128static int
129compareRequest( const void * va, const void * vb )
130{
131    int i;
132    const struct peer_request * a = va;
133    const struct peer_request * b = vb;
134    if(( i = tr_compareUint32( a->index, b->index ))) return i;
135    if(( i = tr_compareUint32( a->offset, b->offset ))) return i;
136    if(( i = tr_compareUint32( a->length, b->length ))) return i;
137    return 0;
138}
139
140struct request_list
141{
142    uint16_t count;
143    uint16_t max;
144    struct peer_request * requests;
145};
146
147static const struct request_list REQUEST_LIST_INIT = { 0, 0, NULL };
148
149static void
150reqListReserve( struct request_list * list, uint16_t max )
151{
152    if( list->max < max )
153    {
154        list->max = max;
155        list->requests = tr_renew( struct peer_request,
156                                   list->requests,
157                                   list->max );
158    }
159}
160
161static void
162reqListClear( struct request_list * list )
163{
164    tr_free( list->requests );
165    *list = REQUEST_LIST_INIT;
166}
167
168static void
169reqListCopy( struct request_list * dest, const struct request_list * src )
170{
171    dest->count = dest->max = src->count;
172    dest->requests = tr_memdup( src->requests, dest->count * sizeof( struct peer_request ) );
173}
174
175static void
176reqListRemoveOne( struct request_list * list, int i )
177{
178    assert( 0<=i && i<list->count );
179
180    memmove( &list->requests[i],
181             &list->requests[i+1],
182             sizeof( struct peer_request ) * ( --list->count - i ) );
183}
184
185static void
186reqListAppend( struct request_list * list, const struct peer_request * req )
187{
188    if( ++list->count >= list->max )
189        reqListReserve( list, list->max + 8 );
190
191    list->requests[list->count-1] = *req;
192}
193
194static void
195reqListAppendPiece( const tr_torrent     * tor,
196                    struct request_list  * list,
197                    tr_piece_index_t       piece )
198{
199    const time_t now = time( NULL );
200    const size_t n = tr_torPieceCountBlocks( tor, piece );
201    const tr_block_index_t begin = tr_torPieceFirstBlock( tor, piece );
202    const tr_block_index_t end = begin + n;
203    tr_block_index_t i;
204
205    if( list->count + n >= list->max )
206        reqListReserve( list, list->max + n );
207
208    for( i=begin; i<end; ++i ) {
209        if( !tr_cpBlockIsComplete( tor->completion, i ) ) {
210            struct peer_request * req = list->requests + list->count++;
211            req->index = piece;
212            req->offset = (i * tor->blockSize) - (piece * tor->info.pieceSize);
213            req->length = tr_torBlockCountBytes( tor, i );
214            req->time_requested = now;
215            assert( tr_torrentReqIsValid( tor, req->index, req->offset, req->length ) );
216        }
217    }
218}
219
220static tr_errno
221reqListPop( struct request_list * list, struct peer_request * setme )
222{
223    tr_errno err;
224
225    if( !list->count )
226        err = TR_ERROR;
227    else {
228        *setme = list->requests[0];
229        reqListRemoveOne( list, 0 );
230        err = TR_OK;
231    }
232
233    return err;
234}
235
236static int
237reqListHasPiece( struct request_list * list, const tr_piece_index_t piece )
238{
239    uint16_t i;
240
241    for( i=0; i<list->count; ++i )
242        if( list->requests[i].index == piece )
243            return 1;
244
245    return 0;
246}
247
248static int
249reqListFind( struct request_list * list, const struct peer_request * key )
250{
251    uint16_t i;
252
253    for( i=0; i<list->count; ++i )
254        if( !compareRequest( key, list->requests+i ) )
255            return i;
256
257    return -1;
258}
259
260static tr_errno
261reqListRemove( struct request_list * list, const struct peer_request * key )
262{
263    tr_errno err;
264    const int i = reqListFind( list, key );
265
266    if( i < 0 )
267        err = TR_ERROR;
268    else {
269        err = TR_OK;
270        reqListRemoveOne( list, i );
271    }
272
273    return err;
274}
275
276/**
277***
278**/
279
280/* this is raw, unchanged data from the peer regarding
281 * the current message that it's sending us. */
282struct tr_incoming
283{
284    uint8_t id;
285    uint32_t length; /* includes the +1 for id length */
286    struct peer_request blockReq; /* metadata for incoming blocks */
287    struct evbuffer * block; /* piece data for incoming blocks */
288};
289
290struct tr_peermsgs
291{
292    unsigned int peerSentBitfield         : 1;
293    unsigned int peerSupportsPex          : 1;
294    unsigned int clientSentLtepHandshake  : 1;
295    unsigned int peerSentLtepHandshake    : 1;
296    unsigned int sendingBlock             : 1;
297   
298    uint8_t state;
299    uint8_t ut_pex_id;
300    uint16_t pexCount;
301    uint16_t minActiveRequests;
302    uint16_t maxActiveRequests;
303
304    /* how long the outMessages batch should be allowed to grow before
305     * it's flushed -- some messages (like requests >:) should be sent
306     * very quickly; others aren't as urgent. */
307    int outMessagesBatchPeriod;
308
309    tr_peer * info;
310
311    tr_session * session;
312    tr_torrent * torrent;
313    tr_peerIo * io;
314
315    tr_publisher_t * publisher;
316
317    struct evbuffer * outBlock;    /* buffer of the current piece message */
318    struct evbuffer * outMessages; /* buffer of all the non-piece messages */
319
320    struct request_list peerAskedFor;
321    struct request_list peerAskedForFast;
322    struct request_list clientAskedFor;
323    struct request_list clientWillAskFor;
324
325    tr_timer * rateTimer;
326    tr_timer * pulseTimer;
327    tr_timer * pexTimer;
328
329    time_t clientSentPexAt;
330    time_t clientSentAnythingAt;
331
332    /* when we started batching the outMessages */
333    time_t outMessagesBatchedAt;
334   
335    tr_bitfield * peerAllowedPieces;
336
337    struct tr_incoming incoming; 
338
339    tr_pex * pex;
340};
341
342/**
343***
344**/
345
346static void
347myDebug( const char * file, int line,
348         const struct tr_peermsgs * msgs,
349         const char * fmt, ... )
350{
351    FILE * fp = tr_getLog( );
352    if( fp )
353    {
354        va_list args;
355        char timestr[64];
356        struct evbuffer * buf = evbuffer_new( );
357        char * myfile = tr_strdup( file );
358
359        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
360                             tr_getLogTimeStr( timestr, sizeof(timestr) ),
361                             msgs->torrent->info.name,
362                             tr_peerIoGetAddrStr( msgs->io ),
363                             msgs->info->client );
364        va_start( args, fmt );
365        evbuffer_add_vprintf( buf, fmt, args );
366        va_end( args );
367        evbuffer_add_printf( buf, " (%s:%d)\n", basename(myfile), line );
368        fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
369
370        tr_free( myfile );
371        evbuffer_free( buf );
372    }
373}
374
375#define dbgmsg(msgs, fmt...) myDebug(__FILE__, __LINE__, msgs, ##fmt )
376
377/**
378***
379**/
380
381static void
382pokeBatchPeriod( tr_peermsgs * msgs, int interval )
383{
384    if( msgs->outMessagesBatchPeriod > interval )
385    {
386        msgs->outMessagesBatchPeriod = interval;
387        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
388    }
389}
390
391static void
392protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
393{
394    tr_peerIo * io = msgs->io;
395    struct evbuffer * out = msgs->outMessages;
396
397    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
398    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
399    tr_peerIoWriteUint32( io, out, req->index );
400    tr_peerIoWriteUint32( io, out, req->offset );
401    tr_peerIoWriteUint32( io, out, req->length );
402    dbgmsg( msgs, "requesting %u:%u->%u... outMessage size is now %d",
403            req->index, req->offset, req->length, (int)EVBUFFER_LENGTH(out) );
404    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
405}
406
407static void
408protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
409{
410    tr_peerIo * io = msgs->io;
411    struct evbuffer * out = msgs->outMessages;
412
413    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
414    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
415    tr_peerIoWriteUint32( io, out, req->index );
416    tr_peerIoWriteUint32( io, out, req->offset );
417    tr_peerIoWriteUint32( io, out, req->length );
418    dbgmsg( msgs, "cancelling %u:%u->%u... outMessage size is now %d",
419            req->index, req->offset, req->length, (int)EVBUFFER_LENGTH(out) );
420    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
421}
422
423static void
424protocolSendHave( tr_peermsgs * msgs, uint32_t index )
425{
426    tr_peerIo * io = msgs->io;
427    struct evbuffer * out = msgs->outMessages;
428
429    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
430    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
431    tr_peerIoWriteUint32( io, out, index );
432    dbgmsg( msgs, "sending Have %u.. outMessage size is now %d",
433            index, (int)EVBUFFER_LENGTH(out) );
434    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
435}
436
437static void
438protocolSendChoke( tr_peermsgs * msgs, int choke )
439{
440    tr_peerIo * io = msgs->io;
441    struct evbuffer * out = msgs->outMessages;
442
443    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) );
444    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
445    dbgmsg( msgs, "sending %s... outMessage size is now %d",
446                  (choke ? "Choke" : "Unchoke"),
447                  (int)EVBUFFER_LENGTH(out) );
448    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
449}
450
451/**
452***  EVENTS
453**/
454
455static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0 };
456
457static void
458publish( tr_peermsgs * msgs, tr_peer_event * e )
459{
460    tr_publisherPublish( msgs->publisher, msgs->info, e );
461}
462
463static void
464fireError( tr_peermsgs * msgs, tr_errno err )
465{
466    tr_peer_event e = blankEvent;
467    e.eventType = TR_PEER_ERROR;
468    e.err = err;
469    publish( msgs, &e );
470}
471
472static void
473fireNeedReq( tr_peermsgs * msgs )
474{
475    tr_peer_event e = blankEvent;
476    e.eventType = TR_PEER_NEED_REQ;
477    publish( msgs, &e );
478}
479
480static void
481firePeerProgress( tr_peermsgs * msgs )
482{
483    tr_peer_event e = blankEvent;
484    e.eventType = TR_PEER_PEER_PROGRESS;
485    e.progress = msgs->info->progress;
486    publish( msgs, &e );
487}
488
489static void
490fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
491{
492    tr_peer_event e = blankEvent;
493    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
494    e.pieceIndex = req->index;
495    e.offset = req->offset;
496    e.length = req->length;
497    publish( msgs, &e );
498}
499
500static void
501fireClientGotData( tr_peermsgs * msgs, uint32_t length )
502{
503    tr_peer_event e = blankEvent;
504    e.length = length;
505    e.eventType = TR_PEER_CLIENT_GOT_DATA;
506    publish( msgs, &e );
507}
508
509static void
510firePeerGotData( tr_peermsgs * msgs, uint32_t length )
511{
512    tr_peer_event e = blankEvent;
513    e.length = length;
514    e.eventType = TR_PEER_PEER_GOT_DATA;
515    publish( msgs, &e );
516}
517
518static void
519fireCancelledReq( tr_peermsgs * msgs, const tr_piece_index_t pieceIndex )
520{
521    tr_peer_event e = blankEvent;
522    e.eventType = TR_PEER_CANCEL;
523    e.pieceIndex = pieceIndex;
524    publish( msgs, &e );
525}
526
527/**
528***  INTEREST
529**/
530
531static int
532isPieceInteresting( const tr_peermsgs   * peer,
533                    tr_piece_index_t      piece )
534{
535    const tr_torrent * torrent = peer->torrent;
536
537    return ( ( !torrent->info.pieces[piece].dnd )               /* we want it */
538          && ( !tr_cpPieceIsComplete( torrent->completion, piece ) ) /* !have */
539          && ( tr_bitfieldHas( peer->info->have, piece ) ) );  /* peer has it */
540}
541
542/* "interested" means we'll ask for piece data if they unchoke us */
543static int
544isPeerInteresting( const tr_peermsgs * msgs )
545{
546    tr_piece_index_t i;
547    const tr_torrent * torrent;
548    const tr_bitfield * bitfield;
549    const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
550
551    if( clientIsSeed )
552        return FALSE;
553
554    torrent = msgs->torrent;
555    bitfield = tr_cpPieceBitfield( torrent->completion );
556
557    if( !msgs->info->have )
558        return TRUE;
559
560    assert( bitfield->byteCount == msgs->info->have->byteCount );
561
562    for( i=0; i<torrent->info.pieceCount; ++i )
563        if( isPieceInteresting( msgs, i ) )
564            return TRUE;
565
566    return FALSE;
567}
568
569static void
570sendInterest( tr_peermsgs * msgs, int weAreInterested )
571{
572    struct evbuffer * out = msgs->outMessages;
573
574    assert( msgs );
575    assert( weAreInterested==0 || weAreInterested==1 );
576
577    msgs->info->clientIsInterested = weAreInterested;
578    dbgmsg( msgs, "Sending %s", weAreInterested ? "Interested" : "Not Interested");
579    tr_peerIoWriteUint32( msgs->io, out, sizeof(uint8_t) );
580    tr_peerIoWriteUint8 ( msgs->io, out, weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
581    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
582    dbgmsg( msgs, "outMessage size is now %d", (int)EVBUFFER_LENGTH(out) );
583}
584
585static void
586updateInterest( tr_peermsgs * msgs )
587{
588    const int i = isPeerInteresting( msgs );
589    if( i != msgs->info->clientIsInterested )
590        sendInterest( msgs, i );
591    if( i )
592        fireNeedReq( msgs );
593}
594
595static void
596cancelAllRequestsToClientExceptFast( tr_peermsgs * msgs )
597{
598    reqListClear( &msgs->peerAskedFor );
599}
600
601void
602tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
603{
604    const time_t fibrillationTime = time(NULL) - MIN_CHOKE_PERIOD_SEC;
605
606    assert( msgs );
607    assert( msgs->info );
608    assert( choke==0 || choke==1 );
609
610    if( msgs->info->chokeChangedAt > fibrillationTime )
611    {
612        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
613    }
614    else if( msgs->info->peerIsChoked != choke )
615    {
616        msgs->info->peerIsChoked = choke;
617        if( choke )
618            cancelAllRequestsToClientExceptFast( msgs );
619        protocolSendChoke( msgs, choke );
620        msgs->info->chokeChangedAt = time( NULL );
621    }
622}
623
624/**
625***
626**/
627
628void
629tr_peerMsgsHave( tr_peermsgs * msgs,
630                 uint32_t      index )
631{
632    protocolSendHave( msgs, index );
633
634    /* since we have more pieces now, we might not be interested in this peer */
635    updateInterest( msgs );
636}
637#if 0
638static void
639sendFastSuggest( tr_peermsgs * msgs,
640                 uint32_t      pieceIndex )
641{
642    assert( msgs );
643   
644    if( tr_peerIoSupportsFEXT( msgs->io ) )
645    {
646        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
647        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_SUGGEST );
648        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
649    }
650}
651static void
652sendFastHave( tr_peermsgs * msgs, int all )
653{
654    assert( msgs );
655   
656    if( tr_peerIoSupportsFEXT( msgs->io ) )
657    {
658        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
659        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, ( all ? BT_HAVE_ALL
660                                                                : BT_HAVE_NONE ) );
661        updateInterest( msgs );
662    }
663}
664#endif
665
666static void
667sendFastReject( tr_peermsgs * msgs,
668                uint32_t      pieceIndex,
669                uint32_t      offset,
670                uint32_t      length )
671{
672    assert( msgs );
673
674    if( tr_peerIoSupportsFEXT( msgs->io ) )
675    {
676        struct evbuffer * out = msgs->outMessages;
677        const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
678        dbgmsg( msgs, "sending fast reject %u:%u->%u", pieceIndex, offset, length );
679        tr_peerIoWriteUint32( msgs->io, out, len );
680        tr_peerIoWriteUint8( msgs->io, out, BT_REJECT );
681        tr_peerIoWriteUint32( msgs->io, out, pieceIndex );
682        tr_peerIoWriteUint32( msgs->io, out, offset );
683        tr_peerIoWriteUint32( msgs->io, out, length );
684        pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
685        dbgmsg( msgs, "outMessage size is now %d", (int)EVBUFFER_LENGTH(out) );
686    }
687}
688
689static tr_bitfield*
690getPeerAllowedPieces( tr_peermsgs * msgs )
691{
692    if( !msgs->peerAllowedPieces && tr_peerIoSupportsFEXT( msgs->io ) )
693    {
694        msgs->peerAllowedPieces = tr_peerMgrGenerateAllowedSet(
695            MAX_FAST_ALLOWED_COUNT,
696            msgs->torrent->info.pieceCount,
697            msgs->torrent->info.hash,
698            tr_peerIoGetAddress( msgs->io, NULL ) );
699    }
700
701    return msgs->peerAllowedPieces;
702}
703
704static void
705sendFastAllowed( tr_peermsgs * msgs,
706                 uint32_t      pieceIndex)
707{
708    assert( msgs );
709   
710    if( tr_peerIoSupportsFEXT( msgs->io ) )
711    {
712        struct evbuffer * out = msgs->outMessages;
713        dbgmsg( msgs, "sending fast allowed" );
714        tr_peerIoWriteUint32( msgs->io, out,  sizeof(uint8_t) + sizeof(uint32_t) );
715        tr_peerIoWriteUint8( msgs->io, out, BT_ALLOWED_FAST );
716        tr_peerIoWriteUint32( msgs->io, out, pieceIndex );
717        pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
718        dbgmsg( msgs, "outMessage size is now %d", (int)EVBUFFER_LENGTH(out) );
719    }
720}
721
722static void
723sendFastAllowedSet( tr_peermsgs * msgs )
724{
725    tr_piece_index_t i = 0;
726
727    while (i <= msgs->torrent->info.pieceCount )
728    {
729        if ( tr_bitfieldHas( getPeerAllowedPieces( msgs ), i) )
730            sendFastAllowed( msgs, i );
731        i++;
732    }
733}
734
735static void
736maybeSendFastAllowedSet( tr_peermsgs * msgs )
737{
738    if( tr_bitfieldCountTrueBits( msgs->info->have ) <= MAX_FAST_ALLOWED_THRESHOLD )
739        sendFastAllowedSet( msgs );
740}
741
742
743/**
744***
745**/
746
747static int
748reqIsValid( const tr_peermsgs   * peer,
749            uint32_t              index,
750            uint32_t              offset,
751            uint32_t              length )
752{
753    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
754}
755
756static int
757requestIsValid( const tr_peermsgs * peer, const struct peer_request * req )
758{
759    return reqIsValid( peer, req->index, req->offset, req->length );
760}
761
762static void
763tr_peerMsgsCancel( tr_peermsgs * msgs,
764                   uint32_t      pieceIndex )
765{
766    uint16_t i;
767    struct request_list tmp = REQUEST_LIST_INIT;
768    struct request_list * src;
769
770    src = &msgs->clientWillAskFor;
771    for( i=0; i<src->count; ++i )
772        if( src->requests[i].index != pieceIndex )
773            reqListAppend( &tmp, src->requests + i );
774
775    /* swap */
776    reqListClear( &msgs->clientWillAskFor );
777    msgs->clientWillAskFor = tmp;
778    tmp = REQUEST_LIST_INIT;
779
780    src = &msgs->clientAskedFor;
781    for( i=0; i<src->count; ++i )
782        if( src->requests[i].index == pieceIndex )
783            protocolSendCancel( msgs, src->requests + i );
784        else
785            reqListAppend( &tmp, src->requests + i );
786
787    /* swap */
788    reqListClear( &msgs->clientAskedFor );
789    msgs->clientAskedFor = tmp;
790    tmp = REQUEST_LIST_INIT;
791
792    fireCancelledReq( msgs, pieceIndex );
793}
794
795static void
796expireOldRequests( tr_peermsgs * msgs )
797{
798    int i;
799    time_t oldestAllowed;
800    struct request_list tmp = REQUEST_LIST_INIT;
801
802    /* cancel requests that have been queued for too long */
803    oldestAllowed = time( NULL ) - QUEUED_REQUEST_TTL_SECS;
804    reqListCopy( &tmp, &msgs->clientWillAskFor );
805    for( i=0; i<tmp.count; ++i ) {
806        const struct peer_request * req = &tmp.requests[i];
807        if( req->time_requested < oldestAllowed )
808            tr_peerMsgsCancel( msgs, req->index );
809    }
810    reqListClear( &tmp );
811
812    /* cancel requests that were sent too long ago */
813    oldestAllowed = time( NULL ) - SENT_REQUEST_TTL_SECS;
814    reqListCopy( &tmp, &msgs->clientAskedFor );
815    for( i=0; i<tmp.count; ++i ) {
816        const struct peer_request * req = &tmp.requests[i];
817        if( req->time_requested < oldestAllowed )
818            tr_peerMsgsCancel( msgs, req->index );
819    }
820    reqListClear( &tmp );
821}
822
823static void
824pumpRequestQueue( tr_peermsgs * msgs )
825{
826    const int max = msgs->maxActiveRequests;
827    const int min = msgs->minActiveRequests;
828    const time_t now = time( NULL );
829    int sent = 0;
830    int count = msgs->clientAskedFor.count;
831    struct peer_request req;
832
833    if( count > min )
834        return;
835    if( msgs->info->clientIsChoked )
836        return;
837
838    while( ( count < max ) && !reqListPop( &msgs->clientWillAskFor, &req ) )
839    {
840        assert( requestIsValid( msgs, &req ) );
841        assert( tr_bitfieldHas( msgs->info->have, req.index ) );
842
843        protocolSendRequest( msgs, &req );
844        req.time_requested = now;
845        reqListAppend( &msgs->clientAskedFor, &req );
846
847        ++count;
848        ++sent;
849    }
850
851    if( sent )
852        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
853                sent,
854                msgs->clientAskedFor.count,
855                msgs->clientWillAskFor.count );
856
857    if( count < max )
858        fireNeedReq( msgs );
859}
860
861static int
862peerPulse( void * vmsgs );
863
864static int
865requestQueueIsFull( const tr_peermsgs * msgs )
866{
867    const int req_max = msgs->maxActiveRequests;
868    return msgs->clientWillAskFor.count >= req_max;
869}
870
871int
872tr_peerMsgsAddRequest( tr_peermsgs       * msgs,
873                       tr_piece_index_t    piece )
874{
875    struct peer_request req;
876
877    assert( msgs );
878    assert( msgs->torrent );
879    assert( piece < msgs->torrent->info.pieceCount );
880
881    /**
882    ***  Reasons to decline the request
883    **/
884
885    /* don't send requests to choked clients */
886    if( msgs->info->clientIsChoked ) {
887        dbgmsg( msgs, "declining request because they're choking us" );
888        return TR_ADDREQ_CLIENT_CHOKED;
889    }
890
891    /* peer doesn't have this piece */
892    if( !tr_bitfieldHas( msgs->info->have, piece ) )
893        return TR_ADDREQ_MISSING;
894
895    /* peer's queue is full */
896    if( requestQueueIsFull( msgs ) ) {
897        dbgmsg( msgs, "declining request because we're full" );
898        return TR_ADDREQ_FULL;
899    }
900
901    /* have we already asked for this piece? */
902    if( reqListHasPiece( &msgs->clientAskedFor, piece ) || 
903        reqListHasPiece( &msgs->clientWillAskFor, piece ) ) {
904        dbgmsg( msgs, "declining because it's a duplicate" );
905        return TR_ADDREQ_DUPLICATE;
906    }
907
908    /**
909    ***  Accept this request
910    **/
911
912    dbgmsg( msgs, "added req for piece %lu", (unsigned long)piece );
913    req.time_requested = time( NULL );
914    reqListAppendPiece( msgs->torrent, &msgs->clientWillAskFor, piece );
915    return TR_ADDREQ_OK;
916}
917
918static void
919cancelAllRequestsToPeer( tr_peermsgs * msgs )
920{
921    int i;
922    struct request_list a = msgs->clientWillAskFor;
923    struct request_list b = msgs->clientAskedFor;
924
925    msgs->clientAskedFor = REQUEST_LIST_INIT;
926    msgs->clientWillAskFor = REQUEST_LIST_INIT;
927
928    for( i=0; i<a.count; ++i )
929        fireCancelledReq( msgs, a.requests[i].index );
930
931    for( i=0; i<b.count; ++i ) {
932        fireCancelledReq( msgs, b.requests[i].index );
933        protocolSendCancel( msgs, &b.requests[i] );
934    }
935
936    reqListClear( &a );
937    reqListClear( &b );
938}
939
940/**
941***
942**/
943
944static void
945sendLtepHandshake( tr_peermsgs * msgs )
946{
947    tr_benc val, *m;
948    char * buf;
949    int len;
950    int pex;
951    struct evbuffer * out = msgs->outMessages;
952
953    if( msgs->clientSentLtepHandshake )
954        return;
955
956    dbgmsg( msgs, "sending an ltep handshake" );
957    msgs->clientSentLtepHandshake = 1;
958
959    /* decide if we want to advertise pex support */
960    if( !tr_torrentAllowsPex( msgs->torrent ) )
961        pex = 0;
962    else if( msgs->peerSentLtepHandshake )
963        pex = msgs->peerSupportsPex ? 1 : 0;
964    else
965        pex = 1;
966
967    tr_bencInitDict( &val, 4 );
968    tr_bencDictAddInt( &val, "e", msgs->session->encryptionMode != TR_CLEAR_PREFERRED );
969    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( msgs->session ) );
970    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
971    m  = tr_bencDictAddDict( &val, "m", 1 );
972    if( pex )
973        tr_bencDictAddInt( m, "ut_pex", TR_LTEP_PEX );
974    buf = tr_bencSave( &val, &len );
975
976    tr_peerIoWriteUint32( msgs->io, out, 2*sizeof(uint8_t) + len );
977    tr_peerIoWriteUint8 ( msgs->io, out, BT_LTEP );
978    tr_peerIoWriteUint8 ( msgs->io, out, LTEP_HANDSHAKE );
979    tr_peerIoWriteBytes ( msgs->io, out, buf, len );
980    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
981    dbgmsg( msgs, "outMessage size is now %d", (int)EVBUFFER_LENGTH(out) );
982
983    /* cleanup */
984    tr_bencFree( &val );
985    tr_free( buf );
986}
987
988static void
989parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
990{
991    int64_t i;
992    tr_benc val, * sub;
993    uint8_t * tmp = tr_new( uint8_t, len );
994
995    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
996    msgs->peerSentLtepHandshake = 1;
997
998    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type!=TYPE_DICT ) {
999        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
1000        tr_free( tmp );
1001        return;
1002    }
1003
1004    dbgmsg( msgs, "here is the ltep handshake we got [%*.*s]", len, len, tmp );
1005
1006    /* does the peer prefer encrypted connections? */
1007    if( tr_bencDictFindInt( &val, "e", &i ) )
1008        msgs->info->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
1009                                              : ENCRYPTION_PREFERENCE_NO;
1010
1011    /* check supported messages for utorrent pex */
1012    msgs->peerSupportsPex = 0;
1013    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
1014        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
1015            msgs->ut_pex_id = (uint8_t) i;
1016            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
1017            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
1018        }
1019    }
1020
1021    /* get peer's listening port */
1022    if( tr_bencDictFindInt( &val, "p", &i ) ) {
1023        msgs->info->port = htons( (uint16_t)i );
1024        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
1025    }
1026
1027    tr_bencFree( &val );
1028    tr_free( tmp );
1029}
1030
1031static void
1032parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1033{
1034    int loaded = 0;
1035    uint8_t * tmp = tr_new( uint8_t, msglen );
1036    tr_benc val;
1037    const tr_torrent * tor = msgs->torrent;
1038    const uint8_t * added;
1039    size_t added_len;
1040
1041    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
1042
1043    if( tr_torrentAllowsPex( tor )
1044        && (( loaded = !tr_bencLoad( tmp, msglen, &val, NULL )))
1045        && tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1046    {
1047        const uint8_t * added_f = NULL;
1048        tr_pex * pex;
1049        size_t i, n;
1050        size_t added_f_len = 0;
1051        tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1052        pex = tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len, &n );
1053        for( i=0; i<n; ++i )
1054            tr_peerMgrAddPex( msgs->session->peerMgr, tor->info.hash,
1055                              TR_PEER_FROM_PEX, pex+i );
1056        tr_free( pex );
1057    }
1058
1059    if( loaded )
1060        tr_bencFree( &val );
1061    tr_free( tmp );
1062}
1063
1064static void
1065sendPex( tr_peermsgs * msgs );
1066
1067static void
1068parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1069{
1070    uint8_t ltep_msgid;
1071
1072    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
1073    msglen--;
1074
1075    if( ltep_msgid == LTEP_HANDSHAKE )
1076    {
1077        dbgmsg( msgs, "got ltep handshake" );
1078        parseLtepHandshake( msgs, msglen, inbuf );
1079        if( tr_peerIoSupportsLTEP( msgs->io ) )
1080        {
1081            sendLtepHandshake( msgs );
1082            sendPex( msgs );
1083        }
1084    }
1085    else if( ltep_msgid == TR_LTEP_PEX )
1086    {
1087        dbgmsg( msgs, "got ut pex" );
1088        msgs->peerSupportsPex = 1;
1089        parseUtPex( msgs, msglen, inbuf );
1090    }
1091    else
1092    {
1093        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1094        evbuffer_drain( inbuf, msglen );
1095    }
1096}
1097
1098static int
1099readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1100{
1101    uint32_t len;
1102
1103    if( inlen < sizeof(len) )
1104        return READ_MORE;
1105
1106    tr_peerIoReadUint32( msgs->io, inbuf, &len );
1107
1108    if( len == 0 ) /* peer sent us a keepalive message */
1109        dbgmsg( msgs, "got KeepAlive" );
1110    else {
1111        msgs->incoming.length = len;
1112        msgs->state = AWAITING_BT_ID;
1113    }
1114
1115    return READ_AGAIN;
1116}
1117
1118static int
1119readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen );
1120
1121static int
1122readBtId( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1123{
1124    uint8_t id;
1125
1126    if( inlen < sizeof(uint8_t) )
1127        return READ_MORE;
1128
1129    tr_peerIoReadUint8( msgs->io, inbuf, &id );
1130    msgs->incoming.id = id;
1131
1132    if( id==BT_PIECE )
1133    {
1134        msgs->state = AWAITING_BT_PIECE;
1135        return READ_AGAIN;
1136    }
1137    else if( msgs->incoming.length != 1 )
1138    {
1139        msgs->state = AWAITING_BT_MESSAGE;
1140        return READ_AGAIN;
1141    }
1142    else return readBtMessage( msgs, inbuf, inlen-1 );
1143}
1144
1145static void
1146updatePeerProgress( tr_peermsgs * msgs )
1147{
1148    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have )
1149                         / (float)msgs->torrent->info.pieceCount;
1150    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
1151    updateInterest( msgs );
1152    firePeerProgress( msgs );
1153}
1154
1155static int
1156clientCanSendFastBlock( const tr_peermsgs * msgs UNUSED )
1157{
1158    /* don't send a fast piece if peer has MAX_FAST_ALLOWED_THRESHOLD pieces */
1159    if( tr_bitfieldCountTrueBits( msgs->info->have ) > MAX_FAST_ALLOWED_THRESHOLD )
1160        return FALSE;
1161   
1162    /* ...or if we don't have ourself enough pieces */
1163    if( tr_bitfieldCountTrueBits( tr_cpPieceBitfield( msgs->torrent->completion ) ) < MAX_FAST_ALLOWED_THRESHOLD )
1164        return FALSE;
1165
1166    /* Maybe a bandwidth limit ? */
1167    return TRUE;
1168}
1169
1170static void
1171peerMadeRequest( tr_peermsgs * msgs, const struct peer_request * req )
1172{
1173    const int reqIsValid = requestIsValid( msgs, req );
1174    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( msgs->torrent->completion, req->index );
1175    const int peerIsChoked = msgs->info->peerIsChoked;
1176    const int peerIsFast = tr_peerIoSupportsFEXT( msgs->io );
1177    const int pieceIsFast = reqIsValid && tr_bitfieldHas( getPeerAllowedPieces( msgs ), req->index );
1178    const int canSendFast = clientCanSendFastBlock( msgs );
1179
1180    if( !reqIsValid ) /* bad request */
1181    {
1182        dbgmsg( msgs, "rejecting an invalid request." );
1183        sendFastReject( msgs, req->index, req->offset, req->length );
1184    }
1185    else if( !clientHasPiece ) /* we don't have it */
1186    {
1187        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1188        sendFastReject( msgs, req->index, req->offset, req->length );
1189    }
1190    else if( peerIsChoked && !peerIsFast ) /* doesn't he know he's choked? */
1191    {
1192        tr_peerMsgsSetChoke( msgs, 1 );
1193        sendFastReject( msgs, req->index, req->offset, req->length );
1194    }
1195    else if( peerIsChoked && peerIsFast && ( !pieceIsFast || !canSendFast ) )
1196    {
1197        sendFastReject( msgs, req->index, req->offset, req->length );
1198    }
1199    else /* YAY */
1200    {
1201        if( peerIsFast && pieceIsFast )
1202            reqListAppend( &msgs->peerAskedForFast, req );
1203        else
1204            reqListAppend( &msgs->peerAskedFor, req );
1205    }
1206}
1207
1208static int
1209messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1210{
1211    switch( id )
1212    {
1213        case BT_CHOKE:
1214        case BT_UNCHOKE:
1215        case BT_INTERESTED:
1216        case BT_NOT_INTERESTED:
1217        case BT_HAVE_ALL:
1218        case BT_HAVE_NONE:
1219            return len==1;
1220
1221        case BT_HAVE:
1222        case BT_SUGGEST:
1223        case BT_ALLOWED_FAST:
1224            return len==5;
1225
1226        case BT_BITFIELD:
1227            return len == (msg->torrent->info.pieceCount+7u)/8u + 1u;
1228       
1229        case BT_REQUEST:
1230        case BT_CANCEL:
1231        case BT_REJECT:
1232            return len==13;
1233
1234        case BT_PIECE:
1235            return len>9 && len<=16393;
1236
1237        case BT_PORT:
1238            return len==3;
1239
1240        case BT_LTEP:
1241            return len >= 2;
1242
1243        default:
1244            return FALSE;
1245    }
1246}
1247
1248static int
1249clientGotBlock( tr_peermsgs * msgs, const uint8_t * block, const struct peer_request * req );
1250
1251static void
1252clientGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1253{
1254    msgs->info->pieceDataActivityDate = time( NULL );
1255    tr_rcTransferred( msgs->info->rcToClient, byteCount );
1256    fireClientGotData( msgs, byteCount );
1257}
1258
1259static int
1260readBtPiece( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1261{
1262    struct peer_request * req = &msgs->incoming.blockReq;
1263    assert( EVBUFFER_LENGTH(inbuf) >= inlen );
1264    dbgmsg( msgs, "In readBtPiece" );
1265
1266    if( !req->length )
1267    {
1268        if( inlen < 8 )
1269            return READ_MORE;
1270
1271        tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
1272        tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
1273        req->length = msgs->incoming.length - 9;
1274        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1275        return READ_AGAIN;
1276    }
1277    else
1278    {
1279        int err;
1280
1281        /* read in another chunk of data */
1282        const size_t nLeft = req->length - EVBUFFER_LENGTH(msgs->incoming.block);
1283        size_t n = MIN( nLeft, inlen );
1284        uint8_t * buf = tr_new( uint8_t, n );
1285        assert( EVBUFFER_LENGTH(inbuf) >= n );
1286        tr_peerIoReadBytes( msgs->io, inbuf, buf, n );
1287        evbuffer_add( msgs->incoming.block, buf, n );
1288        clientGotBytes( msgs, n );
1289        tr_free( buf );
1290        dbgmsg( msgs, "got %d bytes for block %u:%u->%u ... %d remain",
1291               (int)n, req->index, req->offset, req->length,
1292               (int)( req->length - EVBUFFER_LENGTH(msgs->incoming.block) ) );
1293        if( EVBUFFER_LENGTH(msgs->incoming.block) < req->length )
1294            return READ_MORE;
1295
1296        /* we've got the whole block ... process it */
1297        err = clientGotBlock( msgs, EVBUFFER_DATA(msgs->incoming.block), req );
1298
1299        /* cleanup */
1300        evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH(msgs->incoming.block) );
1301        req->length = 0;
1302        msgs->state = AWAITING_BT_LENGTH;
1303        if( !err )
1304            return READ_AGAIN;
1305        else {
1306            fireError( msgs, err );
1307            return READ_DONE;
1308        }
1309    }
1310}
1311
1312static int
1313readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1314{
1315    uint32_t ui32;
1316    uint32_t msglen = msgs->incoming.length;
1317    const uint8_t id = msgs->incoming.id;
1318    const size_t startBufLen = EVBUFFER_LENGTH( inbuf );
1319
1320    --msglen; /* id length */
1321
1322    if( inlen < msglen )
1323        return READ_MORE;
1324
1325    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %d", (int)id, (int)msglen, (int)inlen );
1326
1327    if( !messageLengthIsCorrect( msgs, id, msglen+1 ) )
1328    {
1329        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1330        fireError( msgs, TR_ERROR );
1331        return READ_DONE;
1332    }
1333
1334    switch( id )
1335    {
1336        case BT_CHOKE:
1337            dbgmsg( msgs, "got Choke" );
1338            msgs->info->clientIsChoked = 1;
1339            cancelAllRequestsToPeer( msgs );
1340            cancelAllRequestsToClientExceptFast( msgs );
1341            break;
1342
1343        case BT_UNCHOKE:
1344            dbgmsg( msgs, "got Unchoke" );
1345            msgs->info->clientIsChoked = 0;
1346            fireNeedReq( msgs );
1347            break;
1348
1349        case BT_INTERESTED:
1350            dbgmsg( msgs, "got Interested" );
1351            msgs->info->peerIsInterested = 1;
1352            break;
1353
1354        case BT_NOT_INTERESTED:
1355            dbgmsg( msgs, "got Not Interested" );
1356            msgs->info->peerIsInterested = 0;
1357            break;
1358           
1359        case BT_HAVE:
1360            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1361            dbgmsg( msgs, "got Have: %u", ui32 );
1362            if( tr_bitfieldAdd( msgs->info->have, ui32 ) )
1363                fireError( msgs, TR_ERROR_PEER_MESSAGE );
1364            updatePeerProgress( msgs );
1365            tr_rcTransferred( msgs->torrent->swarmSpeed, msgs->torrent->info.pieceSize );
1366            break;
1367
1368        case BT_BITFIELD: {
1369            dbgmsg( msgs, "got a bitfield" );
1370            msgs->peerSentBitfield = 1;
1371            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
1372            updatePeerProgress( msgs );
1373            maybeSendFastAllowedSet( msgs );
1374            fireNeedReq( msgs );
1375            break;
1376        }
1377
1378        case BT_REQUEST: {
1379            struct peer_request r;
1380            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1381            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1382            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1383            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1384            peerMadeRequest( msgs, &r );
1385            break;
1386        }
1387
1388        case BT_CANCEL: {
1389            struct peer_request r;
1390            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1391            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1392            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1393            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1394            reqListRemove( &msgs->peerAskedForFast, &r );
1395            reqListRemove( &msgs->peerAskedFor, &r );
1396            break;
1397        }
1398
1399        case BT_PIECE:
1400            assert( 0 ); /* handled elsewhere! */
1401            break;
1402       
1403        case BT_PORT:
1404            dbgmsg( msgs, "Got a BT_PORT" );
1405            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1406            break;
1407           
1408        case BT_SUGGEST: {
1409            dbgmsg( msgs, "Got a BT_SUGGEST" );
1410            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1411            /* we don't do anything with this yet */
1412            break;
1413        }
1414           
1415        case BT_HAVE_ALL:
1416            dbgmsg( msgs, "Got a BT_HAVE_ALL" );
1417            tr_bitfieldAddRange( msgs->info->have, 0, msgs->torrent->info.pieceCount );
1418            updatePeerProgress( msgs );
1419            maybeSendFastAllowedSet( msgs );
1420            break;
1421       
1422       
1423        case BT_HAVE_NONE:
1424            dbgmsg( msgs, "Got a BT_HAVE_NONE" );
1425            tr_bitfieldClear( msgs->info->have );
1426            updatePeerProgress( msgs );
1427            maybeSendFastAllowedSet( msgs );
1428            break;
1429       
1430        case BT_REJECT: {
1431            struct peer_request r;
1432            dbgmsg( msgs, "Got a BT_REJECT" );
1433            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1434            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1435            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1436            reqListRemove( &msgs->clientAskedFor, &r );
1437            break;
1438        }
1439
1440        case BT_ALLOWED_FAST: {
1441            dbgmsg( msgs, "Got a BT_ALLOWED_FAST" );
1442            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1443            /* we don't do anything with this yet */
1444            break;
1445        }
1446
1447        case BT_LTEP:
1448            dbgmsg( msgs, "Got a BT_LTEP" );
1449            parseLtep( msgs, msglen, inbuf );
1450            break;
1451
1452        default:
1453            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1454            tr_peerIoDrain( msgs->io, inbuf, msglen );
1455            break;
1456    }
1457
1458    assert( msglen + 1 == msgs->incoming.length );
1459    assert( EVBUFFER_LENGTH(inbuf) == startBufLen - msglen );
1460
1461    msgs->state = AWAITING_BT_LENGTH;
1462    return READ_AGAIN;
1463}
1464
1465static void
1466peerGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1467{
1468    msgs->info->pieceDataActivityDate = time( NULL );
1469    tr_rcTransferred( msgs->info->rcToPeer, byteCount );
1470    firePeerGotData( msgs, byteCount );
1471}
1472
1473static size_t
1474getDownloadMax( const tr_peermsgs * msgs )
1475{
1476    static const size_t maxval = ~0;
1477    const tr_torrent * tor = msgs->torrent;
1478
1479    if( tor->downloadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1480        return tor->handle->useDownloadLimit
1481            ? tr_rcBytesLeft( tor->handle->download ) : maxval;
1482
1483    if( tor->downloadLimitMode == TR_SPEEDLIMIT_SINGLE )
1484        return tr_rcBytesLeft( tor->download );
1485
1486    return maxval;
1487}
1488
1489static void
1490decrementDownloadedCount( tr_peermsgs * msgs, uint32_t byteCount )
1491{
1492    tr_torrent * tor = msgs->torrent;
1493    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1494}
1495
1496static void
1497clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1498{
1499    decrementDownloadedCount( msgs, req->length );
1500}
1501
1502static void
1503addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1504{
1505    if( !msgs->info->blame )
1506         msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1507    tr_bitfieldAdd( msgs->info->blame, index );
1508}
1509
1510static tr_errno
1511clientGotBlock( tr_peermsgs                * msgs,
1512                const uint8_t              * data,
1513                const struct peer_request  * req )
1514{
1515    int err;
1516    tr_torrent * tor = msgs->torrent;
1517    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1518
1519    assert( msgs );
1520    assert( req );
1521
1522    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) )
1523    {
1524        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1525                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1526        return TR_ERROR;
1527    }
1528
1529    /* save the block */
1530    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1531
1532    /**
1533    *** Remove the block from our `we asked for this' list
1534    **/
1535
1536    if( reqListRemove( &msgs->clientAskedFor, req ) )
1537    {
1538        clientGotUnwantedBlock( msgs, req );
1539        dbgmsg( msgs, "we didn't ask for this message..." );
1540        return 0;
1541    }
1542
1543    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1544                  msgs->clientAskedFor.count );
1545
1546    /**
1547    *** Error checks
1548    **/
1549
1550    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
1551        dbgmsg( msgs, "we have this block already..." );
1552        clientGotUnwantedBlock( msgs, req );
1553        return 0;
1554    }
1555
1556    /**
1557    ***  Save the block
1558    **/
1559
1560    msgs->info->peerSentPieceDataAt = time( NULL );
1561    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1562        return err;
1563
1564    addPeerToBlamefield( msgs, req->index );
1565    fireGotBlock( msgs, req );
1566    return 0;
1567}
1568
1569static void
1570didWrite( struct bufferevent * evin UNUSED, void * vmsgs )
1571{
1572    peerPulse( vmsgs );
1573}
1574
1575static ReadState
1576canRead( struct bufferevent * evin, void * vmsgs )
1577{
1578    ReadState ret;
1579    tr_peermsgs * msgs = vmsgs;
1580    struct evbuffer * in = EVBUFFER_INPUT ( evin );
1581    const size_t inlen = EVBUFFER_LENGTH( in );
1582
1583    if( !inlen )
1584    {
1585        ret = READ_DONE;
1586    }
1587    else if( msgs->state == AWAITING_BT_PIECE )
1588    {
1589        const size_t downloadMax = getDownloadMax( msgs );
1590        const size_t n = MIN( inlen, downloadMax );
1591        ret = n ? readBtPiece( msgs, in, n ) : READ_DONE;
1592    }
1593    else switch( msgs->state )
1594    {
1595        case AWAITING_BT_LENGTH:  ret = readBtLength ( msgs, in, inlen ); break;
1596        case AWAITING_BT_ID:      ret = readBtId     ( msgs, in, inlen ); break;
1597        case AWAITING_BT_MESSAGE: ret = readBtMessage( msgs, in, inlen ); break;
1598        default:                  assert( 0 );
1599    }
1600
1601    return ret;
1602}
1603
1604static void
1605sendKeepalive( tr_peermsgs * msgs )
1606{
1607    dbgmsg( msgs, "sending a keepalive message" );
1608    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1609    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1610}
1611
1612/**
1613***
1614**/
1615
1616static size_t
1617getUploadMax( const tr_peermsgs * msgs )
1618{
1619    static const size_t maxval = ~0;
1620    const tr_torrent * tor = msgs->torrent;
1621    int speedLeft;
1622    int bufLeft;
1623
1624    if( tor->uploadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1625        speedLeft = tor->handle->useUploadLimit ? tr_rcBytesLeft( tor->handle->upload ) : maxval;
1626    else if( tor->uploadLimitMode == TR_SPEEDLIMIT_SINGLE )
1627        speedLeft = tr_rcBytesLeft( tor->upload );
1628    else
1629        speedLeft = INT_MAX;
1630
1631    /* this basically says the outbuf shouldn't have more than one block
1632     * queued up in it... blocksize, +13 for the size of the BT protocol's
1633     * block message overhead */
1634    bufLeft = tor->blockSize + 13 - tr_peerIoWriteBytesWaiting( msgs->io );
1635    return MIN( speedLeft, bufLeft );
1636}
1637
1638static int
1639getMaxBlocksFromPeerSoon( const tr_peermsgs * peer )
1640{
1641    const double seconds = 30;
1642    const double bytesPerSecond = peer->info->rateToClient * 1024;
1643    const double totalBytes = bytesPerSecond * seconds;
1644    const int blockCount = totalBytes / peer->torrent->blockSize;
1645    /*fprintf( stderr, "rate %f -- blockCount %d\n", peer->info->rateToClient, blockCount );*/
1646    return blockCount;
1647}
1648
1649static int
1650ratePulse( void * vpeer )
1651{
1652    tr_peermsgs * peer = vpeer;
1653    peer->info->rateToClient = tr_rcRate( peer->info->rcToClient );
1654    peer->info->rateToPeer = tr_rcRate( peer->info->rcToPeer );
1655    peer->minActiveRequests = 4;
1656    peer->maxActiveRequests = peer->minActiveRequests + getMaxBlocksFromPeerSoon( peer );
1657    return TRUE;
1658}
1659
1660static tr_errno
1661popNextRequest( tr_peermsgs * msgs, struct peer_request * setme )
1662{
1663    if( !reqListPop( &msgs->peerAskedForFast, setme ) )
1664        return 0;
1665    if( !reqListPop( &msgs->peerAskedFor, setme ) )
1666        return 0;
1667
1668    return TR_ERROR;
1669}
1670
1671static int
1672peerPulse( void * vmsgs )
1673{
1674    const time_t now = time( NULL );
1675    tr_peermsgs * msgs = vmsgs;
1676
1677    tr_peerIoTryRead( msgs->io );
1678    pumpRequestQueue( msgs );
1679    expireOldRequests( msgs );
1680
1681    if( msgs->sendingBlock )
1682    {
1683        const size_t uploadMax = getUploadMax( msgs );
1684        size_t len = EVBUFFER_LENGTH( msgs->outBlock );
1685        const size_t outlen = MIN( len, uploadMax );
1686
1687        assert( len );
1688
1689        if( outlen )
1690        {
1691            tr_peerIoWrite( msgs->io, EVBUFFER_DATA( msgs->outBlock ), outlen );
1692            evbuffer_drain( msgs->outBlock, outlen );
1693            peerGotBytes( msgs, outlen );
1694
1695            len -= outlen;
1696            msgs->clientSentAnythingAt = now;
1697            msgs->sendingBlock = len != 0;
1698
1699            dbgmsg( msgs, "wrote %d bytes; %d left in block", (int)outlen, (int)len );
1700        }
1701        else dbgmsg( msgs, "stalled writing block... uploadMax %lu, outlen %lu", uploadMax, outlen );
1702    }
1703
1704    if( !msgs->sendingBlock )
1705    {
1706        struct peer_request req;
1707        int haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1708
1709        if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1710        {
1711            dbgmsg( msgs, "started an outMessages batch (length is %d)", (int)EVBUFFER_LENGTH( msgs->outMessages ) );
1712            msgs->outMessagesBatchedAt = now;
1713        }
1714        else if( haveMessages
1715                 && ( ( now - msgs->outMessagesBatchedAt ) > msgs->outMessagesBatchPeriod ) )
1716        {
1717            dbgmsg( msgs, "flushing outMessages... (length is %d)", (int)EVBUFFER_LENGTH( msgs->outMessages ) );
1718            tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
1719            msgs->clientSentAnythingAt = now;
1720            msgs->outMessagesBatchedAt = 0;
1721            msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1722        }
1723        else if( !EVBUFFER_LENGTH( msgs->outBlock )
1724            && !popNextRequest( msgs, &req )
1725            && requestIsValid( msgs, &req )
1726            && tr_cpPieceIsComplete( msgs->torrent->completion, req.index ) )
1727        {
1728            uint8_t * buf = tr_new( uint8_t, req.length );
1729
1730            if( !tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf ) )
1731            {
1732                tr_peerIo * io = msgs->io;
1733                struct evbuffer * out = msgs->outBlock;
1734
1735                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1736                tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 2*sizeof(uint32_t) + req.length );
1737                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1738                tr_peerIoWriteUint32( io, out, req.index );
1739                tr_peerIoWriteUint32( io, out, req.offset );
1740                tr_peerIoWriteBytes ( io, out, buf, req.length );
1741                msgs->sendingBlock = 1;
1742            }
1743
1744            tr_free( buf );
1745        }
1746        else if(    ( !haveMessages )
1747                 && ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS )
1748        {
1749            sendKeepalive( msgs );
1750        }
1751    }
1752
1753    return TRUE; /* loop forever */
1754}
1755
1756static void
1757gotError( struct bufferevent * evbuf UNUSED, short what, void * vmsgs )
1758{
1759    if( what & EVBUFFER_TIMEOUT )
1760        dbgmsg( vmsgs, "libevent got a timeout, what=%hd, secs=%d", what, evbuf->timeout_read );
1761    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1762        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1763                what, errno, tr_strerror(errno) );
1764    fireError( vmsgs, TR_ERROR );
1765}
1766
1767static void
1768sendBitfield( tr_peermsgs * msgs )
1769{
1770    struct evbuffer * out = msgs->outMessages;
1771    const tr_piece_index_t pieceCount = msgs->torrent->info.pieceCount;
1772    tr_bitfield * field;
1773    tr_piece_index_t lazyPieces[LAZY_PIECE_COUNT];
1774    int i;
1775    int lazyCount = 0;
1776
1777    field = tr_bitfieldDup( tr_cpPieceBitfield( msgs->torrent->completion ) );
1778
1779    if( tr_sessionIsLazyBitfieldEnabled( msgs->session ) )
1780    {
1781        const int maxLazyCount = MIN( LAZY_PIECE_COUNT, pieceCount );
1782
1783        while( lazyCount < maxLazyCount )
1784        {
1785            const size_t pos = tr_cryptoRandInt ( pieceCount );
1786            if( !tr_bitfieldHas( field, pos ) ) /* already removed it */
1787                continue;
1788            dbgmsg( msgs, "lazy bitfield #%d: piece %d of %d",
1789                    (int)(lazyCount+1), (int)pos, (int)pieceCount );
1790            tr_bitfieldRem( field, pos );
1791            lazyPieces[lazyCount++] = pos;
1792        }
1793    }
1794
1795    tr_peerIoWriteUint32( msgs->io, out, sizeof(uint8_t) + field->byteCount );
1796    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1797    tr_peerIoWriteBytes ( msgs->io, out, field->bits, field->byteCount );
1798    dbgmsg( msgs, "sending bitfield... outMessage size is now %d",
1799                  (int)EVBUFFER_LENGTH(out) );
1800    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1801
1802    for( i=0; i<lazyCount; ++i )
1803        protocolSendHave( msgs, lazyPieces[i] );
1804
1805    tr_bitfieldFree( field );
1806}
1807
1808/**
1809***
1810**/
1811
1812/* some peers give us error messages if we send
1813   more than this many peers in a single pex message
1814   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
1815#define MAX_PEX_ADDED 50
1816#define MAX_PEX_DROPPED 50
1817
1818typedef struct
1819{
1820    tr_pex * added;
1821    tr_pex * dropped;
1822    tr_pex * elements;
1823    int addedCount;
1824    int droppedCount;
1825    int elementCount;
1826}
1827PexDiffs;
1828
1829static void
1830pexAddedCb( void * vpex, void * userData )
1831{
1832    PexDiffs * diffs = userData;
1833    tr_pex * pex = vpex;
1834    if( diffs->addedCount < MAX_PEX_ADDED )
1835    {
1836        diffs->added[diffs->addedCount++] = *pex;
1837        diffs->elements[diffs->elementCount++] = *pex;
1838    }
1839}
1840
1841static void
1842pexDroppedCb( void * vpex, void * userData )
1843{
1844    PexDiffs * diffs = userData;
1845    tr_pex * pex = vpex;
1846    if( diffs->droppedCount < MAX_PEX_DROPPED )
1847    {
1848        diffs->dropped[diffs->droppedCount++] = *pex;
1849    }
1850}
1851
1852static void
1853pexElementCb( void * vpex, void * userData )
1854{
1855    PexDiffs * diffs = userData;
1856    tr_pex * pex = vpex;
1857    diffs->elements[diffs->elementCount++] = *pex;
1858}
1859
1860static void
1861sendPex( tr_peermsgs * msgs )
1862{
1863    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
1864    {
1865        int i;
1866        tr_pex * newPex = NULL;
1867        const int newCount = tr_peerMgrGetPeers( msgs->session->peerMgr, msgs->torrent->info.hash, &newPex );
1868        PexDiffs diffs;
1869        tr_benc val;
1870        uint8_t *tmp, *walk;
1871        char * benc;
1872        int bencLen;
1873        struct evbuffer * out = msgs->outMessages;
1874
1875        /* build the diffs */
1876        diffs.added = tr_new( tr_pex, newCount );
1877        diffs.addedCount = 0;
1878        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1879        diffs.droppedCount = 0;
1880        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1881        diffs.elementCount = 0;
1882        tr_set_compare( msgs->pex, msgs->pexCount,
1883                        newPex, newCount,
1884                        tr_pexCompare, sizeof(tr_pex),
1885                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
1886        dbgmsg( msgs, "pex: old peer count %d, new peer count %d, added %d, removed %d", msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1887
1888        /* update peer */
1889        tr_free( msgs->pex );
1890        msgs->pex = diffs.elements;
1891        msgs->pexCount = diffs.elementCount;
1892
1893        /* build the pex payload */
1894        tr_bencInitDict( &val, 3 );
1895
1896        /* "added" */
1897        tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1898        for( i=0; i<diffs.addedCount; ++i ) {
1899            memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
1900            memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1901        }
1902        assert( ( walk - tmp ) == diffs.addedCount * 6 );
1903        tr_bencDictAddRaw( &val, "added", tmp, walk-tmp );
1904        tr_free( tmp );
1905
1906        /* "added.f" */
1907        tmp = walk = tr_new( uint8_t, diffs.addedCount );
1908        for( i=0; i<diffs.addedCount; ++i )
1909            *walk++ = diffs.added[i].flags;
1910        assert( ( walk - tmp ) == diffs.addedCount );
1911        tr_bencDictAddRaw( &val, "added.f", tmp, walk-tmp );
1912        tr_free( tmp );
1913
1914        /* "dropped" */
1915        tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
1916        for( i=0; i<diffs.droppedCount; ++i ) {
1917            memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
1918            memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
1919        }
1920        assert( ( walk - tmp ) == diffs.droppedCount * 6 );
1921        tr_bencDictAddRaw( &val, "dropped", tmp, walk-tmp );
1922        tr_free( tmp );
1923
1924        /* write the pex message */
1925        benc = tr_bencSave( &val, &bencLen );
1926        tr_peerIoWriteUint32( msgs->io, out, 2*sizeof(uint8_t) + bencLen );
1927        tr_peerIoWriteUint8 ( msgs->io, out, BT_LTEP );
1928        tr_peerIoWriteUint8 ( msgs->io, out, msgs->ut_pex_id );
1929        tr_peerIoWriteBytes ( msgs->io, out, benc, bencLen );
1930        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1931        dbgmsg( msgs, "outMessage size is now %d", (int)EVBUFFER_LENGTH(out) );
1932
1933        /* cleanup */
1934        tr_free( benc );
1935        tr_bencFree( &val );
1936        tr_free( diffs.added );
1937        tr_free( diffs.dropped );
1938        tr_free( newPex );
1939
1940        msgs->clientSentPexAt = time( NULL );
1941    }
1942}
1943
1944static int
1945pexPulse( void * vpeer )
1946{
1947    sendPex( vpeer );
1948    return TRUE;
1949}
1950
1951/**
1952***
1953**/
1954
1955tr_peermsgs*
1956tr_peerMsgsNew( struct tr_torrent * torrent,
1957                struct tr_peer    * info,
1958                tr_delivery_func    func,
1959                void              * userData,
1960                tr_publisher_tag  * setme )
1961{
1962    tr_peermsgs * m;
1963
1964    assert( info );
1965    assert( info->io );
1966
1967    m = tr_new0( tr_peermsgs, 1 );
1968    m->publisher = tr_publisherNew( );
1969    m->info = info;
1970    m->session = torrent->handle;
1971    m->torrent = torrent;
1972    m->io = info->io;
1973    m->info->clientIsChoked = 1;
1974    m->info->peerIsChoked = 1;
1975    m->info->clientIsInterested = 0;
1976    m->info->peerIsInterested = 0;
1977    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
1978    m->state = AWAITING_BT_LENGTH;
1979    m->pulseTimer = tr_timerNew( m->session, peerPulse, m, PEER_PULSE_INTERVAL );
1980    m->rateTimer = tr_timerNew( m->session, ratePulse, m, RATE_PULSE_INTERVAL );
1981    m->pexTimer = tr_timerNew( m->session, pexPulse, m, PEX_INTERVAL );
1982    m->outMessages = evbuffer_new( );
1983    m->outMessagesBatchedAt = 0;
1984    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1985    m->incoming.block = evbuffer_new( );
1986    m->outBlock = evbuffer_new( );
1987    m->peerAllowedPieces = NULL;
1988    m->peerAskedFor = REQUEST_LIST_INIT;
1989    m->peerAskedForFast = REQUEST_LIST_INIT;
1990    m->clientAskedFor = REQUEST_LIST_INIT;
1991    m->clientWillAskFor = REQUEST_LIST_INIT;
1992    *setme = tr_publisherSubscribe( m->publisher, func, userData );
1993
1994    if ( tr_peerIoSupportsLTEP( m->io ) )
1995        sendLtepHandshake( m );
1996
1997    sendBitfield( m );
1998   
1999    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of inactivity */
2000    tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
2001    ratePulse( m );
2002
2003    return m;
2004}
2005
2006void
2007tr_peerMsgsFree( tr_peermsgs* msgs )
2008{
2009    if( msgs )
2010    {
2011        tr_timerFree( &msgs->pulseTimer );
2012        tr_timerFree( &msgs->rateTimer );
2013        tr_timerFree( &msgs->pexTimer );
2014        tr_publisherFree( &msgs->publisher );
2015        reqListClear( &msgs->clientWillAskFor );
2016        reqListClear( &msgs->clientAskedFor );
2017        reqListClear( &msgs->peerAskedForFast );
2018        reqListClear( &msgs->peerAskedFor );
2019        tr_bitfieldFree( msgs->peerAllowedPieces );
2020        evbuffer_free( msgs->incoming.block );
2021        evbuffer_free( msgs->outMessages );
2022        evbuffer_free( msgs->outBlock );
2023        tr_free( msgs->pex );
2024
2025        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2026        tr_free( msgs );
2027    }
2028}
2029
2030void
2031tr_peerMsgsUnsubscribe( tr_peermsgs       * peer,
2032                        tr_publisher_tag    tag )
2033{
2034    tr_publisherUnsubscribe( peer->publisher, tag );
2035}
Note: See TracBrowser for help on using the repository browser.