source: trunk/libtransmission/peer-msgs.c @ 3188

Last change on this file since 3188 was 3188, checked in by charles, 15 years ago

only send a peer keepalive messages when we're not sending them anything else

  • Property svn:keywords set to Date Rev Author Id
File size: 38.7 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 3188 2007-09-26 17:34:33Z charles $
11 */
12
13#include <assert.h>
14#include <ctype.h>
15#include <stdio.h>
16#include <stdlib.h>
17#include <string.h>
18
19#include <arpa/inet.h>
20
21#include <sys/types.h> /* event.h needs this */
22#include <event.h>
23
24#include "transmission.h"
25#include "bencode.h"
26#include "completion.h"
27#include "inout.h"
28#include "list.h"
29#include "peer-io.h"
30#include "peer-mgr.h"
31#include "peer-mgr-private.h"
32#include "peer-msgs.h"
33#include "ratecontrol.h"
34#include "trevent.h"
35#include "utils.h"
36
37/**
38***
39**/
40
41enum
42{
43    BT_CHOKE                = 0,
44    BT_UNCHOKE              = 1,
45    BT_INTERESTED           = 2,
46    BT_NOT_INTERESTED       = 3,
47    BT_HAVE                 = 4,
48    BT_BITFIELD             = 5,
49    BT_REQUEST              = 6,
50    BT_PIECE                = 7,
51    BT_CANCEL               = 8,
52    BT_PORT                 = 9,
53    BT_SUGGEST              = 13,
54    BT_HAVE_ALL             = 14,
55    BT_HAVE_NONE            = 15,
56    BT_REJECT               = 16,
57    BT_ALLOWED_FAST         = 17,
58    BT_LTEP                 = 20,
59
60    LTEP_HANDSHAKE          = 0,
61
62    OUR_LTEP_PEX            = 1,
63
64    MAX_REQUEST_BYTE_COUNT  = (16 * 1024), /* drop requests who want too much */
65
66    KEEPALIVE_INTERVAL_SECS = 90,          /* idle seconds before we send a keepalive */
67    PEX_INTERVAL            = (60 * 1000), /* msec between calls to sendPex() */
68    PEER_PULSE_INTERVAL     = (100),       /* msec between calls to pulse() */
69};
70
71enum
72{
73    AWAITING_BT_LENGTH,
74    AWAITING_BT_MESSAGE,
75    READING_BT_PIECE
76};
77
78struct peer_request
79{
80    uint32_t index;
81    uint32_t offset;
82    uint32_t length;
83    time_t time_requested;
84};
85
86static int
87peer_request_compare( const void * va, const void * vb )
88{
89    struct peer_request * a = (struct peer_request*) va;
90    struct peer_request * b = (struct peer_request*) vb;
91    if( a->index != b->index ) return a->index - b->index;
92    if( a->offset != b->offset ) return a->offset - b->offset;
93    if( a->length != b->length ) return a->length - b->length;
94    return 0;
95}
96
97struct tr_peermsgs
98{
99    tr_peer * info;
100
101    tr_handle * handle;
102    tr_torrent * torrent;
103    tr_peerIo * io;
104
105    tr_publisher_t * publisher;
106
107    struct evbuffer * outMessages; /* buffer of all the non-piece messages */
108    struct evbuffer * outBlock;    /* the block we're currently sending */
109    struct evbuffer * inBlock;     /* the block we're currently receiving */
110    tr_list * peerAskedFor;
111    tr_list * clientAskedFor;
112
113    tr_timer * pulseTimer;
114    tr_timer * pexTimer;
115
116    struct peer_request blockToUs; /* the block currntly being sent to us */
117
118    time_t lastReqAddedAt;
119    time_t clientSentPexAt;
120    time_t clientSentAnythingAt;
121
122    unsigned int notListening          : 1;
123    unsigned int peerSupportsPex       : 1;
124    unsigned int hasSentLtepHandshake  : 1;
125
126    uint8_t state;
127
128    uint8_t ut_pex_id;
129
130    uint16_t pexCount;
131
132    uint32_t incomingMessageLength;
133
134    tr_pex * pex;
135};
136
137/**
138***
139**/
140
141static void
142myDebug( const char * file, int line, const struct tr_peermsgs * msgs, const char * fmt, ... )
143{
144    FILE * fp = tr_getLog( );
145    if( fp != NULL )
146    {
147        va_list args;
148        const char * addr = tr_peerIoGetAddrStr( msgs->io );
149        struct evbuffer * buf = evbuffer_new( );
150        evbuffer_add_printf( buf, "[%s:%d] %s (%p) ", file, line, addr, msgs );
151        va_start( args, fmt );
152        evbuffer_add_vprintf( buf, fmt, args );
153        va_end( args );
154        fprintf( fp, "%s\n", EVBUFFER_DATA(buf) );
155        evbuffer_free( buf );
156    }
157}
158
159#define dbgmsg(handshake, fmt...) myDebug(__FILE__, __LINE__, handshake, ##fmt )
160
161/**
162***  EVENTS
163**/
164
165static const tr_peermsgs_event blankEvent = { 0, 0, 0, 0, 0.0f };
166
167static void
168publish( tr_peermsgs * msgs, tr_peermsgs_event * e )
169{
170    tr_publisherPublish( msgs->publisher, msgs->info, e );
171}
172
173static void
174fireGotError( tr_peermsgs * msgs )
175{
176    tr_peermsgs_event e = blankEvent;
177    e.eventType = TR_PEERMSG_GOT_ERROR;
178    publish( msgs, &e );
179}
180
181static void
182fireNeedReq( tr_peermsgs * msgs )
183{
184    tr_peermsgs_event e = blankEvent;
185    e.eventType = TR_PEERMSG_NEED_REQ;
186    publish( msgs, &e );
187}
188
189static void
190firePeerProgress( tr_peermsgs * msgs )
191{
192    tr_peermsgs_event e = blankEvent;
193    e.eventType = TR_PEERMSG_PEER_PROGRESS;
194    e.progress = msgs->info->progress;
195    publish( msgs, &e );
196}
197
198static void
199fireClientHave( tr_peermsgs * msgs, uint32_t pieceIndex )
200{
201    tr_peermsgs_event e = blankEvent;
202    e.eventType = TR_PEERMSG_CLIENT_HAVE;
203    e.pieceIndex = pieceIndex;
204    publish( msgs, &e );
205}
206
207static void
208fireGotBlock( tr_peermsgs * msgs, uint32_t pieceIndex, uint32_t offset, uint32_t length )
209{
210    tr_peermsgs_event e = blankEvent;
211    e.eventType = TR_PEERMSG_CLIENT_BLOCK;
212    e.pieceIndex = pieceIndex;
213    e.offset = offset;
214    e.length = length;
215    publish( msgs, &e );
216}
217
218/**
219***  INTEREST
220**/
221
222static int
223isPieceInteresting( const tr_peermsgs   * peer,
224                    int                   piece )
225{
226    const tr_torrent * torrent = peer->torrent;
227    if( torrent->info.pieces[piece].dnd ) /* we don't want it */
228        return FALSE;
229    if( tr_cpPieceIsComplete( torrent->completion, piece ) ) /* we already have it */
230        return FALSE;
231    if( !tr_bitfieldHas( peer->info->have, piece ) ) /* peer doesn't have it */
232        return FALSE;
233    if( tr_bitfieldHas( peer->info->banned, piece ) ) /* peer is banned for it */
234        return FALSE;
235    return TRUE;
236}
237
238static int
239isPeerInteresting( const tr_peermsgs * msgs )
240{
241    const int clientIsSeed = tr_cpGetStatus( msgs->torrent->completion ) != TR_CP_INCOMPLETE;
242    const int peerIsSeed = msgs->info->progress >= 1.0;
243
244    if( peerIsSeed )
245    {
246        return !clientIsSeed;
247    }
248    else if( clientIsSeed )
249    {
250        return !peerIsSeed;
251    }
252    else /* we're both leeches... */
253    {
254        int i;
255        const tr_torrent * torrent = msgs->torrent;
256        const tr_bitfield * bitfield = tr_cpPieceBitfield( torrent->completion );
257
258        if( !msgs->info->have ) /* We don't know what this peer has... what should this be? */
259            return TRUE;
260
261        assert( bitfield->len == msgs->info->have->len );
262        for( i=0; i<torrent->info.pieceCount; ++i )
263            if( isPieceInteresting( msgs, i ) )
264                return TRUE;
265
266        return FALSE;
267    }
268}
269
270static void
271sendInterest( tr_peermsgs * msgs, int weAreInterested )
272{
273    assert( msgs != NULL );
274    assert( weAreInterested==0 || weAreInterested==1 );
275
276    msgs->info->clientIsInterested = weAreInterested;
277    dbgmsg( msgs, ": sending an %s message", (weAreInterested ? "INTERESTED" : "NOT_INTERESTED") );
278
279    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
280    tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
281}
282
283static void
284updateInterest( tr_peermsgs * msgs )
285{
286    const int i = isPeerInteresting( msgs );
287    if( i != msgs->info->clientIsInterested )
288        sendInterest( msgs, i );
289    if( i )
290        fireNeedReq( msgs );
291}
292
293void
294tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
295{
296    assert( msgs != NULL );
297    assert( msgs->info != NULL );
298    assert( choke==0 || choke==1 );
299
300    if( msgs->info->peerIsChoked != choke )
301    {
302        msgs->info->peerIsChoked = choke ? 1 : 0;
303        if( msgs->info )
304        {
305            tr_list_foreach( msgs->peerAskedFor, tr_free );
306            tr_list_free( &msgs->peerAskedFor );
307        }
308
309        dbgmsg( msgs, "sending a %s message", (choke ? "CHOKE" : "UNCHOKE") );
310        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
311        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, choke ? BT_CHOKE : BT_UNCHOKE );
312    }
313}
314
315/**
316***
317**/
318
319void
320tr_peerMsgsCancel( tr_peermsgs * msgs,
321                   uint32_t      pieceIndex,
322                   uint32_t      offset,
323                   uint32_t      length )
324{
325    tr_list * node;
326    struct peer_request tmp;
327
328    assert( msgs != NULL );
329    assert( length > 0 );
330
331    tmp.index = pieceIndex;
332    tmp.offset = offset;
333    tmp.length = length;
334
335    node = tr_list_find( msgs->clientAskedFor, &tmp, peer_request_compare );
336    if( node != NULL )
337    {
338        /* cancel the request */
339        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + 3*sizeof(uint32_t) );
340        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_CANCEL );
341        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
342        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
343        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
344
345        /* remove it from our "requested" list */
346        tr_list_remove_data( &msgs->peerAskedFor, node->data );
347    }
348}
349
350/**
351***
352**/
353
354void
355tr_peerMsgsHave( tr_peermsgs * msgs,
356                 uint32_t      pieceIndex )
357{
358    dbgmsg( msgs, "w00t telling them we HAVE piece #%d", pieceIndex );
359
360    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
361    tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_HAVE );
362    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
363
364    updateInterest( msgs );
365}
366
367/**
368***
369**/
370
371static int
372pulse( void * vmsgs );
373
374static int
375reqIsValid( const tr_peermsgs * msgs, uint32_t index, uint32_t offset, uint32_t length )
376{
377    const tr_torrent * tor = msgs->torrent;
378
379    if( index >= (uint32_t) tor->info.pieceCount )
380        return FALSE;
381    if ( (int)offset >= tr_torPieceCountBytes( tor, (int)index ) )
382        return FALSE;
383    if( length > MAX_REQUEST_BYTE_COUNT )
384        return FALSE;
385    if( tr_pieceOffset( tor, index, offset, length ) > tor->info.totalSize )
386        return FALSE;
387
388    return TRUE;
389}
390
391static int
392requestIsValid( const tr_peermsgs * msgs, struct peer_request * req )
393{
394    return reqIsValid( msgs, req->index, req->offset, req->length );
395}
396
397int
398tr_peerMsgsAddRequest( tr_peermsgs * msgs,
399                       uint32_t      index, 
400                       uint32_t      offset, 
401                       uint32_t      length )
402{
403    struct peer_request * req;
404    int maxSize;
405
406    assert( msgs != NULL );
407    assert( msgs->torrent != NULL );
408    assert( reqIsValid( msgs, index, offset, length ) );
409
410    if( msgs->info->clientIsChoked )
411        return TR_ADDREQ_CLIENT_CHOKED;
412
413    if( !tr_bitfieldHas( msgs->info->have, index ) )
414        return TR_ADDREQ_MISSING;
415
416    maxSize = MIN( 2 + (int)(tr_peerIoGetRateToClient(msgs->io)/10), 100 );
417    //if( ( time(NULL) - msgs->lastReqAddedAt <= 5 ) && ( tr_list_size( msgs->clientAskedFor) >= maxSize ) )
418    if( tr_list_size( msgs->clientAskedFor) >= maxSize )
419        return TR_ADDREQ_FULL;
420
421    dbgmsg( msgs, "w00t peer has a max request queue size of %d... adding request for piece %d, offset %d", maxSize, (int)index, (int)offset );
422
423    /* queue the request */
424    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + 3*sizeof(uint32_t) );
425    tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_REQUEST );
426    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, index );
427    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
428    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
429
430    /* add it to our `requests sent' list */
431    req = tr_new( struct peer_request, 1 );
432    req->index = index;
433    req->offset = offset;
434    req->length = length;
435    req->time_requested = msgs->lastReqAddedAt = time( NULL );
436    tr_list_append( &msgs->clientAskedFor, req );
437    pulse( msgs );
438
439    return TR_ADDREQ_OK;
440}
441
442/**
443***
444**/
445
446static void
447sendLtepHandshake( tr_peermsgs * msgs )
448{
449    benc_val_t val, *m;
450    char * buf;
451    int len;
452    const char * v = TR_NAME " " USERAGENT_PREFIX;
453    const int port = tr_getPublicPort( msgs->handle );
454    struct evbuffer * outbuf = evbuffer_new( );
455
456    dbgmsg( msgs, "sending an ltep handshake" );
457    tr_bencInit( &val, TYPE_DICT );
458    tr_bencDictReserve( &val, 3 );
459    m  = tr_bencDictAdd( &val, "m" );
460    tr_bencInit( m, TYPE_DICT );
461    tr_bencDictReserve( m, 1 );
462    tr_bencInitInt( tr_bencDictAdd( m, "ut_pex" ), OUR_LTEP_PEX );
463    if( port > 0 )
464        tr_bencInitInt( tr_bencDictAdd( &val, "p" ), port );
465    tr_bencInitStr( tr_bencDictAdd( &val, "v" ), v, 0, 1 );
466    buf = tr_bencSaveMalloc( &val,  &len );
467
468    tr_peerIoWriteUint32( msgs->io, outbuf, 2*sizeof(uint8_t) + len );
469    tr_peerIoWriteUint8 ( msgs->io, outbuf, BT_LTEP );
470    tr_peerIoWriteUint8 ( msgs->io, outbuf, LTEP_HANDSHAKE );
471    tr_peerIoWriteBytes ( msgs->io, outbuf, buf, len );
472
473    tr_peerIoWriteBuf( msgs->io, outbuf );
474    msgs->hasSentLtepHandshake = 1;
475
476    /* cleanup */
477    tr_bencFree( &val );
478    tr_free( buf );
479    evbuffer_free( outbuf );
480
481}
482
483static void
484parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
485{
486    benc_val_t val, * sub;
487    uint8_t * tmp = tr_new( uint8_t, len );
488
489    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
490
491    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type!=TYPE_DICT ) {
492        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
493        tr_free( tmp );
494        return;
495    }
496
497    /* check supported messages for utorrent pex */
498    sub = tr_bencDictFind( &val, "m" );
499    if( tr_bencIsDict( sub ) ) {
500        sub = tr_bencDictFind( sub, "ut_pex" );
501        if( tr_bencIsInt( sub ) ) {
502            msgs->peerSupportsPex = 1;
503            msgs->ut_pex_id = (uint8_t) sub->val.i;
504            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
505        }
506    }
507
508#if 0
509    /* get peer's client name */
510    sub = tr_bencDictFind( &val, "v" );
511    if( tr_bencIsStr( sub ) ) {
512        int i;
513        tr_free( msgs->info->client );
514        fprintf( stderr, "dictionary says client is [%s]\n", sub->val.s.s );
515        msgs->info->client = tr_strndup( sub->val.s.s, sub->val.s.i );
516for( i=0; i<sub->val.s.i; ++i ) { fprintf( stderr, "[%c] (%d)\n", sub->val.s.s[i], (int)sub->val.s.s[i] );
517                                  if( (int)msgs->info->client[i]==-75 ) msgs->info->client[i]='u'; }
518        fprintf( stderr, "msgs->client is now [%s]\n", msgs->info->client );
519    }
520#endif
521
522    /* get peer's listening port */
523    sub = tr_bencDictFind( &val, "p" );
524    if( tr_bencIsInt( sub ) ) {
525        msgs->info->port = htons( (uint16_t)sub->val.i );
526        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
527    }
528
529    tr_bencFree( &val );
530    tr_free( tmp );
531}
532
533static void
534parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
535{
536    benc_val_t val, * sub;
537    uint8_t * tmp;
538
539    if( msgs->torrent->pexDisabled ) /* no sharing! */
540        return;
541
542    tmp = tr_new( uint8_t, msglen );
543    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
544
545    if( tr_bencLoad( tmp, msglen, &val, NULL ) || !tr_bencIsDict( &val ) ) {
546        dbgmsg( msgs, "GET can't read extended-pex dictionary" );
547        tr_free( tmp );
548        return;
549    }
550
551    sub = tr_bencDictFind( &val, "added" );
552    if( tr_bencIsStr(sub) && ((sub->val.s.i % 6) == 0)) {
553        const int n = sub->val.s.i / 6 ;
554        dbgmsg( msgs, "got %d peers from uT pex", n );
555        tr_peerMgrAddPeers( msgs->handle->peerMgr,
556                            msgs->torrent->info.hash,
557                            TR_PEER_FROM_PEX,
558                            (uint8_t*)sub->val.s.s, n );
559    }
560
561    tr_bencFree( &val );
562    tr_free( tmp );
563}
564
565static void
566sendPex( tr_peermsgs * msgs );
567
568static void
569parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
570{
571    uint8_t ltep_msgid;
572
573    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
574    msglen--;
575
576    if( ltep_msgid == LTEP_HANDSHAKE )
577    {
578        dbgmsg( msgs, "got ltep handshake" );
579        parseLtepHandshake( msgs, msglen, inbuf );
580        if( !msgs->hasSentLtepHandshake )
581            sendLtepHandshake( msgs );
582    }
583    else if( ltep_msgid == msgs->ut_pex_id )
584    {
585        dbgmsg( msgs, "got ut pex" );
586        msgs->peerSupportsPex = 1;
587        parseUtPex( msgs, msglen, inbuf );
588    }
589    else
590    {
591        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
592        evbuffer_drain( inbuf, msglen );
593    }
594}
595
596static int
597readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf )
598{
599    uint32_t len;
600    const size_t needlen = sizeof(uint32_t);
601
602    if( EVBUFFER_LENGTH(inbuf) < needlen )
603        return READ_MORE;
604
605    tr_peerIoReadUint32( msgs->io, inbuf, &len );
606
607    if( len == 0 ) /* peer sent us a keepalive message */
608        dbgmsg( msgs, "peer sent us a keepalive message..." );
609    else {
610        dbgmsg( msgs, "peer is sending us a message with %"PRIu64" bytes...\n", (uint64_t)len );
611        msgs->incomingMessageLength = len;
612        msgs->state = AWAITING_BT_MESSAGE;
613    }
614
615    return READ_AGAIN;
616}
617
618static int
619readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf )
620{
621    uint8_t id;
622    uint32_t ui32;
623    uint32_t msglen = msgs->incomingMessageLength;
624
625    if( EVBUFFER_LENGTH(inbuf) < msglen )
626        return READ_MORE;
627
628    tr_peerIoReadUint8( msgs->io, inbuf, &id );
629    msglen--;
630    dbgmsg( msgs, "peer sent us a message... "
631                  "bt id number is %d, and remaining len is %d", (int)id, (int)msglen );
632
633    switch( id )
634    {
635        case BT_CHOKE:
636            assert( msglen == 0 );
637            dbgmsg( msgs, "w00t peer sent us a BT_CHOKE" );
638            msgs->info->clientIsChoked = 1;
639            tr_list_foreach( msgs->peerAskedFor, tr_free );
640            tr_list_free( &msgs->peerAskedFor );
641            tr_list_foreach( msgs->clientAskedFor, tr_free );
642            tr_list_free( &msgs->clientAskedFor );
643            break;
644
645        case BT_UNCHOKE:
646            assert( msglen == 0 );
647            dbgmsg( msgs, "w00t peer sent us a BT_UNCHOKE" );
648            msgs->info->clientIsChoked = 0;
649            fireNeedReq( msgs );
650            break;
651
652        case BT_INTERESTED:
653            assert( msglen == 0 );
654            dbgmsg( msgs, "w00t peer sent us a BT_INTERESTED" );
655            msgs->info->peerIsInterested = 1;
656            break;
657
658        case BT_NOT_INTERESTED:
659            assert( msglen == 0 );
660            dbgmsg( msgs, "w00t peer sent us a BT_NOT_INTERESTED" );
661            msgs->info->peerIsInterested = 0;
662            break;
663
664        case BT_HAVE:
665            assert( msglen == 4 );
666            dbgmsg( msgs, "w00t peer sent us a BT_HAVE" );
667            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
668            tr_bitfieldAdd( msgs->info->have, ui32 );
669            msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have ) / (float)msgs->torrent->info.pieceCount;
670            dbgmsg( msgs, "after the HAVE message, peer progress is %f", msgs->info->progress );
671            updateInterest( msgs );
672            firePeerProgress( msgs );
673            break;
674
675        case BT_BITFIELD:
676            assert( msglen == msgs->info->have->len );
677            dbgmsg( msgs, "w00t peer sent us a BT_BITFIELD" );
678            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
679            msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have ) / (float)msgs->torrent->info.pieceCount;
680            dbgmsg( msgs, "after the HAVE message, peer progress is %f", msgs->info->progress );
681            updateInterest( msgs );
682            fireNeedReq( msgs );
683            firePeerProgress( msgs );
684            break;
685
686        case BT_REQUEST: {
687            struct peer_request * req;
688            assert( msglen == 12 );
689            dbgmsg( msgs, "peer sent us a BT_REQUEST" );
690            req = tr_new( struct peer_request, 1 );
691            tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
692            tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
693            tr_peerIoReadUint32( msgs->io, inbuf, &req->length );
694            if( !msgs->info->peerIsChoked && requestIsValid( msgs, req ) )
695                tr_list_append( &msgs->peerAskedFor, req );
696            else
697                tr_free( req );
698            break;
699        }
700
701        case BT_CANCEL: {
702            struct peer_request req;
703            void * data;
704            assert( msglen == 12 );
705            dbgmsg( msgs, "peer sent us a BT_CANCEL" );
706            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
707            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
708            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
709            data = tr_list_remove( &msgs->peerAskedFor, &req, peer_request_compare );
710            tr_free( data );
711            break;
712        }
713
714        case BT_PIECE: {
715            dbgmsg( msgs, "peer sent us a BT_PIECE" );
716            assert( msgs->blockToUs.length == 0 );
717            tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.index );
718            tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.offset );
719            msgs->blockToUs.length = msglen - 8;
720            assert( EVBUFFER_LENGTH(msgs->inBlock) == 0 );
721            //evbuffer_drain( msgs->inBlock, ~0 );
722            msgs->state = msgs->blockToUs.length ? READING_BT_PIECE : AWAITING_BT_LENGTH;
723            return READ_AGAIN;
724            break;
725        }
726
727        case BT_PORT: {
728            assert( msglen == 2 );
729            dbgmsg( msgs, "peer sent us a BT_PORT" );
730            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
731            break;
732        }
733       
734        case BT_SUGGEST: {
735            /* tiennou TODO */
736            break;
737        }
738           
739        case BT_HAVE_ALL: {
740            /* tiennou TODO */
741            break;
742        }
743           
744        case BT_HAVE_NONE: {
745            /* tiennou TODO */
746            break;
747        }
748           
749        case BT_REJECT: {
750            /* tiennou TODO */
751            break;
752        }
753           
754        case BT_ALLOWED_FAST: {
755            /* tiennou TODO */
756            break;
757        }
758           
759        case BT_LTEP:
760            dbgmsg( msgs, "peer sent us a BT_LTEP" );
761            parseLtep( msgs, msglen, inbuf );
762            break;
763
764        default:
765            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
766            tr_peerIoDrain( msgs->io, inbuf, msglen );
767            assert( 0 );
768    }
769
770    msgs->incomingMessageLength = -1;
771    msgs->state = AWAITING_BT_LENGTH;
772    return READ_AGAIN;
773}
774
775static void
776clientGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
777{
778    tr_torrent * tor = msgs->torrent;
779    tor->activityDate = tr_date( );
780    tor->downloadedCur += byteCount;
781    tr_rcTransferred( tor->download, byteCount );
782    tr_rcTransferred( tor->handle->download, byteCount );
783}
784
785static void
786peerGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
787{
788    tr_torrent * tor = msgs->torrent;
789    tor->activityDate = tr_date( );
790    tor->uploadedCur += byteCount;
791    tr_rcTransferred( tor->upload, byteCount );
792    tr_rcTransferred( tor->handle->upload, byteCount );
793}
794
795static int
796canDownload( const tr_peermsgs * msgs )
797{
798    tr_torrent * tor = msgs->torrent;
799
800    if( tor->downloadLimitMode == TR_SPEEDLIMIT_GLOBAL )
801        return !tor->handle->useDownloadLimit || tr_rcCanTransfer( tor->handle->download );
802
803    if( tor->downloadLimitMode == TR_SPEEDLIMIT_SINGLE )
804        return tr_rcCanTransfer( tor->download );
805
806    return TRUE;
807}
808
809static void
810reassignBytesToCorrupt( tr_peermsgs * msgs, uint32_t byteCount )
811{
812    tr_torrent * tor = msgs->torrent;
813
814    /* increment the `corrupt' field */
815    tor->corruptCur += byteCount;
816
817    /* decrement the `downloaded' field */
818    if( tor->downloadedCur >= byteCount )
819        tor->downloadedCur -= byteCount;
820    else
821        tor->downloadedCur = 0;
822}
823
824
825static void
826gotBadPiece( tr_peermsgs * msgs, uint32_t pieceIndex )
827{
828    const uint32_t byteCount = tr_torPieceCountBytes( msgs->torrent, (int)pieceIndex );
829    reassignBytesToCorrupt( msgs, byteCount );
830}
831
832static void
833gotUnwantedBlock( tr_peermsgs * msgs, uint32_t index UNUSED, uint32_t offset UNUSED, uint32_t length )
834{
835    reassignBytesToCorrupt( msgs, length );
836}
837
838static void
839addUsToBlamefield( tr_peermsgs * msgs, uint32_t index )
840{
841    if( !msgs->info->blame )
842         msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
843    tr_bitfieldAdd( msgs->info->blame, index );
844}
845
846static void
847gotBlock( tr_peermsgs      * msgs,
848          struct evbuffer  * inbuf,
849          uint32_t           index,
850          uint32_t           offset,
851          uint32_t           length )
852{
853    tr_torrent * tor = msgs->torrent;
854    const int block = _tr_block( tor, index, offset );
855    struct peer_request key, *req;
856
857    /**
858    *** Remove the block from our `we asked for this' list
859    **/
860
861    key.index = index;
862    key.offset = offset;
863    key.length = length;
864    req = (struct peer_request*) tr_list_remove( &msgs->clientAskedFor, &key,
865                                                 peer_request_compare );
866    if( req == NULL ) {
867        gotUnwantedBlock( msgs, index, offset, length );
868        dbgmsg( msgs, "we didn't ask for this message..." );
869        return;
870    }
871    dbgmsg( msgs, "w00t peer sent us a block.  turnaround time was %d seconds", 
872                     (int)(time(NULL) - req->time_requested) );
873    tr_free( req );
874    dbgmsg( msgs, "peer has %d more blocks we've asked for",
875                  tr_list_size(msgs->clientAskedFor));
876
877    /**
878    *** Error checks
879    **/
880
881    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
882        dbgmsg( msgs, "have this block already..." );
883        tr_dbg( "have this block already..." );
884        gotUnwantedBlock( msgs, index, offset, length );
885        return;
886    }
887
888    if( (int)length != tr_torBlockCountBytes( tor, block ) ) {
889        dbgmsg( msgs, "block is the wrong length..." );
890        tr_dbg( "block is the wrong length..." );
891        gotUnwantedBlock( msgs, index, offset, length );
892        return;
893    }
894
895    /**
896    ***  Write the block
897    **/
898
899    if( tr_ioWrite( tor, index, offset, length, EVBUFFER_DATA( inbuf ))) {
900        return;
901    }
902
903    tr_cpBlockAdd( tor->completion, block );
904
905    addUsToBlamefield( msgs, index );
906
907    fireGotBlock( msgs, index, offset, length );
908    fireNeedReq( msgs );
909
910    /**
911    ***  Handle if this was the last block in the piece
912    **/
913
914    if( tr_cpPieceIsComplete( tor->completion, index ) )
915    {
916        if( tr_ioHash( tor, index ) )
917        {
918            gotBadPiece( msgs, index );
919            return;
920        }
921
922        fireClientHave( msgs, index );
923    }
924}
925
926
927static ReadState
928readBtPiece( tr_peermsgs * msgs, struct evbuffer * inbuf )
929{
930    uint32_t inlen;
931    uint8_t * tmp;
932
933    assert( msgs != NULL );
934    assert( msgs->blockToUs.length > 0 );
935    assert( inbuf != NULL );
936    assert( EVBUFFER_LENGTH( inbuf ) > 0 );
937
938    /* read from the inbuf into our block buffer */
939    inlen = MIN( EVBUFFER_LENGTH(inbuf), msgs->blockToUs.length );
940    tmp = tr_new( uint8_t, inlen );
941    tr_peerIoReadBytes( msgs->io, inbuf, tmp, inlen );
942    evbuffer_add( msgs->inBlock, tmp, inlen );
943
944    /* update our tables accordingly */
945    assert( inlen >= msgs->blockToUs.length );
946    msgs->blockToUs.length -= inlen;
947    msgs->info->peerSentPieceDataAt = time( NULL );
948    clientGotBytes( msgs, inlen );
949
950    /* if this was the entire block, save it */
951    if( !msgs->blockToUs.length )
952    {
953        dbgmsg( msgs, "w00t -- got block index %u, offset %u", msgs->blockToUs.index, msgs->blockToUs.offset );
954        assert( (int)EVBUFFER_LENGTH( msgs->inBlock ) == tr_torBlockCountBytes( msgs->torrent, _tr_block(msgs->torrent,msgs->blockToUs.index, msgs->blockToUs.offset) ) );
955        gotBlock( msgs, msgs->inBlock,
956                        msgs->blockToUs.index,
957                        msgs->blockToUs.offset,
958                        EVBUFFER_LENGTH( msgs->inBlock ) );
959        evbuffer_drain( msgs->inBlock, ~0 );
960        msgs->state = AWAITING_BT_LENGTH;
961    }
962
963    /* cleanup */
964    tr_free( tmp );
965    return READ_AGAIN;
966}
967
968static ReadState
969canRead( struct bufferevent * evin, void * vmsgs )
970{
971    ReadState ret;
972    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
973    struct evbuffer * inbuf = EVBUFFER_INPUT ( evin );
974
975    if( !canDownload( msgs ) )
976    {
977        msgs->notListening = 1;
978        tr_peerIoSetIOMode ( msgs->io, 0, EV_READ );
979        ret = READ_DONE;
980    }
981    else switch( msgs->state )
982    {
983        case AWAITING_BT_LENGTH:  ret = readBtLength  ( msgs, inbuf ); break;
984        case AWAITING_BT_MESSAGE: ret = readBtMessage ( msgs, inbuf ); break;
985        case READING_BT_PIECE:    ret = readBtPiece   ( msgs, inbuf ); break;
986        default: assert( 0 );
987    }
988
989    return ret;
990}
991
992static void
993sendKeepalive( tr_peermsgs * msgs )
994{
995    dbgmsg( msgs, "sending a keepalive message" );
996    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
997}
998
999/**
1000***
1001**/
1002
1003static int
1004canWrite( const tr_peermsgs * msgs )
1005{
1006    /* don't let our outbuffer get too large */
1007    if( tr_peerIoWriteBytesWaiting( msgs->io ) > 2048 )
1008        return FALSE;
1009
1010    return TRUE;
1011}
1012
1013static int
1014canUpload( const tr_peermsgs * msgs )
1015{
1016    const tr_torrent * tor = msgs->torrent;
1017
1018    if( !canWrite( msgs ) )
1019        return FALSE;
1020
1021    if( tor->uploadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1022        return !tor->handle->useUploadLimit || tr_rcCanTransfer( tor->handle->upload );
1023
1024    if( tor->uploadLimitMode == TR_SPEEDLIMIT_SINGLE )
1025        return tr_rcCanTransfer( tor->upload );
1026
1027    return TRUE;
1028}
1029
1030static int
1031pulse( void * vmsgs )
1032{
1033    const time_t now = time( NULL );
1034    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1035    size_t len;
1036
1037    /* if we froze out a downloaded block because of speed limits,
1038       start listening to the peer again */
1039    if( msgs->notListening && canDownload( msgs ) )
1040    {
1041        fprintf( stderr, "msgs %p thawing out...\n", msgs );
1042        msgs->notListening = 0;
1043        tr_peerIoSetIOMode ( msgs->io, EV_READ, 0 );
1044    }
1045
1046    if( !canWrite( msgs ) )
1047    {
1048    }
1049    else if(( len = EVBUFFER_LENGTH( msgs->outBlock ) ))
1050    {
1051        while ( len && canUpload( msgs ) )
1052        {
1053            const size_t outlen = MIN( len, 2048 );
1054            tr_peerIoWrite( msgs->io, EVBUFFER_DATA(msgs->outBlock), outlen );
1055            evbuffer_drain( msgs->outBlock, outlen );
1056            peerGotBytes( msgs, outlen );
1057            len -= outlen;
1058            msgs->info->clientSentPieceDataAt = now;
1059            msgs->clientSentAnythingAt = now;
1060            dbgmsg( msgs, "wrote %d bytes; %d left in block", (int)outlen, (int)len );
1061        }
1062    }
1063    else if(( len = EVBUFFER_LENGTH( msgs->outMessages ) ))
1064    {
1065        tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
1066        msgs->clientSentAnythingAt = now;
1067    }
1068    else if(( msgs->peerAskedFor ))
1069    {
1070        struct peer_request * req = tr_list_pop_front( &msgs->peerAskedFor );
1071        uint8_t * tmp = tr_new( uint8_t, req->length );
1072        const uint32_t msglen = sizeof(uint8_t) + 2*sizeof(uint32_t) + req->length;
1073        tr_ioRead( msgs->torrent, req->index, req->offset, req->length, tmp );
1074        tr_peerIoWriteUint32( msgs->io, msgs->outBlock, msglen );
1075        tr_peerIoWriteUint8 ( msgs->io, msgs->outBlock, BT_PIECE );
1076        tr_peerIoWriteUint32( msgs->io, msgs->outBlock, req->index );
1077        tr_peerIoWriteUint32( msgs->io, msgs->outBlock, req->offset );
1078        tr_peerIoWriteBytes ( msgs->io, msgs->outBlock, tmp, req->length );
1079        tr_free( tmp );
1080        dbgmsg( msgs, "putting req into out queue: index %d, offset %d, length %d ... %d blocks left in our queue", (int)req->index, (int)req->offset, (int)req->length, tr_list_size(msgs->peerAskedFor) );
1081        tr_free( req );
1082    }
1083    else if( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS )
1084    {
1085        sendKeepalive( msgs );
1086    }
1087
1088    return TRUE; /* loop forever */
1089}
1090
1091static void
1092didWrite( struct bufferevent * evin UNUSED, void * vpeer )
1093{
1094    pulse( (tr_peermsgs *) vpeer );
1095}
1096
1097static void
1098gotError( struct bufferevent * evbuf UNUSED, short what UNUSED, void * vpeer )
1099{
1100    fireGotError( vpeer );
1101}
1102
1103static void
1104sendBitfield( tr_peermsgs * msgs )
1105{
1106    const tr_bitfield * bitfield = tr_cpPieceBitfield( msgs->torrent->completion );
1107
1108    dbgmsg( msgs, "sending peer a bitfield message" );
1109    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + bitfield->len );
1110    tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_BITFIELD );
1111    tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, bitfield->bits, bitfield->len );
1112}
1113
1114/**
1115***
1116**/
1117
1118#define MAX_DIFFS 50
1119
1120typedef struct
1121{
1122    tr_pex * added;
1123    tr_pex * dropped;
1124    tr_pex * elements;
1125    int addedCount;
1126    int droppedCount;
1127    int elementCount;
1128    int diffCount;
1129}
1130PexDiffs;
1131
1132static void
1133pexAddedCb( void * vpex, void * userData )
1134{
1135    PexDiffs * diffs = (PexDiffs *) userData;
1136    tr_pex * pex = (tr_pex *) vpex;
1137    if( diffs->diffCount < MAX_DIFFS )
1138    {
1139        diffs->diffCount++;
1140        diffs->added[diffs->addedCount++] = *pex;
1141        diffs->elements[diffs->elementCount++] = *pex;
1142    }
1143}
1144
1145static void
1146pexRemovedCb( void * vpex, void * userData )
1147{
1148    PexDiffs * diffs = (PexDiffs *) userData;
1149    tr_pex * pex = (tr_pex *) vpex;
1150    if( diffs->diffCount < MAX_DIFFS )
1151    {
1152        diffs->diffCount++;
1153        diffs->dropped[diffs->droppedCount++] = *pex;
1154    }
1155}
1156
1157static void
1158pexElementCb( void * vpex, void * userData )
1159{
1160    PexDiffs * diffs = (PexDiffs *) userData;
1161    tr_pex * pex = (tr_pex *) vpex;
1162    if( diffs->diffCount < MAX_DIFFS )
1163    {
1164        diffs->diffCount++;
1165        diffs->elements[diffs->elementCount++] = *pex;
1166    }
1167}
1168
1169static void
1170sendPex( tr_peermsgs * msgs )
1171{
1172    if( msgs->peerSupportsPex && !msgs->torrent->pexDisabled )
1173    {
1174        int i;
1175        tr_pex * newPex = NULL;
1176        const int newCount = tr_peerMgrGetPeers( msgs->handle->peerMgr, msgs->torrent->info.hash, &newPex );
1177        PexDiffs diffs;
1178        benc_val_t val, *added, *dropped, *flags;
1179        uint8_t *tmp, *walk;
1180        char * benc;
1181        int bencLen;
1182
1183        /* build the diffs */
1184        diffs.added = tr_new( tr_pex, newCount );
1185        diffs.addedCount = 0;
1186        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1187        diffs.droppedCount = 0;
1188        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1189        diffs.elementCount = 0;
1190        diffs.diffCount = 0;
1191        tr_set_compare( msgs->pex, msgs->pexCount,
1192                        newPex, newCount,
1193                        tr_pexCompare, sizeof(tr_pex),
1194                        pexRemovedCb, pexAddedCb, pexElementCb, &diffs );
1195        dbgmsg( msgs, "pex: old peer count %d, new peer count %d, added %d, removed %d", msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1196
1197        /* update peer */
1198        tr_free( msgs->pex );
1199        msgs->pex = diffs.elements;
1200        msgs->pexCount = diffs.elementCount;
1201
1202        /* build the pex payload */
1203        tr_bencInit( &val, TYPE_DICT );
1204        tr_bencDictReserve( &val, 3 );
1205
1206        /* "added" */
1207        added = tr_bencDictAdd( &val, "added" );
1208        tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1209        for( i=0; i<diffs.addedCount; ++i ) {
1210            memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
1211            memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1212        }
1213        assert( ( walk - tmp ) == diffs.addedCount * 6 );
1214        tr_bencInitStr( added, tmp, walk-tmp, FALSE );
1215
1216        /* "added.f" */
1217        flags = tr_bencDictAdd( &val, "added.f" );
1218        tmp = walk = tr_new( uint8_t, diffs.addedCount );
1219        for( i=0; i<diffs.addedCount; ++i ) {
1220            dbgmsg( msgs, "PEX -->> -->> flag is %d", (int)diffs.added[i].flags );
1221            *walk++ = diffs.added[i].flags;
1222        }
1223        assert( ( walk - tmp ) == diffs.addedCount );
1224        tr_bencInitStr( flags, tmp, walk-tmp, FALSE );
1225
1226        /* "dropped" */
1227        dropped = tr_bencDictAdd( &val, "dropped" );
1228        tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
1229        for( i=0; i<diffs.droppedCount; ++i ) {
1230            memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
1231            memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
1232        }
1233        assert( ( walk - tmp ) == diffs.droppedCount * 6 );
1234        tr_bencInitStr( dropped, tmp, walk-tmp, FALSE );
1235
1236        /* write the pex message */
1237        benc = tr_bencSaveMalloc( &val, &bencLen );
1238        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 2*sizeof(uint8_t) + bencLen );
1239        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_LTEP );
1240        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, OUR_LTEP_PEX );
1241        tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, benc, bencLen );
1242
1243        /* cleanup */
1244        tr_free( benc );
1245        tr_bencFree( &val );
1246        tr_free( diffs.added );
1247        tr_free( diffs.dropped );
1248        tr_free( newPex );
1249
1250        msgs->clientSentPexAt = time( NULL );
1251    }
1252}
1253
1254static int
1255pexPulse( void * vpeer )
1256{
1257    sendPex( vpeer );
1258    return TRUE;
1259}
1260
1261/**
1262***
1263**/
1264
1265tr_peermsgs*
1266tr_peerMsgsNew( struct tr_torrent * torrent, struct tr_peer * info )
1267{
1268    tr_peermsgs * msgs;
1269
1270    assert( info != NULL );
1271    assert( info->io != NULL );
1272
1273    msgs = tr_new0( tr_peermsgs, 1 );
1274    msgs->publisher = tr_publisherNew( );
1275    msgs->info = info;
1276    msgs->handle = torrent->handle;
1277    msgs->torrent = torrent;
1278    msgs->io = info->io;
1279    msgs->info->clientIsChoked = 1;
1280    msgs->info->peerIsChoked = 1;
1281    msgs->info->clientIsInterested = 0;
1282    msgs->info->peerIsInterested = 0;
1283    msgs->info->have = tr_bitfieldNew( torrent->info.pieceCount );
1284    msgs->pulseTimer = tr_timerNew( msgs->handle, pulse, msgs, PEER_PULSE_INTERVAL );
1285    msgs->pexTimer = tr_timerNew( msgs->handle, pexPulse, msgs, PEX_INTERVAL );
1286    msgs->outMessages = evbuffer_new( );
1287    msgs->outBlock = evbuffer_new( );
1288    msgs->inBlock = evbuffer_new( );
1289
1290    tr_peerIoSetIOFuncs( msgs->io, canRead, didWrite, gotError, msgs );
1291    tr_peerIoSetIOMode( msgs->io, EV_READ|EV_WRITE, 0 );
1292
1293    /**
1294    ***  If we initiated this connection,
1295    ***  we may need to send LTEP/AZMP handshakes.
1296    ***  Otherwise we'll wait for the peer to send theirs first.
1297    **/
1298    if( !tr_peerIoIsIncoming( msgs->io ) )
1299    {
1300        if ( tr_peerIoSupportsLTEP( msgs->io ) ) {
1301            sendLtepHandshake( msgs );
1302           
1303        } else if ( tr_peerIoSupportsAZMP( msgs->io ) ) {
1304            dbgmsg( msgs, "FIXME: need to send AZMP handshake" );
1305           
1306        } else {
1307            /* no-op */
1308        }
1309    }
1310
1311    sendBitfield( msgs );
1312    return msgs;
1313}
1314
1315void
1316tr_peerMsgsFree( tr_peermsgs* msgs )
1317{
1318    if( msgs != NULL )
1319    {
1320        tr_timerFree( &msgs->pulseTimer );
1321        tr_timerFree( &msgs->pexTimer );
1322        tr_publisherFree( &msgs->publisher );
1323        tr_list_foreach( msgs->clientAskedFor, tr_free );
1324        tr_list_free( &msgs->clientAskedFor );
1325        tr_list_foreach( msgs->peerAskedFor, tr_free );
1326        tr_list_free( &msgs->peerAskedFor );
1327        evbuffer_free( msgs->outMessages );
1328        evbuffer_free( msgs->outBlock );
1329        evbuffer_free( msgs->inBlock );
1330        tr_free( msgs->pex );
1331        msgs->pexCount = 0;
1332        tr_free( msgs );
1333    }
1334}
1335
1336tr_publisher_tag
1337tr_peerMsgsSubscribe( tr_peermsgs       * peer,
1338                      tr_delivery_func    func,
1339                      void              * userData )
1340{
1341    return tr_publisherSubscribe( peer->publisher, func, userData );
1342}
1343
1344void
1345tr_peerMsgsUnsubscribe( tr_peermsgs       * peer,
1346                        tr_publisher_tag    tag )
1347{
1348    tr_publisherUnsubscribe( peer->publisher, tag );
1349}
Note: See TracBrowser for help on using the repository browser.