source: trunk/libtransmission/peer-msgs.c @ 3243

Last change on this file since 3243 was 3243, checked in by charles, 15 years ago

tweaks

  • Property svn:keywords set to Date Rev Author Id
File size: 47.8 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 3243 2007-10-01 00:08:12Z charles $
11 */
12
13#include <assert.h>
14#include <ctype.h>
15#include <stdio.h>
16#include <stdlib.h>
17#include <string.h>
18
19#include <arpa/inet.h>
20
21#include <sys/types.h> /* event.h needs this */
22#include <event.h>
23
24#include "transmission.h"
25#include "bencode.h"
26#include "completion.h"
27#include "inout.h"
28#include "list.h"
29#include "peer-io.h"
30#include "peer-mgr.h"
31#include "peer-mgr-private.h"
32#include "peer-msgs.h"
33#include "ratecontrol.h"
34#include "trevent.h"
35#include "utils.h"
36
37/**
38***
39**/
40
41#define MAX_ALLOWED_SET_COUNT   10 /* number of pieces generated for allow-fast,
42                                    threshold for fast-allowing others */
43
44enum
45{
46    BT_CHOKE                = 0,
47    BT_UNCHOKE              = 1,
48    BT_INTERESTED           = 2,
49    BT_NOT_INTERESTED       = 3,
50    BT_HAVE                 = 4,
51    BT_BITFIELD             = 5,
52    BT_REQUEST              = 6,
53    BT_PIECE                = 7,
54    BT_CANCEL               = 8,
55    BT_PORT                 = 9,
56    BT_SUGGEST              = 13,
57    BT_HAVE_ALL             = 14,
58    BT_HAVE_NONE            = 15,
59    BT_REJECT               = 16,
60    BT_ALLOWED_FAST         = 17,
61    BT_LTEP                 = 20,
62
63    LTEP_HANDSHAKE          = 0,
64
65    OUR_LTEP_PEX            = 1,
66
67    MAX_REQUEST_BYTE_COUNT  = (16 * 1024), /* drop requests who want too much */
68
69    KEEPALIVE_INTERVAL_SECS = 90,          /* idle seconds before we send a keepalive */
70    PEX_INTERVAL            = (60 * 1000), /* msec between calls to sendPex() */
71    PEER_PULSE_INTERVAL     = (50) ,       /* msec between calls to pulse() */
72};
73
74enum
75{
76    AWAITING_BT_LENGTH,
77    AWAITING_BT_MESSAGE,
78    READING_BT_PIECE
79};
80
81struct peer_request
82{
83    uint32_t index;
84    uint32_t offset;
85    uint32_t length;
86    time_t time_requested;
87};
88
89static int
90peer_request_compare( const void * va, const void * vb )
91{
92    struct peer_request * a = (struct peer_request*) va;
93    struct peer_request * b = (struct peer_request*) vb;
94    if( a->index != b->index ) return a->index - b->index;
95    if( a->offset != b->offset ) return a->offset - b->offset;
96    if( a->length != b->length ) return a->length - b->length;
97    return 0;
98}
99
100struct tr_peermsgs
101{
102    tr_peer * info;
103
104    tr_handle * handle;
105    tr_torrent * torrent;
106    tr_peerIo * io;
107
108    tr_publisher_t * publisher;
109
110    struct evbuffer * outMessages; /* buffer of all the non-piece messages */
111    struct evbuffer * outBlock;    /* the block we're currently sending */
112    struct evbuffer * inBlock;     /* the block we're currently receiving */
113    tr_list * peerAskedFor;
114    tr_list * clientAskedFor;
115
116    tr_timer * pulseTimer;
117    tr_timer * pexTimer;
118
119    struct peer_request blockToUs; /* the block currntly being sent to us */
120
121    time_t lastReqAddedAt;
122    time_t clientSentPexAt;
123    time_t clientSentAnythingAt;
124
125    unsigned int notListening             : 1;
126    unsigned int peerSupportsPex          : 1;
127    unsigned int clientSentLtepHandshake  : 1;
128    unsigned int peerSentLtepHandshake    : 1;
129   
130    tr_bitfield * clientAllowedPieces;
131    tr_bitfield * peerAllowedPieces;
132   
133    uint8_t state;
134
135    uint8_t ut_pex_id;
136
137    uint16_t pexCount;
138
139    uint32_t incomingMessageLength;
140
141    tr_pex * pex;
142};
143
144/**
145***
146**/
147
148static void
149myDebug( const char * file, int line, const struct tr_peermsgs * msgs, const char * fmt, ... )
150{
151    FILE * fp = tr_getLog( );
152    if( fp != NULL )
153    {
154        va_list args;
155        const char * addr = tr_peerIoGetAddrStr( msgs->io );
156        struct evbuffer * buf = evbuffer_new( );
157        evbuffer_add_printf( buf, "[%s:%d] %s (%p) ", file, line, addr, msgs->io );
158        va_start( args, fmt );
159        evbuffer_add_vprintf( buf, fmt, args );
160        va_end( args );
161        fprintf( fp, "%s\n", EVBUFFER_DATA(buf) );
162        evbuffer_free( buf );
163    }
164}
165
166#define dbgmsg(handshake, fmt...) myDebug(__FILE__, __LINE__, handshake, ##fmt )
167
168/**
169***  EVENTS
170**/
171
172static const tr_peermsgs_event blankEvent = { 0, 0, 0, 0, 0.0f };
173
174static void
175publish( tr_peermsgs * msgs, tr_peermsgs_event * e )
176{
177    tr_publisherPublish( msgs->publisher, msgs->info, e );
178}
179
180static void
181fireGotError( tr_peermsgs * msgs )
182{
183    tr_peermsgs_event e = blankEvent;
184    e.eventType = TR_PEERMSG_GOT_ERROR;
185    publish( msgs, &e );
186}
187
188static void
189fireNeedReq( tr_peermsgs * msgs )
190{
191    tr_peermsgs_event e = blankEvent;
192    e.eventType = TR_PEERMSG_NEED_REQ;
193    publish( msgs, &e );
194}
195
196static void
197firePeerProgress( tr_peermsgs * msgs )
198{
199    tr_peermsgs_event e = blankEvent;
200    e.eventType = TR_PEERMSG_PEER_PROGRESS;
201    e.progress = msgs->info->progress;
202    publish( msgs, &e );
203}
204
205static void
206fireClientHave( tr_peermsgs * msgs, uint32_t pieceIndex )
207{
208    tr_peermsgs_event e = blankEvent;
209    e.eventType = TR_PEERMSG_CLIENT_HAVE;
210    e.pieceIndex = pieceIndex;
211    publish( msgs, &e );
212}
213
214static void
215fireGotBlock( tr_peermsgs * msgs, uint32_t pieceIndex, uint32_t offset, uint32_t length )
216{
217    tr_peermsgs_event e = blankEvent;
218    e.eventType = TR_PEERMSG_CLIENT_BLOCK;
219    e.pieceIndex = pieceIndex;
220    e.offset = offset;
221    e.length = length;
222    publish( msgs, &e );
223}
224
225/**
226***  INTEREST
227**/
228
229static int
230isPieceInteresting( const tr_peermsgs   * peer,
231                    int                   piece )
232{
233    const tr_torrent * torrent = peer->torrent;
234    if( torrent->info.pieces[piece].dnd ) /* we don't want it */
235        return FALSE;
236    if( tr_cpPieceIsComplete( torrent->completion, piece ) ) /* we already have it */
237        return FALSE;
238    if( !tr_bitfieldHas( peer->info->have, piece ) ) /* peer doesn't have it */
239        return FALSE;
240    if( tr_bitfieldHas( peer->info->banned, piece ) ) /* peer is banned for it */
241        return FALSE;
242    return TRUE;
243}
244
245/* "interested" means we'll ask for piece data from the peer if they unchoke us */
246static int
247isPeerInteresting( const tr_peermsgs * msgs )
248{
249    int i;
250    const tr_torrent * torrent;
251    const tr_bitfield * bitfield;
252    const int clientIsSeed = tr_cpGetStatus( msgs->torrent->completion ) != TR_CP_INCOMPLETE;
253
254    if( clientIsSeed )
255        return FALSE;
256
257    torrent = msgs->torrent;
258    bitfield = tr_cpPieceBitfield( torrent->completion );
259
260    if( !msgs->info->have ) /* We don't know what this peer has... what should this be? */
261        return TRUE;
262
263    assert( bitfield->len == msgs->info->have->len );
264    for( i=0; i<torrent->info.pieceCount; ++i )
265        if( isPieceInteresting( msgs, i ) )
266            return TRUE;
267
268    return FALSE;
269}
270
271static void
272sendInterest( tr_peermsgs * msgs, int weAreInterested )
273{
274    assert( msgs != NULL );
275    assert( weAreInterested==0 || weAreInterested==1 );
276
277    msgs->info->clientIsInterested = weAreInterested;
278    dbgmsg( msgs, ": sending an %s message", (weAreInterested ? "INTERESTED" : "NOT_INTERESTED") );
279
280    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
281    tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
282}
283
284static void
285updateInterest( tr_peermsgs * msgs )
286{
287    const int i = isPeerInteresting( msgs );
288    if( i != msgs->info->clientIsInterested )
289        sendInterest( msgs, i );
290    if( i )
291        fireNeedReq( msgs );
292}
293
294void
295tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
296{
297    assert( msgs != NULL );
298    assert( msgs->info != NULL );
299    assert( choke==0 || choke==1 );
300
301    if( msgs->info->peerIsChoked != choke )
302    {
303        msgs->info->peerIsChoked = choke;
304        tr_list * walk;
305       
306        if( choke )
307            for( walk = msgs->peerAskedFor; walk != NULL; )
308            {
309                tr_list * next = walk->next;
310                /* We shouldn't reject a peer's fast allowed requests at choke */
311                struct peer_request *req = walk->data;
312                if ( !tr_bitfieldHas( msgs->peerAllowedPieces, req->index ) )
313                {
314                    tr_list_remove_data( &msgs->peerAskedFor, req );
315                    tr_free( req );
316                }
317                walk = next;
318            }
319
320        dbgmsg( msgs, "sending a %s message", (choke ? "CHOKE" : "UNCHOKE") );
321        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
322        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, choke ? BT_CHOKE : BT_UNCHOKE );
323    }
324}
325
326/**
327***
328**/
329
330void
331tr_peerMsgsCancel( tr_peermsgs * msgs,
332                   uint32_t      pieceIndex,
333                   uint32_t      offset,
334                   uint32_t      length )
335{
336    tr_list * node;
337    struct peer_request tmp;
338
339    assert( msgs != NULL );
340    assert( length > 0 );
341
342    /* have we asked the peer for this piece? */
343    tmp.index = pieceIndex;
344    tmp.offset = offset;
345    tmp.length = length;
346    node = tr_list_remove( &msgs->clientAskedFor, &tmp, peer_request_compare );
347
348    /* if so, send a cancel message */
349    if( node != NULL ) {
350        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + 3*sizeof(uint32_t) );
351        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_CANCEL );
352        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
353        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
354        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
355        tr_free( node );
356    }
357}
358
359/**
360***
361**/
362
363void
364tr_peerMsgsHave( tr_peermsgs * msgs,
365                 uint32_t      pieceIndex )
366{
367    dbgmsg( msgs, "w00t telling them we HAVE piece #%d", pieceIndex );
368
369    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
370    tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_HAVE );
371    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
372
373    updateInterest( msgs );
374}
375#if 0
376static void
377sendFastSuggest( tr_peermsgs * msgs,
378                 uint32_t      pieceIndex )
379{
380    dbgmsg( msgs, "w00t SUGGESTing them piece #%d", pieceIndex );
381    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
382    tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_SUGGEST );
383    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
384   
385    updateInterest( msgs );
386}
387#endif
388static void
389sendFastHave( tr_peermsgs * msgs,
390              int           all)
391{
392    dbgmsg( msgs, "w00t telling them we %s pieces", (all ? "HAVE_ALL" : "HAVE_NONE" ) );
393    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
394    tr_peerIoWriteUint8( msgs->io, msgs->outMessages, ( all ? BT_HAVE_ALL : BT_HAVE_NONE ) );
395   
396    updateInterest( msgs );
397}
398
399static void
400sendFastReject( tr_peermsgs * msgs,
401                uint32_t      pieceIndex,
402                uint32_t      offset,
403                uint32_t      length )
404{
405    assert( msgs != NULL );
406    assert( length > 0 );
407   
408    /* reject the request */
409    const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
410    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
411    tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_REJECT );
412    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
413    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
414    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
415}
416
417static void
418sendFastAllowed( tr_peermsgs * msgs,
419                 uint32_t      pieceIndex)
420{
421    dbgmsg( msgs, "w00t telling them we ALLOW_FAST piece #%d", pieceIndex );
422    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
423    tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_ALLOWED_FAST );
424    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
425}
426
427
428
429static void
430sendFastAllowedSet( tr_peermsgs * msgs )
431{
432    int i = 0;
433    while (i <= msgs->torrent->info.pieceCount )
434    {
435        if ( tr_bitfieldHas( msgs->peerAllowedPieces, i) )
436            sendFastAllowed( msgs, i );
437        i++;
438    }
439}
440
441
442/**
443***
444**/
445
446static int
447pulse( void * vmsgs );
448
449static int
450reqIsValid( const tr_peermsgs * msgs, uint32_t index, uint32_t offset, uint32_t length )
451{
452    const tr_torrent * tor = msgs->torrent;
453
454    if( index >= (uint32_t) tor->info.pieceCount )
455        return FALSE;
456    if ( (int)offset >= tr_torPieceCountBytes( tor, (int)index ) )
457        return FALSE;
458    if( length > MAX_REQUEST_BYTE_COUNT )
459        return FALSE;
460    if( tr_pieceOffset( tor, index, offset, length ) > tor->info.totalSize )
461        return FALSE;
462
463    return TRUE;
464}
465
466static int
467requestIsValid( const tr_peermsgs * msgs, struct peer_request * req )
468{
469    return reqIsValid( msgs, req->index, req->offset, req->length );
470}
471
472int
473tr_peerMsgsAddRequest( tr_peermsgs * msgs,
474                       uint32_t      index, 
475                       uint32_t      offset, 
476                       uint32_t      length )
477{
478    struct peer_request * req;
479    int maxSize;
480
481    assert( msgs != NULL );
482    assert( msgs->torrent != NULL );
483    assert( reqIsValid( msgs, index, offset, length ) );
484
485    if( msgs->info->clientIsChoked )
486        return TR_ADDREQ_CLIENT_CHOKED;
487
488    if( !tr_bitfieldHas( msgs->info->have, index ) )
489        return TR_ADDREQ_MISSING;
490
491    maxSize = MIN( 2 + (int)(tr_peerIoGetRateToClient(msgs->io)/10), 100 );
492    //if( ( time(NULL) - msgs->lastReqAddedAt <= 5 ) && ( tr_list_size( msgs->clientAskedFor) >= maxSize ) )
493    if( tr_list_size( msgs->clientAskedFor) >= maxSize )
494        return TR_ADDREQ_FULL;
495
496    dbgmsg( msgs, "w00t peer has a max request queue size of %d... adding request for piece %d, offset %d", maxSize, (int)index, (int)offset );
497
498    /* queue the request */
499    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + 3*sizeof(uint32_t) );
500    tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_REQUEST );
501    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, index );
502    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
503    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
504
505    /* add it to our `requests sent' list */
506    req = tr_new( struct peer_request, 1 );
507    req->index = index;
508    req->offset = offset;
509    req->length = length;
510    req->time_requested = msgs->lastReqAddedAt = time( NULL );
511    tr_list_append( &msgs->clientAskedFor, req );
512    pulse( msgs );
513
514    return TR_ADDREQ_OK;
515}
516
517/**
518***
519**/
520
521static void
522sendLtepHandshake( tr_peermsgs * msgs )
523{
524    benc_val_t val, *m;
525    char * buf;
526    int len;
527    int pex;
528    const char * v = TR_NAME " " USERAGENT_PREFIX;
529    const int port = tr_getPublicPort( msgs->handle );
530    struct evbuffer * outbuf;
531
532    if( msgs->clientSentLtepHandshake )
533        return;
534
535    outbuf = evbuffer_new( );
536    dbgmsg( msgs, "sending an ltep handshake" );
537    msgs->clientSentLtepHandshake = 1;
538
539    /* decide if we want to advertise pex support */
540    if( msgs->torrent->pexDisabled )
541        pex = 0;
542    else if( msgs->peerSentLtepHandshake )
543        pex = msgs->peerSupportsPex ? 1 : 0;
544    else
545        pex = 1;
546
547    tr_bencInit( &val, TYPE_DICT );
548    tr_bencDictReserve( &val, 4 );
549    tr_bencInitInt( tr_bencDictAdd( &val, "e" ), 1 );
550    m  = tr_bencDictAdd( &val, "m" );
551    tr_bencInit( m, TYPE_DICT );
552    if( pex ) {
553        tr_bencDictReserve( m, 1 );
554        tr_bencInitInt( tr_bencDictAdd( m, "ut_pex" ), OUR_LTEP_PEX );
555    }
556    if( port > 0 )
557        tr_bencInitInt( tr_bencDictAdd( &val, "p" ), port );
558    tr_bencInitStr( tr_bencDictAdd( &val, "v" ), v, 0, 1 );
559    buf = tr_bencSaveMalloc( &val,  &len );
560
561    tr_peerIoWriteUint32( msgs->io, outbuf, 2*sizeof(uint8_t) + len );
562    tr_peerIoWriteUint8 ( msgs->io, outbuf, BT_LTEP );
563    tr_peerIoWriteUint8 ( msgs->io, outbuf, LTEP_HANDSHAKE );
564    tr_peerIoWriteBytes ( msgs->io, outbuf, buf, len );
565
566    tr_peerIoWriteBuf( msgs->io, outbuf );
567
568    dbgmsg( msgs, "here is the ltep handshake we sent:" );
569    tr_bencPrint( &val );
570
571    /* cleanup */
572    tr_bencFree( &val );
573    tr_free( buf );
574    evbuffer_free( outbuf );
575}
576
577static void
578parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
579{
580    benc_val_t val, * sub;
581    uint8_t * tmp = tr_new( uint8_t, len );
582
583    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
584    msgs->peerSentLtepHandshake = 1;
585
586    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type!=TYPE_DICT ) {
587        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
588        tr_free( tmp );
589        return;
590    }
591
592    dbgmsg( msgs, "here is the ltep handshake we read:" );
593    tr_bencPrint( &val );
594
595    /* does the peer prefer encrypted connections? */
596    sub = tr_bencDictFind( &val, "e" );
597    if( tr_bencIsInt( sub ) )
598        msgs->info->encryption_preference = sub->val.i
599                                      ? ENCRYPTION_PREFERENCE_YES
600                                      : ENCRYPTION_PREFERENCE_NO;
601
602    /* check supported messages for utorrent pex */
603    sub = tr_bencDictFind( &val, "m" );
604    if( tr_bencIsDict( sub ) ) {
605        sub = tr_bencDictFind( sub, "ut_pex" );
606        if( tr_bencIsInt( sub ) ) {
607            msgs->peerSupportsPex = 1;
608            msgs->ut_pex_id = (uint8_t) sub->val.i;
609            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
610        }
611    }
612
613#if 0
614    /* get peer's client name */
615    sub = tr_bencDictFind( &val, "v" );
616    if( tr_bencIsStr( sub ) ) {
617        int i;
618        tr_free( msgs->info->client );
619        fprintf( stderr, "dictionary says client is [%s]\n", sub->val.s.s );
620        msgs->info->client = tr_strndup( sub->val.s.s, sub->val.s.i );
621for( i=0; i<sub->val.s.i; ++i ) { fprintf( stderr, "[%c] (%d)\n", sub->val.s.s[i], (int)sub->val.s.s[i] );
622                                  if( (int)msgs->info->client[i]==-75 ) msgs->info->client[i]='u'; }
623        fprintf( stderr, "msgs->client is now [%s]\n", msgs->info->client );
624    }
625#endif
626
627    /* get peer's listening port */
628    sub = tr_bencDictFind( &val, "p" );
629    if( tr_bencIsInt( sub ) ) {
630        msgs->info->port = htons( (uint16_t)sub->val.i );
631        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
632    }
633
634    tr_bencFree( &val );
635    tr_free( tmp );
636}
637
638static void
639parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
640{
641    benc_val_t val, * sub;
642    uint8_t * tmp;
643
644    if( msgs->torrent->pexDisabled ) /* no sharing! */
645        return;
646
647    tmp = tr_new( uint8_t, msglen );
648    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
649
650    if( tr_bencLoad( tmp, msglen, &val, NULL ) || !tr_bencIsDict( &val ) ) {
651        dbgmsg( msgs, "GET can't read extended-pex dictionary" );
652        tr_free( tmp );
653        return;
654    }
655
656    sub = tr_bencDictFind( &val, "added" );
657    if( tr_bencIsStr(sub) && ((sub->val.s.i % 6) == 0)) {
658        const int n = sub->val.s.i / 6 ;
659        dbgmsg( msgs, "got %d peers from uT pex", n );
660        tr_peerMgrAddPeers( msgs->handle->peerMgr,
661                            msgs->torrent->info.hash,
662                            TR_PEER_FROM_PEX,
663                            (uint8_t*)sub->val.s.s, n );
664    }
665
666    tr_bencFree( &val );
667    tr_free( tmp );
668}
669
670static void
671sendPex( tr_peermsgs * msgs );
672
673static void
674parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
675{
676    uint8_t ltep_msgid;
677
678    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
679    msglen--;
680
681    if( ltep_msgid == LTEP_HANDSHAKE )
682    {
683        dbgmsg( msgs, "got ltep handshake" );
684        parseLtepHandshake( msgs, msglen, inbuf );
685        sendLtepHandshake( msgs );
686        sendPex( msgs );
687    }
688    else if( ltep_msgid == msgs->ut_pex_id )
689    {
690        dbgmsg( msgs, "got ut pex" );
691        msgs->peerSupportsPex = 1;
692        parseUtPex( msgs, msglen, inbuf );
693    }
694    else
695    {
696        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
697        evbuffer_drain( inbuf, msglen );
698    }
699}
700
701static int
702readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf )
703{
704    uint32_t len;
705    const size_t needlen = sizeof(uint32_t);
706
707    if( EVBUFFER_LENGTH(inbuf) < needlen )
708        return READ_MORE;
709
710    tr_peerIoReadUint32( msgs->io, inbuf, &len );
711
712    if( len == 0 ) /* peer sent us a keepalive message */
713        dbgmsg( msgs, "peer sent us a keepalive message..." );
714    else {
715        dbgmsg( msgs, "peer is sending us a message with %"PRIu64" bytes...", (uint64_t)len );
716        msgs->incomingMessageLength = len;
717        msgs->state = AWAITING_BT_MESSAGE;
718    }
719
720    return READ_AGAIN;
721}
722
723static int
724readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf )
725{
726    uint8_t id;
727    uint32_t ui32;
728    uint32_t msglen = msgs->incomingMessageLength;
729
730    if( EVBUFFER_LENGTH(inbuf) < msglen )
731        return READ_MORE;
732
733    tr_peerIoReadUint8( msgs->io, inbuf, &id );
734    msglen--;
735    dbgmsg( msgs, "peer sent us a message... "
736                  "bt id number is %d, and remaining len is %d", (int)id, (int)msglen );
737
738    switch( id )
739    {
740        case BT_CHOKE:
741            dbgmsg( msgs, "w00t peer sent us a BT_CHOKE" );
742            assert( msglen == 0 );
743            msgs->info->clientIsChoked = 1;
744           
745            tr_list * walk;
746            for( walk = msgs->peerAskedFor; walk != NULL; )
747            {
748                tr_list * next = walk->next;
749                /* We shouldn't reject a peer's fast allowed requests at choke */
750                struct peer_request *req = walk->data;
751                if ( !tr_bitfieldHas( msgs->peerAllowedPieces, req->index ) )
752                {
753                    tr_list_remove_data( &msgs->peerAskedFor, req );
754                    tr_free( req );
755                }
756                walk = next;
757            }
758            tr_list_free( &msgs->clientAskedFor, tr_free );
759            break;
760
761        case BT_UNCHOKE:
762            dbgmsg( msgs, "w00t peer sent us a BT_UNCHOKE" );
763            assert( msglen == 0 );
764            msgs->info->clientIsChoked = 0;
765            fireNeedReq( msgs );
766            break;
767
768        case BT_INTERESTED:
769            dbgmsg( msgs, "w00t peer sent us a BT_INTERESTED" );
770            assert( msglen == 0 );
771            msgs->info->peerIsInterested = 1;
772            tr_peerMsgsSetChoke( msgs, 0 );
773            break;
774
775        case BT_NOT_INTERESTED:
776            dbgmsg( msgs, "w00t peer sent us a BT_NOT_INTERESTED" );
777            assert( msglen == 0 );
778            msgs->info->peerIsInterested = 0;
779            break;
780
781        case BT_HAVE:
782            dbgmsg( msgs, "w00t peer sent us a BT_HAVE" );
783            assert( msglen == 4 );
784            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
785            tr_bitfieldAdd( msgs->info->have, ui32 );
786            msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have ) / (float)msgs->torrent->info.pieceCount;
787            dbgmsg( msgs, "after the HAVE message, peer progress is %f", msgs->info->progress );
788            updateInterest( msgs );
789            firePeerProgress( msgs );
790            break;
791
792        case BT_BITFIELD:
793            dbgmsg( msgs, "w00t peer sent us a BT_BITFIELD" );
794            assert( msglen == msgs->info->have->len );
795            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
796            msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have ) / (float)msgs->torrent->info.pieceCount;
797            dbgmsg( msgs, "after the HAVE message, peer progress is %f", msgs->info->progress );
798            updateInterest( msgs );
799            fireNeedReq( msgs );
800            firePeerProgress( msgs );
801            break;
802
803        case BT_REQUEST: {
804            struct peer_request * req;
805            dbgmsg( msgs, "peer sent us a BT_REQUEST" );
806            assert( msglen == 12 );
807            req = tr_new( struct peer_request, 1 );
808            tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
809            tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
810            tr_peerIoReadUint32( msgs->io, inbuf, &req->length );
811           
812            if ( !requestIsValid( msgs, req ) )
813            {
814                dbgmsg( msgs, "BT_REQUEST: invalid request, ignoring" );
815                tr_free( req );
816                break;
817            }
818            /*
819                If we're not choking him -> continue
820                If we're choking him
821                    it doesn't support FPE -> He's deaf, reCHOKE and bail...
822                    it support FPE
823                        If the asked piece is not allowed
824                            OR he's above our threshold
825                            OR we don't have the requested piece -> Reject
826                        Else
827                        Asked piece allowed AND he's below our threshold -> continue...
828             */
829   
830
831            if ( msgs->info->peerIsChoked )
832            {
833                if ( !tr_peerIoSupportsFEXT( msgs->io ) )
834                {
835                    dbgmsg( msgs, "BT_REQUEST: peer is choked, ignoring" );
836                    /* Didn't he get it? */
837                    tr_peerMsgsSetChoke( msgs, 1 );
838                    tr_free( req );
839                    break;
840                }
841                else
842                {
843                    if ( !tr_bitfieldHas( msgs->peerAllowedPieces, req->index )
844                         || ( msgs->info->progress * (float)msgs->torrent->info.pieceCount) >= MAX_ALLOWED_SET_COUNT
845                         || !tr_cpPieceIsComplete( msgs->torrent->completion, req->index ) )
846                    {
847                        dbgmsg( msgs, "BT_REQUEST: peer requests an un-fastallowed piece" );
848                        sendFastReject( msgs, req->index, req->offset, req->length );
849                        tr_free( req );
850                        break;
851                    }
852                    dbgmsg( msgs, "BT_REQUEST: fast allowed piece, accepting request" );
853                }   
854            }
855           
856            tr_list_append( &msgs->peerAskedFor, req );
857            break;
858        }
859
860        case BT_CANCEL: {
861            struct peer_request req;
862            void * data;
863            dbgmsg( msgs, "peer sent us a BT_CANCEL" );
864            assert( msglen == 12 );
865            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
866            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
867            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
868            data = tr_list_remove( &msgs->peerAskedFor, &req, peer_request_compare );
869            tr_free( data );
870            break;
871        }
872
873        case BT_PIECE: {
874            dbgmsg( msgs, "peer sent us a BT_PIECE" );
875            assert( msgs->blockToUs.length == 0 );
876            tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.index );
877            tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.offset );
878            msgs->blockToUs.length = msglen - 8;
879            assert( EVBUFFER_LENGTH(msgs->inBlock) == 0 );
880            //evbuffer_drain( msgs->inBlock, ~0 );
881            msgs->state = msgs->blockToUs.length ? READING_BT_PIECE : AWAITING_BT_LENGTH;
882            return READ_AGAIN;
883            break;
884        }
885
886        case BT_PORT: {
887            dbgmsg( msgs, "peer sent us a BT_PORT" );
888            assert( msglen == 2 );
889            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
890            break;
891        }
892       
893        case BT_SUGGEST: {
894            /* tiennou TODO */
895            break;
896        }
897           
898        case BT_HAVE_ALL: {
899            assert( msglen == 0 );
900            dbgmsg( msgs, "peer sent us a BT_HAVE_ALL" );
901            memset( msgs->info->have->bits, 1, msgs->info->have->len );
902            msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have ) / (float)msgs->torrent->info.pieceCount;
903            dbgmsg( msgs, "after the HAVE_ALL message, peer progress is %f", msgs->info->progress );
904            updateInterest( msgs );
905            firePeerProgress( msgs );
906            break;
907        }
908           
909        case BT_HAVE_NONE: {
910            assert( msglen == 0 );
911            dbgmsg( msgs, "peer sent us a BT_HAVE_NONE" );
912            memset( msgs->info->have->bits, 1, msgs->info->have->len );
913            msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have ) / (float)msgs->torrent->info.pieceCount;
914            dbgmsg( msgs, "after the HAVE_NONE message, peer progress is %f", msgs->info->progress );
915            updateInterest( msgs );
916            firePeerProgress( msgs );
917            break;
918        }
919           
920        case BT_REJECT: {
921            struct peer_request req;
922            tr_list * node;
923            assert( msglen == 12 );
924            dbgmsg( msgs, "peer sent us a BT_REJECT" );
925            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
926            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
927            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
928            node = tr_list_find( msgs->peerAskedFor, &req, peer_request_compare );
929            if( node != NULL ) {
930                void * data = node->data;
931                tr_list_remove_data( &msgs->peerAskedFor, data );
932                tr_free( data );
933                dbgmsg( msgs, "found the req that peer has rejected... cancelled." );
934            }
935            break;
936        }
937           
938        case BT_ALLOWED_FAST: {
939            assert( msglen == 4 );
940            dbgmsg( msgs, "peer sent us a BT_ALLOWED_FAST" );
941            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
942            tr_bitfieldAdd( msgs->clientAllowedPieces, ui32 );
943            break;
944        }
945           
946        case BT_LTEP:
947            dbgmsg( msgs, "peer sent us a BT_LTEP" );
948            parseLtep( msgs, msglen, inbuf );
949            break;
950
951        default:
952            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
953            tr_peerIoDrain( msgs->io, inbuf, msglen );
954            assert( 0 );
955    }
956
957    msgs->incomingMessageLength = -1;
958    msgs->state = AWAITING_BT_LENGTH;
959    return READ_AGAIN;
960}
961
962static void
963clientGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
964{
965    tr_torrent * tor = msgs->torrent;
966    tor->activityDate = tr_date( );
967    tor->downloadedCur += byteCount;
968    tr_rcTransferred( tor->download, byteCount );
969    tr_rcTransferred( tor->handle->download, byteCount );
970}
971
972static void
973peerGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
974{
975    tr_torrent * tor = msgs->torrent;
976    tor->activityDate = tr_date( );
977    tor->uploadedCur += byteCount;
978    tr_rcTransferred( tor->upload, byteCount );
979    tr_rcTransferred( tor->handle->upload, byteCount );
980}
981
982static int
983canDownload( const tr_peermsgs * msgs )
984{
985    tr_torrent * tor = msgs->torrent;
986
987    if( tor->downloadLimitMode == TR_SPEEDLIMIT_GLOBAL )
988        return !tor->handle->useDownloadLimit || tr_rcCanTransfer( tor->handle->download );
989
990    if( tor->downloadLimitMode == TR_SPEEDLIMIT_SINGLE )
991        return tr_rcCanTransfer( tor->download );
992
993    return TRUE;
994}
995
996static void
997reassignBytesToCorrupt( tr_peermsgs * msgs, uint32_t byteCount )
998{
999    tr_torrent * tor = msgs->torrent;
1000
1001    /* increment the `corrupt' field */
1002    tor->corruptCur += byteCount;
1003
1004    /* decrement the `downloaded' field */
1005    if( tor->downloadedCur >= byteCount )
1006        tor->downloadedCur -= byteCount;
1007    else
1008        tor->downloadedCur = 0;
1009}
1010
1011
1012static void
1013gotBadPiece( tr_peermsgs * msgs, uint32_t pieceIndex )
1014{
1015    const uint32_t byteCount = tr_torPieceCountBytes( msgs->torrent, (int)pieceIndex );
1016    reassignBytesToCorrupt( msgs, byteCount );
1017}
1018
1019static void
1020gotUnwantedBlock( tr_peermsgs * msgs, uint32_t index UNUSED, uint32_t offset UNUSED, uint32_t length )
1021{
1022    reassignBytesToCorrupt( msgs, length );
1023}
1024
1025static void
1026addUsToBlamefield( tr_peermsgs * msgs, uint32_t index )
1027{
1028    if( !msgs->info->blame )
1029         msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1030    tr_bitfieldAdd( msgs->info->blame, index );
1031}
1032
1033static void
1034gotBlock( tr_peermsgs      * msgs,
1035          struct evbuffer  * inbuf,
1036          uint32_t           index,
1037          uint32_t           offset,
1038          uint32_t           length )
1039{
1040    tr_torrent * tor = msgs->torrent;
1041    const int block = _tr_block( tor, index, offset );
1042    struct peer_request key, *req;
1043
1044    /**
1045    *** Remove the block from our `we asked for this' list
1046    **/
1047
1048    key.index = index;
1049    key.offset = offset;
1050    key.length = length;
1051    req = (struct peer_request*) tr_list_remove( &msgs->clientAskedFor, &key,
1052                                                 peer_request_compare );
1053    if( req == NULL ) {
1054        gotUnwantedBlock( msgs, index, offset, length );
1055        dbgmsg( msgs, "we didn't ask for this message..." );
1056        return;
1057    }
1058    dbgmsg( msgs, "w00t peer sent us a block.  turnaround time was %d seconds", 
1059                     (int)(time(NULL) - req->time_requested) );
1060    tr_free( req );
1061    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1062                  tr_list_size(msgs->clientAskedFor));
1063
1064    /**
1065    *** Error checks
1066    **/
1067
1068    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
1069        dbgmsg( msgs, "have this block already..." );
1070        tr_dbg( "have this block already..." );
1071        gotUnwantedBlock( msgs, index, offset, length );
1072        return;
1073    }
1074
1075    if( (int)length != tr_torBlockCountBytes( tor, block ) ) {
1076        dbgmsg( msgs, "block is the wrong length..." );
1077        tr_dbg( "block is the wrong length..." );
1078        gotUnwantedBlock( msgs, index, offset, length );
1079        return;
1080    }
1081
1082    /**
1083    ***  Write the block
1084    **/
1085
1086    if( tr_ioWrite( tor, index, offset, length, EVBUFFER_DATA( inbuf ))) {
1087        return;
1088    }
1089
1090    tr_cpBlockAdd( tor->completion, block );
1091
1092    addUsToBlamefield( msgs, index );
1093
1094    fireGotBlock( msgs, index, offset, length );
1095    fireNeedReq( msgs );
1096
1097    /**
1098    ***  Handle if this was the last block in the piece
1099    **/
1100
1101    if( tr_cpPieceIsComplete( tor->completion, index ) )
1102    {
1103        if( tr_ioHash( tor, index ) )
1104        {
1105            gotBadPiece( msgs, index );
1106            return;
1107        }
1108
1109        fireClientHave( msgs, index );
1110    }
1111}
1112
1113
1114static ReadState
1115readBtPiece( tr_peermsgs * msgs, struct evbuffer * inbuf )
1116{
1117    uint32_t inlen;
1118    uint8_t * tmp;
1119
1120    assert( msgs != NULL );
1121    assert( msgs->blockToUs.length > 0 );
1122    assert( inbuf != NULL );
1123    assert( EVBUFFER_LENGTH( inbuf ) > 0 );
1124
1125    /* read from the inbuf into our block buffer */
1126    inlen = MIN( EVBUFFER_LENGTH(inbuf), msgs->blockToUs.length );
1127    tmp = tr_new( uint8_t, inlen );
1128    tr_peerIoReadBytes( msgs->io, inbuf, tmp, inlen );
1129    evbuffer_add( msgs->inBlock, tmp, inlen );
1130
1131    /* update our tables accordingly */
1132    assert( inlen >= msgs->blockToUs.length );
1133    msgs->blockToUs.length -= inlen;
1134    msgs->info->peerSentPieceDataAt = time( NULL );
1135    clientGotBytes( msgs, inlen );
1136
1137    /* if this was the entire block, save it */
1138    if( !msgs->blockToUs.length )
1139    {
1140        dbgmsg( msgs, "w00t -- got block index %u, offset %u", msgs->blockToUs.index, msgs->blockToUs.offset );
1141        assert( (int)EVBUFFER_LENGTH( msgs->inBlock ) == tr_torBlockCountBytes( msgs->torrent, _tr_block(msgs->torrent,msgs->blockToUs.index, msgs->blockToUs.offset) ) );
1142        gotBlock( msgs, msgs->inBlock,
1143                        msgs->blockToUs.index,
1144                        msgs->blockToUs.offset,
1145                        EVBUFFER_LENGTH( msgs->inBlock ) );
1146        evbuffer_drain( msgs->inBlock, ~0 );
1147        msgs->state = AWAITING_BT_LENGTH;
1148    }
1149
1150    /* cleanup */
1151    tr_free( tmp );
1152    return READ_AGAIN;
1153}
1154
1155static ReadState
1156canRead( struct bufferevent * evin, void * vmsgs )
1157{
1158    ReadState ret;
1159    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1160    struct evbuffer * inbuf = EVBUFFER_INPUT ( evin );
1161
1162    if( !canDownload( msgs ) )
1163    {
1164        msgs->notListening = 1;
1165        tr_peerIoSetIOMode ( msgs->io, 0, EV_READ );
1166        ret = READ_DONE;
1167    }
1168    else switch( msgs->state )
1169    {
1170        case AWAITING_BT_LENGTH:  ret = readBtLength  ( msgs, inbuf ); break;
1171        case AWAITING_BT_MESSAGE: ret = readBtMessage ( msgs, inbuf ); break;
1172        case READING_BT_PIECE:    ret = readBtPiece   ( msgs, inbuf ); break;
1173        default: assert( 0 );
1174    }
1175
1176    return ret;
1177}
1178
1179static void
1180sendKeepalive( tr_peermsgs * msgs )
1181{
1182    dbgmsg( msgs, "sending a keepalive message" );
1183    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1184}
1185
1186/**
1187***
1188**/
1189
1190static int
1191canWrite( const tr_peermsgs * msgs )
1192{
1193    /* don't let our outbuffer get too large */
1194    if( tr_peerIoWriteBytesWaiting( msgs->io ) > 8192 )
1195        return FALSE;
1196
1197    return TRUE;
1198}
1199
1200static int
1201canUpload( const tr_peermsgs * msgs )
1202{
1203    const tr_torrent * tor = msgs->torrent;
1204
1205    if( !canWrite( msgs ) )
1206        return FALSE;
1207
1208    if( tor->uploadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1209        return !tor->handle->useUploadLimit || tr_rcCanTransfer( tor->handle->upload );
1210
1211    if( tor->uploadLimitMode == TR_SPEEDLIMIT_SINGLE )
1212        return tr_rcCanTransfer( tor->upload );
1213
1214    return TRUE;
1215}
1216
1217static int
1218pulse( void * vmsgs )
1219{
1220    const time_t now = time( NULL );
1221    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1222    size_t len;
1223
1224    /* if we froze out a downloaded block because of speed limits,
1225       start listening to the peer again */
1226    if( msgs->notListening && canDownload( msgs ) )
1227    {
1228        msgs->notListening = 0;
1229        tr_peerIoSetIOMode ( msgs->io, EV_READ, 0 );
1230    }
1231
1232    if( !canWrite( msgs ) )
1233    {
1234    }
1235#if 0
1236    else if(( len = EVBUFFER_LENGTH( msgs->outBlock ) ))
1237    {
1238        while ( len && canUpload( msgs ) )
1239        {
1240            const size_t outlen = len; //MIN( len, 2048 );
1241            tr_peerIoWrite( msgs->io, EVBUFFER_DATA(msgs->outBlock), outlen );
1242            evbuffer_drain( msgs->outBlock, outlen );
1243            peerGotBytes( msgs, outlen );
1244            len -= outlen;
1245            msgs->info->clientSentPieceDataAt = now;
1246            msgs->clientSentAnythingAt = now;
1247            dbgmsg( msgs, "wrote %d bytes; %d left in block", (int)outlen, (int)len );
1248        }
1249    }
1250#endif
1251    else if(( len = EVBUFFER_LENGTH( msgs->outMessages ) ))
1252    {
1253        tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
1254        msgs->clientSentAnythingAt = now;
1255    }
1256    else if(( msgs->peerAskedFor ))
1257    {
1258        if( canUpload( msgs ) )
1259        {
1260            struct peer_request * req = tr_list_pop_front( &msgs->peerAskedFor );
1261            uint8_t * tmp = tr_new( uint8_t, req->length );
1262            const uint32_t msglen = sizeof(uint8_t) + 2*sizeof(uint32_t) + req->length;
1263            struct evbuffer * out = evbuffer_new( );
1264            assert( requestIsValid( msgs, req ) );
1265
1266            tr_peerIoWriteUint32( msgs->io, out, msglen );
1267            tr_peerIoWriteUint8 ( msgs->io, out, BT_PIECE );
1268            tr_peerIoWriteUint32( msgs->io, out, req->index );
1269            tr_peerIoWriteUint32( msgs->io, out, req->offset );
1270            tr_peerIoWriteBuf( msgs->io, out );
1271
1272            tr_ioRead( msgs->torrent, req->index, req->offset, req->length, tmp );
1273            tr_peerIoWrite( msgs->io, tmp, req->length );
1274            peerGotBytes( msgs, req->length );
1275
1276            dbgmsg( msgs, "putting req into out queue: index %d, offset %d, length %d ... %d blocks left in our queue", (int)req->index, (int)req->offset, (int)req->length, tr_list_size(msgs->peerAskedFor) );
1277
1278            tr_free( req );
1279            tr_free( tmp );
1280            evbuffer_free( out );
1281        }
1282    }
1283    else if( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS )
1284    {
1285        sendKeepalive( msgs );
1286    }
1287
1288    return TRUE; /* loop forever */
1289}
1290
1291static void
1292didWrite( struct bufferevent * evin UNUSED, void * vpeer )
1293{
1294    pulse( (tr_peermsgs *) vpeer );
1295}
1296
1297static void
1298gotError( struct bufferevent * evbuf UNUSED, short what UNUSED, void * vpeer )
1299{
1300    fireGotError( vpeer );
1301}
1302
1303static void
1304sendBitfield( tr_peermsgs * msgs )
1305{
1306    const tr_bitfield * bitfield = tr_cpPieceBitfield( msgs->torrent->completion );
1307
1308    dbgmsg( msgs, "sending peer a bitfield message" );
1309    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + bitfield->len );
1310    tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_BITFIELD );
1311    tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, bitfield->bits, bitfield->len );
1312}
1313
1314/**
1315***
1316**/
1317
1318/* some peers give us error messages if we send
1319   more than this many peers in a single pex message */
1320#define MAX_PEX_DIFFS 200
1321
1322typedef struct
1323{
1324    tr_pex * added;
1325    tr_pex * dropped;
1326    tr_pex * elements;
1327    int addedCount;
1328    int droppedCount;
1329    int elementCount;
1330    int diffCount;
1331}
1332PexDiffs;
1333
1334static void
1335pexAddedCb( void * vpex, void * userData )
1336{
1337    PexDiffs * diffs = (PexDiffs *) userData;
1338    tr_pex * pex = (tr_pex *) vpex;
1339    if( diffs->diffCount < MAX_PEX_DIFFS )
1340    {
1341        diffs->diffCount++;
1342        diffs->added[diffs->addedCount++] = *pex;
1343        diffs->elements[diffs->elementCount++] = *pex;
1344    }
1345}
1346
1347static void
1348pexRemovedCb( void * vpex, void * userData )
1349{
1350    PexDiffs * diffs = (PexDiffs *) userData;
1351    tr_pex * pex = (tr_pex *) vpex;
1352    if( diffs->diffCount < MAX_PEX_DIFFS )
1353    {
1354        diffs->diffCount++;
1355        diffs->dropped[diffs->droppedCount++] = *pex;
1356    }
1357}
1358
1359static void
1360pexElementCb( void * vpex, void * userData )
1361{
1362    PexDiffs * diffs = (PexDiffs *) userData;
1363    tr_pex * pex = (tr_pex *) vpex;
1364    if( diffs->diffCount < MAX_PEX_DIFFS )
1365    {
1366        diffs->diffCount++;
1367        diffs->elements[diffs->elementCount++] = *pex;
1368    }
1369}
1370
1371static void
1372sendPex( tr_peermsgs * msgs )
1373{
1374    if( msgs->peerSupportsPex && !msgs->torrent->pexDisabled )
1375    {
1376        int i;
1377        tr_pex * newPex = NULL;
1378        const int newCount = tr_peerMgrGetPeers( msgs->handle->peerMgr, msgs->torrent->info.hash, &newPex );
1379        PexDiffs diffs;
1380        benc_val_t val, *added, *dropped, *flags;
1381        uint8_t *tmp, *walk;
1382        char * benc;
1383        int bencLen;
1384
1385        /* build the diffs */
1386        diffs.added = tr_new( tr_pex, newCount );
1387        diffs.addedCount = 0;
1388        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1389        diffs.droppedCount = 0;
1390        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1391        diffs.elementCount = 0;
1392        diffs.diffCount = 0;
1393        tr_set_compare( msgs->pex, msgs->pexCount,
1394                        newPex, newCount,
1395                        tr_pexCompare, sizeof(tr_pex),
1396                        pexRemovedCb, pexAddedCb, pexElementCb, &diffs );
1397        dbgmsg( msgs, "pex: old peer count %d, new peer count %d, added %d, removed %d", msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1398
1399        /* update peer */
1400        tr_free( msgs->pex );
1401        msgs->pex = diffs.elements;
1402        msgs->pexCount = diffs.elementCount;
1403
1404        /* build the pex payload */
1405        tr_bencInit( &val, TYPE_DICT );
1406        tr_bencDictReserve( &val, 3 );
1407
1408        /* "added" */
1409        added = tr_bencDictAdd( &val, "added" );
1410        tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1411        for( i=0; i<diffs.addedCount; ++i ) {
1412            memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
1413            memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1414        }
1415        assert( ( walk - tmp ) == diffs.addedCount * 6 );
1416        tr_bencInitStr( added, tmp, walk-tmp, FALSE );
1417
1418        /* "added.f" */
1419        flags = tr_bencDictAdd( &val, "added.f" );
1420        tmp = walk = tr_new( uint8_t, diffs.addedCount );
1421        for( i=0; i<diffs.addedCount; ++i ) {
1422            dbgmsg( msgs, "PEX -->> -->> flag is %d", (int)diffs.added[i].flags );
1423            *walk++ = diffs.added[i].flags;
1424        }
1425        assert( ( walk - tmp ) == diffs.addedCount );
1426        tr_bencInitStr( flags, tmp, walk-tmp, FALSE );
1427
1428        /* "dropped" */
1429        dropped = tr_bencDictAdd( &val, "dropped" );
1430        tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
1431        for( i=0; i<diffs.droppedCount; ++i ) {
1432            memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
1433            memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
1434        }
1435        assert( ( walk - tmp ) == diffs.droppedCount * 6 );
1436        tr_bencInitStr( dropped, tmp, walk-tmp, FALSE );
1437
1438        /* write the pex message */
1439        benc = tr_bencSaveMalloc( &val, &bencLen );
1440        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 2*sizeof(uint8_t) + bencLen );
1441        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_LTEP );
1442        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, OUR_LTEP_PEX );
1443        tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, benc, bencLen );
1444
1445        /* cleanup */
1446        tr_free( benc );
1447        tr_bencFree( &val );
1448        tr_free( diffs.added );
1449        tr_free( diffs.dropped );
1450        tr_free( newPex );
1451
1452        msgs->clientSentPexAt = time( NULL );
1453    }
1454}
1455
1456static int
1457pexPulse( void * vpeer )
1458{
1459    sendPex( vpeer );
1460    return TRUE;
1461}
1462
1463/**
1464***
1465**/
1466
1467tr_peermsgs*
1468tr_peerMsgsNew( struct tr_torrent * torrent, struct tr_peer * info )
1469{
1470    tr_peermsgs * msgs;
1471
1472    assert( info != NULL );
1473    assert( info->io != NULL );
1474
1475    msgs = tr_new0( tr_peermsgs, 1 );
1476    msgs->publisher = tr_publisherNew( );
1477    msgs->info = info;
1478    msgs->handle = torrent->handle;
1479    msgs->torrent = torrent;
1480    msgs->io = info->io;
1481    msgs->info->clientIsChoked = 1;
1482    msgs->info->peerIsChoked = 1;
1483    msgs->info->clientIsInterested = 0;
1484    msgs->info->peerIsInterested = 0;
1485    msgs->info->have = tr_bitfieldNew( torrent->info.pieceCount );
1486    msgs->pulseTimer = tr_timerNew( msgs->handle, pulse, msgs, PEER_PULSE_INTERVAL );
1487    msgs->pexTimer = tr_timerNew( msgs->handle, pexPulse, msgs, PEX_INTERVAL );
1488    msgs->outMessages = evbuffer_new( );
1489    msgs->outBlock = evbuffer_new( );
1490    msgs->inBlock = evbuffer_new( );
1491    msgs->peerAllowedPieces = NULL;
1492    msgs->clientAllowedPieces = NULL;
1493   
1494    if ( tr_peerIoSupportsFEXT( msgs->io ) )
1495    {
1496        /* This peer is fastpeer-enabled, generate its allowed set
1497         * (before registering our callbacks) */
1498        if ( !msgs->peerAllowedPieces ) {
1499            const struct in_addr *peerAddr = tr_peerIoGetAddress( msgs->io, NULL );
1500           
1501            msgs->peerAllowedPieces = tr_peerMgrGenerateAllowedSet( MAX_ALLOWED_SET_COUNT,
1502                                                                    msgs->torrent->info.pieceCount,
1503                                                                    msgs->torrent->info.hash,
1504                                                                    peerAddr );
1505        }
1506        msgs->clientAllowedPieces = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1507    }
1508   
1509    tr_peerIoSetIOFuncs( msgs->io, canRead, didWrite, gotError, msgs );
1510    tr_peerIoSetIOMode( msgs->io, EV_READ|EV_WRITE, 0 );
1511
1512    /**
1513    ***  If we initiated this connection,
1514    ***  we may need to send LTEP/AZMP handshakes.
1515    ***  Otherwise we'll wait for the peer to send theirs first.
1516    **/
1517    if( !tr_peerIoIsIncoming( msgs->io ) )
1518    {
1519        if ( tr_peerIoSupportsLTEP( msgs->io ) ) {
1520            sendLtepHandshake( msgs );
1521           
1522        } else if ( tr_peerIoSupportsAZMP( msgs->io ) ) {
1523            dbgmsg( msgs, "FIXME: need to send AZMP handshake" );
1524           
1525        } else {
1526            /* no-op */
1527        }
1528    }
1529   
1530    if ( tr_peerIoSupportsFEXT( msgs->io ) )
1531    {
1532        /* This peer is fastpeer-enabled, send it have-all or have-none if appropriate */
1533        float completion = tr_cpPercentComplete( msgs->torrent->completion );
1534        if ( completion == 0.0f ) {
1535            sendFastHave( msgs, 0 );
1536        } else if ( completion == 1.0f ) {
1537            sendFastHave( msgs, 1 );
1538        } else {
1539            sendBitfield( msgs );
1540        }
1541        uint32_t peerProgress = msgs->torrent->info.pieceCount * msgs->info->progress;
1542       
1543        if ( peerProgress < MAX_ALLOWED_SET_COUNT )
1544            sendFastAllowedSet( msgs );
1545    } else {
1546        sendBitfield( msgs );
1547    }
1548    return msgs;
1549}
1550
1551void
1552tr_peerMsgsFree( tr_peermsgs* msgs )
1553{
1554    if( msgs != NULL )
1555    {
1556        tr_timerFree( &msgs->pulseTimer );
1557        tr_timerFree( &msgs->pexTimer );
1558        tr_publisherFree( &msgs->publisher );
1559        tr_list_free( &msgs->clientAskedFor, tr_free );
1560        tr_list_free( &msgs->peerAskedFor, tr_free );
1561        evbuffer_free( msgs->outMessages );
1562        evbuffer_free( msgs->outBlock );
1563        evbuffer_free( msgs->inBlock );
1564        tr_free( msgs->pex );
1565        msgs->pexCount = 0;
1566        tr_free( msgs );
1567    }
1568}
1569
1570tr_publisher_tag
1571tr_peerMsgsSubscribe( tr_peermsgs       * peer,
1572                      tr_delivery_func    func,
1573                      void              * userData )
1574{
1575    return tr_publisherSubscribe( peer->publisher, func, userData );
1576}
1577
1578void
1579tr_peerMsgsUnsubscribe( tr_peermsgs       * peer,
1580                        tr_publisher_tag    tag )
1581{
1582    tr_publisherUnsubscribe( peer->publisher, tag );
1583}
Note: See TracBrowser for help on using the repository browser.