source: trunk/libtransmission/peer-msgs.c @ 6412

Last change on this file since 6412 was 6412, checked in by charles, 13 years ago

rename `pulse' as peerPulse() and trackerPulse() to make backtraces easier to read

  • Property svn:keywords set to Date Rev Author Id
File size: 56.3 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 6412 2008-07-28 19:47:16Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19#include <libgen.h> /* basename */
20
21#include <event.h>
22
23#include "transmission.h"
24#include "bencode.h"
25#include "completion.h"
26#include "inout.h"
27#include "peer-io.h"
28#include "peer-mgr.h"
29#include "peer-mgr-private.h"
30#include "peer-msgs.h"
31#include "ratecontrol.h"
32#include "stats.h"
33#include "torrent.h"
34#include "trevent.h"
35#include "utils.h"
36
37/**
38***
39**/
40
41enum
42{
43    BT_CHOKE                = 0,
44    BT_UNCHOKE              = 1,
45    BT_INTERESTED           = 2,
46    BT_NOT_INTERESTED       = 3,
47    BT_HAVE                 = 4,
48    BT_BITFIELD             = 5,
49    BT_REQUEST              = 6,
50    BT_PIECE                = 7,
51    BT_CANCEL               = 8,
52    BT_PORT                 = 9,
53    BT_SUGGEST              = 13,
54    BT_HAVE_ALL             = 14,
55    BT_HAVE_NONE            = 15,
56    BT_REJECT               = 16,
57    BT_ALLOWED_FAST         = 17,
58    BT_LTEP                 = 20,
59
60    LTEP_HANDSHAKE          = 0,
61
62    TR_LTEP_PEX             = 1,
63
64    MIN_CHOKE_PERIOD_SEC    = (10),
65
66    /* idle seconds before we send a keepalive */
67    KEEPALIVE_INTERVAL_SECS = 100,
68
69    PEX_INTERVAL            = (90 * 1000), /* msec between sendPex() calls */
70    PEER_PULSE_INTERVAL     = (50),        /* msec between peerPulse() calls */
71    RATE_PULSE_INTERVAL     = (250),       /* msec between ratePulse() calls */
72
73    MAX_QUEUE_SIZE          = (100),
74     
75    /* (fast peers) max number of pieces we fast-allow to another peer */
76    MAX_FAST_ALLOWED_COUNT   = 10,
77
78    /* (fast peers) max threshold for allowing fast-pieces requests */
79    MAX_FAST_ALLOWED_THRESHOLD = 10,
80
81    /* how long an unsent request can stay queued before it's returned
82       back to the peer-mgr's pool of requests */
83    QUEUED_REQUEST_TTL_SECS = 20,
84
85    /* how long a sent request can stay queued before it's returned
86       back to the peer-mgr's pool of requests */
87    SENT_REQUEST_TTL_SECS = 120,
88
89    /* used in lowering the outMessages queue period */
90    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
91    HIGH_PRIORITY_INTERVAL_SECS = 2,
92    LOW_PRIORITY_INTERVAL_SECS = 20
93};
94
95/**
96***  REQUEST MANAGEMENT
97**/
98
99enum
100{
101    AWAITING_BT_LENGTH,
102    AWAITING_BT_ID,
103    AWAITING_BT_MESSAGE,
104    AWAITING_BT_PIECE
105};
106
107struct peer_request
108{
109    uint32_t index;
110    uint32_t offset;
111    uint32_t length;
112    time_t time_requested;
113};
114
115static int
116compareRequest( const void * va, const void * vb )
117{
118    int i;
119    const struct peer_request * a = va;
120    const struct peer_request * b = vb;
121    if(( i = tr_compareUint32( a->index, b->index ))) return i;
122    if(( i = tr_compareUint32( a->offset, b->offset ))) return i;
123    if(( i = tr_compareUint32( a->length, b->length ))) return i;
124    return 0;
125}
126
127struct request_list
128{
129    uint16_t count;
130    uint16_t max;
131    struct peer_request * requests;
132};
133
134static const struct request_list REQUEST_LIST_INIT = { 0, 0, NULL };
135
136static void
137reqListReserve( struct request_list * list, uint16_t max )
138{
139    if( list->max < max )
140    {
141        list->max = max;
142        list->requests = tr_renew( struct peer_request,
143                                   list->requests,
144                                   list->max );
145    }
146}
147
148static void
149reqListClear( struct request_list * list )
150{
151    tr_free( list->requests );
152    *list = REQUEST_LIST_INIT;
153}
154
155static void
156reqListCopy( struct request_list * dest, const struct request_list * src )
157{
158    dest->count = dest->max = src->count;
159    dest->requests = tr_memdup( src->requests, dest->count * sizeof( struct peer_request ) );
160}
161
162static void
163reqListRemoveOne( struct request_list * list, int i )
164{
165    assert( 0<=i && i<list->count );
166
167    memmove( &list->requests[i],
168             &list->requests[i+1],
169             sizeof( struct peer_request ) * ( --list->count - i ) );
170}
171
172static void
173reqListAppend( struct request_list * list, const struct peer_request * req )
174{
175    if( ++list->count >= list->max )
176        reqListReserve( list, list->max + 8 );
177
178    list->requests[list->count-1] = *req;
179}
180
181static void
182reqListAppendPiece( const tr_torrent     * tor,
183                    struct request_list  * list,
184                    tr_piece_index_t       piece )
185{
186    const time_t now = time( NULL );
187    const size_t n = tr_torPieceCountBlocks( tor, piece );
188    const tr_block_index_t begin = tr_torPieceFirstBlock( tor, piece );
189    const tr_block_index_t end = begin + n;
190    tr_block_index_t i;
191
192    if( list->count + n >= list->max )
193        reqListReserve( list, list->max + n );
194
195    for( i=begin; i<end; ++i ) {
196        if( !tr_cpBlockIsComplete( tor->completion, i ) ) {
197            struct peer_request * req = list->requests + list->count++;
198            req->index = piece;
199            req->offset = (i * tor->blockSize) - (piece * tor->info.pieceSize);
200            req->length = tr_torBlockCountBytes( tor, i );
201            req->time_requested = now;
202            assert( tr_torrentReqIsValid( tor, req->index, req->offset, req->length ) );
203        }
204    }
205}
206
207static tr_errno
208reqListPop( struct request_list * list, struct peer_request * setme )
209{
210    tr_errno err;
211
212    if( !list->count )
213        err = TR_ERROR;
214    else {
215        *setme = list->requests[0];
216        reqListRemoveOne( list, 0 );
217        err = TR_OK;
218    }
219
220    return err;
221}
222
223static int
224reqListHasPiece( struct request_list * list, const tr_piece_index_t piece )
225{
226    uint16_t i;
227
228    for( i=0; i<list->count; ++i )
229        if( list->requests[i].index == piece )
230            return 1;
231
232    return 0;
233}
234
235static int
236reqListFind( struct request_list * list, const struct peer_request * key )
237{
238    uint16_t i;
239
240    for( i=0; i<list->count; ++i )
241        if( !compareRequest( key, list->requests+i ) )
242            return i;
243
244    return -1;
245}
246
247static tr_errno
248reqListRemove( struct request_list * list, const struct peer_request * key )
249{
250    tr_errno err;
251    const int i = reqListFind( list, key );
252
253    if( i < 0 )
254        err = TR_ERROR;
255    else {
256        err = TR_OK;
257        reqListRemoveOne( list, i );
258    }
259
260    return err;
261}
262
263/**
264***
265**/
266
267/* this is raw, unchanged data from the peer regarding
268 * the current message that it's sending us. */
269struct tr_incoming
270{
271    uint8_t id;
272    uint32_t length; /* includes the +1 for id length */
273    struct peer_request blockReq; /* metadata for incoming blocks */
274    struct evbuffer * block; /* piece data for incoming blocks */
275};
276
277struct tr_peermsgs
278{
279    unsigned int peerSentBitfield         : 1;
280    unsigned int peerSupportsPex          : 1;
281    unsigned int clientSentLtepHandshake  : 1;
282    unsigned int peerSentLtepHandshake    : 1;
283    unsigned int sendingBlock             : 1;
284   
285    uint8_t state;
286    uint8_t ut_pex_id;
287    uint16_t pexCount;
288    uint16_t minActiveRequests;
289    uint16_t maxActiveRequests;
290
291    /* how long the outMessages batch should be allowed to grow before
292     * it's flushed -- some messages (like requests >:) should be sent
293     * very quickly; others aren't as urgent. */
294    int outMessagesBatchPeriod;
295
296    tr_peer * info;
297
298    tr_handle * handle;
299    tr_torrent * torrent;
300    tr_peerIo * io;
301
302    tr_publisher_t * publisher;
303
304    struct evbuffer * outBlock;    /* buffer of the current piece message */
305    struct evbuffer * outMessages; /* buffer of all the non-piece messages */
306
307    struct request_list peerAskedFor;
308    struct request_list peerAskedForFast;
309    struct request_list clientAskedFor;
310    struct request_list clientWillAskFor;
311
312    tr_timer * rateTimer;
313    tr_timer * pulseTimer;
314    tr_timer * pexTimer;
315
316    time_t clientSentPexAt;
317    time_t clientSentAnythingAt;
318
319    /* when we started batching the outMessages */
320    time_t outMessagesBatchedAt;
321   
322    tr_bitfield * peerAllowedPieces;
323
324    struct tr_incoming incoming; 
325
326    tr_pex * pex;
327};
328
329/**
330***
331**/
332
333static void
334myDebug( const char * file, int line,
335         const struct tr_peermsgs * msgs,
336         const char * fmt, ... )
337{
338    FILE * fp = tr_getLog( );
339    if( fp != NULL )
340    {
341        va_list args;
342        char timestr[64];
343        struct evbuffer * buf = evbuffer_new( );
344        char * myfile = tr_strdup( file );
345
346        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
347                             tr_getLogTimeStr( timestr, sizeof(timestr) ),
348                             msgs->torrent->info.name,
349                             tr_peerIoGetAddrStr( msgs->io ),
350                             msgs->info->client );
351        va_start( args, fmt );
352        evbuffer_add_vprintf( buf, fmt, args );
353        va_end( args );
354        evbuffer_add_printf( buf, " (%s:%d)\n", basename(myfile), line );
355        fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
356
357        tr_free( myfile );
358        evbuffer_free( buf );
359    }
360}
361
362#define dbgmsg(msgs, fmt...) myDebug(__FILE__, __LINE__, msgs, ##fmt )
363
364/**
365***
366**/
367
368static void
369pokeBatchPeriod( tr_peermsgs * msgs, int interval )
370{
371    if( msgs->outMessagesBatchPeriod > interval )
372    {
373        msgs->outMessagesBatchPeriod = interval;
374        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
375    }
376}
377
378static void
379protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
380{
381    tr_peerIo * io = msgs->io;
382    struct evbuffer * out = msgs->outMessages;
383
384    dbgmsg( msgs, "requesting %u:%u->%u", req->index, req->offset, req->length );
385    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
386    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
387    tr_peerIoWriteUint32( io, out, req->index );
388    tr_peerIoWriteUint32( io, out, req->offset );
389    tr_peerIoWriteUint32( io, out, req->length );
390    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
391}
392
393static void
394protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
395{
396    tr_peerIo * io = msgs->io;
397    struct evbuffer * out = msgs->outMessages;
398
399    dbgmsg( msgs, "cancelling %u:%u->%u", req->index, req->offset, req->length );
400    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
401    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
402    tr_peerIoWriteUint32( io, out, req->index );
403    tr_peerIoWriteUint32( io, out, req->offset );
404    tr_peerIoWriteUint32( io, out, req->length );
405    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
406}
407
408static void
409protocolSendHave( tr_peermsgs * msgs, uint32_t index )
410{
411    tr_peerIo * io = msgs->io;
412    struct evbuffer * out = msgs->outMessages;
413
414    dbgmsg( msgs, "sending Have %u", index );
415    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
416    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
417    tr_peerIoWriteUint32( io, out, index );
418    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
419}
420
421static void
422protocolSendChoke( tr_peermsgs * msgs, int choke )
423{
424    tr_peerIo * io = msgs->io;
425    struct evbuffer * out = msgs->outMessages;
426
427    dbgmsg( msgs, "sending %s", (choke ? "Choke" : "Unchoke") );
428    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) );
429    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
430    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
431}
432
433/**
434***  EVENTS
435**/
436
437static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0 };
438
439static void
440publish( tr_peermsgs * msgs, tr_peer_event * e )
441{
442    tr_publisherPublish( msgs->publisher, msgs->info, e );
443}
444
445static void
446fireError( tr_peermsgs * msgs, tr_errno err )
447{
448    tr_peer_event e = blankEvent;
449    e.eventType = TR_PEER_ERROR;
450    e.err = err;
451    publish( msgs, &e );
452}
453
454static void
455fireNeedReq( tr_peermsgs * msgs )
456{
457    tr_peer_event e = blankEvent;
458    e.eventType = TR_PEER_NEED_REQ;
459    publish( msgs, &e );
460}
461
462static void
463firePeerProgress( tr_peermsgs * msgs )
464{
465    tr_peer_event e = blankEvent;
466    e.eventType = TR_PEER_PEER_PROGRESS;
467    e.progress = msgs->info->progress;
468    publish( msgs, &e );
469}
470
471static void
472fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
473{
474    tr_peer_event e = blankEvent;
475    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
476    e.pieceIndex = req->index;
477    e.offset = req->offset;
478    e.length = req->length;
479    publish( msgs, &e );
480}
481
482static void
483fireClientGotData( tr_peermsgs * msgs, uint32_t length )
484{
485    tr_peer_event e = blankEvent;
486    e.length = length;
487    e.eventType = TR_PEER_CLIENT_GOT_DATA;
488    publish( msgs, &e );
489}
490
491static void
492firePeerGotData( tr_peermsgs * msgs, uint32_t length )
493{
494    tr_peer_event e = blankEvent;
495    e.length = length;
496    e.eventType = TR_PEER_PEER_GOT_DATA;
497    publish( msgs, &e );
498}
499
500static void
501fireCancelledReq( tr_peermsgs * msgs, const tr_piece_index_t pieceIndex )
502{
503    tr_peer_event e = blankEvent;
504    e.eventType = TR_PEER_CANCEL;
505    e.pieceIndex = pieceIndex;
506    publish( msgs, &e );
507}
508
509/**
510***  INTEREST
511**/
512
513static int
514isPieceInteresting( const tr_peermsgs   * peer,
515                    int                   piece )
516{
517    const tr_torrent * torrent = peer->torrent;
518
519    return ( ( !torrent->info.pieces[piece].dnd )               /* we want it */
520          && ( !tr_cpPieceIsComplete( torrent->completion, piece ) ) /* !have */
521          && ( tr_bitfieldHas( peer->info->have, piece ) ) );  /* peer has it */
522}
523
524/* "interested" means we'll ask for piece data if they unchoke us */
525static int
526isPeerInteresting( const tr_peermsgs * msgs )
527{
528    tr_piece_index_t i;
529    const tr_torrent * torrent;
530    const tr_bitfield * bitfield;
531    const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
532
533    if( clientIsSeed )
534        return FALSE;
535
536    torrent = msgs->torrent;
537    bitfield = tr_cpPieceBitfield( torrent->completion );
538
539    if( !msgs->info->have )
540        return TRUE;
541
542    assert( bitfield->byteCount == msgs->info->have->byteCount );
543
544    for( i=0; i<torrent->info.pieceCount; ++i )
545        if( isPieceInteresting( msgs, i ) )
546            return TRUE;
547
548    return FALSE;
549}
550
551static void
552sendInterest( tr_peermsgs * msgs, int weAreInterested )
553{
554    assert( msgs != NULL );
555    assert( weAreInterested==0 || weAreInterested==1 );
556
557    msgs->info->clientIsInterested = weAreInterested;
558    dbgmsg( msgs, "Sending %s",
559            weAreInterested ? "Interested" : "Not Interested");
560
561    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
562    tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages,
563                   weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
564    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
565}
566
567static void
568updateInterest( tr_peermsgs * msgs )
569{
570    const int i = isPeerInteresting( msgs );
571    if( i != msgs->info->clientIsInterested )
572        sendInterest( msgs, i );
573    if( i )
574        fireNeedReq( msgs );
575}
576
577static void
578cancelAllRequestsToClientExceptFast( tr_peermsgs * msgs )
579{
580    reqListClear( &msgs->peerAskedFor );
581}
582
583void
584tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
585{
586    const time_t fibrillationTime = time(NULL) - MIN_CHOKE_PERIOD_SEC;
587
588    assert( msgs != NULL );
589    assert( msgs->info != NULL );
590    assert( choke==0 || choke==1 );
591
592    if( msgs->info->chokeChangedAt > fibrillationTime )
593    {
594        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
595    }
596    else if( msgs->info->peerIsChoked != choke )
597    {
598        msgs->info->peerIsChoked = choke;
599        if( choke )
600            cancelAllRequestsToClientExceptFast( msgs );
601        protocolSendChoke( msgs, choke );
602        msgs->info->chokeChangedAt = time( NULL );
603    }
604}
605
606/**
607***
608**/
609
610void
611tr_peerMsgsHave( tr_peermsgs * msgs,
612                 uint32_t      index )
613{
614    protocolSendHave( msgs, index );
615
616    /* since we have more pieces now, we might not be interested in this peer */
617    updateInterest( msgs );
618}
619#if 0
620static void
621sendFastSuggest( tr_peermsgs * msgs,
622                 uint32_t      pieceIndex )
623{
624    assert( msgs != NULL );
625   
626    if( tr_peerIoSupportsFEXT( msgs->io ) )
627    {
628        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
629        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_SUGGEST );
630        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
631    }
632}
633static void
634sendFastHave( tr_peermsgs * msgs, int all )
635{
636    assert( msgs != NULL );
637   
638    if( tr_peerIoSupportsFEXT( msgs->io ) )
639    {
640        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
641        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, ( all ? BT_HAVE_ALL
642                                                                : BT_HAVE_NONE ) );
643        updateInterest( msgs );
644    }
645}
646#endif
647
648static void
649sendFastReject( tr_peermsgs * msgs,
650                uint32_t      pieceIndex,
651                uint32_t      offset,
652                uint32_t      length )
653{
654    assert( msgs != NULL );
655
656    if( tr_peerIoSupportsFEXT( msgs->io ) )
657    {
658        const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
659        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
660        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_REJECT );
661        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
662        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
663        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
664        pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
665    }
666}
667
668static tr_bitfield*
669getPeerAllowedPieces( tr_peermsgs * msgs )
670{
671    if( !msgs->peerAllowedPieces && tr_peerIoSupportsFEXT( msgs->io ) )
672    {
673        msgs->peerAllowedPieces = tr_peerMgrGenerateAllowedSet(
674            MAX_FAST_ALLOWED_COUNT,
675            msgs->torrent->info.pieceCount,
676            msgs->torrent->info.hash,
677            tr_peerIoGetAddress( msgs->io, NULL ) );
678    }
679
680    return msgs->peerAllowedPieces;
681}
682
683static void
684sendFastAllowed( tr_peermsgs * msgs,
685                 uint32_t      pieceIndex)
686{
687    assert( msgs != NULL );
688   
689    if( tr_peerIoSupportsFEXT( msgs->io ) )
690    {
691        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
692        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_ALLOWED_FAST );
693        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
694        pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
695    }
696}
697
698static void
699sendFastAllowedSet( tr_peermsgs * msgs )
700{
701    tr_piece_index_t i = 0;
702
703    while (i <= msgs->torrent->info.pieceCount )
704    {
705        if ( tr_bitfieldHas( getPeerAllowedPieces( msgs ), i) )
706            sendFastAllowed( msgs, i );
707        i++;
708    }
709}
710
711static void
712maybeSendFastAllowedSet( tr_peermsgs * msgs )
713{
714    if( tr_bitfieldCountTrueBits( msgs->info->have ) <= MAX_FAST_ALLOWED_THRESHOLD )
715        sendFastAllowedSet( msgs );
716}
717
718
719/**
720***
721**/
722
723static int
724reqIsValid( const tr_peermsgs   * peer,
725            uint32_t              index,
726            uint32_t              offset,
727            uint32_t              length )
728{
729    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
730}
731
732static int
733requestIsValid( const tr_peermsgs * peer, const struct peer_request * req )
734{
735    return reqIsValid( peer, req->index, req->offset, req->length );
736}
737
738static void
739tr_peerMsgsCancel( tr_peermsgs * msgs,
740                   uint32_t      pieceIndex )
741{
742    uint16_t i;
743    struct request_list tmp = REQUEST_LIST_INIT;
744    struct request_list * src;
745
746    src = &msgs->clientWillAskFor;
747    for( i=0; i<src->count; ++i )
748        if( src->requests[i].index != pieceIndex )
749            reqListAppend( &tmp, src->requests + i );
750
751    /* swap */
752    reqListClear( &msgs->clientWillAskFor );
753    msgs->clientWillAskFor = tmp;
754    tmp = REQUEST_LIST_INIT;
755
756    src = &msgs->clientAskedFor;
757    for( i=0; i<src->count; ++i )
758        if( src->requests[i].index == pieceIndex )
759            protocolSendCancel( msgs, src->requests + i );
760        else
761            reqListAppend( &tmp, src->requests + i );
762
763    /* swap */
764    reqListClear( &msgs->clientAskedFor );
765    msgs->clientAskedFor = tmp;
766    tmp = REQUEST_LIST_INIT;
767
768    fireCancelledReq( msgs, pieceIndex );
769}
770
771static void
772expireOldRequests( tr_peermsgs * msgs )
773{
774    int i;
775    time_t oldestAllowed;
776    struct request_list tmp = REQUEST_LIST_INIT;
777
778    /* cancel requests that have been queued for too long */
779    oldestAllowed = time( NULL ) - QUEUED_REQUEST_TTL_SECS;
780    reqListCopy( &tmp, &msgs->clientWillAskFor );
781    for( i=0; i<tmp.count; ++i ) {
782        const struct peer_request * req = &tmp.requests[i];
783        if( req->time_requested < oldestAllowed )
784            tr_peerMsgsCancel( msgs, req->index );
785    }
786    reqListClear( &tmp );
787
788    /* cancel requests that were sent too long ago */
789    oldestAllowed = time( NULL ) - SENT_REQUEST_TTL_SECS;
790    reqListCopy( &tmp, &msgs->clientAskedFor );
791    for( i=0; i<tmp.count; ++i ) {
792        const struct peer_request * req = &tmp.requests[i];
793        if( req->time_requested < oldestAllowed )
794            tr_peerMsgsCancel( msgs, req->index );
795    }
796    reqListClear( &tmp );
797}
798
799static void
800pumpRequestQueue( tr_peermsgs * msgs )
801{
802    const int max = msgs->maxActiveRequests;
803    const int min = msgs->minActiveRequests;
804    const time_t now = time( NULL );
805    int sent = 0;
806    int count = msgs->clientAskedFor.count;
807    struct peer_request req;
808
809    if( count > min )
810        return;
811    if( msgs->info->clientIsChoked )
812        return;
813
814    while( ( count < max ) && !reqListPop( &msgs->clientWillAskFor, &req ) )
815    {
816        assert( requestIsValid( msgs, &req ) );
817        assert( tr_bitfieldHas( msgs->info->have, req.index ) );
818
819        protocolSendRequest( msgs, &req );
820        req.time_requested = now;
821        reqListAppend( &msgs->clientAskedFor, &req );
822
823        ++count;
824        ++sent;
825    }
826
827    if( sent )
828        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
829                sent,
830                msgs->clientAskedFor.count,
831                msgs->clientWillAskFor.count );
832
833    if( count < max )
834        fireNeedReq( msgs );
835}
836
837static int
838peerPulse( void * vmsgs );
839
840static int
841requestQueueIsFull( const tr_peermsgs * msgs )
842{
843    const int req_max = msgs->maxActiveRequests;
844    return msgs->clientWillAskFor.count >= req_max;
845}
846
847int
848tr_peerMsgsAddRequest( tr_peermsgs       * msgs,
849                       tr_piece_index_t    piece )
850{
851    struct peer_request req;
852
853    assert( msgs != NULL );
854    assert( msgs->torrent != NULL );
855    assert( piece < msgs->torrent->info.pieceCount );
856
857    /**
858    ***  Reasons to decline the request
859    **/
860
861    /* don't send requests to choked clients */
862    if( msgs->info->clientIsChoked ) {
863        dbgmsg( msgs, "declining request because they're choking us" );
864        return TR_ADDREQ_CLIENT_CHOKED;
865    }
866
867    /* peer doesn't have this piece */
868    if( !tr_bitfieldHas( msgs->info->have, piece ) )
869        return TR_ADDREQ_MISSING;
870
871    /* peer's queue is full */
872    if( requestQueueIsFull( msgs ) ) {
873        dbgmsg( msgs, "declining request because we're full" );
874        return TR_ADDREQ_FULL;
875    }
876
877    /* have we already asked for this piece? */
878    if( reqListHasPiece( &msgs->clientAskedFor, piece ) || 
879        reqListHasPiece( &msgs->clientWillAskFor, piece ) ) {
880        dbgmsg( msgs, "declining because it's a duplicate" );
881        return TR_ADDREQ_DUPLICATE;
882    }
883
884    /**
885    ***  Accept this request
886    **/
887
888    dbgmsg( msgs, "added req for piece %lu", (unsigned long)piece );
889    req.time_requested = time( NULL );
890    reqListAppendPiece( msgs->torrent, &msgs->clientWillAskFor, piece );
891    return TR_ADDREQ_OK;
892}
893
894static void
895cancelAllRequestsToPeer( tr_peermsgs * msgs )
896{
897    int i;
898    struct request_list a = msgs->clientWillAskFor;
899    struct request_list b = msgs->clientAskedFor;
900
901    msgs->clientAskedFor = REQUEST_LIST_INIT;
902    msgs->clientWillAskFor = REQUEST_LIST_INIT;
903
904    for( i=0; i<a.count; ++i )
905        fireCancelledReq( msgs, a.requests[i].index );
906
907    for( i=0; i<b.count; ++i ) {
908        fireCancelledReq( msgs, b.requests[i].index );
909        protocolSendCancel( msgs, &b.requests[i] );
910    }
911
912    reqListClear( &a );
913    reqListClear( &b );
914}
915
916/**
917***
918**/
919
920static void
921sendLtepHandshake( tr_peermsgs * msgs )
922{
923    tr_benc val, *m;
924    char * buf;
925    int len;
926    int pex;
927    struct evbuffer * outbuf;
928
929    if( msgs->clientSentLtepHandshake )
930        return;
931
932    outbuf = evbuffer_new( );
933    dbgmsg( msgs, "sending an ltep handshake" );
934    msgs->clientSentLtepHandshake = 1;
935
936    /* decide if we want to advertise pex support */
937    if( !tr_torrentAllowsPex( msgs->torrent ) )
938        pex = 0;
939    else if( msgs->peerSentLtepHandshake )
940        pex = msgs->peerSupportsPex ? 1 : 0;
941    else
942        pex = 1;
943
944    tr_bencInitDict( &val, 4 );
945    tr_bencDictAddInt( &val, "e", msgs->handle->encryptionMode != TR_PLAINTEXT_PREFERRED );
946    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( msgs->handle ) );
947    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
948    m  = tr_bencDictAddDict( &val, "m", 1 );
949    if( pex )
950        tr_bencDictAddInt( m, "ut_pex", TR_LTEP_PEX );
951    buf = tr_bencSave( &val, &len );
952
953    tr_peerIoWriteUint32( msgs->io, outbuf, 2*sizeof(uint8_t) + len );
954    tr_peerIoWriteUint8 ( msgs->io, outbuf, BT_LTEP );
955    tr_peerIoWriteUint8 ( msgs->io, outbuf, LTEP_HANDSHAKE );
956    tr_peerIoWriteBytes ( msgs->io, outbuf, buf, len );
957
958    tr_peerIoWriteBuf( msgs->io, outbuf );
959
960    dbgmsg( msgs, "here is the ltep handshake we sent [%*.*s]", len, len, buf );
961
962    /* cleanup */
963    tr_bencFree( &val );
964    tr_free( buf );
965    evbuffer_free( outbuf );
966}
967
968static void
969parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
970{
971    tr_benc val, * sub;
972    uint8_t * tmp = tr_new( uint8_t, len );
973
974    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
975    msgs->peerSentLtepHandshake = 1;
976
977    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type!=TYPE_DICT ) {
978        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
979        tr_free( tmp );
980        return;
981    }
982
983    dbgmsg( msgs, "here is the ltep handshake we got [%*.*s]", len, len, tmp );
984
985    /* does the peer prefer encrypted connections? */
986    if(( sub = tr_bencDictFindType( &val, "e", TYPE_INT )))
987        msgs->info->encryption_preference = sub->val.i
988                                      ? ENCRYPTION_PREFERENCE_YES
989                                      : ENCRYPTION_PREFERENCE_NO;
990
991    /* check supported messages for utorrent pex */
992    msgs->peerSupportsPex = 0;
993    if(( sub = tr_bencDictFindType( &val, "m", TYPE_DICT ))) {
994        if(( sub = tr_bencDictFindType( sub, "ut_pex", TYPE_INT ))) {
995            msgs->ut_pex_id = (uint8_t) sub->val.i;
996            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
997            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
998        }
999    }
1000
1001    /* get peer's listening port */
1002    if(( sub = tr_bencDictFindType( &val, "p", TYPE_INT ))) {
1003        msgs->info->port = htons( (uint16_t)sub->val.i );
1004        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
1005    }
1006
1007    tr_bencFree( &val );
1008    tr_free( tmp );
1009}
1010
1011static void
1012parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1013{
1014    int loaded = 0;
1015    uint8_t * tmp = tr_new( uint8_t, msglen );
1016    tr_benc val, *added;
1017    const tr_torrent * tor = msgs->torrent;
1018    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
1019
1020    if( tr_torrentAllowsPex( tor )
1021        && (( loaded = !tr_bencLoad( tmp, msglen, &val, NULL )))
1022        && (( added = tr_bencDictFindType( &val, "added", TYPE_STR ))))
1023    {
1024        const char * added_f = NULL;
1025        tr_pex * pex;
1026        size_t i, n;
1027        tr_bencDictFindStr( &val, "added.f", &added_f );
1028        pex = tr_peerMgrCompactToPex( added->val.s.s, added->val.s.i, added_f, &n );
1029        for( i=0; i<n; ++i )
1030            tr_peerMgrAddPex( msgs->handle->peerMgr, tor->info.hash,
1031                              TR_PEER_FROM_PEX, pex+i );
1032        tr_free( pex );
1033    }
1034
1035    if( loaded )
1036        tr_bencFree( &val );
1037    tr_free( tmp );
1038}
1039
1040static void
1041sendPex( tr_peermsgs * msgs );
1042
1043static void
1044parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1045{
1046    uint8_t ltep_msgid;
1047
1048    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
1049    msglen--;
1050
1051    if( ltep_msgid == LTEP_HANDSHAKE )
1052    {
1053        dbgmsg( msgs, "got ltep handshake" );
1054        parseLtepHandshake( msgs, msglen, inbuf );
1055        if( tr_peerIoSupportsLTEP( msgs->io ) )
1056        {
1057            sendLtepHandshake( msgs );
1058            sendPex( msgs );
1059        }
1060    }
1061    else if( ltep_msgid == TR_LTEP_PEX )
1062    {
1063        dbgmsg( msgs, "got ut pex" );
1064        msgs->peerSupportsPex = 1;
1065        parseUtPex( msgs, msglen, inbuf );
1066    }
1067    else
1068    {
1069        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1070        evbuffer_drain( inbuf, msglen );
1071    }
1072}
1073
1074static int
1075readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1076{
1077    uint32_t len;
1078
1079    if( inlen < sizeof(len) )
1080        return READ_MORE;
1081
1082    tr_peerIoReadUint32( msgs->io, inbuf, &len );
1083
1084    if( len == 0 ) /* peer sent us a keepalive message */
1085        dbgmsg( msgs, "got KeepAlive" );
1086    else {
1087        msgs->incoming.length = len;
1088        msgs->state = AWAITING_BT_ID;
1089    }
1090
1091    return READ_AGAIN;
1092}
1093
1094static int
1095readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen );
1096
1097static int
1098readBtId( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1099{
1100    uint8_t id;
1101
1102    if( inlen < sizeof(uint8_t) )
1103        return READ_MORE;
1104
1105    tr_peerIoReadUint8( msgs->io, inbuf, &id );
1106    msgs->incoming.id = id;
1107
1108    if( id==BT_PIECE )
1109    {
1110        msgs->state = AWAITING_BT_PIECE;
1111        return READ_AGAIN;
1112    }
1113    else if( msgs->incoming.length != 1 )
1114    {
1115        msgs->state = AWAITING_BT_MESSAGE;
1116        return READ_AGAIN;
1117    }
1118    else return readBtMessage( msgs, inbuf, inlen-1 );
1119}
1120
1121static void
1122updatePeerProgress( tr_peermsgs * msgs )
1123{
1124    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have )
1125                         / (float)msgs->torrent->info.pieceCount;
1126    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
1127    updateInterest( msgs );
1128    firePeerProgress( msgs );
1129}
1130
1131static int
1132clientCanSendFastBlock( const tr_peermsgs * msgs UNUSED )
1133{
1134    /* don't send a fast piece if peer has MAX_FAST_ALLOWED_THRESHOLD pieces */
1135    if( tr_bitfieldCountTrueBits( msgs->info->have ) > MAX_FAST_ALLOWED_THRESHOLD )
1136        return FALSE;
1137   
1138    /* ...or if we don't have ourself enough pieces */
1139    if( tr_bitfieldCountTrueBits( tr_cpPieceBitfield( msgs->torrent->completion ) ) < MAX_FAST_ALLOWED_THRESHOLD )
1140        return FALSE;
1141
1142    /* Maybe a bandwidth limit ? */
1143    return TRUE;
1144}
1145
1146static void
1147peerMadeRequest( tr_peermsgs * msgs, const struct peer_request * req )
1148{
1149    const int reqIsValid = requestIsValid( msgs, req );
1150    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( msgs->torrent->completion, req->index );
1151    const int peerIsChoked = msgs->info->peerIsChoked;
1152    const int peerIsFast = tr_peerIoSupportsFEXT( msgs->io );
1153    const int pieceIsFast = reqIsValid && tr_bitfieldHas( getPeerAllowedPieces( msgs ), req->index );
1154    const int canSendFast = clientCanSendFastBlock( msgs );
1155
1156    if( !reqIsValid ) /* bad request */
1157    {
1158        dbgmsg( msgs, "rejecting an invalid request." );
1159        sendFastReject( msgs, req->index, req->offset, req->length );
1160    }
1161    else if( !clientHasPiece ) /* we don't have it */
1162    {
1163        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1164        sendFastReject( msgs, req->index, req->offset, req->length );
1165    }
1166    else if( peerIsChoked && !peerIsFast ) /* doesn't he know he's choked? */
1167    {
1168        tr_peerMsgsSetChoke( msgs, 1 );
1169        sendFastReject( msgs, req->index, req->offset, req->length );
1170    }
1171    else if( peerIsChoked && peerIsFast && ( !pieceIsFast || !canSendFast ) )
1172    {
1173        sendFastReject( msgs, req->index, req->offset, req->length );
1174    }
1175    else /* YAY */
1176    {
1177        if( peerIsFast && pieceIsFast )
1178            reqListAppend( &msgs->peerAskedForFast, req );
1179        else
1180            reqListAppend( &msgs->peerAskedFor, req );
1181    }
1182}
1183
1184static int
1185messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1186{
1187    switch( id )
1188    {
1189        case BT_CHOKE:
1190        case BT_UNCHOKE:
1191        case BT_INTERESTED:
1192        case BT_NOT_INTERESTED:
1193        case BT_HAVE_ALL:
1194        case BT_HAVE_NONE:
1195            return len==1;
1196
1197        case BT_HAVE:
1198        case BT_SUGGEST:
1199        case BT_ALLOWED_FAST:
1200            return len==5;
1201
1202        case BT_BITFIELD:
1203            return len == (msg->torrent->info.pieceCount+7u)/8u + 1u;
1204       
1205        case BT_REQUEST:
1206        case BT_CANCEL:
1207        case BT_REJECT:
1208            return len==13;
1209
1210        case BT_PIECE:
1211            return len>9 && len<=16393;
1212
1213        case BT_PORT:
1214            return len==3;
1215
1216        case BT_LTEP:
1217            return len >= 2;
1218
1219        default:
1220            return FALSE;
1221    }
1222}
1223
1224static int
1225clientGotBlock( tr_peermsgs * msgs, const uint8_t * block, const struct peer_request * req );
1226
1227static void
1228clientGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1229{
1230    msgs->info->pieceDataActivityDate = time( NULL );
1231    tr_rcTransferred( msgs->info->rcToClient, byteCount );
1232    fireClientGotData( msgs, byteCount );
1233}
1234
1235static int
1236readBtPiece( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1237{
1238    struct peer_request * req = &msgs->incoming.blockReq;
1239    assert( EVBUFFER_LENGTH(inbuf) >= inlen );
1240    dbgmsg( msgs, "In readBtPiece" );
1241
1242    if( !req->length )
1243    {
1244        if( inlen < 8 )
1245            return READ_MORE;
1246
1247        tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
1248        tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
1249        req->length = msgs->incoming.length - 9;
1250        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1251        return READ_AGAIN;
1252    }
1253    else
1254    {
1255        int err;
1256
1257        /* read in another chunk of data */
1258        const size_t nLeft = req->length - EVBUFFER_LENGTH(msgs->incoming.block);
1259        size_t n = MIN( nLeft, inlen );
1260        uint8_t * buf = tr_new( uint8_t, n );
1261        assert( EVBUFFER_LENGTH(inbuf) >= n );
1262        tr_peerIoReadBytes( msgs->io, inbuf, buf, n );
1263        evbuffer_add( msgs->incoming.block, buf, n );
1264        clientGotBytes( msgs, n );
1265        tr_free( buf );
1266        dbgmsg( msgs, "got %d bytes for block %u:%u->%u ... %d remain",
1267               (int)n, req->index, req->offset, req->length,
1268               (int)( req->length - EVBUFFER_LENGTH(msgs->incoming.block) ) );
1269        if( EVBUFFER_LENGTH(msgs->incoming.block) < req->length )
1270            return READ_MORE;
1271
1272        /* we've got the whole block ... process it */
1273        err = clientGotBlock( msgs, EVBUFFER_DATA(msgs->incoming.block), req );
1274
1275        /* cleanup */
1276        evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH(msgs->incoming.block) );
1277        req->length = 0;
1278        msgs->state = AWAITING_BT_LENGTH;
1279        if( !err )
1280            return READ_AGAIN;
1281        else {
1282            fireError( msgs, err );
1283            return READ_DONE;
1284        }
1285    }
1286}
1287
1288static int
1289readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1290{
1291    uint32_t ui32;
1292    uint32_t msglen = msgs->incoming.length;
1293    const uint8_t id = msgs->incoming.id;
1294    const size_t startBufLen = EVBUFFER_LENGTH( inbuf );
1295
1296    --msglen; /* id length */
1297
1298    if( inlen < msglen )
1299        return READ_MORE;
1300
1301    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %d", (int)id, (int)msglen, (int)inlen );
1302
1303    if( !messageLengthIsCorrect( msgs, id, msglen+1 ) )
1304    {
1305        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1306        fireError( msgs, TR_ERROR );
1307        return READ_DONE;
1308    }
1309
1310    switch( id )
1311    {
1312        case BT_CHOKE:
1313            dbgmsg( msgs, "got Choke" );
1314            msgs->info->clientIsChoked = 1;
1315            cancelAllRequestsToPeer( msgs );
1316            cancelAllRequestsToClientExceptFast( msgs );
1317            break;
1318
1319        case BT_UNCHOKE:
1320            dbgmsg( msgs, "got Unchoke" );
1321            msgs->info->clientIsChoked = 0;
1322            fireNeedReq( msgs );
1323            break;
1324
1325        case BT_INTERESTED:
1326            dbgmsg( msgs, "got Interested" );
1327            msgs->info->peerIsInterested = 1;
1328            break;
1329
1330        case BT_NOT_INTERESTED:
1331            dbgmsg( msgs, "got Not Interested" );
1332            msgs->info->peerIsInterested = 0;
1333            break;
1334           
1335        case BT_HAVE:
1336            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1337            dbgmsg( msgs, "got Have: %u", ui32 );
1338            if( tr_bitfieldAdd( msgs->info->have, ui32 ) )
1339                fireError( msgs, TR_ERROR_PEER_MESSAGE );
1340            updatePeerProgress( msgs );
1341            tr_rcTransferred( msgs->torrent->swarmSpeed, msgs->torrent->info.pieceSize );
1342            break;
1343
1344        case BT_BITFIELD: {
1345            dbgmsg( msgs, "got a bitfield" );
1346            msgs->peerSentBitfield = 1;
1347            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
1348            updatePeerProgress( msgs );
1349            maybeSendFastAllowedSet( msgs );
1350            fireNeedReq( msgs );
1351            break;
1352        }
1353
1354        case BT_REQUEST: {
1355            struct peer_request r;
1356            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1357            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1358            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1359            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1360            peerMadeRequest( msgs, &r );
1361            break;
1362        }
1363
1364        case BT_CANCEL: {
1365            struct peer_request r;
1366            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1367            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1368            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1369            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1370            reqListRemove( &msgs->peerAskedForFast, &r );
1371            reqListRemove( &msgs->peerAskedFor, &r );
1372            break;
1373        }
1374
1375        case BT_PIECE:
1376            assert( 0 ); /* handled elsewhere! */
1377            break;
1378       
1379        case BT_PORT:
1380            dbgmsg( msgs, "Got a BT_PORT" );
1381            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1382            break;
1383           
1384        case BT_SUGGEST: {
1385            dbgmsg( msgs, "Got a BT_SUGGEST" );
1386            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1387            /* we don't do anything with this yet */
1388            break;
1389        }
1390           
1391        case BT_HAVE_ALL:
1392            dbgmsg( msgs, "Got a BT_HAVE_ALL" );
1393            tr_bitfieldAddRange( msgs->info->have, 0, msgs->torrent->info.pieceCount );
1394            updatePeerProgress( msgs );
1395            maybeSendFastAllowedSet( msgs );
1396            break;
1397       
1398       
1399        case BT_HAVE_NONE:
1400            dbgmsg( msgs, "Got a BT_HAVE_NONE" );
1401            tr_bitfieldClear( msgs->info->have );
1402            updatePeerProgress( msgs );
1403            maybeSendFastAllowedSet( msgs );
1404            break;
1405       
1406        case BT_REJECT: {
1407            struct peer_request r;
1408            dbgmsg( msgs, "Got a BT_REJECT" );
1409            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1410            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1411            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1412            reqListRemove( &msgs->clientAskedFor, &r );
1413            break;
1414        }
1415
1416        case BT_ALLOWED_FAST: {
1417            dbgmsg( msgs, "Got a BT_ALLOWED_FAST" );
1418            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1419            /* we don't do anything with this yet */
1420            break;
1421        }
1422
1423        case BT_LTEP:
1424            dbgmsg( msgs, "Got a BT_LTEP" );
1425            parseLtep( msgs, msglen, inbuf );
1426            break;
1427
1428        default:
1429            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1430            tr_peerIoDrain( msgs->io, inbuf, msglen );
1431            break;
1432    }
1433
1434    assert( msglen + 1 == msgs->incoming.length );
1435    assert( EVBUFFER_LENGTH(inbuf) == startBufLen - msglen );
1436
1437    msgs->state = AWAITING_BT_LENGTH;
1438    return READ_AGAIN;
1439}
1440
1441static void
1442peerGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1443{
1444    msgs->info->pieceDataActivityDate = time( NULL );
1445    tr_rcTransferred( msgs->info->rcToPeer, byteCount );
1446    firePeerGotData( msgs, byteCount );
1447}
1448
1449static size_t
1450getDownloadMax( const tr_peermsgs * msgs )
1451{
1452    static const size_t maxval = ~0;
1453    const tr_torrent * tor = msgs->torrent;
1454
1455    if( tor->downloadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1456        return tor->handle->useDownloadLimit
1457            ? tr_rcBytesLeft( tor->handle->download ) : maxval;
1458
1459    if( tor->downloadLimitMode == TR_SPEEDLIMIT_SINGLE )
1460        return tr_rcBytesLeft( tor->download );
1461
1462    return maxval;
1463}
1464
1465static void
1466decrementDownloadedCount( tr_peermsgs * msgs, uint32_t byteCount )
1467{
1468    tr_torrent * tor = msgs->torrent;
1469    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1470}
1471
1472static void
1473clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1474{
1475    decrementDownloadedCount( msgs, req->length );
1476}
1477
1478static void
1479addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1480{
1481    if( !msgs->info->blame )
1482         msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1483    tr_bitfieldAdd( msgs->info->blame, index );
1484}
1485
1486static tr_errno
1487clientGotBlock( tr_peermsgs                * msgs,
1488                const uint8_t              * data,
1489                const struct peer_request  * req )
1490{
1491    int err;
1492    tr_torrent * tor = msgs->torrent;
1493    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1494
1495    assert( msgs != NULL );
1496    assert( req != NULL );
1497
1498    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) )
1499    {
1500        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1501                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1502        return TR_ERROR;
1503    }
1504
1505    /* save the block */
1506    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1507
1508    /**
1509    *** Remove the block from our `we asked for this' list
1510    **/
1511
1512    if( reqListRemove( &msgs->clientAskedFor, req ) )
1513    {
1514        clientGotUnwantedBlock( msgs, req );
1515        dbgmsg( msgs, "we didn't ask for this message..." );
1516        return 0;
1517    }
1518
1519    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1520                  msgs->clientAskedFor.count );
1521
1522    /**
1523    *** Error checks
1524    **/
1525
1526    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
1527        dbgmsg( msgs, "we have this block already..." );
1528        clientGotUnwantedBlock( msgs, req );
1529        return 0;
1530    }
1531
1532    /**
1533    ***  Save the block
1534    **/
1535
1536    msgs->info->peerSentPieceDataAt = time( NULL );
1537    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1538        return err;
1539
1540    addPeerToBlamefield( msgs, req->index );
1541    fireGotBlock( msgs, req );
1542    return 0;
1543}
1544
1545static void
1546didWrite( struct bufferevent * evin UNUSED, void * vmsgs )
1547{
1548    peerPulse( vmsgs );
1549}
1550
1551static ReadState
1552canRead( struct bufferevent * evin, void * vmsgs )
1553{
1554    ReadState ret;
1555    tr_peermsgs * msgs = vmsgs;
1556    struct evbuffer * in = EVBUFFER_INPUT ( evin );
1557    const size_t inlen = EVBUFFER_LENGTH( in );
1558
1559    if( !inlen )
1560    {
1561        ret = READ_DONE;
1562    }
1563    else if( msgs->state == AWAITING_BT_PIECE )
1564    {
1565        const size_t downloadMax = getDownloadMax( msgs );
1566        const size_t n = MIN( inlen, downloadMax );
1567        ret = n ? readBtPiece( msgs, in, n ) : READ_DONE;
1568    }
1569    else switch( msgs->state )
1570    {
1571        case AWAITING_BT_LENGTH:  ret = readBtLength ( msgs, in, inlen ); break;
1572        case AWAITING_BT_ID:      ret = readBtId     ( msgs, in, inlen ); break;
1573        case AWAITING_BT_MESSAGE: ret = readBtMessage( msgs, in, inlen ); break;
1574        default:                  assert( 0 );
1575    }
1576
1577    return ret;
1578}
1579
1580static void
1581sendKeepalive( tr_peermsgs * msgs )
1582{
1583    dbgmsg( msgs, "sending a keepalive message" );
1584    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1585    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1586}
1587
1588/**
1589***
1590**/
1591
1592static size_t
1593getUploadMax( const tr_peermsgs * msgs )
1594{
1595    static const size_t maxval = ~0;
1596    const tr_torrent * tor = msgs->torrent;
1597    int speedLeft;
1598    int bufLeft;
1599
1600    if( tor->uploadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1601        speedLeft = tor->handle->useUploadLimit ? tr_rcBytesLeft( tor->handle->upload ) : maxval;
1602    else if( tor->uploadLimitMode == TR_SPEEDLIMIT_SINGLE )
1603        speedLeft = tr_rcBytesLeft( tor->upload );
1604    else
1605        speedLeft = INT_MAX;
1606
1607    /* this basically says the outbuf shouldn't have more than one block
1608     * queued up in it... blocksize, +13 for the size of the BT protocol's
1609     * block message overhead */
1610    bufLeft = tor->blockSize + 13 - tr_peerIoWriteBytesWaiting( msgs->io );
1611    return MIN( speedLeft, bufLeft );
1612}
1613
1614static int
1615getMaxBlocksFromPeerSoon( const tr_peermsgs * peer )
1616{
1617    const double seconds = 30;
1618    const double bytesPerSecond = peer->info->rateToClient * 1024;
1619    const double totalBytes = bytesPerSecond * seconds;
1620    const int blockCount = totalBytes / peer->torrent->blockSize;
1621    /*fprintf( stderr, "rate %f -- blockCount %d\n", peer->info->rateToClient, blockCount );*/
1622    return blockCount;
1623}
1624
1625static int
1626ratePulse( void * vpeer )
1627{
1628    tr_peermsgs * peer = vpeer;
1629    peer->info->rateToClient = tr_rcRate( peer->info->rcToClient );
1630    peer->info->rateToPeer = tr_rcRate( peer->info->rcToPeer );
1631    peer->minActiveRequests = 4;
1632    peer->maxActiveRequests = peer->minActiveRequests + getMaxBlocksFromPeerSoon( peer );
1633    return TRUE;
1634}
1635
1636static tr_errno
1637popNextRequest( tr_peermsgs * msgs, struct peer_request * setme )
1638{
1639    if( !reqListPop( &msgs->peerAskedForFast, setme ) )
1640        return 0;
1641    if( !reqListPop( &msgs->peerAskedFor, setme ) )
1642        return 0;
1643
1644    return TR_ERROR;
1645}
1646
1647static int
1648peerPulse( void * vmsgs )
1649{
1650    const time_t now = time( NULL );
1651    tr_peermsgs * msgs = vmsgs;
1652
1653    tr_peerIoTryRead( msgs->io );
1654    pumpRequestQueue( msgs );
1655    expireOldRequests( msgs );
1656
1657    if( msgs->sendingBlock )
1658    {
1659        const size_t uploadMax = getUploadMax( msgs );
1660        size_t len = EVBUFFER_LENGTH( msgs->outBlock );
1661        const size_t outlen = MIN( len, uploadMax );
1662
1663        assert( len );
1664
1665        if( outlen )
1666        {
1667            tr_peerIoWrite( msgs->io, EVBUFFER_DATA( msgs->outBlock ), outlen );
1668            evbuffer_drain( msgs->outBlock, outlen );
1669            peerGotBytes( msgs, outlen );
1670
1671            len -= outlen;
1672            msgs->clientSentAnythingAt = now;
1673            msgs->sendingBlock = len != 0;
1674
1675            dbgmsg( msgs, "wrote %d bytes; %d left in block", (int)outlen, (int)len );
1676        }
1677        else dbgmsg( msgs, "stalled writing block... uploadMax %lu, outlen %lu", uploadMax, outlen );
1678    }
1679
1680    if( !msgs->sendingBlock )
1681    {
1682        struct peer_request req;
1683        int haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1684
1685        if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1686        {
1687            dbgmsg( msgs, "started an outMessages batch (length is %d)", (int)EVBUFFER_LENGTH( msgs->outMessages ) );
1688            msgs->outMessagesBatchedAt = now;
1689        }
1690        else if( haveMessages
1691                 && ( ( now - msgs->outMessagesBatchedAt ) > msgs->outMessagesBatchPeriod ) )
1692        {
1693            dbgmsg( msgs, "flushing outMessages... (length is %d)", (int)EVBUFFER_LENGTH( msgs->outMessages ) );
1694            tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
1695            msgs->clientSentAnythingAt = now;
1696            msgs->outMessagesBatchedAt = 0;
1697            msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1698        }
1699        else if( !EVBUFFER_LENGTH( msgs->outBlock )
1700            && !popNextRequest( msgs, &req )
1701            && requestIsValid( msgs, &req )
1702            && tr_cpPieceIsComplete( msgs->torrent->completion, req.index ) )
1703        {
1704            uint8_t * buf = tr_new( uint8_t, req.length );
1705
1706            if( !tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf ) )
1707            {
1708                tr_peerIo * io = msgs->io;
1709                struct evbuffer * out = msgs->outBlock;
1710
1711                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1712                tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 2*sizeof(uint32_t) + req.length );
1713                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1714                tr_peerIoWriteUint32( io, out, req.index );
1715                tr_peerIoWriteUint32( io, out, req.offset );
1716                tr_peerIoWriteBytes ( io, out, buf, req.length );
1717                msgs->sendingBlock = 1;
1718            }
1719
1720            tr_free( buf );
1721        }
1722        else if(    ( !haveMessages )
1723                 && ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS )
1724        {
1725            sendKeepalive( msgs );
1726        }
1727    }
1728
1729    return TRUE; /* loop forever */
1730}
1731
1732static void
1733gotError( struct bufferevent * evbuf UNUSED, short what, void * vmsgs )
1734{
1735    if( what & EVBUFFER_TIMEOUT )
1736        dbgmsg( vmsgs, "libevent got a timeout, what=%hd, secs=%d", what, evbuf->timeout_read );
1737    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1738        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1739                what, errno, tr_strerror(errno) );
1740    fireError( vmsgs, TR_ERROR );
1741}
1742
1743static void
1744sendBitfield( tr_peermsgs * msgs )
1745{
1746    const tr_bitfield * bitfield = tr_cpPieceBitfield( msgs->torrent->completion );
1747    struct evbuffer * out = msgs->outMessages;
1748
1749    dbgmsg( msgs, "sending peer a bitfield message" );
1750    tr_peerIoWriteUint32( msgs->io, out, sizeof(uint8_t) + bitfield->byteCount );
1751    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1752    tr_peerIoWriteBytes ( msgs->io, out, bitfield->bits, bitfield->byteCount );
1753    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1754}
1755
1756/**
1757***
1758**/
1759
1760/* some peers give us error messages if we send
1761   more than this many peers in a single pex message
1762   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
1763#define MAX_PEX_ADDED 50
1764#define MAX_PEX_DROPPED 50
1765
1766typedef struct
1767{
1768    tr_pex * added;
1769    tr_pex * dropped;
1770    tr_pex * elements;
1771    int addedCount;
1772    int droppedCount;
1773    int elementCount;
1774}
1775PexDiffs;
1776
1777static void
1778pexAddedCb( void * vpex, void * userData )
1779{
1780    PexDiffs * diffs = userData;
1781    tr_pex * pex = vpex;
1782    if( diffs->addedCount < MAX_PEX_ADDED )
1783    {
1784        diffs->added[diffs->addedCount++] = *pex;
1785        diffs->elements[diffs->elementCount++] = *pex;
1786    }
1787}
1788
1789static void
1790pexDroppedCb( void * vpex, void * userData )
1791{
1792    PexDiffs * diffs = userData;
1793    tr_pex * pex = vpex;
1794    if( diffs->droppedCount < MAX_PEX_DROPPED )
1795    {
1796        diffs->dropped[diffs->droppedCount++] = *pex;
1797    }
1798}
1799
1800static void
1801pexElementCb( void * vpex, void * userData )
1802{
1803    PexDiffs * diffs = userData;
1804    tr_pex * pex = vpex;
1805    diffs->elements[diffs->elementCount++] = *pex;
1806}
1807
1808static void
1809sendPex( tr_peermsgs * msgs )
1810{
1811    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
1812    {
1813        int i;
1814        tr_pex * newPex = NULL;
1815        const int newCount = tr_peerMgrGetPeers( msgs->handle->peerMgr, msgs->torrent->info.hash, &newPex );
1816        PexDiffs diffs;
1817        tr_benc val, *added, *dropped;
1818        uint8_t *tmp, *walk;
1819        char * benc;
1820        int bencLen;
1821
1822        /* build the diffs */
1823        diffs.added = tr_new( tr_pex, newCount );
1824        diffs.addedCount = 0;
1825        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1826        diffs.droppedCount = 0;
1827        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1828        diffs.elementCount = 0;
1829        tr_set_compare( msgs->pex, msgs->pexCount,
1830                        newPex, newCount,
1831                        tr_pexCompare, sizeof(tr_pex),
1832                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
1833        dbgmsg( msgs, "pex: old peer count %d, new peer count %d, added %d, removed %d", msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1834
1835        /* update peer */
1836        tr_free( msgs->pex );
1837        msgs->pex = diffs.elements;
1838        msgs->pexCount = diffs.elementCount;
1839
1840        /* build the pex payload */
1841        tr_bencInitDict( &val, 3 );
1842
1843        /* "added" */
1844        added = tr_bencDictAdd( &val, "added" );
1845        tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1846        for( i=0; i<diffs.addedCount; ++i ) {
1847            memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
1848            memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1849        }
1850        assert( ( walk - tmp ) == diffs.addedCount * 6 );
1851        tr_bencInitStr( added, tmp, walk-tmp, FALSE );
1852
1853        /* "added.f" */
1854        tmp = walk = tr_new( uint8_t, diffs.addedCount );
1855        for( i=0; i<diffs.addedCount; ++i )
1856            *walk++ = diffs.added[i].flags;
1857        assert( ( walk - tmp ) == diffs.addedCount );
1858        tr_bencDictAddRaw( &val, "added.f", tmp, walk-tmp );
1859        tr_free( tmp );
1860
1861        /* "dropped" */
1862        dropped = tr_bencDictAdd( &val, "dropped" );
1863        tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
1864        for( i=0; i<diffs.droppedCount; ++i ) {
1865            memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
1866            memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
1867        }
1868        assert( ( walk - tmp ) == diffs.droppedCount * 6 );
1869        tr_bencInitStr( dropped, tmp, walk-tmp, FALSE );
1870
1871        /* write the pex message */
1872        benc = tr_bencSave( &val, &bencLen );
1873        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 2*sizeof(uint8_t) + bencLen );
1874        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_LTEP );
1875        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, msgs->ut_pex_id );
1876        tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, benc, bencLen );
1877        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1878
1879        /* cleanup */
1880        tr_free( benc );
1881        tr_bencFree( &val );
1882        tr_free( diffs.added );
1883        tr_free( diffs.dropped );
1884        tr_free( newPex );
1885
1886        msgs->clientSentPexAt = time( NULL );
1887    }
1888}
1889
1890static int
1891pexPulse( void * vpeer )
1892{
1893    sendPex( vpeer );
1894    return TRUE;
1895}
1896
1897/**
1898***
1899**/
1900
1901tr_peermsgs*
1902tr_peerMsgsNew( struct tr_torrent * torrent,
1903                struct tr_peer    * info,
1904                tr_delivery_func    func,
1905                void              * userData,
1906                tr_publisher_tag  * setme )
1907{
1908    tr_peermsgs * m;
1909
1910    assert( info != NULL );
1911    assert( info->io != NULL );
1912
1913    m = tr_new0( tr_peermsgs, 1 );
1914    m->publisher = tr_publisherNew( );
1915    m->info = info;
1916    m->handle = torrent->handle;
1917    m->torrent = torrent;
1918    m->io = info->io;
1919    m->info->clientIsChoked = 1;
1920    m->info->peerIsChoked = 1;
1921    m->info->clientIsInterested = 0;
1922    m->info->peerIsInterested = 0;
1923    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
1924    m->state = AWAITING_BT_LENGTH;
1925    m->pulseTimer = tr_timerNew( m->handle, peerPulse, m, PEER_PULSE_INTERVAL );
1926    m->rateTimer = tr_timerNew( m->handle, ratePulse, m, RATE_PULSE_INTERVAL );
1927    m->pexTimer = tr_timerNew( m->handle, pexPulse, m, PEX_INTERVAL );
1928    m->outMessages = evbuffer_new( );
1929    m->outMessagesBatchedAt = 0;
1930    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1931    m->incoming.block = evbuffer_new( );
1932    m->outBlock = evbuffer_new( );
1933    m->peerAllowedPieces = NULL;
1934    m->peerAskedFor = REQUEST_LIST_INIT;
1935    m->peerAskedForFast = REQUEST_LIST_INIT;
1936    m->clientAskedFor = REQUEST_LIST_INIT;
1937    m->clientWillAskFor = REQUEST_LIST_INIT;
1938    *setme = tr_publisherSubscribe( m->publisher, func, userData );
1939
1940    if ( tr_peerIoSupportsLTEP( m->io ) )
1941        sendLtepHandshake( m );
1942
1943    sendBitfield( m );
1944   
1945    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of inactivity */
1946    tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
1947    ratePulse( m );
1948
1949    return m;
1950}
1951
1952void
1953tr_peerMsgsFree( tr_peermsgs* msgs )
1954{
1955    if( msgs != NULL )
1956    {
1957        tr_timerFree( &msgs->pulseTimer );
1958        tr_timerFree( &msgs->rateTimer );
1959        tr_timerFree( &msgs->pexTimer );
1960        tr_publisherFree( &msgs->publisher );
1961        reqListClear( &msgs->clientWillAskFor );
1962        reqListClear( &msgs->clientAskedFor );
1963        reqListClear( &msgs->peerAskedForFast );
1964        reqListClear( &msgs->peerAskedFor );
1965        tr_bitfieldFree( msgs->peerAllowedPieces );
1966        evbuffer_free( msgs->incoming.block );
1967        evbuffer_free( msgs->outMessages );
1968        evbuffer_free( msgs->outBlock );
1969        tr_free( msgs->pex );
1970
1971        memset( msgs, ~0, sizeof( tr_peermsgs ) );
1972        tr_free( msgs );
1973    }
1974}
1975
1976tr_publisher_tag
1977tr_peerMsgsSubscribe( tr_peermsgs       * peer,
1978                      tr_delivery_func    func,
1979                      void              * userData )
1980{
1981    return tr_publisherSubscribe( peer->publisher, func, userData );
1982}
1983
1984void
1985tr_peerMsgsUnsubscribe( tr_peermsgs       * peer,
1986                        tr_publisher_tag    tag )
1987{
1988    tr_publisherUnsubscribe( peer->publisher, tag );
1989}
Note: See TracBrowser for help on using the repository browser.