source: trunk/libtransmission/peer-msgs.c @ 3367

Last change on this file since 3367 was 3367, checked in by charles, 15 years ago

fix the "swarm speed" bug reported by Gimp_

  • Property svn:keywords set to Date Rev Author Id
File size: 50.3 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 3367 2007-10-11 04:17:28Z charles $
11 */
12
13#include <assert.h>
14#include <ctype.h>
15#include <errno.h>
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19
20#include <arpa/inet.h>
21
22#include <sys/types.h> /* event.h needs this */
23#include <event.h>
24
25#include "transmission.h"
26#include "bencode.h"
27#include "completion.h"
28#include "inout.h"
29#include "list.h"
30#include "peer-io.h"
31#include "peer-mgr.h"
32#include "peer-mgr-private.h"
33#include "peer-msgs.h"
34#include "ratecontrol.h"
35#include "trevent.h"
36#include "utils.h"
37
38/**
39***
40**/
41
42#define MAX_ALLOWED_SET_COUNT   10 /* number of pieces generated for allow-fast,
43                                    threshold for fast-allowing others */
44
45enum
46{
47    BT_CHOKE                = 0,
48    BT_UNCHOKE              = 1,
49    BT_INTERESTED           = 2,
50    BT_NOT_INTERESTED       = 3,
51    BT_HAVE                 = 4,
52    BT_BITFIELD             = 5,
53    BT_REQUEST              = 6,
54    BT_PIECE                = 7,
55    BT_CANCEL               = 8,
56    BT_PORT                 = 9,
57    BT_SUGGEST              = 13,
58    BT_HAVE_ALL             = 14,
59    BT_HAVE_NONE            = 15,
60    BT_REJECT               = 16,
61    BT_ALLOWED_FAST         = 17,
62    BT_LTEP                 = 20,
63
64    LTEP_HANDSHAKE          = 0,
65
66    OUR_LTEP_PEX            = 1,
67
68    MAX_REQUEST_BYTE_COUNT  = (16 * 1024), /* drop requests who want too much */
69
70    KEEPALIVE_INTERVAL_SECS = 90,          /* idle seconds before we send a keepalive */
71    PEX_INTERVAL            = (60 * 1000), /* msec between calls to sendPex() */
72    PEER_PULSE_INTERVAL     = (66),        /* msec between calls to pulse() */
73    RATE_PULSE_INTERVAL     = (333),       /* msec between calls to ratePulse() */
74};
75
76enum
77{
78    AWAITING_BT_LENGTH,
79    AWAITING_BT_MESSAGE,
80    READING_BT_PIECE
81};
82
83struct peer_request
84{
85    uint32_t index;
86    uint32_t offset;
87    uint32_t length;
88    time_t time_requested;
89};
90
91static int
92compareRequest( const void * va, const void * vb )
93{
94    struct peer_request * a = (struct peer_request*) va;
95    struct peer_request * b = (struct peer_request*) vb;
96    if( a->index != b->index ) return a->index - b->index;
97    if( a->offset != b->offset ) return a->offset - b->offset;
98    if( a->length != b->length ) return a->length - b->length;
99    return 0;
100}
101
102struct tr_peermsgs
103{
104    tr_peer * info;
105
106    tr_handle * handle;
107    tr_torrent * torrent;
108    tr_peerIo * io;
109
110    tr_publisher_t * publisher;
111
112    struct evbuffer * outMessages; /* buffer of all the non-piece messages */
113    struct evbuffer * inBlock;     /* the block we're currently receiving */
114    tr_list * peerAskedFor;
115    tr_list * clientAskedFor;
116    tr_list * clientWillAskFor;
117
118    tr_timer * rateTimer;
119    tr_timer * pulseTimer;
120    tr_timer * pexTimer;
121
122    struct peer_request blockToUs; /* the block currntly being sent to us */
123
124    time_t lastReqAddedAt;
125    time_t clientSentPexAt;
126    time_t clientSentAnythingAt;
127
128    unsigned int notListening             : 1;
129    unsigned int peerSupportsPex          : 1;
130    unsigned int clientSentLtepHandshake  : 1;
131    unsigned int peerSentLtepHandshake    : 1;
132   
133    tr_bitfield * clientAllowedPieces;
134    tr_bitfield * peerAllowedPieces;
135   
136    uint8_t state;
137    uint8_t ut_pex_id;
138    uint16_t pexCount;
139    uint32_t incomingMessageLength;
140    uint32_t maxActiveRequests;
141    uint32_t minActiveRequests;
142
143    tr_pex * pex;
144};
145
146/**
147***
148**/
149
150static void
151myDebug( const char * file, int line,
152         const struct tr_peermsgs * msgs,
153         const char * fmt, ... )
154{
155    FILE * fp = tr_getLog( );
156    if( fp != NULL )
157    {
158        va_list args;
159        struct evbuffer * buf = evbuffer_new( );
160        char timestr[64];
161        evbuffer_add_printf( buf, "[%s] %s [%s]: ",
162                             tr_getLogTimeStr( timestr, sizeof(timestr) ),
163                             tr_peerIoGetAddrStr( msgs->io ),
164                             msgs->info->client );
165        va_start( args, fmt );
166        evbuffer_add_vprintf( buf, fmt, args );
167        va_end( args );
168        evbuffer_add_printf( buf, " (%s:%d)\n", file, line );
169
170        fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
171        evbuffer_free( buf );
172    }
173}
174
175#define dbgmsg(msgs, fmt...) myDebug(__FILE__, __LINE__, msgs, ##fmt )
176
177/**
178***
179**/
180
181static void
182protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
183{
184    tr_peerIo * io = msgs->io;
185    struct evbuffer * out = msgs->outMessages;
186
187    dbgmsg( msgs, "requesting %u:%u->%u", req->index, req->offset, req->length );
188    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
189    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
190    tr_peerIoWriteUint32( io, out, req->index );
191    tr_peerIoWriteUint32( io, out, req->offset );
192    tr_peerIoWriteUint32( io, out, req->length );
193}
194
195static void
196protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
197{
198    tr_peerIo * io = msgs->io;
199    struct evbuffer * out = msgs->outMessages;
200
201    dbgmsg( msgs, "cancelling %u:%u->%u", req->index, req->offset, req->length );
202    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
203    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
204    tr_peerIoWriteUint32( io, out, req->index );
205    tr_peerIoWriteUint32( io, out, req->offset );
206    tr_peerIoWriteUint32( io, out, req->length );
207}
208
209static void
210protocolSendHave( tr_peermsgs * msgs, uint32_t index )
211{
212    tr_peerIo * io = msgs->io;
213    struct evbuffer * out = msgs->outMessages;
214
215    dbgmsg( msgs, "sending Have %u", index );
216    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
217    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
218    tr_peerIoWriteUint32( io, out, index );
219}
220
221static void
222protocolSendChoke( tr_peermsgs * msgs, int choke )
223{
224    tr_peerIo * io = msgs->io;
225    struct evbuffer * out = msgs->outMessages;
226
227    dbgmsg( msgs, "sending %s", (choke ? "Choke" : "Unchoke") );
228    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) );
229    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
230}
231
232static void
233protocolSendPiece( tr_peermsgs                * msgs,
234                   const struct peer_request  * r,
235                   const uint8_t              * pieceData )
236{
237    tr_peerIo * io = msgs->io;
238    struct evbuffer * out = evbuffer_new( );
239
240    dbgmsg( msgs, "sending block %u:%u->%u", r->index, r->offset, r->length );
241    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 2*sizeof(uint32_t) + r->length );
242    tr_peerIoWriteUint8 ( io, out, BT_PIECE );
243    tr_peerIoWriteUint32( io, out, r->index );
244    tr_peerIoWriteUint32( io, out, r->offset );
245    tr_peerIoWriteBytes ( io, out, pieceData, r->length );
246    tr_peerIoWriteBuf   ( io, out );
247
248    evbuffer_free( out );
249}
250
251/**
252***  EVENTS
253**/
254
255static const tr_peermsgs_event blankEvent = { 0, 0, 0, 0, 0.0f };
256
257static void
258publish( tr_peermsgs * msgs, tr_peermsgs_event * e )
259{
260    tr_publisherPublish( msgs->publisher, msgs->info, e );
261}
262
263static void
264fireGotError( tr_peermsgs * msgs )
265{
266    tr_peermsgs_event e = blankEvent;
267    e.eventType = TR_PEERMSG_GOT_ERROR;
268    publish( msgs, &e );
269}
270
271static void
272fireNeedReq( tr_peermsgs * msgs )
273{
274    tr_peermsgs_event e = blankEvent;
275    e.eventType = TR_PEERMSG_NEED_REQ;
276    publish( msgs, &e );
277}
278
279static void
280firePeerProgress( tr_peermsgs * msgs )
281{
282    tr_peermsgs_event e = blankEvent;
283    e.eventType = TR_PEERMSG_PEER_PROGRESS;
284    e.progress = msgs->info->progress;
285    publish( msgs, &e );
286}
287
288static void
289fireClientHave( tr_peermsgs * msgs, uint32_t pieceIndex )
290{
291    tr_peermsgs_event e = blankEvent;
292    e.eventType = TR_PEERMSG_CLIENT_HAVE;
293    e.pieceIndex = pieceIndex;
294    publish( msgs, &e );
295}
296
297static void
298fireGotBlock( tr_peermsgs * msgs, uint32_t pieceIndex, uint32_t offset, uint32_t length )
299{
300    tr_peermsgs_event e = blankEvent;
301    e.eventType = TR_PEERMSG_CLIENT_BLOCK;
302    e.pieceIndex = pieceIndex;
303    e.offset = offset;
304    e.length = length;
305    publish( msgs, &e );
306}
307
308static void
309fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
310{
311    tr_peermsgs_event e = blankEvent;
312    e.eventType = TR_PEERMSG_CANCEL;
313    e.pieceIndex = req->index;
314    e.offset = req->offset;
315    e.length = req->length;
316    publish( msgs, &e );
317}
318
319/**
320***  INTEREST
321**/
322
323static int
324isPieceInteresting( const tr_peermsgs   * peer,
325                    int                   piece )
326{
327    const tr_torrent * torrent = peer->torrent;
328    if( torrent->info.pieces[piece].dnd ) /* we don't want it */
329        return FALSE;
330    if( tr_cpPieceIsComplete( torrent->completion, piece ) ) /* we have it */
331        return FALSE;
332    if( !tr_bitfieldHas( peer->info->have, piece ) ) /* peer doesn't have it */
333        return FALSE;
334    if( tr_bitfieldHas( peer->info->banned, piece ) ) /* peer is banned */
335        return FALSE;
336    return TRUE;
337}
338
339/* "interested" means we'll ask for piece data if they unchoke us */
340static int
341isPeerInteresting( const tr_peermsgs * msgs )
342{
343    int i;
344    const tr_torrent * torrent;
345    const tr_bitfield * bitfield;
346    const int clientIsSeed =
347        tr_cpGetStatus( msgs->torrent->completion ) != TR_CP_INCOMPLETE;
348
349    if( clientIsSeed )
350        return FALSE;
351
352    torrent = msgs->torrent;
353    bitfield = tr_cpPieceBitfield( torrent->completion );
354
355    if( !msgs->info->have )
356        return TRUE;
357
358    assert( bitfield->len == msgs->info->have->len );
359    for( i=0; i<torrent->info.pieceCount; ++i )
360        if( isPieceInteresting( msgs, i ) )
361            return TRUE;
362
363    return FALSE;
364}
365
366static void
367sendInterest( tr_peermsgs * msgs, int weAreInterested )
368{
369    assert( msgs != NULL );
370    assert( weAreInterested==0 || weAreInterested==1 );
371
372    msgs->info->clientIsInterested = weAreInterested;
373    dbgmsg( msgs, "Sending %s",
374            weAreInterested ? "Interested" : "Not Interested");
375
376    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
377    tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages,
378                   weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
379}
380
381static void
382updateInterest( tr_peermsgs * msgs )
383{
384    const int i = isPeerInteresting( msgs );
385    if( i != msgs->info->clientIsInterested )
386        sendInterest( msgs, i );
387    if( i )
388        fireNeedReq( msgs );
389}
390
391void
392tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
393{
394    assert( msgs != NULL );
395    assert( msgs->info != NULL );
396    assert( choke==0 || choke==1 );
397
398    if( msgs->info->peerIsChoked != choke )
399    {
400        msgs->info->peerIsChoked = choke;
401        tr_list * walk;
402       
403        if( choke )
404            for( walk = msgs->peerAskedFor; walk != NULL; )
405            {
406                tr_list * next = walk->next;
407                /* don't reject a peer's fast allowed requests at choke */
408                struct peer_request *req = walk->data;
409                if ( !tr_bitfieldHas( msgs->peerAllowedPieces, req->index ) )
410                {
411                    tr_list_remove_data( &msgs->peerAskedFor, req );
412                    tr_free( req );
413                }
414                walk = next;
415            }
416
417        protocolSendChoke( msgs, choke );
418        msgs->info->chokeChangedAt = time( NULL );
419    }
420}
421
422/**
423***
424**/
425
426void
427tr_peerMsgsHave( tr_peermsgs * msgs,
428                 uint32_t      index )
429{
430    protocolSendHave( msgs, index );
431
432    /* since we have more pieces now, we might not be interested in this peer */
433    updateInterest( msgs );
434}
435#if 0
436static void
437sendFastSuggest( tr_peermsgs * msgs,
438                 uint32_t      pieceIndex )
439{
440    dbgmsg( msgs, "w00t SUGGESTing them piece #%d", pieceIndex );
441    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
442    tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_SUGGEST );
443    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
444   
445    updateInterest( msgs );
446}
447#endif
448static void
449sendFastHave( tr_peermsgs * msgs,
450              int           all)
451{
452    dbgmsg( msgs, "w00t telling them we %s pieces", (all ? "HAVE_ALL" : "HAVE_NONE" ) );
453    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
454    tr_peerIoWriteUint8( msgs->io, msgs->outMessages, ( all ? BT_HAVE_ALL : BT_HAVE_NONE ) );
455   
456    updateInterest( msgs );
457}
458
459static void
460sendFastReject( tr_peermsgs * msgs,
461                uint32_t      pieceIndex,
462                uint32_t      offset,
463                uint32_t      length )
464{
465    assert( msgs != NULL );
466    assert( length > 0 );
467   
468    /* reject the request */
469    const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
470    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
471    tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_REJECT );
472    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
473    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
474    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
475}
476
477static void
478sendFastAllowed( tr_peermsgs * msgs,
479                 uint32_t      pieceIndex)
480{
481    dbgmsg( msgs, "w00t telling them we ALLOW_FAST piece #%d", pieceIndex );
482    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
483    tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_ALLOWED_FAST );
484    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
485}
486
487
488
489static void
490sendFastAllowedSet( tr_peermsgs * msgs )
491{
492    int i = 0;
493    while (i <= msgs->torrent->info.pieceCount )
494    {
495        if ( tr_bitfieldHas( msgs->peerAllowedPieces, i) )
496            sendFastAllowed( msgs, i );
497        i++;
498    }
499}
500
501
502/**
503***
504**/
505
506static int
507reqIsValid( const tr_peermsgs * msgs, uint32_t index, uint32_t offset, uint32_t length )
508{
509    const tr_torrent * tor = msgs->torrent;
510
511    if( index >= (uint32_t) tor->info.pieceCount )
512        return FALSE;
513    if ( (int)offset >= tr_torPieceCountBytes( tor, (int)index ) )
514        return FALSE;
515    if( length > MAX_REQUEST_BYTE_COUNT )
516        return FALSE;
517    if( tr_pieceOffset( tor, index, offset, length ) > tor->info.totalSize )
518        return FALSE;
519
520    return TRUE;
521}
522
523static int
524requestIsValid( const tr_peermsgs * msgs, struct peer_request * req )
525{
526    return reqIsValid( msgs, req->index, req->offset, req->length );
527}
528
529static void
530pumpRequestQueue( tr_peermsgs * msgs )
531{
532    const int max = msgs->maxActiveRequests;
533    const int min = msgs->minActiveRequests;
534    int count = tr_list_size( msgs->clientAskedFor );
535    int sent = 0;
536
537    if( count > min )
538        return;
539    if( msgs->info->clientIsChoked )
540        return;
541
542    while( ( count < max ) && ( msgs->clientWillAskFor != NULL ) )
543    {
544        struct peer_request * req = tr_list_pop_front( &msgs->clientWillAskFor );
545        protocolSendRequest( msgs, req );
546        req->time_requested = msgs->lastReqAddedAt = time( NULL );
547        tr_list_append( &msgs->clientAskedFor, req );
548        ++count;
549        ++sent;
550    }
551
552    if( sent )
553        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
554                sent,
555                tr_list_size(msgs->clientAskedFor),
556                tr_list_size(msgs->clientWillAskFor) );
557
558    if( count < max )
559        fireNeedReq( msgs );
560}
561
562int
563tr_peerMsgsAddRequest( tr_peermsgs * msgs,
564                       uint32_t      index, 
565                       uint32_t      offset, 
566                       uint32_t      length )
567{
568    const int req_max = msgs->maxActiveRequests;
569    struct peer_request tmp, *req;
570
571    assert( msgs != NULL );
572    assert( msgs->torrent != NULL );
573    assert( reqIsValid( msgs, index, offset, length ) );
574
575    /**
576    ***  Reasons to decline the request
577    **/
578
579    /* don't send requests to choked clients */
580    if( msgs->info->clientIsChoked ) {
581        dbgmsg( msgs, "declining request because they're choking us" );
582        return TR_ADDREQ_CLIENT_CHOKED;
583    }
584
585    /* peer doesn't have this piece */
586    if( !tr_bitfieldHas( msgs->info->have, index ) )
587        return TR_ADDREQ_MISSING;
588
589    /* peer's queue is full */
590    if( tr_list_size( msgs->clientWillAskFor ) >= req_max ) {
591        dbgmsg( msgs, "declining request because we're full" );
592        return TR_ADDREQ_FULL;
593    }
594
595    /* have we already asked for this piece? */
596    tmp.index = index;
597    tmp.offset = offset;
598    tmp.length = length;
599    if( tr_list_find( msgs->clientAskedFor, &tmp, compareRequest ) ) {
600        dbgmsg( msgs, "declining because it's a duplicate" );
601        return TR_ADDREQ_DUPLICATE;
602    }
603    if( tr_list_find( msgs->clientWillAskFor, &tmp, compareRequest ) ) {
604        dbgmsg( msgs, "declining because it's a duplicate" );
605        return TR_ADDREQ_DUPLICATE;
606    }
607
608    /**
609    ***  Accept this request
610    **/
611
612    dbgmsg( msgs, "added req for piece %d, offset %d", (int)index, (int)offset );
613    req = tr_new0( struct peer_request, 1 );
614    *req = tmp;
615    tr_list_append( &msgs->clientWillAskFor, req );
616    return TR_ADDREQ_OK;
617}
618
619static void
620tr_peerMsgsCancelAllRequests( tr_peermsgs * msgs )
621{
622    struct peer_request * req;
623
624    while(( req = tr_list_pop_front( &msgs->clientWillAskFor ) ))
625    {
626        fireCancelledReq( msgs, req );
627        tr_free( req );
628    }
629
630    while(( req = tr_list_pop_front( &msgs->clientAskedFor ) ))
631    {
632        fireCancelledReq( msgs, req );
633        protocolSendCancel( msgs, req );
634        tr_free( req );
635    }
636}
637
638void
639tr_peerMsgsCancel( tr_peermsgs * msgs,
640                   uint32_t      pieceIndex,
641                   uint32_t      offset,
642                   uint32_t      length )
643{
644    struct peer_request *req, tmp;
645
646    assert( msgs != NULL );
647    assert( length > 0 );
648
649    /* have we asked the peer for this piece? */
650    tmp.index = pieceIndex;
651    tmp.offset = offset;
652    tmp.length = length;
653
654    /* if it's only in the queue and hasn't been sent yet, free it */
655    if(( req = tr_list_remove( &msgs->clientWillAskFor, &tmp, compareRequest ) ))
656    {
657        fireCancelledReq( msgs, req );
658        tr_free( req );
659    }
660
661    /* if it's already been sent, send a cancel message too */
662    if(( req = tr_list_remove( &msgs->clientAskedFor, &tmp, compareRequest ) ))
663    {
664        protocolSendCancel( msgs, req );
665        fireCancelledReq( msgs, req );
666        tr_free( req );
667    }
668}
669
670/**
671***
672**/
673
674static void
675sendLtepHandshake( tr_peermsgs * msgs )
676{
677    benc_val_t val, *m;
678    char * buf;
679    int len;
680    int pex;
681    const char * v = TR_NAME " " USERAGENT_PREFIX;
682    const int port = tr_getPublicPort( msgs->handle );
683    struct evbuffer * outbuf;
684
685    if( msgs->clientSentLtepHandshake )
686        return;
687
688    outbuf = evbuffer_new( );
689    dbgmsg( msgs, "sending an ltep handshake" );
690    msgs->clientSentLtepHandshake = 1;
691
692    /* decide if we want to advertise pex support */
693    if( msgs->torrent->pexDisabled )
694        pex = 0;
695    else if( msgs->peerSentLtepHandshake )
696        pex = msgs->peerSupportsPex ? 1 : 0;
697    else
698        pex = 1;
699
700    tr_bencInit( &val, TYPE_DICT );
701    tr_bencDictReserve( &val, 4 );
702    tr_bencInitInt( tr_bencDictAdd( &val, "e" ), 1 );
703    m  = tr_bencDictAdd( &val, "m" );
704    tr_bencInit( m, TYPE_DICT );
705    if( pex ) {
706        tr_bencDictReserve( m, 1 );
707        tr_bencInitInt( tr_bencDictAdd( m, "ut_pex" ), OUR_LTEP_PEX );
708    }
709    if( port > 0 )
710        tr_bencInitInt( tr_bencDictAdd( &val, "p" ), port );
711    tr_bencInitStr( tr_bencDictAdd( &val, "v" ), v, 0, 1 );
712    buf = tr_bencSaveMalloc( &val,  &len );
713
714    tr_peerIoWriteUint32( msgs->io, outbuf, 2*sizeof(uint8_t) + len );
715    tr_peerIoWriteUint8 ( msgs->io, outbuf, BT_LTEP );
716    tr_peerIoWriteUint8 ( msgs->io, outbuf, LTEP_HANDSHAKE );
717    tr_peerIoWriteBytes ( msgs->io, outbuf, buf, len );
718
719    tr_peerIoWriteBuf( msgs->io, outbuf );
720
721#if 0
722    dbgmsg( msgs, "here is the ltep handshake we sent:" );
723    tr_bencPrint( &val );
724#endif
725
726    /* cleanup */
727    tr_bencFree( &val );
728    tr_free( buf );
729    evbuffer_free( outbuf );
730}
731
732static void
733parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
734{
735    benc_val_t val, * sub;
736    uint8_t * tmp = tr_new( uint8_t, len );
737
738    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
739    msgs->peerSentLtepHandshake = 1;
740
741    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type!=TYPE_DICT ) {
742        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
743        tr_free( tmp );
744        return;
745    }
746
747#if 0
748    dbgmsg( msgs, "here is the ltep handshake we read:" );
749    tr_bencPrint( &val );
750#endif
751
752    /* does the peer prefer encrypted connections? */
753    sub = tr_bencDictFind( &val, "e" );
754    if( tr_bencIsInt( sub ) )
755        msgs->info->encryption_preference = sub->val.i
756                                      ? ENCRYPTION_PREFERENCE_YES
757                                      : ENCRYPTION_PREFERENCE_NO;
758
759    /* check supported messages for utorrent pex */
760    sub = tr_bencDictFind( &val, "m" );
761    if( tr_bencIsDict( sub ) ) {
762        sub = tr_bencDictFind( sub, "ut_pex" );
763        if( tr_bencIsInt( sub ) ) {
764            msgs->peerSupportsPex = 1;
765            msgs->ut_pex_id = (uint8_t) sub->val.i;
766            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
767        }
768    }
769
770    /* get peer's listening port */
771    sub = tr_bencDictFind( &val, "p" );
772    if( tr_bencIsInt( sub ) ) {
773        msgs->info->port = htons( (uint16_t)sub->val.i );
774        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
775    }
776
777    tr_bencFree( &val );
778    tr_free( tmp );
779}
780
781static void
782parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
783{
784    benc_val_t val, * sub;
785    uint8_t * tmp;
786
787    if( msgs->torrent->pexDisabled ) /* no sharing! */
788        return;
789
790    tmp = tr_new( uint8_t, msglen );
791    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
792
793    if( tr_bencLoad( tmp, msglen, &val, NULL ) || !tr_bencIsDict( &val ) ) {
794        dbgmsg( msgs, "GET can't read extended-pex dictionary" );
795        tr_free( tmp );
796        return;
797    }
798
799    sub = tr_bencDictFind( &val, "added" );
800    if( tr_bencIsStr(sub) && ((sub->val.s.i % 6) == 0)) {
801        const int n = sub->val.s.i / 6 ;
802        dbgmsg( msgs, "got %d peers from uT pex", n );
803        tr_peerMgrAddPeers( msgs->handle->peerMgr,
804                            msgs->torrent->info.hash,
805                            TR_PEER_FROM_PEX,
806                            (uint8_t*)sub->val.s.s, n );
807    }
808
809    tr_bencFree( &val );
810    tr_free( tmp );
811}
812
813static void
814sendPex( tr_peermsgs * msgs );
815
816static void
817parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
818{
819    uint8_t ltep_msgid;
820
821    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
822    msglen--;
823
824    if( ltep_msgid == LTEP_HANDSHAKE )
825    {
826        dbgmsg( msgs, "got ltep handshake" );
827        parseLtepHandshake( msgs, msglen, inbuf );
828        sendLtepHandshake( msgs );
829        sendPex( msgs );
830    }
831    else if( ltep_msgid == msgs->ut_pex_id )
832    {
833        dbgmsg( msgs, "got ut pex" );
834        msgs->peerSupportsPex = 1;
835        parseUtPex( msgs, msglen, inbuf );
836    }
837    else
838    {
839        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
840        evbuffer_drain( inbuf, msglen );
841    }
842}
843
844static int
845readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf )
846{
847    uint32_t len;
848    const size_t needlen = sizeof(uint32_t);
849
850    if( EVBUFFER_LENGTH(inbuf) < needlen )
851        return READ_MORE;
852
853    tr_peerIoReadUint32( msgs->io, inbuf, &len );
854
855    if( len == 0 ) /* peer sent us a keepalive message */
856        dbgmsg( msgs, "got KeepAlive" );
857    else {
858        msgs->incomingMessageLength = len;
859        msgs->state = AWAITING_BT_MESSAGE;
860    }
861
862    return READ_AGAIN;
863}
864
865static void
866updatePeerProgress( tr_peermsgs * msgs )
867{
868    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have ) / (float)msgs->torrent->info.pieceCount;
869    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
870    updateInterest( msgs );
871    firePeerProgress( msgs );
872}
873
874static int
875readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf )
876{
877    uint8_t id;
878    uint32_t ui32;
879    uint32_t msglen = msgs->incomingMessageLength;
880
881    if( EVBUFFER_LENGTH(inbuf) < msglen )
882        return READ_MORE;
883
884    tr_peerIoReadUint8( msgs->io, inbuf, &id );
885    msglen--;
886    dbgmsg( msgs, "got BT id %d, len %d", (int)id, (int)msglen );
887
888    switch( id )
889    {
890        case BT_CHOKE:
891            dbgmsg( msgs, "got Choke" );
892            assert( msglen == 0 );
893            msgs->info->clientIsChoked = 1;
894#if 0
895            tr_list * walk;
896            for( walk = msgs->peerAskedFor; walk != NULL; )
897            {
898                tr_list * next = walk->next;
899                /* We shouldn't reject a peer's fast allowed requests at choke */
900                struct peer_request *req = walk->data;
901                if ( !tr_bitfieldHas( msgs->peerAllowedPieces, req->index ) )
902                {
903                    tr_list_remove_data( &msgs->peerAskedFor, req );
904                    tr_free( req );
905                }
906                walk = next;
907            }
908#endif
909            tr_peerMsgsCancelAllRequests( msgs );
910            break;
911
912        case BT_UNCHOKE:
913            dbgmsg( msgs, "got Unchoke" );
914            assert( msglen == 0 );
915            msgs->info->clientIsChoked = 0;
916            fireNeedReq( msgs );
917            break;
918
919        case BT_INTERESTED:
920            dbgmsg( msgs, "got Interested" );
921            assert( msglen == 0 );
922            msgs->info->peerIsInterested = 1;
923            tr_peerMsgsSetChoke( msgs, 0 );
924            break;
925
926        case BT_NOT_INTERESTED:
927            dbgmsg( msgs, "got Not Interested" );
928            assert( msglen == 0 );
929            msgs->info->peerIsInterested = 0;
930            break;
931
932        case BT_HAVE:
933            assert( msglen == 4 );
934            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
935            tr_bitfieldAdd( msgs->info->have, ui32 );
936            updatePeerProgress( msgs );
937            tr_rcTransferred( msgs->torrent->swarmspeed, msgs->torrent->info.pieceSize );
938            dbgmsg( msgs, "got Have: %u", ui32 );
939            break;
940
941        case BT_BITFIELD: {
942            const int clientIsSeed = tr_cpGetStatus( msgs->torrent->completion ) != TR_CP_INCOMPLETE;
943            assert( msglen == msgs->info->have->len );
944            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
945            updatePeerProgress( msgs );
946            tr_peerMsgsSetChoke( msgs, !clientIsSeed || (msgs->info->progress<1.0) );
947            fireNeedReq( msgs );
948            break;
949        }
950
951        case BT_REQUEST: {
952            struct peer_request * req;
953            assert( msglen == 12 );
954            req = tr_new( struct peer_request, 1 );
955            tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
956            tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
957            tr_peerIoReadUint32( msgs->io, inbuf, &req->length );
958            dbgmsg( msgs, "got Request: %u:%u->%u", req->index, req->offset, req->length );
959           
960            if ( !requestIsValid( msgs, req ) )
961            {
962                dbgmsg( msgs, "BT_REQUEST: invalid request, ignoring" );
963                tr_free( req );
964                break;
965            }
966            /*
967                If we're not choking him -> continue
968                If we're choking him
969                    it doesn't support FPE -> He's deaf, reCHOKE and bail...
970                    it support FPE
971                        If the asked piece is not allowed
972                            OR he's above our threshold
973                            OR we don't have the requested piece -> Reject
974                        Else
975                        Asked piece allowed AND he's below our threshold -> continue...
976             */
977   
978
979            if ( msgs->info->peerIsChoked )
980            {
981                if ( !tr_peerIoSupportsFEXT( msgs->io ) )
982                {
983                    dbgmsg( msgs, "BT_REQUEST: peer is choked, ignoring" );
984                    /* Didn't he get it? */
985                    tr_peerMsgsSetChoke( msgs, 1 );
986                    tr_free( req );
987                    break;
988                }
989                else
990                {
991                    if ( !tr_bitfieldHas( msgs->peerAllowedPieces, req->index )
992                         || ( msgs->info->progress * (float)msgs->torrent->info.pieceCount) >= MAX_ALLOWED_SET_COUNT
993                         || !tr_cpPieceIsComplete( msgs->torrent->completion, req->index ) )
994                    {
995                        dbgmsg( msgs, "BT_REQUEST: peer requests an un-fastallowed piece" );
996                        sendFastReject( msgs, req->index, req->offset, req->length );
997                        tr_free( req );
998                        break;
999                    }
1000                    dbgmsg( msgs, "BT_REQUEST: fast allowed piece, accepting request" );
1001                }   
1002            }
1003           
1004            tr_list_append( &msgs->peerAskedFor, req );
1005            break;
1006        }
1007
1008        case BT_CANCEL: {
1009            struct peer_request req;
1010            void * data;
1011            assert( msglen == 12 );
1012            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1013            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1014            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1015            dbgmsg( msgs, "got a Cancel %u:%u->%u", req.index, req.offset, req.length );
1016            data = tr_list_remove( &msgs->peerAskedFor, &req, compareRequest );
1017            tr_free( data );
1018            break;
1019        }
1020
1021        case BT_PIECE: {
1022            dbgmsg( msgs, "got a Piece!" );
1023            assert( msgs->blockToUs.length == 0 );
1024            tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.index );
1025            tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.offset );
1026            msgs->blockToUs.length = msglen - 8;
1027            assert( EVBUFFER_LENGTH(msgs->inBlock) == 0 );
1028            msgs->state = msgs->blockToUs.length ? READING_BT_PIECE : AWAITING_BT_LENGTH;
1029            return READ_AGAIN;
1030            break;
1031        }
1032
1033        case BT_PORT: {
1034            dbgmsg( msgs, "Got a BT_PORT" );
1035            assert( msglen == 2 );
1036            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1037            break;
1038        }
1039       
1040        case BT_SUGGEST: {
1041            /* tiennou TODO */
1042            break;
1043        }
1044           
1045        case BT_HAVE_ALL: {
1046            assert( msglen == 0 );
1047            dbgmsg( msgs, "Got a BT_HAVE_ALL" );
1048            memset( msgs->info->have->bits, 1, msgs->info->have->len );
1049            updatePeerProgress( msgs );
1050            break;
1051        }
1052           
1053        case BT_HAVE_NONE: {
1054            assert( msglen == 0 );
1055            dbgmsg( msgs, "Got a BT_HAVE_NONE" );
1056            memset( msgs->info->have->bits, 1, msgs->info->have->len );
1057            updatePeerProgress( msgs );
1058            break;
1059        }
1060           
1061        case BT_REJECT: {
1062            struct peer_request req;
1063            tr_list * node;
1064            assert( msglen == 12 );
1065            dbgmsg( msgs, "Got a BT_REJECT" );
1066            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1067            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1068            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1069            node = tr_list_find( msgs->peerAskedFor, &req, compareRequest );
1070            if( node != NULL ) {
1071                void * data = node->data;
1072                tr_list_remove_data( &msgs->peerAskedFor, data );
1073                tr_free( data );
1074                dbgmsg( msgs, "found the req that peer has rejected... cancelled." );
1075            }
1076            break;
1077        }
1078           
1079        case BT_ALLOWED_FAST: {
1080            assert( msglen == 4 );
1081            dbgmsg( msgs, "Got a BT_ALLOWED_FAST" );
1082            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1083            tr_bitfieldAdd( msgs->clientAllowedPieces, ui32 );
1084            break;
1085        }
1086           
1087        case BT_LTEP:
1088            dbgmsg( msgs, "Got a BT_LTEP" );
1089            parseLtep( msgs, msglen, inbuf );
1090            break;
1091
1092        default:
1093            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1094            tr_peerIoDrain( msgs->io, inbuf, msglen );
1095            assert( 0 );
1096    }
1097
1098    msgs->incomingMessageLength = -1;
1099    msgs->state = AWAITING_BT_LENGTH;
1100    return READ_AGAIN;
1101}
1102
1103static void
1104clientGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1105{
1106    tr_torrent * tor = msgs->torrent;
1107    tor->activityDate = tr_date( );
1108    tor->downloadedCur += byteCount;
1109    msgs->info->pieceDataActivityDate = time( NULL );
1110    tr_rcTransferred( msgs->info->rcToClient, byteCount );
1111    tr_rcTransferred( tor->download, byteCount );
1112    tr_rcTransferred( tor->handle->download, byteCount );
1113}
1114
1115static void
1116peerGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1117{
1118    tr_torrent * tor = msgs->torrent;
1119    tor->activityDate = tr_date( );
1120    tor->uploadedCur += byteCount;
1121    msgs->info->pieceDataActivityDate = time( NULL );
1122    tr_rcTransferred( msgs->info->rcToPeer, byteCount );
1123    tr_rcTransferred( tor->upload, byteCount );
1124    tr_rcTransferred( tor->handle->upload, byteCount );
1125}
1126
1127static int
1128canDownload( const tr_peermsgs * msgs )
1129{
1130    tr_torrent * tor = msgs->torrent;
1131
1132    if( tor->downloadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1133        return !tor->handle->useDownloadLimit || tr_rcCanTransfer( tor->handle->download );
1134
1135    if( tor->downloadLimitMode == TR_SPEEDLIMIT_SINGLE )
1136        return tr_rcCanTransfer( tor->download );
1137
1138    return TRUE;
1139}
1140
1141static void
1142reassignBytesToCorrupt( tr_peermsgs * msgs, uint32_t byteCount )
1143{
1144    tr_torrent * tor = msgs->torrent;
1145
1146    /* increment the `corrupt' field */
1147    tor->corruptCur += byteCount;
1148
1149    /* decrement the `downloaded' field */
1150    if( tor->downloadedCur >= byteCount )
1151        tor->downloadedCur -= byteCount;
1152    else
1153        tor->downloadedCur = 0;
1154}
1155
1156
1157static void
1158gotBadPiece( tr_peermsgs * msgs, uint32_t pieceIndex )
1159{
1160    const uint32_t byteCount = tr_torPieceCountBytes( msgs->torrent, (int)pieceIndex );
1161    reassignBytesToCorrupt( msgs, byteCount );
1162}
1163
1164static void
1165gotUnwantedBlock( tr_peermsgs * msgs,
1166                  uint32_t      index UNUSED,
1167                  uint32_t      offset UNUSED,
1168                  uint32_t      length )
1169{
1170    reassignBytesToCorrupt( msgs, length );
1171}
1172
1173static void
1174addUsToBlamefield( tr_peermsgs * msgs, uint32_t index )
1175{
1176    if( !msgs->info->blame )
1177         msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1178    tr_bitfieldAdd( msgs->info->blame, index );
1179}
1180
1181static void
1182gotBlock( tr_peermsgs      * msgs,
1183          struct evbuffer  * inbuf,
1184          uint32_t           index,
1185          uint32_t           offset,
1186          uint32_t           length )
1187{
1188    tr_torrent * tor = msgs->torrent;
1189    const int block = _tr_block( tor, index, offset );
1190    struct peer_request key, *req;
1191
1192    /**
1193    *** Remove the block from our `we asked for this' list
1194    **/
1195
1196    key.index = index;
1197    key.offset = offset;
1198    key.length = length;
1199    req = (struct peer_request*) tr_list_remove( &msgs->clientAskedFor, &key,
1200                                                 compareRequest );
1201    if( req == NULL ) {
1202        gotUnwantedBlock( msgs, index, offset, length );
1203        dbgmsg( msgs, "we didn't ask for this message..." );
1204        return;
1205    }
1206    dbgmsg( msgs, "Got block %u:%u->%u (turnaround time %d secs)", 
1207                     req->index, req->offset, req->length,
1208                     (int)(time(NULL) - req->time_requested) );
1209    tr_free( req );
1210    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1211                  tr_list_size(msgs->clientAskedFor));
1212
1213    /**
1214    *** Error checks
1215    **/
1216
1217    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
1218        dbgmsg( msgs, "have this block already..." );
1219        tr_dbg( "have this block already..." );
1220        gotUnwantedBlock( msgs, index, offset, length );
1221        return;
1222    }
1223
1224    if( (int)length != tr_torBlockCountBytes( tor, block ) ) {
1225        dbgmsg( msgs, "block is the wrong length..." );
1226        tr_dbg( "block is the wrong length..." );
1227        gotUnwantedBlock( msgs, index, offset, length );
1228        return;
1229    }
1230
1231    /**
1232    ***  Write the block
1233    **/
1234
1235    if( tr_ioWrite( tor, index, offset, length, EVBUFFER_DATA( inbuf )))
1236        return;
1237
1238    tr_cpBlockAdd( tor->completion, block );
1239
1240    addUsToBlamefield( msgs, index );
1241
1242    fireGotBlock( msgs, index, offset, length );
1243
1244    /**
1245    ***  Handle if this was the last block in the piece
1246    **/
1247
1248    if( tr_cpPieceIsComplete( tor->completion, index ) )
1249    {
1250        if( tr_ioHash( tor, index ) )
1251        {
1252            gotBadPiece( msgs, index );
1253            return;
1254        }
1255
1256        fireClientHave( msgs, index );
1257    }
1258}
1259
1260
1261static ReadState
1262readBtPiece( tr_peermsgs * msgs, struct evbuffer * inbuf )
1263{
1264    uint32_t inlen;
1265    uint8_t * tmp;
1266
1267    assert( msgs != NULL );
1268    assert( msgs->blockToUs.length > 0 );
1269    assert( inbuf != NULL );
1270    assert( EVBUFFER_LENGTH( inbuf ) > 0 );
1271
1272    /* read from the inbuf into our block buffer */
1273    inlen = MIN( EVBUFFER_LENGTH(inbuf), msgs->blockToUs.length );
1274    tmp = tr_new( uint8_t, inlen );
1275    tr_peerIoReadBytes( msgs->io, inbuf, tmp, inlen );
1276    evbuffer_add( msgs->inBlock, tmp, inlen );
1277
1278    /* update our tables accordingly */
1279    assert( inlen >= msgs->blockToUs.length );
1280    msgs->blockToUs.length -= inlen;
1281    msgs->info->peerSentPieceDataAt = time( NULL );
1282    clientGotBytes( msgs, inlen );
1283
1284    /* if this was the entire block, save it */
1285    if( !msgs->blockToUs.length )
1286    {
1287        dbgmsg( msgs, "got block %u:%u", msgs->blockToUs.index, msgs->blockToUs.offset );
1288        assert( (int)EVBUFFER_LENGTH( msgs->inBlock ) == tr_torBlockCountBytes( msgs->torrent, _tr_block(msgs->torrent,msgs->blockToUs.index, msgs->blockToUs.offset) ) );
1289        gotBlock( msgs, msgs->inBlock,
1290                        msgs->blockToUs.index,
1291                        msgs->blockToUs.offset,
1292                        EVBUFFER_LENGTH( msgs->inBlock ) );
1293        evbuffer_drain( msgs->inBlock, ~0 );
1294        msgs->state = AWAITING_BT_LENGTH;
1295    }
1296
1297    /* cleanup */
1298    tr_free( tmp );
1299    return READ_AGAIN;
1300}
1301
1302static ReadState
1303canRead( struct bufferevent * evin, void * vmsgs )
1304{
1305    ReadState ret;
1306    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1307    struct evbuffer * inbuf = EVBUFFER_INPUT ( evin );
1308
1309    if( !canDownload( msgs ) )
1310    {
1311        msgs->notListening = 1;
1312        tr_peerIoSetIOMode ( msgs->io, 0, EV_READ );
1313        ret = READ_DONE;
1314    }
1315    else switch( msgs->state )
1316    {
1317        case AWAITING_BT_LENGTH:  ret = readBtLength  ( msgs, inbuf ); break;
1318        case AWAITING_BT_MESSAGE: ret = readBtMessage ( msgs, inbuf ); break;
1319        case READING_BT_PIECE:    ret = readBtPiece   ( msgs, inbuf ); break;
1320        default: assert( 0 );
1321    }
1322
1323    return ret;
1324}
1325
1326static void
1327sendKeepalive( tr_peermsgs * msgs )
1328{
1329    dbgmsg( msgs, "sending a keepalive message" );
1330    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1331}
1332
1333/**
1334***
1335**/
1336
1337static int
1338canWrite( const tr_peermsgs * msgs )
1339{
1340    /* don't let our outbuffer get too large */
1341    if( tr_peerIoWriteBytesWaiting( msgs->io ) > 8192 )
1342        return FALSE;
1343
1344    return TRUE;
1345}
1346
1347static int
1348canUpload( const tr_peermsgs * msgs )
1349{
1350    const tr_torrent * tor = msgs->torrent;
1351
1352    if( !canWrite( msgs ) )
1353        return FALSE;
1354
1355    if( tor->uploadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1356        return !tor->handle->useUploadLimit || tr_rcCanTransfer( tor->handle->upload );
1357
1358    if( tor->uploadLimitMode == TR_SPEEDLIMIT_SINGLE )
1359        return tr_rcCanTransfer( tor->upload );
1360
1361    return TRUE;
1362}
1363
1364static int
1365ratePulse( void * vmsgs )
1366{
1367    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1368    msgs->info->rateToClient = tr_rcRate( msgs->info->rcToClient );
1369    msgs->info->rateToPeer = tr_rcRate( msgs->info->rcToPeer );
1370    msgs->maxActiveRequests = MIN( 8 + (int)(msgs->info->rateToClient/10), 100 );
1371    msgs->minActiveRequests = msgs->maxActiveRequests / 2;
1372    return TRUE;
1373}
1374
1375static int
1376pulse( void * vmsgs )
1377{
1378    const time_t now = time( NULL );
1379    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1380    size_t len;
1381
1382    /* if we froze out a downloaded block because of speed limits,
1383       start listening to the peer again */
1384    if( msgs->notListening && canDownload( msgs ) )
1385    {
1386        msgs->notListening = 0;
1387        tr_peerIoSetIOMode ( msgs->io, EV_READ, 0 );
1388    }
1389
1390    pumpRequestQueue( msgs );
1391
1392    if( !canWrite( msgs ) )
1393    {
1394    }
1395    else if(( len = EVBUFFER_LENGTH( msgs->outMessages ) ))
1396    {
1397        tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
1398        msgs->clientSentAnythingAt = now;
1399    }
1400    else if(( msgs->peerAskedFor ))
1401    {
1402        if( canUpload( msgs ) )
1403        {
1404            struct peer_request * r = tr_list_pop_front( &msgs->peerAskedFor );
1405            uint8_t * buf = tr_new( uint8_t, r->length );
1406
1407            if( requestIsValid( msgs, r )
1408                && tr_cpPieceIsComplete( msgs->torrent->completion, r->index )
1409                && !tr_ioRead( msgs->torrent, r->index, r->offset, r->length, buf ) )
1410            {
1411                protocolSendPiece( msgs, r, buf );
1412                peerGotBytes( msgs, r->length );
1413                msgs->clientSentAnythingAt = now;
1414            }
1415
1416            tr_free( buf );
1417            tr_free( r );
1418        }
1419    }
1420    else if( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS )
1421    {
1422        sendKeepalive( msgs );
1423    }
1424
1425    return TRUE; /* loop forever */
1426}
1427
1428static void
1429didWrite( struct bufferevent * evin UNUSED, void * vmsgs )
1430{
1431    pulse( vmsgs );
1432}
1433
1434static void
1435gotError( struct bufferevent * evbuf UNUSED, short what, void * vmsgs )
1436{
1437    dbgmsg( vmsgs, "libevent got an error! what=%d, errno=%d (%s)",
1438            (int)what, errno, strerror(errno) );
1439    fireGotError( vmsgs );
1440}
1441
1442static void
1443sendBitfield( tr_peermsgs * msgs )
1444{
1445    const tr_bitfield * bitfield = tr_cpPieceBitfield( msgs->torrent->completion );
1446    struct evbuffer * out = msgs->outMessages;
1447
1448    dbgmsg( msgs, "sending peer a bitfield message" );
1449    tr_peerIoWriteUint32( msgs->io, out, sizeof(uint8_t) + bitfield->len );
1450    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1451    tr_peerIoWriteBytes ( msgs->io, out, bitfield->bits, bitfield->len );
1452}
1453
1454/**
1455***
1456**/
1457
1458/* some peers give us error messages if we send
1459   more than this many peers in a single pex message */
1460#define MAX_PEX_DIFFS 200
1461
1462typedef struct
1463{
1464    tr_pex * added;
1465    tr_pex * dropped;
1466    tr_pex * elements;
1467    int addedCount;
1468    int droppedCount;
1469    int elementCount;
1470    int diffCount;
1471}
1472PexDiffs;
1473
1474static void
1475pexAddedCb( void * vpex, void * userData )
1476{
1477    PexDiffs * diffs = (PexDiffs *) userData;
1478    tr_pex * pex = (tr_pex *) vpex;
1479    if( diffs->diffCount < MAX_PEX_DIFFS )
1480    {
1481        diffs->diffCount++;
1482        diffs->added[diffs->addedCount++] = *pex;
1483        diffs->elements[diffs->elementCount++] = *pex;
1484    }
1485}
1486
1487static void
1488pexRemovedCb( void * vpex, void * userData )
1489{
1490    PexDiffs * diffs = (PexDiffs *) userData;
1491    tr_pex * pex = (tr_pex *) vpex;
1492    if( diffs->diffCount < MAX_PEX_DIFFS )
1493    {
1494        diffs->diffCount++;
1495        diffs->dropped[diffs->droppedCount++] = *pex;
1496    }
1497}
1498
1499static void
1500pexElementCb( void * vpex, void * userData )
1501{
1502    PexDiffs * diffs = (PexDiffs *) userData;
1503    tr_pex * pex = (tr_pex *) vpex;
1504    if( diffs->diffCount < MAX_PEX_DIFFS )
1505    {
1506        diffs->diffCount++;
1507        diffs->elements[diffs->elementCount++] = *pex;
1508    }
1509}
1510
1511static void
1512sendPex( tr_peermsgs * msgs )
1513{
1514    if( msgs->peerSupportsPex && !msgs->torrent->pexDisabled )
1515    {
1516        int i;
1517        tr_pex * newPex = NULL;
1518        const int newCount = tr_peerMgrGetPeers( msgs->handle->peerMgr, msgs->torrent->info.hash, &newPex );
1519        PexDiffs diffs;
1520        benc_val_t val, *added, *dropped, *flags;
1521        uint8_t *tmp, *walk;
1522        char * benc;
1523        int bencLen;
1524
1525        /* build the diffs */
1526        diffs.added = tr_new( tr_pex, newCount );
1527        diffs.addedCount = 0;
1528        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1529        diffs.droppedCount = 0;
1530        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1531        diffs.elementCount = 0;
1532        diffs.diffCount = 0;
1533        tr_set_compare( msgs->pex, msgs->pexCount,
1534                        newPex, newCount,
1535                        tr_pexCompare, sizeof(tr_pex),
1536                        pexRemovedCb, pexAddedCb, pexElementCb, &diffs );
1537        dbgmsg( msgs, "pex: old peer count %d, new peer count %d, added %d, removed %d", msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1538
1539        /* update peer */
1540        tr_free( msgs->pex );
1541        msgs->pex = diffs.elements;
1542        msgs->pexCount = diffs.elementCount;
1543
1544        /* build the pex payload */
1545        tr_bencInit( &val, TYPE_DICT );
1546        tr_bencDictReserve( &val, 3 );
1547
1548        /* "added" */
1549        added = tr_bencDictAdd( &val, "added" );
1550        tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1551        for( i=0; i<diffs.addedCount; ++i ) {
1552            memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
1553            memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1554        }
1555        assert( ( walk - tmp ) == diffs.addedCount * 6 );
1556        tr_bencInitStr( added, tmp, walk-tmp, FALSE );
1557
1558        /* "added.f" */
1559        flags = tr_bencDictAdd( &val, "added.f" );
1560        tmp = walk = tr_new( uint8_t, diffs.addedCount );
1561        for( i=0; i<diffs.addedCount; ++i )
1562            *walk++ = diffs.added[i].flags;
1563        assert( ( walk - tmp ) == diffs.addedCount );
1564        tr_bencInitStr( flags, tmp, walk-tmp, FALSE );
1565
1566        /* "dropped" */
1567        dropped = tr_bencDictAdd( &val, "dropped" );
1568        tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
1569        for( i=0; i<diffs.droppedCount; ++i ) {
1570            memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
1571            memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
1572        }
1573        assert( ( walk - tmp ) == diffs.droppedCount * 6 );
1574        tr_bencInitStr( dropped, tmp, walk-tmp, FALSE );
1575
1576        /* write the pex message */
1577        benc = tr_bencSaveMalloc( &val, &bencLen );
1578        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 2*sizeof(uint8_t) + bencLen );
1579        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_LTEP );
1580        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, OUR_LTEP_PEX );
1581        tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, benc, bencLen );
1582
1583        /* cleanup */
1584        tr_free( benc );
1585        tr_bencFree( &val );
1586        tr_free( diffs.added );
1587        tr_free( diffs.dropped );
1588        tr_free( newPex );
1589
1590        msgs->clientSentPexAt = time( NULL );
1591    }
1592}
1593
1594static int
1595pexPulse( void * vpeer )
1596{
1597    sendPex( vpeer );
1598    return TRUE;
1599}
1600
1601/**
1602***
1603**/
1604
1605tr_peermsgs*
1606tr_peerMsgsNew( struct tr_torrent * torrent,
1607                struct tr_peer    * info,
1608                tr_delivery_func    func,
1609                void              * userData,
1610                tr_publisher_tag  * setme )
1611{
1612    tr_peermsgs * m;
1613
1614    assert( info != NULL );
1615    assert( info->io != NULL );
1616
1617    m = tr_new0( tr_peermsgs, 1 );
1618    m->publisher = tr_publisherNew( );
1619    m->info = info;
1620    m->handle = torrent->handle;
1621    m->torrent = torrent;
1622    m->io = info->io;
1623    m->info->clientIsChoked = 1;
1624    m->info->peerIsChoked = 1;
1625    m->info->clientIsInterested = 0;
1626    m->info->peerIsInterested = 0;
1627    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
1628    m->pulseTimer = tr_timerNew( m->handle, pulse, m, PEER_PULSE_INTERVAL );
1629    m->rateTimer = tr_timerNew( m->handle, ratePulse, m, RATE_PULSE_INTERVAL );
1630    m->pexTimer = tr_timerNew( m->handle, pexPulse, m, PEX_INTERVAL );
1631    m->outMessages = evbuffer_new( );
1632    m->inBlock = evbuffer_new( );
1633    m->peerAllowedPieces = NULL;
1634    m->clientAllowedPieces = NULL;
1635    setme = tr_publisherSubscribe( m->publisher, func, userData );
1636    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* error if we don't read or write for 2.5 minutes */
1637   
1638    if ( tr_peerIoSupportsFEXT( m->io ) )
1639    {
1640        /* This peer is fastpeer-enabled, generate its allowed set
1641         * (before registering our callbacks) */
1642        if ( !m->peerAllowedPieces ) {
1643            const struct in_addr *peerAddr = tr_peerIoGetAddress( m->io, NULL );
1644           
1645            m->peerAllowedPieces = tr_peerMgrGenerateAllowedSet( MAX_ALLOWED_SET_COUNT,
1646                                                                 m->torrent->info.pieceCount,
1647                                                                 m->torrent->info.hash,
1648                                                                 peerAddr );
1649        }
1650        m->clientAllowedPieces = tr_bitfieldNew( m->torrent->info.pieceCount );
1651    }
1652   
1653    tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
1654    tr_peerIoSetIOMode( m->io, EV_READ|EV_WRITE, 0 );
1655    ratePulse( m );
1656
1657    /**
1658    ***  If we initiated this connection,
1659    ***  we may need to send LTEP/AZMP handshakes.
1660    ***  Otherwise we'll wait for the peer to send theirs first.
1661    **/
1662    if( !tr_peerIoIsIncoming( m->io ) )
1663    {
1664        if ( tr_peerIoSupportsLTEP( m->io ) ) {
1665            sendLtepHandshake( m );
1666           
1667        } else if ( tr_peerIoSupportsAZMP( m->io ) ) {
1668            dbgmsg( m, "FIXME: need to send AZMP handshake" );
1669           
1670        } else {
1671            /* no-op */
1672        }
1673    }
1674   
1675    if ( tr_peerIoSupportsFEXT( m->io ) )
1676    {
1677        /* This peer is fastpeer-enabled, send it have-all or have-none if appropriate */
1678        float completion = tr_cpPercentComplete( m->torrent->completion );
1679        if ( completion == 0.0f ) {
1680            sendFastHave( m, 0 );
1681        } else if ( completion == 1.0f ) {
1682            sendFastHave( m, 1 );
1683        } else {
1684            sendBitfield( m );
1685        }
1686        uint32_t peerProgress = m->torrent->info.pieceCount * m->info->progress;
1687       
1688        if ( peerProgress < MAX_ALLOWED_SET_COUNT )
1689            sendFastAllowedSet( m );
1690    } else {
1691        sendBitfield( m );
1692    }
1693    return m;
1694}
1695
1696void
1697tr_peerMsgsFree( tr_peermsgs* msgs )
1698{
1699    if( msgs != NULL )
1700    {
1701        tr_timerFree( &msgs->pulseTimer );
1702        tr_timerFree( &msgs->rateTimer );
1703        tr_timerFree( &msgs->pexTimer );
1704        tr_publisherFree( &msgs->publisher );
1705        tr_list_free( &msgs->clientWillAskFor, tr_free );
1706        tr_list_free( &msgs->clientAskedFor, tr_free );
1707        tr_list_free( &msgs->peerAskedFor, tr_free );
1708        evbuffer_free( msgs->outMessages );
1709        evbuffer_free( msgs->inBlock );
1710        tr_free( msgs->pex );
1711        msgs->pexCount = 0;
1712        tr_free( msgs );
1713    }
1714}
1715
1716tr_publisher_tag
1717tr_peerMsgsSubscribe( tr_peermsgs       * peer,
1718                      tr_delivery_func    func,
1719                      void              * userData )
1720{
1721    return tr_publisherSubscribe( peer->publisher, func, userData );
1722}
1723
1724void
1725tr_peerMsgsUnsubscribe( tr_peermsgs       * peer,
1726                        tr_publisher_tag    tag )
1727{
1728    tr_publisherUnsubscribe( peer->publisher, tag );
1729}
1730
1731int
1732tr_peerMsgIsPieceFastAllowed( const tr_peermsgs * peer,
1733                              uint32_t            index )
1734{
1735    return tr_bitfieldHas( peer->clientAllowedPieces, index );
1736}
1737
Note: See TracBrowser for help on using the repository browser.