source: branches/encryption/libtransmission/peer-msgs.c @ 3101

Last change on this file since 3101 was 3101, checked in by charles, 15 years ago
  • add a public API call for setting encryption preferences
  • fix upload bug that caused us to send the same block out more than once
  • fix handshake bug that caused us to shun uTorrent and vice versa
  • fix minor memory leaks (valgrind).
  • move extension handshakes into peer-msgs
  • Property svn:keywords set to Date Rev Author Id
File size: 37.5 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 3101 2007-09-20 04:11:09Z charles $
11 */
12
13#include <assert.h>
14#include <ctype.h>
15#include <stdio.h>
16#include <stdlib.h>
17#include <string.h>
18
19#include <arpa/inet.h>
20
21#include <sys/types.h> /* event.h needs this */
22#include <event.h>
23
24#include "transmission.h"
25#include "bencode.h"
26#include "completion.h"
27#include "inout.h"
28#include "list.h"
29#include "peer-io.h"
30#include "peer-mgr.h"
31#include "peer-mgr-private.h"
32#include "peer-msgs.h"
33#include "ratecontrol.h"
34#include "trevent.h"
35#include "utils.h"
36
37/**
38***
39**/
40
41#define PEX_INTERVAL (60 * 1000)
42
43#define PEER_PULSE_INTERVAL (100)
44
45enum
46{
47    BT_CHOKE           = 0,
48    BT_UNCHOKE         = 1,
49    BT_INTERESTED      = 2,
50    BT_NOT_INTERESTED  = 3,
51    BT_HAVE            = 4,
52    BT_BITFIELD        = 5,
53    BT_REQUEST         = 6,
54    BT_PIECE           = 7,
55    BT_CANCEL          = 8,
56    BT_PORT            = 9,
57    BT_LTEP            = 20,
58
59    LTEP_HANDSHAKE     = 0,
60
61    OUR_LTEP_PEX       = 1
62};
63
64enum
65{
66    AWAITING_BT_LENGTH,
67    AWAITING_BT_MESSAGE,
68    READING_BT_PIECE
69};
70
71struct peer_request
72{
73    uint32_t index;
74    uint32_t offset;
75    uint32_t length;
76    time_t time_requested;
77};
78
79static int
80peer_request_compare( const void * va, const void * vb )
81{
82    struct peer_request * a = (struct peer_request*) va;
83    struct peer_request * b = (struct peer_request*) vb;
84    if( a->index != b->index ) return a->index - b->index;
85    if( a->offset != b->offset ) return a->offset - b->offset;
86    if( a->length != b->length ) return a->length - b->length;
87    return 0;
88}
89
90struct tr_peermsgs
91{
92    tr_peer * info;
93
94    tr_handle * handle;
95    tr_torrent * torrent;
96    tr_peerIo * io;
97
98    tr_publisher_t * publisher;
99
100    struct evbuffer * outMessages; /* buffer of all the non-piece messages */
101    struct evbuffer * outBlock;    /* the block we're currently sending */
102    struct evbuffer * inBlock;     /* the block we're currently receiving */
103    tr_list * peerAskedFor;
104    tr_list * clientAskedFor;
105
106    tr_timer * pulseTimer;
107    tr_timer * pexTimer;
108
109    struct peer_request blockToUs; /* the block currntly being sent to us */
110
111    time_t lastReqAddedAt;
112    time_t gotKeepAliveTime;
113    time_t clientSentPexAt;
114
115    unsigned int notListening          : 1;
116    unsigned int peerSupportsPex       : 1;
117    unsigned int hasSentLtepHandshake  : 1;
118
119    uint8_t state;
120
121    uint8_t ut_pex_id;
122    uint16_t listeningPort;
123
124    uint16_t pexCount;
125
126    uint32_t incomingMessageLength;
127
128    tr_pex * pex;
129};
130
131/**
132***
133**/
134
135static void
136myDebug( const char * file, int line, const struct tr_peermsgs * msgs, const char * fmt, ... )
137{
138    va_list args;
139    const char * addr = tr_peerIoGetAddrStr( msgs->io );
140    struct evbuffer * buf = evbuffer_new( );
141    evbuffer_add_printf( buf, "[%s:%d] %s (%p) ", file, line, addr, msgs );
142    va_start( args, fmt );
143    evbuffer_add_vprintf( buf, fmt, args );
144    va_end( args );
145
146    fprintf( stderr, "%s\n", EVBUFFER_DATA(buf) );
147
148    evbuffer_free( buf );
149}
150
151#define dbgmsg(handshake, fmt...) myDebug(__FILE__, __LINE__, handshake, ##fmt )
152
153/**
154***  EVENTS
155**/
156
157static const tr_peermsgs_event blankEvent = { 0, 0, 0, 0 };
158
159static void
160publishEvent( tr_peermsgs * peer, int eventType )
161{
162    tr_peermsgs_event e = blankEvent;
163    e.eventType = eventType;
164    tr_publisherPublish( peer->publisher, peer, &e );
165}
166
167static void
168fireNeedReq( tr_peermsgs * msgs )
169{
170    tr_peermsgs_event e = blankEvent;
171    e.eventType = TR_PEERMSG_NEED_REQ;
172    tr_publisherPublish( msgs->publisher, msgs, &e );
173}
174
175static void
176fireClientHave( tr_peermsgs * msgs, uint32_t pieceIndex )
177{
178    tr_peermsgs_event e = blankEvent;
179    e.eventType = TR_PEERMSG_CLIENT_HAVE;
180    e.pieceIndex = pieceIndex;
181    tr_publisherPublish( msgs->publisher, msgs, &e );
182}
183
184static void
185fireGotBlock( tr_peermsgs * peer, uint32_t pieceIndex, uint32_t offset, uint32_t length )
186{
187    tr_peermsgs_event e = blankEvent;
188    e.eventType = TR_PEERMSG_CLIENT_BLOCK;
189    e.pieceIndex = pieceIndex;
190    e.offset = offset;
191    e.length = length;
192    tr_publisherPublish( peer->publisher, peer, &e );
193}
194
195static void
196fireGotError( tr_peermsgs * peer )
197{
198    publishEvent( peer, TR_PEERMSG_GOT_ERROR );
199}
200
201/**
202***  INTEREST
203**/
204
205static int
206isPieceInteresting( const tr_peermsgs   * peer,
207                    int                   piece )
208{
209    const tr_torrent * torrent = peer->torrent;
210    if( torrent->info.pieces[piece].dnd ) /* we don't want it */
211        return FALSE;
212    if( tr_cpPieceIsComplete( torrent->completion, piece ) ) /* we already have it */
213        return FALSE;
214    if( !tr_bitfieldHas( peer->info->have, piece ) ) /* peer doesn't have it */
215        return FALSE;
216    if( tr_bitfieldHas( peer->info->banned, piece ) ) /* peer is banned for it */
217        return FALSE;
218    return TRUE;
219}
220
221static int
222isPeerInteresting( const tr_peermsgs * msgs )
223{
224    const int clientIsSeed = tr_cpGetStatus( msgs->torrent->completion ) != TR_CP_INCOMPLETE;
225    const int peerIsSeed = msgs->info->progress >= 1.0;
226
227    if( peerIsSeed )
228    {
229        return !clientIsSeed;
230    }
231    else if( clientIsSeed )
232    {
233        return !peerIsSeed;
234    }
235    else /* we're both leeches... */
236    {
237        int i;
238        const tr_torrent * torrent = msgs->torrent;
239        const tr_bitfield * bitfield = tr_cpPieceBitfield( torrent->completion );
240
241        if( !msgs->info->have ) /* We don't know what this peer has... what should this be? */
242            return TRUE;
243
244        assert( bitfield->len == msgs->info->have->len );
245        for( i=0; i<torrent->info.pieceCount; ++i )
246            if( isPieceInteresting( msgs, i ) )
247                return TRUE;
248
249        return FALSE;
250    }
251}
252
253static void
254sendInterest( tr_peermsgs * msgs, int weAreInterested )
255{
256    const uint32_t len = sizeof(uint8_t);
257    const uint8_t bt_msgid = weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED;
258
259    assert( msgs != NULL );
260    assert( weAreInterested==0 || weAreInterested==1 );
261
262    msgs->info->clientIsInterested = weAreInterested;
263    dbgmsg( msgs, ": sending an %s message", (weAreInterested ? "INTERESTED" : "NOT_INTERESTED") );
264    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
265    tr_peerIoWriteBytes( msgs->io, msgs->outMessages, &bt_msgid, 1 );
266}
267
268static void
269updateInterest( tr_peermsgs * msgs )
270{
271    const int i = isPeerInteresting( msgs );
272    if( i != msgs->info->clientIsInterested )
273        sendInterest( msgs, i );
274    if( i )
275        fireNeedReq( msgs );
276}
277
278void
279tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
280{
281    assert( msgs != NULL );
282    assert( msgs->info != NULL );
283    assert( choke==0 || choke==1 );
284
285    if( msgs->info->peerIsChoked != choke )
286    {
287        const uint32_t len = sizeof(uint8_t);
288        const uint8_t bt_msgid = choke ? BT_CHOKE : BT_UNCHOKE;
289
290        msgs->info->peerIsChoked = choke ? 1 : 0;
291        if( msgs->info )
292        {
293            tr_list_foreach( msgs->peerAskedFor, tr_free );
294            tr_list_free( &msgs->peerAskedFor );
295        }
296
297        dbgmsg( msgs, "sending a %s message", (choke ? "CHOKE" : "UNCHOKE") );
298        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
299        tr_peerIoWriteBytes( msgs->io, msgs->outMessages, &bt_msgid, 1 );
300    }
301}
302
303/**
304***
305**/
306
307void
308tr_peerMsgsCancel( tr_peermsgs * msgs,
309                   uint32_t      pieceIndex,
310                   uint32_t      offset,
311                   uint32_t      length )
312{
313    tr_list * node;
314    struct peer_request tmp;
315
316    assert( msgs != NULL );
317    assert( length > 0 );
318
319    tmp.index = pieceIndex;
320    tmp.offset = offset;
321    tmp.length = length;
322
323    node = tr_list_find( msgs->clientAskedFor, &tmp, peer_request_compare );
324    if( node != NULL )
325    {
326        /* cancel the request */
327        const uint8_t bt_msgid = BT_CANCEL;
328        const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
329        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
330        tr_peerIoWriteBytes( msgs->io, msgs->outMessages, &bt_msgid, 1 );
331        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
332        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
333        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
334
335        /* remove it from our "requested" list */
336        tr_list_remove_data( &msgs->peerAskedFor, node->data );
337    }
338}
339
340/**
341***
342**/
343
344void
345tr_peerMsgsHave( tr_peermsgs * msgs,
346                 uint32_t      pieceIndex )
347{
348    const uint8_t bt_msgid = BT_HAVE;
349    const uint32_t len = sizeof(uint8_t) + sizeof(uint32_t);
350    dbgmsg( msgs, "w00t telling them we HAVE piece #%d", pieceIndex );
351    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
352    tr_peerIoWriteBytes( msgs->io, msgs->outMessages, &bt_msgid, 1 );
353    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
354
355    updateInterest( msgs );
356}
357
358/**
359***
360**/
361
362static int
363pulse( void * vmsgs );
364
365int
366tr_peerMsgsAddRequest( tr_peermsgs * msgs,
367                       uint32_t      index, 
368                       uint32_t      offset, 
369                       uint32_t      length )
370{
371    const uint8_t bt_msgid = BT_REQUEST;
372    const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
373    struct peer_request * req;
374    int maxSize;
375
376    assert( msgs != NULL );
377    assert( msgs->torrent != NULL );
378    assert( index < ((uint32_t)msgs->torrent->info.pieceCount) );
379    assert( offset < (uint32_t)tr_torPieceCountBytes( msgs->torrent, (int)index ) );
380    assert( (offset + length) <= (uint32_t)tr_torPieceCountBytes( msgs->torrent, (int)index ) );
381
382    if( msgs->info->clientIsChoked )
383        return TR_ADDREQ_CLIENT_CHOKED;
384
385    if( !tr_bitfieldHas( msgs->info->have, index ) )
386        return TR_ADDREQ_MISSING;
387
388    maxSize = MIN( 2 + (int)(tr_peerIoGetRateToClient(msgs->io)/10), 100 );
389    //if( ( time(NULL) - msgs->lastReqAddedAt <= 5 ) && ( tr_list_size( msgs->clientAskedFor) >= maxSize ) )
390    if( tr_list_size( msgs->clientAskedFor) >= maxSize )
391        return TR_ADDREQ_FULL;
392
393    dbgmsg( msgs, "w00t peer has a max request queue size of %d... adding request for piece %d, offset %d", maxSize, (int)index, (int)offset );
394
395    /* queue the request */
396    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
397    tr_peerIoWriteBytes( msgs->io, msgs->outMessages, &bt_msgid, 1 );
398    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, index );
399    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
400    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
401
402    /* add it to our `requests sent' list */
403    req = tr_new( struct peer_request, 1 );
404    req->index = index;
405    req->offset = offset;
406    req->length = length;
407    req->time_requested = msgs->lastReqAddedAt = time( NULL );
408    tr_list_append( &msgs->clientAskedFor, req );
409    pulse( msgs );
410
411    return TR_ADDREQ_OK;
412}
413
414/**
415***
416**/
417
418static void
419sendLtepHandshake( tr_peermsgs * msgs )
420{
421    benc_val_t val, *m;
422    char * buf;
423    int len;
424    const char * v = TR_NAME " " USERAGENT_PREFIX;
425    const int port = tr_getPublicPort( msgs->handle );
426    struct evbuffer * outbuf = evbuffer_new( );
427    uint32_t msglen;
428    const uint8_t tr_msgid = 20; /* ltep extension id */
429    const uint8_t ltep_msgid = 0; /* handshake id */
430
431    dbgmsg( msgs, "sending an ltep handshake" );
432    tr_bencInit( &val, TYPE_DICT );
433    tr_bencDictReserve( &val, 3 );
434    m  = tr_bencDictAdd( &val, "m" );
435    tr_bencInit( m, TYPE_DICT );
436    tr_bencDictReserve( m, 1 );
437    tr_bencInitInt( tr_bencDictAdd( m, "ut_pex" ), OUR_LTEP_PEX );
438    if( port > 0 )
439        tr_bencInitInt( tr_bencDictAdd( &val, "p" ), port );
440    tr_bencInitStr( tr_bencDictAdd( &val, "v" ), v, 0, 1 );
441    buf = tr_bencSaveMalloc( &val,  &len );
442    tr_bencPrint( &val );
443
444    msglen = sizeof(tr_msgid) + sizeof(ltep_msgid) + len;
445    tr_peerIoWriteUint32( msgs->io, outbuf, msglen );
446    tr_peerIoWriteBytes ( msgs->io, outbuf, &tr_msgid, 1 );
447    tr_peerIoWriteBytes ( msgs->io, outbuf, &ltep_msgid, 1 );
448    tr_peerIoWriteBytes ( msgs->io, outbuf, buf, len );
449
450    tr_peerIoWriteBuf( msgs->io, outbuf );
451
452    msgs->hasSentLtepHandshake = 1;
453
454    /* cleanup */
455    tr_bencFree( &val );
456    tr_free( buf );
457    evbuffer_free( outbuf );
458
459}
460
461static void
462parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
463{
464    benc_val_t val, * sub;
465    uint8_t * tmp = tr_new( uint8_t, len );
466    evbuffer_remove( inbuf, tmp, len );
467
468    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type!=TYPE_DICT ) {
469        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
470        tr_free( tmp );
471        return;
472    }
473
474    tr_bencPrint( &val );
475
476    /* check supported messages for utorrent pex */
477    sub = tr_bencDictFind( &val, "m" );
478    if( tr_bencIsDict( sub ) ) {
479        sub = tr_bencDictFind( sub, "ut_pex" );
480        if( tr_bencIsInt( sub ) ) {
481            msgs->peerSupportsPex = 1;
482            msgs->ut_pex_id = (uint8_t) sub->val.i;
483            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
484        }
485    }
486
487#if 0
488    /* get peer's client name */
489    sub = tr_bencDictFind( &val, "v" );
490    if( tr_bencIsStr( sub ) ) {
491int i;
492        tr_free( msgs->info->client );
493        fprintf( stderr, "dictionary says client is [%s]\n", sub->val.s.s );
494        msgs->info->client = tr_strndup( sub->val.s.s, sub->val.s.i );
495for( i=0; i<sub->val.s.i; ++i ) { fprintf( stderr, "[%c] (%d)\n", sub->val.s.s[i], (int)sub->val.s.s[i] );
496                                  if( (int)msgs->info->client[i]==-75 ) msgs->info->client[i]='u'; }
497        fprintf( stderr, "msgs->client is now [%s]\n", msgs->info->client );
498    }
499#endif
500
501    /* get peer's listening port */
502    sub = tr_bencDictFind( &val, "p" );
503    if( tr_bencIsInt( sub ) ) {
504        msgs->listeningPort = htons( (uint16_t)sub->val.i );
505        dbgmsg( msgs, "msgs->port is now %hu", msgs->listeningPort );
506    }
507
508    tr_bencFree( &val );
509    tr_free( tmp );
510}
511
512static void
513parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
514{
515    benc_val_t val, * sub;
516    uint8_t * tmp;
517
518    if( msgs->torrent->pexDisabled ) /* no sharing! */
519        return;
520
521    tmp = tr_new( uint8_t, msglen );
522    evbuffer_remove( inbuf, tmp, msglen );
523
524    if( tr_bencLoad( tmp, msglen, &val, NULL ) || !tr_bencIsDict( &val ) ) {
525        dbgmsg( msgs, "GET can't read extended-pex dictionary" );
526        tr_free( tmp );
527        return;
528    }
529
530    sub = tr_bencDictFind( &val, "added" );
531    if( tr_bencIsStr(sub) && ((sub->val.s.i % 6) == 0)) {
532        const int n = sub->val.s.i / 6 ;
533        dbgmsg( msgs, "got %d peers from uT pex", n );
534        tr_peerMgrAddPeers( msgs->handle->peerMgr,
535                            msgs->torrent->info.hash,
536                            TR_PEER_FROM_PEX,
537                            (uint8_t*)sub->val.s.s, n );
538    }
539
540    tr_bencFree( &val );
541    tr_free( tmp );
542}
543
544static void
545sendPex( tr_peermsgs * msgs );
546
547static void
548parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
549{
550    uint8_t ltep_msgid;
551
552    tr_peerIoReadBytes( msgs->io, inbuf, &ltep_msgid, 1 );
553    msglen--;
554
555    if( ltep_msgid == LTEP_HANDSHAKE )
556    {
557        dbgmsg( msgs, "got ltep handshake" );
558        parseLtepHandshake( msgs, msglen, inbuf );
559        if( !msgs->hasSentLtepHandshake )
560            sendLtepHandshake( msgs );
561    }
562    else if( ltep_msgid == msgs->ut_pex_id )
563    {
564        dbgmsg( msgs, "got ut pex" );
565        msgs->peerSupportsPex = 1;
566        parseUtPex( msgs, msglen, inbuf );
567    }
568    else
569    {
570        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
571        evbuffer_drain( inbuf, msglen );
572    }
573}
574
575static int
576readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf )
577{
578    uint32_t len;
579    const size_t needlen = sizeof(uint32_t);
580
581    if( EVBUFFER_LENGTH(inbuf) < needlen )
582        return READ_MORE;
583
584    tr_peerIoReadUint32( msgs->io, inbuf, &len );
585
586    if( len == 0 ) { /* peer sent us a keepalive message */
587        dbgmsg( msgs, "peer sent us a keepalive message..." );
588        msgs->gotKeepAliveTime = time( NULL );
589    } else {
590        dbgmsg( msgs, "peer is sending us a message with %"PRIu64" bytes...\n", (uint64_t)len );
591        msgs->incomingMessageLength = len;
592        msgs->state = AWAITING_BT_MESSAGE;
593    } return READ_AGAIN;
594}
595
596static int
597readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf )
598{
599    uint8_t id;
600    uint32_t ui32;
601    uint32_t msglen = msgs->incomingMessageLength;
602
603    if( EVBUFFER_LENGTH(inbuf) < msglen )
604        return READ_MORE;
605
606    tr_peerIoReadBytes( msgs->io, inbuf, &id, 1 );
607    msglen--;
608    dbgmsg( msgs, "peer sent us a message... "
609                  "bt id number is %d, and remaining len is %d", (int)id, (int)msglen );
610
611    switch( id )
612    {
613        case BT_CHOKE:
614            assert( msglen == 0 );
615            dbgmsg( msgs, "w00t peer sent us a BT_CHOKE" );
616            msgs->info->clientIsChoked = 1;
617            tr_list_foreach( msgs->peerAskedFor, tr_free );
618            tr_list_free( &msgs->peerAskedFor );
619            tr_list_foreach( msgs->clientAskedFor, tr_free );
620            tr_list_free( &msgs->clientAskedFor );
621            break;
622
623        case BT_UNCHOKE:
624            assert( msglen == 0 );
625            dbgmsg( msgs, "w00t peer sent us a BT_UNCHOKE" );
626            msgs->info->clientIsChoked = 0;
627            fireNeedReq( msgs );
628            break;
629
630        case BT_INTERESTED:
631            assert( msglen == 0 );
632            dbgmsg( msgs, "w00t peer sent us a BT_INTERESTED" );
633            msgs->info->peerIsInterested = 1;
634            break;
635
636        case BT_NOT_INTERESTED:
637            assert( msglen == 0 );
638            dbgmsg( msgs, "w00t peer sent us a BT_NOT_INTERESTED" );
639            msgs->info->peerIsInterested = 0;
640            break;
641
642        case BT_HAVE:
643            assert( msglen == 4 );
644            dbgmsg( msgs, "w00t peer sent us a BT_HAVE" );
645            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
646            tr_bitfieldAdd( msgs->info->have, ui32 );
647            msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have ) / (float)msgs->torrent->info.pieceCount;
648            dbgmsg( msgs, "after the HAVE message, peer progress is %f", msgs->info->progress );
649            updateInterest( msgs );
650            break;
651
652        case BT_BITFIELD:
653            assert( msglen == msgs->info->have->len );
654            dbgmsg( msgs, "w00t peer sent us a BT_BITFIELD" );
655            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
656            msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have ) / (float)msgs->torrent->info.pieceCount;
657            dbgmsg( msgs, "after the HAVE message, peer progress is %f", msgs->info->progress );
658            updateInterest( msgs );
659            fireNeedReq( msgs );
660            /* FIXME: maybe unchoke */
661            break;
662
663        case BT_REQUEST: {
664            struct peer_request * req;
665            assert( msglen == 12 );
666            dbgmsg( msgs, "peer sent us a BT_REQUEST" );
667            req = tr_new( struct peer_request, 1 );
668            tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
669            tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
670            tr_peerIoReadUint32( msgs->io, inbuf, &req->length );
671            if( !msgs->info->peerIsChoked )
672                tr_list_append( &msgs->peerAskedFor, req );
673            break;
674        }
675
676        case BT_CANCEL: {
677            struct peer_request req;
678            tr_list * node;
679            assert( msglen == 12 );
680            dbgmsg( msgs, "peer sent us a BT_CANCEL" );
681            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
682            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
683            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
684            node = tr_list_find( msgs->peerAskedFor, &req, peer_request_compare );
685            if( node != NULL ) {
686                void * data = node->data;
687                tr_list_remove_data( &msgs->peerAskedFor, data );
688                tr_free( data );
689                dbgmsg( msgs, "found the req that peer is cancelling... cancelled." );
690            }
691            break;
692        }
693
694        case BT_PIECE: {
695            dbgmsg( msgs, "peer sent us a BT_PIECE" );
696            assert( msgs->blockToUs.length == 0 );
697            msgs->state = READING_BT_PIECE;
698            tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.index );
699            tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.offset );
700            msgs->blockToUs.length = msglen - 8;
701            assert( msgs->blockToUs.length > 0 );
702            assert( EVBUFFER_LENGTH(msgs->inBlock) == 0 );
703            //evbuffer_drain( msgs->inBlock, ~0 );
704            return READ_AGAIN;
705            break;
706        }
707
708        case BT_PORT: {
709            assert( msglen == 2 );
710            dbgmsg( msgs, "peer sent us a BT_PORT" );
711            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->listeningPort );
712            break;
713        }
714
715        case BT_LTEP:
716            dbgmsg( msgs, "peer sent us a BT_LTEP" );
717            parseLtep( msgs, msglen, inbuf );
718            break;
719
720        default:
721            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
722            tr_peerIoDrain( msgs->io, inbuf, msglen );
723            assert( 0 );
724    }
725
726    msgs->incomingMessageLength = -1;
727    msgs->state = AWAITING_BT_LENGTH;
728    return READ_AGAIN;
729}
730
731static void
732clientGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
733{
734    tr_torrent * tor = msgs->torrent;
735    tor->downloadedCur += byteCount;
736    tr_rcTransferred( tor->download, byteCount );
737    tr_rcTransferred( tor->handle->download, byteCount );
738}
739
740static void
741peerGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
742{
743    tr_torrent * tor = msgs->torrent;
744    tor->uploadedCur += byteCount;
745    tr_rcTransferred( tor->upload, byteCount );
746    tr_rcTransferred( tor->handle->upload, byteCount );
747}
748
749static int
750canDownload( const tr_peermsgs * msgs UNUSED )
751{
752#if 0
753    tr_torrent * tor = msgs->torrent;
754
755    if( tor->downloadLimitMode == TR_SPEEDLIMIT_GLOBAL )
756        return !tor->handle->useDownloadLimit || tr_rcCanTransfer( tor->handle->download );
757
758    if( tor->downloadLimitMode == TR_SPEEDLIMIT_SINGLE )
759        return tr_rcCanTransfer( tor->download );
760#endif
761
762    return TRUE;
763}
764
765static void
766reassignBytesToCorrupt( tr_peermsgs * msgs, uint32_t byteCount )
767{
768    tr_torrent * tor = msgs->torrent;
769
770    /* increment the `corrupt' field */
771    tor->corruptCur += byteCount;
772
773    /* decrement the `downloaded' field */
774    if( tor->downloadedCur >= byteCount )
775        tor->downloadedCur -= byteCount;
776    else
777        tor->downloadedCur = 0;
778}
779
780
781static void
782gotBadPiece( tr_peermsgs * msgs, uint32_t pieceIndex )
783{
784    const uint32_t byteCount = tr_torPieceCountBytes( msgs->torrent, (int)pieceIndex );
785    reassignBytesToCorrupt( msgs, byteCount );
786}
787
788static void
789gotUnwantedBlock( tr_peermsgs * msgs, uint32_t index UNUSED, uint32_t offset UNUSED, uint32_t length )
790{
791    reassignBytesToCorrupt( msgs, length );
792}
793
794static void
795addUsToBlamefield( tr_peermsgs * msgs, uint32_t index )
796{
797    if( !msgs->info->blame )
798         msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
799    tr_bitfieldAdd( msgs->info->blame, index );
800}
801
802static void
803gotBlock( tr_peermsgs      * msgs,
804          struct evbuffer  * inbuf,
805          uint32_t           index,
806          uint32_t           offset,
807          uint32_t           length )
808{
809    tr_torrent * tor = msgs->torrent;
810    const int block = _tr_block( tor, index, offset );
811    struct peer_request key, *req;
812
813    /**
814    *** Remove the block from our `we asked for this' list
815    **/
816
817    key.index = index;
818    key.offset = offset;
819    key.length = length;
820    req = (struct peer_request*) tr_list_remove( &msgs->clientAskedFor, &key,
821                                                 peer_request_compare );
822    dbgmsg( msgs, "w00t peer sent us a block.  turnaround time was %d seconds", 
823                     (int)(time(NULL) - req->time_requested) );
824    if( req == NULL ) {
825        gotUnwantedBlock( msgs, index, offset, length );
826        dbgmsg( msgs, "we didn't ask for this message..." );
827        return;
828    }
829    tr_free( req );
830    dbgmsg( msgs, "peer has %d more blocks we've asked for",
831                  tr_list_size(msgs->clientAskedFor));
832
833    /**
834    *** Error checks
835    **/
836
837    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
838        dbgmsg( msgs, "have this block already..." );
839        tr_dbg( "have this block already..." );
840        gotUnwantedBlock( msgs, index, offset, length );
841        return;
842    }
843
844    if( (int)length != tr_torBlockCountBytes( tor, block ) ) {
845        dbgmsg( msgs, "block is the wrong length..." );
846        tr_dbg( "block is the wrong length..." );
847        gotUnwantedBlock( msgs, index, offset, length );
848        return;
849    }
850
851    /**
852    ***  Write the block
853    **/
854
855    if( tr_ioWrite( tor, index, offset, length, EVBUFFER_DATA( inbuf ))) {
856        return;
857    }
858
859    tr_cpBlockAdd( tor->completion, block );
860
861    addUsToBlamefield( msgs, index );
862
863    fireGotBlock( msgs, index, offset, length );
864    fireNeedReq( msgs );
865
866    /**
867    ***  Handle if this was the last block in the piece
868    **/
869
870    if( tr_cpPieceIsComplete( tor->completion, index ) )
871    {
872        if( !tr_ioHash( tor, index ) )
873        {
874            gotBadPiece( msgs, index );
875            return;
876        }
877
878        fireClientHave( msgs, index );
879    }
880}
881
882
883static ReadState
884readBtPiece( tr_peermsgs * msgs, struct evbuffer * inbuf )
885{
886    uint32_t inlen;
887    uint8_t * tmp;
888
889    assert( msgs != NULL );
890    assert( msgs->blockToUs.length > 0 );
891    assert( inbuf != NULL );
892    assert( EVBUFFER_LENGTH( inbuf ) > 0 );
893
894    /* read from the inbuf into our block buffer */
895    inlen = MIN( EVBUFFER_LENGTH(inbuf), msgs->blockToUs.length );
896    tmp = tr_new( uint8_t, inlen );
897    tr_peerIoReadBytes( msgs->io, inbuf, tmp, inlen );
898    evbuffer_add( msgs->inBlock, tmp, inlen );
899
900    /* update our tables accordingly */
901    assert( inlen >= msgs->blockToUs.length );
902    msgs->blockToUs.length -= inlen;
903    msgs->info->peerSentDataAt = time( NULL );
904    clientGotBytes( msgs, inlen );
905
906    /* if this was the entire block, save it */
907    if( !msgs->blockToUs.length )
908    {
909        dbgmsg( msgs, "w00t -- got block index %u, offset %u", msgs->blockToUs.index, msgs->blockToUs.offset );
910        assert( (int)EVBUFFER_LENGTH( msgs->inBlock ) == tr_torBlockCountBytes( msgs->torrent, _tr_block(msgs->torrent,msgs->blockToUs.index, msgs->blockToUs.offset) ) );
911        gotBlock( msgs, msgs->inBlock,
912                        msgs->blockToUs.index,
913                        msgs->blockToUs.offset,
914                        EVBUFFER_LENGTH( msgs->inBlock ) );
915        evbuffer_drain( msgs->inBlock, ~0 );
916        msgs->state = AWAITING_BT_LENGTH;
917    }
918
919    /* cleanup */
920    tr_free( tmp );
921    return READ_AGAIN;
922}
923
924static ReadState
925canRead( struct bufferevent * evin, void * vpeer )
926{
927    ReadState ret;
928    tr_peermsgs * peer = (tr_peermsgs *) vpeer;
929    struct evbuffer * inbuf = EVBUFFER_INPUT ( evin );
930
931    switch( peer->state )
932    {
933        case AWAITING_BT_LENGTH:  ret = readBtLength  ( peer, inbuf ); break;
934        case AWAITING_BT_MESSAGE: ret = readBtMessage ( peer, inbuf ); break;
935        case READING_BT_PIECE:    ret = readBtPiece   ( peer, inbuf ); break;
936        default: assert( 0 );
937    }
938    return ret;
939}
940
941/**
942***
943**/
944
945static int
946canWrite( const tr_peermsgs * msgs )
947{
948    /* don't let our outbuffer get too large */
949    if( tr_peerIoWriteBytesWaiting( msgs->io ) > 4096 )
950        return FALSE;
951
952    return TRUE;
953}
954
955static int
956canUpload( const tr_peermsgs * msgs )
957{
958    const tr_torrent * tor = msgs->torrent;
959
960    if( !canWrite( msgs ) )
961        return FALSE;
962
963    if( tor->uploadLimitMode == TR_SPEEDLIMIT_GLOBAL )
964        return !tor->handle->useUploadLimit || tr_rcCanTransfer( tor->handle->upload );
965
966    if( tor->uploadLimitMode == TR_SPEEDLIMIT_SINGLE )
967        return tr_rcCanTransfer( tor->upload );
968
969    return TRUE;
970}
971
972static int
973pulse( void * vmsgs )
974{
975    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
976    size_t len;
977
978    /* if we froze out a downloaded block because of speed limits,
979       start listening to the peer again */
980#if 0
981    if( msgs->notListening )
982    {
983        fprintf( stderr, "msgs %p thawing out...\n", msgs );
984        msgs->notListening = 0;
985        tr_peerIoSetIOMode ( msgs->io, EV_READ, 0 );
986    }
987#endif
988
989    if( !canWrite( msgs ) )
990    {
991    }
992    else if(( len = EVBUFFER_LENGTH( msgs->outBlock ) ))
993    {
994        while ( len && canUpload( msgs ) )
995        {
996            const size_t outlen = MIN( len, 4096 );
997            tr_peerIoWrite( msgs->io, EVBUFFER_DATA(msgs->outBlock), outlen );
998            evbuffer_drain( msgs->outBlock, outlen );
999            peerGotBytes( msgs, outlen );
1000            len -= outlen;
1001            dbgmsg( msgs, "wrote %d bytes; %d left in block", (int)outlen, (int)len );
1002        }
1003    }
1004    else if(( len = EVBUFFER_LENGTH( msgs->outMessages ) ))
1005    {
1006        tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
1007    }
1008    else if(( msgs->peerAskedFor ))
1009    {
1010        struct peer_request * req = tr_list_pop_front( &msgs->peerAskedFor );
1011        uint8_t * tmp = tr_new( uint8_t, req->length );
1012        const uint8_t msgid = BT_PIECE;
1013        const uint32_t msglen = sizeof(uint8_t) + sizeof(uint32_t)*2 + req->length;
1014        tr_ioRead( msgs->torrent, req->index, req->offset, req->length, tmp );
1015        tr_peerIoWriteUint32( msgs->io, msgs->outBlock, msglen );
1016        tr_peerIoWriteBytes ( msgs->io, msgs->outBlock, &msgid, 1 );
1017        tr_peerIoWriteUint32( msgs->io, msgs->outBlock, req->index );
1018        tr_peerIoWriteUint32( msgs->io, msgs->outBlock, req->offset );
1019        tr_peerIoWriteBytes ( msgs->io, msgs->outBlock, tmp, req->length );
1020        tr_free( tmp );
1021        dbgmsg( msgs, "putting req into out queue: index %d, offset %d, length %d ... %d blocks left in our queue", (int)req->index, (int)req->offset, (int)req->length, tr_list_size(msgs->peerAskedFor) );
1022        tr_free( req );
1023    }
1024
1025    return TRUE; /* loop forever */
1026}
1027
1028static void
1029didWrite( struct bufferevent * evin UNUSED, void * vpeer )
1030{
1031    pulse( (tr_peermsgs *) vpeer );
1032}
1033
1034static void
1035gotError( struct bufferevent * evbuf UNUSED, short what UNUSED, void * vpeer )
1036{
1037    fireGotError( (tr_peermsgs*)vpeer );
1038}
1039
1040static void
1041sendBitfield( tr_peermsgs * msgs )
1042{
1043    const tr_bitfield * bitfield = tr_cpPieceBitfield( msgs->torrent->completion );
1044    const uint32_t len = sizeof(uint8_t) + bitfield->len;
1045    const uint8_t bt_msgid = BT_BITFIELD;
1046
1047    dbgmsg( msgs, "sending peer a bitfield message\n", msgs );
1048    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
1049    tr_peerIoWriteBytes( msgs->io, msgs->outMessages, &bt_msgid, 1 );
1050    tr_peerIoWriteBytes( msgs->io, msgs->outMessages, bitfield->bits, bitfield->len );
1051}
1052
1053/**
1054***
1055**/
1056
1057#define MAX_DIFFS 50
1058
1059typedef struct
1060{
1061    tr_pex * added;
1062    tr_pex * dropped;
1063    tr_pex * elements;
1064    int addedCount;
1065    int droppedCount;
1066    int elementCount;
1067    int diffCount;
1068}
1069PexDiffs;
1070
1071static void
1072pexAddedCb( void * vpex, void * userData )
1073{
1074    PexDiffs * diffs = (PexDiffs *) userData;
1075    tr_pex * pex = (tr_pex *) vpex;
1076    if( diffs->diffCount < MAX_DIFFS )
1077    {
1078        diffs->diffCount++;
1079        diffs->added[diffs->addedCount++] = *pex;
1080        diffs->elements[diffs->elementCount++] = *pex;
1081    }
1082}
1083
1084static void
1085pexRemovedCb( void * vpex, void * userData )
1086{
1087    PexDiffs * diffs = (PexDiffs *) userData;
1088    tr_pex * pex = (tr_pex *) vpex;
1089    if( diffs->diffCount < MAX_DIFFS )
1090    {
1091        diffs->diffCount++;
1092        diffs->dropped[diffs->droppedCount++] = *pex;
1093    }
1094}
1095
1096static void
1097pexElementCb( void * vpex, void * userData )
1098{
1099    PexDiffs * diffs = (PexDiffs *) userData;
1100    tr_pex * pex = (tr_pex *) vpex;
1101    if( diffs->diffCount < MAX_DIFFS )
1102    {
1103        diffs->diffCount++;
1104        diffs->elements[diffs->elementCount++] = *pex;
1105    }
1106}
1107
1108static void
1109sendPex( tr_peermsgs * msgs )
1110{
1111    if( msgs->peerSupportsPex && !msgs->torrent->pexDisabled )
1112    {
1113        int i;
1114        tr_pex * newPex = NULL;
1115        const int newCount = tr_peerMgrGetPeers( msgs->handle->peerMgr, msgs->torrent->info.hash, &newPex );
1116        PexDiffs diffs;
1117        benc_val_t val, *added, *dropped, *flags;
1118        uint8_t *tmp, *walk;
1119        char * benc;
1120        int bencLen;
1121        const uint8_t bt_msgid = BT_LTEP;
1122        const uint8_t ltep_msgid = OUR_LTEP_PEX;
1123
1124        /* build the diffs */
1125        diffs.added = tr_new( tr_pex, newCount );
1126        diffs.addedCount = 0;
1127        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1128        diffs.droppedCount = 0;
1129        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1130        diffs.elementCount = 0;
1131        diffs.diffCount = 0;
1132        tr_set_compare( msgs->pex, msgs->pexCount,
1133                        newPex, newCount,
1134                        tr_pexCompare, sizeof(tr_pex),
1135                        pexRemovedCb, pexAddedCb, pexElementCb, &diffs );
1136        dbgmsg( msgs, "pex: old peer count %d, new peer count %d, added %d, removed %d", msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1137
1138        /* update peer */
1139        tr_free( msgs->pex );
1140        msgs->pex = diffs.elements;
1141        msgs->pexCount = diffs.elementCount;
1142
1143        /* build the pex payload */
1144        tr_bencInit( &val, TYPE_DICT );
1145        tr_bencDictReserve( &val, 3 );
1146
1147        /* "added" */
1148        added = tr_bencDictAdd( &val, "added" );
1149        tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1150        for( i=0; i<diffs.addedCount; ++i ) {
1151            memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
1152            memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1153        }
1154        assert( ( walk - tmp ) == diffs.addedCount * 6 );
1155        tr_bencInitStr( added, tmp, walk-tmp, FALSE );
1156
1157        /* "added.f" */
1158        flags = tr_bencDictAdd( &val, "added.f" );
1159        tmp = walk = tr_new( uint8_t, diffs.addedCount );
1160        for( i=0; i<diffs.addedCount; ++i ) {
1161            dbgmsg( msgs, "PEX -->> -->> flag is %d", (int)diffs.added[i].flags );
1162            *walk++ = diffs.added[i].flags;
1163        }
1164        assert( ( walk - tmp ) == diffs.addedCount );
1165        tr_bencInitStr( flags, tmp, walk-tmp, FALSE );
1166
1167        /* "dropped" */
1168        dropped = tr_bencDictAdd( &val, "dropped" );
1169        tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
1170        for( i=0; i<diffs.droppedCount; ++i ) {
1171            memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
1172            memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
1173        }
1174        assert( ( walk - tmp ) == diffs.droppedCount * 6 );
1175        tr_bencInitStr( dropped, tmp, walk-tmp, FALSE );
1176
1177        /* write the pex message */
1178        benc = tr_bencSaveMalloc( &val, &bencLen );
1179        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 1 + 1 + bencLen );
1180        tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, &bt_msgid, 1 );
1181        tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, &ltep_msgid, 1 );
1182        tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, benc, bencLen );
1183
1184        /* cleanup */
1185        tr_free( benc );
1186        tr_bencFree( &val );
1187        tr_free( diffs.added );
1188        tr_free( diffs.dropped );
1189        tr_free( newPex );
1190
1191        msgs->clientSentPexAt = time( NULL );
1192    }
1193}
1194
1195static int
1196pexPulse( void * vpeer )
1197{
1198    sendPex( vpeer );
1199    return TRUE;
1200}
1201
1202/**
1203***
1204**/
1205
1206tr_peermsgs*
1207tr_peerMsgsNew( struct tr_torrent * torrent, struct tr_peer * info )
1208{
1209    tr_peermsgs * msgs;
1210
1211    assert( info != NULL );
1212    assert( info->io != NULL );
1213
1214    msgs = tr_new0( tr_peermsgs, 1 );
1215    msgs->publisher = tr_publisherNew( );
1216    msgs->info = info;
1217    msgs->handle = torrent->handle;
1218    msgs->torrent = torrent;
1219    msgs->io = info->io;
1220    msgs->info->clientIsChoked = 1;
1221    msgs->info->peerIsChoked = 1;
1222    msgs->info->clientIsInterested = 0;
1223    msgs->info->peerIsInterested = 0;
1224    msgs->info->have = tr_bitfieldNew( torrent->info.pieceCount );
1225    msgs->pulseTimer = tr_timerNew( msgs->handle, pulse, msgs, PEER_PULSE_INTERVAL );
1226    msgs->pexTimer = tr_timerNew( msgs->handle, pexPulse, msgs, PEX_INTERVAL );
1227    msgs->outMessages = evbuffer_new( );
1228    msgs->outBlock = evbuffer_new( );
1229    msgs->inBlock = evbuffer_new( );
1230
1231    tr_peerIoSetIOFuncs( msgs->io, canRead, didWrite, gotError, msgs );
1232    tr_peerIoSetIOMode( msgs->io, EV_READ|EV_WRITE, 0 );
1233
1234    /**
1235    ***  If we initiated this connection,
1236    ***  we may need to send LTEP/AZMP handshakes.
1237    ***  Otherwise we'll wait for the peer to send theirs first.
1238    **/
1239    if( !tr_peerIoIsIncoming( msgs->io ) )
1240    {
1241        const int extensions = tr_peerIoGetExtension( msgs->io );
1242        switch( extensions )
1243        {
1244            case LT_EXTENSIONS_NONE:
1245                /* no-op */
1246                break;
1247
1248            case LT_EXTENSIONS_LTEP:
1249                sendLtepHandshake( msgs );
1250                break;
1251
1252            case LT_EXTENSIONS_AZMP:
1253                dbgmsg( msgs, "FIXME: need to send AZMP handshake" );
1254                break;
1255        }
1256    }
1257
1258    sendBitfield( msgs );
1259    return msgs;
1260}
1261
1262void
1263tr_peerMsgsFree( tr_peermsgs* p )
1264{
1265    if( p != NULL )
1266    {
1267        tr_timerFree( &p->pulseTimer );
1268        tr_timerFree( &p->pexTimer );
1269        tr_publisherFree( &p->publisher );
1270        tr_list_foreach( p->clientAskedFor, tr_free );
1271        tr_list_free( &p->clientAskedFor );
1272        tr_list_foreach( p->peerAskedFor, tr_free );
1273        tr_list_free( &p->peerAskedFor );
1274        evbuffer_free( p->outMessages );
1275        evbuffer_free( p->outBlock );
1276        evbuffer_free( p->inBlock );
1277        tr_free( p );
1278    }
1279}
1280
1281tr_publisher_tag
1282tr_peerMsgsSubscribe( tr_peermsgs       * peer,
1283                      tr_delivery_func    func,
1284                      void              * userData )
1285{
1286    return tr_publisherSubscribe( peer->publisher, func, userData );
1287}
1288
1289void
1290tr_peerMsgsUnsubscribe( tr_peermsgs       * peer,
1291                        tr_publisher_tag    tag )
1292{
1293    tr_publisherUnsubscribe( peer->publisher, tag );
1294}
Note: See TracBrowser for help on using the repository browser.