source: trunk/libtransmission/peer-msgs.c @ 6595

Last change on this file since 6595 was 6595, checked in by muks, 13 years ago

Make tr_bitfieldHas() a macro

  • Property svn:keywords set to Date Rev Author Id
File size: 57.7 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 6595 2008-08-20 13:45:52Z muks $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19#include <libgen.h> /* basename */
20
21#include <event.h>
22
23#include "transmission.h"
24#include "bencode.h"
25#include "completion.h"
26#include "crypto.h"
27#include "inout.h"
28#include "peer-io.h"
29#include "peer-mgr.h"
30#include "peer-mgr-private.h"
31#include "peer-msgs.h"
32#include "ratecontrol.h"
33#include "stats.h"
34#include "torrent.h"
35#include "trevent.h"
36#include "utils.h"
37
38/**
39***
40**/
41
42enum
43{
44    BT_CHOKE                = 0,
45    BT_UNCHOKE              = 1,
46    BT_INTERESTED           = 2,
47    BT_NOT_INTERESTED       = 3,
48    BT_HAVE                 = 4,
49    BT_BITFIELD             = 5,
50    BT_REQUEST              = 6,
51    BT_PIECE                = 7,
52    BT_CANCEL               = 8,
53    BT_PORT                 = 9,
54    BT_SUGGEST              = 13,
55    BT_HAVE_ALL             = 14,
56    BT_HAVE_NONE            = 15,
57    BT_REJECT               = 16,
58    BT_ALLOWED_FAST         = 17,
59    BT_LTEP                 = 20,
60
61    LTEP_HANDSHAKE          = 0,
62
63    TR_LTEP_PEX             = 1,
64
65    MIN_CHOKE_PERIOD_SEC    = (10),
66
67    /* idle seconds before we send a keepalive */
68    KEEPALIVE_INTERVAL_SECS = 100,
69
70    PEX_INTERVAL            = (90 * 1000), /* msec between sendPex() calls */
71    PEER_PULSE_INTERVAL     = (50),        /* msec between peerPulse() calls */
72    RATE_PULSE_INTERVAL     = (250),       /* msec between ratePulse() calls */
73
74    MAX_QUEUE_SIZE          = (100),
75     
76    /* (fast peers) max number of pieces we fast-allow to another peer */
77    MAX_FAST_ALLOWED_COUNT   = 10,
78
79    /* (fast peers) max threshold for allowing fast-pieces requests */
80    MAX_FAST_ALLOWED_THRESHOLD = 10,
81
82    /* how long an unsent request can stay queued before it's returned
83       back to the peer-mgr's pool of requests */
84    QUEUED_REQUEST_TTL_SECS = 20,
85
86    /* how long a sent request can stay queued before it's returned
87       back to the peer-mgr's pool of requests */
88    SENT_REQUEST_TTL_SECS = 240,
89
90    /* used in lowering the outMessages queue period */
91    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
92    HIGH_PRIORITY_INTERVAL_SECS = 2,
93    LOW_PRIORITY_INTERVAL_SECS = 20,
94
95    /* number of pieces to remove from the bitfield when
96     * lazy bitfields are turned on */
97    LAZY_PIECE_COUNT = 26
98};
99
100/**
101***  REQUEST MANAGEMENT
102**/
103
104enum
105{
106    AWAITING_BT_LENGTH,
107    AWAITING_BT_ID,
108    AWAITING_BT_MESSAGE,
109    AWAITING_BT_PIECE
110};
111
112struct peer_request
113{
114    uint32_t index;
115    uint32_t offset;
116    uint32_t length;
117    time_t time_requested;
118};
119
120static int
121compareRequest( const void * va, const void * vb )
122{
123    int i;
124    const struct peer_request * a = va;
125    const struct peer_request * b = vb;
126    if(( i = tr_compareUint32( a->index, b->index ))) return i;
127    if(( i = tr_compareUint32( a->offset, b->offset ))) return i;
128    if(( i = tr_compareUint32( a->length, b->length ))) return i;
129    return 0;
130}
131
132struct request_list
133{
134    uint16_t count;
135    uint16_t max;
136    struct peer_request * requests;
137};
138
139static const struct request_list REQUEST_LIST_INIT = { 0, 0, NULL };
140
141static void
142reqListReserve( struct request_list * list, uint16_t max )
143{
144    if( list->max < max )
145    {
146        list->max = max;
147        list->requests = tr_renew( struct peer_request,
148                                   list->requests,
149                                   list->max );
150    }
151}
152
153static void
154reqListClear( struct request_list * list )
155{
156    tr_free( list->requests );
157    *list = REQUEST_LIST_INIT;
158}
159
160static void
161reqListCopy( struct request_list * dest, const struct request_list * src )
162{
163    dest->count = dest->max = src->count;
164    dest->requests = tr_memdup( src->requests, dest->count * sizeof( struct peer_request ) );
165}
166
167static void
168reqListRemoveOne( struct request_list * list, int i )
169{
170    assert( 0<=i && i<list->count );
171
172    memmove( &list->requests[i],
173             &list->requests[i+1],
174             sizeof( struct peer_request ) * ( --list->count - i ) );
175}
176
177static void
178reqListAppend( struct request_list * list, const struct peer_request * req )
179{
180    if( ++list->count >= list->max )
181        reqListReserve( list, list->max + 8 );
182
183    list->requests[list->count-1] = *req;
184}
185
186static void
187reqListAppendPiece( const tr_torrent     * tor,
188                    struct request_list  * list,
189                    tr_piece_index_t       piece )
190{
191    const time_t now = time( NULL );
192    const size_t n = tr_torPieceCountBlocks( tor, piece );
193    const tr_block_index_t begin = tr_torPieceFirstBlock( tor, piece );
194    const tr_block_index_t end = begin + n;
195    tr_block_index_t i;
196
197    if( list->count + n >= list->max )
198        reqListReserve( list, list->max + n );
199
200    for( i=begin; i<end; ++i ) {
201        if( !tr_cpBlockIsComplete( tor->completion, i ) ) {
202            struct peer_request * req = list->requests + list->count++;
203            req->index = piece;
204            req->offset = (i * tor->blockSize) - (piece * tor->info.pieceSize);
205            req->length = tr_torBlockCountBytes( tor, i );
206            req->time_requested = now;
207            assert( tr_torrentReqIsValid( tor, req->index, req->offset, req->length ) );
208        }
209    }
210}
211
212static tr_errno
213reqListPop( struct request_list * list, struct peer_request * setme )
214{
215    tr_errno err;
216
217    if( !list->count )
218        err = TR_ERROR;
219    else {
220        *setme = list->requests[0];
221        reqListRemoveOne( list, 0 );
222        err = TR_OK;
223    }
224
225    return err;
226}
227
228static int
229reqListHasPiece( struct request_list * list, const tr_piece_index_t piece )
230{
231    uint16_t i;
232
233    for( i=0; i<list->count; ++i )
234        if( list->requests[i].index == piece )
235            return 1;
236
237    return 0;
238}
239
240static int
241reqListFind( struct request_list * list, const struct peer_request * key )
242{
243    uint16_t i;
244
245    for( i=0; i<list->count; ++i )
246        if( !compareRequest( key, list->requests+i ) )
247            return i;
248
249    return -1;
250}
251
252static tr_errno
253reqListRemove( struct request_list * list, const struct peer_request * key )
254{
255    tr_errno err;
256    const int i = reqListFind( list, key );
257
258    if( i < 0 )
259        err = TR_ERROR;
260    else {
261        err = TR_OK;
262        reqListRemoveOne( list, i );
263    }
264
265    return err;
266}
267
268/**
269***
270**/
271
272/* this is raw, unchanged data from the peer regarding
273 * the current message that it's sending us. */
274struct tr_incoming
275{
276    uint8_t id;
277    uint32_t length; /* includes the +1 for id length */
278    struct peer_request blockReq; /* metadata for incoming blocks */
279    struct evbuffer * block; /* piece data for incoming blocks */
280};
281
282struct tr_peermsgs
283{
284    unsigned int peerSentBitfield         : 1;
285    unsigned int peerSupportsPex          : 1;
286    unsigned int clientSentLtepHandshake  : 1;
287    unsigned int peerSentLtepHandshake    : 1;
288    unsigned int sendingBlock             : 1;
289   
290    uint8_t state;
291    uint8_t ut_pex_id;
292    uint16_t pexCount;
293    uint16_t minActiveRequests;
294    uint16_t maxActiveRequests;
295
296    /* how long the outMessages batch should be allowed to grow before
297     * it's flushed -- some messages (like requests >:) should be sent
298     * very quickly; others aren't as urgent. */
299    int outMessagesBatchPeriod;
300
301    tr_peer * info;
302
303    tr_session * session;
304    tr_torrent * torrent;
305    tr_peerIo * io;
306
307    tr_publisher_t * publisher;
308
309    struct evbuffer * outBlock;    /* buffer of the current piece message */
310    struct evbuffer * outMessages; /* buffer of all the non-piece messages */
311
312    struct request_list peerAskedFor;
313    struct request_list peerAskedForFast;
314    struct request_list clientAskedFor;
315    struct request_list clientWillAskFor;
316
317    tr_timer * rateTimer;
318    tr_timer * pulseTimer;
319    tr_timer * pexTimer;
320
321    time_t clientSentPexAt;
322    time_t clientSentAnythingAt;
323
324    /* when we started batching the outMessages */
325    time_t outMessagesBatchedAt;
326   
327    tr_bitfield * peerAllowedPieces;
328
329    struct tr_incoming incoming; 
330
331    tr_pex * pex;
332};
333
334/**
335***
336**/
337
338static void
339myDebug( const char * file, int line,
340         const struct tr_peermsgs * msgs,
341         const char * fmt, ... )
342{
343    FILE * fp = tr_getLog( );
344    if( fp )
345    {
346        va_list args;
347        char timestr[64];
348        struct evbuffer * buf = evbuffer_new( );
349        char * myfile = tr_strdup( file );
350
351        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
352                             tr_getLogTimeStr( timestr, sizeof(timestr) ),
353                             msgs->torrent->info.name,
354                             tr_peerIoGetAddrStr( msgs->io ),
355                             msgs->info->client );
356        va_start( args, fmt );
357        evbuffer_add_vprintf( buf, fmt, args );
358        va_end( args );
359        evbuffer_add_printf( buf, " (%s:%d)\n", basename(myfile), line );
360        fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
361
362        tr_free( myfile );
363        evbuffer_free( buf );
364    }
365}
366
367#define dbgmsg(msgs, fmt...) myDebug(__FILE__, __LINE__, msgs, ##fmt )
368
369/**
370***
371**/
372
373static void
374pokeBatchPeriod( tr_peermsgs * msgs, int interval )
375{
376    if( msgs->outMessagesBatchPeriod > interval )
377    {
378        msgs->outMessagesBatchPeriod = interval;
379        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
380    }
381}
382
383static void
384protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
385{
386    tr_peerIo * io = msgs->io;
387    struct evbuffer * out = msgs->outMessages;
388
389    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
390    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
391    tr_peerIoWriteUint32( io, out, req->index );
392    tr_peerIoWriteUint32( io, out, req->offset );
393    tr_peerIoWriteUint32( io, out, req->length );
394    dbgmsg( msgs, "requesting %u:%u->%u... outMessage size is now %d",
395            req->index, req->offset, req->length, (int)EVBUFFER_LENGTH(out) );
396    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
397}
398
399static void
400protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
401{
402    tr_peerIo * io = msgs->io;
403    struct evbuffer * out = msgs->outMessages;
404
405    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
406    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
407    tr_peerIoWriteUint32( io, out, req->index );
408    tr_peerIoWriteUint32( io, out, req->offset );
409    tr_peerIoWriteUint32( io, out, req->length );
410    dbgmsg( msgs, "cancelling %u:%u->%u... outMessage size is now %d",
411            req->index, req->offset, req->length, (int)EVBUFFER_LENGTH(out) );
412    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
413}
414
415static void
416protocolSendHave( tr_peermsgs * msgs, uint32_t index )
417{
418    tr_peerIo * io = msgs->io;
419    struct evbuffer * out = msgs->outMessages;
420
421    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
422    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
423    tr_peerIoWriteUint32( io, out, index );
424    dbgmsg( msgs, "sending Have %u.. outMessage size is now %d",
425            index, (int)EVBUFFER_LENGTH(out) );
426    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
427}
428
429static void
430protocolSendChoke( tr_peermsgs * msgs, int choke )
431{
432    tr_peerIo * io = msgs->io;
433    struct evbuffer * out = msgs->outMessages;
434
435    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) );
436    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
437    dbgmsg( msgs, "sending %s... outMessage size is now %d",
438                  (choke ? "Choke" : "Unchoke"),
439                  (int)EVBUFFER_LENGTH(out) );
440    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
441}
442
443/**
444***  EVENTS
445**/
446
447static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0 };
448
449static void
450publish( tr_peermsgs * msgs, tr_peer_event * e )
451{
452    tr_publisherPublish( msgs->publisher, msgs->info, e );
453}
454
455static void
456fireError( tr_peermsgs * msgs, tr_errno err )
457{
458    tr_peer_event e = blankEvent;
459    e.eventType = TR_PEER_ERROR;
460    e.err = err;
461    publish( msgs, &e );
462}
463
464static void
465fireNeedReq( tr_peermsgs * msgs )
466{
467    tr_peer_event e = blankEvent;
468    e.eventType = TR_PEER_NEED_REQ;
469    publish( msgs, &e );
470}
471
472static void
473firePeerProgress( tr_peermsgs * msgs )
474{
475    tr_peer_event e = blankEvent;
476    e.eventType = TR_PEER_PEER_PROGRESS;
477    e.progress = msgs->info->progress;
478    publish( msgs, &e );
479}
480
481static void
482fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
483{
484    tr_peer_event e = blankEvent;
485    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
486    e.pieceIndex = req->index;
487    e.offset = req->offset;
488    e.length = req->length;
489    publish( msgs, &e );
490}
491
492static void
493fireClientGotData( tr_peermsgs * msgs, uint32_t length )
494{
495    tr_peer_event e = blankEvent;
496    e.length = length;
497    e.eventType = TR_PEER_CLIENT_GOT_DATA;
498    publish( msgs, &e );
499}
500
501static void
502firePeerGotData( tr_peermsgs * msgs, uint32_t length )
503{
504    tr_peer_event e = blankEvent;
505    e.length = length;
506    e.eventType = TR_PEER_PEER_GOT_DATA;
507    publish( msgs, &e );
508}
509
510static void
511fireCancelledReq( tr_peermsgs * msgs, const tr_piece_index_t pieceIndex )
512{
513    tr_peer_event e = blankEvent;
514    e.eventType = TR_PEER_CANCEL;
515    e.pieceIndex = pieceIndex;
516    publish( msgs, &e );
517}
518
519/**
520***  INTEREST
521**/
522
523static int
524isPieceInteresting( const tr_peermsgs   * peer,
525                    tr_piece_index_t      piece )
526{
527    const tr_torrent * torrent = peer->torrent;
528
529    return ( ( !torrent->info.pieces[piece].dnd )               /* we want it */
530          && ( !tr_cpPieceIsComplete( torrent->completion, piece ) ) /* !have */
531          && ( tr_bitfieldHas( peer->info->have, piece ) ) );  /* peer has it */
532}
533
534/* "interested" means we'll ask for piece data if they unchoke us */
535static int
536isPeerInteresting( const tr_peermsgs * msgs )
537{
538    tr_piece_index_t i;
539    const tr_torrent * torrent;
540    const tr_bitfield * bitfield;
541    const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
542
543    if( clientIsSeed )
544        return FALSE;
545
546    torrent = msgs->torrent;
547    bitfield = tr_cpPieceBitfield( torrent->completion );
548
549    if( !msgs->info->have )
550        return TRUE;
551
552    assert( bitfield->byteCount == msgs->info->have->byteCount );
553
554    for( i=0; i<torrent->info.pieceCount; ++i )
555        if( isPieceInteresting( msgs, i ) )
556            return TRUE;
557
558    return FALSE;
559}
560
561static void
562sendInterest( tr_peermsgs * msgs, int weAreInterested )
563{
564    struct evbuffer * out = msgs->outMessages;
565
566    assert( msgs );
567    assert( weAreInterested==0 || weAreInterested==1 );
568
569    msgs->info->clientIsInterested = weAreInterested;
570    dbgmsg( msgs, "Sending %s", weAreInterested ? "Interested" : "Not Interested");
571    tr_peerIoWriteUint32( msgs->io, out, sizeof(uint8_t) );
572    tr_peerIoWriteUint8 ( msgs->io, out, weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
573    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
574    dbgmsg( msgs, "outMessage size is now %d", (int)EVBUFFER_LENGTH(out) );
575}
576
577static void
578updateInterest( tr_peermsgs * msgs )
579{
580    const int i = isPeerInteresting( msgs );
581    if( i != msgs->info->clientIsInterested )
582        sendInterest( msgs, i );
583    if( i )
584        fireNeedReq( msgs );
585}
586
587static void
588cancelAllRequestsToClientExceptFast( tr_peermsgs * msgs )
589{
590    reqListClear( &msgs->peerAskedFor );
591}
592
593void
594tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
595{
596    const time_t fibrillationTime = time(NULL) - MIN_CHOKE_PERIOD_SEC;
597
598    assert( msgs );
599    assert( msgs->info );
600    assert( choke==0 || choke==1 );
601
602    if( msgs->info->chokeChangedAt > fibrillationTime )
603    {
604        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
605    }
606    else if( msgs->info->peerIsChoked != choke )
607    {
608        msgs->info->peerIsChoked = choke;
609        if( choke )
610            cancelAllRequestsToClientExceptFast( msgs );
611        protocolSendChoke( msgs, choke );
612        msgs->info->chokeChangedAt = time( NULL );
613    }
614}
615
616/**
617***
618**/
619
620void
621tr_peerMsgsHave( tr_peermsgs * msgs,
622                 uint32_t      index )
623{
624    protocolSendHave( msgs, index );
625
626    /* since we have more pieces now, we might not be interested in this peer */
627    updateInterest( msgs );
628}
629#if 0
630static void
631sendFastSuggest( tr_peermsgs * msgs,
632                 uint32_t      pieceIndex )
633{
634    assert( msgs );
635   
636    if( tr_peerIoSupportsFEXT( msgs->io ) )
637    {
638        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
639        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_SUGGEST );
640        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
641    }
642}
643static void
644sendFastHave( tr_peermsgs * msgs, int all )
645{
646    assert( msgs );
647   
648    if( tr_peerIoSupportsFEXT( msgs->io ) )
649    {
650        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
651        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, ( all ? BT_HAVE_ALL
652                                                                : BT_HAVE_NONE ) );
653        updateInterest( msgs );
654    }
655}
656#endif
657
658static void
659sendFastReject( tr_peermsgs * msgs,
660                uint32_t      pieceIndex,
661                uint32_t      offset,
662                uint32_t      length )
663{
664    assert( msgs );
665
666    if( tr_peerIoSupportsFEXT( msgs->io ) )
667    {
668        struct evbuffer * out = msgs->outMessages;
669        const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
670        dbgmsg( msgs, "sending fast reject %u:%u->%u", pieceIndex, offset, length );
671        tr_peerIoWriteUint32( msgs->io, out, len );
672        tr_peerIoWriteUint8( msgs->io, out, BT_REJECT );
673        tr_peerIoWriteUint32( msgs->io, out, pieceIndex );
674        tr_peerIoWriteUint32( msgs->io, out, offset );
675        tr_peerIoWriteUint32( msgs->io, out, length );
676        pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
677        dbgmsg( msgs, "outMessage size is now %d", (int)EVBUFFER_LENGTH(out) );
678    }
679}
680
681static tr_bitfield*
682getPeerAllowedPieces( tr_peermsgs * msgs )
683{
684    if( !msgs->peerAllowedPieces && tr_peerIoSupportsFEXT( msgs->io ) )
685    {
686        msgs->peerAllowedPieces = tr_peerMgrGenerateAllowedSet(
687            MAX_FAST_ALLOWED_COUNT,
688            msgs->torrent->info.pieceCount,
689            msgs->torrent->info.hash,
690            tr_peerIoGetAddress( msgs->io, NULL ) );
691    }
692
693    return msgs->peerAllowedPieces;
694}
695
696static void
697sendFastAllowed( tr_peermsgs * msgs,
698                 uint32_t      pieceIndex)
699{
700    assert( msgs );
701   
702    if( tr_peerIoSupportsFEXT( msgs->io ) )
703    {
704        struct evbuffer * out = msgs->outMessages;
705        dbgmsg( msgs, "sending fast allowed" );
706        tr_peerIoWriteUint32( msgs->io, out,  sizeof(uint8_t) + sizeof(uint32_t) );
707        tr_peerIoWriteUint8( msgs->io, out, BT_ALLOWED_FAST );
708        tr_peerIoWriteUint32( msgs->io, out, pieceIndex );
709        pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
710        dbgmsg( msgs, "outMessage size is now %d", (int)EVBUFFER_LENGTH(out) );
711    }
712}
713
714static void
715sendFastAllowedSet( tr_peermsgs * msgs )
716{
717    tr_piece_index_t i = 0;
718
719    while (i <= msgs->torrent->info.pieceCount )
720    {
721        if ( tr_bitfieldHas( getPeerAllowedPieces( msgs ), i) )
722            sendFastAllowed( msgs, i );
723        i++;
724    }
725}
726
727static void
728maybeSendFastAllowedSet( tr_peermsgs * msgs )
729{
730    if( tr_bitfieldCountTrueBits( msgs->info->have ) <= MAX_FAST_ALLOWED_THRESHOLD )
731        sendFastAllowedSet( msgs );
732}
733
734
735/**
736***
737**/
738
739static int
740reqIsValid( const tr_peermsgs   * peer,
741            uint32_t              index,
742            uint32_t              offset,
743            uint32_t              length )
744{
745    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
746}
747
748static int
749requestIsValid( const tr_peermsgs * peer, const struct peer_request * req )
750{
751    return reqIsValid( peer, req->index, req->offset, req->length );
752}
753
754static void
755tr_peerMsgsCancel( tr_peermsgs * msgs,
756                   uint32_t      pieceIndex )
757{
758    uint16_t i;
759    struct request_list tmp = REQUEST_LIST_INIT;
760    struct request_list * src;
761
762    src = &msgs->clientWillAskFor;
763    for( i=0; i<src->count; ++i )
764        if( src->requests[i].index != pieceIndex )
765            reqListAppend( &tmp, src->requests + i );
766
767    /* swap */
768    reqListClear( &msgs->clientWillAskFor );
769    msgs->clientWillAskFor = tmp;
770    tmp = REQUEST_LIST_INIT;
771
772    src = &msgs->clientAskedFor;
773    for( i=0; i<src->count; ++i )
774        if( src->requests[i].index == pieceIndex )
775            protocolSendCancel( msgs, src->requests + i );
776        else
777            reqListAppend( &tmp, src->requests + i );
778
779    /* swap */
780    reqListClear( &msgs->clientAskedFor );
781    msgs->clientAskedFor = tmp;
782    tmp = REQUEST_LIST_INIT;
783
784    fireCancelledReq( msgs, pieceIndex );
785}
786
787static void
788expireOldRequests( tr_peermsgs * msgs )
789{
790    int i;
791    time_t oldestAllowed;
792    struct request_list tmp = REQUEST_LIST_INIT;
793
794    /* cancel requests that have been queued for too long */
795    oldestAllowed = time( NULL ) - QUEUED_REQUEST_TTL_SECS;
796    reqListCopy( &tmp, &msgs->clientWillAskFor );
797    for( i=0; i<tmp.count; ++i ) {
798        const struct peer_request * req = &tmp.requests[i];
799        if( req->time_requested < oldestAllowed )
800            tr_peerMsgsCancel( msgs, req->index );
801    }
802    reqListClear( &tmp );
803
804    /* cancel requests that were sent too long ago */
805    oldestAllowed = time( NULL ) - SENT_REQUEST_TTL_SECS;
806    reqListCopy( &tmp, &msgs->clientAskedFor );
807    for( i=0; i<tmp.count; ++i ) {
808        const struct peer_request * req = &tmp.requests[i];
809        if( req->time_requested < oldestAllowed )
810            tr_peerMsgsCancel( msgs, req->index );
811    }
812    reqListClear( &tmp );
813}
814
815static void
816pumpRequestQueue( tr_peermsgs * msgs )
817{
818    const int max = msgs->maxActiveRequests;
819    const int min = msgs->minActiveRequests;
820    const time_t now = time( NULL );
821    int sent = 0;
822    int count = msgs->clientAskedFor.count;
823    struct peer_request req;
824
825    if( count > min )
826        return;
827    if( msgs->info->clientIsChoked )
828        return;
829
830    while( ( count < max ) && !reqListPop( &msgs->clientWillAskFor, &req ) )
831    {
832        assert( requestIsValid( msgs, &req ) );
833        assert( tr_bitfieldHas( msgs->info->have, req.index ) );
834
835        protocolSendRequest( msgs, &req );
836        req.time_requested = now;
837        reqListAppend( &msgs->clientAskedFor, &req );
838
839        ++count;
840        ++sent;
841    }
842
843    if( sent )
844        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
845                sent,
846                msgs->clientAskedFor.count,
847                msgs->clientWillAskFor.count );
848
849    if( count < max )
850        fireNeedReq( msgs );
851}
852
853static int
854peerPulse( void * vmsgs );
855
856static int
857requestQueueIsFull( const tr_peermsgs * msgs )
858{
859    const int req_max = msgs->maxActiveRequests;
860    return msgs->clientWillAskFor.count >= req_max;
861}
862
863int
864tr_peerMsgsAddRequest( tr_peermsgs       * msgs,
865                       tr_piece_index_t    piece )
866{
867    struct peer_request req;
868
869    assert( msgs );
870    assert( msgs->torrent );
871    assert( piece < msgs->torrent->info.pieceCount );
872
873    /**
874    ***  Reasons to decline the request
875    **/
876
877    /* don't send requests to choked clients */
878    if( msgs->info->clientIsChoked ) {
879        dbgmsg( msgs, "declining request because they're choking us" );
880        return TR_ADDREQ_CLIENT_CHOKED;
881    }
882
883    /* peer doesn't have this piece */
884    if( !tr_bitfieldHas( msgs->info->have, piece ) )
885        return TR_ADDREQ_MISSING;
886
887    /* peer's queue is full */
888    if( requestQueueIsFull( msgs ) ) {
889        dbgmsg( msgs, "declining request because we're full" );
890        return TR_ADDREQ_FULL;
891    }
892
893    /* have we already asked for this piece? */
894    if( reqListHasPiece( &msgs->clientAskedFor, piece ) || 
895        reqListHasPiece( &msgs->clientWillAskFor, piece ) ) {
896        dbgmsg( msgs, "declining because it's a duplicate" );
897        return TR_ADDREQ_DUPLICATE;
898    }
899
900    /**
901    ***  Accept this request
902    **/
903
904    dbgmsg( msgs, "added req for piece %lu", (unsigned long)piece );
905    req.time_requested = time( NULL );
906    reqListAppendPiece( msgs->torrent, &msgs->clientWillAskFor, piece );
907    return TR_ADDREQ_OK;
908}
909
910static void
911cancelAllRequestsToPeer( tr_peermsgs * msgs )
912{
913    int i;
914    struct request_list a = msgs->clientWillAskFor;
915    struct request_list b = msgs->clientAskedFor;
916
917    msgs->clientAskedFor = REQUEST_LIST_INIT;
918    msgs->clientWillAskFor = REQUEST_LIST_INIT;
919
920    for( i=0; i<a.count; ++i )
921        fireCancelledReq( msgs, a.requests[i].index );
922
923    for( i=0; i<b.count; ++i ) {
924        fireCancelledReq( msgs, b.requests[i].index );
925        protocolSendCancel( msgs, &b.requests[i] );
926    }
927
928    reqListClear( &a );
929    reqListClear( &b );
930}
931
932/**
933***
934**/
935
936static void
937sendLtepHandshake( tr_peermsgs * msgs )
938{
939    tr_benc val, *m;
940    char * buf;
941    int len;
942    int pex;
943    struct evbuffer * out = msgs->outMessages;
944
945    if( msgs->clientSentLtepHandshake )
946        return;
947
948    dbgmsg( msgs, "sending an ltep handshake" );
949    msgs->clientSentLtepHandshake = 1;
950
951    /* decide if we want to advertise pex support */
952    if( !tr_torrentAllowsPex( msgs->torrent ) )
953        pex = 0;
954    else if( msgs->peerSentLtepHandshake )
955        pex = msgs->peerSupportsPex ? 1 : 0;
956    else
957        pex = 1;
958
959    tr_bencInitDict( &val, 4 );
960    tr_bencDictAddInt( &val, "e", msgs->session->encryptionMode != TR_CLEAR_PREFERRED );
961    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( msgs->session ) );
962    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
963    m  = tr_bencDictAddDict( &val, "m", 1 );
964    if( pex )
965        tr_bencDictAddInt( m, "ut_pex", TR_LTEP_PEX );
966    buf = tr_bencSave( &val, &len );
967
968    tr_peerIoWriteUint32( msgs->io, out, 2*sizeof(uint8_t) + len );
969    tr_peerIoWriteUint8 ( msgs->io, out, BT_LTEP );
970    tr_peerIoWriteUint8 ( msgs->io, out, LTEP_HANDSHAKE );
971    tr_peerIoWriteBytes ( msgs->io, out, buf, len );
972    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
973    dbgmsg( msgs, "outMessage size is now %d", (int)EVBUFFER_LENGTH(out) );
974
975    /* cleanup */
976    tr_bencFree( &val );
977    tr_free( buf );
978}
979
980static void
981parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
982{
983    tr_benc val, * sub;
984    uint8_t * tmp = tr_new( uint8_t, len );
985
986    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
987    msgs->peerSentLtepHandshake = 1;
988
989    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type!=TYPE_DICT ) {
990        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
991        tr_free( tmp );
992        return;
993    }
994
995    dbgmsg( msgs, "here is the ltep handshake we got [%*.*s]", len, len, tmp );
996
997    /* does the peer prefer encrypted connections? */
998    if(( sub = tr_bencDictFindType( &val, "e", TYPE_INT )))
999        msgs->info->encryption_preference = sub->val.i
1000                                      ? ENCRYPTION_PREFERENCE_YES
1001                                      : ENCRYPTION_PREFERENCE_NO;
1002
1003    /* check supported messages for utorrent pex */
1004    msgs->peerSupportsPex = 0;
1005    if(( sub = tr_bencDictFindType( &val, "m", TYPE_DICT ))) {
1006        if(( sub = tr_bencDictFindType( sub, "ut_pex", TYPE_INT ))) {
1007            msgs->ut_pex_id = (uint8_t) sub->val.i;
1008            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
1009            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
1010        }
1011    }
1012
1013    /* get peer's listening port */
1014    if(( sub = tr_bencDictFindType( &val, "p", TYPE_INT ))) {
1015        msgs->info->port = htons( (uint16_t)sub->val.i );
1016        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
1017    }
1018
1019    tr_bencFree( &val );
1020    tr_free( tmp );
1021}
1022
1023static void
1024parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1025{
1026    int loaded = 0;
1027    uint8_t * tmp = tr_new( uint8_t, msglen );
1028    tr_benc val, *added;
1029    const tr_torrent * tor = msgs->torrent;
1030    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
1031
1032    if( tr_torrentAllowsPex( tor )
1033        && (( loaded = !tr_bencLoad( tmp, msglen, &val, NULL )))
1034        && (( added = tr_bencDictFindType( &val, "added", TYPE_STR ))))
1035    {
1036        const uint8_t * added_f = NULL;
1037        tr_pex * pex;
1038        size_t i, n;
1039        size_t added_f_len = 0;
1040        tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1041        pex = tr_peerMgrCompactToPex( added->val.s.s, added->val.s.i, added_f, added_f_len, &n );
1042        for( i=0; i<n; ++i )
1043            tr_peerMgrAddPex( msgs->session->peerMgr, tor->info.hash,
1044                              TR_PEER_FROM_PEX, pex+i );
1045        tr_free( pex );
1046    }
1047
1048    if( loaded )
1049        tr_bencFree( &val );
1050    tr_free( tmp );
1051}
1052
1053static void
1054sendPex( tr_peermsgs * msgs );
1055
1056static void
1057parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1058{
1059    uint8_t ltep_msgid;
1060
1061    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
1062    msglen--;
1063
1064    if( ltep_msgid == LTEP_HANDSHAKE )
1065    {
1066        dbgmsg( msgs, "got ltep handshake" );
1067        parseLtepHandshake( msgs, msglen, inbuf );
1068        if( tr_peerIoSupportsLTEP( msgs->io ) )
1069        {
1070            sendLtepHandshake( msgs );
1071            sendPex( msgs );
1072        }
1073    }
1074    else if( ltep_msgid == TR_LTEP_PEX )
1075    {
1076        dbgmsg( msgs, "got ut pex" );
1077        msgs->peerSupportsPex = 1;
1078        parseUtPex( msgs, msglen, inbuf );
1079    }
1080    else
1081    {
1082        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1083        evbuffer_drain( inbuf, msglen );
1084    }
1085}
1086
1087static int
1088readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1089{
1090    uint32_t len;
1091
1092    if( inlen < sizeof(len) )
1093        return READ_MORE;
1094
1095    tr_peerIoReadUint32( msgs->io, inbuf, &len );
1096
1097    if( len == 0 ) /* peer sent us a keepalive message */
1098        dbgmsg( msgs, "got KeepAlive" );
1099    else {
1100        msgs->incoming.length = len;
1101        msgs->state = AWAITING_BT_ID;
1102    }
1103
1104    return READ_AGAIN;
1105}
1106
1107static int
1108readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen );
1109
1110static int
1111readBtId( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1112{
1113    uint8_t id;
1114
1115    if( inlen < sizeof(uint8_t) )
1116        return READ_MORE;
1117
1118    tr_peerIoReadUint8( msgs->io, inbuf, &id );
1119    msgs->incoming.id = id;
1120
1121    if( id==BT_PIECE )
1122    {
1123        msgs->state = AWAITING_BT_PIECE;
1124        return READ_AGAIN;
1125    }
1126    else if( msgs->incoming.length != 1 )
1127    {
1128        msgs->state = AWAITING_BT_MESSAGE;
1129        return READ_AGAIN;
1130    }
1131    else return readBtMessage( msgs, inbuf, inlen-1 );
1132}
1133
1134static void
1135updatePeerProgress( tr_peermsgs * msgs )
1136{
1137    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have )
1138                         / (float)msgs->torrent->info.pieceCount;
1139    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
1140    updateInterest( msgs );
1141    firePeerProgress( msgs );
1142}
1143
1144static int
1145clientCanSendFastBlock( const tr_peermsgs * msgs UNUSED )
1146{
1147    /* don't send a fast piece if peer has MAX_FAST_ALLOWED_THRESHOLD pieces */
1148    if( tr_bitfieldCountTrueBits( msgs->info->have ) > MAX_FAST_ALLOWED_THRESHOLD )
1149        return FALSE;
1150   
1151    /* ...or if we don't have ourself enough pieces */
1152    if( tr_bitfieldCountTrueBits( tr_cpPieceBitfield( msgs->torrent->completion ) ) < MAX_FAST_ALLOWED_THRESHOLD )
1153        return FALSE;
1154
1155    /* Maybe a bandwidth limit ? */
1156    return TRUE;
1157}
1158
1159static void
1160peerMadeRequest( tr_peermsgs * msgs, const struct peer_request * req )
1161{
1162    const int reqIsValid = requestIsValid( msgs, req );
1163    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( msgs->torrent->completion, req->index );
1164    const int peerIsChoked = msgs->info->peerIsChoked;
1165    const int peerIsFast = tr_peerIoSupportsFEXT( msgs->io );
1166    const int pieceIsFast = reqIsValid && tr_bitfieldHas( getPeerAllowedPieces( msgs ), req->index );
1167    const int canSendFast = clientCanSendFastBlock( msgs );
1168
1169    if( !reqIsValid ) /* bad request */
1170    {
1171        dbgmsg( msgs, "rejecting an invalid request." );
1172        sendFastReject( msgs, req->index, req->offset, req->length );
1173    }
1174    else if( !clientHasPiece ) /* we don't have it */
1175    {
1176        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1177        sendFastReject( msgs, req->index, req->offset, req->length );
1178    }
1179    else if( peerIsChoked && !peerIsFast ) /* doesn't he know he's choked? */
1180    {
1181        tr_peerMsgsSetChoke( msgs, 1 );
1182        sendFastReject( msgs, req->index, req->offset, req->length );
1183    }
1184    else if( peerIsChoked && peerIsFast && ( !pieceIsFast || !canSendFast ) )
1185    {
1186        sendFastReject( msgs, req->index, req->offset, req->length );
1187    }
1188    else /* YAY */
1189    {
1190        if( peerIsFast && pieceIsFast )
1191            reqListAppend( &msgs->peerAskedForFast, req );
1192        else
1193            reqListAppend( &msgs->peerAskedFor, req );
1194    }
1195}
1196
1197static int
1198messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1199{
1200    switch( id )
1201    {
1202        case BT_CHOKE:
1203        case BT_UNCHOKE:
1204        case BT_INTERESTED:
1205        case BT_NOT_INTERESTED:
1206        case BT_HAVE_ALL:
1207        case BT_HAVE_NONE:
1208            return len==1;
1209
1210        case BT_HAVE:
1211        case BT_SUGGEST:
1212        case BT_ALLOWED_FAST:
1213            return len==5;
1214
1215        case BT_BITFIELD:
1216            return len == (msg->torrent->info.pieceCount+7u)/8u + 1u;
1217       
1218        case BT_REQUEST:
1219        case BT_CANCEL:
1220        case BT_REJECT:
1221            return len==13;
1222
1223        case BT_PIECE:
1224            return len>9 && len<=16393;
1225
1226        case BT_PORT:
1227            return len==3;
1228
1229        case BT_LTEP:
1230            return len >= 2;
1231
1232        default:
1233            return FALSE;
1234    }
1235}
1236
1237static int
1238clientGotBlock( tr_peermsgs * msgs, const uint8_t * block, const struct peer_request * req );
1239
1240static void
1241clientGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1242{
1243    msgs->info->pieceDataActivityDate = time( NULL );
1244    tr_rcTransferred( msgs->info->rcToClient, byteCount );
1245    fireClientGotData( msgs, byteCount );
1246}
1247
1248static int
1249readBtPiece( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1250{
1251    struct peer_request * req = &msgs->incoming.blockReq;
1252    assert( EVBUFFER_LENGTH(inbuf) >= inlen );
1253    dbgmsg( msgs, "In readBtPiece" );
1254
1255    if( !req->length )
1256    {
1257        if( inlen < 8 )
1258            return READ_MORE;
1259
1260        tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
1261        tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
1262        req->length = msgs->incoming.length - 9;
1263        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1264        return READ_AGAIN;
1265    }
1266    else
1267    {
1268        int err;
1269
1270        /* read in another chunk of data */
1271        const size_t nLeft = req->length - EVBUFFER_LENGTH(msgs->incoming.block);
1272        size_t n = MIN( nLeft, inlen );
1273        uint8_t * buf = tr_new( uint8_t, n );
1274        assert( EVBUFFER_LENGTH(inbuf) >= n );
1275        tr_peerIoReadBytes( msgs->io, inbuf, buf, n );
1276        evbuffer_add( msgs->incoming.block, buf, n );
1277        clientGotBytes( msgs, n );
1278        tr_free( buf );
1279        dbgmsg( msgs, "got %d bytes for block %u:%u->%u ... %d remain",
1280               (int)n, req->index, req->offset, req->length,
1281               (int)( req->length - EVBUFFER_LENGTH(msgs->incoming.block) ) );
1282        if( EVBUFFER_LENGTH(msgs->incoming.block) < req->length )
1283            return READ_MORE;
1284
1285        /* we've got the whole block ... process it */
1286        err = clientGotBlock( msgs, EVBUFFER_DATA(msgs->incoming.block), req );
1287
1288        /* cleanup */
1289        evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH(msgs->incoming.block) );
1290        req->length = 0;
1291        msgs->state = AWAITING_BT_LENGTH;
1292        if( !err )
1293            return READ_AGAIN;
1294        else {
1295            fireError( msgs, err );
1296            return READ_DONE;
1297        }
1298    }
1299}
1300
1301static int
1302readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1303{
1304    uint32_t ui32;
1305    uint32_t msglen = msgs->incoming.length;
1306    const uint8_t id = msgs->incoming.id;
1307    const size_t startBufLen = EVBUFFER_LENGTH( inbuf );
1308
1309    --msglen; /* id length */
1310
1311    if( inlen < msglen )
1312        return READ_MORE;
1313
1314    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %d", (int)id, (int)msglen, (int)inlen );
1315
1316    if( !messageLengthIsCorrect( msgs, id, msglen+1 ) )
1317    {
1318        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1319        fireError( msgs, TR_ERROR );
1320        return READ_DONE;
1321    }
1322
1323    switch( id )
1324    {
1325        case BT_CHOKE:
1326            dbgmsg( msgs, "got Choke" );
1327            msgs->info->clientIsChoked = 1;
1328            cancelAllRequestsToPeer( msgs );
1329            cancelAllRequestsToClientExceptFast( msgs );
1330            break;
1331
1332        case BT_UNCHOKE:
1333            dbgmsg( msgs, "got Unchoke" );
1334            msgs->info->clientIsChoked = 0;
1335            fireNeedReq( msgs );
1336            break;
1337
1338        case BT_INTERESTED:
1339            dbgmsg( msgs, "got Interested" );
1340            msgs->info->peerIsInterested = 1;
1341            break;
1342
1343        case BT_NOT_INTERESTED:
1344            dbgmsg( msgs, "got Not Interested" );
1345            msgs->info->peerIsInterested = 0;
1346            break;
1347           
1348        case BT_HAVE:
1349            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1350            dbgmsg( msgs, "got Have: %u", ui32 );
1351            if( tr_bitfieldAdd( msgs->info->have, ui32 ) )
1352                fireError( msgs, TR_ERROR_PEER_MESSAGE );
1353            updatePeerProgress( msgs );
1354            tr_rcTransferred( msgs->torrent->swarmSpeed, msgs->torrent->info.pieceSize );
1355            break;
1356
1357        case BT_BITFIELD: {
1358            dbgmsg( msgs, "got a bitfield" );
1359            msgs->peerSentBitfield = 1;
1360            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
1361            updatePeerProgress( msgs );
1362            maybeSendFastAllowedSet( msgs );
1363            fireNeedReq( msgs );
1364            break;
1365        }
1366
1367        case BT_REQUEST: {
1368            struct peer_request r;
1369            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1370            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1371            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1372            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1373            peerMadeRequest( msgs, &r );
1374            break;
1375        }
1376
1377        case BT_CANCEL: {
1378            struct peer_request r;
1379            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1380            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1381            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1382            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1383            reqListRemove( &msgs->peerAskedForFast, &r );
1384            reqListRemove( &msgs->peerAskedFor, &r );
1385            break;
1386        }
1387
1388        case BT_PIECE:
1389            assert( 0 ); /* handled elsewhere! */
1390            break;
1391       
1392        case BT_PORT:
1393            dbgmsg( msgs, "Got a BT_PORT" );
1394            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1395            break;
1396           
1397        case BT_SUGGEST: {
1398            dbgmsg( msgs, "Got a BT_SUGGEST" );
1399            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1400            /* we don't do anything with this yet */
1401            break;
1402        }
1403           
1404        case BT_HAVE_ALL:
1405            dbgmsg( msgs, "Got a BT_HAVE_ALL" );
1406            tr_bitfieldAddRange( msgs->info->have, 0, msgs->torrent->info.pieceCount );
1407            updatePeerProgress( msgs );
1408            maybeSendFastAllowedSet( msgs );
1409            break;
1410       
1411       
1412        case BT_HAVE_NONE:
1413            dbgmsg( msgs, "Got a BT_HAVE_NONE" );
1414            tr_bitfieldClear( msgs->info->have );
1415            updatePeerProgress( msgs );
1416            maybeSendFastAllowedSet( msgs );
1417            break;
1418       
1419        case BT_REJECT: {
1420            struct peer_request r;
1421            dbgmsg( msgs, "Got a BT_REJECT" );
1422            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1423            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1424            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1425            reqListRemove( &msgs->clientAskedFor, &r );
1426            break;
1427        }
1428
1429        case BT_ALLOWED_FAST: {
1430            dbgmsg( msgs, "Got a BT_ALLOWED_FAST" );
1431            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1432            /* we don't do anything with this yet */
1433            break;
1434        }
1435
1436        case BT_LTEP:
1437            dbgmsg( msgs, "Got a BT_LTEP" );
1438            parseLtep( msgs, msglen, inbuf );
1439            break;
1440
1441        default:
1442            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1443            tr_peerIoDrain( msgs->io, inbuf, msglen );
1444            break;
1445    }
1446
1447    assert( msglen + 1 == msgs->incoming.length );
1448    assert( EVBUFFER_LENGTH(inbuf) == startBufLen - msglen );
1449
1450    msgs->state = AWAITING_BT_LENGTH;
1451    return READ_AGAIN;
1452}
1453
1454static void
1455peerGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1456{
1457    msgs->info->pieceDataActivityDate = time( NULL );
1458    tr_rcTransferred( msgs->info->rcToPeer, byteCount );
1459    firePeerGotData( msgs, byteCount );
1460}
1461
1462static size_t
1463getDownloadMax( const tr_peermsgs * msgs )
1464{
1465    static const size_t maxval = ~0;
1466    const tr_torrent * tor = msgs->torrent;
1467
1468    if( tor->downloadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1469        return tor->handle->useDownloadLimit
1470            ? tr_rcBytesLeft( tor->handle->download ) : maxval;
1471
1472    if( tor->downloadLimitMode == TR_SPEEDLIMIT_SINGLE )
1473        return tr_rcBytesLeft( tor->download );
1474
1475    return maxval;
1476}
1477
1478static void
1479decrementDownloadedCount( tr_peermsgs * msgs, uint32_t byteCount )
1480{
1481    tr_torrent * tor = msgs->torrent;
1482    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1483}
1484
1485static void
1486clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1487{
1488    decrementDownloadedCount( msgs, req->length );
1489}
1490
1491static void
1492addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1493{
1494    if( !msgs->info->blame )
1495         msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1496    tr_bitfieldAdd( msgs->info->blame, index );
1497}
1498
1499static tr_errno
1500clientGotBlock( tr_peermsgs                * msgs,
1501                const uint8_t              * data,
1502                const struct peer_request  * req )
1503{
1504    int err;
1505    tr_torrent * tor = msgs->torrent;
1506    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1507
1508    assert( msgs );
1509    assert( req );
1510
1511    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) )
1512    {
1513        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1514                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1515        return TR_ERROR;
1516    }
1517
1518    /* save the block */
1519    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1520
1521    /**
1522    *** Remove the block from our `we asked for this' list
1523    **/
1524
1525    if( reqListRemove( &msgs->clientAskedFor, req ) )
1526    {
1527        clientGotUnwantedBlock( msgs, req );
1528        dbgmsg( msgs, "we didn't ask for this message..." );
1529        return 0;
1530    }
1531
1532    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1533                  msgs->clientAskedFor.count );
1534
1535    /**
1536    *** Error checks
1537    **/
1538
1539    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
1540        dbgmsg( msgs, "we have this block already..." );
1541        clientGotUnwantedBlock( msgs, req );
1542        return 0;
1543    }
1544
1545    /**
1546    ***  Save the block
1547    **/
1548
1549    msgs->info->peerSentPieceDataAt = time( NULL );
1550    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1551        return err;
1552
1553    addPeerToBlamefield( msgs, req->index );
1554    fireGotBlock( msgs, req );
1555    return 0;
1556}
1557
1558static void
1559didWrite( struct bufferevent * evin UNUSED, void * vmsgs )
1560{
1561    peerPulse( vmsgs );
1562}
1563
1564static ReadState
1565canRead( struct bufferevent * evin, void * vmsgs )
1566{
1567    ReadState ret;
1568    tr_peermsgs * msgs = vmsgs;
1569    struct evbuffer * in = EVBUFFER_INPUT ( evin );
1570    const size_t inlen = EVBUFFER_LENGTH( in );
1571
1572    if( !inlen )
1573    {
1574        ret = READ_DONE;
1575    }
1576    else if( msgs->state == AWAITING_BT_PIECE )
1577    {
1578        const size_t downloadMax = getDownloadMax( msgs );
1579        const size_t n = MIN( inlen, downloadMax );
1580        ret = n ? readBtPiece( msgs, in, n ) : READ_DONE;
1581    }
1582    else switch( msgs->state )
1583    {
1584        case AWAITING_BT_LENGTH:  ret = readBtLength ( msgs, in, inlen ); break;
1585        case AWAITING_BT_ID:      ret = readBtId     ( msgs, in, inlen ); break;
1586        case AWAITING_BT_MESSAGE: ret = readBtMessage( msgs, in, inlen ); break;
1587        default:                  assert( 0 );
1588    }
1589
1590    return ret;
1591}
1592
1593static void
1594sendKeepalive( tr_peermsgs * msgs )
1595{
1596    dbgmsg( msgs, "sending a keepalive message" );
1597    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1598    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1599}
1600
1601/**
1602***
1603**/
1604
1605static size_t
1606getUploadMax( const tr_peermsgs * msgs )
1607{
1608    static const size_t maxval = ~0;
1609    const tr_torrent * tor = msgs->torrent;
1610    int speedLeft;
1611    int bufLeft;
1612
1613    if( tor->uploadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1614        speedLeft = tor->handle->useUploadLimit ? tr_rcBytesLeft( tor->handle->upload ) : maxval;
1615    else if( tor->uploadLimitMode == TR_SPEEDLIMIT_SINGLE )
1616        speedLeft = tr_rcBytesLeft( tor->upload );
1617    else
1618        speedLeft = INT_MAX;
1619
1620    /* this basically says the outbuf shouldn't have more than one block
1621     * queued up in it... blocksize, +13 for the size of the BT protocol's
1622     * block message overhead */
1623    bufLeft = tor->blockSize + 13 - tr_peerIoWriteBytesWaiting( msgs->io );
1624    return MIN( speedLeft, bufLeft );
1625}
1626
1627static int
1628getMaxBlocksFromPeerSoon( const tr_peermsgs * peer )
1629{
1630    const double seconds = 30;
1631    const double bytesPerSecond = peer->info->rateToClient * 1024;
1632    const double totalBytes = bytesPerSecond * seconds;
1633    const int blockCount = totalBytes / peer->torrent->blockSize;
1634    /*fprintf( stderr, "rate %f -- blockCount %d\n", peer->info->rateToClient, blockCount );*/
1635    return blockCount;
1636}
1637
1638static int
1639ratePulse( void * vpeer )
1640{
1641    tr_peermsgs * peer = vpeer;
1642    peer->info->rateToClient = tr_rcRate( peer->info->rcToClient );
1643    peer->info->rateToPeer = tr_rcRate( peer->info->rcToPeer );
1644    peer->minActiveRequests = 4;
1645    peer->maxActiveRequests = peer->minActiveRequests + getMaxBlocksFromPeerSoon( peer );
1646    return TRUE;
1647}
1648
1649static tr_errno
1650popNextRequest( tr_peermsgs * msgs, struct peer_request * setme )
1651{
1652    if( !reqListPop( &msgs->peerAskedForFast, setme ) )
1653        return 0;
1654    if( !reqListPop( &msgs->peerAskedFor, setme ) )
1655        return 0;
1656
1657    return TR_ERROR;
1658}
1659
1660static int
1661peerPulse( void * vmsgs )
1662{
1663    const time_t now = time( NULL );
1664    tr_peermsgs * msgs = vmsgs;
1665
1666    tr_peerIoTryRead( msgs->io );
1667    pumpRequestQueue( msgs );
1668    expireOldRequests( msgs );
1669
1670    if( msgs->sendingBlock )
1671    {
1672        const size_t uploadMax = getUploadMax( msgs );
1673        size_t len = EVBUFFER_LENGTH( msgs->outBlock );
1674        const size_t outlen = MIN( len, uploadMax );
1675
1676        assert( len );
1677
1678        if( outlen )
1679        {
1680            tr_peerIoWrite( msgs->io, EVBUFFER_DATA( msgs->outBlock ), outlen );
1681            evbuffer_drain( msgs->outBlock, outlen );
1682            peerGotBytes( msgs, outlen );
1683
1684            len -= outlen;
1685            msgs->clientSentAnythingAt = now;
1686            msgs->sendingBlock = len != 0;
1687
1688            dbgmsg( msgs, "wrote %d bytes; %d left in block", (int)outlen, (int)len );
1689        }
1690        else dbgmsg( msgs, "stalled writing block... uploadMax %lu, outlen %lu", uploadMax, outlen );
1691    }
1692
1693    if( !msgs->sendingBlock )
1694    {
1695        struct peer_request req;
1696        int haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1697
1698        if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1699        {
1700            dbgmsg( msgs, "started an outMessages batch (length is %d)", (int)EVBUFFER_LENGTH( msgs->outMessages ) );
1701            msgs->outMessagesBatchedAt = now;
1702        }
1703        else if( haveMessages
1704                 && ( ( now - msgs->outMessagesBatchedAt ) > msgs->outMessagesBatchPeriod ) )
1705        {
1706            dbgmsg( msgs, "flushing outMessages... (length is %d)", (int)EVBUFFER_LENGTH( msgs->outMessages ) );
1707            tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
1708            msgs->clientSentAnythingAt = now;
1709            msgs->outMessagesBatchedAt = 0;
1710            msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1711        }
1712        else if( !EVBUFFER_LENGTH( msgs->outBlock )
1713            && !popNextRequest( msgs, &req )
1714            && requestIsValid( msgs, &req )
1715            && tr_cpPieceIsComplete( msgs->torrent->completion, req.index ) )
1716        {
1717            uint8_t * buf = tr_new( uint8_t, req.length );
1718
1719            if( !tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf ) )
1720            {
1721                tr_peerIo * io = msgs->io;
1722                struct evbuffer * out = msgs->outBlock;
1723
1724                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1725                tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 2*sizeof(uint32_t) + req.length );
1726                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1727                tr_peerIoWriteUint32( io, out, req.index );
1728                tr_peerIoWriteUint32( io, out, req.offset );
1729                tr_peerIoWriteBytes ( io, out, buf, req.length );
1730                msgs->sendingBlock = 1;
1731            }
1732
1733            tr_free( buf );
1734        }
1735        else if(    ( !haveMessages )
1736                 && ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS )
1737        {
1738            sendKeepalive( msgs );
1739        }
1740    }
1741
1742    return TRUE; /* loop forever */
1743}
1744
1745static void
1746gotError( struct bufferevent * evbuf UNUSED, short what, void * vmsgs )
1747{
1748    if( what & EVBUFFER_TIMEOUT )
1749        dbgmsg( vmsgs, "libevent got a timeout, what=%hd, secs=%d", what, evbuf->timeout_read );
1750    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1751        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1752                what, errno, tr_strerror(errno) );
1753    fireError( vmsgs, TR_ERROR );
1754}
1755
1756static void
1757sendBitfield( tr_peermsgs * msgs )
1758{
1759    struct evbuffer * out = msgs->outMessages;
1760    const tr_piece_index_t pieceCount = msgs->torrent->info.pieceCount;
1761    tr_bitfield * field;
1762    tr_piece_index_t lazyPieces[LAZY_PIECE_COUNT];
1763    int i;
1764    int lazyCount = 0;
1765
1766    field = tr_bitfieldDup( tr_cpPieceBitfield( msgs->torrent->completion ) );
1767
1768    if( tr_sessionIsLazyBitfieldEnabled( msgs->session ) )
1769    {
1770        const int maxLazyCount = MIN( LAZY_PIECE_COUNT, pieceCount );
1771
1772        while( lazyCount < maxLazyCount )
1773        {
1774            const size_t pos = tr_cryptoRandInt ( pieceCount );
1775            if( !tr_bitfieldHas( field, pos ) ) /* already removed it */
1776                continue;
1777            dbgmsg( msgs, "lazy bitfield #%d: piece %d of %d",
1778                    (int)(lazyCount+1), (int)pos, (int)pieceCount );
1779            tr_bitfieldRem( field, pos );
1780            lazyPieces[lazyCount++] = pos;
1781        }
1782    }
1783
1784    tr_peerIoWriteUint32( msgs->io, out, sizeof(uint8_t) + field->byteCount );
1785    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1786    tr_peerIoWriteBytes ( msgs->io, out, field->bits, field->byteCount );
1787    dbgmsg( msgs, "sending bitfield... outMessage size is now %d",
1788                  (int)EVBUFFER_LENGTH(out) );
1789    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1790
1791    for( i=0; i<lazyCount; ++i )
1792        protocolSendHave( msgs, lazyPieces[i] );
1793
1794    tr_bitfieldFree( field );
1795}
1796
1797/**
1798***
1799**/
1800
1801/* some peers give us error messages if we send
1802   more than this many peers in a single pex message
1803   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
1804#define MAX_PEX_ADDED 50
1805#define MAX_PEX_DROPPED 50
1806
1807typedef struct
1808{
1809    tr_pex * added;
1810    tr_pex * dropped;
1811    tr_pex * elements;
1812    int addedCount;
1813    int droppedCount;
1814    int elementCount;
1815}
1816PexDiffs;
1817
1818static void
1819pexAddedCb( void * vpex, void * userData )
1820{
1821    PexDiffs * diffs = userData;
1822    tr_pex * pex = vpex;
1823    if( diffs->addedCount < MAX_PEX_ADDED )
1824    {
1825        diffs->added[diffs->addedCount++] = *pex;
1826        diffs->elements[diffs->elementCount++] = *pex;
1827    }
1828}
1829
1830static void
1831pexDroppedCb( void * vpex, void * userData )
1832{
1833    PexDiffs * diffs = userData;
1834    tr_pex * pex = vpex;
1835    if( diffs->droppedCount < MAX_PEX_DROPPED )
1836    {
1837        diffs->dropped[diffs->droppedCount++] = *pex;
1838    }
1839}
1840
1841static void
1842pexElementCb( void * vpex, void * userData )
1843{
1844    PexDiffs * diffs = userData;
1845    tr_pex * pex = vpex;
1846    diffs->elements[diffs->elementCount++] = *pex;
1847}
1848
1849static void
1850sendPex( tr_peermsgs * msgs )
1851{
1852    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
1853    {
1854        int i;
1855        tr_pex * newPex = NULL;
1856        const int newCount = tr_peerMgrGetPeers( msgs->session->peerMgr, msgs->torrent->info.hash, &newPex );
1857        PexDiffs diffs;
1858        tr_benc val, *added, *dropped;
1859        uint8_t *tmp, *walk;
1860        char * benc;
1861        int bencLen;
1862        struct evbuffer * out = msgs->outMessages;
1863
1864        /* build the diffs */
1865        diffs.added = tr_new( tr_pex, newCount );
1866        diffs.addedCount = 0;
1867        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1868        diffs.droppedCount = 0;
1869        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1870        diffs.elementCount = 0;
1871        tr_set_compare( msgs->pex, msgs->pexCount,
1872                        newPex, newCount,
1873                        tr_pexCompare, sizeof(tr_pex),
1874                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
1875        dbgmsg( msgs, "pex: old peer count %d, new peer count %d, added %d, removed %d", msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1876
1877        /* update peer */
1878        tr_free( msgs->pex );
1879        msgs->pex = diffs.elements;
1880        msgs->pexCount = diffs.elementCount;
1881
1882        /* build the pex payload */
1883        tr_bencInitDict( &val, 3 );
1884
1885        /* "added" */
1886        added = tr_bencDictAdd( &val, "added" );
1887        tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1888        for( i=0; i<diffs.addedCount; ++i ) {
1889            memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
1890            memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1891        }
1892        assert( ( walk - tmp ) == diffs.addedCount * 6 );
1893        tr_bencInitStr( added, tmp, walk-tmp, FALSE );
1894
1895        /* "added.f" */
1896        tmp = walk = tr_new( uint8_t, diffs.addedCount );
1897        for( i=0; i<diffs.addedCount; ++i )
1898            *walk++ = diffs.added[i].flags;
1899        assert( ( walk - tmp ) == diffs.addedCount );
1900        tr_bencDictAddRaw( &val, "added.f", tmp, walk-tmp );
1901        tr_free( tmp );
1902
1903        /* "dropped" */
1904        dropped = tr_bencDictAdd( &val, "dropped" );
1905        tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
1906        for( i=0; i<diffs.droppedCount; ++i ) {
1907            memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
1908            memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
1909        }
1910        assert( ( walk - tmp ) == diffs.droppedCount * 6 );
1911        tr_bencInitStr( dropped, tmp, walk-tmp, FALSE );
1912
1913        /* write the pex message */
1914        benc = tr_bencSave( &val, &bencLen );
1915        tr_peerIoWriteUint32( msgs->io, out, 2*sizeof(uint8_t) + bencLen );
1916        tr_peerIoWriteUint8 ( msgs->io, out, BT_LTEP );
1917        tr_peerIoWriteUint8 ( msgs->io, out, msgs->ut_pex_id );
1918        tr_peerIoWriteBytes ( msgs->io, out, benc, bencLen );
1919        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1920        dbgmsg( msgs, "outMessage size is now %d", (int)EVBUFFER_LENGTH(out) );
1921
1922        /* cleanup */
1923        tr_free( benc );
1924        tr_bencFree( &val );
1925        tr_free( diffs.added );
1926        tr_free( diffs.dropped );
1927        tr_free( newPex );
1928
1929        msgs->clientSentPexAt = time( NULL );
1930    }
1931}
1932
1933static int
1934pexPulse( void * vpeer )
1935{
1936    sendPex( vpeer );
1937    return TRUE;
1938}
1939
1940/**
1941***
1942**/
1943
1944tr_peermsgs*
1945tr_peerMsgsNew( struct tr_torrent * torrent,
1946                struct tr_peer    * info,
1947                tr_delivery_func    func,
1948                void              * userData,
1949                tr_publisher_tag  * setme )
1950{
1951    tr_peermsgs * m;
1952
1953    assert( info );
1954    assert( info->io );
1955
1956    m = tr_new0( tr_peermsgs, 1 );
1957    m->publisher = tr_publisherNew( );
1958    m->info = info;
1959    m->session = torrent->handle;
1960    m->torrent = torrent;
1961    m->io = info->io;
1962    m->info->clientIsChoked = 1;
1963    m->info->peerIsChoked = 1;
1964    m->info->clientIsInterested = 0;
1965    m->info->peerIsInterested = 0;
1966    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
1967    m->state = AWAITING_BT_LENGTH;
1968    m->pulseTimer = tr_timerNew( m->session, peerPulse, m, PEER_PULSE_INTERVAL );
1969    m->rateTimer = tr_timerNew( m->session, ratePulse, m, RATE_PULSE_INTERVAL );
1970    m->pexTimer = tr_timerNew( m->session, pexPulse, m, PEX_INTERVAL );
1971    m->outMessages = evbuffer_new( );
1972    m->outMessagesBatchedAt = 0;
1973    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1974    m->incoming.block = evbuffer_new( );
1975    m->outBlock = evbuffer_new( );
1976    m->peerAllowedPieces = NULL;
1977    m->peerAskedFor = REQUEST_LIST_INIT;
1978    m->peerAskedForFast = REQUEST_LIST_INIT;
1979    m->clientAskedFor = REQUEST_LIST_INIT;
1980    m->clientWillAskFor = REQUEST_LIST_INIT;
1981    *setme = tr_publisherSubscribe( m->publisher, func, userData );
1982
1983    if ( tr_peerIoSupportsLTEP( m->io ) )
1984        sendLtepHandshake( m );
1985
1986    sendBitfield( m );
1987   
1988    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of inactivity */
1989    tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
1990    ratePulse( m );
1991
1992    return m;
1993}
1994
1995void
1996tr_peerMsgsFree( tr_peermsgs* msgs )
1997{
1998    if( msgs )
1999    {
2000        tr_timerFree( &msgs->pulseTimer );
2001        tr_timerFree( &msgs->rateTimer );
2002        tr_timerFree( &msgs->pexTimer );
2003        tr_publisherFree( &msgs->publisher );
2004        reqListClear( &msgs->clientWillAskFor );
2005        reqListClear( &msgs->clientAskedFor );
2006        reqListClear( &msgs->peerAskedForFast );
2007        reqListClear( &msgs->peerAskedFor );
2008        tr_bitfieldFree( msgs->peerAllowedPieces );
2009        evbuffer_free( msgs->incoming.block );
2010        evbuffer_free( msgs->outMessages );
2011        evbuffer_free( msgs->outBlock );
2012        tr_free( msgs->pex );
2013
2014        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2015        tr_free( msgs );
2016    }
2017}
2018
2019void
2020tr_peerMsgsUnsubscribe( tr_peermsgs       * peer,
2021                        tr_publisher_tag    tag )
2022{
2023    tr_publisherUnsubscribe( peer->publisher, tag );
2024}
Note: See TracBrowser for help on using the repository browser.