source: trunk/libtransmission/peer-msgs.c @ 3197

Last change on this file since 3197 was 3197, checked in by charles, 15 years ago

some experimental code. (1) try to improve throughput to peers. (2) add first draft of new tr_stat fields requested by BentMyWookie? (3) raise the per-torrent peer limit to 100 to match LibTorrent?'s defaults

  • Property svn:keywords set to Date Rev Author Id
File size: 38.7 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 3197 2007-09-27 03:03:38Z charles $
11 */
12
13#include <assert.h>
14#include <ctype.h>
15#include <stdio.h>
16#include <stdlib.h>
17#include <string.h>
18
19#include <arpa/inet.h>
20
21#include <sys/types.h> /* event.h needs this */
22#include <event.h>
23
24#include "transmission.h"
25#include "bencode.h"
26#include "completion.h"
27#include "inout.h"
28#include "list.h"
29#include "peer-io.h"
30#include "peer-mgr.h"
31#include "peer-mgr-private.h"
32#include "peer-msgs.h"
33#include "ratecontrol.h"
34#include "trevent.h"
35#include "utils.h"
36
37/**
38***
39**/
40
41enum
42{
43    BT_CHOKE                = 0,
44    BT_UNCHOKE              = 1,
45    BT_INTERESTED           = 2,
46    BT_NOT_INTERESTED       = 3,
47    BT_HAVE                 = 4,
48    BT_BITFIELD             = 5,
49    BT_REQUEST              = 6,
50    BT_PIECE                = 7,
51    BT_CANCEL               = 8,
52    BT_PORT                 = 9,
53    BT_SUGGEST              = 13,
54    BT_HAVE_ALL             = 14,
55    BT_HAVE_NONE            = 15,
56    BT_REJECT               = 16,
57    BT_ALLOWED_FAST         = 17,
58    BT_LTEP                 = 20,
59
60    LTEP_HANDSHAKE          = 0,
61
62    OUR_LTEP_PEX            = 1,
63
64    MAX_REQUEST_BYTE_COUNT  = (16 * 1024), /* drop requests who want too much */
65
66    KEEPALIVE_INTERVAL_SECS = 90,          /* idle seconds before we send a keepalive */
67    PEX_INTERVAL            = (60 * 1000), /* msec between calls to sendPex() */
68    PEER_PULSE_INTERVAL     = (100),       /* msec between calls to pulse() */
69};
70
71enum
72{
73    AWAITING_BT_LENGTH,
74    AWAITING_BT_MESSAGE,
75    READING_BT_PIECE
76};
77
78struct peer_request
79{
80    uint32_t index;
81    uint32_t offset;
82    uint32_t length;
83    time_t time_requested;
84};
85
86static int
87peer_request_compare( const void * va, const void * vb )
88{
89    struct peer_request * a = (struct peer_request*) va;
90    struct peer_request * b = (struct peer_request*) vb;
91    if( a->index != b->index ) return a->index - b->index;
92    if( a->offset != b->offset ) return a->offset - b->offset;
93    if( a->length != b->length ) return a->length - b->length;
94    return 0;
95}
96
97struct tr_peermsgs
98{
99    tr_peer * info;
100
101    tr_handle * handle;
102    tr_torrent * torrent;
103    tr_peerIo * io;
104
105    tr_publisher_t * publisher;
106
107    struct evbuffer * outMessages; /* buffer of all the non-piece messages */
108    struct evbuffer * outBlock;    /* the block we're currently sending */
109    struct evbuffer * inBlock;     /* the block we're currently receiving */
110    tr_list * peerAskedFor;
111    tr_list * clientAskedFor;
112
113    tr_timer * pulseTimer;
114    tr_timer * pexTimer;
115
116    struct peer_request blockToUs; /* the block currntly being sent to us */
117
118    time_t lastReqAddedAt;
119    time_t clientSentPexAt;
120    time_t clientSentAnythingAt;
121
122    unsigned int notListening          : 1;
123    unsigned int peerSupportsPex       : 1;
124    unsigned int hasSentLtepHandshake  : 1;
125
126    uint8_t state;
127
128    uint8_t ut_pex_id;
129
130    uint16_t pexCount;
131
132    uint32_t incomingMessageLength;
133
134    tr_pex * pex;
135};
136
137/**
138***
139**/
140
141static void
142myDebug( const char * file, int line, const struct tr_peermsgs * msgs, const char * fmt, ... )
143{
144    FILE * fp = tr_getLog( );
145    if( fp != NULL )
146    {
147        va_list args;
148        const char * addr = tr_peerIoGetAddrStr( msgs->io );
149        struct evbuffer * buf = evbuffer_new( );
150        evbuffer_add_printf( buf, "[%s:%d] %s (%p) ", file, line, addr, msgs );
151        va_start( args, fmt );
152        evbuffer_add_vprintf( buf, fmt, args );
153        va_end( args );
154        fprintf( fp, "%s\n", EVBUFFER_DATA(buf) );
155        evbuffer_free( buf );
156    }
157}
158
159#define dbgmsg(handshake, fmt...) myDebug(__FILE__, __LINE__, handshake, ##fmt )
160
161/**
162***  EVENTS
163**/
164
165static const tr_peermsgs_event blankEvent = { 0, 0, 0, 0, 0.0f };
166
167static void
168publish( tr_peermsgs * msgs, tr_peermsgs_event * e )
169{
170    tr_publisherPublish( msgs->publisher, msgs->info, e );
171}
172
173static void
174fireGotError( tr_peermsgs * msgs )
175{
176    tr_peermsgs_event e = blankEvent;
177    e.eventType = TR_PEERMSG_GOT_ERROR;
178    publish( msgs, &e );
179}
180
181static void
182fireNeedReq( tr_peermsgs * msgs )
183{
184    tr_peermsgs_event e = blankEvent;
185    e.eventType = TR_PEERMSG_NEED_REQ;
186    publish( msgs, &e );
187}
188
189static void
190firePeerProgress( tr_peermsgs * msgs )
191{
192    tr_peermsgs_event e = blankEvent;
193    e.eventType = TR_PEERMSG_PEER_PROGRESS;
194    e.progress = msgs->info->progress;
195    publish( msgs, &e );
196}
197
198static void
199fireClientHave( tr_peermsgs * msgs, uint32_t pieceIndex )
200{
201    tr_peermsgs_event e = blankEvent;
202    e.eventType = TR_PEERMSG_CLIENT_HAVE;
203    e.pieceIndex = pieceIndex;
204    publish( msgs, &e );
205}
206
207static void
208fireGotBlock( tr_peermsgs * msgs, uint32_t pieceIndex, uint32_t offset, uint32_t length )
209{
210    tr_peermsgs_event e = blankEvent;
211    e.eventType = TR_PEERMSG_CLIENT_BLOCK;
212    e.pieceIndex = pieceIndex;
213    e.offset = offset;
214    e.length = length;
215    publish( msgs, &e );
216}
217
218/**
219***  INTEREST
220**/
221
222static int
223isPieceInteresting( const tr_peermsgs   * peer,
224                    int                   piece )
225{
226    const tr_torrent * torrent = peer->torrent;
227    if( torrent->info.pieces[piece].dnd ) /* we don't want it */
228        return FALSE;
229    if( tr_cpPieceIsComplete( torrent->completion, piece ) ) /* we already have it */
230        return FALSE;
231    if( !tr_bitfieldHas( peer->info->have, piece ) ) /* peer doesn't have it */
232        return FALSE;
233    if( tr_bitfieldHas( peer->info->banned, piece ) ) /* peer is banned for it */
234        return FALSE;
235    return TRUE;
236}
237
238static int
239isPeerInteresting( const tr_peermsgs * msgs )
240{
241    const int clientIsSeed = tr_cpGetStatus( msgs->torrent->completion ) != TR_CP_INCOMPLETE;
242    const int peerIsSeed = msgs->info->progress >= 1.0;
243
244    if( peerIsSeed )
245    {
246        return !clientIsSeed;
247    }
248    else if( clientIsSeed )
249    {
250        return !peerIsSeed;
251    }
252    else /* we're both leeches... */
253    {
254        int i;
255        const tr_torrent * torrent = msgs->torrent;
256        const tr_bitfield * bitfield = tr_cpPieceBitfield( torrent->completion );
257
258        if( !msgs->info->have ) /* We don't know what this peer has... what should this be? */
259            return TRUE;
260
261        assert( bitfield->len == msgs->info->have->len );
262        for( i=0; i<torrent->info.pieceCount; ++i )
263            if( isPieceInteresting( msgs, i ) )
264                return TRUE;
265
266        return FALSE;
267    }
268}
269
270static void
271sendInterest( tr_peermsgs * msgs, int weAreInterested )
272{
273    assert( msgs != NULL );
274    assert( weAreInterested==0 || weAreInterested==1 );
275
276    msgs->info->clientIsInterested = weAreInterested;
277    dbgmsg( msgs, ": sending an %s message", (weAreInterested ? "INTERESTED" : "NOT_INTERESTED") );
278
279    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
280    tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
281}
282
283static void
284updateInterest( tr_peermsgs * msgs )
285{
286    const int i = isPeerInteresting( msgs );
287    if( i != msgs->info->clientIsInterested )
288        sendInterest( msgs, i );
289    if( i )
290        fireNeedReq( msgs );
291}
292
293void
294tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
295{
296    assert( msgs != NULL );
297    assert( msgs->info != NULL );
298    assert( choke==0 || choke==1 );
299
300    if( msgs->info->peerIsChoked != choke )
301    {
302        msgs->info->peerIsChoked = choke ? 1 : 0;
303        if( msgs->info )
304        {
305            tr_list_foreach( msgs->peerAskedFor, tr_free );
306            tr_list_free( &msgs->peerAskedFor );
307        }
308
309        dbgmsg( msgs, "sending a %s message", (choke ? "CHOKE" : "UNCHOKE") );
310        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
311        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, choke ? BT_CHOKE : BT_UNCHOKE );
312    }
313}
314
315/**
316***
317**/
318
319void
320tr_peerMsgsCancel( tr_peermsgs * msgs,
321                   uint32_t      pieceIndex,
322                   uint32_t      offset,
323                   uint32_t      length )
324{
325    tr_list * node;
326    struct peer_request tmp;
327
328    assert( msgs != NULL );
329    assert( length > 0 );
330
331    tmp.index = pieceIndex;
332    tmp.offset = offset;
333    tmp.length = length;
334
335    node = tr_list_find( msgs->clientAskedFor, &tmp, peer_request_compare );
336    if( node != NULL )
337    {
338        /* cancel the request */
339        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + 3*sizeof(uint32_t) );
340        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_CANCEL );
341        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
342        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
343        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
344
345        /* remove it from our "requested" list */
346        tr_list_remove_data( &msgs->peerAskedFor, node->data );
347    }
348}
349
350/**
351***
352**/
353
354void
355tr_peerMsgsHave( tr_peermsgs * msgs,
356                 uint32_t      pieceIndex )
357{
358    dbgmsg( msgs, "w00t telling them we HAVE piece #%d", pieceIndex );
359
360    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
361    tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_HAVE );
362    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
363
364    updateInterest( msgs );
365}
366
367/**
368***
369**/
370
371static int
372pulse( void * vmsgs );
373
374static int
375reqIsValid( const tr_peermsgs * msgs, uint32_t index, uint32_t offset, uint32_t length )
376{
377    const tr_torrent * tor = msgs->torrent;
378
379    if( index >= (uint32_t) tor->info.pieceCount )
380        return FALSE;
381    if ( (int)offset >= tr_torPieceCountBytes( tor, (int)index ) )
382        return FALSE;
383    if( length > MAX_REQUEST_BYTE_COUNT )
384        return FALSE;
385    if( tr_pieceOffset( tor, index, offset, length ) > tor->info.totalSize )
386        return FALSE;
387
388    return TRUE;
389}
390
391static int
392requestIsValid( const tr_peermsgs * msgs, struct peer_request * req )
393{
394    return reqIsValid( msgs, req->index, req->offset, req->length );
395}
396
397int
398tr_peerMsgsAddRequest( tr_peermsgs * msgs,
399                       uint32_t      index, 
400                       uint32_t      offset, 
401                       uint32_t      length )
402{
403    struct peer_request * req;
404    int maxSize;
405
406    assert( msgs != NULL );
407    assert( msgs->torrent != NULL );
408    assert( reqIsValid( msgs, index, offset, length ) );
409
410    if( msgs->info->clientIsChoked )
411        return TR_ADDREQ_CLIENT_CHOKED;
412
413    if( !tr_bitfieldHas( msgs->info->have, index ) )
414        return TR_ADDREQ_MISSING;
415
416    maxSize = MIN( 2 + (int)(tr_peerIoGetRateToClient(msgs->io)/10), 100 );
417    //if( ( time(NULL) - msgs->lastReqAddedAt <= 5 ) && ( tr_list_size( msgs->clientAskedFor) >= maxSize ) )
418    if( tr_list_size( msgs->clientAskedFor) >= maxSize )
419        return TR_ADDREQ_FULL;
420
421    dbgmsg( msgs, "w00t peer has a max request queue size of %d... adding request for piece %d, offset %d", maxSize, (int)index, (int)offset );
422
423    /* queue the request */
424    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + 3*sizeof(uint32_t) );
425    tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_REQUEST );
426    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, index );
427    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
428    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
429
430    /* add it to our `requests sent' list */
431    req = tr_new( struct peer_request, 1 );
432    req->index = index;
433    req->offset = offset;
434    req->length = length;
435    req->time_requested = msgs->lastReqAddedAt = time( NULL );
436    tr_list_append( &msgs->clientAskedFor, req );
437    pulse( msgs );
438
439    return TR_ADDREQ_OK;
440}
441
442/**
443***
444**/
445
446static void
447sendLtepHandshake( tr_peermsgs * msgs )
448{
449    benc_val_t val, *m;
450    char * buf;
451    int len;
452    const char * v = TR_NAME " " USERAGENT_PREFIX;
453    const int port = tr_getPublicPort( msgs->handle );
454    struct evbuffer * outbuf = evbuffer_new( );
455
456    dbgmsg( msgs, "sending an ltep handshake" );
457    tr_bencInit( &val, TYPE_DICT );
458    tr_bencDictReserve( &val, 3 );
459    m  = tr_bencDictAdd( &val, "m" );
460    tr_bencInit( m, TYPE_DICT );
461    tr_bencDictReserve( m, 1 );
462    tr_bencInitInt( tr_bencDictAdd( m, "ut_pex" ), OUR_LTEP_PEX );
463    if( port > 0 )
464        tr_bencInitInt( tr_bencDictAdd( &val, "p" ), port );
465    tr_bencInitStr( tr_bencDictAdd( &val, "v" ), v, 0, 1 );
466    buf = tr_bencSaveMalloc( &val,  &len );
467
468    tr_peerIoWriteUint32( msgs->io, outbuf, 2*sizeof(uint8_t) + len );
469    tr_peerIoWriteUint8 ( msgs->io, outbuf, BT_LTEP );
470    tr_peerIoWriteUint8 ( msgs->io, outbuf, LTEP_HANDSHAKE );
471    tr_peerIoWriteBytes ( msgs->io, outbuf, buf, len );
472
473    tr_peerIoWriteBuf( msgs->io, outbuf );
474    msgs->hasSentLtepHandshake = 1;
475
476    /* cleanup */
477    tr_bencFree( &val );
478    tr_free( buf );
479    evbuffer_free( outbuf );
480
481}
482
483static void
484parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
485{
486    benc_val_t val, * sub;
487    uint8_t * tmp = tr_new( uint8_t, len );
488
489    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
490
491    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type!=TYPE_DICT ) {
492        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
493        tr_free( tmp );
494        return;
495    }
496
497    /* check supported messages for utorrent pex */
498    sub = tr_bencDictFind( &val, "m" );
499    if( tr_bencIsDict( sub ) ) {
500        sub = tr_bencDictFind( sub, "ut_pex" );
501        if( tr_bencIsInt( sub ) ) {
502            msgs->peerSupportsPex = 1;
503            msgs->ut_pex_id = (uint8_t) sub->val.i;
504            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
505        }
506    }
507
508#if 0
509    /* get peer's client name */
510    sub = tr_bencDictFind( &val, "v" );
511    if( tr_bencIsStr( sub ) ) {
512        int i;
513        tr_free( msgs->info->client );
514        fprintf( stderr, "dictionary says client is [%s]\n", sub->val.s.s );
515        msgs->info->client = tr_strndup( sub->val.s.s, sub->val.s.i );
516for( i=0; i<sub->val.s.i; ++i ) { fprintf( stderr, "[%c] (%d)\n", sub->val.s.s[i], (int)sub->val.s.s[i] );
517                                  if( (int)msgs->info->client[i]==-75 ) msgs->info->client[i]='u'; }
518        fprintf( stderr, "msgs->client is now [%s]\n", msgs->info->client );
519    }
520#endif
521
522    /* get peer's listening port */
523    sub = tr_bencDictFind( &val, "p" );
524    if( tr_bencIsInt( sub ) ) {
525        msgs->info->port = htons( (uint16_t)sub->val.i );
526        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
527    }
528
529    tr_bencFree( &val );
530    tr_free( tmp );
531}
532
533static void
534parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
535{
536    benc_val_t val, * sub;
537    uint8_t * tmp;
538
539    if( msgs->torrent->pexDisabled ) /* no sharing! */
540        return;
541
542    tmp = tr_new( uint8_t, msglen );
543    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
544
545    if( tr_bencLoad( tmp, msglen, &val, NULL ) || !tr_bencIsDict( &val ) ) {
546        dbgmsg( msgs, "GET can't read extended-pex dictionary" );
547        tr_free( tmp );
548        return;
549    }
550
551    sub = tr_bencDictFind( &val, "added" );
552    if( tr_bencIsStr(sub) && ((sub->val.s.i % 6) == 0)) {
553        const int n = sub->val.s.i / 6 ;
554        dbgmsg( msgs, "got %d peers from uT pex", n );
555        tr_peerMgrAddPeers( msgs->handle->peerMgr,
556                            msgs->torrent->info.hash,
557                            TR_PEER_FROM_PEX,
558                            (uint8_t*)sub->val.s.s, n );
559    }
560
561    tr_bencFree( &val );
562    tr_free( tmp );
563}
564
565static void
566sendPex( tr_peermsgs * msgs );
567
568static void
569parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
570{
571    uint8_t ltep_msgid;
572
573    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
574    msglen--;
575
576    if( ltep_msgid == LTEP_HANDSHAKE )
577    {
578        dbgmsg( msgs, "got ltep handshake" );
579        parseLtepHandshake( msgs, msglen, inbuf );
580        if( !msgs->hasSentLtepHandshake )
581            sendLtepHandshake( msgs );
582    }
583    else if( ltep_msgid == msgs->ut_pex_id )
584    {
585        dbgmsg( msgs, "got ut pex" );
586        msgs->peerSupportsPex = 1;
587        parseUtPex( msgs, msglen, inbuf );
588    }
589    else
590    {
591        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
592        evbuffer_drain( inbuf, msglen );
593    }
594}
595
596static int
597readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf )
598{
599    uint32_t len;
600    const size_t needlen = sizeof(uint32_t);
601
602    if( EVBUFFER_LENGTH(inbuf) < needlen )
603        return READ_MORE;
604
605    tr_peerIoReadUint32( msgs->io, inbuf, &len );
606
607    if( len == 0 ) /* peer sent us a keepalive message */
608        dbgmsg( msgs, "peer sent us a keepalive message..." );
609    else {
610        dbgmsg( msgs, "peer is sending us a message with %"PRIu64" bytes...", (uint64_t)len );
611        msgs->incomingMessageLength = len;
612        msgs->state = AWAITING_BT_MESSAGE;
613    }
614
615    return READ_AGAIN;
616}
617
618static int
619readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf )
620{
621    uint8_t id;
622    uint32_t ui32;
623    uint32_t msglen = msgs->incomingMessageLength;
624
625    if( EVBUFFER_LENGTH(inbuf) < msglen )
626        return READ_MORE;
627
628    tr_peerIoReadUint8( msgs->io, inbuf, &id );
629    msglen--;
630    dbgmsg( msgs, "peer sent us a message... "
631                  "bt id number is %d, and remaining len is %d", (int)id, (int)msglen );
632
633    switch( id )
634    {
635        case BT_CHOKE:
636            assert( msglen == 0 );
637            dbgmsg( msgs, "w00t peer sent us a BT_CHOKE" );
638            msgs->info->clientIsChoked = 1;
639            tr_list_foreach( msgs->peerAskedFor, tr_free );
640            tr_list_free( &msgs->peerAskedFor );
641            tr_list_foreach( msgs->clientAskedFor, tr_free );
642            tr_list_free( &msgs->clientAskedFor );
643            break;
644
645        case BT_UNCHOKE:
646            assert( msglen == 0 );
647            dbgmsg( msgs, "w00t peer sent us a BT_UNCHOKE" );
648            msgs->info->clientIsChoked = 0;
649            fireNeedReq( msgs );
650            break;
651
652        case BT_INTERESTED:
653            assert( msglen == 0 );
654            dbgmsg( msgs, "w00t peer sent us a BT_INTERESTED" );
655            msgs->info->peerIsInterested = 1;
656            break;
657
658        case BT_NOT_INTERESTED:
659            assert( msglen == 0 );
660            dbgmsg( msgs, "w00t peer sent us a BT_NOT_INTERESTED" );
661            msgs->info->peerIsInterested = 0;
662            break;
663
664        case BT_HAVE:
665            assert( msglen == 4 );
666            dbgmsg( msgs, "w00t peer sent us a BT_HAVE" );
667            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
668            tr_bitfieldAdd( msgs->info->have, ui32 );
669            msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have ) / (float)msgs->torrent->info.pieceCount;
670            dbgmsg( msgs, "after the HAVE message, peer progress is %f", msgs->info->progress );
671            updateInterest( msgs );
672            firePeerProgress( msgs );
673            break;
674
675        case BT_BITFIELD:
676            assert( msglen == msgs->info->have->len );
677            dbgmsg( msgs, "w00t peer sent us a BT_BITFIELD" );
678            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
679            msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have ) / (float)msgs->torrent->info.pieceCount;
680            dbgmsg( msgs, "after the HAVE message, peer progress is %f", msgs->info->progress );
681            updateInterest( msgs );
682            fireNeedReq( msgs );
683            firePeerProgress( msgs );
684            break;
685
686        case BT_REQUEST: {
687            struct peer_request * req;
688            assert( msglen == 12 );
689            dbgmsg( msgs, "peer sent us a BT_REQUEST" );
690            req = tr_new( struct peer_request, 1 );
691            tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
692            tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
693            tr_peerIoReadUint32( msgs->io, inbuf, &req->length );
694            if( !msgs->info->peerIsChoked && requestIsValid( msgs, req ) )
695                tr_list_append( &msgs->peerAskedFor, req );
696            else
697                tr_free( req );
698            break;
699        }
700
701        case BT_CANCEL: {
702            struct peer_request req;
703            void * data;
704            assert( msglen == 12 );
705            dbgmsg( msgs, "peer sent us a BT_CANCEL" );
706            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
707            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
708            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
709            data = tr_list_remove( &msgs->peerAskedFor, &req, peer_request_compare );
710            tr_free( data );
711            break;
712        }
713
714        case BT_PIECE: {
715            dbgmsg( msgs, "peer sent us a BT_PIECE" );
716            assert( msgs->blockToUs.length == 0 );
717            tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.index );
718            tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.offset );
719            msgs->blockToUs.length = msglen - 8;
720            assert( EVBUFFER_LENGTH(msgs->inBlock) == 0 );
721            //evbuffer_drain( msgs->inBlock, ~0 );
722            msgs->state = msgs->blockToUs.length ? READING_BT_PIECE : AWAITING_BT_LENGTH;
723            return READ_AGAIN;
724            break;
725        }
726
727        case BT_PORT: {
728            assert( msglen == 2 );
729            dbgmsg( msgs, "peer sent us a BT_PORT" );
730            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
731            break;
732        }
733       
734        case BT_SUGGEST: {
735            /* tiennou TODO */
736            break;
737        }
738           
739        case BT_HAVE_ALL: {
740            /* tiennou TODO */
741            break;
742        }
743           
744        case BT_HAVE_NONE: {
745            /* tiennou TODO */
746            break;
747        }
748           
749        case BT_REJECT: {
750            /* tiennou TODO */
751            break;
752        }
753           
754        case BT_ALLOWED_FAST: {
755            /* tiennou TODO */
756            break;
757        }
758           
759        case BT_LTEP:
760            dbgmsg( msgs, "peer sent us a BT_LTEP" );
761            parseLtep( msgs, msglen, inbuf );
762            break;
763
764        default:
765            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
766            tr_peerIoDrain( msgs->io, inbuf, msglen );
767            assert( 0 );
768    }
769
770    msgs->incomingMessageLength = -1;
771    msgs->state = AWAITING_BT_LENGTH;
772    return READ_AGAIN;
773}
774
775static void
776clientGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
777{
778    tr_torrent * tor = msgs->torrent;
779    tor->activityDate = tr_date( );
780    tor->downloadedCur += byteCount;
781    tr_rcTransferred( tor->download, byteCount );
782    tr_rcTransferred( tor->handle->download, byteCount );
783}
784
785static void
786peerGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
787{
788    tr_torrent * tor = msgs->torrent;
789    tor->activityDate = tr_date( );
790    tor->uploadedCur += byteCount;
791    tr_rcTransferred( tor->upload, byteCount );
792    tr_rcTransferred( tor->handle->upload, byteCount );
793}
794
795static int
796canDownload( const tr_peermsgs * msgs )
797{
798    tr_torrent * tor = msgs->torrent;
799
800    if( tor->downloadLimitMode == TR_SPEEDLIMIT_GLOBAL )
801        return !tor->handle->useDownloadLimit || tr_rcCanTransfer( tor->handle->download );
802
803    if( tor->downloadLimitMode == TR_SPEEDLIMIT_SINGLE )
804        return tr_rcCanTransfer( tor->download );
805
806    return TRUE;
807}
808
809static void
810reassignBytesToCorrupt( tr_peermsgs * msgs, uint32_t byteCount )
811{
812    tr_torrent * tor = msgs->torrent;
813
814    /* increment the `corrupt' field */
815    tor->corruptCur += byteCount;
816
817    /* decrement the `downloaded' field */
818    if( tor->downloadedCur >= byteCount )
819        tor->downloadedCur -= byteCount;
820    else
821        tor->downloadedCur = 0;
822}
823
824
825static void
826gotBadPiece( tr_peermsgs * msgs, uint32_t pieceIndex )
827{
828    const uint32_t byteCount = tr_torPieceCountBytes( msgs->torrent, (int)pieceIndex );
829    reassignBytesToCorrupt( msgs, byteCount );
830}
831
832static void
833gotUnwantedBlock( tr_peermsgs * msgs, uint32_t index UNUSED, uint32_t offset UNUSED, uint32_t length )
834{
835    reassignBytesToCorrupt( msgs, length );
836}
837
838static void
839addUsToBlamefield( tr_peermsgs * msgs, uint32_t index )
840{
841    if( !msgs->info->blame )
842         msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
843    tr_bitfieldAdd( msgs->info->blame, index );
844}
845
846static void
847gotBlock( tr_peermsgs      * msgs,
848          struct evbuffer  * inbuf,
849          uint32_t           index,
850          uint32_t           offset,
851          uint32_t           length )
852{
853    tr_torrent * tor = msgs->torrent;
854    const int block = _tr_block( tor, index, offset );
855    struct peer_request key, *req;
856
857    /**
858    *** Remove the block from our `we asked for this' list
859    **/
860
861    key.index = index;
862    key.offset = offset;
863    key.length = length;
864    req = (struct peer_request*) tr_list_remove( &msgs->clientAskedFor, &key,
865                                                 peer_request_compare );
866    if( req == NULL ) {
867        gotUnwantedBlock( msgs, index, offset, length );
868        dbgmsg( msgs, "we didn't ask for this message..." );
869        return;
870    }
871    dbgmsg( msgs, "w00t peer sent us a block.  turnaround time was %d seconds", 
872                     (int)(time(NULL) - req->time_requested) );
873    tr_free( req );
874    dbgmsg( msgs, "peer has %d more blocks we've asked for",
875                  tr_list_size(msgs->clientAskedFor));
876
877    /**
878    *** Error checks
879    **/
880
881    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
882        dbgmsg( msgs, "have this block already..." );
883        tr_dbg( "have this block already..." );
884        gotUnwantedBlock( msgs, index, offset, length );
885        return;
886    }
887
888    if( (int)length != tr_torBlockCountBytes( tor, block ) ) {
889        dbgmsg( msgs, "block is the wrong length..." );
890        tr_dbg( "block is the wrong length..." );
891        gotUnwantedBlock( msgs, index, offset, length );
892        return;
893    }
894
895    /**
896    ***  Write the block
897    **/
898
899    if( tr_ioWrite( tor, index, offset, length, EVBUFFER_DATA( inbuf ))) {
900        return;
901    }
902
903    tr_cpBlockAdd( tor->completion, block );
904
905    addUsToBlamefield( msgs, index );
906
907    fireGotBlock( msgs, index, offset, length );
908    fireNeedReq( msgs );
909
910    /**
911    ***  Handle if this was the last block in the piece
912    **/
913
914    if( tr_cpPieceIsComplete( tor->completion, index ) )
915    {
916        if( tr_ioHash( tor, index ) )
917        {
918            gotBadPiece( msgs, index );
919            return;
920        }
921
922        fireClientHave( msgs, index );
923    }
924}
925
926
927static ReadState
928readBtPiece( tr_peermsgs * msgs, struct evbuffer * inbuf )
929{
930    uint32_t inlen;
931    uint8_t * tmp;
932
933    assert( msgs != NULL );
934    assert( msgs->blockToUs.length > 0 );
935    assert( inbuf != NULL );
936    assert( EVBUFFER_LENGTH( inbuf ) > 0 );
937
938    /* read from the inbuf into our block buffer */
939    inlen = MIN( EVBUFFER_LENGTH(inbuf), msgs->blockToUs.length );
940    tmp = tr_new( uint8_t, inlen );
941    tr_peerIoReadBytes( msgs->io, inbuf, tmp, inlen );
942    evbuffer_add( msgs->inBlock, tmp, inlen );
943
944    /* update our tables accordingly */
945    assert( inlen >= msgs->blockToUs.length );
946    msgs->blockToUs.length -= inlen;
947    msgs->info->peerSentPieceDataAt = time( NULL );
948    clientGotBytes( msgs, inlen );
949
950    /* if this was the entire block, save it */
951    if( !msgs->blockToUs.length )
952    {
953        dbgmsg( msgs, "w00t -- got block index %u, offset %u", msgs->blockToUs.index, msgs->blockToUs.offset );
954        assert( (int)EVBUFFER_LENGTH( msgs->inBlock ) == tr_torBlockCountBytes( msgs->torrent, _tr_block(msgs->torrent,msgs->blockToUs.index, msgs->blockToUs.offset) ) );
955        gotBlock( msgs, msgs->inBlock,
956                        msgs->blockToUs.index,
957                        msgs->blockToUs.offset,
958                        EVBUFFER_LENGTH( msgs->inBlock ) );
959        evbuffer_drain( msgs->inBlock, ~0 );
960        msgs->state = AWAITING_BT_LENGTH;
961    }
962
963    /* cleanup */
964    tr_free( tmp );
965    return READ_AGAIN;
966}
967
968static ReadState
969canRead( struct bufferevent * evin, void * vmsgs )
970{
971    ReadState ret;
972    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
973    struct evbuffer * inbuf = EVBUFFER_INPUT ( evin );
974
975    if( !canDownload( msgs ) )
976    {
977        msgs->notListening = 1;
978        tr_peerIoSetIOMode ( msgs->io, 0, EV_READ );
979        ret = READ_DONE;
980    }
981    else switch( msgs->state )
982    {
983        case AWAITING_BT_LENGTH:  ret = readBtLength  ( msgs, inbuf ); break;
984        case AWAITING_BT_MESSAGE: ret = readBtMessage ( msgs, inbuf ); break;
985        case READING_BT_PIECE:    ret = readBtPiece   ( msgs, inbuf ); break;
986        default: assert( 0 );
987    }
988
989    return ret;
990}
991
992static void
993sendKeepalive( tr_peermsgs * msgs )
994{
995    dbgmsg( msgs, "sending a keepalive message" );
996    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
997}
998
999/**
1000***
1001**/
1002
1003static int
1004canWrite( const tr_peermsgs * msgs )
1005{
1006    /* don't let our outbuffer get too large */
1007    if( tr_peerIoWriteBytesWaiting( msgs->io ) > 8192 )
1008        return FALSE;
1009
1010    return TRUE;
1011}
1012
1013static int
1014canUpload( const tr_peermsgs * msgs )
1015{
1016    const tr_torrent * tor = msgs->torrent;
1017
1018    if( !canWrite( msgs ) )
1019        return FALSE;
1020
1021    if( tor->uploadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1022        return !tor->handle->useUploadLimit || tr_rcCanTransfer( tor->handle->upload );
1023
1024    if( tor->uploadLimitMode == TR_SPEEDLIMIT_SINGLE )
1025        return tr_rcCanTransfer( tor->upload );
1026
1027    return TRUE;
1028}
1029
1030static int
1031pulse( void * vmsgs )
1032{
1033    const time_t now = time( NULL );
1034    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1035    size_t len;
1036
1037    /* if we froze out a downloaded block because of speed limits,
1038       start listening to the peer again */
1039    if( msgs->notListening && canDownload( msgs ) )
1040    {
1041        fprintf( stderr, "msgs %p thawing out...\n", msgs );
1042        msgs->notListening = 0;
1043        tr_peerIoSetIOMode ( msgs->io, EV_READ, 0 );
1044    }
1045
1046    if( !canWrite( msgs ) )
1047    {
1048    }
1049    else if(( len = EVBUFFER_LENGTH( msgs->outBlock ) ))
1050    {
1051        while ( len && canUpload( msgs ) )
1052        {
1053            const size_t outlen = len; //MIN( len, 2048 );
1054            tr_peerIoWrite( msgs->io, EVBUFFER_DATA(msgs->outBlock), outlen );
1055            evbuffer_drain( msgs->outBlock, outlen );
1056            peerGotBytes( msgs, outlen );
1057            len -= outlen;
1058            msgs->info->clientSentPieceDataAt = now;
1059            msgs->clientSentAnythingAt = now;
1060            dbgmsg( msgs, "wrote %d bytes; %d left in block", (int)outlen, (int)len );
1061        }
1062    }
1063    else if(( len = EVBUFFER_LENGTH( msgs->outMessages ) ))
1064    {
1065        tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
1066        msgs->clientSentAnythingAt = now;
1067    }
1068    else if(( msgs->peerAskedFor ))
1069    {
1070        struct peer_request * req = tr_list_pop_front( &msgs->peerAskedFor );
1071        uint8_t * tmp = tr_new( uint8_t, req->length );
1072        const uint32_t msglen = sizeof(uint8_t) + 2*sizeof(uint32_t) + req->length;
1073        tr_ioRead( msgs->torrent, req->index, req->offset, req->length, tmp );
1074        tr_peerIoWriteUint32( msgs->io, msgs->outBlock, msglen );
1075        tr_peerIoWriteUint8 ( msgs->io, msgs->outBlock, BT_PIECE );
1076        tr_peerIoWriteUint32( msgs->io, msgs->outBlock, req->index );
1077        tr_peerIoWriteUint32( msgs->io, msgs->outBlock, req->offset );
1078        tr_peerIoWriteBytes ( msgs->io, msgs->outBlock, tmp, req->length );
1079        tr_free( tmp );
1080        dbgmsg( msgs, "putting req into out queue: index %d, offset %d, length %d ... %d blocks left in our queue", (int)req->index, (int)req->offset, (int)req->length, tr_list_size(msgs->peerAskedFor) );
1081        tr_free( req );
1082    }
1083    else if( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS )
1084    {
1085        sendKeepalive( msgs );
1086    }
1087
1088    return TRUE; /* loop forever */
1089}
1090
1091static void
1092didWrite( struct bufferevent * evin UNUSED, void * vpeer )
1093{
1094    pulse( (tr_peermsgs *) vpeer );
1095}
1096
1097static void
1098gotError( struct bufferevent * evbuf UNUSED, short what UNUSED, void * vpeer )
1099{
1100    fireGotError( vpeer );
1101}
1102
1103static void
1104sendBitfield( tr_peermsgs * msgs )
1105{
1106    const tr_bitfield * bitfield = tr_cpPieceBitfield( msgs->torrent->completion );
1107
1108    dbgmsg( msgs, "sending peer a bitfield message" );
1109    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + bitfield->len );
1110    tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_BITFIELD );
1111    tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, bitfield->bits, bitfield->len );
1112}
1113
1114/**
1115***
1116**/
1117
1118#define MAX_DIFFS 50
1119
1120typedef struct
1121{
1122    tr_pex * added;
1123    tr_pex * dropped;
1124    tr_pex * elements;
1125    int addedCount;
1126    int droppedCount;
1127    int elementCount;
1128    int diffCount;
1129}
1130PexDiffs;
1131
1132static void
1133pexAddedCb( void * vpex, void * userData )
1134{
1135    PexDiffs * diffs = (PexDiffs *) userData;
1136    tr_pex * pex = (tr_pex *) vpex;
1137    if( diffs->diffCount < MAX_DIFFS )
1138    {
1139        diffs->diffCount++;
1140        diffs->added[diffs->addedCount++] = *pex;
1141        diffs->elements[diffs->elementCount++] = *pex;
1142    }
1143}
1144
1145static void
1146pexRemovedCb( void * vpex, void * userData )
1147{
1148    PexDiffs * diffs = (PexDiffs *) userData;
1149    tr_pex * pex = (tr_pex *) vpex;
1150    if( diffs->diffCount < MAX_DIFFS )
1151    {
1152        diffs->diffCount++;
1153        diffs->dropped[diffs->droppedCount++] = *pex;
1154    }
1155}
1156
1157static void
1158pexElementCb( void * vpex, void * userData )
1159{
1160    PexDiffs * diffs = (PexDiffs *) userData;
1161    tr_pex * pex = (tr_pex *) vpex;
1162    if( diffs->diffCount < MAX_DIFFS )
1163    {
1164        diffs->diffCount++;
1165        diffs->elements[diffs->elementCount++] = *pex;
1166    }
1167}
1168
1169static void
1170sendPex( tr_peermsgs * msgs )
1171{
1172    if( msgs->peerSupportsPex && !msgs->torrent->pexDisabled )
1173    {
1174        int i;
1175        tr_pex * newPex = NULL;
1176        const int newCount = tr_peerMgrGetPeers( msgs->handle->peerMgr, msgs->torrent->info.hash, &newPex );
1177        PexDiffs diffs;
1178        benc_val_t val, *added, *dropped, *flags;
1179        uint8_t *tmp, *walk;
1180        char * benc;
1181        int bencLen;
1182
1183        /* build the diffs */
1184        diffs.added = tr_new( tr_pex, newCount );
1185        diffs.addedCount = 0;
1186        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1187        diffs.droppedCount = 0;
1188        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1189        diffs.elementCount = 0;
1190        diffs.diffCount = 0;
1191        tr_set_compare( msgs->pex, msgs->pexCount,
1192                        newPex, newCount,
1193                        tr_pexCompare, sizeof(tr_pex),
1194                        pexRemovedCb, pexAddedCb, pexElementCb, &diffs );
1195        dbgmsg( msgs, "pex: old peer count %d, new peer count %d, added %d, removed %d", msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1196
1197        /* update peer */
1198        tr_free( msgs->pex );
1199        msgs->pex = diffs.elements;
1200        msgs->pexCount = diffs.elementCount;
1201
1202        /* build the pex payload */
1203        tr_bencInit( &val, TYPE_DICT );
1204        tr_bencDictReserve( &val, 3 );
1205
1206        /* "added" */
1207        added = tr_bencDictAdd( &val, "added" );
1208        tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1209        for( i=0; i<diffs.addedCount; ++i ) {
1210            memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
1211            memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1212        }
1213        assert( ( walk - tmp ) == diffs.addedCount * 6 );
1214        tr_bencInitStr( added, tmp, walk-tmp, FALSE );
1215
1216        /* "added.f" */
1217        flags = tr_bencDictAdd( &val, "added.f" );
1218        tmp = walk = tr_new( uint8_t, diffs.addedCount );
1219        for( i=0; i<diffs.addedCount; ++i ) {
1220            dbgmsg( msgs, "PEX -->> -->> flag is %d", (int)diffs.added[i].flags );
1221            *walk++ = diffs.added[i].flags;
1222        }
1223        assert( ( walk - tmp ) == diffs.addedCount );
1224        tr_bencInitStr( flags, tmp, walk-tmp, FALSE );
1225
1226        /* "dropped" */
1227        dropped = tr_bencDictAdd( &val, "dropped" );
1228        tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
1229        for( i=0; i<diffs.droppedCount; ++i ) {
1230            memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
1231            memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
1232        }
1233        assert( ( walk - tmp ) == diffs.droppedCount * 6 );
1234        tr_bencInitStr( dropped, tmp, walk-tmp, FALSE );
1235
1236        /* write the pex message */
1237        benc = tr_bencSaveMalloc( &val, &bencLen );
1238        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 2*sizeof(uint8_t) + bencLen );
1239        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_LTEP );
1240        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, OUR_LTEP_PEX );
1241        tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, benc, bencLen );
1242
1243        /* cleanup */
1244        tr_free( benc );
1245        tr_bencFree( &val );
1246        tr_free( diffs.added );
1247        tr_free( diffs.dropped );
1248        tr_free( newPex );
1249
1250        msgs->clientSentPexAt = time( NULL );
1251    }
1252}
1253
1254static int
1255pexPulse( void * vpeer )
1256{
1257    sendPex( vpeer );
1258    return TRUE;
1259}
1260
1261/**
1262***
1263**/
1264
1265tr_peermsgs*
1266tr_peerMsgsNew( struct tr_torrent * torrent, struct tr_peer * info )
1267{
1268    tr_peermsgs * msgs;
1269
1270    assert( info != NULL );
1271    assert( info->io != NULL );
1272
1273    msgs = tr_new0( tr_peermsgs, 1 );
1274    msgs->publisher = tr_publisherNew( );
1275    msgs->info = info;
1276    msgs->handle = torrent->handle;
1277    msgs->torrent = torrent;
1278    msgs->io = info->io;
1279    msgs->info->clientIsChoked = 1;
1280    msgs->info->peerIsChoked = 1;
1281    msgs->info->clientIsInterested = 0;
1282    msgs->info->peerIsInterested = 0;
1283    msgs->info->have = tr_bitfieldNew( torrent->info.pieceCount );
1284    msgs->pulseTimer = tr_timerNew( msgs->handle, pulse, msgs, PEER_PULSE_INTERVAL );
1285    msgs->pexTimer = tr_timerNew( msgs->handle, pexPulse, msgs, PEX_INTERVAL );
1286    msgs->outMessages = evbuffer_new( );
1287    msgs->outBlock = evbuffer_new( );
1288    msgs->inBlock = evbuffer_new( );
1289
1290    tr_peerIoSetIOFuncs( msgs->io, canRead, didWrite, gotError, msgs );
1291    tr_peerIoSetIOMode( msgs->io, EV_READ|EV_WRITE, 0 );
1292
1293    /**
1294    ***  If we initiated this connection,
1295    ***  we may need to send LTEP/AZMP handshakes.
1296    ***  Otherwise we'll wait for the peer to send theirs first.
1297    **/
1298    if( !tr_peerIoIsIncoming( msgs->io ) )
1299    {
1300        if ( tr_peerIoSupportsLTEP( msgs->io ) ) {
1301            sendLtepHandshake( msgs );
1302           
1303        } else if ( tr_peerIoSupportsAZMP( msgs->io ) ) {
1304            dbgmsg( msgs, "FIXME: need to send AZMP handshake" );
1305           
1306        } else {
1307            /* no-op */
1308        }
1309    }
1310
1311    sendBitfield( msgs );
1312    return msgs;
1313}
1314
1315void
1316tr_peerMsgsFree( tr_peermsgs* msgs )
1317{
1318    if( msgs != NULL )
1319    {
1320        tr_timerFree( &msgs->pulseTimer );
1321        tr_timerFree( &msgs->pexTimer );
1322        tr_publisherFree( &msgs->publisher );
1323        tr_list_foreach( msgs->clientAskedFor, tr_free );
1324        tr_list_free( &msgs->clientAskedFor );
1325        tr_list_foreach( msgs->peerAskedFor, tr_free );
1326        tr_list_free( &msgs->peerAskedFor );
1327        evbuffer_free( msgs->outMessages );
1328        evbuffer_free( msgs->outBlock );
1329        evbuffer_free( msgs->inBlock );
1330        tr_free( msgs->pex );
1331        msgs->pexCount = 0;
1332        tr_free( msgs );
1333    }
1334}
1335
1336tr_publisher_tag
1337tr_peerMsgsSubscribe( tr_peermsgs       * peer,
1338                      tr_delivery_func    func,
1339                      void              * userData )
1340{
1341    return tr_publisherSubscribe( peer->publisher, func, userData );
1342}
1343
1344void
1345tr_peerMsgsUnsubscribe( tr_peermsgs       * peer,
1346                        tr_publisher_tag    tag )
1347{
1348    tr_publisherUnsubscribe( peer->publisher, tag );
1349}
Note: See TracBrowser for help on using the repository browser.