source: trunk/libtransmission/peer-msgs.c @ 6652

Last change on this file since 6652 was 6652, checked in by charles, 13 years ago

(libT) maybe fix the hangs reported by users in the recent nightlies.

  • Property svn:keywords set to Date Rev Author Id
File size: 58.0 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 6652 2008-08-27 18:50:21Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19#include <libgen.h> /* basename */
20
21#include <event.h>
22
23#include "transmission.h"
24#include "bencode.h"
25#include "completion.h"
26#include "crypto.h"
27#include "inout.h"
28#include "peer-io.h"
29#include "peer-mgr.h"
30#include "peer-mgr-private.h"
31#include "peer-msgs.h"
32#include "ratecontrol.h"
33#include "stats.h"
34#include "torrent.h"
35#include "trevent.h"
36#include "utils.h"
37
38/**
39***
40**/
41
42enum
43{
44    BT_CHOKE                = 0,
45    BT_UNCHOKE              = 1,
46    BT_INTERESTED           = 2,
47    BT_NOT_INTERESTED       = 3,
48    BT_HAVE                 = 4,
49    BT_BITFIELD             = 5,
50    BT_REQUEST              = 6,
51    BT_PIECE                = 7,
52    BT_CANCEL               = 8,
53    BT_PORT                 = 9,
54    BT_SUGGEST              = 13,
55    BT_HAVE_ALL             = 14,
56    BT_HAVE_NONE            = 15,
57    BT_REJECT               = 16,
58    BT_ALLOWED_FAST         = 17,
59    BT_LTEP                 = 20,
60
61    LTEP_HANDSHAKE          = 0,
62
63    TR_LTEP_PEX             = 1,
64
65    MIN_CHOKE_PERIOD_SEC    = (10),
66
67    /* idle seconds before we send a keepalive */
68    KEEPALIVE_INTERVAL_SECS = 100,
69
70    PEX_INTERVAL            = (90 * 1000), /* msec between sendPex() calls */
71    PEER_PULSE_INTERVAL     = (250),       /* msec between peerPulse() calls */
72    RATE_PULSE_INTERVAL     = (250),       /* msec between ratePulse() calls */
73
74    MAX_QUEUE_SIZE          = (100),
75     
76    /* (fast peers) max number of pieces we fast-allow to another peer */
77    MAX_FAST_ALLOWED_COUNT   = 10,
78
79    /* (fast peers) max threshold for allowing fast-pieces requests */
80    MAX_FAST_ALLOWED_THRESHOLD = 10,
81
82    /* how long an unsent request can stay queued before it's returned
83       back to the peer-mgr's pool of requests */
84    QUEUED_REQUEST_TTL_SECS = 20,
85
86    /* how long a sent request can stay queued before it's returned
87       back to the peer-mgr's pool of requests */
88    SENT_REQUEST_TTL_SECS = 240,
89
90    /* used in lowering the outMessages queue period */
91    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
92    HIGH_PRIORITY_INTERVAL_SECS = 2,
93    LOW_PRIORITY_INTERVAL_SECS = 20,
94
95    /* number of pieces to remove from the bitfield when
96     * lazy bitfields are turned on */
97    LAZY_PIECE_COUNT = 26
98};
99
100/**
101***  REQUEST MANAGEMENT
102**/
103
104enum
105{
106    AWAITING_BT_LENGTH,
107    AWAITING_BT_ID,
108    AWAITING_BT_MESSAGE,
109    AWAITING_BT_PIECE
110};
111
112struct peer_request
113{
114    uint32_t index;
115    uint32_t offset;
116    uint32_t length;
117    time_t time_requested;
118};
119
120static int
121compareRequest( const void * va, const void * vb )
122{
123    const struct peer_request * a = va;
124    const struct peer_request * b = vb;
125
126    if( a->index != b->index )
127        return a->index < b->index ? -1 : 1;
128
129    if( a->offset != b->offset )
130        return a->offset < b->offset ? -1 : 1;
131
132    if( a->length != b->length )
133        return a->length < b->length ? -1 : 1;
134
135    return 0;
136}
137
138struct request_list
139{
140    uint16_t count;
141    uint16_t max;
142    struct peer_request * requests;
143};
144
145static const struct request_list REQUEST_LIST_INIT = { 0, 0, NULL };
146
147static void
148reqListReserve( struct request_list * list, uint16_t max )
149{
150    if( list->max < max )
151    {
152        list->max = max;
153        list->requests = tr_renew( struct peer_request,
154                                   list->requests,
155                                   list->max );
156    }
157}
158
159static void
160reqListClear( struct request_list * list )
161{
162    tr_free( list->requests );
163    *list = REQUEST_LIST_INIT;
164}
165
166static void
167reqListCopy( struct request_list * dest, const struct request_list * src )
168{
169    dest->count = dest->max = src->count;
170    dest->requests = tr_memdup( src->requests, dest->count * sizeof( struct peer_request ) );
171}
172
173static void
174reqListRemoveOne( struct request_list * list, int i )
175{
176    assert( 0<=i && i<list->count );
177
178    memmove( &list->requests[i],
179             &list->requests[i+1],
180             sizeof( struct peer_request ) * ( --list->count - i ) );
181}
182
183static void
184reqListAppend( struct request_list * list, const struct peer_request * req )
185{
186    if( ++list->count >= list->max )
187        reqListReserve( list, list->max + 8 );
188
189    list->requests[list->count-1] = *req;
190}
191
192static void
193reqListAppendPiece( const tr_torrent     * tor,
194                    struct request_list  * list,
195                    tr_piece_index_t       piece )
196{
197    const time_t now = time( NULL );
198    const size_t n = tr_torPieceCountBlocks( tor, piece );
199    const tr_block_index_t begin = tr_torPieceFirstBlock( tor, piece );
200    const tr_block_index_t end = begin + n;
201    tr_block_index_t i;
202
203    if( list->count + n >= list->max )
204        reqListReserve( list, list->max + n );
205
206    for( i=begin; i<end; ++i ) {
207        if( !tr_cpBlockIsComplete( tor->completion, i ) ) {
208            struct peer_request * req = list->requests + list->count++;
209            req->index = piece;
210            req->offset = (i * tor->blockSize) - (piece * tor->info.pieceSize);
211            req->length = tr_torBlockCountBytes( tor, i );
212            req->time_requested = now;
213            assert( tr_torrentReqIsValid( tor, req->index, req->offset, req->length ) );
214        }
215    }
216}
217
218static tr_errno
219reqListPop( struct request_list * list, struct peer_request * setme )
220{
221    tr_errno err;
222
223    if( !list->count )
224        err = TR_ERROR;
225    else {
226        *setme = list->requests[0];
227        reqListRemoveOne( list, 0 );
228        err = TR_OK;
229    }
230
231    return err;
232}
233
234static int
235reqListHasPiece( struct request_list * list, const tr_piece_index_t piece )
236{
237    uint16_t i;
238
239    for( i=0; i<list->count; ++i )
240        if( list->requests[i].index == piece )
241            return 1;
242
243    return 0;
244}
245
246static int
247reqListFind( struct request_list * list, const struct peer_request * key )
248{
249    uint16_t i;
250
251    for( i=0; i<list->count; ++i )
252        if( !compareRequest( key, list->requests+i ) )
253            return i;
254
255    return -1;
256}
257
258static tr_errno
259reqListRemove( struct request_list * list, const struct peer_request * key )
260{
261    tr_errno err;
262    const int i = reqListFind( list, key );
263
264    if( i < 0 )
265        err = TR_ERROR;
266    else {
267        err = TR_OK;
268        reqListRemoveOne( list, i );
269    }
270
271    return err;
272}
273
274/**
275***
276**/
277
278/* this is raw, unchanged data from the peer regarding
279 * the current message that it's sending us. */
280struct tr_incoming
281{
282    uint8_t id;
283    uint32_t length; /* includes the +1 for id length */
284    struct peer_request blockReq; /* metadata for incoming blocks */
285    struct evbuffer * block; /* piece data for incoming blocks */
286};
287
288struct tr_peermsgs
289{
290    unsigned int peerSentBitfield         : 1;
291    unsigned int peerSupportsPex          : 1;
292    unsigned int clientSentLtepHandshake  : 1;
293    unsigned int peerSentLtepHandshake    : 1;
294    unsigned int sendingBlock             : 1;
295   
296    uint8_t state;
297    uint8_t ut_pex_id;
298    uint16_t pexCount;
299    uint16_t minActiveRequests;
300    uint16_t maxActiveRequests;
301
302    /* how long the outMessages batch should be allowed to grow before
303     * it's flushed -- some messages (like requests >:) should be sent
304     * very quickly; others aren't as urgent. */
305    int outMessagesBatchPeriod;
306
307    tr_peer * info;
308
309    tr_session * session;
310    tr_torrent * torrent;
311    tr_peerIo * io;
312
313    tr_publisher_t * publisher;
314
315    struct evbuffer * outBlock;    /* buffer of the current piece message */
316    struct evbuffer * outMessages; /* buffer of all the non-piece messages */
317
318    struct request_list peerAskedFor;
319    struct request_list peerAskedForFast;
320    struct request_list clientAskedFor;
321    struct request_list clientWillAskFor;
322
323    tr_timer * rateTimer;
324    tr_timer * pulseTimer;
325    tr_timer * pexTimer;
326
327    time_t clientSentPexAt;
328    time_t clientSentAnythingAt;
329
330    /* when we started batching the outMessages */
331    time_t outMessagesBatchedAt;
332   
333    tr_bitfield * peerAllowedPieces;
334
335    struct tr_incoming incoming; 
336
337    tr_pex * pex;
338};
339
340/**
341***
342**/
343
344static void
345myDebug( const char * file, int line,
346         const struct tr_peermsgs * msgs,
347         const char * fmt, ... )
348{
349    FILE * fp = tr_getLog( );
350    if( fp )
351    {
352        va_list args;
353        char timestr[64];
354        struct evbuffer * buf = evbuffer_new( );
355        char * myfile = tr_strdup( file );
356
357        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
358                             tr_getLogTimeStr( timestr, sizeof(timestr) ),
359                             msgs->torrent->info.name,
360                             tr_peerIoGetAddrStr( msgs->io ),
361                             msgs->info->client );
362        va_start( args, fmt );
363        evbuffer_add_vprintf( buf, fmt, args );
364        va_end( args );
365        evbuffer_add_printf( buf, " (%s:%d)\n", basename(myfile), line );
366        fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
367
368        tr_free( myfile );
369        evbuffer_free( buf );
370    }
371}
372
373#define dbgmsg(msgs, fmt...) myDebug(__FILE__, __LINE__, msgs, ##fmt )
374
375/**
376***
377**/
378
379static void
380pokeBatchPeriod( tr_peermsgs * msgs, int interval )
381{
382    if( msgs->outMessagesBatchPeriod > interval )
383    {
384        msgs->outMessagesBatchPeriod = interval;
385        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
386    }
387}
388
389static void
390protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
391{
392    tr_peerIo * io = msgs->io;
393    struct evbuffer * out = msgs->outMessages;
394
395    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
396    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
397    tr_peerIoWriteUint32( io, out, req->index );
398    tr_peerIoWriteUint32( io, out, req->offset );
399    tr_peerIoWriteUint32( io, out, req->length );
400    dbgmsg( msgs, "requesting %u:%u->%u... outMessage size is now %d",
401            req->index, req->offset, req->length, (int)EVBUFFER_LENGTH(out) );
402    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
403}
404
405static void
406protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
407{
408    tr_peerIo * io = msgs->io;
409    struct evbuffer * out = msgs->outMessages;
410
411    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
412    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
413    tr_peerIoWriteUint32( io, out, req->index );
414    tr_peerIoWriteUint32( io, out, req->offset );
415    tr_peerIoWriteUint32( io, out, req->length );
416    dbgmsg( msgs, "cancelling %u:%u->%u... outMessage size is now %d",
417            req->index, req->offset, req->length, (int)EVBUFFER_LENGTH(out) );
418    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
419}
420
421static void
422protocolSendHave( tr_peermsgs * msgs, uint32_t index )
423{
424    tr_peerIo * io = msgs->io;
425    struct evbuffer * out = msgs->outMessages;
426
427    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
428    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
429    tr_peerIoWriteUint32( io, out, index );
430    dbgmsg( msgs, "sending Have %u.. outMessage size is now %d",
431            index, (int)EVBUFFER_LENGTH(out) );
432    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
433}
434
435static void
436protocolSendChoke( tr_peermsgs * msgs, int choke )
437{
438    tr_peerIo * io = msgs->io;
439    struct evbuffer * out = msgs->outMessages;
440
441    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) );
442    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
443    dbgmsg( msgs, "sending %s... outMessage size is now %d",
444                  (choke ? "Choke" : "Unchoke"),
445                  (int)EVBUFFER_LENGTH(out) );
446    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
447}
448
449/**
450***  EVENTS
451**/
452
453static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0 };
454
455static void
456publish( tr_peermsgs * msgs, tr_peer_event * e )
457{
458    tr_publisherPublish( msgs->publisher, msgs->info, e );
459}
460
461static void
462fireError( tr_peermsgs * msgs, tr_errno err )
463{
464    tr_peer_event e = blankEvent;
465    e.eventType = TR_PEER_ERROR;
466    e.err = err;
467    publish( msgs, &e );
468}
469
470static void
471fireNeedReq( tr_peermsgs * msgs )
472{
473    tr_peer_event e = blankEvent;
474    e.eventType = TR_PEER_NEED_REQ;
475    publish( msgs, &e );
476}
477
478static void
479firePeerProgress( tr_peermsgs * msgs )
480{
481    tr_peer_event e = blankEvent;
482    e.eventType = TR_PEER_PEER_PROGRESS;
483    e.progress = msgs->info->progress;
484    publish( msgs, &e );
485}
486
487static void
488fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
489{
490    tr_peer_event e = blankEvent;
491    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
492    e.pieceIndex = req->index;
493    e.offset = req->offset;
494    e.length = req->length;
495    publish( msgs, &e );
496}
497
498static void
499fireClientGotData( tr_peermsgs * msgs, uint32_t length )
500{
501    tr_peer_event e = blankEvent;
502    e.length = length;
503    e.eventType = TR_PEER_CLIENT_GOT_DATA;
504    publish( msgs, &e );
505}
506
507static void
508firePeerGotData( tr_peermsgs * msgs, uint32_t length )
509{
510    tr_peer_event e = blankEvent;
511    e.length = length;
512    e.eventType = TR_PEER_PEER_GOT_DATA;
513    publish( msgs, &e );
514}
515
516static void
517fireCancelledReq( tr_peermsgs * msgs, const tr_piece_index_t pieceIndex )
518{
519    tr_peer_event e = blankEvent;
520    e.eventType = TR_PEER_CANCEL;
521    e.pieceIndex = pieceIndex;
522    publish( msgs, &e );
523}
524
525/**
526***  INTEREST
527**/
528
529static int
530isPieceInteresting( const tr_peermsgs   * peer,
531                    tr_piece_index_t      piece )
532{
533    const tr_torrent * torrent = peer->torrent;
534
535    return ( ( !torrent->info.pieces[piece].dnd )               /* we want it */
536          && ( !tr_cpPieceIsComplete( torrent->completion, piece ) ) /* !have */
537          && ( tr_bitfieldHas( peer->info->have, piece ) ) );  /* peer has it */
538}
539
540/* "interested" means we'll ask for piece data if they unchoke us */
541static int
542isPeerInteresting( const tr_peermsgs * msgs )
543{
544    tr_piece_index_t i;
545    const tr_torrent * torrent;
546    const tr_bitfield * bitfield;
547    const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
548
549    if( clientIsSeed )
550        return FALSE;
551
552    torrent = msgs->torrent;
553    bitfield = tr_cpPieceBitfield( torrent->completion );
554
555    if( !msgs->info->have )
556        return TRUE;
557
558    assert( bitfield->byteCount == msgs->info->have->byteCount );
559
560    for( i=0; i<torrent->info.pieceCount; ++i )
561        if( isPieceInteresting( msgs, i ) )
562            return TRUE;
563
564    return FALSE;
565}
566
567static void
568sendInterest( tr_peermsgs * msgs, int weAreInterested )
569{
570    struct evbuffer * out = msgs->outMessages;
571
572    assert( msgs );
573    assert( weAreInterested==0 || weAreInterested==1 );
574
575    msgs->info->clientIsInterested = weAreInterested;
576    dbgmsg( msgs, "Sending %s", weAreInterested ? "Interested" : "Not Interested");
577    tr_peerIoWriteUint32( msgs->io, out, sizeof(uint8_t) );
578    tr_peerIoWriteUint8 ( msgs->io, out, weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
579    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
580    dbgmsg( msgs, "outMessage size is now %d", (int)EVBUFFER_LENGTH(out) );
581}
582
583static void
584updateInterest( tr_peermsgs * msgs )
585{
586    const int i = isPeerInteresting( msgs );
587    if( i != msgs->info->clientIsInterested )
588        sendInterest( msgs, i );
589    if( i )
590        fireNeedReq( msgs );
591}
592
593static void
594cancelAllRequestsToClientExceptFast( tr_peermsgs * msgs )
595{
596    reqListClear( &msgs->peerAskedFor );
597}
598
599void
600tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
601{
602    const time_t fibrillationTime = time(NULL) - MIN_CHOKE_PERIOD_SEC;
603
604    assert( msgs );
605    assert( msgs->info );
606    assert( choke==0 || choke==1 );
607
608    if( msgs->info->chokeChangedAt > fibrillationTime )
609    {
610        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
611    }
612    else if( msgs->info->peerIsChoked != choke )
613    {
614        msgs->info->peerIsChoked = choke;
615        if( choke )
616            cancelAllRequestsToClientExceptFast( msgs );
617        protocolSendChoke( msgs, choke );
618        msgs->info->chokeChangedAt = time( NULL );
619    }
620}
621
622/**
623***
624**/
625
626void
627tr_peerMsgsHave( tr_peermsgs * msgs,
628                 uint32_t      index )
629{
630    protocolSendHave( msgs, index );
631
632    /* since we have more pieces now, we might not be interested in this peer */
633    updateInterest( msgs );
634}
635#if 0
636static void
637sendFastSuggest( tr_peermsgs * msgs,
638                 uint32_t      pieceIndex )
639{
640    assert( msgs );
641   
642    if( tr_peerIoSupportsFEXT( msgs->io ) )
643    {
644        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
645        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_SUGGEST );
646        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
647    }
648}
649static void
650sendFastHave( tr_peermsgs * msgs, int all )
651{
652    assert( msgs );
653   
654    if( tr_peerIoSupportsFEXT( msgs->io ) )
655    {
656        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
657        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, ( all ? BT_HAVE_ALL
658                                                                : BT_HAVE_NONE ) );
659        updateInterest( msgs );
660    }
661}
662#endif
663
664static void
665sendFastReject( tr_peermsgs * msgs,
666                uint32_t      pieceIndex,
667                uint32_t      offset,
668                uint32_t      length )
669{
670    assert( msgs );
671
672    if( tr_peerIoSupportsFEXT( msgs->io ) )
673    {
674        struct evbuffer * out = msgs->outMessages;
675        const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
676        dbgmsg( msgs, "sending fast reject %u:%u->%u", pieceIndex, offset, length );
677        tr_peerIoWriteUint32( msgs->io, out, len );
678        tr_peerIoWriteUint8( msgs->io, out, BT_REJECT );
679        tr_peerIoWriteUint32( msgs->io, out, pieceIndex );
680        tr_peerIoWriteUint32( msgs->io, out, offset );
681        tr_peerIoWriteUint32( msgs->io, out, length );
682        pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
683        dbgmsg( msgs, "outMessage size is now %d", (int)EVBUFFER_LENGTH(out) );
684    }
685}
686
687static tr_bitfield*
688getPeerAllowedPieces( tr_peermsgs * msgs )
689{
690    if( !msgs->peerAllowedPieces && tr_peerIoSupportsFEXT( msgs->io ) )
691    {
692        msgs->peerAllowedPieces = tr_peerMgrGenerateAllowedSet(
693            MAX_FAST_ALLOWED_COUNT,
694            msgs->torrent->info.pieceCount,
695            msgs->torrent->info.hash,
696            tr_peerIoGetAddress( msgs->io, NULL ) );
697    }
698
699    return msgs->peerAllowedPieces;
700}
701
702static void
703sendFastAllowed( tr_peermsgs * msgs,
704                 uint32_t      pieceIndex)
705{
706    assert( msgs );
707   
708    if( tr_peerIoSupportsFEXT( msgs->io ) )
709    {
710        struct evbuffer * out = msgs->outMessages;
711        dbgmsg( msgs, "sending fast allowed" );
712        tr_peerIoWriteUint32( msgs->io, out,  sizeof(uint8_t) + sizeof(uint32_t) );
713        tr_peerIoWriteUint8( msgs->io, out, BT_ALLOWED_FAST );
714        tr_peerIoWriteUint32( msgs->io, out, pieceIndex );
715        pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
716        dbgmsg( msgs, "outMessage size is now %d", (int)EVBUFFER_LENGTH(out) );
717    }
718}
719
720static void
721sendFastAllowedSet( tr_peermsgs * msgs )
722{
723    tr_piece_index_t i = 0;
724
725    while (i <= msgs->torrent->info.pieceCount )
726    {
727        if ( tr_bitfieldHas( getPeerAllowedPieces( msgs ), i) )
728            sendFastAllowed( msgs, i );
729        i++;
730    }
731}
732
733static void
734maybeSendFastAllowedSet( tr_peermsgs * msgs )
735{
736    if( tr_bitfieldCountTrueBits( msgs->info->have ) <= MAX_FAST_ALLOWED_THRESHOLD )
737        sendFastAllowedSet( msgs );
738}
739
740
741/**
742***
743**/
744
745static int
746reqIsValid( const tr_peermsgs   * peer,
747            uint32_t              index,
748            uint32_t              offset,
749            uint32_t              length )
750{
751    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
752}
753
754static int
755requestIsValid( const tr_peermsgs * peer, const struct peer_request * req )
756{
757    return reqIsValid( peer, req->index, req->offset, req->length );
758}
759
760static void
761tr_peerMsgsCancel( tr_peermsgs * msgs,
762                   uint32_t      pieceIndex )
763{
764    uint16_t i;
765    struct request_list tmp = REQUEST_LIST_INIT;
766    struct request_list * src;
767
768    src = &msgs->clientWillAskFor;
769    for( i=0; i<src->count; ++i )
770        if( src->requests[i].index != pieceIndex )
771            reqListAppend( &tmp, src->requests + i );
772
773    /* swap */
774    reqListClear( &msgs->clientWillAskFor );
775    msgs->clientWillAskFor = tmp;
776    tmp = REQUEST_LIST_INIT;
777
778    src = &msgs->clientAskedFor;
779    for( i=0; i<src->count; ++i )
780        if( src->requests[i].index == pieceIndex )
781            protocolSendCancel( msgs, src->requests + i );
782        else
783            reqListAppend( &tmp, src->requests + i );
784
785    /* swap */
786    reqListClear( &msgs->clientAskedFor );
787    msgs->clientAskedFor = tmp;
788    tmp = REQUEST_LIST_INIT;
789
790    fireCancelledReq( msgs, pieceIndex );
791}
792
793static void
794expireOldRequests( tr_peermsgs * msgs )
795{
796    int i;
797    time_t oldestAllowed;
798    struct request_list tmp = REQUEST_LIST_INIT;
799
800    /* cancel requests that have been queued for too long */
801    oldestAllowed = time( NULL ) - QUEUED_REQUEST_TTL_SECS;
802    reqListCopy( &tmp, &msgs->clientWillAskFor );
803    for( i=0; i<tmp.count; ++i ) {
804        const struct peer_request * req = &tmp.requests[i];
805        if( req->time_requested < oldestAllowed )
806            tr_peerMsgsCancel( msgs, req->index );
807    }
808    reqListClear( &tmp );
809
810    /* cancel requests that were sent too long ago */
811    oldestAllowed = time( NULL ) - SENT_REQUEST_TTL_SECS;
812    reqListCopy( &tmp, &msgs->clientAskedFor );
813    for( i=0; i<tmp.count; ++i ) {
814        const struct peer_request * req = &tmp.requests[i];
815        if( req->time_requested < oldestAllowed )
816            tr_peerMsgsCancel( msgs, req->index );
817    }
818    reqListClear( &tmp );
819}
820
821static void
822pumpRequestQueue( tr_peermsgs * msgs )
823{
824    const int max = msgs->maxActiveRequests;
825    const int min = msgs->minActiveRequests;
826    const time_t now = time( NULL );
827    int sent = 0;
828    int count = msgs->clientAskedFor.count;
829    struct peer_request req;
830
831    if( count > min )
832        return;
833    if( msgs->info->clientIsChoked )
834        return;
835
836    while( ( count < max ) && !reqListPop( &msgs->clientWillAskFor, &req ) )
837    {
838        assert( requestIsValid( msgs, &req ) );
839        assert( tr_bitfieldHas( msgs->info->have, req.index ) );
840
841        protocolSendRequest( msgs, &req );
842        req.time_requested = now;
843        reqListAppend( &msgs->clientAskedFor, &req );
844
845        ++count;
846        ++sent;
847    }
848
849    if( sent )
850        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
851                sent,
852                msgs->clientAskedFor.count,
853                msgs->clientWillAskFor.count );
854
855    if( count < max )
856        fireNeedReq( msgs );
857}
858
859static int
860peerPulse( void * vmsgs );
861
862static int
863requestQueueIsFull( const tr_peermsgs * msgs )
864{
865    const int req_max = msgs->maxActiveRequests;
866    return msgs->clientWillAskFor.count >= req_max;
867}
868
869tr_addreq_t
870tr_peerMsgsAddRequest( tr_peermsgs       * msgs,
871                       tr_piece_index_t    piece )
872{
873    struct peer_request req;
874
875    assert( msgs );
876    assert( msgs->torrent );
877    assert( piece < msgs->torrent->info.pieceCount );
878
879    /**
880    ***  Reasons to decline the request
881    **/
882
883    /* don't send requests to choked clients */
884    if( msgs->info->clientIsChoked ) {
885        dbgmsg( msgs, "declining request because they're choking us" );
886        return TR_ADDREQ_CLIENT_CHOKED;
887    }
888
889    /* peer doesn't have this piece */
890    if( !tr_bitfieldHas( msgs->info->have, piece ) )
891        return TR_ADDREQ_MISSING;
892
893    /* peer's queue is full */
894    if( requestQueueIsFull( msgs ) ) {
895        dbgmsg( msgs, "declining request because we're full" );
896        return TR_ADDREQ_FULL;
897    }
898
899    /* have we already asked for this piece? */
900    if( reqListHasPiece( &msgs->clientAskedFor, piece ) || 
901        reqListHasPiece( &msgs->clientWillAskFor, piece ) ) {
902        dbgmsg( msgs, "declining because it's a duplicate" );
903        return TR_ADDREQ_DUPLICATE;
904    }
905
906    /**
907    ***  Accept this request
908    **/
909
910    dbgmsg( msgs, "added req for piece %lu", (unsigned long)piece );
911    req.time_requested = time( NULL );
912    reqListAppendPiece( msgs->torrent, &msgs->clientWillAskFor, piece );
913    return TR_ADDREQ_OK;
914}
915
916static void
917cancelAllRequestsToPeer( tr_peermsgs * msgs )
918{
919    int i;
920    struct request_list a = msgs->clientWillAskFor;
921    struct request_list b = msgs->clientAskedFor;
922
923    msgs->clientAskedFor = REQUEST_LIST_INIT;
924    msgs->clientWillAskFor = REQUEST_LIST_INIT;
925
926    for( i=0; i<a.count; ++i )
927        fireCancelledReq( msgs, a.requests[i].index );
928
929    for( i=0; i<b.count; ++i ) {
930        fireCancelledReq( msgs, b.requests[i].index );
931        protocolSendCancel( msgs, &b.requests[i] );
932    }
933
934    reqListClear( &a );
935    reqListClear( &b );
936}
937
938/**
939***
940**/
941
942static void
943sendLtepHandshake( tr_peermsgs * msgs )
944{
945    tr_benc val, *m;
946    char * buf;
947    int len;
948    int pex;
949    struct evbuffer * out = msgs->outMessages;
950
951    if( msgs->clientSentLtepHandshake )
952        return;
953
954    dbgmsg( msgs, "sending an ltep handshake" );
955    msgs->clientSentLtepHandshake = 1;
956
957    /* decide if we want to advertise pex support */
958    if( !tr_torrentAllowsPex( msgs->torrent ) )
959        pex = 0;
960    else if( msgs->peerSentLtepHandshake )
961        pex = msgs->peerSupportsPex ? 1 : 0;
962    else
963        pex = 1;
964
965    tr_bencInitDict( &val, 4 );
966    tr_bencDictAddInt( &val, "e", msgs->session->encryptionMode != TR_CLEAR_PREFERRED );
967    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( msgs->session ) );
968    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
969    m  = tr_bencDictAddDict( &val, "m", 1 );
970    if( pex )
971        tr_bencDictAddInt( m, "ut_pex", TR_LTEP_PEX );
972    buf = tr_bencSave( &val, &len );
973
974    tr_peerIoWriteUint32( msgs->io, out, 2*sizeof(uint8_t) + len );
975    tr_peerIoWriteUint8 ( msgs->io, out, BT_LTEP );
976    tr_peerIoWriteUint8 ( msgs->io, out, LTEP_HANDSHAKE );
977    tr_peerIoWriteBytes ( msgs->io, out, buf, len );
978    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
979    dbgmsg( msgs, "outMessage size is now %d", (int)EVBUFFER_LENGTH(out) );
980
981    /* cleanup */
982    tr_bencFree( &val );
983    tr_free( buf );
984}
985
986static void
987parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
988{
989    int64_t i;
990    tr_benc val, * sub;
991    uint8_t * tmp = tr_new( uint8_t, len );
992
993    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
994    msgs->peerSentLtepHandshake = 1;
995
996    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type!=TYPE_DICT ) {
997        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
998        tr_free( tmp );
999        return;
1000    }
1001
1002    dbgmsg( msgs, "here is the ltep handshake we got [%*.*s]", len, len, tmp );
1003
1004    /* does the peer prefer encrypted connections? */
1005    if( tr_bencDictFindInt( &val, "e", &i ) )
1006        msgs->info->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
1007                                              : ENCRYPTION_PREFERENCE_NO;
1008
1009    /* check supported messages for utorrent pex */
1010    msgs->peerSupportsPex = 0;
1011    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
1012        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
1013            msgs->ut_pex_id = (uint8_t) i;
1014            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
1015            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
1016        }
1017    }
1018
1019    /* get peer's listening port */
1020    if( tr_bencDictFindInt( &val, "p", &i ) ) {
1021        msgs->info->port = htons( (uint16_t)i );
1022        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
1023    }
1024
1025    tr_bencFree( &val );
1026    tr_free( tmp );
1027}
1028
1029static void
1030parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1031{
1032    int loaded = 0;
1033    uint8_t * tmp = tr_new( uint8_t, msglen );
1034    tr_benc val;
1035    const tr_torrent * tor = msgs->torrent;
1036    const uint8_t * added;
1037    size_t added_len;
1038
1039    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
1040
1041    if( tr_torrentAllowsPex( tor )
1042        && (( loaded = !tr_bencLoad( tmp, msglen, &val, NULL )))
1043        && tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1044    {
1045        const uint8_t * added_f = NULL;
1046        tr_pex * pex;
1047        size_t i, n;
1048        size_t added_f_len = 0;
1049        tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1050        pex = tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len, &n );
1051        for( i=0; i<n; ++i )
1052            tr_peerMgrAddPex( msgs->session->peerMgr, tor->info.hash,
1053                              TR_PEER_FROM_PEX, pex+i );
1054        tr_free( pex );
1055    }
1056
1057    if( loaded )
1058        tr_bencFree( &val );
1059    tr_free( tmp );
1060}
1061
1062static void
1063sendPex( tr_peermsgs * msgs );
1064
1065static void
1066parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1067{
1068    uint8_t ltep_msgid;
1069
1070    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
1071    msglen--;
1072
1073    if( ltep_msgid == LTEP_HANDSHAKE )
1074    {
1075        dbgmsg( msgs, "got ltep handshake" );
1076        parseLtepHandshake( msgs, msglen, inbuf );
1077        if( tr_peerIoSupportsLTEP( msgs->io ) )
1078        {
1079            sendLtepHandshake( msgs );
1080            sendPex( msgs );
1081        }
1082    }
1083    else if( ltep_msgid == TR_LTEP_PEX )
1084    {
1085        dbgmsg( msgs, "got ut pex" );
1086        msgs->peerSupportsPex = 1;
1087        parseUtPex( msgs, msglen, inbuf );
1088    }
1089    else
1090    {
1091        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1092        evbuffer_drain( inbuf, msglen );
1093    }
1094}
1095
1096static int
1097readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1098{
1099    uint32_t len;
1100
1101    if( inlen < sizeof(len) )
1102        return READ_MORE;
1103
1104    tr_peerIoReadUint32( msgs->io, inbuf, &len );
1105
1106    if( len == 0 ) /* peer sent us a keepalive message */
1107        dbgmsg( msgs, "got KeepAlive" );
1108    else {
1109        msgs->incoming.length = len;
1110        msgs->state = AWAITING_BT_ID;
1111    }
1112
1113    return READ_AGAIN;
1114}
1115
1116static int
1117readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen );
1118
1119static int
1120readBtId( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1121{
1122    uint8_t id;
1123
1124    if( inlen < sizeof(uint8_t) )
1125        return READ_MORE;
1126
1127    tr_peerIoReadUint8( msgs->io, inbuf, &id );
1128    msgs->incoming.id = id;
1129
1130    if( id==BT_PIECE )
1131    {
1132        msgs->state = AWAITING_BT_PIECE;
1133        return READ_AGAIN;
1134    }
1135    else if( msgs->incoming.length != 1 )
1136    {
1137        msgs->state = AWAITING_BT_MESSAGE;
1138        return READ_AGAIN;
1139    }
1140    else return readBtMessage( msgs, inbuf, inlen-1 );
1141}
1142
1143static void
1144updatePeerProgress( tr_peermsgs * msgs )
1145{
1146    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have )
1147                         / (float)msgs->torrent->info.pieceCount;
1148    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
1149    updateInterest( msgs );
1150    firePeerProgress( msgs );
1151}
1152
1153static int
1154clientCanSendFastBlock( const tr_peermsgs * msgs UNUSED )
1155{
1156    /* don't send a fast piece if peer has MAX_FAST_ALLOWED_THRESHOLD pieces */
1157    if( tr_bitfieldCountTrueBits( msgs->info->have ) > MAX_FAST_ALLOWED_THRESHOLD )
1158        return FALSE;
1159   
1160    /* ...or if we don't have ourself enough pieces */
1161    if( tr_bitfieldCountTrueBits( tr_cpPieceBitfield( msgs->torrent->completion ) ) < MAX_FAST_ALLOWED_THRESHOLD )
1162        return FALSE;
1163
1164    /* Maybe a bandwidth limit ? */
1165    return TRUE;
1166}
1167
1168static void
1169peerMadeRequest( tr_peermsgs * msgs, const struct peer_request * req )
1170{
1171    const int reqIsValid = requestIsValid( msgs, req );
1172    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( msgs->torrent->completion, req->index );
1173    const int peerIsChoked = msgs->info->peerIsChoked;
1174    const int peerIsFast = tr_peerIoSupportsFEXT( msgs->io );
1175    const int pieceIsFast = reqIsValid && tr_bitfieldHas( getPeerAllowedPieces( msgs ), req->index );
1176    const int canSendFast = clientCanSendFastBlock( msgs );
1177
1178    if( !reqIsValid ) /* bad request */
1179    {
1180        dbgmsg( msgs, "rejecting an invalid request." );
1181        sendFastReject( msgs, req->index, req->offset, req->length );
1182    }
1183    else if( !clientHasPiece ) /* we don't have it */
1184    {
1185        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1186        sendFastReject( msgs, req->index, req->offset, req->length );
1187    }
1188    else if( peerIsChoked && !peerIsFast ) /* doesn't he know he's choked? */
1189    {
1190        tr_peerMsgsSetChoke( msgs, 1 );
1191        sendFastReject( msgs, req->index, req->offset, req->length );
1192    }
1193    else if( peerIsChoked && peerIsFast && ( !pieceIsFast || !canSendFast ) )
1194    {
1195        sendFastReject( msgs, req->index, req->offset, req->length );
1196    }
1197    else /* YAY */
1198    {
1199        if( peerIsFast && pieceIsFast )
1200            reqListAppend( &msgs->peerAskedForFast, req );
1201        else
1202            reqListAppend( &msgs->peerAskedFor, req );
1203    }
1204}
1205
1206static int
1207messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1208{
1209    switch( id )
1210    {
1211        case BT_CHOKE:
1212        case BT_UNCHOKE:
1213        case BT_INTERESTED:
1214        case BT_NOT_INTERESTED:
1215        case BT_HAVE_ALL:
1216        case BT_HAVE_NONE:
1217            return len==1;
1218
1219        case BT_HAVE:
1220        case BT_SUGGEST:
1221        case BT_ALLOWED_FAST:
1222            return len==5;
1223
1224        case BT_BITFIELD:
1225            return len == (msg->torrent->info.pieceCount+7u)/8u + 1u;
1226       
1227        case BT_REQUEST:
1228        case BT_CANCEL:
1229        case BT_REJECT:
1230            return len==13;
1231
1232        case BT_PIECE:
1233            return len>9 && len<=16393;
1234
1235        case BT_PORT:
1236            return len==3;
1237
1238        case BT_LTEP:
1239            return len >= 2;
1240
1241        default:
1242            return FALSE;
1243    }
1244}
1245
1246static int
1247clientGotBlock( tr_peermsgs * msgs, const uint8_t * block, const struct peer_request * req );
1248
1249static void
1250clientGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1251{
1252    msgs->info->pieceDataActivityDate = time( NULL );
1253    tr_rcTransferred( msgs->info->rcToClient, byteCount );
1254    fireClientGotData( msgs, byteCount );
1255}
1256
1257static int
1258readBtPiece( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1259{
1260    struct peer_request * req = &msgs->incoming.blockReq;
1261    assert( EVBUFFER_LENGTH(inbuf) >= inlen );
1262    dbgmsg( msgs, "In readBtPiece" );
1263
1264    if( !req->length )
1265    {
1266        if( inlen < 8 )
1267            return READ_MORE;
1268
1269        tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
1270        tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
1271        req->length = msgs->incoming.length - 9;
1272        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1273        return READ_AGAIN;
1274    }
1275    else
1276    {
1277        int err;
1278
1279        /* read in another chunk of data */
1280        const size_t nLeft = req->length - EVBUFFER_LENGTH(msgs->incoming.block);
1281        size_t n = MIN( nLeft, inlen );
1282        uint8_t * buf = tr_new( uint8_t, n );
1283        assert( EVBUFFER_LENGTH(inbuf) >= n );
1284        tr_peerIoReadBytes( msgs->io, inbuf, buf, n );
1285        evbuffer_add( msgs->incoming.block, buf, n );
1286        clientGotBytes( msgs, n );
1287        tr_free( buf );
1288        dbgmsg( msgs, "got %d bytes for block %u:%u->%u ... %d remain",
1289               (int)n, req->index, req->offset, req->length,
1290               (int)( req->length - EVBUFFER_LENGTH(msgs->incoming.block) ) );
1291        if( EVBUFFER_LENGTH(msgs->incoming.block) < req->length )
1292            return READ_MORE;
1293
1294        /* we've got the whole block ... process it */
1295        err = clientGotBlock( msgs, EVBUFFER_DATA(msgs->incoming.block), req );
1296
1297        /* cleanup */
1298        evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH(msgs->incoming.block) );
1299        req->length = 0;
1300        msgs->state = AWAITING_BT_LENGTH;
1301        if( !err )
1302            return READ_AGAIN;
1303        else {
1304            fireError( msgs, err );
1305            return READ_DONE;
1306        }
1307    }
1308}
1309
1310static int
1311readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1312{
1313    uint32_t ui32;
1314    uint32_t msglen = msgs->incoming.length;
1315    const uint8_t id = msgs->incoming.id;
1316    const size_t startBufLen = EVBUFFER_LENGTH( inbuf );
1317
1318    --msglen; /* id length */
1319
1320    if( inlen < msglen )
1321        return READ_MORE;
1322
1323    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %d", (int)id, (int)msglen, (int)inlen );
1324
1325    if( !messageLengthIsCorrect( msgs, id, msglen+1 ) )
1326    {
1327        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1328        fireError( msgs, TR_ERROR );
1329        return READ_DONE;
1330    }
1331
1332    switch( id )
1333    {
1334        case BT_CHOKE:
1335            dbgmsg( msgs, "got Choke" );
1336            msgs->info->clientIsChoked = 1;
1337            cancelAllRequestsToPeer( msgs );
1338            cancelAllRequestsToClientExceptFast( msgs );
1339            break;
1340
1341        case BT_UNCHOKE:
1342            dbgmsg( msgs, "got Unchoke" );
1343            msgs->info->clientIsChoked = 0;
1344            fireNeedReq( msgs );
1345            break;
1346
1347        case BT_INTERESTED:
1348            dbgmsg( msgs, "got Interested" );
1349            msgs->info->peerIsInterested = 1;
1350            break;
1351
1352        case BT_NOT_INTERESTED:
1353            dbgmsg( msgs, "got Not Interested" );
1354            msgs->info->peerIsInterested = 0;
1355            break;
1356           
1357        case BT_HAVE:
1358            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1359            dbgmsg( msgs, "got Have: %u", ui32 );
1360            if( tr_bitfieldAdd( msgs->info->have, ui32 ) )
1361                fireError( msgs, TR_ERROR_PEER_MESSAGE );
1362            updatePeerProgress( msgs );
1363            tr_rcTransferred( msgs->torrent->swarmSpeed, msgs->torrent->info.pieceSize );
1364            break;
1365
1366        case BT_BITFIELD: {
1367            dbgmsg( msgs, "got a bitfield" );
1368            msgs->peerSentBitfield = 1;
1369            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
1370            updatePeerProgress( msgs );
1371            maybeSendFastAllowedSet( msgs );
1372            fireNeedReq( msgs );
1373            break;
1374        }
1375
1376        case BT_REQUEST: {
1377            struct peer_request r;
1378            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1379            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1380            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1381            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1382            peerMadeRequest( msgs, &r );
1383            break;
1384        }
1385
1386        case BT_CANCEL: {
1387            struct peer_request r;
1388            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1389            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1390            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1391            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1392            reqListRemove( &msgs->peerAskedForFast, &r );
1393            reqListRemove( &msgs->peerAskedFor, &r );
1394            break;
1395        }
1396
1397        case BT_PIECE:
1398            assert( 0 ); /* handled elsewhere! */
1399            break;
1400       
1401        case BT_PORT:
1402            dbgmsg( msgs, "Got a BT_PORT" );
1403            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1404            break;
1405           
1406        case BT_SUGGEST: {
1407            dbgmsg( msgs, "Got a BT_SUGGEST" );
1408            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1409            /* we don't do anything with this yet */
1410            break;
1411        }
1412           
1413        case BT_HAVE_ALL:
1414            dbgmsg( msgs, "Got a BT_HAVE_ALL" );
1415            tr_bitfieldAddRange( msgs->info->have, 0, msgs->torrent->info.pieceCount );
1416            updatePeerProgress( msgs );
1417            maybeSendFastAllowedSet( msgs );
1418            break;
1419       
1420       
1421        case BT_HAVE_NONE:
1422            dbgmsg( msgs, "Got a BT_HAVE_NONE" );
1423            tr_bitfieldClear( msgs->info->have );
1424            updatePeerProgress( msgs );
1425            maybeSendFastAllowedSet( msgs );
1426            break;
1427       
1428        case BT_REJECT: {
1429            struct peer_request r;
1430            dbgmsg( msgs, "Got a BT_REJECT" );
1431            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1432            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1433            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1434            reqListRemove( &msgs->clientAskedFor, &r );
1435            break;
1436        }
1437
1438        case BT_ALLOWED_FAST: {
1439            dbgmsg( msgs, "Got a BT_ALLOWED_FAST" );
1440            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1441            /* we don't do anything with this yet */
1442            break;
1443        }
1444
1445        case BT_LTEP:
1446            dbgmsg( msgs, "Got a BT_LTEP" );
1447            parseLtep( msgs, msglen, inbuf );
1448            break;
1449
1450        default:
1451            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1452            tr_peerIoDrain( msgs->io, inbuf, msglen );
1453            break;
1454    }
1455
1456    assert( msglen + 1 == msgs->incoming.length );
1457    assert( EVBUFFER_LENGTH(inbuf) == startBufLen - msglen );
1458
1459    msgs->state = AWAITING_BT_LENGTH;
1460    return READ_AGAIN;
1461}
1462
1463static void
1464peerGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1465{
1466    msgs->info->pieceDataActivityDate = time( NULL );
1467    tr_rcTransferred( msgs->info->rcToPeer, byteCount );
1468    firePeerGotData( msgs, byteCount );
1469}
1470
1471static size_t
1472getDownloadMax( const tr_peermsgs * msgs )
1473{
1474    static const size_t maxval = ~0;
1475    const tr_torrent * tor = msgs->torrent;
1476
1477    if( tor->downloadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1478        return tor->handle->useDownloadLimit
1479            ? tr_rcBytesLeft( tor->handle->download ) : maxval;
1480
1481    if( tor->downloadLimitMode == TR_SPEEDLIMIT_SINGLE )
1482        return tr_rcBytesLeft( tor->download );
1483
1484    return maxval;
1485}
1486
1487static void
1488decrementDownloadedCount( tr_peermsgs * msgs, uint32_t byteCount )
1489{
1490    tr_torrent * tor = msgs->torrent;
1491    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1492}
1493
1494static void
1495clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1496{
1497    decrementDownloadedCount( msgs, req->length );
1498}
1499
1500static void
1501addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1502{
1503    if( !msgs->info->blame )
1504         msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1505    tr_bitfieldAdd( msgs->info->blame, index );
1506}
1507
1508static tr_errno
1509clientGotBlock( tr_peermsgs                * msgs,
1510                const uint8_t              * data,
1511                const struct peer_request  * req )
1512{
1513    int err;
1514    tr_torrent * tor = msgs->torrent;
1515    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1516
1517    assert( msgs );
1518    assert( req );
1519
1520    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) )
1521    {
1522        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1523                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1524        return TR_ERROR;
1525    }
1526
1527    /* save the block */
1528    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1529
1530    /**
1531    *** Remove the block from our `we asked for this' list
1532    **/
1533
1534    if( reqListRemove( &msgs->clientAskedFor, req ) )
1535    {
1536        clientGotUnwantedBlock( msgs, req );
1537        dbgmsg( msgs, "we didn't ask for this message..." );
1538        return 0;
1539    }
1540
1541    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1542                  msgs->clientAskedFor.count );
1543
1544    /**
1545    *** Error checks
1546    **/
1547
1548    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
1549        dbgmsg( msgs, "we have this block already..." );
1550        clientGotUnwantedBlock( msgs, req );
1551        return 0;
1552    }
1553
1554    /**
1555    ***  Save the block
1556    **/
1557
1558    msgs->info->peerSentPieceDataAt = time( NULL );
1559    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1560        return err;
1561
1562    addPeerToBlamefield( msgs, req->index );
1563    fireGotBlock( msgs, req );
1564    return 0;
1565}
1566
1567static void
1568didWrite( struct bufferevent * evin UNUSED, void * vmsgs )
1569{
1570    peerPulse( vmsgs );
1571}
1572
1573static ReadState
1574canRead( struct bufferevent * evin, void * vmsgs )
1575{
1576    ReadState ret;
1577    tr_peermsgs * msgs = vmsgs;
1578    struct evbuffer * in = EVBUFFER_INPUT ( evin );
1579    const size_t inlen = EVBUFFER_LENGTH( in );
1580
1581    if( !inlen )
1582    {
1583        ret = READ_DONE;
1584    }
1585    else if( msgs->state == AWAITING_BT_PIECE )
1586    {
1587        const size_t downloadMax = getDownloadMax( msgs );
1588        const size_t n = MIN( inlen, downloadMax );
1589        ret = n ? readBtPiece( msgs, in, n ) : READ_DONE;
1590    }
1591    else switch( msgs->state )
1592    {
1593        case AWAITING_BT_LENGTH:  ret = readBtLength ( msgs, in, inlen ); break;
1594        case AWAITING_BT_ID:      ret = readBtId     ( msgs, in, inlen ); break;
1595        case AWAITING_BT_MESSAGE: ret = readBtMessage( msgs, in, inlen ); break;
1596        default:                  assert( 0 );
1597    }
1598
1599    return ret;
1600}
1601
1602static void
1603sendKeepalive( tr_peermsgs * msgs )
1604{
1605    dbgmsg( msgs, "sending a keepalive message" );
1606    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1607    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1608}
1609
1610/**
1611***
1612**/
1613
1614static size_t
1615getUploadMax( const tr_peermsgs * msgs )
1616{
1617    static const size_t maxval = ~0;
1618    const tr_torrent * tor = msgs->torrent;
1619    int speedLeft;
1620    int bufLeft;
1621
1622    if( tor->uploadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1623        speedLeft = tor->handle->useUploadLimit ? tr_rcBytesLeft( tor->handle->upload ) : maxval;
1624    else if( tor->uploadLimitMode == TR_SPEEDLIMIT_SINGLE )
1625        speedLeft = tr_rcBytesLeft( tor->upload );
1626    else
1627        speedLeft = INT_MAX;
1628
1629    /* this basically says the outbuf shouldn't have more than one block
1630     * queued up in it... blocksize, +13 for the size of the BT protocol's
1631     * block message overhead */
1632    bufLeft = tor->blockSize + 13 - tr_peerIoWriteBytesWaiting( msgs->io );
1633    return MIN( speedLeft, bufLeft );
1634}
1635
1636static int
1637getMaxBlocksFromPeerSoon( const tr_peermsgs * peer )
1638{
1639    const double seconds = 30;
1640    const double bytesPerSecond = peer->info->rateToClient * 1024;
1641    const double totalBytes = bytesPerSecond * seconds;
1642    const int blockCount = totalBytes / peer->torrent->blockSize;
1643    /*fprintf( stderr, "rate %f -- blockCount %d\n", peer->info->rateToClient, blockCount );*/
1644    return blockCount;
1645}
1646
1647static int
1648ratePulse( void * vpeer )
1649{
1650    tr_peermsgs * peer = vpeer;
1651    peer->info->rateToClient = tr_rcRate( peer->info->rcToClient );
1652    peer->info->rateToPeer = tr_rcRate( peer->info->rcToPeer );
1653    peer->minActiveRequests = 4;
1654    peer->maxActiveRequests = peer->minActiveRequests + getMaxBlocksFromPeerSoon( peer );
1655    return TRUE;
1656}
1657
1658static tr_errno
1659popNextRequest( tr_peermsgs * msgs, struct peer_request * setme )
1660{
1661    if( !reqListPop( &msgs->peerAskedForFast, setme ) )
1662        return 0;
1663    if( !reqListPop( &msgs->peerAskedFor, setme ) )
1664        return 0;
1665
1666    return TR_ERROR;
1667}
1668
1669static int
1670peerPulse( void * vmsgs )
1671{
1672    const time_t now = time( NULL );
1673    tr_peermsgs * msgs = vmsgs;
1674
1675    tr_peerIoTryRead( msgs->io );
1676    pumpRequestQueue( msgs );
1677    expireOldRequests( msgs );
1678
1679    if( msgs->sendingBlock )
1680    {
1681        const size_t uploadMax = getUploadMax( msgs );
1682        size_t len = EVBUFFER_LENGTH( msgs->outBlock );
1683        const size_t outlen = MIN( len, uploadMax );
1684
1685        assert( len );
1686
1687        if( outlen )
1688        {
1689            tr_peerIoWrite( msgs->io, EVBUFFER_DATA( msgs->outBlock ), outlen );
1690            evbuffer_drain( msgs->outBlock, outlen );
1691            peerGotBytes( msgs, outlen );
1692
1693            len -= outlen;
1694            msgs->clientSentAnythingAt = now;
1695            msgs->sendingBlock = len != 0;
1696
1697            dbgmsg( msgs, "wrote %d bytes; %d left in block", (int)outlen, (int)len );
1698        }
1699        else dbgmsg( msgs, "stalled writing block... uploadMax %lu, outlen %lu", uploadMax, outlen );
1700    }
1701
1702    if( !msgs->sendingBlock )
1703    {
1704        struct peer_request req;
1705        int haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1706
1707        if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1708        {
1709            dbgmsg( msgs, "started an outMessages batch (length is %d)", (int)EVBUFFER_LENGTH( msgs->outMessages ) );
1710            msgs->outMessagesBatchedAt = now;
1711        }
1712        else if( haveMessages
1713                 && ( ( now - msgs->outMessagesBatchedAt ) > msgs->outMessagesBatchPeriod ) )
1714        {
1715            dbgmsg( msgs, "flushing outMessages... (length is %d)", (int)EVBUFFER_LENGTH( msgs->outMessages ) );
1716            tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
1717            msgs->clientSentAnythingAt = now;
1718            msgs->outMessagesBatchedAt = 0;
1719            msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1720        }
1721        else if( !EVBUFFER_LENGTH( msgs->outBlock )
1722            && !popNextRequest( msgs, &req )
1723            && requestIsValid( msgs, &req )
1724            && tr_cpPieceIsComplete( msgs->torrent->completion, req.index ) )
1725        {
1726            uint8_t * buf = tr_new( uint8_t, req.length );
1727
1728            if( !tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf ) )
1729            {
1730                tr_peerIo * io = msgs->io;
1731                struct evbuffer * out = msgs->outBlock;
1732
1733                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1734                tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 2*sizeof(uint32_t) + req.length );
1735                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1736                tr_peerIoWriteUint32( io, out, req.index );
1737                tr_peerIoWriteUint32( io, out, req.offset );
1738                tr_peerIoWriteBytes ( io, out, buf, req.length );
1739                msgs->sendingBlock = 1;
1740            }
1741
1742            tr_free( buf );
1743        }
1744        else if(    ( !haveMessages )
1745                 && ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS )
1746        {
1747            sendKeepalive( msgs );
1748        }
1749    }
1750
1751    return TRUE; /* loop forever */
1752}
1753
1754static void
1755gotError( struct bufferevent * evbuf UNUSED, short what, void * vmsgs )
1756{
1757    if( what & EVBUFFER_TIMEOUT )
1758        dbgmsg( vmsgs, "libevent got a timeout, what=%hd, secs=%d", what, evbuf->timeout_read );
1759    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1760        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1761                what, errno, tr_strerror(errno) );
1762    fireError( vmsgs, TR_ERROR );
1763}
1764
1765static void
1766sendBitfield( tr_peermsgs * msgs )
1767{
1768    struct evbuffer * out = msgs->outMessages;
1769    tr_bitfield * field;
1770    tr_piece_index_t lazyPieces[LAZY_PIECE_COUNT];
1771    size_t i;
1772    size_t lazyCount = 0;
1773
1774    field = tr_bitfieldDup( tr_cpPieceBitfield( msgs->torrent->completion ) );
1775
1776    if( tr_sessionIsLazyBitfieldEnabled( msgs->session ) )
1777    {
1778        /** Lazy bitfields aren't a high priority or secure, so I'm opting for
1779            speed over a truly random sample -- let's limit the pool size to
1780            the first 1000 pieces so large torrents don't bog things down */
1781        size_t poolSize = MIN( msgs->torrent->info.pieceCount, 1000 );
1782        tr_piece_index_t * pool = tr_new( tr_piece_index_t, poolSize );
1783
1784        /* build the pool */
1785        for( i=0; i<poolSize; ++i )
1786            pool[i] = i;
1787        poolSize = i;
1788
1789        /* pull random piece indices from the pool */
1790        while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) )
1791        {
1792            const int pos = tr_cryptoWeakRandInt( poolSize );
1793            const tr_piece_index_t piece = pool[pos];
1794            tr_bitfieldRem( field, piece );
1795            lazyPieces[lazyCount++] = piece;
1796            pool[pos] = pool[--poolSize];
1797        }
1798
1799        /* cleanup */
1800        tr_free( pool );
1801    }
1802
1803    tr_peerIoWriteUint32( msgs->io, out, sizeof(uint8_t) + field->byteCount );
1804    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1805    tr_peerIoWriteBytes ( msgs->io, out, field->bits, field->byteCount );
1806    dbgmsg( msgs, "sending bitfield... outMessage size is now %d",
1807                  (int)EVBUFFER_LENGTH(out) );
1808    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1809
1810    for( i=0; i<lazyCount; ++i )
1811        protocolSendHave( msgs, lazyPieces[i] );
1812
1813    tr_bitfieldFree( field );
1814}
1815
1816/**
1817***
1818**/
1819
1820/* some peers give us error messages if we send
1821   more than this many peers in a single pex message
1822   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
1823#define MAX_PEX_ADDED 50
1824#define MAX_PEX_DROPPED 50
1825
1826typedef struct
1827{
1828    tr_pex * added;
1829    tr_pex * dropped;
1830    tr_pex * elements;
1831    int addedCount;
1832    int droppedCount;
1833    int elementCount;
1834}
1835PexDiffs;
1836
1837static void
1838pexAddedCb( void * vpex, void * userData )
1839{
1840    PexDiffs * diffs = userData;
1841    tr_pex * pex = vpex;
1842    if( diffs->addedCount < MAX_PEX_ADDED )
1843    {
1844        diffs->added[diffs->addedCount++] = *pex;
1845        diffs->elements[diffs->elementCount++] = *pex;
1846    }
1847}
1848
1849static void
1850pexDroppedCb( void * vpex, void * userData )
1851{
1852    PexDiffs * diffs = userData;
1853    tr_pex * pex = vpex;
1854    if( diffs->droppedCount < MAX_PEX_DROPPED )
1855    {
1856        diffs->dropped[diffs->droppedCount++] = *pex;
1857    }
1858}
1859
1860static void
1861pexElementCb( void * vpex, void * userData )
1862{
1863    PexDiffs * diffs = userData;
1864    tr_pex * pex = vpex;
1865    diffs->elements[diffs->elementCount++] = *pex;
1866}
1867
1868static void
1869sendPex( tr_peermsgs * msgs )
1870{
1871    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
1872    {
1873        int i;
1874        tr_pex * newPex = NULL;
1875        const int newCount = tr_peerMgrGetPeers( msgs->session->peerMgr, msgs->torrent->info.hash, &newPex );
1876        PexDiffs diffs;
1877        tr_benc val;
1878        uint8_t *tmp, *walk;
1879        char * benc;
1880        int bencLen;
1881        struct evbuffer * out = msgs->outMessages;
1882
1883        /* build the diffs */
1884        diffs.added = tr_new( tr_pex, newCount );
1885        diffs.addedCount = 0;
1886        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1887        diffs.droppedCount = 0;
1888        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1889        diffs.elementCount = 0;
1890        tr_set_compare( msgs->pex, msgs->pexCount,
1891                        newPex, newCount,
1892                        tr_pexCompare, sizeof(tr_pex),
1893                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
1894        dbgmsg( msgs, "pex: old peer count %d, new peer count %d, added %d, removed %d", msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1895
1896        /* update peer */
1897        tr_free( msgs->pex );
1898        msgs->pex = diffs.elements;
1899        msgs->pexCount = diffs.elementCount;
1900
1901        /* build the pex payload */
1902        tr_bencInitDict( &val, 3 );
1903
1904        /* "added" */
1905        tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1906        for( i=0; i<diffs.addedCount; ++i ) {
1907            memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
1908            memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1909        }
1910        assert( ( walk - tmp ) == diffs.addedCount * 6 );
1911        tr_bencDictAddRaw( &val, "added", tmp, walk-tmp );
1912        tr_free( tmp );
1913
1914        /* "added.f" */
1915        tmp = walk = tr_new( uint8_t, diffs.addedCount );
1916        for( i=0; i<diffs.addedCount; ++i )
1917            *walk++ = diffs.added[i].flags;
1918        assert( ( walk - tmp ) == diffs.addedCount );
1919        tr_bencDictAddRaw( &val, "added.f", tmp, walk-tmp );
1920        tr_free( tmp );
1921
1922        /* "dropped" */
1923        tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
1924        for( i=0; i<diffs.droppedCount; ++i ) {
1925            memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
1926            memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
1927        }
1928        assert( ( walk - tmp ) == diffs.droppedCount * 6 );
1929        tr_bencDictAddRaw( &val, "dropped", tmp, walk-tmp );
1930        tr_free( tmp );
1931
1932        /* write the pex message */
1933        benc = tr_bencSave( &val, &bencLen );
1934        tr_peerIoWriteUint32( msgs->io, out, 2*sizeof(uint8_t) + bencLen );
1935        tr_peerIoWriteUint8 ( msgs->io, out, BT_LTEP );
1936        tr_peerIoWriteUint8 ( msgs->io, out, msgs->ut_pex_id );
1937        tr_peerIoWriteBytes ( msgs->io, out, benc, bencLen );
1938        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1939        dbgmsg( msgs, "outMessage size is now %d", (int)EVBUFFER_LENGTH(out) );
1940
1941        /* cleanup */
1942        tr_free( benc );
1943        tr_bencFree( &val );
1944        tr_free( diffs.added );
1945        tr_free( diffs.dropped );
1946        tr_free( newPex );
1947
1948        msgs->clientSentPexAt = time( NULL );
1949    }
1950}
1951
1952static int
1953pexPulse( void * vpeer )
1954{
1955    sendPex( vpeer );
1956    return TRUE;
1957}
1958
1959/**
1960***
1961**/
1962
1963tr_peermsgs*
1964tr_peerMsgsNew( struct tr_torrent * torrent,
1965                struct tr_peer    * info,
1966                tr_delivery_func    func,
1967                void              * userData,
1968                tr_publisher_tag  * setme )
1969{
1970    tr_peermsgs * m;
1971
1972    assert( info );
1973    assert( info->io );
1974
1975    m = tr_new0( tr_peermsgs, 1 );
1976    m->publisher = tr_publisherNew( );
1977    m->info = info;
1978    m->session = torrent->handle;
1979    m->torrent = torrent;
1980    m->io = info->io;
1981    m->info->clientIsChoked = 1;
1982    m->info->peerIsChoked = 1;
1983    m->info->clientIsInterested = 0;
1984    m->info->peerIsInterested = 0;
1985    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
1986    m->state = AWAITING_BT_LENGTH;
1987    m->pulseTimer = tr_timerNew( m->session, peerPulse, m, PEER_PULSE_INTERVAL );
1988    m->rateTimer = tr_timerNew( m->session, ratePulse, m, RATE_PULSE_INTERVAL );
1989    m->pexTimer = tr_timerNew( m->session, pexPulse, m, PEX_INTERVAL );
1990    m->outMessages = evbuffer_new( );
1991    m->outMessagesBatchedAt = 0;
1992    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1993    m->incoming.block = evbuffer_new( );
1994    m->outBlock = evbuffer_new( );
1995    m->peerAllowedPieces = NULL;
1996    m->peerAskedFor = REQUEST_LIST_INIT;
1997    m->peerAskedForFast = REQUEST_LIST_INIT;
1998    m->clientAskedFor = REQUEST_LIST_INIT;
1999    m->clientWillAskFor = REQUEST_LIST_INIT;
2000    *setme = tr_publisherSubscribe( m->publisher, func, userData );
2001
2002    if ( tr_peerIoSupportsLTEP( m->io ) )
2003        sendLtepHandshake( m );
2004
2005    sendBitfield( m );
2006   
2007    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of inactivity */
2008    tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
2009    ratePulse( m );
2010
2011    return m;
2012}
2013
2014void
2015tr_peerMsgsFree( tr_peermsgs* msgs )
2016{
2017    if( msgs )
2018    {
2019        tr_timerFree( &msgs->pulseTimer );
2020        tr_timerFree( &msgs->rateTimer );
2021        tr_timerFree( &msgs->pexTimer );
2022        tr_publisherFree( &msgs->publisher );
2023        reqListClear( &msgs->clientWillAskFor );
2024        reqListClear( &msgs->clientAskedFor );
2025        reqListClear( &msgs->peerAskedForFast );
2026        reqListClear( &msgs->peerAskedFor );
2027        tr_bitfieldFree( msgs->peerAllowedPieces );
2028        evbuffer_free( msgs->incoming.block );
2029        evbuffer_free( msgs->outMessages );
2030        evbuffer_free( msgs->outBlock );
2031        tr_free( msgs->pex );
2032
2033        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2034        tr_free( msgs );
2035    }
2036}
2037
2038void
2039tr_peerMsgsUnsubscribe( tr_peermsgs       * peer,
2040                        tr_publisher_tag    tag )
2041{
2042    tr_publisherUnsubscribe( peer->publisher, tag );
2043}
Note: See TracBrowser for help on using the repository browser.