source: trunk/libtransmission/peer-msgs.c @ 6600

Last change on this file since 6600 was 6600, checked in by charles, 13 years ago

make tr_bencDictFindType() private.

  • Property svn:keywords set to Date Rev Author Id
File size: 57.7 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 6600 2008-08-20 18:42:45Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19#include <libgen.h> /* basename */
20
21#include <event.h>
22
23#include "transmission.h"
24#include "bencode.h"
25#include "completion.h"
26#include "crypto.h"
27#include "inout.h"
28#include "peer-io.h"
29#include "peer-mgr.h"
30#include "peer-mgr-private.h"
31#include "peer-msgs.h"
32#include "ratecontrol.h"
33#include "stats.h"
34#include "torrent.h"
35#include "trevent.h"
36#include "utils.h"
37
38/**
39***
40**/
41
42enum
43{
44    BT_CHOKE                = 0,
45    BT_UNCHOKE              = 1,
46    BT_INTERESTED           = 2,
47    BT_NOT_INTERESTED       = 3,
48    BT_HAVE                 = 4,
49    BT_BITFIELD             = 5,
50    BT_REQUEST              = 6,
51    BT_PIECE                = 7,
52    BT_CANCEL               = 8,
53    BT_PORT                 = 9,
54    BT_SUGGEST              = 13,
55    BT_HAVE_ALL             = 14,
56    BT_HAVE_NONE            = 15,
57    BT_REJECT               = 16,
58    BT_ALLOWED_FAST         = 17,
59    BT_LTEP                 = 20,
60
61    LTEP_HANDSHAKE          = 0,
62
63    TR_LTEP_PEX             = 1,
64
65    MIN_CHOKE_PERIOD_SEC    = (10),
66
67    /* idle seconds before we send a keepalive */
68    KEEPALIVE_INTERVAL_SECS = 100,
69
70    PEX_INTERVAL            = (90 * 1000), /* msec between sendPex() calls */
71    PEER_PULSE_INTERVAL     = (50),        /* msec between peerPulse() calls */
72    RATE_PULSE_INTERVAL     = (250),       /* msec between ratePulse() calls */
73
74    MAX_QUEUE_SIZE          = (100),
75     
76    /* (fast peers) max number of pieces we fast-allow to another peer */
77    MAX_FAST_ALLOWED_COUNT   = 10,
78
79    /* (fast peers) max threshold for allowing fast-pieces requests */
80    MAX_FAST_ALLOWED_THRESHOLD = 10,
81
82    /* how long an unsent request can stay queued before it's returned
83       back to the peer-mgr's pool of requests */
84    QUEUED_REQUEST_TTL_SECS = 20,
85
86    /* how long a sent request can stay queued before it's returned
87       back to the peer-mgr's pool of requests */
88    SENT_REQUEST_TTL_SECS = 240,
89
90    /* used in lowering the outMessages queue period */
91    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
92    HIGH_PRIORITY_INTERVAL_SECS = 2,
93    LOW_PRIORITY_INTERVAL_SECS = 20,
94
95    /* number of pieces to remove from the bitfield when
96     * lazy bitfields are turned on */
97    LAZY_PIECE_COUNT = 26
98};
99
100/**
101***  REQUEST MANAGEMENT
102**/
103
104enum
105{
106    AWAITING_BT_LENGTH,
107    AWAITING_BT_ID,
108    AWAITING_BT_MESSAGE,
109    AWAITING_BT_PIECE
110};
111
112struct peer_request
113{
114    uint32_t index;
115    uint32_t offset;
116    uint32_t length;
117    time_t time_requested;
118};
119
120static int
121compareRequest( const void * va, const void * vb )
122{
123    int i;
124    const struct peer_request * a = va;
125    const struct peer_request * b = vb;
126    if(( i = tr_compareUint32( a->index, b->index ))) return i;
127    if(( i = tr_compareUint32( a->offset, b->offset ))) return i;
128    if(( i = tr_compareUint32( a->length, b->length ))) return i;
129    return 0;
130}
131
132struct request_list
133{
134    uint16_t count;
135    uint16_t max;
136    struct peer_request * requests;
137};
138
139static const struct request_list REQUEST_LIST_INIT = { 0, 0, NULL };
140
141static void
142reqListReserve( struct request_list * list, uint16_t max )
143{
144    if( list->max < max )
145    {
146        list->max = max;
147        list->requests = tr_renew( struct peer_request,
148                                   list->requests,
149                                   list->max );
150    }
151}
152
153static void
154reqListClear( struct request_list * list )
155{
156    tr_free( list->requests );
157    *list = REQUEST_LIST_INIT;
158}
159
160static void
161reqListCopy( struct request_list * dest, const struct request_list * src )
162{
163    dest->count = dest->max = src->count;
164    dest->requests = tr_memdup( src->requests, dest->count * sizeof( struct peer_request ) );
165}
166
167static void
168reqListRemoveOne( struct request_list * list, int i )
169{
170    assert( 0<=i && i<list->count );
171
172    memmove( &list->requests[i],
173             &list->requests[i+1],
174             sizeof( struct peer_request ) * ( --list->count - i ) );
175}
176
177static void
178reqListAppend( struct request_list * list, const struct peer_request * req )
179{
180    if( ++list->count >= list->max )
181        reqListReserve( list, list->max + 8 );
182
183    list->requests[list->count-1] = *req;
184}
185
186static void
187reqListAppendPiece( const tr_torrent     * tor,
188                    struct request_list  * list,
189                    tr_piece_index_t       piece )
190{
191    const time_t now = time( NULL );
192    const size_t n = tr_torPieceCountBlocks( tor, piece );
193    const tr_block_index_t begin = tr_torPieceFirstBlock( tor, piece );
194    const tr_block_index_t end = begin + n;
195    tr_block_index_t i;
196
197    if( list->count + n >= list->max )
198        reqListReserve( list, list->max + n );
199
200    for( i=begin; i<end; ++i ) {
201        if( !tr_cpBlockIsComplete( tor->completion, i ) ) {
202            struct peer_request * req = list->requests + list->count++;
203            req->index = piece;
204            req->offset = (i * tor->blockSize) - (piece * tor->info.pieceSize);
205            req->length = tr_torBlockCountBytes( tor, i );
206            req->time_requested = now;
207            assert( tr_torrentReqIsValid( tor, req->index, req->offset, req->length ) );
208        }
209    }
210}
211
212static tr_errno
213reqListPop( struct request_list * list, struct peer_request * setme )
214{
215    tr_errno err;
216
217    if( !list->count )
218        err = TR_ERROR;
219    else {
220        *setme = list->requests[0];
221        reqListRemoveOne( list, 0 );
222        err = TR_OK;
223    }
224
225    return err;
226}
227
228static int
229reqListHasPiece( struct request_list * list, const tr_piece_index_t piece )
230{
231    uint16_t i;
232
233    for( i=0; i<list->count; ++i )
234        if( list->requests[i].index == piece )
235            return 1;
236
237    return 0;
238}
239
240static int
241reqListFind( struct request_list * list, const struct peer_request * key )
242{
243    uint16_t i;
244
245    for( i=0; i<list->count; ++i )
246        if( !compareRequest( key, list->requests+i ) )
247            return i;
248
249    return -1;
250}
251
252static tr_errno
253reqListRemove( struct request_list * list, const struct peer_request * key )
254{
255    tr_errno err;
256    const int i = reqListFind( list, key );
257
258    if( i < 0 )
259        err = TR_ERROR;
260    else {
261        err = TR_OK;
262        reqListRemoveOne( list, i );
263    }
264
265    return err;
266}
267
268/**
269***
270**/
271
272/* this is raw, unchanged data from the peer regarding
273 * the current message that it's sending us. */
274struct tr_incoming
275{
276    uint8_t id;
277    uint32_t length; /* includes the +1 for id length */
278    struct peer_request blockReq; /* metadata for incoming blocks */
279    struct evbuffer * block; /* piece data for incoming blocks */
280};
281
282struct tr_peermsgs
283{
284    unsigned int peerSentBitfield         : 1;
285    unsigned int peerSupportsPex          : 1;
286    unsigned int clientSentLtepHandshake  : 1;
287    unsigned int peerSentLtepHandshake    : 1;
288    unsigned int sendingBlock             : 1;
289   
290    uint8_t state;
291    uint8_t ut_pex_id;
292    uint16_t pexCount;
293    uint16_t minActiveRequests;
294    uint16_t maxActiveRequests;
295
296    /* how long the outMessages batch should be allowed to grow before
297     * it's flushed -- some messages (like requests >:) should be sent
298     * very quickly; others aren't as urgent. */
299    int outMessagesBatchPeriod;
300
301    tr_peer * info;
302
303    tr_session * session;
304    tr_torrent * torrent;
305    tr_peerIo * io;
306
307    tr_publisher_t * publisher;
308
309    struct evbuffer * outBlock;    /* buffer of the current piece message */
310    struct evbuffer * outMessages; /* buffer of all the non-piece messages */
311
312    struct request_list peerAskedFor;
313    struct request_list peerAskedForFast;
314    struct request_list clientAskedFor;
315    struct request_list clientWillAskFor;
316
317    tr_timer * rateTimer;
318    tr_timer * pulseTimer;
319    tr_timer * pexTimer;
320
321    time_t clientSentPexAt;
322    time_t clientSentAnythingAt;
323
324    /* when we started batching the outMessages */
325    time_t outMessagesBatchedAt;
326   
327    tr_bitfield * peerAllowedPieces;
328
329    struct tr_incoming incoming; 
330
331    tr_pex * pex;
332};
333
334/**
335***
336**/
337
338static void
339myDebug( const char * file, int line,
340         const struct tr_peermsgs * msgs,
341         const char * fmt, ... )
342{
343    FILE * fp = tr_getLog( );
344    if( fp )
345    {
346        va_list args;
347        char timestr[64];
348        struct evbuffer * buf = evbuffer_new( );
349        char * myfile = tr_strdup( file );
350
351        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
352                             tr_getLogTimeStr( timestr, sizeof(timestr) ),
353                             msgs->torrent->info.name,
354                             tr_peerIoGetAddrStr( msgs->io ),
355                             msgs->info->client );
356        va_start( args, fmt );
357        evbuffer_add_vprintf( buf, fmt, args );
358        va_end( args );
359        evbuffer_add_printf( buf, " (%s:%d)\n", basename(myfile), line );
360        fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
361
362        tr_free( myfile );
363        evbuffer_free( buf );
364    }
365}
366
367#define dbgmsg(msgs, fmt...) myDebug(__FILE__, __LINE__, msgs, ##fmt )
368
369/**
370***
371**/
372
373static void
374pokeBatchPeriod( tr_peermsgs * msgs, int interval )
375{
376    if( msgs->outMessagesBatchPeriod > interval )
377    {
378        msgs->outMessagesBatchPeriod = interval;
379        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
380    }
381}
382
383static void
384protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
385{
386    tr_peerIo * io = msgs->io;
387    struct evbuffer * out = msgs->outMessages;
388
389    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
390    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
391    tr_peerIoWriteUint32( io, out, req->index );
392    tr_peerIoWriteUint32( io, out, req->offset );
393    tr_peerIoWriteUint32( io, out, req->length );
394    dbgmsg( msgs, "requesting %u:%u->%u... outMessage size is now %d",
395            req->index, req->offset, req->length, (int)EVBUFFER_LENGTH(out) );
396    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
397}
398
399static void
400protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
401{
402    tr_peerIo * io = msgs->io;
403    struct evbuffer * out = msgs->outMessages;
404
405    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
406    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
407    tr_peerIoWriteUint32( io, out, req->index );
408    tr_peerIoWriteUint32( io, out, req->offset );
409    tr_peerIoWriteUint32( io, out, req->length );
410    dbgmsg( msgs, "cancelling %u:%u->%u... outMessage size is now %d",
411            req->index, req->offset, req->length, (int)EVBUFFER_LENGTH(out) );
412    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
413}
414
415static void
416protocolSendHave( tr_peermsgs * msgs, uint32_t index )
417{
418    tr_peerIo * io = msgs->io;
419    struct evbuffer * out = msgs->outMessages;
420
421    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
422    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
423    tr_peerIoWriteUint32( io, out, index );
424    dbgmsg( msgs, "sending Have %u.. outMessage size is now %d",
425            index, (int)EVBUFFER_LENGTH(out) );
426    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
427}
428
429static void
430protocolSendChoke( tr_peermsgs * msgs, int choke )
431{
432    tr_peerIo * io = msgs->io;
433    struct evbuffer * out = msgs->outMessages;
434
435    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) );
436    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
437    dbgmsg( msgs, "sending %s... outMessage size is now %d",
438                  (choke ? "Choke" : "Unchoke"),
439                  (int)EVBUFFER_LENGTH(out) );
440    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
441}
442
443/**
444***  EVENTS
445**/
446
447static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0 };
448
449static void
450publish( tr_peermsgs * msgs, tr_peer_event * e )
451{
452    tr_publisherPublish( msgs->publisher, msgs->info, e );
453}
454
455static void
456fireError( tr_peermsgs * msgs, tr_errno err )
457{
458    tr_peer_event e = blankEvent;
459    e.eventType = TR_PEER_ERROR;
460    e.err = err;
461    publish( msgs, &e );
462}
463
464static void
465fireNeedReq( tr_peermsgs * msgs )
466{
467    tr_peer_event e = blankEvent;
468    e.eventType = TR_PEER_NEED_REQ;
469    publish( msgs, &e );
470}
471
472static void
473firePeerProgress( tr_peermsgs * msgs )
474{
475    tr_peer_event e = blankEvent;
476    e.eventType = TR_PEER_PEER_PROGRESS;
477    e.progress = msgs->info->progress;
478    publish( msgs, &e );
479}
480
481static void
482fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
483{
484    tr_peer_event e = blankEvent;
485    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
486    e.pieceIndex = req->index;
487    e.offset = req->offset;
488    e.length = req->length;
489    publish( msgs, &e );
490}
491
492static void
493fireClientGotData( tr_peermsgs * msgs, uint32_t length )
494{
495    tr_peer_event e = blankEvent;
496    e.length = length;
497    e.eventType = TR_PEER_CLIENT_GOT_DATA;
498    publish( msgs, &e );
499}
500
501static void
502firePeerGotData( tr_peermsgs * msgs, uint32_t length )
503{
504    tr_peer_event e = blankEvent;
505    e.length = length;
506    e.eventType = TR_PEER_PEER_GOT_DATA;
507    publish( msgs, &e );
508}
509
510static void
511fireCancelledReq( tr_peermsgs * msgs, const tr_piece_index_t pieceIndex )
512{
513    tr_peer_event e = blankEvent;
514    e.eventType = TR_PEER_CANCEL;
515    e.pieceIndex = pieceIndex;
516    publish( msgs, &e );
517}
518
519/**
520***  INTEREST
521**/
522
523static int
524isPieceInteresting( const tr_peermsgs   * peer,
525                    tr_piece_index_t      piece )
526{
527    const tr_torrent * torrent = peer->torrent;
528
529    return ( ( !torrent->info.pieces[piece].dnd )               /* we want it */
530          && ( !tr_cpPieceIsComplete( torrent->completion, piece ) ) /* !have */
531          && ( tr_bitfieldHas( peer->info->have, piece ) ) );  /* peer has it */
532}
533
534/* "interested" means we'll ask for piece data if they unchoke us */
535static int
536isPeerInteresting( const tr_peermsgs * msgs )
537{
538    tr_piece_index_t i;
539    const tr_torrent * torrent;
540    const tr_bitfield * bitfield;
541    const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
542
543    if( clientIsSeed )
544        return FALSE;
545
546    torrent = msgs->torrent;
547    bitfield = tr_cpPieceBitfield( torrent->completion );
548
549    if( !msgs->info->have )
550        return TRUE;
551
552    assert( bitfield->byteCount == msgs->info->have->byteCount );
553
554    for( i=0; i<torrent->info.pieceCount; ++i )
555        if( isPieceInteresting( msgs, i ) )
556            return TRUE;
557
558    return FALSE;
559}
560
561static void
562sendInterest( tr_peermsgs * msgs, int weAreInterested )
563{
564    struct evbuffer * out = msgs->outMessages;
565
566    assert( msgs );
567    assert( weAreInterested==0 || weAreInterested==1 );
568
569    msgs->info->clientIsInterested = weAreInterested;
570    dbgmsg( msgs, "Sending %s", weAreInterested ? "Interested" : "Not Interested");
571    tr_peerIoWriteUint32( msgs->io, out, sizeof(uint8_t) );
572    tr_peerIoWriteUint8 ( msgs->io, out, weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
573    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
574    dbgmsg( msgs, "outMessage size is now %d", (int)EVBUFFER_LENGTH(out) );
575}
576
577static void
578updateInterest( tr_peermsgs * msgs )
579{
580    const int i = isPeerInteresting( msgs );
581    if( i != msgs->info->clientIsInterested )
582        sendInterest( msgs, i );
583    if( i )
584        fireNeedReq( msgs );
585}
586
587static void
588cancelAllRequestsToClientExceptFast( tr_peermsgs * msgs )
589{
590    reqListClear( &msgs->peerAskedFor );
591}
592
593void
594tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
595{
596    const time_t fibrillationTime = time(NULL) - MIN_CHOKE_PERIOD_SEC;
597
598    assert( msgs );
599    assert( msgs->info );
600    assert( choke==0 || choke==1 );
601
602    if( msgs->info->chokeChangedAt > fibrillationTime )
603    {
604        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
605    }
606    else if( msgs->info->peerIsChoked != choke )
607    {
608        msgs->info->peerIsChoked = choke;
609        if( choke )
610            cancelAllRequestsToClientExceptFast( msgs );
611        protocolSendChoke( msgs, choke );
612        msgs->info->chokeChangedAt = time( NULL );
613    }
614}
615
616/**
617***
618**/
619
620void
621tr_peerMsgsHave( tr_peermsgs * msgs,
622                 uint32_t      index )
623{
624    protocolSendHave( msgs, index );
625
626    /* since we have more pieces now, we might not be interested in this peer */
627    updateInterest( msgs );
628}
629#if 0
630static void
631sendFastSuggest( tr_peermsgs * msgs,
632                 uint32_t      pieceIndex )
633{
634    assert( msgs );
635   
636    if( tr_peerIoSupportsFEXT( msgs->io ) )
637    {
638        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
639        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_SUGGEST );
640        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
641    }
642}
643static void
644sendFastHave( tr_peermsgs * msgs, int all )
645{
646    assert( msgs );
647   
648    if( tr_peerIoSupportsFEXT( msgs->io ) )
649    {
650        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
651        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, ( all ? BT_HAVE_ALL
652                                                                : BT_HAVE_NONE ) );
653        updateInterest( msgs );
654    }
655}
656#endif
657
658static void
659sendFastReject( tr_peermsgs * msgs,
660                uint32_t      pieceIndex,
661                uint32_t      offset,
662                uint32_t      length )
663{
664    assert( msgs );
665
666    if( tr_peerIoSupportsFEXT( msgs->io ) )
667    {
668        struct evbuffer * out = msgs->outMessages;
669        const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
670        dbgmsg( msgs, "sending fast reject %u:%u->%u", pieceIndex, offset, length );
671        tr_peerIoWriteUint32( msgs->io, out, len );
672        tr_peerIoWriteUint8( msgs->io, out, BT_REJECT );
673        tr_peerIoWriteUint32( msgs->io, out, pieceIndex );
674        tr_peerIoWriteUint32( msgs->io, out, offset );
675        tr_peerIoWriteUint32( msgs->io, out, length );
676        pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
677        dbgmsg( msgs, "outMessage size is now %d", (int)EVBUFFER_LENGTH(out) );
678    }
679}
680
681static tr_bitfield*
682getPeerAllowedPieces( tr_peermsgs * msgs )
683{
684    if( !msgs->peerAllowedPieces && tr_peerIoSupportsFEXT( msgs->io ) )
685    {
686        msgs->peerAllowedPieces = tr_peerMgrGenerateAllowedSet(
687            MAX_FAST_ALLOWED_COUNT,
688            msgs->torrent->info.pieceCount,
689            msgs->torrent->info.hash,
690            tr_peerIoGetAddress( msgs->io, NULL ) );
691    }
692
693    return msgs->peerAllowedPieces;
694}
695
696static void
697sendFastAllowed( tr_peermsgs * msgs,
698                 uint32_t      pieceIndex)
699{
700    assert( msgs );
701   
702    if( tr_peerIoSupportsFEXT( msgs->io ) )
703    {
704        struct evbuffer * out = msgs->outMessages;
705        dbgmsg( msgs, "sending fast allowed" );
706        tr_peerIoWriteUint32( msgs->io, out,  sizeof(uint8_t) + sizeof(uint32_t) );
707        tr_peerIoWriteUint8( msgs->io, out, BT_ALLOWED_FAST );
708        tr_peerIoWriteUint32( msgs->io, out, pieceIndex );
709        pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
710        dbgmsg( msgs, "outMessage size is now %d", (int)EVBUFFER_LENGTH(out) );
711    }
712}
713
714static void
715sendFastAllowedSet( tr_peermsgs * msgs )
716{
717    tr_piece_index_t i = 0;
718
719    while (i <= msgs->torrent->info.pieceCount )
720    {
721        if ( tr_bitfieldHas( getPeerAllowedPieces( msgs ), i) )
722            sendFastAllowed( msgs, i );
723        i++;
724    }
725}
726
727static void
728maybeSendFastAllowedSet( tr_peermsgs * msgs )
729{
730    if( tr_bitfieldCountTrueBits( msgs->info->have ) <= MAX_FAST_ALLOWED_THRESHOLD )
731        sendFastAllowedSet( msgs );
732}
733
734
735/**
736***
737**/
738
739static int
740reqIsValid( const tr_peermsgs   * peer,
741            uint32_t              index,
742            uint32_t              offset,
743            uint32_t              length )
744{
745    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
746}
747
748static int
749requestIsValid( const tr_peermsgs * peer, const struct peer_request * req )
750{
751    return reqIsValid( peer, req->index, req->offset, req->length );
752}
753
754static void
755tr_peerMsgsCancel( tr_peermsgs * msgs,
756                   uint32_t      pieceIndex )
757{
758    uint16_t i;
759    struct request_list tmp = REQUEST_LIST_INIT;
760    struct request_list * src;
761
762    src = &msgs->clientWillAskFor;
763    for( i=0; i<src->count; ++i )
764        if( src->requests[i].index != pieceIndex )
765            reqListAppend( &tmp, src->requests + i );
766
767    /* swap */
768    reqListClear( &msgs->clientWillAskFor );
769    msgs->clientWillAskFor = tmp;
770    tmp = REQUEST_LIST_INIT;
771
772    src = &msgs->clientAskedFor;
773    for( i=0; i<src->count; ++i )
774        if( src->requests[i].index == pieceIndex )
775            protocolSendCancel( msgs, src->requests + i );
776        else
777            reqListAppend( &tmp, src->requests + i );
778
779    /* swap */
780    reqListClear( &msgs->clientAskedFor );
781    msgs->clientAskedFor = tmp;
782    tmp = REQUEST_LIST_INIT;
783
784    fireCancelledReq( msgs, pieceIndex );
785}
786
787static void
788expireOldRequests( tr_peermsgs * msgs )
789{
790    int i;
791    time_t oldestAllowed;
792    struct request_list tmp = REQUEST_LIST_INIT;
793
794    /* cancel requests that have been queued for too long */
795    oldestAllowed = time( NULL ) - QUEUED_REQUEST_TTL_SECS;
796    reqListCopy( &tmp, &msgs->clientWillAskFor );
797    for( i=0; i<tmp.count; ++i ) {
798        const struct peer_request * req = &tmp.requests[i];
799        if( req->time_requested < oldestAllowed )
800            tr_peerMsgsCancel( msgs, req->index );
801    }
802    reqListClear( &tmp );
803
804    /* cancel requests that were sent too long ago */
805    oldestAllowed = time( NULL ) - SENT_REQUEST_TTL_SECS;
806    reqListCopy( &tmp, &msgs->clientAskedFor );
807    for( i=0; i<tmp.count; ++i ) {
808        const struct peer_request * req = &tmp.requests[i];
809        if( req->time_requested < oldestAllowed )
810            tr_peerMsgsCancel( msgs, req->index );
811    }
812    reqListClear( &tmp );
813}
814
815static void
816pumpRequestQueue( tr_peermsgs * msgs )
817{
818    const int max = msgs->maxActiveRequests;
819    const int min = msgs->minActiveRequests;
820    const time_t now = time( NULL );
821    int sent = 0;
822    int count = msgs->clientAskedFor.count;
823    struct peer_request req;
824
825    if( count > min )
826        return;
827    if( msgs->info->clientIsChoked )
828        return;
829
830    while( ( count < max ) && !reqListPop( &msgs->clientWillAskFor, &req ) )
831    {
832        assert( requestIsValid( msgs, &req ) );
833        assert( tr_bitfieldHas( msgs->info->have, req.index ) );
834
835        protocolSendRequest( msgs, &req );
836        req.time_requested = now;
837        reqListAppend( &msgs->clientAskedFor, &req );
838
839        ++count;
840        ++sent;
841    }
842
843    if( sent )
844        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
845                sent,
846                msgs->clientAskedFor.count,
847                msgs->clientWillAskFor.count );
848
849    if( count < max )
850        fireNeedReq( msgs );
851}
852
853static int
854peerPulse( void * vmsgs );
855
856static int
857requestQueueIsFull( const tr_peermsgs * msgs )
858{
859    const int req_max = msgs->maxActiveRequests;
860    return msgs->clientWillAskFor.count >= req_max;
861}
862
863int
864tr_peerMsgsAddRequest( tr_peermsgs       * msgs,
865                       tr_piece_index_t    piece )
866{
867    struct peer_request req;
868
869    assert( msgs );
870    assert( msgs->torrent );
871    assert( piece < msgs->torrent->info.pieceCount );
872
873    /**
874    ***  Reasons to decline the request
875    **/
876
877    /* don't send requests to choked clients */
878    if( msgs->info->clientIsChoked ) {
879        dbgmsg( msgs, "declining request because they're choking us" );
880        return TR_ADDREQ_CLIENT_CHOKED;
881    }
882
883    /* peer doesn't have this piece */
884    if( !tr_bitfieldHas( msgs->info->have, piece ) )
885        return TR_ADDREQ_MISSING;
886
887    /* peer's queue is full */
888    if( requestQueueIsFull( msgs ) ) {
889        dbgmsg( msgs, "declining request because we're full" );
890        return TR_ADDREQ_FULL;
891    }
892
893    /* have we already asked for this piece? */
894    if( reqListHasPiece( &msgs->clientAskedFor, piece ) || 
895        reqListHasPiece( &msgs->clientWillAskFor, piece ) ) {
896        dbgmsg( msgs, "declining because it's a duplicate" );
897        return TR_ADDREQ_DUPLICATE;
898    }
899
900    /**
901    ***  Accept this request
902    **/
903
904    dbgmsg( msgs, "added req for piece %lu", (unsigned long)piece );
905    req.time_requested = time( NULL );
906    reqListAppendPiece( msgs->torrent, &msgs->clientWillAskFor, piece );
907    return TR_ADDREQ_OK;
908}
909
910static void
911cancelAllRequestsToPeer( tr_peermsgs * msgs )
912{
913    int i;
914    struct request_list a = msgs->clientWillAskFor;
915    struct request_list b = msgs->clientAskedFor;
916
917    msgs->clientAskedFor = REQUEST_LIST_INIT;
918    msgs->clientWillAskFor = REQUEST_LIST_INIT;
919
920    for( i=0; i<a.count; ++i )
921        fireCancelledReq( msgs, a.requests[i].index );
922
923    for( i=0; i<b.count; ++i ) {
924        fireCancelledReq( msgs, b.requests[i].index );
925        protocolSendCancel( msgs, &b.requests[i] );
926    }
927
928    reqListClear( &a );
929    reqListClear( &b );
930}
931
932/**
933***
934**/
935
936static void
937sendLtepHandshake( tr_peermsgs * msgs )
938{
939    tr_benc val, *m;
940    char * buf;
941    int len;
942    int pex;
943    struct evbuffer * out = msgs->outMessages;
944
945    if( msgs->clientSentLtepHandshake )
946        return;
947
948    dbgmsg( msgs, "sending an ltep handshake" );
949    msgs->clientSentLtepHandshake = 1;
950
951    /* decide if we want to advertise pex support */
952    if( !tr_torrentAllowsPex( msgs->torrent ) )
953        pex = 0;
954    else if( msgs->peerSentLtepHandshake )
955        pex = msgs->peerSupportsPex ? 1 : 0;
956    else
957        pex = 1;
958
959    tr_bencInitDict( &val, 4 );
960    tr_bencDictAddInt( &val, "e", msgs->session->encryptionMode != TR_CLEAR_PREFERRED );
961    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( msgs->session ) );
962    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
963    m  = tr_bencDictAddDict( &val, "m", 1 );
964    if( pex )
965        tr_bencDictAddInt( m, "ut_pex", TR_LTEP_PEX );
966    buf = tr_bencSave( &val, &len );
967
968    tr_peerIoWriteUint32( msgs->io, out, 2*sizeof(uint8_t) + len );
969    tr_peerIoWriteUint8 ( msgs->io, out, BT_LTEP );
970    tr_peerIoWriteUint8 ( msgs->io, out, LTEP_HANDSHAKE );
971    tr_peerIoWriteBytes ( msgs->io, out, buf, len );
972    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
973    dbgmsg( msgs, "outMessage size is now %d", (int)EVBUFFER_LENGTH(out) );
974
975    /* cleanup */
976    tr_bencFree( &val );
977    tr_free( buf );
978}
979
980static void
981parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
982{
983    int64_t i;
984    tr_benc val, * sub;
985    uint8_t * tmp = tr_new( uint8_t, len );
986
987    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
988    msgs->peerSentLtepHandshake = 1;
989
990    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type!=TYPE_DICT ) {
991        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
992        tr_free( tmp );
993        return;
994    }
995
996    dbgmsg( msgs, "here is the ltep handshake we got [%*.*s]", len, len, tmp );
997
998    /* does the peer prefer encrypted connections? */
999    if( tr_bencDictFindInt( &val, "e", &i ) )
1000        msgs->info->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
1001                                              : ENCRYPTION_PREFERENCE_NO;
1002
1003    /* check supported messages for utorrent pex */
1004    msgs->peerSupportsPex = 0;
1005    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
1006        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
1007            msgs->ut_pex_id = (uint8_t) i;
1008            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
1009            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
1010        }
1011    }
1012
1013    /* get peer's listening port */
1014    if( tr_bencDictFindInt( &val, "p", &i ) ) {
1015        msgs->info->port = htons( (uint16_t)i );
1016        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
1017    }
1018
1019    tr_bencFree( &val );
1020    tr_free( tmp );
1021}
1022
1023static void
1024parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1025{
1026    int loaded = 0;
1027    uint8_t * tmp = tr_new( uint8_t, msglen );
1028    tr_benc val;
1029    const tr_torrent * tor = msgs->torrent;
1030    const uint8_t * added;
1031    size_t added_len;
1032
1033    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
1034
1035    if( tr_torrentAllowsPex( tor )
1036        && (( loaded = !tr_bencLoad( tmp, msglen, &val, NULL )))
1037        && tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1038    {
1039        const uint8_t * added_f = NULL;
1040        tr_pex * pex;
1041        size_t i, n;
1042        size_t added_f_len = 0;
1043        tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1044        pex = tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len, &n );
1045        for( i=0; i<n; ++i )
1046            tr_peerMgrAddPex( msgs->session->peerMgr, tor->info.hash,
1047                              TR_PEER_FROM_PEX, pex+i );
1048        tr_free( pex );
1049    }
1050
1051    if( loaded )
1052        tr_bencFree( &val );
1053    tr_free( tmp );
1054}
1055
1056static void
1057sendPex( tr_peermsgs * msgs );
1058
1059static void
1060parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1061{
1062    uint8_t ltep_msgid;
1063
1064    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
1065    msglen--;
1066
1067    if( ltep_msgid == LTEP_HANDSHAKE )
1068    {
1069        dbgmsg( msgs, "got ltep handshake" );
1070        parseLtepHandshake( msgs, msglen, inbuf );
1071        if( tr_peerIoSupportsLTEP( msgs->io ) )
1072        {
1073            sendLtepHandshake( msgs );
1074            sendPex( msgs );
1075        }
1076    }
1077    else if( ltep_msgid == TR_LTEP_PEX )
1078    {
1079        dbgmsg( msgs, "got ut pex" );
1080        msgs->peerSupportsPex = 1;
1081        parseUtPex( msgs, msglen, inbuf );
1082    }
1083    else
1084    {
1085        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1086        evbuffer_drain( inbuf, msglen );
1087    }
1088}
1089
1090static int
1091readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1092{
1093    uint32_t len;
1094
1095    if( inlen < sizeof(len) )
1096        return READ_MORE;
1097
1098    tr_peerIoReadUint32( msgs->io, inbuf, &len );
1099
1100    if( len == 0 ) /* peer sent us a keepalive message */
1101        dbgmsg( msgs, "got KeepAlive" );
1102    else {
1103        msgs->incoming.length = len;
1104        msgs->state = AWAITING_BT_ID;
1105    }
1106
1107    return READ_AGAIN;
1108}
1109
1110static int
1111readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen );
1112
1113static int
1114readBtId( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1115{
1116    uint8_t id;
1117
1118    if( inlen < sizeof(uint8_t) )
1119        return READ_MORE;
1120
1121    tr_peerIoReadUint8( msgs->io, inbuf, &id );
1122    msgs->incoming.id = id;
1123
1124    if( id==BT_PIECE )
1125    {
1126        msgs->state = AWAITING_BT_PIECE;
1127        return READ_AGAIN;
1128    }
1129    else if( msgs->incoming.length != 1 )
1130    {
1131        msgs->state = AWAITING_BT_MESSAGE;
1132        return READ_AGAIN;
1133    }
1134    else return readBtMessage( msgs, inbuf, inlen-1 );
1135}
1136
1137static void
1138updatePeerProgress( tr_peermsgs * msgs )
1139{
1140    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have )
1141                         / (float)msgs->torrent->info.pieceCount;
1142    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
1143    updateInterest( msgs );
1144    firePeerProgress( msgs );
1145}
1146
1147static int
1148clientCanSendFastBlock( const tr_peermsgs * msgs UNUSED )
1149{
1150    /* don't send a fast piece if peer has MAX_FAST_ALLOWED_THRESHOLD pieces */
1151    if( tr_bitfieldCountTrueBits( msgs->info->have ) > MAX_FAST_ALLOWED_THRESHOLD )
1152        return FALSE;
1153   
1154    /* ...or if we don't have ourself enough pieces */
1155    if( tr_bitfieldCountTrueBits( tr_cpPieceBitfield( msgs->torrent->completion ) ) < MAX_FAST_ALLOWED_THRESHOLD )
1156        return FALSE;
1157
1158    /* Maybe a bandwidth limit ? */
1159    return TRUE;
1160}
1161
1162static void
1163peerMadeRequest( tr_peermsgs * msgs, const struct peer_request * req )
1164{
1165    const int reqIsValid = requestIsValid( msgs, req );
1166    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( msgs->torrent->completion, req->index );
1167    const int peerIsChoked = msgs->info->peerIsChoked;
1168    const int peerIsFast = tr_peerIoSupportsFEXT( msgs->io );
1169    const int pieceIsFast = reqIsValid && tr_bitfieldHas( getPeerAllowedPieces( msgs ), req->index );
1170    const int canSendFast = clientCanSendFastBlock( msgs );
1171
1172    if( !reqIsValid ) /* bad request */
1173    {
1174        dbgmsg( msgs, "rejecting an invalid request." );
1175        sendFastReject( msgs, req->index, req->offset, req->length );
1176    }
1177    else if( !clientHasPiece ) /* we don't have it */
1178    {
1179        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1180        sendFastReject( msgs, req->index, req->offset, req->length );
1181    }
1182    else if( peerIsChoked && !peerIsFast ) /* doesn't he know he's choked? */
1183    {
1184        tr_peerMsgsSetChoke( msgs, 1 );
1185        sendFastReject( msgs, req->index, req->offset, req->length );
1186    }
1187    else if( peerIsChoked && peerIsFast && ( !pieceIsFast || !canSendFast ) )
1188    {
1189        sendFastReject( msgs, req->index, req->offset, req->length );
1190    }
1191    else /* YAY */
1192    {
1193        if( peerIsFast && pieceIsFast )
1194            reqListAppend( &msgs->peerAskedForFast, req );
1195        else
1196            reqListAppend( &msgs->peerAskedFor, req );
1197    }
1198}
1199
1200static int
1201messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1202{
1203    switch( id )
1204    {
1205        case BT_CHOKE:
1206        case BT_UNCHOKE:
1207        case BT_INTERESTED:
1208        case BT_NOT_INTERESTED:
1209        case BT_HAVE_ALL:
1210        case BT_HAVE_NONE:
1211            return len==1;
1212
1213        case BT_HAVE:
1214        case BT_SUGGEST:
1215        case BT_ALLOWED_FAST:
1216            return len==5;
1217
1218        case BT_BITFIELD:
1219            return len == (msg->torrent->info.pieceCount+7u)/8u + 1u;
1220       
1221        case BT_REQUEST:
1222        case BT_CANCEL:
1223        case BT_REJECT:
1224            return len==13;
1225
1226        case BT_PIECE:
1227            return len>9 && len<=16393;
1228
1229        case BT_PORT:
1230            return len==3;
1231
1232        case BT_LTEP:
1233            return len >= 2;
1234
1235        default:
1236            return FALSE;
1237    }
1238}
1239
1240static int
1241clientGotBlock( tr_peermsgs * msgs, const uint8_t * block, const struct peer_request * req );
1242
1243static void
1244clientGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1245{
1246    msgs->info->pieceDataActivityDate = time( NULL );
1247    tr_rcTransferred( msgs->info->rcToClient, byteCount );
1248    fireClientGotData( msgs, byteCount );
1249}
1250
1251static int
1252readBtPiece( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1253{
1254    struct peer_request * req = &msgs->incoming.blockReq;
1255    assert( EVBUFFER_LENGTH(inbuf) >= inlen );
1256    dbgmsg( msgs, "In readBtPiece" );
1257
1258    if( !req->length )
1259    {
1260        if( inlen < 8 )
1261            return READ_MORE;
1262
1263        tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
1264        tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
1265        req->length = msgs->incoming.length - 9;
1266        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1267        return READ_AGAIN;
1268    }
1269    else
1270    {
1271        int err;
1272
1273        /* read in another chunk of data */
1274        const size_t nLeft = req->length - EVBUFFER_LENGTH(msgs->incoming.block);
1275        size_t n = MIN( nLeft, inlen );
1276        uint8_t * buf = tr_new( uint8_t, n );
1277        assert( EVBUFFER_LENGTH(inbuf) >= n );
1278        tr_peerIoReadBytes( msgs->io, inbuf, buf, n );
1279        evbuffer_add( msgs->incoming.block, buf, n );
1280        clientGotBytes( msgs, n );
1281        tr_free( buf );
1282        dbgmsg( msgs, "got %d bytes for block %u:%u->%u ... %d remain",
1283               (int)n, req->index, req->offset, req->length,
1284               (int)( req->length - EVBUFFER_LENGTH(msgs->incoming.block) ) );
1285        if( EVBUFFER_LENGTH(msgs->incoming.block) < req->length )
1286            return READ_MORE;
1287
1288        /* we've got the whole block ... process it */
1289        err = clientGotBlock( msgs, EVBUFFER_DATA(msgs->incoming.block), req );
1290
1291        /* cleanup */
1292        evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH(msgs->incoming.block) );
1293        req->length = 0;
1294        msgs->state = AWAITING_BT_LENGTH;
1295        if( !err )
1296            return READ_AGAIN;
1297        else {
1298            fireError( msgs, err );
1299            return READ_DONE;
1300        }
1301    }
1302}
1303
1304static int
1305readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1306{
1307    uint32_t ui32;
1308    uint32_t msglen = msgs->incoming.length;
1309    const uint8_t id = msgs->incoming.id;
1310    const size_t startBufLen = EVBUFFER_LENGTH( inbuf );
1311
1312    --msglen; /* id length */
1313
1314    if( inlen < msglen )
1315        return READ_MORE;
1316
1317    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %d", (int)id, (int)msglen, (int)inlen );
1318
1319    if( !messageLengthIsCorrect( msgs, id, msglen+1 ) )
1320    {
1321        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1322        fireError( msgs, TR_ERROR );
1323        return READ_DONE;
1324    }
1325
1326    switch( id )
1327    {
1328        case BT_CHOKE:
1329            dbgmsg( msgs, "got Choke" );
1330            msgs->info->clientIsChoked = 1;
1331            cancelAllRequestsToPeer( msgs );
1332            cancelAllRequestsToClientExceptFast( msgs );
1333            break;
1334
1335        case BT_UNCHOKE:
1336            dbgmsg( msgs, "got Unchoke" );
1337            msgs->info->clientIsChoked = 0;
1338            fireNeedReq( msgs );
1339            break;
1340
1341        case BT_INTERESTED:
1342            dbgmsg( msgs, "got Interested" );
1343            msgs->info->peerIsInterested = 1;
1344            break;
1345
1346        case BT_NOT_INTERESTED:
1347            dbgmsg( msgs, "got Not Interested" );
1348            msgs->info->peerIsInterested = 0;
1349            break;
1350           
1351        case BT_HAVE:
1352            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1353            dbgmsg( msgs, "got Have: %u", ui32 );
1354            if( tr_bitfieldAdd( msgs->info->have, ui32 ) )
1355                fireError( msgs, TR_ERROR_PEER_MESSAGE );
1356            updatePeerProgress( msgs );
1357            tr_rcTransferred( msgs->torrent->swarmSpeed, msgs->torrent->info.pieceSize );
1358            break;
1359
1360        case BT_BITFIELD: {
1361            dbgmsg( msgs, "got a bitfield" );
1362            msgs->peerSentBitfield = 1;
1363            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
1364            updatePeerProgress( msgs );
1365            maybeSendFastAllowedSet( msgs );
1366            fireNeedReq( msgs );
1367            break;
1368        }
1369
1370        case BT_REQUEST: {
1371            struct peer_request r;
1372            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1373            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1374            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1375            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1376            peerMadeRequest( msgs, &r );
1377            break;
1378        }
1379
1380        case BT_CANCEL: {
1381            struct peer_request r;
1382            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1383            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1384            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1385            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1386            reqListRemove( &msgs->peerAskedForFast, &r );
1387            reqListRemove( &msgs->peerAskedFor, &r );
1388            break;
1389        }
1390
1391        case BT_PIECE:
1392            assert( 0 ); /* handled elsewhere! */
1393            break;
1394       
1395        case BT_PORT:
1396            dbgmsg( msgs, "Got a BT_PORT" );
1397            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1398            break;
1399           
1400        case BT_SUGGEST: {
1401            dbgmsg( msgs, "Got a BT_SUGGEST" );
1402            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1403            /* we don't do anything with this yet */
1404            break;
1405        }
1406           
1407        case BT_HAVE_ALL:
1408            dbgmsg( msgs, "Got a BT_HAVE_ALL" );
1409            tr_bitfieldAddRange( msgs->info->have, 0, msgs->torrent->info.pieceCount );
1410            updatePeerProgress( msgs );
1411            maybeSendFastAllowedSet( msgs );
1412            break;
1413       
1414       
1415        case BT_HAVE_NONE:
1416            dbgmsg( msgs, "Got a BT_HAVE_NONE" );
1417            tr_bitfieldClear( msgs->info->have );
1418            updatePeerProgress( msgs );
1419            maybeSendFastAllowedSet( msgs );
1420            break;
1421       
1422        case BT_REJECT: {
1423            struct peer_request r;
1424            dbgmsg( msgs, "Got a BT_REJECT" );
1425            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1426            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1427            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1428            reqListRemove( &msgs->clientAskedFor, &r );
1429            break;
1430        }
1431
1432        case BT_ALLOWED_FAST: {
1433            dbgmsg( msgs, "Got a BT_ALLOWED_FAST" );
1434            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1435            /* we don't do anything with this yet */
1436            break;
1437        }
1438
1439        case BT_LTEP:
1440            dbgmsg( msgs, "Got a BT_LTEP" );
1441            parseLtep( msgs, msglen, inbuf );
1442            break;
1443
1444        default:
1445            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1446            tr_peerIoDrain( msgs->io, inbuf, msglen );
1447            break;
1448    }
1449
1450    assert( msglen + 1 == msgs->incoming.length );
1451    assert( EVBUFFER_LENGTH(inbuf) == startBufLen - msglen );
1452
1453    msgs->state = AWAITING_BT_LENGTH;
1454    return READ_AGAIN;
1455}
1456
1457static void
1458peerGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1459{
1460    msgs->info->pieceDataActivityDate = time( NULL );
1461    tr_rcTransferred( msgs->info->rcToPeer, byteCount );
1462    firePeerGotData( msgs, byteCount );
1463}
1464
1465static size_t
1466getDownloadMax( const tr_peermsgs * msgs )
1467{
1468    static const size_t maxval = ~0;
1469    const tr_torrent * tor = msgs->torrent;
1470
1471    if( tor->downloadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1472        return tor->handle->useDownloadLimit
1473            ? tr_rcBytesLeft( tor->handle->download ) : maxval;
1474
1475    if( tor->downloadLimitMode == TR_SPEEDLIMIT_SINGLE )
1476        return tr_rcBytesLeft( tor->download );
1477
1478    return maxval;
1479}
1480
1481static void
1482decrementDownloadedCount( tr_peermsgs * msgs, uint32_t byteCount )
1483{
1484    tr_torrent * tor = msgs->torrent;
1485    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1486}
1487
1488static void
1489clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1490{
1491    decrementDownloadedCount( msgs, req->length );
1492}
1493
1494static void
1495addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1496{
1497    if( !msgs->info->blame )
1498         msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1499    tr_bitfieldAdd( msgs->info->blame, index );
1500}
1501
1502static tr_errno
1503clientGotBlock( tr_peermsgs                * msgs,
1504                const uint8_t              * data,
1505                const struct peer_request  * req )
1506{
1507    int err;
1508    tr_torrent * tor = msgs->torrent;
1509    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1510
1511    assert( msgs );
1512    assert( req );
1513
1514    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) )
1515    {
1516        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1517                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1518        return TR_ERROR;
1519    }
1520
1521    /* save the block */
1522    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1523
1524    /**
1525    *** Remove the block from our `we asked for this' list
1526    **/
1527
1528    if( reqListRemove( &msgs->clientAskedFor, req ) )
1529    {
1530        clientGotUnwantedBlock( msgs, req );
1531        dbgmsg( msgs, "we didn't ask for this message..." );
1532        return 0;
1533    }
1534
1535    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1536                  msgs->clientAskedFor.count );
1537
1538    /**
1539    *** Error checks
1540    **/
1541
1542    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
1543        dbgmsg( msgs, "we have this block already..." );
1544        clientGotUnwantedBlock( msgs, req );
1545        return 0;
1546    }
1547
1548    /**
1549    ***  Save the block
1550    **/
1551
1552    msgs->info->peerSentPieceDataAt = time( NULL );
1553    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1554        return err;
1555
1556    addPeerToBlamefield( msgs, req->index );
1557    fireGotBlock( msgs, req );
1558    return 0;
1559}
1560
1561static void
1562didWrite( struct bufferevent * evin UNUSED, void * vmsgs )
1563{
1564    peerPulse( vmsgs );
1565}
1566
1567static ReadState
1568canRead( struct bufferevent * evin, void * vmsgs )
1569{
1570    ReadState ret;
1571    tr_peermsgs * msgs = vmsgs;
1572    struct evbuffer * in = EVBUFFER_INPUT ( evin );
1573    const size_t inlen = EVBUFFER_LENGTH( in );
1574
1575    if( !inlen )
1576    {
1577        ret = READ_DONE;
1578    }
1579    else if( msgs->state == AWAITING_BT_PIECE )
1580    {
1581        const size_t downloadMax = getDownloadMax( msgs );
1582        const size_t n = MIN( inlen, downloadMax );
1583        ret = n ? readBtPiece( msgs, in, n ) : READ_DONE;
1584    }
1585    else switch( msgs->state )
1586    {
1587        case AWAITING_BT_LENGTH:  ret = readBtLength ( msgs, in, inlen ); break;
1588        case AWAITING_BT_ID:      ret = readBtId     ( msgs, in, inlen ); break;
1589        case AWAITING_BT_MESSAGE: ret = readBtMessage( msgs, in, inlen ); break;
1590        default:                  assert( 0 );
1591    }
1592
1593    return ret;
1594}
1595
1596static void
1597sendKeepalive( tr_peermsgs * msgs )
1598{
1599    dbgmsg( msgs, "sending a keepalive message" );
1600    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1601    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1602}
1603
1604/**
1605***
1606**/
1607
1608static size_t
1609getUploadMax( const tr_peermsgs * msgs )
1610{
1611    static const size_t maxval = ~0;
1612    const tr_torrent * tor = msgs->torrent;
1613    int speedLeft;
1614    int bufLeft;
1615
1616    if( tor->uploadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1617        speedLeft = tor->handle->useUploadLimit ? tr_rcBytesLeft( tor->handle->upload ) : maxval;
1618    else if( tor->uploadLimitMode == TR_SPEEDLIMIT_SINGLE )
1619        speedLeft = tr_rcBytesLeft( tor->upload );
1620    else
1621        speedLeft = INT_MAX;
1622
1623    /* this basically says the outbuf shouldn't have more than one block
1624     * queued up in it... blocksize, +13 for the size of the BT protocol's
1625     * block message overhead */
1626    bufLeft = tor->blockSize + 13 - tr_peerIoWriteBytesWaiting( msgs->io );
1627    return MIN( speedLeft, bufLeft );
1628}
1629
1630static int
1631getMaxBlocksFromPeerSoon( const tr_peermsgs * peer )
1632{
1633    const double seconds = 30;
1634    const double bytesPerSecond = peer->info->rateToClient * 1024;
1635    const double totalBytes = bytesPerSecond * seconds;
1636    const int blockCount = totalBytes / peer->torrent->blockSize;
1637    /*fprintf( stderr, "rate %f -- blockCount %d\n", peer->info->rateToClient, blockCount );*/
1638    return blockCount;
1639}
1640
1641static int
1642ratePulse( void * vpeer )
1643{
1644    tr_peermsgs * peer = vpeer;
1645    peer->info->rateToClient = tr_rcRate( peer->info->rcToClient );
1646    peer->info->rateToPeer = tr_rcRate( peer->info->rcToPeer );
1647    peer->minActiveRequests = 4;
1648    peer->maxActiveRequests = peer->minActiveRequests + getMaxBlocksFromPeerSoon( peer );
1649    return TRUE;
1650}
1651
1652static tr_errno
1653popNextRequest( tr_peermsgs * msgs, struct peer_request * setme )
1654{
1655    if( !reqListPop( &msgs->peerAskedForFast, setme ) )
1656        return 0;
1657    if( !reqListPop( &msgs->peerAskedFor, setme ) )
1658        return 0;
1659
1660    return TR_ERROR;
1661}
1662
1663static int
1664peerPulse( void * vmsgs )
1665{
1666    const time_t now = time( NULL );
1667    tr_peermsgs * msgs = vmsgs;
1668
1669    tr_peerIoTryRead( msgs->io );
1670    pumpRequestQueue( msgs );
1671    expireOldRequests( msgs );
1672
1673    if( msgs->sendingBlock )
1674    {
1675        const size_t uploadMax = getUploadMax( msgs );
1676        size_t len = EVBUFFER_LENGTH( msgs->outBlock );
1677        const size_t outlen = MIN( len, uploadMax );
1678
1679        assert( len );
1680
1681        if( outlen )
1682        {
1683            tr_peerIoWrite( msgs->io, EVBUFFER_DATA( msgs->outBlock ), outlen );
1684            evbuffer_drain( msgs->outBlock, outlen );
1685            peerGotBytes( msgs, outlen );
1686
1687            len -= outlen;
1688            msgs->clientSentAnythingAt = now;
1689            msgs->sendingBlock = len != 0;
1690
1691            dbgmsg( msgs, "wrote %d bytes; %d left in block", (int)outlen, (int)len );
1692        }
1693        else dbgmsg( msgs, "stalled writing block... uploadMax %lu, outlen %lu", uploadMax, outlen );
1694    }
1695
1696    if( !msgs->sendingBlock )
1697    {
1698        struct peer_request req;
1699        int haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1700
1701        if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1702        {
1703            dbgmsg( msgs, "started an outMessages batch (length is %d)", (int)EVBUFFER_LENGTH( msgs->outMessages ) );
1704            msgs->outMessagesBatchedAt = now;
1705        }
1706        else if( haveMessages
1707                 && ( ( now - msgs->outMessagesBatchedAt ) > msgs->outMessagesBatchPeriod ) )
1708        {
1709            dbgmsg( msgs, "flushing outMessages... (length is %d)", (int)EVBUFFER_LENGTH( msgs->outMessages ) );
1710            tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
1711            msgs->clientSentAnythingAt = now;
1712            msgs->outMessagesBatchedAt = 0;
1713            msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1714        }
1715        else if( !EVBUFFER_LENGTH( msgs->outBlock )
1716            && !popNextRequest( msgs, &req )
1717            && requestIsValid( msgs, &req )
1718            && tr_cpPieceIsComplete( msgs->torrent->completion, req.index ) )
1719        {
1720            uint8_t * buf = tr_new( uint8_t, req.length );
1721
1722            if( !tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf ) )
1723            {
1724                tr_peerIo * io = msgs->io;
1725                struct evbuffer * out = msgs->outBlock;
1726
1727                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1728                tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 2*sizeof(uint32_t) + req.length );
1729                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1730                tr_peerIoWriteUint32( io, out, req.index );
1731                tr_peerIoWriteUint32( io, out, req.offset );
1732                tr_peerIoWriteBytes ( io, out, buf, req.length );
1733                msgs->sendingBlock = 1;
1734            }
1735
1736            tr_free( buf );
1737        }
1738        else if(    ( !haveMessages )
1739                 && ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS )
1740        {
1741            sendKeepalive( msgs );
1742        }
1743    }
1744
1745    return TRUE; /* loop forever */
1746}
1747
1748static void
1749gotError( struct bufferevent * evbuf UNUSED, short what, void * vmsgs )
1750{
1751    if( what & EVBUFFER_TIMEOUT )
1752        dbgmsg( vmsgs, "libevent got a timeout, what=%hd, secs=%d", what, evbuf->timeout_read );
1753    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1754        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1755                what, errno, tr_strerror(errno) );
1756    fireError( vmsgs, TR_ERROR );
1757}
1758
1759static void
1760sendBitfield( tr_peermsgs * msgs )
1761{
1762    struct evbuffer * out = msgs->outMessages;
1763    const tr_piece_index_t pieceCount = msgs->torrent->info.pieceCount;
1764    tr_bitfield * field;
1765    tr_piece_index_t lazyPieces[LAZY_PIECE_COUNT];
1766    int i;
1767    int lazyCount = 0;
1768
1769    field = tr_bitfieldDup( tr_cpPieceBitfield( msgs->torrent->completion ) );
1770
1771    if( tr_sessionIsLazyBitfieldEnabled( msgs->session ) )
1772    {
1773        const int maxLazyCount = MIN( LAZY_PIECE_COUNT, pieceCount );
1774
1775        while( lazyCount < maxLazyCount )
1776        {
1777            const size_t pos = tr_cryptoRandInt ( pieceCount );
1778            if( !tr_bitfieldHas( field, pos ) ) /* already removed it */
1779                continue;
1780            dbgmsg( msgs, "lazy bitfield #%d: piece %d of %d",
1781                    (int)(lazyCount+1), (int)pos, (int)pieceCount );
1782            tr_bitfieldRem( field, pos );
1783            lazyPieces[lazyCount++] = pos;
1784        }
1785    }
1786
1787    tr_peerIoWriteUint32( msgs->io, out, sizeof(uint8_t) + field->byteCount );
1788    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1789    tr_peerIoWriteBytes ( msgs->io, out, field->bits, field->byteCount );
1790    dbgmsg( msgs, "sending bitfield... outMessage size is now %d",
1791                  (int)EVBUFFER_LENGTH(out) );
1792    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1793
1794    for( i=0; i<lazyCount; ++i )
1795        protocolSendHave( msgs, lazyPieces[i] );
1796
1797    tr_bitfieldFree( field );
1798}
1799
1800/**
1801***
1802**/
1803
1804/* some peers give us error messages if we send
1805   more than this many peers in a single pex message
1806   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
1807#define MAX_PEX_ADDED 50
1808#define MAX_PEX_DROPPED 50
1809
1810typedef struct
1811{
1812    tr_pex * added;
1813    tr_pex * dropped;
1814    tr_pex * elements;
1815    int addedCount;
1816    int droppedCount;
1817    int elementCount;
1818}
1819PexDiffs;
1820
1821static void
1822pexAddedCb( void * vpex, void * userData )
1823{
1824    PexDiffs * diffs = userData;
1825    tr_pex * pex = vpex;
1826    if( diffs->addedCount < MAX_PEX_ADDED )
1827    {
1828        diffs->added[diffs->addedCount++] = *pex;
1829        diffs->elements[diffs->elementCount++] = *pex;
1830    }
1831}
1832
1833static void
1834pexDroppedCb( void * vpex, void * userData )
1835{
1836    PexDiffs * diffs = userData;
1837    tr_pex * pex = vpex;
1838    if( diffs->droppedCount < MAX_PEX_DROPPED )
1839    {
1840        diffs->dropped[diffs->droppedCount++] = *pex;
1841    }
1842}
1843
1844static void
1845pexElementCb( void * vpex, void * userData )
1846{
1847    PexDiffs * diffs = userData;
1848    tr_pex * pex = vpex;
1849    diffs->elements[diffs->elementCount++] = *pex;
1850}
1851
1852static void
1853sendPex( tr_peermsgs * msgs )
1854{
1855    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
1856    {
1857        int i;
1858        tr_pex * newPex = NULL;
1859        const int newCount = tr_peerMgrGetPeers( msgs->session->peerMgr, msgs->torrent->info.hash, &newPex );
1860        PexDiffs diffs;
1861        tr_benc val, *added, *dropped;
1862        uint8_t *tmp, *walk;
1863        char * benc;
1864        int bencLen;
1865        struct evbuffer * out = msgs->outMessages;
1866
1867        /* build the diffs */
1868        diffs.added = tr_new( tr_pex, newCount );
1869        diffs.addedCount = 0;
1870        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1871        diffs.droppedCount = 0;
1872        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1873        diffs.elementCount = 0;
1874        tr_set_compare( msgs->pex, msgs->pexCount,
1875                        newPex, newCount,
1876                        tr_pexCompare, sizeof(tr_pex),
1877                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
1878        dbgmsg( msgs, "pex: old peer count %d, new peer count %d, added %d, removed %d", msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1879
1880        /* update peer */
1881        tr_free( msgs->pex );
1882        msgs->pex = diffs.elements;
1883        msgs->pexCount = diffs.elementCount;
1884
1885        /* build the pex payload */
1886        tr_bencInitDict( &val, 3 );
1887
1888        /* "added" */
1889        added = tr_bencDictAdd( &val, "added" );
1890        tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1891        for( i=0; i<diffs.addedCount; ++i ) {
1892            memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
1893            memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1894        }
1895        assert( ( walk - tmp ) == diffs.addedCount * 6 );
1896        tr_bencInitStr( added, tmp, walk-tmp, FALSE );
1897
1898        /* "added.f" */
1899        tmp = walk = tr_new( uint8_t, diffs.addedCount );
1900        for( i=0; i<diffs.addedCount; ++i )
1901            *walk++ = diffs.added[i].flags;
1902        assert( ( walk - tmp ) == diffs.addedCount );
1903        tr_bencDictAddRaw( &val, "added.f", tmp, walk-tmp );
1904        tr_free( tmp );
1905
1906        /* "dropped" */
1907        dropped = tr_bencDictAdd( &val, "dropped" );
1908        tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
1909        for( i=0; i<diffs.droppedCount; ++i ) {
1910            memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
1911            memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
1912        }
1913        assert( ( walk - tmp ) == diffs.droppedCount * 6 );
1914        tr_bencInitStr( dropped, tmp, walk-tmp, FALSE );
1915
1916        /* write the pex message */
1917        benc = tr_bencSave( &val, &bencLen );
1918        tr_peerIoWriteUint32( msgs->io, out, 2*sizeof(uint8_t) + bencLen );
1919        tr_peerIoWriteUint8 ( msgs->io, out, BT_LTEP );
1920        tr_peerIoWriteUint8 ( msgs->io, out, msgs->ut_pex_id );
1921        tr_peerIoWriteBytes ( msgs->io, out, benc, bencLen );
1922        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1923        dbgmsg( msgs, "outMessage size is now %d", (int)EVBUFFER_LENGTH(out) );
1924
1925        /* cleanup */
1926        tr_free( benc );
1927        tr_bencFree( &val );
1928        tr_free( diffs.added );
1929        tr_free( diffs.dropped );
1930        tr_free( newPex );
1931
1932        msgs->clientSentPexAt = time( NULL );
1933    }
1934}
1935
1936static int
1937pexPulse( void * vpeer )
1938{
1939    sendPex( vpeer );
1940    return TRUE;
1941}
1942
1943/**
1944***
1945**/
1946
1947tr_peermsgs*
1948tr_peerMsgsNew( struct tr_torrent * torrent,
1949                struct tr_peer    * info,
1950                tr_delivery_func    func,
1951                void              * userData,
1952                tr_publisher_tag  * setme )
1953{
1954    tr_peermsgs * m;
1955
1956    assert( info );
1957    assert( info->io );
1958
1959    m = tr_new0( tr_peermsgs, 1 );
1960    m->publisher = tr_publisherNew( );
1961    m->info = info;
1962    m->session = torrent->handle;
1963    m->torrent = torrent;
1964    m->io = info->io;
1965    m->info->clientIsChoked = 1;
1966    m->info->peerIsChoked = 1;
1967    m->info->clientIsInterested = 0;
1968    m->info->peerIsInterested = 0;
1969    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
1970    m->state = AWAITING_BT_LENGTH;
1971    m->pulseTimer = tr_timerNew( m->session, peerPulse, m, PEER_PULSE_INTERVAL );
1972    m->rateTimer = tr_timerNew( m->session, ratePulse, m, RATE_PULSE_INTERVAL );
1973    m->pexTimer = tr_timerNew( m->session, pexPulse, m, PEX_INTERVAL );
1974    m->outMessages = evbuffer_new( );
1975    m->outMessagesBatchedAt = 0;
1976    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1977    m->incoming.block = evbuffer_new( );
1978    m->outBlock = evbuffer_new( );
1979    m->peerAllowedPieces = NULL;
1980    m->peerAskedFor = REQUEST_LIST_INIT;
1981    m->peerAskedForFast = REQUEST_LIST_INIT;
1982    m->clientAskedFor = REQUEST_LIST_INIT;
1983    m->clientWillAskFor = REQUEST_LIST_INIT;
1984    *setme = tr_publisherSubscribe( m->publisher, func, userData );
1985
1986    if ( tr_peerIoSupportsLTEP( m->io ) )
1987        sendLtepHandshake( m );
1988
1989    sendBitfield( m );
1990   
1991    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of inactivity */
1992    tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
1993    ratePulse( m );
1994
1995    return m;
1996}
1997
1998void
1999tr_peerMsgsFree( tr_peermsgs* msgs )
2000{
2001    if( msgs )
2002    {
2003        tr_timerFree( &msgs->pulseTimer );
2004        tr_timerFree( &msgs->rateTimer );
2005        tr_timerFree( &msgs->pexTimer );
2006        tr_publisherFree( &msgs->publisher );
2007        reqListClear( &msgs->clientWillAskFor );
2008        reqListClear( &msgs->clientAskedFor );
2009        reqListClear( &msgs->peerAskedForFast );
2010        reqListClear( &msgs->peerAskedFor );
2011        tr_bitfieldFree( msgs->peerAllowedPieces );
2012        evbuffer_free( msgs->incoming.block );
2013        evbuffer_free( msgs->outMessages );
2014        evbuffer_free( msgs->outBlock );
2015        tr_free( msgs->pex );
2016
2017        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2018        tr_free( msgs );
2019    }
2020}
2021
2022void
2023tr_peerMsgsUnsubscribe( tr_peermsgs       * peer,
2024                        tr_publisher_tag    tag )
2025{
2026    tr_publisherUnsubscribe( peer->publisher, tag );
2027}
Note: See TracBrowser for help on using the repository browser.