source: trunk/libtransmission/peer-msgs.c @ 5176

Last change on this file since 5176 was 5176, checked in by charles, 14 years ago

#727: Download speed setting also affects upload speed, peer informations in torrent inspector

  • Property svn:keywords set to Date Rev Author Id
File size: 57.7 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 5176 2008-02-29 17:09:33Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <stdio.h>
16#include <stdlib.h>
17#include <string.h>
18#include <libgen.h> /* basename */
19
20#include <event.h>
21
22#include "transmission.h"
23#include "bencode.h"
24#include "completion.h"
25#include "inout.h"
26#include "peer-io.h"
27#include "peer-mgr.h"
28#include "peer-mgr-private.h"
29#include "peer-msgs.h"
30#include "ratecontrol.h"
31#include "stats.h"
32#include "torrent.h"
33#include "trevent.h"
34#include "utils.h"
35
36/**
37***
38**/
39
40enum
41{
42    BT_CHOKE                = 0,
43    BT_UNCHOKE              = 1,
44    BT_INTERESTED           = 2,
45    BT_NOT_INTERESTED       = 3,
46    BT_HAVE                 = 4,
47    BT_BITFIELD             = 5,
48    BT_REQUEST              = 6,
49    BT_PIECE                = 7,
50    BT_CANCEL               = 8,
51    BT_PORT                 = 9,
52    BT_SUGGEST              = 13,
53    BT_HAVE_ALL             = 14,
54    BT_HAVE_NONE            = 15,
55    BT_REJECT               = 16,
56    BT_ALLOWED_FAST         = 17,
57    BT_LTEP                 = 20,
58
59    LTEP_HANDSHAKE          = 0,
60
61    TR_LTEP_PEX             = 1,
62
63    MAX_REQUEST_BYTE_COUNT  = (16 * 1024), /* drop requests who want too much */
64
65    /* idle seconds before we send a keepalive */
66    KEEPALIVE_INTERVAL_SECS = 90,
67
68    PEX_INTERVAL            = (60 * 1000), /* msec between calls to sendPex() */
69    PEER_PULSE_INTERVAL     = (100),       /* msec between calls to pulse() */
70    RATE_PULSE_INTERVAL     = (250),       /* msec between calls to ratePulse() */
71
72    MAX_QUEUE_SIZE          = (100),
73
74    MAX_OUTBUF_SIZE         = (1024),
75     
76    /* Fast Peers Extension constants */
77    MAX_FAST_ALLOWED_COUNT   = 10,          /* max. number of pieces we fast-allow to another peer */
78    MAX_FAST_ALLOWED_THRESHOLD = 10,        /* max threshold for allowing fast-pieces requests */
79
80    QUEUED_REQUEST_TTL_SECS = 20,
81
82    SENT_REQUEST_TTL_SECS = 90
83};
84
85enum
86{
87    AWAITING_BT_LENGTH,
88    AWAITING_BT_ID,
89    AWAITING_BT_MESSAGE,
90    AWAITING_BT_PIECE
91};
92
93struct peer_request
94{
95    uint32_t index;
96    uint32_t offset;
97    uint32_t length;
98    time_t time_requested;
99};
100
101static int
102compareRequest( const void * va, const void * vb )
103{
104    int i;
105    const struct peer_request * a = va;
106    const struct peer_request * b = vb;
107    if(( i = tr_compareUint32( a->index, b->index ))) return i;
108    if(( i = tr_compareUint32( a->offset, b->offset ))) return i;
109    if(( i = tr_compareUint32( a->length, b->length ))) return i;
110    return 0;
111}
112
113struct request_list
114{
115    uint16_t count;
116    uint16_t max;
117    struct peer_request * requests;
118};
119
120static const struct request_list REQUEST_LIST_INIT = { 0, 0, NULL };
121
122static void
123reqListReserve( struct request_list * list, uint16_t max )
124{
125    if( list->max < max )
126    {
127        list->max = max;
128        list->requests = tr_renew( struct peer_request, list->requests, list->max );
129    }
130}
131
132static void
133reqListClear( struct request_list * list )
134{
135    tr_free( list->requests );
136    *list = REQUEST_LIST_INIT;
137}
138
139static void
140reqListRemoveOne( struct request_list * list, int i )
141{
142    assert( 0<=i && i<list->count );
143
144    memmove( &list->requests[i],
145             &list->requests[i+1],
146             sizeof( struct peer_request ) * ( --list->count - i ) );
147}
148
149static void
150reqListAppend( struct request_list * list, const struct peer_request * req )
151{
152    if( ++list->count >= list->max )
153        reqListReserve( list, list->max + 8 );
154
155    list->requests[list->count-1] = *req;
156}
157
158static tr_errno
159reqListPop( struct request_list * list, struct peer_request * setme )
160{
161    tr_errno err;
162
163    if( !list->count )
164        err = TR_ERROR;
165    else {
166        *setme = list->requests[0];
167        reqListRemoveOne( list, 0 );
168        err = TR_OK;
169    }
170
171    return err;
172}
173
174static int
175reqListFind( struct request_list * list, const struct peer_request * key )
176{
177    uint16_t i;
178    for( i=0; i<list->count; ++i )
179        if( !compareRequest( key, list->requests+i ) )
180            return i;
181    return -1;
182}
183
184static tr_errno
185reqListRemove( struct request_list * list, const struct peer_request * key )
186{
187    tr_errno err;
188    const int i = reqListFind( list, key );
189
190    if( i < 0 )
191        err = TR_ERROR;
192    else {
193        err = TR_OK;
194        reqListRemoveOne( list, i );
195    }
196
197    return err;
198}
199
200static void
201reqListPrune( struct request_list * list,
202              struct request_list * pruned,
203              time_t                cutoff )
204{
205    int i, k=0, p=0;
206    struct peer_request keep[MAX_QUEUE_SIZE];
207    struct peer_request prune[MAX_QUEUE_SIZE];
208
209    for( i=0; i<list->count; ++i ) {
210        const struct peer_request * req = list->requests + i;
211        if( req->time_requested > cutoff )
212            keep[k++] = *req;
213        else
214            prune[p++] = *req;
215    }
216
217    memcpy( list->requests, keep, sizeof(struct peer_request) * k );
218    list->count = k;
219
220    reqListReserve( pruned, pruned->count + p );
221    memcpy( pruned->requests + pruned->count, prune, sizeof(struct peer_request) * p );
222    pruned->count += p;
223}
224
225/* this is raw, unchanged data from the peer regarding
226 * the current message that it's sending us. */
227struct tr_incoming
228{
229    uint32_t length; /* includes the +1 for id length */
230    uint8_t id;
231    struct peer_request blockReq; /* metadata for incoming blocks */
232    struct evbuffer * block; /* piece data for incoming blocks */
233};
234
235struct tr_peermsgs
236{
237    tr_peer * info;
238
239    tr_handle * handle;
240    tr_torrent * torrent;
241    tr_peerIo * io;
242
243    tr_publisher_t * publisher;
244
245    struct evbuffer * outBlock;    /* buffer of all the current piece message */
246    struct evbuffer * outMessages; /* buffer of all the non-piece messages */
247    struct request_list peerAskedFor;
248    struct request_list peerAskedForFast;
249    struct request_list clientAskedFor;
250    struct request_list clientWillAskFor;
251
252    tr_timer * rateTimer;
253    tr_timer * pulseTimer;
254    tr_timer * pexTimer;
255
256    time_t lastReqAddedAt;
257    time_t clientSentPexAt;
258    time_t clientSentAnythingAt;
259
260    time_t clientSentPieceDataAt;
261    time_t peerSentPieceDataAt;
262
263    unsigned int peerSentBitfield         : 1;
264    unsigned int peerSupportsPex          : 1;
265    unsigned int clientSentLtepHandshake  : 1;
266    unsigned int peerSentLtepHandshake    : 1;
267    unsigned int sendingBlock             : 1;
268   
269    tr_bitfield * clientAllowedPieces;
270    tr_bitfield * peerAllowedPieces;
271   
272    tr_bitfield * clientSuggestedPieces;
273   
274    uint8_t state;
275    uint8_t ut_pex_id;
276    uint16_t pexCount;
277    uint32_t maxActiveRequests;
278    uint32_t minActiveRequests;
279
280    struct tr_incoming incoming; 
281
282    tr_pex * pex;
283};
284
285/**
286***
287**/
288
289static void
290myDebug( const char * file, int line,
291         const struct tr_peermsgs * msgs,
292         const char * fmt, ... )
293{
294    FILE * fp = tr_getLog( );
295    if( fp != NULL )
296    {
297        va_list args;
298        char timestr[64];
299        struct evbuffer * buf = evbuffer_new( );
300        char * myfile = tr_strdup( file );
301
302        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
303                             tr_getLogTimeStr( timestr, sizeof(timestr) ),
304                             msgs->torrent->info.name,
305                             tr_peerIoGetAddrStr( msgs->io ),
306                             msgs->info->client );
307        va_start( args, fmt );
308        evbuffer_add_vprintf( buf, fmt, args );
309        va_end( args );
310        evbuffer_add_printf( buf, " (%s:%d)\n", basename(myfile), line );
311        fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
312
313        tr_free( myfile );
314        evbuffer_free( buf );
315    }
316}
317
318#define dbgmsg(msgs, fmt...) myDebug(__FILE__, __LINE__, msgs, ##fmt )
319
320/**
321***
322**/
323
324static void
325protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
326{
327    tr_peerIo * io = msgs->io;
328    struct evbuffer * out = msgs->outMessages;
329
330    dbgmsg( msgs, "requesting %u:%u->%u", req->index, req->offset, req->length );
331    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
332    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
333    tr_peerIoWriteUint32( io, out, req->index );
334    tr_peerIoWriteUint32( io, out, req->offset );
335    tr_peerIoWriteUint32( io, out, req->length );
336}
337
338static void
339protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
340{
341    tr_peerIo * io = msgs->io;
342    struct evbuffer * out = msgs->outMessages;
343
344    dbgmsg( msgs, "cancelling %u:%u->%u", req->index, req->offset, req->length );
345    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
346    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
347    tr_peerIoWriteUint32( io, out, req->index );
348    tr_peerIoWriteUint32( io, out, req->offset );
349    tr_peerIoWriteUint32( io, out, req->length );
350}
351
352static void
353protocolSendHave( tr_peermsgs * msgs, uint32_t index )
354{
355    tr_peerIo * io = msgs->io;
356    struct evbuffer * out = msgs->outMessages;
357
358    dbgmsg( msgs, "sending Have %u", index );
359    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
360    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
361    tr_peerIoWriteUint32( io, out, index );
362}
363
364static void
365protocolSendChoke( tr_peermsgs * msgs, int choke )
366{
367    tr_peerIo * io = msgs->io;
368    struct evbuffer * out = msgs->outMessages;
369
370    dbgmsg( msgs, "sending %s", (choke ? "Choke" : "Unchoke") );
371    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) );
372    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
373}
374
375/**
376***  EVENTS
377**/
378
379static const tr_peermsgs_event blankEvent = { 0, 0, 0, 0, 0.0f, 0 };
380
381static void
382publish( tr_peermsgs * msgs, tr_peermsgs_event * e )
383{
384    tr_publisherPublish( msgs->publisher, msgs->info, e );
385}
386
387static void
388fireError( tr_peermsgs * msgs, tr_errno err )
389{
390    tr_peermsgs_event e = blankEvent;
391    e.eventType = TR_PEERMSG_ERROR;
392    e.err = err;
393    publish( msgs, &e );
394}
395
396static void
397fireNeedReq( tr_peermsgs * msgs )
398{
399    tr_peermsgs_event e = blankEvent;
400    e.eventType = TR_PEERMSG_NEED_REQ;
401    publish( msgs, &e );
402}
403
404static void
405firePeerProgress( tr_peermsgs * msgs )
406{
407    tr_peermsgs_event e = blankEvent;
408    e.eventType = TR_PEERMSG_PEER_PROGRESS;
409    e.progress = msgs->info->progress;
410    publish( msgs, &e );
411}
412
413static void
414fireClientHave( tr_peermsgs * msgs, uint32_t pieceIndex )
415{
416    tr_peermsgs_event e = blankEvent;
417    e.eventType = TR_PEERMSG_CLIENT_HAVE;
418    e.pieceIndex = pieceIndex;
419    publish( msgs, &e );
420}
421
422static void
423fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
424{
425    tr_peermsgs_event e = blankEvent;
426    e.eventType = TR_PEERMSG_CLIENT_BLOCK;
427    e.pieceIndex = req->index;
428    e.offset = req->offset;
429    e.length = req->length;
430    publish( msgs, &e );
431}
432
433static void
434firePieceData( tr_peermsgs * msgs )
435{
436    tr_peermsgs_event e = blankEvent;
437    e.eventType = TR_PEERMSG_PIECE_DATA;
438    publish( msgs, &e );
439}
440
441static void
442fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
443{
444    tr_peermsgs_event e = blankEvent;
445    e.eventType = TR_PEERMSG_CANCEL;
446    e.pieceIndex = req->index;
447    e.offset = req->offset;
448    e.length = req->length;
449    publish( msgs, &e );
450}
451
452/**
453***  INTEREST
454**/
455
456static int
457isPieceInteresting( const tr_peermsgs   * peer,
458                    int                   piece )
459{
460    const tr_torrent * torrent = peer->torrent;
461    if( torrent->info.pieces[piece].dnd ) /* we don't want it */
462        return FALSE;
463    if( tr_cpPieceIsComplete( torrent->completion, piece ) ) /* we have it */
464        return FALSE;
465    if( !tr_bitfieldHas( peer->info->have, piece ) ) /* peer doesn't have it */
466        return FALSE;
467    if( tr_bitfieldHas( peer->info->banned, piece ) ) /* peer is banned */
468        return FALSE;
469    return TRUE;
470}
471
472/* "interested" means we'll ask for piece data if they unchoke us */
473static int
474isPeerInteresting( const tr_peermsgs * msgs )
475{
476    int i;
477    const tr_torrent * torrent;
478    const tr_bitfield * bitfield;
479    const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
480
481    if( clientIsSeed )
482        return FALSE;
483
484    torrent = msgs->torrent;
485    bitfield = tr_cpPieceBitfield( torrent->completion );
486
487    if( !msgs->info->have )
488        return TRUE;
489
490    assert( bitfield->len == msgs->info->have->len );
491    for( i=0; i<torrent->info.pieceCount; ++i )
492        if( isPieceInteresting( msgs, i ) )
493            return TRUE;
494
495    return FALSE;
496}
497
498static void
499sendInterest( tr_peermsgs * msgs, int weAreInterested )
500{
501    assert( msgs != NULL );
502    assert( weAreInterested==0 || weAreInterested==1 );
503
504    msgs->info->clientIsInterested = weAreInterested;
505    dbgmsg( msgs, "Sending %s",
506            weAreInterested ? "Interested" : "Not Interested");
507
508    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
509    tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages,
510                   weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
511}
512
513static void
514updateInterest( tr_peermsgs * msgs )
515{
516    const int i = isPeerInteresting( msgs );
517    if( i != msgs->info->clientIsInterested )
518        sendInterest( msgs, i );
519    if( i )
520        fireNeedReq( msgs );
521}
522
523#define MIN_CHOKE_PERIOD_SEC 10
524
525static void
526cancelAllRequestsToClientExceptFast( tr_peermsgs * msgs )
527{
528    reqListClear( &msgs->peerAskedFor );
529}
530
531void
532tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
533{
534    const time_t fibrillationTime = time(NULL) - MIN_CHOKE_PERIOD_SEC;
535
536    assert( msgs != NULL );
537    assert( msgs->info != NULL );
538    assert( choke==0 || choke==1 );
539
540    if( msgs->info->chokeChangedAt > fibrillationTime )
541    {
542        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
543    }
544    else if( msgs->info->peerIsChoked != choke )
545    {
546        msgs->info->peerIsChoked = choke;
547        if( choke )
548            cancelAllRequestsToClientExceptFast( msgs );
549        protocolSendChoke( msgs, choke );
550        msgs->info->chokeChangedAt = time( NULL );
551    }
552}
553
554/**
555***
556**/
557
558void
559tr_peerMsgsHave( tr_peermsgs * msgs,
560                 uint32_t      index )
561{
562    protocolSendHave( msgs, index );
563
564    /* since we have more pieces now, we might not be interested in this peer */
565    updateInterest( msgs );
566}
567#if 0
568static void
569sendFastSuggest( tr_peermsgs * msgs,
570                 uint32_t      pieceIndex )
571{
572    assert( msgs != NULL );
573   
574    if( tr_peerIoSupportsFEXT( msgs->io ) )
575    {
576        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
577        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_SUGGEST );
578        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
579    }
580}
581#endif
582static void
583sendFastHave( tr_peermsgs * msgs, int all )
584{
585    assert( msgs != NULL );
586   
587    if( tr_peerIoSupportsFEXT( msgs->io ) )
588    {
589        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
590        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, ( all ? BT_HAVE_ALL
591                                                                : BT_HAVE_NONE ) );
592        updateInterest( msgs );
593    }
594}
595
596static void
597sendFastReject( tr_peermsgs * msgs,
598                uint32_t      pieceIndex,
599                uint32_t      offset,
600                uint32_t      length )
601{
602    assert( msgs != NULL );
603
604    if( tr_peerIoSupportsFEXT( msgs->io ) )
605    {
606        const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
607        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
608        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_REJECT );
609        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
610        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
611        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
612    }
613}
614
615static tr_bitfield*
616getPeerAllowedPieces( tr_peermsgs * msgs )
617{
618    if( !msgs->peerAllowedPieces && tr_peerIoSupportsFEXT( msgs->io ) )
619    {
620        msgs->peerAllowedPieces = tr_peerMgrGenerateAllowedSet(
621            MAX_FAST_ALLOWED_COUNT,
622            msgs->torrent->info.pieceCount,
623            msgs->torrent->info.hash,
624            tr_peerIoGetAddress( msgs->io, NULL ) );
625    }
626
627    return msgs->peerAllowedPieces;
628}
629
630static void
631sendFastAllowed( tr_peermsgs * msgs,
632                 uint32_t      pieceIndex)
633{
634    assert( msgs != NULL );
635   
636    if( tr_peerIoSupportsFEXT( msgs->io ) )
637    {
638        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
639        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_ALLOWED_FAST );
640        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
641    }
642}
643
644static void
645sendFastAllowedSet( tr_peermsgs * msgs )
646{
647    int i = 0;
648
649    while (i <= msgs->torrent->info.pieceCount )
650    {
651        if ( tr_bitfieldHas( getPeerAllowedPieces( msgs ), i) )
652            sendFastAllowed( msgs, i );
653        i++;
654    }
655}
656
657static void
658maybeSendFastAllowedSet( tr_peermsgs * msgs )
659{
660    if( tr_bitfieldCountTrueBits( msgs->info->have ) <= MAX_FAST_ALLOWED_THRESHOLD )
661        sendFastAllowedSet( msgs );
662}
663
664
665/**
666***
667**/
668
669static int
670reqIsValid( const tr_peermsgs   * msgs,
671            uint32_t              index,
672            uint32_t              offset,
673            uint32_t              length )
674{
675    const tr_torrent * tor = msgs->torrent;
676
677    if( index >= (uint32_t) tor->info.pieceCount )
678        return FALSE;
679    if ( (int)offset >= tr_torPieceCountBytes( tor, (int)index ) )
680        return FALSE;
681    if( length > MAX_REQUEST_BYTE_COUNT )
682        return FALSE;
683    if( tr_pieceOffset( tor, index, offset, length ) > tor->info.totalSize )
684        return FALSE;
685
686    return TRUE;
687}
688
689static int
690requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
691{
692    return reqIsValid( msgs, req->index, req->offset, req->length );
693}
694
695static void
696expireOldRequests( tr_peermsgs * msgs )
697{
698    int i;
699    const time_t now = time( NULL );
700    const time_t queued_cutoff = now - QUEUED_REQUEST_TTL_SECS;
701    const time_t sent_cutoff   = now - SENT_REQUEST_TTL_SECS;
702    struct request_list pruned = REQUEST_LIST_INIT;
703
704    reqListPrune( &msgs->clientWillAskFor, &pruned, queued_cutoff );
705    reqListPrune( &msgs->clientAskedFor, &pruned, sent_cutoff );
706
707    /* expire the old requests */
708    for( i=0; i<pruned.count; ++i ) {
709        const struct peer_request * req = &pruned.requests[i];
710        tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
711    }
712
713    /* cleanup */
714    reqListClear( &pruned );
715}
716
717static void
718pumpRequestQueue( tr_peermsgs * msgs )
719{
720    struct peer_request req;
721    const int max = msgs->maxActiveRequests;
722    const int min = msgs->minActiveRequests;
723    const time_t now = time( NULL );
724    int count = msgs->clientAskedFor.count;
725    int sent = 0;
726
727    if( count > min )
728        return;
729    if( msgs->info->clientIsChoked )
730        return;
731
732    while( ( count < max ) && !reqListPop( &msgs->clientWillAskFor, &req ) )
733    {
734        assert( requestIsValid( msgs, &req ) );
735        assert( tr_bitfieldHas( msgs->info->have, req.index ) );
736
737        protocolSendRequest( msgs, &req );
738        req.time_requested = msgs->lastReqAddedAt = now;
739        reqListAppend( &msgs->clientAskedFor, &req );
740
741        ++count;
742        ++sent;
743    }
744
745    if( sent )
746        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
747                sent,
748                msgs->clientAskedFor.count,
749                msgs->clientWillAskFor.count );
750
751    if( count < max )
752        fireNeedReq( msgs );
753}
754
755static int
756pulse( void * vmsgs );
757
758int
759tr_peerMsgsAddRequest( tr_peermsgs * msgs,
760                       uint32_t      index, 
761                       uint32_t      offset, 
762                       uint32_t      length )
763{
764    const int req_max = msgs->maxActiveRequests;
765    struct peer_request req;
766
767    assert( msgs != NULL );
768    assert( msgs->torrent != NULL );
769    assert( reqIsValid( msgs, index, offset, length ) );
770
771    /**
772    ***  Reasons to decline the request
773    **/
774
775    /* don't send requests to choked clients */
776    if( msgs->info->clientIsChoked ) {
777        dbgmsg( msgs, "declining request because they're choking us" );
778        return TR_ADDREQ_CLIENT_CHOKED;
779    }
780
781    /* peer doesn't have this piece */
782    if( !tr_bitfieldHas( msgs->info->have, index ) )
783        return TR_ADDREQ_MISSING;
784
785    /* peer's queue is full */
786    if( msgs->clientWillAskFor.count >= req_max ) {
787        dbgmsg( msgs, "declining request because we're full" );
788        return TR_ADDREQ_FULL;
789    }
790
791    /* have we already asked for this piece? */
792    req.index = index;
793    req.offset = offset;
794    req.length = length;
795    if( reqListFind( &msgs->clientAskedFor, &req ) != -1 ) {
796        dbgmsg( msgs, "declining because it's a duplicate" );
797        return TR_ADDREQ_DUPLICATE;
798    }
799    if( reqListFind( &msgs->clientWillAskFor, &req ) != -1 ) {
800        dbgmsg( msgs, "declining because it's a duplicate" );
801        return TR_ADDREQ_DUPLICATE;
802    }
803
804    /**
805    ***  Accept this request
806    **/
807
808    dbgmsg( msgs, "added req for piece %d, offset %d", (int)index, (int)offset );
809    req.time_requested = time( NULL );
810    reqListAppend( &msgs->clientWillAskFor, &req );
811    return TR_ADDREQ_OK;
812}
813
814static void
815cancelAllRequestsToPeer( tr_peermsgs * msgs )
816{
817    int i;
818    struct request_list a = msgs->clientWillAskFor;
819    struct request_list b = msgs->clientAskedFor;
820
821    msgs->clientAskedFor = REQUEST_LIST_INIT;
822    msgs->clientWillAskFor = REQUEST_LIST_INIT;
823
824    for( i=0; i<a.count; ++i )
825        fireCancelledReq( msgs, &a.requests[i] );
826
827    for( i=0; i<b.count; ++i ) {
828        fireCancelledReq( msgs, &b.requests[i] );
829        protocolSendCancel( msgs, &b.requests[i] );
830    }
831
832    reqListClear( &a );
833    reqListClear( &b );
834}
835
836void
837tr_peerMsgsCancel( tr_peermsgs * msgs,
838                   uint32_t      pieceIndex,
839                   uint32_t      offset,
840                   uint32_t      length )
841{
842    struct peer_request req;
843
844    assert( msgs != NULL );
845    assert( length > 0 );
846
847    /* have we asked the peer for this piece? */
848    req.index = pieceIndex;
849    req.offset = offset;
850    req.length = length;
851
852    /* if it's only in the queue and hasn't been sent yet, free it */
853    if( !reqListRemove( &msgs->clientWillAskFor, &req ) )
854        fireCancelledReq( msgs, &req );
855
856    /* if it's already been sent, send a cancel message too */
857    if( !reqListRemove( &msgs->clientAskedFor, &req ) ) {
858        protocolSendCancel( msgs, &req );
859        fireCancelledReq( msgs, &req );
860    }
861}
862
863/**
864***
865**/
866
867static void
868sendLtepHandshake( tr_peermsgs * msgs )
869{
870    tr_benc val, *m;
871    char * buf;
872    int len;
873    int pex;
874    const char * v = TR_NAME " " USERAGENT_PREFIX;
875    const int port = tr_getPublicPort( msgs->handle );
876    struct evbuffer * outbuf;
877
878    if( msgs->clientSentLtepHandshake )
879        return;
880
881    outbuf = evbuffer_new( );
882    dbgmsg( msgs, "sending an ltep handshake" );
883    msgs->clientSentLtepHandshake = 1;
884
885    /* decide if we want to advertise pex support */
886    if( !tr_torrentAllowsPex( msgs->torrent ) )
887        pex = 0;
888    else if( msgs->peerSentLtepHandshake )
889        pex = msgs->peerSupportsPex ? 1 : 0;
890    else
891        pex = 1;
892
893    tr_bencInit( &val, TYPE_DICT );
894    tr_bencDictReserve( &val, 4 );
895    tr_bencInitInt( tr_bencDictAdd( &val, "e" ), 1 );
896    m  = tr_bencDictAdd( &val, "m" );
897    tr_bencInit( m, TYPE_DICT );
898    if( pex ) {
899        tr_bencDictReserve( m, 1 );
900        tr_bencInitInt( tr_bencDictAdd( m, "ut_pex" ), TR_LTEP_PEX );
901    }
902    if( port > 0 )
903        tr_bencInitInt( tr_bencDictAdd( &val, "p" ), port );
904    tr_bencInitStr( tr_bencDictAdd( &val, "v" ), v, 0, 1 );
905    buf = tr_bencSave( &val, &len );
906
907    tr_peerIoWriteUint32( msgs->io, outbuf, 2*sizeof(uint8_t) + len );
908    tr_peerIoWriteUint8 ( msgs->io, outbuf, BT_LTEP );
909    tr_peerIoWriteUint8 ( msgs->io, outbuf, LTEP_HANDSHAKE );
910    tr_peerIoWriteBytes ( msgs->io, outbuf, buf, len );
911
912    tr_peerIoWriteBuf( msgs->io, outbuf );
913
914#if 0
915    dbgmsg( msgs, "here is the ltep handshake we sent:" );
916    tr_bencPrint( &val );
917    dbgmsg( msgs, "here is the ltep handshake we read [%s]:", tr_bencSave( &val, NULL ) );
918#endif
919
920    /* cleanup */
921    tr_bencFree( &val );
922    tr_free( buf );
923    evbuffer_free( outbuf );
924}
925
926static void
927parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
928{
929    tr_benc val, * sub;
930    uint8_t * tmp = tr_new( uint8_t, len );
931
932    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
933    msgs->peerSentLtepHandshake = 1;
934
935    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type!=TYPE_DICT ) {
936        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
937        tr_free( tmp );
938        return;
939    }
940
941#if 0
942    dbgmsg( msgs, "here is the ltep handshake we read:" );
943    tr_bencPrint( &val );
944    dbgmsg( msgs, "here is the ltep handshake we read [%s]:", tr_bencSave( &val, NULL ) );
945#endif
946
947    /* does the peer prefer encrypted connections? */
948    if(( sub = tr_bencDictFindType( &val, "e", TYPE_INT )))
949        msgs->info->encryption_preference = sub->val.i
950                                      ? ENCRYPTION_PREFERENCE_YES
951                                      : ENCRYPTION_PREFERENCE_NO;
952
953    /* check supported messages for utorrent pex */
954    msgs->peerSupportsPex = 0;
955    if(( sub = tr_bencDictFindType( &val, "m", TYPE_DICT ))) {
956        if(( sub = tr_bencDictFindType( sub, "ut_pex", TYPE_INT ))) {
957            msgs->ut_pex_id = (uint8_t) sub->val.i;
958            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
959            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
960        }
961    }
962
963    /* get peer's listening port */
964    if(( sub = tr_bencDictFindType( &val, "p", TYPE_INT ))) {
965        msgs->info->port = htons( (uint16_t)sub->val.i );
966        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
967    }
968
969    tr_bencFree( &val );
970    tr_free( tmp );
971}
972
973static void
974parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
975{
976    int gotval = 0;
977    uint8_t * tmp = tr_new( uint8_t, msglen );
978    tr_benc val, *sub;
979    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
980
981    if( tr_torrentAllowsPex( msgs->torrent )
982        && (( gotval = !tr_bencLoad( tmp, msglen, &val, NULL )))
983        && (( sub = tr_bencDictFindType( &val, "added", TYPE_STR ))))
984    {
985        const int n = sub->val.s.i / 6 ;
986        tr_inf( "torrent %s got %d peers from uT pex", msgs->torrent->info.name, n );
987        tr_peerMgrAddPeers( msgs->handle->peerMgr,
988                            msgs->torrent->info.hash,
989                            TR_PEER_FROM_PEX,
990                            (uint8_t*)sub->val.s.s, n );
991    }
992
993    if( gotval )
994        tr_bencFree( &val );
995    tr_free( tmp );
996}
997
998static void
999sendPex( tr_peermsgs * msgs );
1000
1001static void
1002parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1003{
1004    uint8_t ltep_msgid;
1005
1006    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
1007    msglen--;
1008
1009    if( ltep_msgid == LTEP_HANDSHAKE )
1010    {
1011        dbgmsg( msgs, "got ltep handshake" );
1012        parseLtepHandshake( msgs, msglen, inbuf );
1013        if( tr_peerIoSupportsLTEP( msgs->io ) )
1014        {
1015            sendLtepHandshake( msgs );
1016            sendPex( msgs );
1017        }
1018    }
1019    else if( ltep_msgid == TR_LTEP_PEX )
1020    {
1021        dbgmsg( msgs, "got ut pex" );
1022        msgs->peerSupportsPex = 1;
1023        parseUtPex( msgs, msglen, inbuf );
1024    }
1025    else
1026    {
1027        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1028        evbuffer_drain( inbuf, msglen );
1029    }
1030}
1031
1032static int
1033readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1034{
1035    uint32_t len;
1036
1037    if( inlen < sizeof(len) )
1038        return READ_MORE;
1039
1040    tr_peerIoReadUint32( msgs->io, inbuf, &len );
1041
1042    if( len == 0 ) /* peer sent us a keepalive message */
1043        dbgmsg( msgs, "got KeepAlive" );
1044    else {
1045        msgs->incoming.length = len;
1046        msgs->state = AWAITING_BT_ID;
1047    }
1048
1049    return READ_AGAIN;
1050}
1051
1052static int
1053readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen );
1054
1055static int
1056readBtId( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1057{
1058    uint8_t id;
1059
1060    if( inlen < sizeof(uint8_t) )
1061        return READ_MORE;
1062
1063    tr_peerIoReadUint8( msgs->io, inbuf, &id );
1064    msgs->incoming.id = id;
1065
1066    if( id==BT_PIECE )
1067    {
1068        msgs->state = AWAITING_BT_PIECE;
1069        return READ_AGAIN;
1070    }
1071    else if( msgs->incoming.length != 1 )
1072    {
1073        msgs->state = AWAITING_BT_MESSAGE;
1074        return READ_AGAIN;
1075    }
1076    else return readBtMessage( msgs, inbuf, inlen-1 );
1077}
1078
1079static void
1080updatePeerProgress( tr_peermsgs * msgs )
1081{
1082    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have ) / (float)msgs->torrent->info.pieceCount;
1083    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
1084    updateInterest( msgs );
1085    firePeerProgress( msgs );
1086}
1087
1088static int
1089clientCanSendFastBlock( const tr_peermsgs * msgs UNUSED )
1090{
1091    /* We can't send a fast piece if a peer has more than MAX_FAST_ALLOWED_THRESHOLD pieces */
1092    if( tr_bitfieldCountTrueBits( msgs->info->have ) > MAX_FAST_ALLOWED_THRESHOLD )
1093        return FALSE;
1094   
1095    /* ...or if we don't have ourself enough pieces */
1096    if( tr_bitfieldCountTrueBits( tr_cpPieceBitfield( msgs->torrent->completion ) ) < MAX_FAST_ALLOWED_THRESHOLD )
1097        return FALSE;
1098
1099    /* Maybe a bandwidth limit ? */
1100    return TRUE;
1101}
1102
1103static void
1104peerMadeRequest( tr_peermsgs * msgs, const struct peer_request * req )
1105{
1106    const int reqIsValid = requestIsValid( msgs, req );
1107    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( msgs->torrent->completion, req->index );
1108    const int peerIsChoked = msgs->info->peerIsChoked;
1109    const int peerIsFast = tr_peerIoSupportsFEXT( msgs->io );
1110    const int pieceIsFast = reqIsValid && tr_bitfieldHas( getPeerAllowedPieces( msgs ), req->index );
1111    const int canSendFast = clientCanSendFastBlock( msgs );
1112
1113    if( !reqIsValid ) /* bad request */
1114    {
1115        dbgmsg( msgs, "rejecting an invalid request." );
1116        sendFastReject( msgs, req->index, req->offset, req->length );
1117    }
1118    else if( !clientHasPiece ) /* we don't have it */
1119    {
1120        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1121        sendFastReject( msgs, req->index, req->offset, req->length );
1122    }
1123    else if( peerIsChoked && !peerIsFast ) /* doesn't he know he's choked? */
1124    {
1125        tr_peerMsgsSetChoke( msgs, 1 );
1126        sendFastReject( msgs, req->index, req->offset, req->length );
1127    }
1128    else if( peerIsChoked && peerIsFast && ( !pieceIsFast || !canSendFast ) )
1129    {
1130        sendFastReject( msgs, req->index, req->offset, req->length );
1131    }
1132    else /* YAY */
1133    {
1134        if( peerIsFast && pieceIsFast )
1135            reqListAppend( &msgs->peerAskedForFast, req );
1136        else
1137            reqListAppend( &msgs->peerAskedFor, req );
1138    }
1139}
1140
1141static int
1142messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1143{
1144    switch( id )
1145    {
1146        case BT_CHOKE:
1147        case BT_UNCHOKE:
1148        case BT_INTERESTED:
1149        case BT_NOT_INTERESTED:
1150        case BT_HAVE_ALL:
1151        case BT_HAVE_NONE:
1152            return len==1;
1153
1154        case BT_HAVE:
1155        case BT_SUGGEST:
1156        case BT_ALLOWED_FAST:
1157            return len==5;
1158
1159        case BT_BITFIELD:
1160            return len == (msg->torrent->info.pieceCount+7u)/8u + 1u;
1161       
1162        case BT_REQUEST:
1163        case BT_CANCEL:
1164        case BT_REJECT:
1165            return len==13;
1166
1167        case BT_PIECE:
1168            return len>9 && len<=16393;
1169
1170        case BT_PORT:
1171            return len==3;
1172
1173        case BT_LTEP:
1174            return len >= 2;
1175
1176        default:
1177            return FALSE;
1178    }
1179}
1180
1181static int
1182clientGotBlock( tr_peermsgs * msgs, const uint8_t * block, const struct peer_request * req );
1183
1184static void
1185clientGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1186{
1187    tr_torrent * tor = msgs->torrent;
1188    tor->activityDate = tr_date( );
1189    tor->downloadedCur += byteCount;
1190    msgs->peerSentPieceDataAt = time( NULL );
1191    msgs->info->pieceDataActivityDate = time( NULL );
1192    msgs->info->credit += (int)(byteCount * SWIFT_REPAYMENT_RATIO);
1193    tr_rcTransferred( msgs->info->rcToClient, byteCount );
1194    tr_rcTransferred( tor->download, byteCount );
1195    tr_rcTransferred( tor->handle->download, byteCount );
1196    tr_statsAddDownloaded( msgs->handle, byteCount );
1197    firePieceData( msgs );
1198}
1199
1200static int
1201readBtPiece( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1202{
1203    struct peer_request * req = &msgs->incoming.blockReq;
1204    assert( EVBUFFER_LENGTH(inbuf) >= inlen );
1205    dbgmsg( msgs, "In readBtPiece" );
1206
1207    if( !req->length )
1208    {
1209        if( inlen < 8 )
1210            return READ_MORE;
1211
1212        tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
1213        tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
1214        req->length = msgs->incoming.length - 9;
1215        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1216        return READ_AGAIN;
1217    }
1218    else
1219    {
1220        int err;
1221
1222        /* read in another chunk of data */
1223        const size_t nLeft = req->length - EVBUFFER_LENGTH(msgs->incoming.block);
1224        size_t n = MIN( nLeft, inlen );
1225        uint8_t * buf = tr_new( uint8_t, n );
1226        assert( EVBUFFER_LENGTH(inbuf) >= n );
1227        tr_peerIoReadBytes( msgs->io, inbuf, buf, n );
1228        evbuffer_add( msgs->incoming.block, buf, n );
1229        clientGotBytes( msgs, n );
1230        tr_free( buf );
1231        dbgmsg( msgs, "got %d bytes for block %u:%u->%u ... %d remain",
1232               (int)n, req->index, req->offset, req->length,
1233               (int)( req->length - EVBUFFER_LENGTH(msgs->incoming.block) ) );
1234        if( EVBUFFER_LENGTH(msgs->incoming.block) < req->length )
1235            return READ_MORE;
1236
1237        /* we've got the whole block ... process it */
1238        err = clientGotBlock( msgs, EVBUFFER_DATA(msgs->incoming.block), req );
1239
1240        /* cleanup */
1241        evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH(msgs->incoming.block) );
1242        req->length = 0;
1243        msgs->state = AWAITING_BT_LENGTH;
1244        if( !err )
1245            return READ_AGAIN;
1246        else {
1247            fireError( msgs, err );
1248            return READ_DONE;
1249        }
1250    }
1251}
1252
1253static int
1254readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1255{
1256    uint32_t ui32;
1257    uint32_t msglen = msgs->incoming.length;
1258    const uint8_t id = msgs->incoming.id;
1259    const size_t startBufLen = EVBUFFER_LENGTH( inbuf );
1260
1261    --msglen; /* id length */
1262
1263    if( inlen < msglen )
1264        return READ_MORE;
1265
1266    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %d", (int)id, (int)msglen, (int)inlen );
1267
1268    if( !messageLengthIsCorrect( msgs, id, msglen+1 ) )
1269    {
1270        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1271        fireError( msgs, TR_ERROR );
1272        return READ_DONE;
1273    }
1274
1275    switch( id )
1276    {
1277        case BT_CHOKE:
1278            dbgmsg( msgs, "got Choke" );
1279            msgs->info->clientIsChoked = 1;
1280            cancelAllRequestsToPeer( msgs );
1281            cancelAllRequestsToClientExceptFast( msgs );
1282            break;
1283
1284        case BT_UNCHOKE:
1285            dbgmsg( msgs, "got Unchoke" );
1286            msgs->info->clientIsChoked = 0;
1287            fireNeedReq( msgs );
1288            break;
1289
1290        case BT_INTERESTED:
1291            dbgmsg( msgs, "got Interested" );
1292            msgs->info->peerIsInterested = 1;
1293            tr_peerMsgsSetChoke( msgs, 0 );
1294            break;
1295
1296        case BT_NOT_INTERESTED:
1297            dbgmsg( msgs, "got Not Interested" );
1298            msgs->info->peerIsInterested = 0;
1299            break;
1300           
1301        case BT_HAVE:
1302            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1303            dbgmsg( msgs, "got Have: %u", ui32 );
1304            tr_bitfieldAdd( msgs->info->have, ui32 );
1305            /* If this is a fast-allowed piece for this peer, mark it as normal now */
1306            if( msgs->clientAllowedPieces != NULL && tr_bitfieldHas( msgs->clientAllowedPieces, ui32 ) )
1307                tr_bitfieldRem( msgs->clientAllowedPieces, ui32 );
1308            updatePeerProgress( msgs );
1309            tr_rcTransferred( msgs->torrent->swarmspeed, msgs->torrent->info.pieceSize );
1310            break;
1311
1312        case BT_BITFIELD: {
1313            const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
1314            dbgmsg( msgs, "got a bitfield" );
1315            msgs->peerSentBitfield = 1;
1316            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
1317            updatePeerProgress( msgs );
1318            maybeSendFastAllowedSet( msgs );
1319            tr_peerMsgsSetChoke( msgs, !clientIsSeed || (msgs->info->progress<1.0) );
1320            fireNeedReq( msgs );
1321            break;
1322        }
1323
1324        case BT_REQUEST: {
1325            struct peer_request req;
1326            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1327            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1328            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1329            dbgmsg( msgs, "got Request: %u:%u->%u", req.index, req.offset, req.length );
1330            peerMadeRequest( msgs, &req );
1331            break;
1332        }
1333
1334        case BT_CANCEL: {
1335            struct peer_request req;
1336            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1337            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1338            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1339            dbgmsg( msgs, "got a Cancel %u:%u->%u", req.index, req.offset, req.length );
1340            reqListRemove( &msgs->peerAskedForFast, &req );
1341            reqListRemove( &msgs->peerAskedFor, &req );
1342            break;
1343        }
1344
1345        case BT_PIECE:
1346            assert( 0 ); /* should be handled elsewhere! */
1347            break;
1348       
1349        case BT_PORT:
1350            dbgmsg( msgs, "Got a BT_PORT" );
1351            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1352            break;
1353           
1354        case BT_SUGGEST: {
1355            const tr_bitfield * bt = tr_cpPieceBitfield( msgs->torrent->completion );
1356            dbgmsg( msgs, "Got a BT_SUGGEST" );
1357            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1358            if( tr_bitfieldHas( bt, ui32 ) )
1359                tr_bitfieldAdd( msgs->clientSuggestedPieces, ui32 );
1360            break;
1361        }
1362           
1363        case BT_HAVE_ALL:
1364            dbgmsg( msgs, "Got a BT_HAVE_ALL" );
1365            tr_bitfieldAddRange( msgs->info->have, 0, msgs->torrent->info.pieceCount );
1366            updatePeerProgress( msgs );
1367            maybeSendFastAllowedSet( msgs );
1368            break;
1369       
1370       
1371        case BT_HAVE_NONE:
1372            dbgmsg( msgs, "Got a BT_HAVE_NONE" );
1373            tr_bitfieldClear( msgs->info->have );
1374            updatePeerProgress( msgs );
1375            maybeSendFastAllowedSet( msgs );
1376            break;
1377       
1378        case BT_REJECT: {
1379            struct peer_request req;
1380            dbgmsg( msgs, "Got a BT_REJECT" );
1381            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1382            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1383            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1384            reqListRemove( &msgs->clientAskedFor, &req );
1385            break;
1386        }
1387
1388        case BT_ALLOWED_FAST: {
1389            dbgmsg( msgs, "Got a BT_ALLOWED_FAST" );
1390            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1391            tr_bitfieldAdd( msgs->clientAllowedPieces, ui32 );
1392            break;
1393        }
1394
1395        case BT_LTEP:
1396            dbgmsg( msgs, "Got a BT_LTEP" );
1397            parseLtep( msgs, msglen, inbuf );
1398            break;
1399
1400        default:
1401            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1402            tr_peerIoDrain( msgs->io, inbuf, msglen );
1403            break;
1404    }
1405
1406    assert( msglen + 1 == msgs->incoming.length );
1407    assert( EVBUFFER_LENGTH(inbuf) == startBufLen - msglen );
1408
1409    msgs->state = AWAITING_BT_LENGTH;
1410    return READ_AGAIN;
1411}
1412
1413static void
1414peerGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1415{
1416    tr_torrent * tor = msgs->torrent;
1417    tor->activityDate = tr_date( );
1418    tor->uploadedCur += byteCount;
1419    msgs->clientSentPieceDataAt = time( NULL );
1420    msgs->info->pieceDataActivityDate = time( NULL );
1421    msgs->info->credit -= byteCount;
1422    tr_rcTransferred( msgs->info->rcToPeer, byteCount );
1423    tr_rcTransferred( tor->upload, byteCount );
1424    tr_rcTransferred( tor->handle->upload, byteCount );
1425    tr_statsAddUploaded( msgs->handle, byteCount );
1426    firePieceData( msgs );
1427}
1428
1429static size_t
1430getDownloadMax( const tr_peermsgs * msgs )
1431{
1432    static const size_t maxval = ~0;
1433    const tr_torrent * tor = msgs->torrent;
1434
1435    if( tor->downloadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1436        return tor->handle->useDownloadLimit
1437            ? tr_rcBytesLeft( tor->handle->download ) : maxval;
1438
1439    if( tor->downloadLimitMode == TR_SPEEDLIMIT_SINGLE )
1440        return tr_rcBytesLeft( tor->download );
1441
1442    return maxval;
1443}
1444
1445static void
1446reassignBytesToCorrupt( tr_peermsgs * msgs, uint32_t byteCount )
1447{
1448    tr_torrent * tor = msgs->torrent;
1449
1450    /* increment the `corrupt' field */
1451    tor->corruptCur += byteCount;
1452
1453    /* decrement the `downloaded' field */
1454    if( tor->downloadedCur >= byteCount )
1455        tor->downloadedCur -= byteCount;
1456    else
1457        tor->downloadedCur = 0;
1458}
1459
1460
1461static void
1462gotBadPiece( tr_peermsgs * msgs, uint32_t pieceIndex )
1463{
1464    const uint32_t byteCount =
1465        tr_torPieceCountBytes( msgs->torrent, (int)pieceIndex );
1466    reassignBytesToCorrupt( msgs, byteCount );
1467}
1468
1469static void
1470clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1471{
1472    reassignBytesToCorrupt( msgs, req->length );
1473}
1474
1475static void
1476addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1477{
1478    if( !msgs->info->blame )
1479         msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1480    tr_bitfieldAdd( msgs->info->blame, index );
1481}
1482
1483static tr_errno
1484clientGotBlock( tr_peermsgs                * msgs,
1485                const uint8_t              * data,
1486                const struct peer_request  * req )
1487{
1488    int err;
1489    tr_torrent * tor = msgs->torrent;
1490    const int block = _tr_block( tor, req->index, req->offset );
1491
1492    assert( msgs != NULL );
1493    assert( req != NULL );
1494
1495    if( req->length != (uint32_t)tr_torBlockCountBytes( msgs->torrent, block ) )
1496    {
1497        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1498                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1499        return TR_ERROR;
1500    }
1501
1502    /* save the block */
1503    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1504
1505    /**
1506    *** Remove the block from our `we asked for this' list
1507    **/
1508
1509    if( reqListRemove( &msgs->clientAskedFor, req ) )
1510    {
1511        clientGotUnwantedBlock( msgs, req );
1512        dbgmsg( msgs, "we didn't ask for this message..." );
1513        return 0;
1514    }
1515
1516    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1517                  msgs->clientAskedFor.count );
1518
1519    /**
1520    *** Error checks
1521    **/
1522
1523    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
1524        dbgmsg( msgs, "we have this block already..." );
1525        clientGotUnwantedBlock( msgs, req );
1526        return 0;
1527    }
1528
1529    /**
1530    ***  Save the block
1531    **/
1532
1533    msgs->info->peerSentPieceDataAt = time( NULL );
1534    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1535        return err;
1536
1537    tr_cpBlockAdd( tor->completion, block );
1538
1539    addPeerToBlamefield( msgs, req->index );
1540
1541    fireGotBlock( msgs, req );
1542
1543    /**
1544    ***  Handle if this was the last block in the piece
1545    **/
1546
1547    if( tr_cpPieceIsComplete( tor->completion, req->index ) )
1548    {
1549        const tr_errno err = tr_ioTestPiece( tor, req->index );
1550
1551        tr_torrentSetHasPiece( tor, req->index, !err );
1552        tr_torrentSetPieceChecked( tor, req->index, TRUE );
1553        tr_peerMgrSetBlame( tor->handle->peerMgr, tor->info.hash, req->index, !err );
1554
1555        if( !err )
1556            fireClientHave( msgs, req->index );
1557        else
1558            gotBadPiece( msgs, req->index );
1559    }
1560
1561    return 0;
1562}
1563
1564static void
1565didWrite( struct bufferevent * evin UNUSED, void * vmsgs )
1566{
1567    pulse( vmsgs );
1568}
1569
1570static ReadState
1571canRead( struct bufferevent * evin, void * vmsgs )
1572{
1573    ReadState ret;
1574    tr_peermsgs * msgs = vmsgs;
1575    struct evbuffer * in = EVBUFFER_INPUT ( evin );
1576    const size_t inlen = EVBUFFER_LENGTH( in );
1577
1578    if( !inlen )
1579    {
1580        ret = READ_DONE;
1581    }
1582    else if( msgs->state == AWAITING_BT_PIECE )
1583    {
1584        const size_t downloadMax = getDownloadMax( msgs );
1585        const size_t n = MIN( inlen, downloadMax );
1586        ret = n ? readBtPiece( msgs, in, n ) : READ_DONE;
1587    }
1588    else switch( msgs->state )
1589    {
1590        case AWAITING_BT_LENGTH:  ret = readBtLength ( msgs, in, inlen ); break;
1591        case AWAITING_BT_ID:      ret = readBtId     ( msgs, in, inlen ); break;
1592        case AWAITING_BT_MESSAGE: ret = readBtMessage( msgs, in, inlen ); break;
1593        default:                  assert( 0 );
1594    }
1595
1596    return ret;
1597}
1598
1599static void
1600sendKeepalive( tr_peermsgs * msgs )
1601{
1602    dbgmsg( msgs, "sending a keepalive message" );
1603    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1604}
1605
1606/**
1607***
1608**/
1609
1610static int
1611isSwiftEnabled( const tr_peermsgs * msgs )
1612{
1613    /* rationale: SWIFT is good for getting rid of deadbeats, but most
1614     * private trackers have ratios where you _want_ to feed deadbeats
1615     * as much as possible.  So we disable SWIFT on private torrents */
1616    return SWIFT_ENABLED
1617        && !tr_torrentIsSeed( msgs->torrent )
1618        && !tr_torrentIsPrivate( msgs->torrent );
1619}
1620
1621static size_t
1622getUploadMax( const tr_peermsgs * msgs )
1623{
1624    static const size_t maxval = ~0;
1625    const tr_torrent * tor = msgs->torrent;
1626    const int useSwift = isSwiftEnabled( msgs );
1627    const size_t swiftLeft = msgs->info->credit;
1628    size_t speedLeft;
1629    size_t bufLeft;
1630    size_t ret;
1631
1632    if( tor->uploadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1633        speedLeft = tor->handle->useUploadLimit ? tr_rcBytesLeft( tor->handle->upload ) : maxval;
1634    else if( tor->uploadLimitMode == TR_SPEEDLIMIT_SINGLE )
1635        speedLeft = tr_rcBytesLeft( tor->upload );
1636    else
1637        speedLeft = ~0;
1638
1639    bufLeft = MAX_OUTBUF_SIZE - tr_peerIoWriteBytesWaiting( msgs->io );
1640    ret = MIN( speedLeft, bufLeft );
1641    if( useSwift)
1642        ret = MIN( ret, swiftLeft );
1643    return ret;
1644}
1645
1646static int
1647ratePulse( void * vmsgs )
1648{
1649    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1650    msgs->info->rateToClient = tr_rcRate( msgs->info->rcToClient );
1651    msgs->info->rateToPeer = tr_rcRate( msgs->info->rcToPeer );
1652    msgs->maxActiveRequests = MIN( 4 + (int)(msgs->info->rateToClient/4), MAX_QUEUE_SIZE );
1653    msgs->minActiveRequests = msgs->maxActiveRequests / 3;
1654    return TRUE;
1655}
1656
1657static tr_errno
1658popNextRequest( tr_peermsgs * msgs, struct peer_request * setme )
1659{
1660    if( !reqListPop( &msgs->peerAskedForFast, setme ) )
1661        return 0;
1662    if( !reqListPop( &msgs->peerAskedFor, setme ) )
1663        return 0;
1664
1665    return TR_ERROR;
1666}
1667
1668static int
1669pulse( void * vmsgs )
1670{
1671    const time_t now = time( NULL );
1672    tr_peermsgs * msgs = vmsgs;
1673
1674    tr_peerIoTryRead( msgs->io );
1675    pumpRequestQueue( msgs );
1676    expireOldRequests( msgs );
1677
1678    if( msgs->sendingBlock )
1679    {
1680        const size_t uploadMax = getUploadMax( msgs );
1681        size_t len = EVBUFFER_LENGTH( msgs->outBlock );
1682        const size_t outlen = MIN( len, uploadMax );
1683
1684        assert( len );
1685
1686        if( outlen )
1687        {
1688            tr_peerIoWrite( msgs->io, EVBUFFER_DATA( msgs->outBlock ), outlen );
1689            evbuffer_drain( msgs->outBlock, outlen );
1690            peerGotBytes( msgs, outlen );
1691
1692            len -= outlen;
1693            msgs->clientSentAnythingAt = now;
1694            msgs->sendingBlock = len!=0;
1695
1696            dbgmsg( msgs, "wrote %d bytes; %d left in block", (int)outlen, (int)len );
1697        }
1698        else dbgmsg( msgs, "stalled writing block... uploadMax %lu, outlen %lu", uploadMax, outlen );
1699    }
1700
1701    if( !msgs->sendingBlock )
1702    {
1703        struct peer_request req;
1704
1705        if(( EVBUFFER_LENGTH( msgs->outMessages ) ))
1706        {
1707            dbgmsg( msgs, "flushing outMessages..." );
1708            tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
1709            msgs->clientSentAnythingAt = now;
1710        }
1711        else if( !EVBUFFER_LENGTH( msgs->outBlock )
1712            && !popNextRequest( msgs, &req )
1713            && requestIsValid( msgs, &req )
1714            && tr_cpPieceIsComplete( msgs->torrent->completion, req.index ) )
1715        {
1716            uint8_t * buf = tr_new( uint8_t, req.length );
1717
1718            if( !tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf ) )
1719            {
1720                tr_peerIo * io = msgs->io;
1721                struct evbuffer * out = msgs->outBlock;
1722
1723                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1724                tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 2*sizeof(uint32_t) + req.length );
1725                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1726                tr_peerIoWriteUint32( io, out, req.index );
1727                tr_peerIoWriteUint32( io, out, req.offset );
1728                tr_peerIoWriteBytes ( io, out, buf, req.length );
1729                msgs->sendingBlock = 1;
1730            }
1731
1732            tr_free( buf );
1733        }
1734        else if( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS )
1735        {
1736            sendKeepalive( msgs );
1737        }
1738    }
1739
1740    return TRUE; /* loop forever */
1741}
1742
1743static void
1744gotError( struct bufferevent * evbuf UNUSED, short what, void * vmsgs )
1745{
1746    if( what & EVBUFFER_TIMEOUT )
1747        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1748    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1749        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1750                what, errno, tr_strerror(errno) );
1751    fireError( vmsgs, TR_ERROR );
1752}
1753
1754static void
1755sendBitfield( tr_peermsgs * msgs )
1756{
1757    const tr_bitfield * bitfield = tr_cpPieceBitfield( msgs->torrent->completion );
1758    struct evbuffer * out = msgs->outMessages;
1759
1760    dbgmsg( msgs, "sending peer a bitfield message" );
1761    tr_peerIoWriteUint32( msgs->io, out, sizeof(uint8_t) + bitfield->len );
1762    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1763    tr_peerIoWriteBytes ( msgs->io, out, bitfield->bits, bitfield->len );
1764}
1765
1766/**
1767***
1768**/
1769
1770/* some peers give us error messages if we send
1771   more than this many peers in a single pex message */
1772#define MAX_PEX_DIFFS 200
1773
1774typedef struct
1775{
1776    tr_pex * added;
1777    tr_pex * dropped;
1778    tr_pex * elements;
1779    int addedCount;
1780    int droppedCount;
1781    int elementCount;
1782    int diffCount;
1783}
1784PexDiffs;
1785
1786static void
1787pexAddedCb( void * vpex, void * userData )
1788{
1789    PexDiffs * diffs = (PexDiffs *) userData;
1790    tr_pex * pex = (tr_pex *) vpex;
1791    if( diffs->diffCount < MAX_PEX_DIFFS )
1792    {
1793        diffs->diffCount++;
1794        diffs->added[diffs->addedCount++] = *pex;
1795        diffs->elements[diffs->elementCount++] = *pex;
1796    }
1797}
1798
1799static void
1800pexRemovedCb( void * vpex, void * userData )
1801{
1802    PexDiffs * diffs = (PexDiffs *) userData;
1803    tr_pex * pex = (tr_pex *) vpex;
1804    if( diffs->diffCount < MAX_PEX_DIFFS )
1805    {
1806        diffs->diffCount++;
1807        diffs->dropped[diffs->droppedCount++] = *pex;
1808    }
1809}
1810
1811static void
1812pexElementCb( void * vpex, void * userData )
1813{
1814    PexDiffs * diffs = (PexDiffs *) userData;
1815    tr_pex * pex = (tr_pex *) vpex;
1816    if( diffs->diffCount < MAX_PEX_DIFFS )
1817    {
1818        diffs->diffCount++;
1819        diffs->elements[diffs->elementCount++] = *pex;
1820    }
1821}
1822
1823static void
1824sendPex( tr_peermsgs * msgs )
1825{
1826    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
1827    {
1828        int i;
1829        tr_pex * newPex = NULL;
1830        const int newCount = tr_peerMgrGetPeers( msgs->handle->peerMgr, msgs->torrent->info.hash, &newPex );
1831        PexDiffs diffs;
1832        tr_benc val, *added, *dropped, *flags;
1833        uint8_t *tmp, *walk;
1834        char * benc;
1835        int bencLen;
1836
1837        /* build the diffs */
1838        diffs.added = tr_new( tr_pex, newCount );
1839        diffs.addedCount = 0;
1840        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1841        diffs.droppedCount = 0;
1842        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1843        diffs.elementCount = 0;
1844        diffs.diffCount = 0;
1845        tr_set_compare( msgs->pex, msgs->pexCount,
1846                        newPex, newCount,
1847                        tr_pexCompare, sizeof(tr_pex),
1848                        pexRemovedCb, pexAddedCb, pexElementCb, &diffs );
1849        dbgmsg( msgs, "pex: old peer count %d, new peer count %d, added %d, removed %d", msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1850
1851        /* update peer */
1852        tr_free( msgs->pex );
1853        msgs->pex = diffs.elements;
1854        msgs->pexCount = diffs.elementCount;
1855
1856        /* build the pex payload */
1857        tr_bencInit( &val, TYPE_DICT );
1858        tr_bencDictReserve( &val, 3 );
1859
1860        /* "added" */
1861        added = tr_bencDictAdd( &val, "added" );
1862        tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1863        for( i=0; i<diffs.addedCount; ++i ) {
1864            memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
1865            memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1866        }
1867        assert( ( walk - tmp ) == diffs.addedCount * 6 );
1868        tr_bencInitStr( added, tmp, walk-tmp, FALSE );
1869
1870        /* "added.f" */
1871        flags = tr_bencDictAdd( &val, "added.f" );
1872        tmp = walk = tr_new( uint8_t, diffs.addedCount );
1873        for( i=0; i<diffs.addedCount; ++i )
1874            *walk++ = diffs.added[i].flags;
1875        assert( ( walk - tmp ) == diffs.addedCount );
1876        tr_bencInitStr( flags, tmp, walk-tmp, FALSE );
1877
1878        /* "dropped" */
1879        dropped = tr_bencDictAdd( &val, "dropped" );
1880        tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
1881        for( i=0; i<diffs.droppedCount; ++i ) {
1882            memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
1883            memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
1884        }
1885        assert( ( walk - tmp ) == diffs.droppedCount * 6 );
1886        tr_bencInitStr( dropped, tmp, walk-tmp, FALSE );
1887
1888        /* write the pex message */
1889        benc = tr_bencSave( &val, &bencLen );
1890        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 2*sizeof(uint8_t) + bencLen );
1891        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_LTEP );
1892        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, msgs->ut_pex_id );
1893        tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, benc, bencLen );
1894
1895        /* cleanup */
1896        tr_free( benc );
1897        tr_bencFree( &val );
1898        tr_free( diffs.added );
1899        tr_free( diffs.dropped );
1900        tr_free( newPex );
1901
1902        msgs->clientSentPexAt = time( NULL );
1903    }
1904}
1905
1906static int
1907pexPulse( void * vpeer )
1908{
1909    sendPex( vpeer );
1910    return TRUE;
1911}
1912
1913/**
1914***
1915**/
1916
1917tr_peermsgs*
1918tr_peerMsgsNew( struct tr_torrent * torrent,
1919                struct tr_peer    * info,
1920                tr_delivery_func    func,
1921                void              * userData,
1922                tr_publisher_tag  * setme )
1923{
1924    tr_peermsgs * m;
1925
1926    assert( info != NULL );
1927    assert( info->io != NULL );
1928
1929    m = tr_new0( tr_peermsgs, 1 );
1930    m->publisher = tr_publisherNew( );
1931    m->info = info;
1932    m->handle = torrent->handle;
1933    m->torrent = torrent;
1934    m->io = info->io;
1935    m->info->clientIsChoked = 1;
1936    m->info->peerIsChoked = 1;
1937    m->info->clientIsInterested = 0;
1938    m->info->peerIsInterested = 0;
1939    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
1940    m->state = AWAITING_BT_LENGTH;
1941    m->pulseTimer = tr_timerNew( m->handle, pulse, m, PEER_PULSE_INTERVAL );
1942    m->rateTimer = tr_timerNew( m->handle, ratePulse, m, RATE_PULSE_INTERVAL );
1943    m->pexTimer = tr_timerNew( m->handle, pexPulse, m, PEX_INTERVAL );
1944    m->outMessages = evbuffer_new( );
1945    m->incoming.block = evbuffer_new( );
1946    m->outBlock = evbuffer_new( );
1947    m->peerAllowedPieces = NULL;
1948    m->peerAskedFor = REQUEST_LIST_INIT;
1949    m->peerAskedForFast = REQUEST_LIST_INIT;
1950    m->clientAskedFor = REQUEST_LIST_INIT;
1951    m->clientWillAskFor = REQUEST_LIST_INIT;
1952    m->clientAllowedPieces = tr_bitfieldNew( m->torrent->info.pieceCount );
1953    m->clientSuggestedPieces = tr_bitfieldNew( m->torrent->info.pieceCount );
1954    *setme = tr_publisherSubscribe( m->publisher, func, userData );
1955
1956    if ( tr_peerIoSupportsLTEP( m->io ) )
1957        sendLtepHandshake( m );
1958
1959    /* bitfield/have-all/have-none must preceed other non-handshake messages... */
1960    if ( !tr_peerIoSupportsFEXT( m->io ) )
1961        sendBitfield( m );
1962    else {
1963        /* This peer is fastpeer-enabled, send it have-all or have-none if appropriate */
1964        float completion = tr_cpPercentComplete( m->torrent->completion );
1965        if ( completion == 0.0f ) {
1966            sendFastHave( m, 0 );
1967        } else if ( completion == 1.0f ) {
1968            sendFastHave( m, 1 );
1969        } else {
1970            sendBitfield( m );
1971        }
1972    }
1973   
1974    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of inactivity */
1975    tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
1976    ratePulse( m );
1977
1978    return m;
1979}
1980
1981void
1982tr_peerMsgsFree( tr_peermsgs* msgs )
1983{
1984    if( msgs != NULL )
1985    {
1986        tr_timerFree( &msgs->pulseTimer );
1987        tr_timerFree( &msgs->rateTimer );
1988        tr_timerFree( &msgs->pexTimer );
1989        tr_publisherFree( &msgs->publisher );
1990        reqListClear( &msgs->clientWillAskFor );
1991        reqListClear( &msgs->clientAskedFor );
1992        reqListClear( &msgs->peerAskedForFast );
1993        reqListClear( &msgs->peerAskedFor );
1994        tr_bitfieldFree( msgs->peerAllowedPieces );
1995        tr_bitfieldFree( msgs->clientAllowedPieces );
1996        tr_bitfieldFree( msgs->clientSuggestedPieces );
1997        evbuffer_free( msgs->incoming.block );
1998        evbuffer_free( msgs->outMessages );
1999        evbuffer_free( msgs->outBlock );
2000        tr_free( msgs->pex );
2001
2002        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2003        tr_free( msgs );
2004    }
2005}
2006
2007tr_publisher_tag
2008tr_peerMsgsSubscribe( tr_peermsgs       * peer,
2009                      tr_delivery_func    func,
2010                      void              * userData )
2011{
2012    return tr_publisherSubscribe( peer->publisher, func, userData );
2013}
2014
2015void
2016tr_peerMsgsUnsubscribe( tr_peermsgs       * peer,
2017                        tr_publisher_tag    tag )
2018{
2019    tr_publisherUnsubscribe( peer->publisher, tag );
2020}
2021
2022int
2023tr_peerMsgsIsPieceFastAllowed( const tr_peermsgs * peer,
2024                               uint32_t            index )
2025{
2026    return tr_bitfieldHas( peer->clientAllowedPieces, index );
2027}
2028
2029int
2030tr_peerMsgsIsPieceSuggested( const tr_peermsgs * peer,
2031                             uint32_t            index )
2032{
2033    return tr_bitfieldHas( peer->clientSuggestedPieces, index );
2034}
2035
Note: See TracBrowser for help on using the repository browser.