source: trunk/libtransmission/peer-msgs.c @ 3144

Last change on this file since 3144 was 3144, checked in by charles, 15 years ago

fix bug that tended to disconnect from valid peers when we were seeding. also, follow the BT spec's terminology a little closer.

  • Property svn:keywords set to Date Rev Author Id
File size: 38.4 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 3144 2007-09-23 02:19:59Z charles $
11 */
12
13#include <assert.h>
14#include <ctype.h>
15#include <stdio.h>
16#include <stdlib.h>
17#include <string.h>
18
19#include <arpa/inet.h>
20
21#include <sys/types.h> /* event.h needs this */
22#include <event.h>
23
24#include "transmission.h"
25#include "bencode.h"
26#include "completion.h"
27#include "inout.h"
28#include "list.h"
29#include "peer-io.h"
30#include "peer-mgr.h"
31#include "peer-mgr-private.h"
32#include "peer-msgs.h"
33#include "ratecontrol.h"
34#include "trevent.h"
35#include "utils.h"
36
37/**
38***
39**/
40
41#define PEX_INTERVAL (60 * 1000)
42
43#define PEER_PULSE_INTERVAL (100)
44
45enum
46{
47    BT_CHOKE           = 0,
48    BT_UNCHOKE         = 1,
49    BT_INTERESTED      = 2,
50    BT_NOT_INTERESTED  = 3,
51    BT_HAVE            = 4,
52    BT_BITFIELD        = 5,
53    BT_REQUEST         = 6,
54    BT_PIECE           = 7,
55    BT_CANCEL          = 8,
56    BT_PORT            = 9,
57    BT_SUGGEST         = 13,
58    BT_HAVE_ALL        = 14,
59    BT_HAVE_NONE       = 15,
60    BT_REJECT          = 16,
61    BT_ALLOWED_FAST    = 17,
62    BT_LTEP            = 20,
63
64    LTEP_HANDSHAKE     = 0,
65
66    OUR_LTEP_PEX       = 1
67};
68
69enum
70{
71    AWAITING_BT_LENGTH,
72    AWAITING_BT_MESSAGE,
73    READING_BT_PIECE
74};
75
76struct peer_request
77{
78    uint32_t index;
79    uint32_t offset;
80    uint32_t length;
81    time_t time_requested;
82};
83
84static int
85peer_request_compare( const void * va, const void * vb )
86{
87    struct peer_request * a = (struct peer_request*) va;
88    struct peer_request * b = (struct peer_request*) vb;
89    if( a->index != b->index ) return a->index - b->index;
90    if( a->offset != b->offset ) return a->offset - b->offset;
91    if( a->length != b->length ) return a->length - b->length;
92    return 0;
93}
94
95struct tr_peermsgs
96{
97    tr_peer * info;
98
99    tr_handle * handle;
100    tr_torrent * torrent;
101    tr_peerIo * io;
102
103    tr_publisher_t * publisher;
104
105    struct evbuffer * outMessages; /* buffer of all the non-piece messages */
106    struct evbuffer * outBlock;    /* the block we're currently sending */
107    struct evbuffer * inBlock;     /* the block we're currently receiving */
108    tr_list * peerAskedFor;
109    tr_list * clientAskedFor;
110
111    tr_timer * pulseTimer;
112    tr_timer * pexTimer;
113
114    struct peer_request blockToUs; /* the block currntly being sent to us */
115
116    time_t lastReqAddedAt;
117    time_t clientSentPexAt;
118
119    unsigned int notListening          : 1;
120    unsigned int peerSupportsPex       : 1;
121    unsigned int hasSentLtepHandshake  : 1;
122
123    uint8_t state;
124
125    uint8_t ut_pex_id;
126    uint16_t listeningPort;
127
128    uint16_t pexCount;
129
130    uint32_t incomingMessageLength;
131
132    tr_pex * pex;
133};
134
135/**
136***
137**/
138
139static void
140myDebug( const char * file, int line, const struct tr_peermsgs * msgs, const char * fmt, ... )
141{
142    FILE * fp = tr_getLog( );
143    if( fp != NULL )
144    {
145        va_list args;
146        const char * addr = tr_peerIoGetAddrStr( msgs->io );
147        struct evbuffer * buf = evbuffer_new( );
148        evbuffer_add_printf( buf, "[%s:%d] %s (%p) ", file, line, addr, msgs );
149        va_start( args, fmt );
150        evbuffer_add_vprintf( buf, fmt, args );
151        va_end( args );
152        fprintf( fp, "%s\n", EVBUFFER_DATA(buf) );
153        evbuffer_free( buf );
154    }
155}
156
157#define dbgmsg(handshake, fmt...) myDebug(__FILE__, __LINE__, handshake, ##fmt )
158
159/**
160***  EVENTS
161**/
162
163static const tr_peermsgs_event blankEvent = { 0, 0, 0, 0, 0.0f };
164
165static void
166publish( tr_peermsgs * msgs, tr_peermsgs_event * e )
167{
168    tr_publisherPublish( msgs->publisher, msgs->info, e );
169}
170
171static void
172fireGotError( tr_peermsgs * msgs )
173{
174    tr_peermsgs_event e = blankEvent;
175    e.eventType = TR_PEERMSG_GOT_ERROR;
176    publish( msgs, &e );
177}
178
179static void
180fireNeedReq( tr_peermsgs * msgs )
181{
182    tr_peermsgs_event e = blankEvent;
183    e.eventType = TR_PEERMSG_NEED_REQ;
184    publish( msgs, &e );
185}
186
187static void
188firePeerProgress( tr_peermsgs * msgs )
189{
190    tr_peermsgs_event e = blankEvent;
191    e.eventType = TR_PEERMSG_PEER_PROGRESS;
192    e.progress = msgs->info->progress;
193    publish( msgs, &e );
194}
195
196static void
197fireClientHave( tr_peermsgs * msgs, uint32_t pieceIndex )
198{
199    tr_peermsgs_event e = blankEvent;
200    e.eventType = TR_PEERMSG_CLIENT_HAVE;
201    e.pieceIndex = pieceIndex;
202    publish( msgs, &e );
203}
204
205static void
206fireGotBlock( tr_peermsgs * msgs, uint32_t pieceIndex, uint32_t offset, uint32_t length )
207{
208    tr_peermsgs_event e = blankEvent;
209    e.eventType = TR_PEERMSG_CLIENT_BLOCK;
210    e.pieceIndex = pieceIndex;
211    e.offset = offset;
212    e.length = length;
213    publish( msgs, &e );
214}
215
216/**
217***  INTEREST
218**/
219
220static int
221isPieceInteresting( const tr_peermsgs   * peer,
222                    int                   piece )
223{
224    const tr_torrent * torrent = peer->torrent;
225    if( torrent->info.pieces[piece].dnd ) /* we don't want it */
226        return FALSE;
227    if( tr_cpPieceIsComplete( torrent->completion, piece ) ) /* we already have it */
228        return FALSE;
229    if( !tr_bitfieldHas( peer->info->have, piece ) ) /* peer doesn't have it */
230        return FALSE;
231    if( tr_bitfieldHas( peer->info->banned, piece ) ) /* peer is banned for it */
232        return FALSE;
233    return TRUE;
234}
235
236static int
237isPeerInteresting( const tr_peermsgs * msgs )
238{
239    const int clientIsSeed = tr_cpGetStatus( msgs->torrent->completion ) != TR_CP_INCOMPLETE;
240    const int peerIsSeed = msgs->info->progress >= 1.0;
241
242    if( peerIsSeed )
243    {
244        return !clientIsSeed;
245    }
246    else if( clientIsSeed )
247    {
248        return !peerIsSeed;
249    }
250    else /* we're both leeches... */
251    {
252        int i;
253        const tr_torrent * torrent = msgs->torrent;
254        const tr_bitfield * bitfield = tr_cpPieceBitfield( torrent->completion );
255
256        if( !msgs->info->have ) /* We don't know what this peer has... what should this be? */
257            return TRUE;
258
259        assert( bitfield->len == msgs->info->have->len );
260        for( i=0; i<torrent->info.pieceCount; ++i )
261            if( isPieceInteresting( msgs, i ) )
262                return TRUE;
263
264        return FALSE;
265    }
266}
267
268static void
269sendInterest( tr_peermsgs * msgs, int weAreInterested )
270{
271    const uint32_t len = sizeof(uint8_t);
272    const uint8_t bt_msgid = weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED;
273
274    assert( msgs != NULL );
275    assert( weAreInterested==0 || weAreInterested==1 );
276
277    msgs->info->clientIsInterested = weAreInterested;
278    dbgmsg( msgs, ": sending an %s message", (weAreInterested ? "INTERESTED" : "NOT_INTERESTED") );
279    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
280    tr_peerIoWriteBytes( msgs->io, msgs->outMessages, &bt_msgid, 1 );
281}
282
283static void
284updateInterest( tr_peermsgs * msgs )
285{
286    const int i = isPeerInteresting( msgs );
287    if( i != msgs->info->clientIsInterested )
288        sendInterest( msgs, i );
289    if( i )
290        fireNeedReq( msgs );
291}
292
293void
294tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
295{
296    assert( msgs != NULL );
297    assert( msgs->info != NULL );
298    assert( choke==0 || choke==1 );
299
300    if( msgs->info->peerIsChoked != choke )
301    {
302        const uint32_t len = sizeof(uint8_t);
303        const uint8_t bt_msgid = choke ? BT_CHOKE : BT_UNCHOKE;
304
305        msgs->info->peerIsChoked = choke ? 1 : 0;
306        if( msgs->info )
307        {
308            tr_list_foreach( msgs->peerAskedFor, tr_free );
309            tr_list_free( &msgs->peerAskedFor );
310        }
311
312        dbgmsg( msgs, "sending a %s message", (choke ? "CHOKE" : "UNCHOKE") );
313        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
314        tr_peerIoWriteBytes( msgs->io, msgs->outMessages, &bt_msgid, 1 );
315    }
316}
317
318/**
319***
320**/
321
322void
323tr_peerMsgsCancel( tr_peermsgs * msgs,
324                   uint32_t      pieceIndex,
325                   uint32_t      offset,
326                   uint32_t      length )
327{
328    tr_list * node;
329    struct peer_request tmp;
330
331    assert( msgs != NULL );
332    assert( length > 0 );
333
334    tmp.index = pieceIndex;
335    tmp.offset = offset;
336    tmp.length = length;
337
338    node = tr_list_find( msgs->clientAskedFor, &tmp, peer_request_compare );
339    if( node != NULL )
340    {
341        /* cancel the request */
342        const uint8_t bt_msgid = BT_CANCEL;
343        const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
344        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
345        tr_peerIoWriteBytes( msgs->io, msgs->outMessages, &bt_msgid, 1 );
346        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
347        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
348        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
349
350        /* remove it from our "requested" list */
351        tr_list_remove_data( &msgs->peerAskedFor, node->data );
352    }
353}
354
355/**
356***
357**/
358
359void
360tr_peerMsgsHave( tr_peermsgs * msgs,
361                 uint32_t      pieceIndex )
362{
363    const uint8_t bt_msgid = BT_HAVE;
364    const uint32_t len = sizeof(uint8_t) + sizeof(uint32_t);
365    dbgmsg( msgs, "w00t telling them we HAVE piece #%d", pieceIndex );
366    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
367    tr_peerIoWriteBytes( msgs->io, msgs->outMessages, &bt_msgid, 1 );
368    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
369
370    updateInterest( msgs );
371}
372
373/**
374***
375**/
376
377static int
378pulse( void * vmsgs );
379
380int
381tr_peerMsgsAddRequest( tr_peermsgs * msgs,
382                       uint32_t      index, 
383                       uint32_t      offset, 
384                       uint32_t      length )
385{
386    const uint8_t bt_msgid = BT_REQUEST;
387    const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
388    struct peer_request * req;
389    int maxSize;
390
391    assert( msgs != NULL );
392    assert( msgs->torrent != NULL );
393    assert( index < ((uint32_t)msgs->torrent->info.pieceCount) );
394    assert( offset < (uint32_t)tr_torPieceCountBytes( msgs->torrent, (int)index ) );
395    assert( (offset + length) <= (uint32_t)tr_torPieceCountBytes( msgs->torrent, (int)index ) );
396
397    if( msgs->info->clientIsChoked )
398        return TR_ADDREQ_CLIENT_CHOKED;
399
400    if( !tr_bitfieldHas( msgs->info->have, index ) )
401        return TR_ADDREQ_MISSING;
402
403    maxSize = MIN( 2 + (int)(tr_peerIoGetRateToClient(msgs->io)/10), 100 );
404    //if( ( time(NULL) - msgs->lastReqAddedAt <= 5 ) && ( tr_list_size( msgs->clientAskedFor) >= maxSize ) )
405    if( tr_list_size( msgs->clientAskedFor) >= maxSize )
406        return TR_ADDREQ_FULL;
407
408    dbgmsg( msgs, "w00t peer has a max request queue size of %d... adding request for piece %d, offset %d", maxSize, (int)index, (int)offset );
409
410    /* queue the request */
411    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
412    tr_peerIoWriteBytes( msgs->io, msgs->outMessages, &bt_msgid, 1 );
413    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, index );
414    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
415    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
416
417    /* add it to our `requests sent' list */
418    req = tr_new( struct peer_request, 1 );
419    req->index = index;
420    req->offset = offset;
421    req->length = length;
422    req->time_requested = msgs->lastReqAddedAt = time( NULL );
423    tr_list_append( &msgs->clientAskedFor, req );
424    pulse( msgs );
425
426    return TR_ADDREQ_OK;
427}
428
429/**
430***
431**/
432
433static void
434sendLtepHandshake( tr_peermsgs * msgs )
435{
436    benc_val_t val, *m;
437    char * buf;
438    int len;
439    const char * v = TR_NAME " " USERAGENT_PREFIX;
440    const int port = tr_getPublicPort( msgs->handle );
441    struct evbuffer * outbuf = evbuffer_new( );
442    uint32_t msglen;
443    const uint8_t tr_msgid = 20; /* ltep extension id */
444    const uint8_t ltep_msgid = 0; /* handshake id */
445
446    dbgmsg( msgs, "sending an ltep handshake" );
447    tr_bencInit( &val, TYPE_DICT );
448    tr_bencDictReserve( &val, 3 );
449    m  = tr_bencDictAdd( &val, "m" );
450    tr_bencInit( m, TYPE_DICT );
451    tr_bencDictReserve( m, 1 );
452    tr_bencInitInt( tr_bencDictAdd( m, "ut_pex" ), OUR_LTEP_PEX );
453    if( port > 0 )
454        tr_bencInitInt( tr_bencDictAdd( &val, "p" ), port );
455    tr_bencInitStr( tr_bencDictAdd( &val, "v" ), v, 0, 1 );
456    buf = tr_bencSaveMalloc( &val,  &len );
457
458    msglen = sizeof(tr_msgid) + sizeof(ltep_msgid) + len;
459    tr_peerIoWriteUint32( msgs->io, outbuf, msglen );
460    tr_peerIoWriteBytes ( msgs->io, outbuf, &tr_msgid, 1 );
461    tr_peerIoWriteBytes ( msgs->io, outbuf, &ltep_msgid, 1 );
462    tr_peerIoWriteBytes ( msgs->io, outbuf, buf, len );
463
464    tr_peerIoWriteBuf( msgs->io, outbuf );
465
466    msgs->hasSentLtepHandshake = 1;
467
468    /* cleanup */
469    tr_bencFree( &val );
470    tr_free( buf );
471    evbuffer_free( outbuf );
472
473}
474
475static void
476parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
477{
478    benc_val_t val, * sub;
479    uint8_t * tmp = tr_new( uint8_t, len );
480
481    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
482
483    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type!=TYPE_DICT ) {
484        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
485        tr_free( tmp );
486        return;
487    }
488
489    /* check supported messages for utorrent pex */
490    sub = tr_bencDictFind( &val, "m" );
491    if( tr_bencIsDict( sub ) ) {
492        sub = tr_bencDictFind( sub, "ut_pex" );
493        if( tr_bencIsInt( sub ) ) {
494            msgs->peerSupportsPex = 1;
495            msgs->ut_pex_id = (uint8_t) sub->val.i;
496            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
497        }
498    }
499
500#if 0
501    /* get peer's client name */
502    sub = tr_bencDictFind( &val, "v" );
503    if( tr_bencIsStr( sub ) ) {
504        int i;
505        tr_free( msgs->info->client );
506        fprintf( stderr, "dictionary says client is [%s]\n", sub->val.s.s );
507        msgs->info->client = tr_strndup( sub->val.s.s, sub->val.s.i );
508for( i=0; i<sub->val.s.i; ++i ) { fprintf( stderr, "[%c] (%d)\n", sub->val.s.s[i], (int)sub->val.s.s[i] );
509                                  if( (int)msgs->info->client[i]==-75 ) msgs->info->client[i]='u'; }
510        fprintf( stderr, "msgs->client is now [%s]\n", msgs->info->client );
511    }
512#endif
513
514    /* get peer's listening port */
515    sub = tr_bencDictFind( &val, "p" );
516    if( tr_bencIsInt( sub ) ) {
517        msgs->listeningPort = htons( (uint16_t)sub->val.i );
518        dbgmsg( msgs, "msgs->port is now %hu", msgs->listeningPort );
519    }
520
521    tr_bencFree( &val );
522    tr_free( tmp );
523}
524
525static void
526parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
527{
528    benc_val_t val, * sub;
529    uint8_t * tmp;
530
531    if( msgs->torrent->pexDisabled ) /* no sharing! */
532        return;
533
534    tmp = tr_new( uint8_t, msglen );
535    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
536
537    if( tr_bencLoad( tmp, msglen, &val, NULL ) || !tr_bencIsDict( &val ) ) {
538        dbgmsg( msgs, "GET can't read extended-pex dictionary" );
539        tr_free( tmp );
540        return;
541    }
542
543    sub = tr_bencDictFind( &val, "added" );
544    if( tr_bencIsStr(sub) && ((sub->val.s.i % 6) == 0)) {
545        const int n = sub->val.s.i / 6 ;
546        dbgmsg( msgs, "got %d peers from uT pex", n );
547        tr_peerMgrAddPeers( msgs->handle->peerMgr,
548                            msgs->torrent->info.hash,
549                            TR_PEER_FROM_PEX,
550                            (uint8_t*)sub->val.s.s, n );
551    }
552
553    tr_bencFree( &val );
554    tr_free( tmp );
555}
556
557static void
558sendPex( tr_peermsgs * msgs );
559
560static void
561parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
562{
563    uint8_t ltep_msgid;
564
565    tr_peerIoReadBytes( msgs->io, inbuf, &ltep_msgid, 1 );
566    msglen--;
567
568    if( ltep_msgid == LTEP_HANDSHAKE )
569    {
570        dbgmsg( msgs, "got ltep handshake" );
571        parseLtepHandshake( msgs, msglen, inbuf );
572        if( !msgs->hasSentLtepHandshake )
573            sendLtepHandshake( msgs );
574    }
575    else if( ltep_msgid == msgs->ut_pex_id )
576    {
577        dbgmsg( msgs, "got ut pex" );
578        msgs->peerSupportsPex = 1;
579        parseUtPex( msgs, msglen, inbuf );
580    }
581    else
582    {
583        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
584        evbuffer_drain( inbuf, msglen );
585    }
586}
587
588static int
589readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf )
590{
591    uint32_t len;
592    const size_t needlen = sizeof(uint32_t);
593
594    if( EVBUFFER_LENGTH(inbuf) < needlen )
595        return READ_MORE;
596
597    tr_peerIoReadUint32( msgs->io, inbuf, &len );
598
599    if( len == 0 ) { /* peer sent us a keepalive message */
600        dbgmsg( msgs, "peer sent us a keepalive message..." );
601        msgs->info->peerSentKeepaliveAt = time( NULL );
602    } else {
603        dbgmsg( msgs, "peer is sending us a message with %"PRIu64" bytes...\n", (uint64_t)len );
604        msgs->incomingMessageLength = len;
605        msgs->state = AWAITING_BT_MESSAGE;
606    } return READ_AGAIN;
607}
608
609static int
610readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf )
611{
612    uint8_t id;
613    uint32_t ui32;
614    uint32_t msglen = msgs->incomingMessageLength;
615
616    if( EVBUFFER_LENGTH(inbuf) < msglen )
617        return READ_MORE;
618
619    tr_peerIoReadBytes( msgs->io, inbuf, &id, 1 );
620    msglen--;
621    dbgmsg( msgs, "peer sent us a message... "
622                  "bt id number is %d, and remaining len is %d", (int)id, (int)msglen );
623
624    switch( id )
625    {
626        case BT_CHOKE:
627            assert( msglen == 0 );
628            dbgmsg( msgs, "w00t peer sent us a BT_CHOKE" );
629            msgs->info->clientIsChoked = 1;
630            tr_list_foreach( msgs->peerAskedFor, tr_free );
631            tr_list_free( &msgs->peerAskedFor );
632            tr_list_foreach( msgs->clientAskedFor, tr_free );
633            tr_list_free( &msgs->clientAskedFor );
634            break;
635
636        case BT_UNCHOKE:
637            assert( msglen == 0 );
638            dbgmsg( msgs, "w00t peer sent us a BT_UNCHOKE" );
639            msgs->info->clientIsChoked = 0;
640            fireNeedReq( msgs );
641            break;
642
643        case BT_INTERESTED:
644            assert( msglen == 0 );
645            dbgmsg( msgs, "w00t peer sent us a BT_INTERESTED" );
646            msgs->info->peerIsInterested = 1;
647            break;
648
649        case BT_NOT_INTERESTED:
650            assert( msglen == 0 );
651            dbgmsg( msgs, "w00t peer sent us a BT_NOT_INTERESTED" );
652            msgs->info->peerIsInterested = 0;
653            break;
654
655        case BT_HAVE:
656            assert( msglen == 4 );
657            dbgmsg( msgs, "w00t peer sent us a BT_HAVE" );
658            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
659            tr_bitfieldAdd( msgs->info->have, ui32 );
660            msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have ) / (float)msgs->torrent->info.pieceCount;
661            dbgmsg( msgs, "after the HAVE message, peer progress is %f", msgs->info->progress );
662            updateInterest( msgs );
663            firePeerProgress( msgs );
664            break;
665
666        case BT_BITFIELD:
667            assert( msglen == msgs->info->have->len );
668            dbgmsg( msgs, "w00t peer sent us a BT_BITFIELD" );
669            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
670            msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have ) / (float)msgs->torrent->info.pieceCount;
671            dbgmsg( msgs, "after the HAVE message, peer progress is %f", msgs->info->progress );
672            updateInterest( msgs );
673            fireNeedReq( msgs );
674            firePeerProgress( msgs );
675            break;
676
677        case BT_REQUEST: {
678            struct peer_request * req;
679            assert( msglen == 12 );
680            dbgmsg( msgs, "peer sent us a BT_REQUEST" );
681            req = tr_new( struct peer_request, 1 );
682            tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
683            tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
684            tr_peerIoReadUint32( msgs->io, inbuf, &req->length );
685            if( !msgs->info->peerIsChoked )
686                tr_list_append( &msgs->peerAskedFor, req );
687            break;
688        }
689
690        case BT_CANCEL: {
691            struct peer_request req;
692            tr_list * node;
693            assert( msglen == 12 );
694            dbgmsg( msgs, "peer sent us a BT_CANCEL" );
695            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
696            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
697            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
698            node = tr_list_find( msgs->peerAskedFor, &req, peer_request_compare );
699            if( node != NULL ) {
700                void * data = node->data;
701                tr_list_remove_data( &msgs->peerAskedFor, data );
702                tr_free( data );
703                dbgmsg( msgs, "found the req that peer is cancelling... cancelled." );
704            }
705            break;
706        }
707
708        case BT_PIECE: {
709            dbgmsg( msgs, "peer sent us a BT_PIECE" );
710            assert( msgs->blockToUs.length == 0 );
711            msgs->state = READING_BT_PIECE;
712            tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.index );
713            tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.offset );
714            msgs->blockToUs.length = msglen - 8;
715            assert( msgs->blockToUs.length > 0 );
716            assert( EVBUFFER_LENGTH(msgs->inBlock) == 0 );
717            //evbuffer_drain( msgs->inBlock, ~0 );
718            return READ_AGAIN;
719            break;
720        }
721
722        case BT_PORT: {
723            assert( msglen == 2 );
724            dbgmsg( msgs, "peer sent us a BT_PORT" );
725            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->listeningPort );
726            break;
727        }
728       
729        case BT_SUGGEST: {
730            /* tiennou TODO */
731            break;
732        }
733           
734        case BT_HAVE_ALL: {
735            /* tiennou TODO */
736            break;
737        }
738           
739        case BT_HAVE_NONE: {
740            /* tiennou TODO */
741            break;
742        }
743           
744        case BT_REJECT: {
745            /* tiennou TODO */
746            break;
747        }
748           
749        case BT_ALLOWED_FAST: {
750            /* tiennou TODO */
751            break;
752        }
753           
754        case BT_LTEP:
755            dbgmsg( msgs, "peer sent us a BT_LTEP" );
756            parseLtep( msgs, msglen, inbuf );
757            break;
758
759        default:
760            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
761            tr_peerIoDrain( msgs->io, inbuf, msglen );
762            assert( 0 );
763    }
764
765    msgs->incomingMessageLength = -1;
766    msgs->state = AWAITING_BT_LENGTH;
767    return READ_AGAIN;
768}
769
770static void
771clientGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
772{
773    tr_torrent * tor = msgs->torrent;
774    tor->downloadedCur += byteCount;
775    tr_rcTransferred( tor->download, byteCount );
776    tr_rcTransferred( tor->handle->download, byteCount );
777}
778
779static void
780peerGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
781{
782    tr_torrent * tor = msgs->torrent;
783    tor->uploadedCur += byteCount;
784    tr_rcTransferred( tor->upload, byteCount );
785    tr_rcTransferred( tor->handle->upload, byteCount );
786}
787
788static int
789canDownload( const tr_peermsgs * msgs )
790{
791    tr_torrent * tor = msgs->torrent;
792
793    if( tor->downloadLimitMode == TR_SPEEDLIMIT_GLOBAL )
794        return !tor->handle->useDownloadLimit || tr_rcCanTransfer( tor->handle->download );
795
796    if( tor->downloadLimitMode == TR_SPEEDLIMIT_SINGLE )
797        return tr_rcCanTransfer( tor->download );
798
799    return TRUE;
800}
801
802static void
803reassignBytesToCorrupt( tr_peermsgs * msgs, uint32_t byteCount )
804{
805    tr_torrent * tor = msgs->torrent;
806
807    /* increment the `corrupt' field */
808    tor->corruptCur += byteCount;
809
810    /* decrement the `downloaded' field */
811    if( tor->downloadedCur >= byteCount )
812        tor->downloadedCur -= byteCount;
813    else
814        tor->downloadedCur = 0;
815}
816
817
818static void
819gotBadPiece( tr_peermsgs * msgs, uint32_t pieceIndex )
820{
821    const uint32_t byteCount = tr_torPieceCountBytes( msgs->torrent, (int)pieceIndex );
822    reassignBytesToCorrupt( msgs, byteCount );
823}
824
825static void
826gotUnwantedBlock( tr_peermsgs * msgs, uint32_t index UNUSED, uint32_t offset UNUSED, uint32_t length )
827{
828    reassignBytesToCorrupt( msgs, length );
829}
830
831static void
832addUsToBlamefield( tr_peermsgs * msgs, uint32_t index )
833{
834    if( !msgs->info->blame )
835         msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
836    tr_bitfieldAdd( msgs->info->blame, index );
837}
838
839static void
840gotBlock( tr_peermsgs      * msgs,
841          struct evbuffer  * inbuf,
842          uint32_t           index,
843          uint32_t           offset,
844          uint32_t           length )
845{
846    tr_torrent * tor = msgs->torrent;
847    const int block = _tr_block( tor, index, offset );
848    struct peer_request key, *req;
849
850    /**
851    *** Remove the block from our `we asked for this' list
852    **/
853
854    key.index = index;
855    key.offset = offset;
856    key.length = length;
857    req = (struct peer_request*) tr_list_remove( &msgs->clientAskedFor, &key,
858                                                 peer_request_compare );
859    if( req == NULL ) {
860        gotUnwantedBlock( msgs, index, offset, length );
861        dbgmsg( msgs, "we didn't ask for this message..." );
862        return;
863    }
864    dbgmsg( msgs, "w00t peer sent us a block.  turnaround time was %d seconds", 
865                     (int)(time(NULL) - req->time_requested) );
866    tr_free( req );
867    dbgmsg( msgs, "peer has %d more blocks we've asked for",
868                  tr_list_size(msgs->clientAskedFor));
869
870    /**
871    *** Error checks
872    **/
873
874    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
875        dbgmsg( msgs, "have this block already..." );
876        tr_dbg( "have this block already..." );
877        gotUnwantedBlock( msgs, index, offset, length );
878        return;
879    }
880
881    if( (int)length != tr_torBlockCountBytes( tor, block ) ) {
882        dbgmsg( msgs, "block is the wrong length..." );
883        tr_dbg( "block is the wrong length..." );
884        gotUnwantedBlock( msgs, index, offset, length );
885        return;
886    }
887
888    /**
889    ***  Write the block
890    **/
891
892    if( tr_ioWrite( tor, index, offset, length, EVBUFFER_DATA( inbuf ))) {
893        return;
894    }
895
896    tr_cpBlockAdd( tor->completion, block );
897
898    addUsToBlamefield( msgs, index );
899
900    fireGotBlock( msgs, index, offset, length );
901    fireNeedReq( msgs );
902
903    /**
904    ***  Handle if this was the last block in the piece
905    **/
906
907    if( tr_cpPieceIsComplete( tor->completion, index ) )
908    {
909        if( tr_ioHash( tor, index ) )
910        {
911            gotBadPiece( msgs, index );
912            return;
913        }
914
915        fireClientHave( msgs, index );
916    }
917}
918
919
920static ReadState
921readBtPiece( tr_peermsgs * msgs, struct evbuffer * inbuf )
922{
923    uint32_t inlen;
924    uint8_t * tmp;
925
926    assert( msgs != NULL );
927    assert( msgs->blockToUs.length > 0 );
928    assert( inbuf != NULL );
929    assert( EVBUFFER_LENGTH( inbuf ) > 0 );
930
931    /* read from the inbuf into our block buffer */
932    inlen = MIN( EVBUFFER_LENGTH(inbuf), msgs->blockToUs.length );
933    tmp = tr_new( uint8_t, inlen );
934    tr_peerIoReadBytes( msgs->io, inbuf, tmp, inlen );
935    evbuffer_add( msgs->inBlock, tmp, inlen );
936
937    /* update our tables accordingly */
938    assert( inlen >= msgs->blockToUs.length );
939    msgs->blockToUs.length -= inlen;
940    msgs->info->peerSentPieceDataAt = time( NULL );
941    clientGotBytes( msgs, inlen );
942
943    /* if this was the entire block, save it */
944    if( !msgs->blockToUs.length )
945    {
946        dbgmsg( msgs, "w00t -- got block index %u, offset %u", msgs->blockToUs.index, msgs->blockToUs.offset );
947        assert( (int)EVBUFFER_LENGTH( msgs->inBlock ) == tr_torBlockCountBytes( msgs->torrent, _tr_block(msgs->torrent,msgs->blockToUs.index, msgs->blockToUs.offset) ) );
948        gotBlock( msgs, msgs->inBlock,
949                        msgs->blockToUs.index,
950                        msgs->blockToUs.offset,
951                        EVBUFFER_LENGTH( msgs->inBlock ) );
952        evbuffer_drain( msgs->inBlock, ~0 );
953        msgs->state = AWAITING_BT_LENGTH;
954    }
955
956    /* cleanup */
957    tr_free( tmp );
958    return READ_AGAIN;
959}
960
961static ReadState
962canRead( struct bufferevent * evin, void * vmsgs )
963{
964    ReadState ret;
965    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
966    struct evbuffer * inbuf = EVBUFFER_INPUT ( evin );
967
968    if( !canDownload( msgs ) )
969    {
970        msgs->notListening = 1;
971        tr_peerIoSetIOMode ( msgs->io, 0, EV_READ );
972        ret = READ_DONE;
973    }
974    else switch( msgs->state )
975    {
976        case AWAITING_BT_LENGTH:  ret = readBtLength  ( msgs, inbuf ); break;
977        case AWAITING_BT_MESSAGE: ret = readBtMessage ( msgs, inbuf ); break;
978        case READING_BT_PIECE:    ret = readBtPiece   ( msgs, inbuf ); break;
979        default: assert( 0 );
980    }
981
982    return ret;
983}
984
985/**
986***
987**/
988
989static int
990canWrite( const tr_peermsgs * msgs )
991{
992    /* don't let our outbuffer get too large */
993    if( tr_peerIoWriteBytesWaiting( msgs->io ) > 1024 )
994        return FALSE;
995
996    return TRUE;
997}
998
999static int
1000canUpload( const tr_peermsgs * msgs )
1001{
1002    const tr_torrent * tor = msgs->torrent;
1003
1004    if( !canWrite( msgs ) )
1005        return FALSE;
1006
1007    if( tor->uploadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1008        return !tor->handle->useUploadLimit || tr_rcCanTransfer( tor->handle->upload );
1009
1010    if( tor->uploadLimitMode == TR_SPEEDLIMIT_SINGLE )
1011        return tr_rcCanTransfer( tor->upload );
1012
1013    return TRUE;
1014}
1015
1016static int
1017pulse( void * vmsgs )
1018{
1019    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1020    size_t len;
1021
1022    /* if we froze out a downloaded block because of speed limits,
1023       start listening to the peer again */
1024    if( msgs->notListening && canDownload( msgs ) )
1025    {
1026        fprintf( stderr, "msgs %p thawing out...\n", msgs );
1027        msgs->notListening = 0;
1028        tr_peerIoSetIOMode ( msgs->io, EV_READ, 0 );
1029    }
1030
1031    if( !canWrite( msgs ) )
1032    {
1033    }
1034    else if(( len = EVBUFFER_LENGTH( msgs->outBlock ) ))
1035    {
1036        while ( len && canUpload( msgs ) )
1037        {
1038            const size_t outlen = MIN( len, 1024 );
1039            tr_peerIoWrite( msgs->io, EVBUFFER_DATA(msgs->outBlock), outlen );
1040            evbuffer_drain( msgs->outBlock, outlen );
1041            peerGotBytes( msgs, outlen );
1042            len -= outlen;
1043            msgs->info->clientSentPieceDataAt = time( NULL );
1044            dbgmsg( msgs, "wrote %d bytes; %d left in block", (int)outlen, (int)len );
1045        }
1046    }
1047    else if(( len = EVBUFFER_LENGTH( msgs->outMessages ) ))
1048    {
1049        tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
1050    }
1051    else if(( msgs->peerAskedFor ))
1052    {
1053        struct peer_request * req = tr_list_pop_front( &msgs->peerAskedFor );
1054        uint8_t * tmp = tr_new( uint8_t, req->length );
1055        const uint8_t msgid = BT_PIECE;
1056        const uint32_t msglen = sizeof(uint8_t) + sizeof(uint32_t)*2 + req->length;
1057        tr_ioRead( msgs->torrent, req->index, req->offset, req->length, tmp );
1058        tr_peerIoWriteUint32( msgs->io, msgs->outBlock, msglen );
1059        tr_peerIoWriteBytes ( msgs->io, msgs->outBlock, &msgid, 1 );
1060        tr_peerIoWriteUint32( msgs->io, msgs->outBlock, req->index );
1061        tr_peerIoWriteUint32( msgs->io, msgs->outBlock, req->offset );
1062        tr_peerIoWriteBytes ( msgs->io, msgs->outBlock, tmp, req->length );
1063        tr_free( tmp );
1064        dbgmsg( msgs, "putting req into out queue: index %d, offset %d, length %d ... %d blocks left in our queue", (int)req->index, (int)req->offset, (int)req->length, tr_list_size(msgs->peerAskedFor) );
1065        tr_free( req );
1066    }
1067
1068    return TRUE; /* loop forever */
1069}
1070
1071static void
1072didWrite( struct bufferevent * evin UNUSED, void * vpeer )
1073{
1074    pulse( (tr_peermsgs *) vpeer );
1075}
1076
1077static void
1078gotError( struct bufferevent * evbuf UNUSED, short what UNUSED, void * vpeer )
1079{
1080    fireGotError( vpeer );
1081}
1082
1083static void
1084sendBitfield( tr_peermsgs * msgs )
1085{
1086    const tr_bitfield * bitfield = tr_cpPieceBitfield( msgs->torrent->completion );
1087    const uint32_t len = sizeof(uint8_t) + bitfield->len;
1088    const uint8_t bt_msgid = BT_BITFIELD;
1089
1090    dbgmsg( msgs, "sending peer a bitfield message\n", msgs );
1091    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
1092    tr_peerIoWriteBytes( msgs->io, msgs->outMessages, &bt_msgid, 1 );
1093    tr_peerIoWriteBytes( msgs->io, msgs->outMessages, bitfield->bits, bitfield->len );
1094}
1095
1096/**
1097***
1098**/
1099
1100#define MAX_DIFFS 50
1101
1102typedef struct
1103{
1104    tr_pex * added;
1105    tr_pex * dropped;
1106    tr_pex * elements;
1107    int addedCount;
1108    int droppedCount;
1109    int elementCount;
1110    int diffCount;
1111}
1112PexDiffs;
1113
1114static void
1115pexAddedCb( void * vpex, void * userData )
1116{
1117    PexDiffs * diffs = (PexDiffs *) userData;
1118    tr_pex * pex = (tr_pex *) vpex;
1119    if( diffs->diffCount < MAX_DIFFS )
1120    {
1121        diffs->diffCount++;
1122        diffs->added[diffs->addedCount++] = *pex;
1123        diffs->elements[diffs->elementCount++] = *pex;
1124    }
1125}
1126
1127static void
1128pexRemovedCb( void * vpex, void * userData )
1129{
1130    PexDiffs * diffs = (PexDiffs *) userData;
1131    tr_pex * pex = (tr_pex *) vpex;
1132    if( diffs->diffCount < MAX_DIFFS )
1133    {
1134        diffs->diffCount++;
1135        diffs->dropped[diffs->droppedCount++] = *pex;
1136    }
1137}
1138
1139static void
1140pexElementCb( void * vpex, void * userData )
1141{
1142    PexDiffs * diffs = (PexDiffs *) userData;
1143    tr_pex * pex = (tr_pex *) vpex;
1144    if( diffs->diffCount < MAX_DIFFS )
1145    {
1146        diffs->diffCount++;
1147        diffs->elements[diffs->elementCount++] = *pex;
1148    }
1149}
1150
1151static void
1152sendPex( tr_peermsgs * msgs )
1153{
1154    if( msgs->peerSupportsPex && !msgs->torrent->pexDisabled )
1155    {
1156        int i;
1157        tr_pex * newPex = NULL;
1158        const int newCount = tr_peerMgrGetPeers( msgs->handle->peerMgr, msgs->torrent->info.hash, &newPex );
1159        PexDiffs diffs;
1160        benc_val_t val, *added, *dropped, *flags;
1161        uint8_t *tmp, *walk;
1162        char * benc;
1163        int bencLen;
1164        const uint8_t bt_msgid = BT_LTEP;
1165        const uint8_t ltep_msgid = OUR_LTEP_PEX;
1166
1167        /* build the diffs */
1168        diffs.added = tr_new( tr_pex, newCount );
1169        diffs.addedCount = 0;
1170        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1171        diffs.droppedCount = 0;
1172        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1173        diffs.elementCount = 0;
1174        diffs.diffCount = 0;
1175        tr_set_compare( msgs->pex, msgs->pexCount,
1176                        newPex, newCount,
1177                        tr_pexCompare, sizeof(tr_pex),
1178                        pexRemovedCb, pexAddedCb, pexElementCb, &diffs );
1179        dbgmsg( msgs, "pex: old peer count %d, new peer count %d, added %d, removed %d", msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1180
1181        /* update peer */
1182        tr_free( msgs->pex );
1183        msgs->pex = diffs.elements;
1184        msgs->pexCount = diffs.elementCount;
1185
1186        /* build the pex payload */
1187        tr_bencInit( &val, TYPE_DICT );
1188        tr_bencDictReserve( &val, 3 );
1189
1190        /* "added" */
1191        added = tr_bencDictAdd( &val, "added" );
1192        tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1193        for( i=0; i<diffs.addedCount; ++i ) {
1194            memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
1195            memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1196        }
1197        assert( ( walk - tmp ) == diffs.addedCount * 6 );
1198        tr_bencInitStr( added, tmp, walk-tmp, FALSE );
1199
1200        /* "added.f" */
1201        flags = tr_bencDictAdd( &val, "added.f" );
1202        tmp = walk = tr_new( uint8_t, diffs.addedCount );
1203        for( i=0; i<diffs.addedCount; ++i ) {
1204            dbgmsg( msgs, "PEX -->> -->> flag is %d", (int)diffs.added[i].flags );
1205            *walk++ = diffs.added[i].flags;
1206        }
1207        assert( ( walk - tmp ) == diffs.addedCount );
1208        tr_bencInitStr( flags, tmp, walk-tmp, FALSE );
1209
1210        /* "dropped" */
1211        dropped = tr_bencDictAdd( &val, "dropped" );
1212        tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
1213        for( i=0; i<diffs.droppedCount; ++i ) {
1214            memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
1215            memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
1216        }
1217        assert( ( walk - tmp ) == diffs.droppedCount * 6 );
1218        tr_bencInitStr( dropped, tmp, walk-tmp, FALSE );
1219
1220        /* write the pex message */
1221        benc = tr_bencSaveMalloc( &val, &bencLen );
1222        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 1 + 1 + bencLen );
1223        tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, &bt_msgid, 1 );
1224        tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, &ltep_msgid, 1 );
1225        tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, benc, bencLen );
1226
1227        /* cleanup */
1228        tr_free( benc );
1229        tr_bencFree( &val );
1230        tr_free( diffs.added );
1231        tr_free( diffs.dropped );
1232        tr_free( newPex );
1233
1234        msgs->clientSentPexAt = time( NULL );
1235    }
1236}
1237
1238static int
1239pexPulse( void * vpeer )
1240{
1241    sendPex( vpeer );
1242    return TRUE;
1243}
1244
1245/**
1246***
1247**/
1248
1249tr_peermsgs*
1250tr_peerMsgsNew( struct tr_torrent * torrent, struct tr_peer * info )
1251{
1252    tr_peermsgs * msgs;
1253
1254    assert( info != NULL );
1255    assert( info->io != NULL );
1256
1257    msgs = tr_new0( tr_peermsgs, 1 );
1258    msgs->publisher = tr_publisherNew( );
1259    msgs->info = info;
1260    msgs->handle = torrent->handle;
1261    msgs->torrent = torrent;
1262    msgs->io = info->io;
1263    msgs->info->clientIsChoked = 1;
1264    msgs->info->peerIsChoked = 1;
1265    msgs->info->clientIsInterested = 0;
1266    msgs->info->peerIsInterested = 0;
1267    msgs->info->have = tr_bitfieldNew( torrent->info.pieceCount );
1268    msgs->pulseTimer = tr_timerNew( msgs->handle, pulse, msgs, PEER_PULSE_INTERVAL );
1269    msgs->pexTimer = tr_timerNew( msgs->handle, pexPulse, msgs, PEX_INTERVAL );
1270    msgs->outMessages = evbuffer_new( );
1271    msgs->outBlock = evbuffer_new( );
1272    msgs->inBlock = evbuffer_new( );
1273
1274    tr_peerIoSetIOFuncs( msgs->io, canRead, didWrite, gotError, msgs );
1275    tr_peerIoSetIOMode( msgs->io, EV_READ|EV_WRITE, 0 );
1276
1277    /**
1278    ***  If we initiated this connection,
1279    ***  we may need to send LTEP/AZMP handshakes.
1280    ***  Otherwise we'll wait for the peer to send theirs first.
1281    **/
1282    if( !tr_peerIoIsIncoming( msgs->io ) )
1283    {
1284        if ( tr_peerIoSupportsLTEP( msgs->io ) ) {
1285            sendLtepHandshake( msgs );
1286           
1287        } else if ( tr_peerIoSupportsAZMP( msgs->io ) ) {
1288            dbgmsg( msgs, "FIXME: need to send AZMP handshake" );
1289           
1290        } else {
1291            /* no-op */
1292        }
1293    }
1294
1295    sendBitfield( msgs );
1296    return msgs;
1297}
1298
1299void
1300tr_peerMsgsFree( tr_peermsgs* p )
1301{
1302    if( p != NULL )
1303    {
1304        tr_timerFree( &p->pulseTimer );
1305        tr_timerFree( &p->pexTimer );
1306        tr_publisherFree( &p->publisher );
1307        tr_list_foreach( p->clientAskedFor, tr_free );
1308        tr_list_free( &p->clientAskedFor );
1309        tr_list_foreach( p->peerAskedFor, tr_free );
1310        tr_list_free( &p->peerAskedFor );
1311        evbuffer_free( p->outMessages );
1312        evbuffer_free( p->outBlock );
1313        evbuffer_free( p->inBlock );
1314        tr_free( p );
1315    }
1316}
1317
1318tr_publisher_tag
1319tr_peerMsgsSubscribe( tr_peermsgs       * peer,
1320                      tr_delivery_func    func,
1321                      void              * userData )
1322{
1323    return tr_publisherSubscribe( peer->publisher, func, userData );
1324}
1325
1326void
1327tr_peerMsgsUnsubscribe( tr_peermsgs       * peer,
1328                        tr_publisher_tag    tag )
1329{
1330    tr_publisherUnsubscribe( peer->publisher, tag );
1331}
Note: See TracBrowser for help on using the repository browser.