source: trunk/libtransmission/peer-msgs.c @ 3155

Last change on this file since 3155 was 3155, checked in by charles, 15 years ago
  • add sanity checks to incoming piece data requests. This may solve the inout.c:99 assertion failure.
  • rename the gtk client from transmission-gtk' to transmission' for parity with the mac client.
  • Property svn:keywords set to Date Rev Author Id
File size: 38.7 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 3155 2007-09-23 23:38:39Z charles $
11 */
12
13#include <assert.h>
14#include <ctype.h>
15#include <stdio.h>
16#include <stdlib.h>
17#include <string.h>
18
19#include <arpa/inet.h>
20
21#include <sys/types.h> /* event.h needs this */
22#include <event.h>
23
24#include "transmission.h"
25#include "bencode.h"
26#include "completion.h"
27#include "inout.h"
28#include "list.h"
29#include "peer-io.h"
30#include "peer-mgr.h"
31#include "peer-mgr-private.h"
32#include "peer-msgs.h"
33#include "ratecontrol.h"
34#include "trevent.h"
35#include "utils.h"
36
37/**
38***
39**/
40
41#define PEX_INTERVAL (60 * 1000)
42
43#define PEER_PULSE_INTERVAL (100)
44
45enum
46{
47    BT_CHOKE           = 0,
48    BT_UNCHOKE         = 1,
49    BT_INTERESTED      = 2,
50    BT_NOT_INTERESTED  = 3,
51    BT_HAVE            = 4,
52    BT_BITFIELD        = 5,
53    BT_REQUEST         = 6,
54    BT_PIECE           = 7,
55    BT_CANCEL          = 8,
56    BT_PORT            = 9,
57    BT_SUGGEST         = 13,
58    BT_HAVE_ALL        = 14,
59    BT_HAVE_NONE       = 15,
60    BT_REJECT          = 16,
61    BT_ALLOWED_FAST    = 17,
62    BT_LTEP            = 20,
63
64    LTEP_HANDSHAKE     = 0,
65
66    OUR_LTEP_PEX       = 1
67};
68
69enum
70{
71    AWAITING_BT_LENGTH,
72    AWAITING_BT_MESSAGE,
73    READING_BT_PIECE
74};
75
76struct peer_request
77{
78    uint32_t index;
79    uint32_t offset;
80    uint32_t length;
81    time_t time_requested;
82};
83
84static int
85peer_request_compare( const void * va, const void * vb )
86{
87    struct peer_request * a = (struct peer_request*) va;
88    struct peer_request * b = (struct peer_request*) vb;
89    if( a->index != b->index ) return a->index - b->index;
90    if( a->offset != b->offset ) return a->offset - b->offset;
91    if( a->length != b->length ) return a->length - b->length;
92    return 0;
93}
94
95struct tr_peermsgs
96{
97    tr_peer * info;
98
99    tr_handle * handle;
100    tr_torrent * torrent;
101    tr_peerIo * io;
102
103    tr_publisher_t * publisher;
104
105    struct evbuffer * outMessages; /* buffer of all the non-piece messages */
106    struct evbuffer * outBlock;    /* the block we're currently sending */
107    struct evbuffer * inBlock;     /* the block we're currently receiving */
108    tr_list * peerAskedFor;
109    tr_list * clientAskedFor;
110
111    tr_timer * pulseTimer;
112    tr_timer * pexTimer;
113
114    struct peer_request blockToUs; /* the block currntly being sent to us */
115
116    time_t lastReqAddedAt;
117    time_t clientSentPexAt;
118
119    unsigned int notListening          : 1;
120    unsigned int peerSupportsPex       : 1;
121    unsigned int hasSentLtepHandshake  : 1;
122
123    uint8_t state;
124
125    uint8_t ut_pex_id;
126    uint16_t listeningPort;
127
128    uint16_t pexCount;
129
130    uint32_t incomingMessageLength;
131
132    tr_pex * pex;
133};
134
135/**
136***
137**/
138
139static void
140myDebug( const char * file, int line, const struct tr_peermsgs * msgs, const char * fmt, ... )
141{
142    FILE * fp = tr_getLog( );
143    if( fp != NULL )
144    {
145        va_list args;
146        const char * addr = tr_peerIoGetAddrStr( msgs->io );
147        struct evbuffer * buf = evbuffer_new( );
148        evbuffer_add_printf( buf, "[%s:%d] %s (%p) ", file, line, addr, msgs );
149        va_start( args, fmt );
150        evbuffer_add_vprintf( buf, fmt, args );
151        va_end( args );
152        fprintf( fp, "%s\n", EVBUFFER_DATA(buf) );
153        evbuffer_free( buf );
154    }
155}
156
157#define dbgmsg(handshake, fmt...) myDebug(__FILE__, __LINE__, handshake, ##fmt )
158
159/**
160***  EVENTS
161**/
162
163static const tr_peermsgs_event blankEvent = { 0, 0, 0, 0, 0.0f };
164
165static void
166publish( tr_peermsgs * msgs, tr_peermsgs_event * e )
167{
168    tr_publisherPublish( msgs->publisher, msgs->info, e );
169}
170
171static void
172fireGotError( tr_peermsgs * msgs )
173{
174    tr_peermsgs_event e = blankEvent;
175    e.eventType = TR_PEERMSG_GOT_ERROR;
176    publish( msgs, &e );
177}
178
179static void
180fireNeedReq( tr_peermsgs * msgs )
181{
182    tr_peermsgs_event e = blankEvent;
183    e.eventType = TR_PEERMSG_NEED_REQ;
184    publish( msgs, &e );
185}
186
187static void
188firePeerProgress( tr_peermsgs * msgs )
189{
190    tr_peermsgs_event e = blankEvent;
191    e.eventType = TR_PEERMSG_PEER_PROGRESS;
192    e.progress = msgs->info->progress;
193    publish( msgs, &e );
194}
195
196static void
197fireClientHave( tr_peermsgs * msgs, uint32_t pieceIndex )
198{
199    tr_peermsgs_event e = blankEvent;
200    e.eventType = TR_PEERMSG_CLIENT_HAVE;
201    e.pieceIndex = pieceIndex;
202    publish( msgs, &e );
203}
204
205static void
206fireGotBlock( tr_peermsgs * msgs, uint32_t pieceIndex, uint32_t offset, uint32_t length )
207{
208    tr_peermsgs_event e = blankEvent;
209    e.eventType = TR_PEERMSG_CLIENT_BLOCK;
210    e.pieceIndex = pieceIndex;
211    e.offset = offset;
212    e.length = length;
213    publish( msgs, &e );
214}
215
216/**
217***  INTEREST
218**/
219
220static int
221isPieceInteresting( const tr_peermsgs   * peer,
222                    int                   piece )
223{
224    const tr_torrent * torrent = peer->torrent;
225    if( torrent->info.pieces[piece].dnd ) /* we don't want it */
226        return FALSE;
227    if( tr_cpPieceIsComplete( torrent->completion, piece ) ) /* we already have it */
228        return FALSE;
229    if( !tr_bitfieldHas( peer->info->have, piece ) ) /* peer doesn't have it */
230        return FALSE;
231    if( tr_bitfieldHas( peer->info->banned, piece ) ) /* peer is banned for it */
232        return FALSE;
233    return TRUE;
234}
235
236static int
237isPeerInteresting( const tr_peermsgs * msgs )
238{
239    const int clientIsSeed = tr_cpGetStatus( msgs->torrent->completion ) != TR_CP_INCOMPLETE;
240    const int peerIsSeed = msgs->info->progress >= 1.0;
241
242    if( peerIsSeed )
243    {
244        return !clientIsSeed;
245    }
246    else if( clientIsSeed )
247    {
248        return !peerIsSeed;
249    }
250    else /* we're both leeches... */
251    {
252        int i;
253        const tr_torrent * torrent = msgs->torrent;
254        const tr_bitfield * bitfield = tr_cpPieceBitfield( torrent->completion );
255
256        if( !msgs->info->have ) /* We don't know what this peer has... what should this be? */
257            return TRUE;
258
259        assert( bitfield->len == msgs->info->have->len );
260        for( i=0; i<torrent->info.pieceCount; ++i )
261            if( isPieceInteresting( msgs, i ) )
262                return TRUE;
263
264        return FALSE;
265    }
266}
267
268static void
269sendInterest( tr_peermsgs * msgs, int weAreInterested )
270{
271    const uint32_t len = sizeof(uint8_t);
272    const uint8_t bt_msgid = weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED;
273
274    assert( msgs != NULL );
275    assert( weAreInterested==0 || weAreInterested==1 );
276
277    msgs->info->clientIsInterested = weAreInterested;
278    dbgmsg( msgs, ": sending an %s message", (weAreInterested ? "INTERESTED" : "NOT_INTERESTED") );
279    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
280    tr_peerIoWriteBytes( msgs->io, msgs->outMessages, &bt_msgid, 1 );
281}
282
283static void
284updateInterest( tr_peermsgs * msgs )
285{
286    const int i = isPeerInteresting( msgs );
287    if( i != msgs->info->clientIsInterested )
288        sendInterest( msgs, i );
289    if( i )
290        fireNeedReq( msgs );
291}
292
293void
294tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
295{
296    assert( msgs != NULL );
297    assert( msgs->info != NULL );
298    assert( choke==0 || choke==1 );
299
300    if( msgs->info->peerIsChoked != choke )
301    {
302        const uint32_t len = sizeof(uint8_t);
303        const uint8_t bt_msgid = choke ? BT_CHOKE : BT_UNCHOKE;
304
305        msgs->info->peerIsChoked = choke ? 1 : 0;
306        if( msgs->info )
307        {
308            tr_list_foreach( msgs->peerAskedFor, tr_free );
309            tr_list_free( &msgs->peerAskedFor );
310        }
311
312        dbgmsg( msgs, "sending a %s message", (choke ? "CHOKE" : "UNCHOKE") );
313        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
314        tr_peerIoWriteBytes( msgs->io, msgs->outMessages, &bt_msgid, 1 );
315    }
316}
317
318/**
319***
320**/
321
322void
323tr_peerMsgsCancel( tr_peermsgs * msgs,
324                   uint32_t      pieceIndex,
325                   uint32_t      offset,
326                   uint32_t      length )
327{
328    tr_list * node;
329    struct peer_request tmp;
330
331    assert( msgs != NULL );
332    assert( length > 0 );
333
334    tmp.index = pieceIndex;
335    tmp.offset = offset;
336    tmp.length = length;
337
338    node = tr_list_find( msgs->clientAskedFor, &tmp, peer_request_compare );
339    if( node != NULL )
340    {
341        /* cancel the request */
342        const uint8_t bt_msgid = BT_CANCEL;
343        const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
344        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
345        tr_peerIoWriteBytes( msgs->io, msgs->outMessages, &bt_msgid, 1 );
346        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
347        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
348        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
349
350        /* remove it from our "requested" list */
351        tr_list_remove_data( &msgs->peerAskedFor, node->data );
352    }
353}
354
355/**
356***
357**/
358
359void
360tr_peerMsgsHave( tr_peermsgs * msgs,
361                 uint32_t      pieceIndex )
362{
363    const uint8_t bt_msgid = BT_HAVE;
364    const uint32_t len = sizeof(uint8_t) + sizeof(uint32_t);
365    dbgmsg( msgs, "w00t telling them we HAVE piece #%d", pieceIndex );
366    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
367    tr_peerIoWriteBytes( msgs->io, msgs->outMessages, &bt_msgid, 1 );
368    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
369
370    updateInterest( msgs );
371}
372
373/**
374***
375**/
376
377static int
378pulse( void * vmsgs );
379
380int
381tr_peerMsgsAddRequest( tr_peermsgs * msgs,
382                       uint32_t      index, 
383                       uint32_t      offset, 
384                       uint32_t      length )
385{
386    const uint8_t bt_msgid = BT_REQUEST;
387    const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
388    struct peer_request * req;
389    int maxSize;
390
391    assert( msgs != NULL );
392    assert( msgs->torrent != NULL );
393    assert( index < ((uint32_t)msgs->torrent->info.pieceCount) );
394    assert( offset < (uint32_t)tr_torPieceCountBytes( msgs->torrent, (int)index ) );
395    assert( (offset + length) <= (uint32_t)tr_torPieceCountBytes( msgs->torrent, (int)index ) );
396
397    if( msgs->info->clientIsChoked )
398        return TR_ADDREQ_CLIENT_CHOKED;
399
400    if( !tr_bitfieldHas( msgs->info->have, index ) )
401        return TR_ADDREQ_MISSING;
402
403    maxSize = MIN( 2 + (int)(tr_peerIoGetRateToClient(msgs->io)/10), 100 );
404    //if( ( time(NULL) - msgs->lastReqAddedAt <= 5 ) && ( tr_list_size( msgs->clientAskedFor) >= maxSize ) )
405    if( tr_list_size( msgs->clientAskedFor) >= maxSize )
406        return TR_ADDREQ_FULL;
407
408    dbgmsg( msgs, "w00t peer has a max request queue size of %d... adding request for piece %d, offset %d", maxSize, (int)index, (int)offset );
409
410    /* queue the request */
411    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
412    tr_peerIoWriteBytes( msgs->io, msgs->outMessages, &bt_msgid, 1 );
413    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, index );
414    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
415    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
416
417    /* add it to our `requests sent' list */
418    req = tr_new( struct peer_request, 1 );
419    req->index = index;
420    req->offset = offset;
421    req->length = length;
422    req->time_requested = msgs->lastReqAddedAt = time( NULL );
423    tr_list_append( &msgs->clientAskedFor, req );
424    pulse( msgs );
425
426    return TR_ADDREQ_OK;
427}
428
429/**
430***
431**/
432
433static void
434sendLtepHandshake( tr_peermsgs * msgs )
435{
436    benc_val_t val, *m;
437    char * buf;
438    int len;
439    const char * v = TR_NAME " " USERAGENT_PREFIX;
440    const int port = tr_getPublicPort( msgs->handle );
441    struct evbuffer * outbuf = evbuffer_new( );
442    uint32_t msglen;
443    const uint8_t tr_msgid = 20; /* ltep extension id */
444    const uint8_t ltep_msgid = 0; /* handshake id */
445
446    dbgmsg( msgs, "sending an ltep handshake" );
447    tr_bencInit( &val, TYPE_DICT );
448    tr_bencDictReserve( &val, 3 );
449    m  = tr_bencDictAdd( &val, "m" );
450    tr_bencInit( m, TYPE_DICT );
451    tr_bencDictReserve( m, 1 );
452    tr_bencInitInt( tr_bencDictAdd( m, "ut_pex" ), OUR_LTEP_PEX );
453    if( port > 0 )
454        tr_bencInitInt( tr_bencDictAdd( &val, "p" ), port );
455    tr_bencInitStr( tr_bencDictAdd( &val, "v" ), v, 0, 1 );
456    buf = tr_bencSaveMalloc( &val,  &len );
457
458    msglen = sizeof(tr_msgid) + sizeof(ltep_msgid) + len;
459    tr_peerIoWriteUint32( msgs->io, outbuf, msglen );
460    tr_peerIoWriteBytes ( msgs->io, outbuf, &tr_msgid, 1 );
461    tr_peerIoWriteBytes ( msgs->io, outbuf, &ltep_msgid, 1 );
462    tr_peerIoWriteBytes ( msgs->io, outbuf, buf, len );
463
464    tr_peerIoWriteBuf( msgs->io, outbuf );
465
466    msgs->hasSentLtepHandshake = 1;
467
468    /* cleanup */
469    tr_bencFree( &val );
470    tr_free( buf );
471    evbuffer_free( outbuf );
472
473}
474
475static void
476parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
477{
478    benc_val_t val, * sub;
479    uint8_t * tmp = tr_new( uint8_t, len );
480
481    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
482
483    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type!=TYPE_DICT ) {
484        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
485        tr_free( tmp );
486        return;
487    }
488
489    /* check supported messages for utorrent pex */
490    sub = tr_bencDictFind( &val, "m" );
491    if( tr_bencIsDict( sub ) ) {
492        sub = tr_bencDictFind( sub, "ut_pex" );
493        if( tr_bencIsInt( sub ) ) {
494            msgs->peerSupportsPex = 1;
495            msgs->ut_pex_id = (uint8_t) sub->val.i;
496            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
497        }
498    }
499
500#if 0
501    /* get peer's client name */
502    sub = tr_bencDictFind( &val, "v" );
503    if( tr_bencIsStr( sub ) ) {
504        int i;
505        tr_free( msgs->info->client );
506        fprintf( stderr, "dictionary says client is [%s]\n", sub->val.s.s );
507        msgs->info->client = tr_strndup( sub->val.s.s, sub->val.s.i );
508for( i=0; i<sub->val.s.i; ++i ) { fprintf( stderr, "[%c] (%d)\n", sub->val.s.s[i], (int)sub->val.s.s[i] );
509                                  if( (int)msgs->info->client[i]==-75 ) msgs->info->client[i]='u'; }
510        fprintf( stderr, "msgs->client is now [%s]\n", msgs->info->client );
511    }
512#endif
513
514    /* get peer's listening port */
515    sub = tr_bencDictFind( &val, "p" );
516    if( tr_bencIsInt( sub ) ) {
517        msgs->listeningPort = htons( (uint16_t)sub->val.i );
518        dbgmsg( msgs, "msgs->port is now %hu", msgs->listeningPort );
519    }
520
521    tr_bencFree( &val );
522    tr_free( tmp );
523}
524
525static void
526parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
527{
528    benc_val_t val, * sub;
529    uint8_t * tmp;
530
531    if( msgs->torrent->pexDisabled ) /* no sharing! */
532        return;
533
534    tmp = tr_new( uint8_t, msglen );
535    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
536
537    if( tr_bencLoad( tmp, msglen, &val, NULL ) || !tr_bencIsDict( &val ) ) {
538        dbgmsg( msgs, "GET can't read extended-pex dictionary" );
539        tr_free( tmp );
540        return;
541    }
542
543    sub = tr_bencDictFind( &val, "added" );
544    if( tr_bencIsStr(sub) && ((sub->val.s.i % 6) == 0)) {
545        const int n = sub->val.s.i / 6 ;
546        dbgmsg( msgs, "got %d peers from uT pex", n );
547        tr_peerMgrAddPeers( msgs->handle->peerMgr,
548                            msgs->torrent->info.hash,
549                            TR_PEER_FROM_PEX,
550                            (uint8_t*)sub->val.s.s, n );
551    }
552
553    tr_bencFree( &val );
554    tr_free( tmp );
555}
556
557static void
558sendPex( tr_peermsgs * msgs );
559
560static void
561parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
562{
563    uint8_t ltep_msgid;
564
565    tr_peerIoReadBytes( msgs->io, inbuf, &ltep_msgid, 1 );
566    msglen--;
567
568    if( ltep_msgid == LTEP_HANDSHAKE )
569    {
570        dbgmsg( msgs, "got ltep handshake" );
571        parseLtepHandshake( msgs, msglen, inbuf );
572        if( !msgs->hasSentLtepHandshake )
573            sendLtepHandshake( msgs );
574    }
575    else if( ltep_msgid == msgs->ut_pex_id )
576    {
577        dbgmsg( msgs, "got ut pex" );
578        msgs->peerSupportsPex = 1;
579        parseUtPex( msgs, msglen, inbuf );
580    }
581    else
582    {
583        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
584        evbuffer_drain( inbuf, msglen );
585    }
586}
587
588static int
589readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf )
590{
591    uint32_t len;
592    const size_t needlen = sizeof(uint32_t);
593
594    if( EVBUFFER_LENGTH(inbuf) < needlen )
595        return READ_MORE;
596
597    tr_peerIoReadUint32( msgs->io, inbuf, &len );
598
599    if( len == 0 ) { /* peer sent us a keepalive message */
600        dbgmsg( msgs, "peer sent us a keepalive message..." );
601        msgs->info->peerSentKeepaliveAt = time( NULL );
602    } else {
603        dbgmsg( msgs, "peer is sending us a message with %"PRIu64" bytes...\n", (uint64_t)len );
604        msgs->incomingMessageLength = len;
605        msgs->state = AWAITING_BT_MESSAGE;
606    } return READ_AGAIN;
607}
608
609static int
610requestIsValid( const tr_peermsgs * msgs, struct peer_request * req )
611{
612    const tr_torrent * tor = msgs->torrent;
613    assert( req != NULL );
614    assert( req->index < (uint32_t)tor->info.pieceCount );
615    assert( (int)req->offset < tr_torPieceCountBytes( tor, (int)req->index ) );
616    assert( tr_pieceOffset( tor, req->index, req->offset, req->length ) <= tor->info.totalSize );
617    return TRUE;
618}
619
620static int
621readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf )
622{
623    uint8_t id;
624    uint32_t ui32;
625    uint32_t msglen = msgs->incomingMessageLength;
626
627    if( EVBUFFER_LENGTH(inbuf) < msglen )
628        return READ_MORE;
629
630    tr_peerIoReadBytes( msgs->io, inbuf, &id, 1 );
631    msglen--;
632    dbgmsg( msgs, "peer sent us a message... "
633                  "bt id number is %d, and remaining len is %d", (int)id, (int)msglen );
634
635    switch( id )
636    {
637        case BT_CHOKE:
638            assert( msglen == 0 );
639            dbgmsg( msgs, "w00t peer sent us a BT_CHOKE" );
640            msgs->info->clientIsChoked = 1;
641            tr_list_foreach( msgs->peerAskedFor, tr_free );
642            tr_list_free( &msgs->peerAskedFor );
643            tr_list_foreach( msgs->clientAskedFor, tr_free );
644            tr_list_free( &msgs->clientAskedFor );
645            break;
646
647        case BT_UNCHOKE:
648            assert( msglen == 0 );
649            dbgmsg( msgs, "w00t peer sent us a BT_UNCHOKE" );
650            msgs->info->clientIsChoked = 0;
651            fireNeedReq( msgs );
652            break;
653
654        case BT_INTERESTED:
655            assert( msglen == 0 );
656            dbgmsg( msgs, "w00t peer sent us a BT_INTERESTED" );
657            msgs->info->peerIsInterested = 1;
658            break;
659
660        case BT_NOT_INTERESTED:
661            assert( msglen == 0 );
662            dbgmsg( msgs, "w00t peer sent us a BT_NOT_INTERESTED" );
663            msgs->info->peerIsInterested = 0;
664            break;
665
666        case BT_HAVE:
667            assert( msglen == 4 );
668            dbgmsg( msgs, "w00t peer sent us a BT_HAVE" );
669            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
670            tr_bitfieldAdd( msgs->info->have, ui32 );
671            msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have ) / (float)msgs->torrent->info.pieceCount;
672            dbgmsg( msgs, "after the HAVE message, peer progress is %f", msgs->info->progress );
673            updateInterest( msgs );
674            firePeerProgress( msgs );
675            break;
676
677        case BT_BITFIELD:
678            assert( msglen == msgs->info->have->len );
679            dbgmsg( msgs, "w00t peer sent us a BT_BITFIELD" );
680            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
681            msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have ) / (float)msgs->torrent->info.pieceCount;
682            dbgmsg( msgs, "after the HAVE message, peer progress is %f", msgs->info->progress );
683            updateInterest( msgs );
684            fireNeedReq( msgs );
685            firePeerProgress( msgs );
686            break;
687
688        case BT_REQUEST: {
689            struct peer_request * req;
690            assert( msglen == 12 );
691            dbgmsg( msgs, "peer sent us a BT_REQUEST" );
692            req = tr_new( struct peer_request, 1 );
693            tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
694            tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
695            tr_peerIoReadUint32( msgs->io, inbuf, &req->length );
696            if( !msgs->info->peerIsChoked && requestIsValid( msgs, req ) )
697                tr_list_append( &msgs->peerAskedFor, req );
698            else
699                tr_free( req );
700            break;
701        }
702
703        case BT_CANCEL: {
704            struct peer_request req;
705            void * data;
706            assert( msglen == 12 );
707            dbgmsg( msgs, "peer sent us a BT_CANCEL" );
708            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
709            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
710            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
711            data = tr_list_remove( &msgs->peerAskedFor, &req, peer_request_compare );
712            tr_free( data );
713            break;
714        }
715
716        case BT_PIECE: {
717            dbgmsg( msgs, "peer sent us a BT_PIECE" );
718            assert( msgs->blockToUs.length == 0 );
719            msgs->state = READING_BT_PIECE;
720            tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.index );
721            tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.offset );
722            msgs->blockToUs.length = msglen - 8;
723            assert( msgs->blockToUs.length > 0 );
724            assert( EVBUFFER_LENGTH(msgs->inBlock) == 0 );
725            //evbuffer_drain( msgs->inBlock, ~0 );
726            return READ_AGAIN;
727            break;
728        }
729
730        case BT_PORT: {
731            assert( msglen == 2 );
732            dbgmsg( msgs, "peer sent us a BT_PORT" );
733            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->listeningPort );
734            break;
735        }
736       
737        case BT_SUGGEST: {
738            /* tiennou TODO */
739            break;
740        }
741           
742        case BT_HAVE_ALL: {
743            /* tiennou TODO */
744            break;
745        }
746           
747        case BT_HAVE_NONE: {
748            /* tiennou TODO */
749            break;
750        }
751           
752        case BT_REJECT: {
753            /* tiennou TODO */
754            break;
755        }
756           
757        case BT_ALLOWED_FAST: {
758            /* tiennou TODO */
759            break;
760        }
761           
762        case BT_LTEP:
763            dbgmsg( msgs, "peer sent us a BT_LTEP" );
764            parseLtep( msgs, msglen, inbuf );
765            break;
766
767        default:
768            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
769            tr_peerIoDrain( msgs->io, inbuf, msglen );
770            assert( 0 );
771    }
772
773    msgs->incomingMessageLength = -1;
774    msgs->state = AWAITING_BT_LENGTH;
775    return READ_AGAIN;
776}
777
778static void
779clientGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
780{
781    tr_torrent * tor = msgs->torrent;
782    tor->activityDate = tr_date( );
783    tor->downloadedCur += byteCount;
784    tr_rcTransferred( tor->download, byteCount );
785    tr_rcTransferred( tor->handle->download, byteCount );
786}
787
788static void
789peerGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
790{
791    tr_torrent * tor = msgs->torrent;
792    tor->downloadedCur += byteCount;
793    tor->uploadedCur += byteCount;
794    tr_rcTransferred( tor->upload, byteCount );
795    tr_rcTransferred( tor->handle->upload, byteCount );
796}
797
798static int
799canDownload( const tr_peermsgs * msgs )
800{
801    tr_torrent * tor = msgs->torrent;
802
803    if( tor->downloadLimitMode == TR_SPEEDLIMIT_GLOBAL )
804        return !tor->handle->useDownloadLimit || tr_rcCanTransfer( tor->handle->download );
805
806    if( tor->downloadLimitMode == TR_SPEEDLIMIT_SINGLE )
807        return tr_rcCanTransfer( tor->download );
808
809    return TRUE;
810}
811
812static void
813reassignBytesToCorrupt( tr_peermsgs * msgs, uint32_t byteCount )
814{
815    tr_torrent * tor = msgs->torrent;
816
817    /* increment the `corrupt' field */
818    tor->corruptCur += byteCount;
819
820    /* decrement the `downloaded' field */
821    if( tor->downloadedCur >= byteCount )
822        tor->downloadedCur -= byteCount;
823    else
824        tor->downloadedCur = 0;
825}
826
827
828static void
829gotBadPiece( tr_peermsgs * msgs, uint32_t pieceIndex )
830{
831    const uint32_t byteCount = tr_torPieceCountBytes( msgs->torrent, (int)pieceIndex );
832    reassignBytesToCorrupt( msgs, byteCount );
833}
834
835static void
836gotUnwantedBlock( tr_peermsgs * msgs, uint32_t index UNUSED, uint32_t offset UNUSED, uint32_t length )
837{
838    reassignBytesToCorrupt( msgs, length );
839}
840
841static void
842addUsToBlamefield( tr_peermsgs * msgs, uint32_t index )
843{
844    if( !msgs->info->blame )
845         msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
846    tr_bitfieldAdd( msgs->info->blame, index );
847}
848
849static void
850gotBlock( tr_peermsgs      * msgs,
851          struct evbuffer  * inbuf,
852          uint32_t           index,
853          uint32_t           offset,
854          uint32_t           length )
855{
856    tr_torrent * tor = msgs->torrent;
857    const int block = _tr_block( tor, index, offset );
858    struct peer_request key, *req;
859
860    /**
861    *** Remove the block from our `we asked for this' list
862    **/
863
864    key.index = index;
865    key.offset = offset;
866    key.length = length;
867    req = (struct peer_request*) tr_list_remove( &msgs->clientAskedFor, &key,
868                                                 peer_request_compare );
869    if( req == NULL ) {
870        gotUnwantedBlock( msgs, index, offset, length );
871        dbgmsg( msgs, "we didn't ask for this message..." );
872        return;
873    }
874    dbgmsg( msgs, "w00t peer sent us a block.  turnaround time was %d seconds", 
875                     (int)(time(NULL) - req->time_requested) );
876    tr_free( req );
877    dbgmsg( msgs, "peer has %d more blocks we've asked for",
878                  tr_list_size(msgs->clientAskedFor));
879
880    /**
881    *** Error checks
882    **/
883
884    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
885        dbgmsg( msgs, "have this block already..." );
886        tr_dbg( "have this block already..." );
887        gotUnwantedBlock( msgs, index, offset, length );
888        return;
889    }
890
891    if( (int)length != tr_torBlockCountBytes( tor, block ) ) {
892        dbgmsg( msgs, "block is the wrong length..." );
893        tr_dbg( "block is the wrong length..." );
894        gotUnwantedBlock( msgs, index, offset, length );
895        return;
896    }
897
898    /**
899    ***  Write the block
900    **/
901
902    if( tr_ioWrite( tor, index, offset, length, EVBUFFER_DATA( inbuf ))) {
903        return;
904    }
905
906    tr_cpBlockAdd( tor->completion, block );
907
908    addUsToBlamefield( msgs, index );
909
910    fireGotBlock( msgs, index, offset, length );
911    fireNeedReq( msgs );
912
913    /**
914    ***  Handle if this was the last block in the piece
915    **/
916
917    if( tr_cpPieceIsComplete( tor->completion, index ) )
918    {
919        if( tr_ioHash( tor, index ) )
920        {
921            gotBadPiece( msgs, index );
922            return;
923        }
924
925        fireClientHave( msgs, index );
926    }
927}
928
929
930static ReadState
931readBtPiece( tr_peermsgs * msgs, struct evbuffer * inbuf )
932{
933    uint32_t inlen;
934    uint8_t * tmp;
935
936    assert( msgs != NULL );
937    assert( msgs->blockToUs.length > 0 );
938    assert( inbuf != NULL );
939    assert( EVBUFFER_LENGTH( inbuf ) > 0 );
940
941    /* read from the inbuf into our block buffer */
942    inlen = MIN( EVBUFFER_LENGTH(inbuf), msgs->blockToUs.length );
943    tmp = tr_new( uint8_t, inlen );
944    tr_peerIoReadBytes( msgs->io, inbuf, tmp, inlen );
945    evbuffer_add( msgs->inBlock, tmp, inlen );
946
947    /* update our tables accordingly */
948    assert( inlen >= msgs->blockToUs.length );
949    msgs->blockToUs.length -= inlen;
950    msgs->info->peerSentPieceDataAt = time( NULL );
951    clientGotBytes( msgs, inlen );
952
953    /* if this was the entire block, save it */
954    if( !msgs->blockToUs.length )
955    {
956        dbgmsg( msgs, "w00t -- got block index %u, offset %u", msgs->blockToUs.index, msgs->blockToUs.offset );
957        assert( (int)EVBUFFER_LENGTH( msgs->inBlock ) == tr_torBlockCountBytes( msgs->torrent, _tr_block(msgs->torrent,msgs->blockToUs.index, msgs->blockToUs.offset) ) );
958        gotBlock( msgs, msgs->inBlock,
959                        msgs->blockToUs.index,
960                        msgs->blockToUs.offset,
961                        EVBUFFER_LENGTH( msgs->inBlock ) );
962        evbuffer_drain( msgs->inBlock, ~0 );
963        msgs->state = AWAITING_BT_LENGTH;
964    }
965
966    /* cleanup */
967    tr_free( tmp );
968    return READ_AGAIN;
969}
970
971static ReadState
972canRead( struct bufferevent * evin, void * vmsgs )
973{
974    ReadState ret;
975    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
976    struct evbuffer * inbuf = EVBUFFER_INPUT ( evin );
977
978    if( !canDownload( msgs ) )
979    {
980        msgs->notListening = 1;
981        tr_peerIoSetIOMode ( msgs->io, 0, EV_READ );
982        ret = READ_DONE;
983    }
984    else switch( msgs->state )
985    {
986        case AWAITING_BT_LENGTH:  ret = readBtLength  ( msgs, inbuf ); break;
987        case AWAITING_BT_MESSAGE: ret = readBtMessage ( msgs, inbuf ); break;
988        case READING_BT_PIECE:    ret = readBtPiece   ( msgs, inbuf ); break;
989        default: assert( 0 );
990    }
991
992    return ret;
993}
994
995/**
996***
997**/
998
999static int
1000canWrite( const tr_peermsgs * msgs )
1001{
1002    /* don't let our outbuffer get too large */
1003    if( tr_peerIoWriteBytesWaiting( msgs->io ) > 1024 )
1004        return FALSE;
1005
1006    return TRUE;
1007}
1008
1009static int
1010canUpload( const tr_peermsgs * msgs )
1011{
1012    const tr_torrent * tor = msgs->torrent;
1013
1014    if( !canWrite( msgs ) )
1015        return FALSE;
1016
1017    if( tor->uploadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1018        return !tor->handle->useUploadLimit || tr_rcCanTransfer( tor->handle->upload );
1019
1020    if( tor->uploadLimitMode == TR_SPEEDLIMIT_SINGLE )
1021        return tr_rcCanTransfer( tor->upload );
1022
1023    return TRUE;
1024}
1025
1026static int
1027pulse( void * vmsgs )
1028{
1029    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1030    size_t len;
1031
1032    /* if we froze out a downloaded block because of speed limits,
1033       start listening to the peer again */
1034    if( msgs->notListening && canDownload( msgs ) )
1035    {
1036        fprintf( stderr, "msgs %p thawing out...\n", msgs );
1037        msgs->notListening = 0;
1038        tr_peerIoSetIOMode ( msgs->io, EV_READ, 0 );
1039    }
1040
1041    if( !canWrite( msgs ) )
1042    {
1043    }
1044    else if(( len = EVBUFFER_LENGTH( msgs->outBlock ) ))
1045    {
1046        while ( len && canUpload( msgs ) )
1047        {
1048            const size_t outlen = MIN( len, 1024 );
1049            tr_peerIoWrite( msgs->io, EVBUFFER_DATA(msgs->outBlock), outlen );
1050            evbuffer_drain( msgs->outBlock, outlen );
1051            peerGotBytes( msgs, outlen );
1052            len -= outlen;
1053            msgs->info->clientSentPieceDataAt = time( NULL );
1054            dbgmsg( msgs, "wrote %d bytes; %d left in block", (int)outlen, (int)len );
1055        }
1056    }
1057    else if(( len = EVBUFFER_LENGTH( msgs->outMessages ) ))
1058    {
1059        tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
1060    }
1061    else if(( msgs->peerAskedFor ))
1062    {
1063        struct peer_request * req = tr_list_pop_front( &msgs->peerAskedFor );
1064        uint8_t * tmp = tr_new( uint8_t, req->length );
1065        const uint8_t msgid = BT_PIECE;
1066        const uint32_t msglen = sizeof(uint8_t) + sizeof(uint32_t)*2 + req->length;
1067        tr_ioRead( msgs->torrent, req->index, req->offset, req->length, tmp );
1068        tr_peerIoWriteUint32( msgs->io, msgs->outBlock, msglen );
1069        tr_peerIoWriteBytes ( msgs->io, msgs->outBlock, &msgid, 1 );
1070        tr_peerIoWriteUint32( msgs->io, msgs->outBlock, req->index );
1071        tr_peerIoWriteUint32( msgs->io, msgs->outBlock, req->offset );
1072        tr_peerIoWriteBytes ( msgs->io, msgs->outBlock, tmp, req->length );
1073        tr_free( tmp );
1074        dbgmsg( msgs, "putting req into out queue: index %d, offset %d, length %d ... %d blocks left in our queue", (int)req->index, (int)req->offset, (int)req->length, tr_list_size(msgs->peerAskedFor) );
1075        tr_free( req );
1076    }
1077
1078    return TRUE; /* loop forever */
1079}
1080
1081static void
1082didWrite( struct bufferevent * evin UNUSED, void * vpeer )
1083{
1084    pulse( (tr_peermsgs *) vpeer );
1085}
1086
1087static void
1088gotError( struct bufferevent * evbuf UNUSED, short what UNUSED, void * vpeer )
1089{
1090    fireGotError( vpeer );
1091}
1092
1093static void
1094sendBitfield( tr_peermsgs * msgs )
1095{
1096    const tr_bitfield * bitfield = tr_cpPieceBitfield( msgs->torrent->completion );
1097    const uint32_t len = sizeof(uint8_t) + bitfield->len;
1098    const uint8_t bt_msgid = BT_BITFIELD;
1099
1100    dbgmsg( msgs, "sending peer a bitfield message\n", msgs );
1101    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
1102    tr_peerIoWriteBytes( msgs->io, msgs->outMessages, &bt_msgid, 1 );
1103    tr_peerIoWriteBytes( msgs->io, msgs->outMessages, bitfield->bits, bitfield->len );
1104}
1105
1106/**
1107***
1108**/
1109
1110#define MAX_DIFFS 50
1111
1112typedef struct
1113{
1114    tr_pex * added;
1115    tr_pex * dropped;
1116    tr_pex * elements;
1117    int addedCount;
1118    int droppedCount;
1119    int elementCount;
1120    int diffCount;
1121}
1122PexDiffs;
1123
1124static void
1125pexAddedCb( void * vpex, void * userData )
1126{
1127    PexDiffs * diffs = (PexDiffs *) userData;
1128    tr_pex * pex = (tr_pex *) vpex;
1129    if( diffs->diffCount < MAX_DIFFS )
1130    {
1131        diffs->diffCount++;
1132        diffs->added[diffs->addedCount++] = *pex;
1133        diffs->elements[diffs->elementCount++] = *pex;
1134    }
1135}
1136
1137static void
1138pexRemovedCb( void * vpex, void * userData )
1139{
1140    PexDiffs * diffs = (PexDiffs *) userData;
1141    tr_pex * pex = (tr_pex *) vpex;
1142    if( diffs->diffCount < MAX_DIFFS )
1143    {
1144        diffs->diffCount++;
1145        diffs->dropped[diffs->droppedCount++] = *pex;
1146    }
1147}
1148
1149static void
1150pexElementCb( void * vpex, void * userData )
1151{
1152    PexDiffs * diffs = (PexDiffs *) userData;
1153    tr_pex * pex = (tr_pex *) vpex;
1154    if( diffs->diffCount < MAX_DIFFS )
1155    {
1156        diffs->diffCount++;
1157        diffs->elements[diffs->elementCount++] = *pex;
1158    }
1159}
1160
1161static void
1162sendPex( tr_peermsgs * msgs )
1163{
1164    if( msgs->peerSupportsPex && !msgs->torrent->pexDisabled )
1165    {
1166        int i;
1167        tr_pex * newPex = NULL;
1168        const int newCount = tr_peerMgrGetPeers( msgs->handle->peerMgr, msgs->torrent->info.hash, &newPex );
1169        PexDiffs diffs;
1170        benc_val_t val, *added, *dropped, *flags;
1171        uint8_t *tmp, *walk;
1172        char * benc;
1173        int bencLen;
1174        const uint8_t bt_msgid = BT_LTEP;
1175        const uint8_t ltep_msgid = OUR_LTEP_PEX;
1176
1177        /* build the diffs */
1178        diffs.added = tr_new( tr_pex, newCount );
1179        diffs.addedCount = 0;
1180        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1181        diffs.droppedCount = 0;
1182        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1183        diffs.elementCount = 0;
1184        diffs.diffCount = 0;
1185        tr_set_compare( msgs->pex, msgs->pexCount,
1186                        newPex, newCount,
1187                        tr_pexCompare, sizeof(tr_pex),
1188                        pexRemovedCb, pexAddedCb, pexElementCb, &diffs );
1189        dbgmsg( msgs, "pex: old peer count %d, new peer count %d, added %d, removed %d", msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1190
1191        /* update peer */
1192        tr_free( msgs->pex );
1193        msgs->pex = diffs.elements;
1194        msgs->pexCount = diffs.elementCount;
1195
1196        /* build the pex payload */
1197        tr_bencInit( &val, TYPE_DICT );
1198        tr_bencDictReserve( &val, 3 );
1199
1200        /* "added" */
1201        added = tr_bencDictAdd( &val, "added" );
1202        tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1203        for( i=0; i<diffs.addedCount; ++i ) {
1204            memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
1205            memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1206        }
1207        assert( ( walk - tmp ) == diffs.addedCount * 6 );
1208        tr_bencInitStr( added, tmp, walk-tmp, FALSE );
1209
1210        /* "added.f" */
1211        flags = tr_bencDictAdd( &val, "added.f" );
1212        tmp = walk = tr_new( uint8_t, diffs.addedCount );
1213        for( i=0; i<diffs.addedCount; ++i ) {
1214            dbgmsg( msgs, "PEX -->> -->> flag is %d", (int)diffs.added[i].flags );
1215            *walk++ = diffs.added[i].flags;
1216        }
1217        assert( ( walk - tmp ) == diffs.addedCount );
1218        tr_bencInitStr( flags, tmp, walk-tmp, FALSE );
1219
1220        /* "dropped" */
1221        dropped = tr_bencDictAdd( &val, "dropped" );
1222        tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
1223        for( i=0; i<diffs.droppedCount; ++i ) {
1224            memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
1225            memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
1226        }
1227        assert( ( walk - tmp ) == diffs.droppedCount * 6 );
1228        tr_bencInitStr( dropped, tmp, walk-tmp, FALSE );
1229
1230        /* write the pex message */
1231        benc = tr_bencSaveMalloc( &val, &bencLen );
1232        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 1 + 1 + bencLen );
1233        tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, &bt_msgid, 1 );
1234        tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, &ltep_msgid, 1 );
1235        tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, benc, bencLen );
1236
1237        /* cleanup */
1238        tr_free( benc );
1239        tr_bencFree( &val );
1240        tr_free( diffs.added );
1241        tr_free( diffs.dropped );
1242        tr_free( newPex );
1243
1244        msgs->clientSentPexAt = time( NULL );
1245    }
1246}
1247
1248static int
1249pexPulse( void * vpeer )
1250{
1251    sendPex( vpeer );
1252    return TRUE;
1253}
1254
1255/**
1256***
1257**/
1258
1259tr_peermsgs*
1260tr_peerMsgsNew( struct tr_torrent * torrent, struct tr_peer * info )
1261{
1262    tr_peermsgs * msgs;
1263
1264    assert( info != NULL );
1265    assert( info->io != NULL );
1266
1267    msgs = tr_new0( tr_peermsgs, 1 );
1268    msgs->publisher = tr_publisherNew( );
1269    msgs->info = info;
1270    msgs->handle = torrent->handle;
1271    msgs->torrent = torrent;
1272    msgs->io = info->io;
1273    msgs->info->clientIsChoked = 1;
1274    msgs->info->peerIsChoked = 1;
1275    msgs->info->clientIsInterested = 0;
1276    msgs->info->peerIsInterested = 0;
1277    msgs->info->have = tr_bitfieldNew( torrent->info.pieceCount );
1278    msgs->pulseTimer = tr_timerNew( msgs->handle, pulse, msgs, PEER_PULSE_INTERVAL );
1279    msgs->pexTimer = tr_timerNew( msgs->handle, pexPulse, msgs, PEX_INTERVAL );
1280    msgs->outMessages = evbuffer_new( );
1281    msgs->outBlock = evbuffer_new( );
1282    msgs->inBlock = evbuffer_new( );
1283
1284    tr_peerIoSetIOFuncs( msgs->io, canRead, didWrite, gotError, msgs );
1285    tr_peerIoSetIOMode( msgs->io, EV_READ|EV_WRITE, 0 );
1286
1287    /**
1288    ***  If we initiated this connection,
1289    ***  we may need to send LTEP/AZMP handshakes.
1290    ***  Otherwise we'll wait for the peer to send theirs first.
1291    **/
1292    if( !tr_peerIoIsIncoming( msgs->io ) )
1293    {
1294        if ( tr_peerIoSupportsLTEP( msgs->io ) ) {
1295            sendLtepHandshake( msgs );
1296           
1297        } else if ( tr_peerIoSupportsAZMP( msgs->io ) ) {
1298            dbgmsg( msgs, "FIXME: need to send AZMP handshake" );
1299           
1300        } else {
1301            /* no-op */
1302        }
1303    }
1304
1305    sendBitfield( msgs );
1306    return msgs;
1307}
1308
1309void
1310tr_peerMsgsFree( tr_peermsgs* p )
1311{
1312    if( p != NULL )
1313    {
1314        tr_timerFree( &p->pulseTimer );
1315        tr_timerFree( &p->pexTimer );
1316        tr_publisherFree( &p->publisher );
1317        tr_list_foreach( p->clientAskedFor, tr_free );
1318        tr_list_free( &p->clientAskedFor );
1319        tr_list_foreach( p->peerAskedFor, tr_free );
1320        tr_list_free( &p->peerAskedFor );
1321        evbuffer_free( p->outMessages );
1322        evbuffer_free( p->outBlock );
1323        evbuffer_free( p->inBlock );
1324        tr_free( p );
1325    }
1326}
1327
1328tr_publisher_tag
1329tr_peerMsgsSubscribe( tr_peermsgs       * peer,
1330                      tr_delivery_func    func,
1331                      void              * userData )
1332{
1333    return tr_publisherSubscribe( peer->publisher, func, userData );
1334}
1335
1336void
1337tr_peerMsgsUnsubscribe( tr_peermsgs       * peer,
1338                        tr_publisher_tag    tag )
1339{
1340    tr_publisherUnsubscribe( peer->publisher, tag );
1341}
Note: See TracBrowser for help on using the repository browser.