source: trunk/libtransmission/peer-msgs.c @ 3864

Last change on this file since 3864 was 3864, checked in by charles, 15 years ago

reintroduce the "SWIFT" algorithm

  • Property svn:keywords set to Date Rev Author Id
File size: 54.3 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 3864 2007-11-18 01:00:49Z charles $
11 */
12
13#include <assert.h>
14#include <ctype.h>
15#include <errno.h>
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19#include <libgen.h> /* basename */
20
21#include <netinet/in.h> /* struct in_addr */
22
23#include <event.h>
24
25#include "transmission.h"
26#include "bencode.h"
27#include "completion.h"
28#include "inout.h"
29#include "list.h"
30#include "peer-io.h"
31#include "peer-mgr.h"
32#include "peer-mgr-private.h"
33#include "peer-msgs.h"
34#include "ratecontrol.h"
35#include "trevent.h"
36#include "utils.h"
37
38/**
39***
40**/
41
42enum
43{
44    BT_CHOKE                = 0,
45    BT_UNCHOKE              = 1,
46    BT_INTERESTED           = 2,
47    BT_NOT_INTERESTED       = 3,
48    BT_HAVE                 = 4,
49    BT_BITFIELD             = 5,
50    BT_REQUEST              = 6,
51    BT_PIECE                = 7,
52    BT_CANCEL               = 8,
53    BT_PORT                 = 9,
54    BT_SUGGEST              = 13,
55    BT_HAVE_ALL             = 14,
56    BT_HAVE_NONE            = 15,
57    BT_REJECT               = 16,
58    BT_ALLOWED_FAST         = 17,
59    BT_LTEP                 = 20,
60
61    LTEP_HANDSHAKE          = 0,
62
63    OUR_LTEP_PEX            = 1,
64
65    MAX_REQUEST_BYTE_COUNT  = (16 * 1024), /* drop requests who want too much */
66
67    /* idle seconds before we send a keepalive */
68    KEEPALIVE_INTERVAL_SECS = 90,
69
70    PEX_INTERVAL            = (60 * 1000), /* msec between calls to sendPex() */
71    PEER_PULSE_INTERVAL     = (250),       /* msec between calls to pulse() */
72    RATE_PULSE_INTERVAL     = (333),       /* msec between calls to ratePulse() */
73     
74    /* Fast Peers Extension constants */
75    MAX_FAST_ALLOWED_COUNT   = 10,          /* max. number of pieces we fast-allow to another peer */
76    MAX_FAST_ALLOWED_THRESHOLD = 10,        /* max threshold for allowing fast-pieces requests */
77
78};
79
80enum
81{
82    AWAITING_BT_LENGTH,
83    AWAITING_BT_ID,
84    AWAITING_BT_MESSAGE,
85    AWAITING_BT_PIECE
86};
87
88struct peer_request
89{
90    uint32_t index;
91    uint32_t offset;
92    uint32_t length;
93    time_t time_requested;
94};
95
96static int
97compareRequest( const void * va, const void * vb )
98{
99    int i;
100    const struct peer_request * a = va;
101    const struct peer_request * b = vb;
102    if(( i = tr_compareUint32( a->index, b->index ))) return i;
103    if(( i = tr_compareUint32( a->offset, b->offset ))) return i;
104    if(( i = tr_compareUint32( a->length, b->length ))) return i;
105    return 0;
106}
107
108/* this is raw, unchanged data from the peer regarding
109 * the current message that it's sending us. */
110struct tr_incoming
111{
112    uint32_t length; /* includes the +1 for id length */
113    uint8_t id;
114    struct peer_request blockReq; /* metadata for incoming blocks */
115    struct evbuffer * block; /* piece data for incoming blocks */
116};
117
118struct tr_peermsgs
119{
120    tr_peer * info;
121
122    tr_handle * handle;
123    tr_torrent * torrent;
124    tr_peerIo * io;
125
126    tr_publisher_t * publisher;
127
128    struct evbuffer * outBlock;    /* buffer of all the current piece message */
129    struct evbuffer * outMessages; /* buffer of all the non-piece messages */
130    tr_list * peerAskedFor;
131    tr_list * peerAskedForFast;
132    tr_list * clientAskedFor;
133    tr_list * clientWillAskFor;
134
135    tr_timer * rateTimer;
136    tr_timer * pulseTimer;
137    tr_timer * pexTimer;
138
139    time_t lastReqAddedAt;
140    time_t clientSentPexAt;
141    time_t clientSentAnythingAt;
142
143    unsigned int peerSentBitfield         : 1;
144    unsigned int peerSupportsPex          : 1;
145    unsigned int clientSentLtepHandshake  : 1;
146    unsigned int peerSentLtepHandshake    : 1;
147   
148    tr_bitfield * clientAllowedPieces;
149    tr_bitfield * peerAllowedPieces;
150   
151    tr_bitfield * clientSuggestedPieces;
152   
153    uint8_t state;
154    uint8_t ut_pex_id;
155    uint16_t pexCount;
156    uint32_t maxActiveRequests;
157    uint32_t minActiveRequests;
158
159    struct tr_incoming incoming; 
160
161    tr_pex * pex;
162};
163
164/**
165***
166**/
167
168static void
169myDebug( const char * file, int line,
170         const struct tr_peermsgs * msgs,
171         const char * fmt, ... )
172{
173    FILE * fp = tr_getLog( );
174    if( fp != NULL )
175    {
176        va_list args;
177        char timestr[64];
178        struct evbuffer * buf = evbuffer_new( );
179        char * myfile = tr_strdup( file );
180
181        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
182                             tr_getLogTimeStr( timestr, sizeof(timestr) ),
183                             msgs->torrent->info.name,
184                             tr_peerIoGetAddrStr( msgs->io ),
185                             msgs->info->client );
186        va_start( args, fmt );
187        evbuffer_add_vprintf( buf, fmt, args );
188        va_end( args );
189        evbuffer_add_printf( buf, " (%s:%d)\n", basename(myfile), line );
190        fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
191
192        tr_free( myfile );
193        evbuffer_free( buf );
194    }
195}
196
197#define dbgmsg(msgs, fmt...) myDebug(__FILE__, __LINE__, msgs, ##fmt )
198
199/**
200***
201**/
202
203static void
204protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
205{
206    tr_peerIo * io = msgs->io;
207    struct evbuffer * out = msgs->outMessages;
208
209    dbgmsg( msgs, "requesting %u:%u->%u", req->index, req->offset, req->length );
210    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
211    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
212    tr_peerIoWriteUint32( io, out, req->index );
213    tr_peerIoWriteUint32( io, out, req->offset );
214    tr_peerIoWriteUint32( io, out, req->length );
215}
216
217static void
218protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
219{
220    tr_peerIo * io = msgs->io;
221    struct evbuffer * out = msgs->outMessages;
222
223    dbgmsg( msgs, "cancelling %u:%u->%u", req->index, req->offset, req->length );
224    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
225    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
226    tr_peerIoWriteUint32( io, out, req->index );
227    tr_peerIoWriteUint32( io, out, req->offset );
228    tr_peerIoWriteUint32( io, out, req->length );
229}
230
231static void
232protocolSendHave( tr_peermsgs * msgs, uint32_t index )
233{
234    tr_peerIo * io = msgs->io;
235    struct evbuffer * out = msgs->outMessages;
236
237    dbgmsg( msgs, "sending Have %u", index );
238    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
239    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
240    tr_peerIoWriteUint32( io, out, index );
241}
242
243static void
244protocolSendChoke( tr_peermsgs * msgs, int choke )
245{
246    tr_peerIo * io = msgs->io;
247    struct evbuffer * out = msgs->outMessages;
248
249    dbgmsg( msgs, "sending %s", (choke ? "Choke" : "Unchoke") );
250    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) );
251    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
252}
253
254/**
255***  EVENTS
256**/
257
258static const tr_peermsgs_event blankEvent = { 0, 0, 0, 0, 0.0f };
259
260static void
261publish( tr_peermsgs * msgs, tr_peermsgs_event * e )
262{
263    tr_publisherPublish( msgs->publisher, msgs->info, e );
264}
265
266static void
267fireGotAssertError( tr_peermsgs * msgs )
268{
269    tr_peermsgs_event e = blankEvent;
270    e.eventType = TR_PEERMSG_GOT_ASSERT_ERROR;
271    publish( msgs, &e );
272}
273
274static void
275fireGotError( tr_peermsgs * msgs )
276{
277    tr_peermsgs_event e = blankEvent;
278    e.eventType = TR_PEERMSG_GOT_ERROR;
279    publish( msgs, &e );
280}
281
282static void
283fireNeedReq( tr_peermsgs * msgs )
284{
285    tr_peermsgs_event e = blankEvent;
286    e.eventType = TR_PEERMSG_NEED_REQ;
287    publish( msgs, &e );
288}
289
290static void
291firePeerProgress( tr_peermsgs * msgs )
292{
293    tr_peermsgs_event e = blankEvent;
294    e.eventType = TR_PEERMSG_PEER_PROGRESS;
295    e.progress = msgs->info->progress;
296    publish( msgs, &e );
297}
298
299static void
300fireClientHave( tr_peermsgs * msgs, uint32_t pieceIndex )
301{
302    tr_peermsgs_event e = blankEvent;
303    e.eventType = TR_PEERMSG_CLIENT_HAVE;
304    e.pieceIndex = pieceIndex;
305    publish( msgs, &e );
306}
307
308static void
309fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
310{
311    tr_peermsgs_event e = blankEvent;
312    e.eventType = TR_PEERMSG_CLIENT_BLOCK;
313    e.pieceIndex = req->index;
314    e.offset = req->offset;
315    e.length = req->length;
316    publish( msgs, &e );
317}
318
319static void
320fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
321{
322    tr_peermsgs_event e = blankEvent;
323    e.eventType = TR_PEERMSG_CANCEL;
324    e.pieceIndex = req->index;
325    e.offset = req->offset;
326    e.length = req->length;
327    publish( msgs, &e );
328}
329
330/**
331***  INTEREST
332**/
333
334static int
335isPieceInteresting( const tr_peermsgs   * peer,
336                    int                   piece )
337{
338    const tr_torrent * torrent = peer->torrent;
339    if( torrent->info.pieces[piece].dnd ) /* we don't want it */
340        return FALSE;
341    if( tr_cpPieceIsComplete( torrent->completion, piece ) ) /* we have it */
342        return FALSE;
343    if( !tr_bitfieldHas( peer->info->have, piece ) ) /* peer doesn't have it */
344        return FALSE;
345    if( tr_bitfieldHas( peer->info->banned, piece ) ) /* peer is banned */
346        return FALSE;
347    return TRUE;
348}
349
350/* "interested" means we'll ask for piece data if they unchoke us */
351static int
352isPeerInteresting( const tr_peermsgs * msgs )
353{
354    int i;
355    const tr_torrent * torrent;
356    const tr_bitfield * bitfield;
357    const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
358
359    if( clientIsSeed )
360        return FALSE;
361
362    torrent = msgs->torrent;
363    bitfield = tr_cpPieceBitfield( torrent->completion );
364
365    if( !msgs->info->have )
366        return TRUE;
367
368    assert( bitfield->len == msgs->info->have->len );
369    for( i=0; i<torrent->info.pieceCount; ++i )
370        if( isPieceInteresting( msgs, i ) )
371            return TRUE;
372
373    return FALSE;
374}
375
376static void
377sendInterest( tr_peermsgs * msgs, int weAreInterested )
378{
379    assert( msgs != NULL );
380    assert( weAreInterested==0 || weAreInterested==1 );
381
382    msgs->info->clientIsInterested = weAreInterested;
383    dbgmsg( msgs, "Sending %s",
384            weAreInterested ? "Interested" : "Not Interested");
385
386    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
387    tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages,
388                   weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
389}
390
391static void
392updateInterest( tr_peermsgs * msgs )
393{
394    const int i = isPeerInteresting( msgs );
395    if( i != msgs->info->clientIsInterested )
396        sendInterest( msgs, i );
397    if( i )
398        fireNeedReq( msgs );
399}
400
401#define MIN_CHOKE_PERIOD_SEC 10
402
403static void
404cancelAllRequestsToClientExceptFast( tr_peermsgs * msgs )
405{
406    tr_list_free( &msgs->peerAskedFor, tr_free );
407}
408
409void
410tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
411{
412    const time_t fibrillationTime = time(NULL) - MIN_CHOKE_PERIOD_SEC;
413
414    assert( msgs != NULL );
415    assert( msgs->info != NULL );
416    assert( choke==0 || choke==1 );
417
418    if( msgs->info->chokeChangedAt > fibrillationTime )
419    {
420        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
421    }
422    else if( msgs->info->peerIsChoked != choke )
423    {
424        msgs->info->peerIsChoked = choke;
425        if( choke )
426            cancelAllRequestsToClientExceptFast( msgs );
427        protocolSendChoke( msgs, choke );
428        msgs->info->chokeChangedAt = time( NULL );
429    }
430}
431
432/**
433***
434**/
435
436void
437tr_peerMsgsHave( tr_peermsgs * msgs,
438                 uint32_t      index )
439{
440    protocolSendHave( msgs, index );
441
442    /* since we have more pieces now, we might not be interested in this peer */
443    updateInterest( msgs );
444}
445#if 0
446static void
447sendFastSuggest( tr_peermsgs * msgs,
448                 uint32_t      pieceIndex )
449{
450    assert( msgs != NULL );
451   
452    if( tr_peerIoSupportsFEXT( msgs->io ) )
453    {
454        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
455        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_SUGGEST );
456        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
457    }
458}
459#endif
460static void
461sendFastHave( tr_peermsgs * msgs, int all )
462{
463    assert( msgs != NULL );
464   
465    if( tr_peerIoSupportsFEXT( msgs->io ) )
466    {
467        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
468        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, ( all ? BT_HAVE_ALL
469                                                                : BT_HAVE_NONE ) );
470        updateInterest( msgs );
471    }
472}
473
474static void
475sendFastReject( tr_peermsgs * msgs,
476                uint32_t      pieceIndex,
477                uint32_t      offset,
478                uint32_t      length )
479{
480    assert( msgs != NULL );
481
482    if( tr_peerIoSupportsFEXT( msgs->io ) )
483    {
484        const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
485        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
486        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_REJECT );
487        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
488        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
489        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
490    }
491}
492
493static void
494sendFastAllowed( tr_peermsgs * msgs,
495                 uint32_t      pieceIndex)
496{
497    assert( msgs != NULL );
498   
499    if( tr_peerIoSupportsFEXT( msgs->io ) )
500    {
501        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
502        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_ALLOWED_FAST );
503        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
504    }
505}
506
507
508
509static void
510sendFastAllowedSet( tr_peermsgs * msgs )
511{
512    int i = 0;
513    while (i <= msgs->torrent->info.pieceCount )
514    {
515        if ( tr_bitfieldHas( msgs->peerAllowedPieces, i) )
516            sendFastAllowed( msgs, i );
517        i++;
518    }
519}
520
521
522/**
523***
524**/
525
526static int
527reqIsValid( const tr_peermsgs   * msgs,
528            uint32_t              index,
529            uint32_t              offset,
530            uint32_t              length )
531{
532    const tr_torrent * tor = msgs->torrent;
533
534    if( index >= (uint32_t) tor->info.pieceCount )
535        return FALSE;
536    if ( (int)offset >= tr_torPieceCountBytes( tor, (int)index ) )
537        return FALSE;
538    if( length > MAX_REQUEST_BYTE_COUNT )
539        return FALSE;
540    if( tr_pieceOffset( tor, index, offset, length ) > tor->info.totalSize )
541        return FALSE;
542
543    return TRUE;
544}
545
546static int
547requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
548{
549    return reqIsValid( msgs, req->index, req->offset, req->length );
550}
551
552static void
553pumpRequestQueue( tr_peermsgs * msgs )
554{
555    const int max = msgs->maxActiveRequests;
556    const int min = msgs->minActiveRequests;
557    int count = tr_list_size( msgs->clientAskedFor );
558    int sent = 0;
559
560    if( count > min )
561        return;
562    if( msgs->info->clientIsChoked )
563        return;
564
565    while( ( count < max ) && ( msgs->clientWillAskFor != NULL ) )
566    {
567        struct peer_request * r = tr_list_pop_front( &msgs->clientWillAskFor );
568        protocolSendRequest( msgs, r );
569        r->time_requested = msgs->lastReqAddedAt = time( NULL );
570        tr_list_append( &msgs->clientAskedFor, r );
571        ++count;
572        ++sent;
573    }
574
575    if( sent )
576        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
577                sent,
578                tr_list_size(msgs->clientAskedFor),
579                tr_list_size(msgs->clientWillAskFor) );
580
581    if( count < max )
582        fireNeedReq( msgs );
583}
584
585static int
586pulse( void * vmsgs );
587
588int
589tr_peerMsgsAddRequest( tr_peermsgs * msgs,
590                       uint32_t      index, 
591                       uint32_t      offset, 
592                       uint32_t      length )
593{
594    const int req_max = msgs->maxActiveRequests;
595    struct peer_request tmp, *req;
596
597    assert( msgs != NULL );
598    assert( msgs->torrent != NULL );
599    assert( reqIsValid( msgs, index, offset, length ) );
600
601    /**
602    ***  Reasons to decline the request
603    **/
604
605    /* don't send requests to choked clients */
606    if( msgs->info->clientIsChoked ) {
607        dbgmsg( msgs, "declining request because they're choking us" );
608        return TR_ADDREQ_CLIENT_CHOKED;
609    }
610
611    /* peer doesn't have this piece */
612    if( !tr_bitfieldHas( msgs->info->have, index ) )
613        return TR_ADDREQ_MISSING;
614
615    /* peer's queue is full */
616    if( tr_list_size( msgs->clientWillAskFor ) >= req_max ) {
617        dbgmsg( msgs, "declining request because we're full" );
618        return TR_ADDREQ_FULL;
619    }
620
621    /* have we already asked for this piece? */
622    tmp.index = index;
623    tmp.offset = offset;
624    tmp.length = length;
625    if( tr_list_find( msgs->clientAskedFor, &tmp, compareRequest ) ) {
626        dbgmsg( msgs, "declining because it's a duplicate" );
627        return TR_ADDREQ_DUPLICATE;
628    }
629    if( tr_list_find( msgs->clientWillAskFor, &tmp, compareRequest ) ) {
630        dbgmsg( msgs, "declining because it's a duplicate" );
631        return TR_ADDREQ_DUPLICATE;
632    }
633
634    /**
635    ***  Accept this request
636    **/
637
638    dbgmsg( msgs, "added req for piece %d, offset %d", (int)index, (int)offset );
639    req = tr_new0( struct peer_request, 1 );
640    *req = tmp;
641    tr_list_append( &msgs->clientWillAskFor, req );
642    pulse( msgs );
643    return TR_ADDREQ_OK;
644}
645
646static void
647cancelAllRequestsToPeer( tr_peermsgs * msgs )
648{
649    struct peer_request * req;
650
651    while(( req = tr_list_pop_front( &msgs->clientWillAskFor ) ))
652    {
653        fireCancelledReq( msgs, req );
654        tr_free( req );
655    }
656
657    while(( req = tr_list_pop_front( &msgs->clientAskedFor ) ))
658    {
659        fireCancelledReq( msgs, req );
660        protocolSendCancel( msgs, req );
661        tr_free( req );
662    }
663}
664
665void
666tr_peerMsgsCancel( tr_peermsgs * msgs,
667                   uint32_t      pieceIndex,
668                   uint32_t      offset,
669                   uint32_t      length )
670{
671    struct peer_request *req, tmp;
672
673    assert( msgs != NULL );
674    assert( length > 0 );
675
676    /* have we asked the peer for this piece? */
677    tmp.index = pieceIndex;
678    tmp.offset = offset;
679    tmp.length = length;
680
681    /* if it's only in the queue and hasn't been sent yet, free it */
682    if(( req = tr_list_remove( &msgs->clientWillAskFor, &tmp, compareRequest ) ))
683    {
684        fireCancelledReq( msgs, req );
685        tr_free( req );
686    }
687
688    /* if it's already been sent, send a cancel message too */
689    if(( req = tr_list_remove( &msgs->clientAskedFor, &tmp, compareRequest ) ))
690    {
691        protocolSendCancel( msgs, req );
692        fireCancelledReq( msgs, req );
693        tr_free( req );
694    }
695}
696
697/**
698***
699**/
700
701static void
702sendLtepHandshake( tr_peermsgs * msgs )
703{
704    benc_val_t val, *m;
705    char * buf;
706    int len;
707    int pex;
708    const char * v = TR_NAME " " USERAGENT_PREFIX;
709    const int port = tr_getPublicPort( msgs->handle );
710    struct evbuffer * outbuf;
711
712    if( msgs->clientSentLtepHandshake )
713        return;
714
715    outbuf = evbuffer_new( );
716    dbgmsg( msgs, "sending an ltep handshake" );
717    msgs->clientSentLtepHandshake = 1;
718
719    /* decide if we want to advertise pex support */
720    if( !tr_torrentIsPexEnabled( msgs->torrent ) )
721        pex = 0;
722    else if( msgs->peerSentLtepHandshake )
723        pex = msgs->peerSupportsPex ? 1 : 0;
724    else
725        pex = 1;
726
727    tr_bencInit( &val, TYPE_DICT );
728    tr_bencDictReserve( &val, 4 );
729    tr_bencInitInt( tr_bencDictAdd( &val, "e" ), 1 );
730    m  = tr_bencDictAdd( &val, "m" );
731    tr_bencInit( m, TYPE_DICT );
732    if( pex ) {
733        tr_bencDictReserve( m, 1 );
734        tr_bencInitInt( tr_bencDictAdd( m, "ut_pex" ), OUR_LTEP_PEX );
735    }
736    if( port > 0 )
737        tr_bencInitInt( tr_bencDictAdd( &val, "p" ), port );
738    tr_bencInitStr( tr_bencDictAdd( &val, "v" ), v, 0, 1 );
739    buf = tr_bencSave( &val, &len );
740
741    tr_peerIoWriteUint32( msgs->io, outbuf, 2*sizeof(uint8_t) + len );
742    tr_peerIoWriteUint8 ( msgs->io, outbuf, BT_LTEP );
743    tr_peerIoWriteUint8 ( msgs->io, outbuf, LTEP_HANDSHAKE );
744    tr_peerIoWriteBytes ( msgs->io, outbuf, buf, len );
745
746    tr_peerIoWriteBuf( msgs->io, outbuf );
747
748#if 0
749    dbgmsg( msgs, "here is the ltep handshake we sent:" );
750    tr_bencPrint( &val );
751    dbgmsg( msgs, "here is the ltep handshake we read [%s]:", tr_bencSave( &val, NULL ) );
752#endif
753
754    /* cleanup */
755    tr_bencFree( &val );
756    tr_free( buf );
757    evbuffer_free( outbuf );
758}
759
760static void
761parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
762{
763    benc_val_t val, * sub;
764    uint8_t * tmp = tr_new( uint8_t, len );
765
766    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
767    msgs->peerSentLtepHandshake = 1;
768
769    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type!=TYPE_DICT ) {
770        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
771        tr_free( tmp );
772        return;
773    }
774
775#if 0
776    dbgmsg( msgs, "here is the ltep handshake we read:" );
777    tr_bencPrint( &val );
778    dbgmsg( msgs, "here is the ltep handshake we read [%s]:", tr_bencSave( &val, NULL ) );
779#endif
780
781    /* does the peer prefer encrypted connections? */
782    sub = tr_bencDictFind( &val, "e" );
783    if( tr_bencIsInt( sub ) )
784        msgs->info->encryption_preference = sub->val.i
785                                      ? ENCRYPTION_PREFERENCE_YES
786                                      : ENCRYPTION_PREFERENCE_NO;
787
788    /* check supported messages for utorrent pex */
789    sub = tr_bencDictFind( &val, "m" );
790    if( tr_bencIsDict( sub ) ) {
791        sub = tr_bencDictFind( sub, "ut_pex" );
792        if( tr_bencIsInt( sub ) ) {
793            msgs->peerSupportsPex = 1;
794            msgs->ut_pex_id = (uint8_t) sub->val.i;
795            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
796        }
797    }
798
799    /* get peer's listening port */
800    sub = tr_bencDictFind( &val, "p" );
801    if( tr_bencIsInt( sub ) ) {
802        msgs->info->port = htons( (uint16_t)sub->val.i );
803        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
804    }
805
806    tr_bencFree( &val );
807    tr_free( tmp );
808}
809
810static void
811parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
812{
813    benc_val_t val, * sub;
814    uint8_t * tmp;
815
816    if( !tr_torrentIsPexEnabled( msgs->torrent ) ) /* no sharing! */
817        return;
818
819    tmp = tr_new( uint8_t, msglen );
820    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
821
822    if( tr_bencLoad( tmp, msglen, &val, NULL ) || !tr_bencIsDict( &val ) ) {
823        dbgmsg( msgs, "GET can't read extended-pex dictionary" );
824        tr_free( tmp );
825        return;
826    }
827
828    sub = tr_bencDictFind( &val, "added" );
829    if( tr_bencIsStr(sub) && ((sub->val.s.i % 6) == 0)) {
830        const int n = sub->val.s.i / 6 ;
831        dbgmsg( msgs, "got %d peers from uT pex", n );
832        tr_peerMgrAddPeers( msgs->handle->peerMgr,
833                            msgs->torrent->info.hash,
834                            TR_PEER_FROM_PEX,
835                            (uint8_t*)sub->val.s.s, n );
836    }
837
838    tr_bencFree( &val );
839    tr_free( tmp );
840}
841
842static void
843sendPex( tr_peermsgs * msgs );
844
845static void
846parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
847{
848    uint8_t ltep_msgid;
849
850    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
851    msglen--;
852
853    if( ltep_msgid == LTEP_HANDSHAKE )
854    {
855        dbgmsg( msgs, "got ltep handshake" );
856        parseLtepHandshake( msgs, msglen, inbuf );
857        if( tr_peerIoSupportsLTEP( msgs->io ) )
858        {
859            sendLtepHandshake( msgs );
860            sendPex( msgs );
861        }
862    }
863    else if( ltep_msgid == msgs->ut_pex_id )
864    {
865        dbgmsg( msgs, "got ut pex" );
866        msgs->peerSupportsPex = 1;
867        parseUtPex( msgs, msglen, inbuf );
868    }
869    else
870    {
871        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
872        evbuffer_drain( inbuf, msglen );
873    }
874}
875
876static int
877readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
878{
879    uint32_t len;
880
881    if( inlen < sizeof(len) )
882        return READ_MORE;
883
884    tr_peerIoReadUint32( msgs->io, inbuf, &len );
885
886    if( len == 0 ) /* peer sent us a keepalive message */
887        dbgmsg( msgs, "got KeepAlive" );
888    else {
889        msgs->incoming.length = len;
890        msgs->state = AWAITING_BT_ID;
891    }
892
893    return READ_AGAIN;
894}
895
896static int
897readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen );
898
899static int
900readBtId( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
901{
902    uint8_t id;
903
904    if( inlen < sizeof(uint8_t) )
905        return READ_MORE;
906
907    tr_peerIoReadUint8( msgs->io, inbuf, &id );
908    msgs->incoming.id = id;
909
910    if( id==BT_PIECE )
911    {
912        msgs->state = AWAITING_BT_PIECE;
913        return READ_AGAIN;
914    }
915    else if( msgs->incoming.length != 1 )
916    {
917        msgs->state = AWAITING_BT_MESSAGE;
918        return READ_AGAIN;
919    }
920    else return readBtMessage( msgs, inbuf, inlen-1 );
921}
922
923static void
924updatePeerProgress( tr_peermsgs * msgs )
925{
926    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have ) / (float)msgs->torrent->info.pieceCount;
927    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
928    updateInterest( msgs );
929    firePeerProgress( msgs );
930}
931
932static int
933clientCanSendFastBlock( const tr_peermsgs * msgs UNUSED )
934{
935    /* We can't send a fast piece if a peer has more than MAX_FAST_ALLOWED_THRESHOLD pieces */
936    if( tr_bitfieldCountTrueBits( msgs->info->have ) > MAX_FAST_ALLOWED_THRESHOLD )
937        return FALSE;
938   
939    /* ...or if we don't have ourself enough pieces */
940    if( tr_bitfieldCountTrueBits( tr_cpPieceBitfield( msgs->torrent->completion ) ) < MAX_FAST_ALLOWED_THRESHOLD )
941        return FALSE;
942
943    /* Maybe a bandwidth limit ? */
944    return TRUE;
945}
946
947static void
948peerMadeRequest( tr_peermsgs * msgs, const struct peer_request * req )
949{
950    const int reqIsValid = requestIsValid( msgs, req );
951    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( msgs->torrent->completion, req->index );
952    const int peerIsChoked = msgs->info->peerIsChoked;
953    const int peerIsFast = tr_peerIoSupportsFEXT( msgs->io );
954    const int pieceIsFast = reqIsValid && tr_bitfieldHas( msgs->peerAllowedPieces, req->index );
955    const int canSendFast = clientCanSendFastBlock( msgs );
956
957    if( !reqIsValid ) /* bad request */
958    {
959        dbgmsg( msgs, "rejecting an invalid request." );
960        sendFastReject( msgs, req->index, req->offset, req->length );
961    }
962    else if( !clientHasPiece ) /* we don't have it */
963    {
964        dbgmsg( msgs, "rejecting request for a piece we don't have." );
965        sendFastReject( msgs, req->index, req->offset, req->length );
966    }
967    else if( peerIsChoked && !peerIsFast ) /* doesn't he know he's choked? */
968    {
969        tr_peerMsgsSetChoke( msgs, 1 );
970        sendFastReject( msgs, req->index, req->offset, req->length );
971    }
972    else if( peerIsChoked && peerIsFast && ( !pieceIsFast || !canSendFast ) )
973    {
974        sendFastReject( msgs, req->index, req->offset, req->length );
975    }
976    else /* YAY */
977    {
978        struct peer_request * tmp = tr_new( struct peer_request, 1 );
979        *tmp = *req;
980        if( peerIsFast && pieceIsFast )
981            tr_list_append( &msgs->peerAskedForFast, tmp );
982        else
983            tr_list_append( &msgs->peerAskedFor, tmp );
984    }
985}
986
987static int
988messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
989{
990    switch( id )
991    {
992        case BT_CHOKE:
993        case BT_UNCHOKE:
994        case BT_INTERESTED:
995        case BT_NOT_INTERESTED:
996        case BT_HAVE_ALL:
997        case BT_HAVE_NONE:
998            return len==1;
999
1000        case BT_HAVE:
1001        case BT_SUGGEST:
1002        case BT_ALLOWED_FAST:
1003            return len==5;
1004
1005        case BT_BITFIELD:
1006            return len == (msg->torrent->info.pieceCount+7u)/8u + 1u;
1007       
1008        case BT_REQUEST:
1009        case BT_CANCEL:
1010        case BT_REJECT:
1011            return len==13;
1012
1013        case BT_PIECE:
1014            return len>9 && len<=16393;
1015
1016        case BT_PORT:
1017            return len==3;
1018
1019        case BT_LTEP:
1020            return len >= 2;
1021
1022        default:
1023            return FALSE;
1024    }
1025}
1026
1027static int
1028clientGotBlock( tr_peermsgs * msgs, const uint8_t * block, const struct peer_request * req );
1029
1030static void
1031clientGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1032{
1033    tr_torrent * tor = msgs->torrent;
1034    tor->activityDate = tr_date( );
1035    tor->downloadedCur += byteCount;
1036    msgs->info->pieceDataActivityDate = time( NULL );
1037    msgs->info->credit += (int)(byteCount * SWIFT_REPAYMENT_RATIO);
1038    tr_rcTransferred( msgs->info->rcToClient, byteCount );
1039    tr_rcTransferred( tor->download, byteCount );
1040    tr_rcTransferred( tor->handle->download, byteCount );
1041}
1042
1043
1044static int
1045readBtPiece( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1046{
1047    struct peer_request * req = &msgs->incoming.blockReq;
1048    dbgmsg( msgs, "In readBtPiece" );
1049
1050    if( !req->length )
1051    {
1052        if( inlen < 8 )
1053            return READ_MORE;
1054
1055        tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
1056        tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
1057        req->length = msgs->incoming.length - 9;
1058        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1059        return READ_AGAIN;
1060    }
1061    else
1062    {
1063        int err;
1064
1065        /* read in another chunk of data */
1066        const size_t nLeft = req->length - EVBUFFER_LENGTH(msgs->incoming.block);
1067        size_t n = MIN( nLeft, inlen );
1068        uint8_t * buf = tr_new( uint8_t, n );
1069        tr_peerIoReadBytes( msgs->io, inbuf, buf, n );
1070        evbuffer_add( msgs->incoming.block, buf, n );
1071        clientGotBytes( msgs, n );
1072        tr_free( buf );
1073        dbgmsg( msgs, "got %d bytes for block %u:%u->%u ... %d remain",
1074               (int)n, req->index, req->offset, req->length,
1075               (int)( req->length - EVBUFFER_LENGTH(msgs->incoming.block) ) );
1076        if( EVBUFFER_LENGTH(msgs->incoming.block) < req->length )
1077            return READ_MORE;
1078
1079        /* we've got the whole block ... process it */
1080        err = clientGotBlock( msgs, EVBUFFER_DATA(msgs->incoming.block), req );
1081
1082        /* cleanup */
1083        evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH(msgs->incoming.block) );
1084        req->length = 0;
1085        msgs->state = AWAITING_BT_LENGTH;
1086        if( !err )
1087            return READ_AGAIN;
1088        else {
1089            fireGotAssertError( msgs );
1090            return READ_DONE;
1091        }
1092    }
1093}
1094
1095static int
1096readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1097{
1098    uint32_t ui32;
1099    uint32_t msglen = msgs->incoming.length;
1100    const uint8_t id = msgs->incoming.id;
1101    const size_t startBufLen = EVBUFFER_LENGTH( inbuf );
1102
1103    --msglen; // id length
1104
1105    if( inlen < msglen )
1106        return READ_MORE;
1107
1108    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %d", (int)id, (int)msglen, (int)inlen );
1109
1110    if( !messageLengthIsCorrect( msgs, id, msglen+1 ) )
1111    {
1112        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1113        fireGotError( msgs );
1114        return READ_DONE;
1115    }
1116
1117    switch( id )
1118    {
1119        case BT_CHOKE:
1120            dbgmsg( msgs, "got Choke" );
1121            msgs->info->clientIsChoked = 1;
1122            cancelAllRequestsToPeer( msgs );
1123            cancelAllRequestsToClientExceptFast( msgs );
1124            break;
1125
1126        case BT_UNCHOKE:
1127            dbgmsg( msgs, "got Unchoke" );
1128            msgs->info->clientIsChoked = 0;
1129            fireNeedReq( msgs );
1130            break;
1131
1132        case BT_INTERESTED:
1133            dbgmsg( msgs, "got Interested" );
1134            msgs->info->peerIsInterested = 1;
1135            tr_peerMsgsSetChoke( msgs, 0 );
1136            break;
1137
1138        case BT_NOT_INTERESTED:
1139            dbgmsg( msgs, "got Not Interested" );
1140            msgs->info->peerIsInterested = 0;
1141            break;
1142           
1143        case BT_HAVE:
1144            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1145            dbgmsg( msgs, "got Have: %u", ui32 );
1146            tr_bitfieldAdd( msgs->info->have, ui32 );
1147            /* If this is a fast-allowed piece for this peer, mark it as normal now */
1148            if( msgs->clientAllowedPieces != NULL && tr_bitfieldHas( msgs->clientAllowedPieces, ui32 ) )
1149                tr_bitfieldRem( msgs->clientAllowedPieces, ui32 );
1150            updatePeerProgress( msgs );
1151            tr_rcTransferred( msgs->torrent->swarmspeed, msgs->torrent->info.pieceSize );
1152            break;
1153
1154        case BT_BITFIELD: {
1155            const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
1156            dbgmsg( msgs, "got a bitfield" );
1157            msgs->peerSentBitfield = 1;
1158            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
1159            updatePeerProgress( msgs );
1160            tr_peerMsgsSetChoke( msgs, !clientIsSeed || (msgs->info->progress<1.0) );
1161            fireNeedReq( msgs );
1162            break;
1163        }
1164
1165        case BT_REQUEST: {
1166            struct peer_request req;
1167            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1168            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1169            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1170            dbgmsg( msgs, "got Request: %u:%u->%u", req.index, req.offset, req.length );
1171            peerMadeRequest( msgs, &req );
1172            break;
1173        }
1174
1175        case BT_CANCEL: {
1176            struct peer_request req;
1177            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1178            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1179            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1180            dbgmsg( msgs, "got a Cancel %u:%u->%u", req.index, req.offset, req.length );
1181            tr_free( tr_list_remove( &msgs->peerAskedForFast, &req, compareRequest ) );
1182            tr_free( tr_list_remove( &msgs->peerAskedFor, &req, compareRequest ) );
1183            break;
1184        }
1185
1186        case BT_PIECE:
1187            assert( 0 ); /* should be handled elsewhere! */
1188            break;
1189       
1190        case BT_PORT:
1191            dbgmsg( msgs, "Got a BT_PORT" );
1192            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1193            break;
1194           
1195        case BT_SUGGEST: {
1196            dbgmsg( msgs, "Got a BT_SUGGEST" );
1197            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1198            const tr_bitfield * bt = tr_cpPieceBitfield( msgs->torrent->completion );
1199            if( tr_bitfieldHas( bt, ui32 ) )
1200                tr_bitfieldAdd( msgs->clientSuggestedPieces, ui32 );
1201            break;
1202        }
1203           
1204        case BT_HAVE_ALL:
1205            dbgmsg( msgs, "Got a BT_HAVE_ALL" );
1206            tr_bitfieldAddRange( msgs->info->have, 0, msgs->torrent->info.pieceCount );
1207            updatePeerProgress( msgs );
1208            break;
1209       
1210       
1211        case BT_HAVE_NONE:
1212            dbgmsg( msgs, "Got a BT_HAVE_NONE" );
1213            tr_bitfieldClear( msgs->info->have );
1214            updatePeerProgress( msgs );
1215            break;
1216       
1217        case BT_REJECT: {
1218            struct peer_request req;
1219            dbgmsg( msgs, "Got a BT_REJECT" );
1220            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1221            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1222            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1223            tr_free( tr_list_remove( &msgs->clientAskedFor, &req, compareRequest ) );
1224            break;
1225        }
1226
1227        case BT_ALLOWED_FAST: {
1228            dbgmsg( msgs, "Got a BT_ALLOWED_FAST" );
1229            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1230            tr_bitfieldAdd( msgs->clientAllowedPieces, ui32 );
1231            break;
1232        }
1233
1234        case BT_LTEP:
1235            dbgmsg( msgs, "Got a BT_LTEP" );
1236            parseLtep( msgs, msglen, inbuf );
1237            break;
1238
1239        default:
1240            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1241            tr_peerIoDrain( msgs->io, inbuf, msglen );
1242            break;
1243    }
1244
1245    assert( msglen + 1 == msgs->incoming.length );
1246    assert( EVBUFFER_LENGTH(inbuf) == startBufLen - msglen );
1247
1248    msgs->state = AWAITING_BT_LENGTH;
1249    return READ_AGAIN;
1250}
1251
1252static void
1253peerGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1254{
1255    tr_torrent * tor = msgs->torrent;
1256    tor->activityDate = tr_date( );
1257    tor->uploadedCur += byteCount;
1258    msgs->info->pieceDataActivityDate = time( NULL );
1259    msgs->info->credit -= byteCount;
1260    tr_rcTransferred( msgs->info->rcToPeer, byteCount );
1261    tr_rcTransferred( tor->upload, byteCount );
1262    tr_rcTransferred( tor->handle->upload, byteCount );
1263}
1264
1265static size_t
1266getDownloadMax( const tr_peermsgs * msgs )
1267{
1268    static const size_t maxval = ~0;
1269    const tr_torrent * tor = msgs->torrent;
1270
1271    if( tor->downloadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1272        return tor->handle->useDownloadLimit
1273            ? tr_rcBytesLeft( tor->handle->download ) : maxval;
1274
1275    if( tor->downloadLimitMode == TR_SPEEDLIMIT_SINGLE )
1276        return tr_rcBytesLeft( tor->download );
1277
1278    return maxval;
1279}
1280
1281static void
1282reassignBytesToCorrupt( tr_peermsgs * msgs, uint32_t byteCount )
1283{
1284    tr_torrent * tor = msgs->torrent;
1285
1286    /* increment the `corrupt' field */
1287    tor->corruptCur += byteCount;
1288
1289    /* decrement the `downloaded' field */
1290    if( tor->downloadedCur >= byteCount )
1291        tor->downloadedCur -= byteCount;
1292    else
1293        tor->downloadedCur = 0;
1294}
1295
1296
1297static void
1298gotBadPiece( tr_peermsgs * msgs, uint32_t pieceIndex )
1299{
1300    const uint32_t byteCount =
1301        tr_torPieceCountBytes( msgs->torrent, (int)pieceIndex );
1302    reassignBytesToCorrupt( msgs, byteCount );
1303}
1304
1305static void
1306clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1307{
1308    reassignBytesToCorrupt( msgs, req->length );
1309}
1310
1311static void
1312addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1313{
1314    if( !msgs->info->blame )
1315         msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1316    tr_bitfieldAdd( msgs->info->blame, index );
1317}
1318
1319static int
1320clientGotBlock( tr_peermsgs                * msgs,
1321                const uint8_t              * data,
1322                const struct peer_request  * req )
1323{
1324    int i;
1325    tr_torrent * tor = msgs->torrent;
1326    const int block = _tr_block( tor, req->index, req->offset );
1327    struct peer_request *myreq;
1328
1329    assert( msgs != NULL );
1330    assert( req != NULL );
1331
1332    if( req->length != (uint32_t)tr_torBlockCountBytes( msgs->torrent, block ) )
1333    {
1334        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1335                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1336        return TR_ERROR_ASSERT;
1337    }
1338
1339    /* save the block */
1340    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1341
1342    /**
1343    *** Remove the block from our `we asked for this' list
1344    **/
1345
1346    myreq = tr_list_remove( &msgs->clientAskedFor, req, compareRequest );
1347    if( myreq == NULL ) {
1348        clientGotUnwantedBlock( msgs, req );
1349        dbgmsg( msgs, "we didn't ask for this message..." );
1350        return 0;
1351    }
1352
1353    dbgmsg( msgs, "got block %u:%u->%u (turnaround time %d secs)", 
1354                  myreq->index, myreq->offset, myreq->length,
1355                  (int)(time(NULL) - myreq->time_requested) );
1356    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1357                  tr_list_size(msgs->clientAskedFor));
1358
1359    tr_free( myreq );
1360    myreq = NULL;
1361
1362    /**
1363    *** Error checks
1364    **/
1365
1366    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
1367        dbgmsg( msgs, "we have this block already..." );
1368        clientGotUnwantedBlock( msgs, req );
1369        return 0;
1370    }
1371
1372    /**
1373    ***  Save the block
1374    **/
1375
1376    msgs->info->peerSentPieceDataAt = time( NULL );
1377    i = tr_ioWrite( tor, req->index, req->offset, req->length, data );
1378    if( i )
1379        return 0;
1380
1381    tr_cpBlockAdd( tor->completion, block );
1382
1383    addPeerToBlamefield( msgs, req->index );
1384
1385    fireGotBlock( msgs, req );
1386
1387    /**
1388    ***  Handle if this was the last block in the piece
1389    **/
1390
1391    if( tr_cpPieceIsComplete( tor->completion, req->index ) )
1392    {
1393        if( tr_ioHash( tor, req->index ) )
1394        {
1395            gotBadPiece( msgs, req->index );
1396            return 0;
1397        }
1398
1399        fireClientHave( msgs, req->index );
1400    }
1401
1402    return 0;
1403}
1404
1405static ReadState
1406canRead( struct bufferevent * evin, void * vmsgs )
1407{
1408    ReadState ret;
1409    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1410    struct evbuffer * inbuf = EVBUFFER_INPUT ( evin );
1411    const size_t buflen = EVBUFFER_LENGTH( inbuf );
1412    const size_t n = MIN( buflen, getDownloadMax( msgs ) );
1413
1414    if( !n )
1415    {
1416        ret = READ_DONE;
1417    }
1418    else switch( msgs->state )
1419    {
1420        case AWAITING_BT_LENGTH:  ret = readBtLength ( msgs, inbuf, n ); break;
1421        case AWAITING_BT_ID:      ret = readBtId     ( msgs, inbuf, n ); break;
1422        case AWAITING_BT_MESSAGE: ret = readBtMessage( msgs, inbuf, n ); break;
1423        case AWAITING_BT_PIECE:   ret = readBtPiece  ( msgs, inbuf, n ); break;
1424        default: assert( 0 );
1425    }
1426
1427    return ret;
1428}
1429
1430static void
1431sendKeepalive( tr_peermsgs * msgs )
1432{
1433    dbgmsg( msgs, "sending a keepalive message" );
1434    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1435}
1436
1437/**
1438***
1439**/
1440
1441static int
1442canWrite( const tr_peermsgs * msgs )
1443{
1444    /* don't let our outbuffer get too large */
1445    if( tr_peerIoWriteBytesWaiting( msgs->io ) > 4096 )
1446        return FALSE;
1447
1448    /* SWIFT */
1449    if( SWIFT_ENABLED && !tr_torrentIsSeed( msgs->torrent )
1450                      && ( msgs->info->credit < 0 ) )
1451        return FALSE;
1452
1453    return TRUE;
1454}
1455
1456static size_t
1457getUploadMax( const tr_peermsgs * msgs )
1458{
1459    static const size_t maxval = ~0;
1460    const tr_torrent * tor = msgs->torrent;
1461
1462    if( !canWrite( msgs ) )
1463        return 0;
1464
1465    if( tor->uploadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1466        return tor->handle->useUploadLimit
1467            ? tr_rcBytesLeft( tor->handle->upload ) : maxval;
1468
1469    if( tor->uploadLimitMode == TR_SPEEDLIMIT_SINGLE )
1470        return tr_rcBytesLeft( tor->upload );
1471
1472    return maxval;
1473}
1474
1475static int
1476ratePulse( void * vmsgs )
1477{
1478    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1479    msgs->info->rateToClient = tr_rcRate( msgs->info->rcToClient );
1480    msgs->info->rateToPeer = tr_rcRate( msgs->info->rcToPeer );
1481    msgs->maxActiveRequests = MIN( 8 + (int)(msgs->info->rateToClient/5), 100 );
1482    msgs->minActiveRequests = msgs->maxActiveRequests / 3;
1483    return TRUE;
1484}
1485
1486static struct peer_request*
1487popNextRequest( tr_peermsgs * msgs )
1488{
1489    struct peer_request * ret;
1490    ret = tr_list_pop_front( &msgs->peerAskedForFast );
1491    if( !ret )
1492        ret = tr_list_pop_front( &msgs->peerAskedFor);
1493    return ret;
1494}
1495
1496static void
1497updatePeerStatus( tr_peermsgs * msgs )
1498{
1499    tr_peer * peer = msgs->info;
1500
1501    if( !msgs->peerSentBitfield )
1502        peer->status = TR_PEER_STATUS_HANDSHAKE;
1503
1504    else if( ( time(NULL) - peer->pieceDataActivityDate ) < 3 )
1505        peer->status = peer->clientIsChoked
1506                       ? TR_PEER_STATUS_ACTIVE_AND_CHOKED
1507                       : TR_PEER_STATUS_ACTIVE;
1508
1509    else if( peer->peerIsChoked )
1510        peer->status = TR_PEER_STATUS_PEER_IS_CHOKED;
1511
1512    else if( peer->clientIsChoked )
1513        peer->status = peer->clientIsInterested
1514                       ? TR_PEER_STATUS_CLIENT_IS_INTERESTED
1515                       : TR_PEER_STATUS_CLIENT_IS_CHOKED;
1516
1517    else if( msgs->clientAskedFor != NULL )
1518        peer->status = TR_PEER_STATUS_REQUEST_SENT;
1519
1520    else
1521        peer->status = TR_PEER_STATUS_READY;
1522}
1523
1524static int
1525pulse( void * vmsgs )
1526{
1527    const time_t now = time( NULL );
1528    tr_peermsgs * msgs = vmsgs;
1529    struct peer_request * r;
1530    size_t len;
1531
1532    tr_peerIoTryRead( msgs->io );
1533    pumpRequestQueue( msgs );
1534    updatePeerStatus( msgs );
1535
1536    if( !canWrite( msgs ) )
1537    {
1538    }
1539    else if(( len = EVBUFFER_LENGTH( msgs->outBlock ) ))
1540    {
1541        const size_t uploadMax = getUploadMax( msgs );
1542        const size_t outlen = MIN( len, uploadMax );
1543        tr_peerIoWrite( msgs->io, EVBUFFER_DATA( msgs->outBlock ), outlen );
1544        evbuffer_drain( msgs->outBlock, outlen );
1545        msgs->clientSentAnythingAt = now;
1546        peerGotBytes( msgs, outlen );
1547        len -= outlen;
1548        dbgmsg( msgs, "wrote %d bytes; %d left in block", (int)outlen, (int)len );
1549        fflush( stdout );
1550    }
1551    else if(( len = EVBUFFER_LENGTH( msgs->outMessages ) ))
1552    {
1553        tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
1554        msgs->clientSentAnythingAt = now;
1555    }
1556    else if( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS )
1557    {
1558        sendKeepalive( msgs );
1559    }
1560
1561    if( !EVBUFFER_LENGTH( msgs->outBlock )
1562        && (( r = popNextRequest( msgs )))
1563        && requestIsValid( msgs, r )
1564        && tr_cpPieceIsComplete( msgs->torrent->completion, r->index ) )
1565    {
1566        uint8_t * buf = tr_new( uint8_t, r->length );
1567
1568        if( !tr_ioRead( msgs->torrent, r->index, r->offset, r->length, buf ) )
1569        {
1570            tr_peerIo * io = msgs->io;
1571            struct evbuffer * out = msgs->outBlock;
1572
1573            dbgmsg( msgs, "sending block %u:%u->%u", r->index, r->offset, r->length );
1574            tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 2*sizeof(uint32_t) + r->length );
1575            tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1576            tr_peerIoWriteUint32( io, out, r->index );
1577            tr_peerIoWriteUint32( io, out, r->offset );
1578            tr_peerIoWriteBytes ( io, out, buf, r->length );
1579        }
1580
1581        tr_free( buf );
1582        tr_free( r );
1583
1584        pulse( msgs ); /* start sending it right away */
1585    }
1586
1587    return TRUE; /* loop forever */
1588}
1589
1590static void
1591gotError( struct bufferevent * evbuf UNUSED, short what, void * vmsgs )
1592{
1593    if( what & EVBUFFER_TIMEOUT )
1594        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1595
1596    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) ) {
1597        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1598                what, errno, strerror(errno) );
1599        fireGotError( vmsgs );
1600    }
1601}
1602
1603static void
1604sendBitfield( tr_peermsgs * msgs )
1605{
1606    const tr_bitfield * bitfield = tr_cpPieceBitfield( msgs->torrent->completion );
1607    struct evbuffer * out = msgs->outMessages;
1608
1609    dbgmsg( msgs, "sending peer a bitfield message" );
1610    tr_peerIoWriteUint32( msgs->io, out, sizeof(uint8_t) + bitfield->len );
1611    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1612    tr_peerIoWriteBytes ( msgs->io, out, bitfield->bits, bitfield->len );
1613}
1614
1615/**
1616***
1617**/
1618
1619/* some peers give us error messages if we send
1620   more than this many peers in a single pex message */
1621#define MAX_PEX_DIFFS 200
1622
1623typedef struct
1624{
1625    tr_pex * added;
1626    tr_pex * dropped;
1627    tr_pex * elements;
1628    int addedCount;
1629    int droppedCount;
1630    int elementCount;
1631    int diffCount;
1632}
1633PexDiffs;
1634
1635static void
1636pexAddedCb( void * vpex, void * userData )
1637{
1638    PexDiffs * diffs = (PexDiffs *) userData;
1639    tr_pex * pex = (tr_pex *) vpex;
1640    if( diffs->diffCount < MAX_PEX_DIFFS )
1641    {
1642        diffs->diffCount++;
1643        diffs->added[diffs->addedCount++] = *pex;
1644        diffs->elements[diffs->elementCount++] = *pex;
1645    }
1646}
1647
1648static void
1649pexRemovedCb( void * vpex, void * userData )
1650{
1651    PexDiffs * diffs = (PexDiffs *) userData;
1652    tr_pex * pex = (tr_pex *) vpex;
1653    if( diffs->diffCount < MAX_PEX_DIFFS )
1654    {
1655        diffs->diffCount++;
1656        diffs->dropped[diffs->droppedCount++] = *pex;
1657    }
1658}
1659
1660static void
1661pexElementCb( void * vpex, void * userData )
1662{
1663    PexDiffs * diffs = (PexDiffs *) userData;
1664    tr_pex * pex = (tr_pex *) vpex;
1665    if( diffs->diffCount < MAX_PEX_DIFFS )
1666    {
1667        diffs->diffCount++;
1668        diffs->elements[diffs->elementCount++] = *pex;
1669    }
1670}
1671
1672static void
1673sendPex( tr_peermsgs * msgs )
1674{
1675    if( msgs->peerSupportsPex && tr_torrentIsPexEnabled( msgs->torrent ) )
1676    {
1677        int i;
1678        tr_pex * newPex = NULL;
1679        const int newCount = tr_peerMgrGetPeers( msgs->handle->peerMgr, msgs->torrent->info.hash, &newPex );
1680        PexDiffs diffs;
1681        benc_val_t val, *added, *dropped, *flags;
1682        uint8_t *tmp, *walk;
1683        char * benc;
1684        int bencLen;
1685
1686        /* build the diffs */
1687        diffs.added = tr_new( tr_pex, newCount );
1688        diffs.addedCount = 0;
1689        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1690        diffs.droppedCount = 0;
1691        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1692        diffs.elementCount = 0;
1693        diffs.diffCount = 0;
1694        tr_set_compare( msgs->pex, msgs->pexCount,
1695                        newPex, newCount,
1696                        tr_pexCompare, sizeof(tr_pex),
1697                        pexRemovedCb, pexAddedCb, pexElementCb, &diffs );
1698        dbgmsg( msgs, "pex: old peer count %d, new peer count %d, added %d, removed %d", msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1699
1700        /* update peer */
1701        tr_free( msgs->pex );
1702        msgs->pex = diffs.elements;
1703        msgs->pexCount = diffs.elementCount;
1704
1705        /* build the pex payload */
1706        tr_bencInit( &val, TYPE_DICT );
1707        tr_bencDictReserve( &val, 3 );
1708
1709        /* "added" */
1710        added = tr_bencDictAdd( &val, "added" );
1711        tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1712        for( i=0; i<diffs.addedCount; ++i ) {
1713            memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
1714            memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1715        }
1716        assert( ( walk - tmp ) == diffs.addedCount * 6 );
1717        tr_bencInitStr( added, tmp, walk-tmp, FALSE );
1718
1719        /* "added.f" */
1720        flags = tr_bencDictAdd( &val, "added.f" );
1721        tmp = walk = tr_new( uint8_t, diffs.addedCount );
1722        for( i=0; i<diffs.addedCount; ++i )
1723            *walk++ = diffs.added[i].flags;
1724        assert( ( walk - tmp ) == diffs.addedCount );
1725        tr_bencInitStr( flags, tmp, walk-tmp, FALSE );
1726
1727        /* "dropped" */
1728        dropped = tr_bencDictAdd( &val, "dropped" );
1729        tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
1730        for( i=0; i<diffs.droppedCount; ++i ) {
1731            memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
1732            memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
1733        }
1734        assert( ( walk - tmp ) == diffs.droppedCount * 6 );
1735        tr_bencInitStr( dropped, tmp, walk-tmp, FALSE );
1736
1737        /* write the pex message */
1738        benc = tr_bencSave( &val, &bencLen );
1739        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 2*sizeof(uint8_t) + bencLen );
1740        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_LTEP );
1741        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, OUR_LTEP_PEX );
1742        tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, benc, bencLen );
1743
1744        /* cleanup */
1745        tr_free( benc );
1746        tr_bencFree( &val );
1747        tr_free( diffs.added );
1748        tr_free( diffs.dropped );
1749        tr_free( newPex );
1750
1751        msgs->clientSentPexAt = time( NULL );
1752    }
1753}
1754
1755static int
1756pexPulse( void * vpeer )
1757{
1758    sendPex( vpeer );
1759    return TRUE;
1760}
1761
1762/**
1763***
1764**/
1765
1766tr_peermsgs*
1767tr_peerMsgsNew( struct tr_torrent * torrent,
1768                struct tr_peer    * info,
1769                tr_delivery_func    func,
1770                void              * userData,
1771                tr_publisher_tag  * setme )
1772{
1773    tr_peermsgs * m;
1774
1775    assert( info != NULL );
1776    assert( info->io != NULL );
1777
1778    m = tr_new0( tr_peermsgs, 1 );
1779    m->publisher = tr_publisherNew( );
1780    m->info = info;
1781    m->handle = torrent->handle;
1782    m->torrent = torrent;
1783    m->io = info->io;
1784    m->info->status = TR_PEER_STATUS_HANDSHAKE;
1785    m->info->clientIsChoked = 1;
1786    m->info->peerIsChoked = 1;
1787    m->info->clientIsInterested = 0;
1788    m->info->peerIsInterested = 0;
1789    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
1790    m->state = AWAITING_BT_LENGTH;
1791    m->pulseTimer = tr_timerNew( m->handle, pulse, m, PEER_PULSE_INTERVAL );
1792    m->rateTimer = tr_timerNew( m->handle, ratePulse, m, RATE_PULSE_INTERVAL );
1793    m->pexTimer = tr_timerNew( m->handle, pexPulse, m, PEX_INTERVAL );
1794    m->outMessages = evbuffer_new( );
1795    m->incoming.block = evbuffer_new( );
1796    m->outBlock = evbuffer_new( );
1797    m->peerAllowedPieces = NULL;
1798    m->clientAllowedPieces = NULL;
1799    m->clientSuggestedPieces = NULL;
1800    *setme = tr_publisherSubscribe( m->publisher, func, userData );
1801   
1802    if ( tr_peerIoSupportsFEXT( m->io ) )
1803    {
1804        /* This peer is fastpeer-enabled, generate its allowed set
1805         * (before registering our callbacks) */
1806        if ( !m->peerAllowedPieces ) {
1807            const struct in_addr *peerAddr = tr_peerIoGetAddress( m->io, NULL );
1808           
1809            m->peerAllowedPieces = tr_peerMgrGenerateAllowedSet( MAX_FAST_ALLOWED_COUNT,
1810                                                                 m->torrent->info.pieceCount,
1811                                                                 m->torrent->info.hash,
1812                                                                 peerAddr );
1813        }
1814        m->clientAllowedPieces = tr_bitfieldNew( m->torrent->info.pieceCount );
1815       
1816        m->clientSuggestedPieces = tr_bitfieldNew( m->torrent->info.pieceCount );
1817    }
1818   
1819    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of inactivity */
1820    tr_peerIoSetIOFuncs( m->io, canRead, gotError, m );
1821    ratePulse( m );
1822
1823    if ( tr_peerIoSupportsLTEP( m->io ) )
1824        sendLtepHandshake( m );
1825
1826    if ( !tr_peerIoSupportsFEXT( m->io ) )
1827        sendBitfield( m );
1828    else {
1829        /* This peer is fastpeer-enabled, send it have-all or have-none if appropriate */
1830        uint32_t peerProgress;
1831        float completion = tr_cpPercentComplete( m->torrent->completion );
1832        if ( completion == 0.0f ) {
1833            sendFastHave( m, 0 );
1834        } else if ( completion == 1.0f ) {
1835            sendFastHave( m, 1 );
1836        } else {
1837            sendBitfield( m );
1838        }
1839        peerProgress = m->torrent->info.pieceCount * m->info->progress;
1840       
1841        if ( peerProgress < MAX_FAST_ALLOWED_COUNT )
1842            sendFastAllowedSet( m );
1843    }
1844
1845    return m;
1846}
1847
1848void
1849tr_peerMsgsFree( tr_peermsgs* msgs )
1850{
1851    if( msgs != NULL )
1852    {
1853        tr_timerFree( &msgs->pulseTimer );
1854        tr_timerFree( &msgs->rateTimer );
1855        tr_timerFree( &msgs->pexTimer );
1856        tr_publisherFree( &msgs->publisher );
1857        tr_list_free( &msgs->clientWillAskFor, tr_free );
1858        tr_list_free( &msgs->clientAskedFor, tr_free );
1859        tr_list_free( &msgs->peerAskedForFast, tr_free );
1860        tr_list_free( &msgs->peerAskedFor, tr_free );
1861        tr_bitfieldFree( msgs->peerAllowedPieces );
1862        tr_bitfieldFree( msgs->clientAllowedPieces );
1863        tr_bitfieldFree( msgs->clientSuggestedPieces );
1864        evbuffer_free( msgs->incoming.block );
1865        evbuffer_free( msgs->outMessages );
1866        evbuffer_free( msgs->outBlock );
1867        tr_free( msgs->pex );
1868
1869        memset( msgs, ~0, sizeof( tr_peermsgs ) );
1870        tr_free( msgs );
1871    }
1872}
1873
1874tr_publisher_tag
1875tr_peerMsgsSubscribe( tr_peermsgs       * peer,
1876                      tr_delivery_func    func,
1877                      void              * userData )
1878{
1879    return tr_publisherSubscribe( peer->publisher, func, userData );
1880}
1881
1882void
1883tr_peerMsgsUnsubscribe( tr_peermsgs       * peer,
1884                        tr_publisher_tag    tag )
1885{
1886    tr_publisherUnsubscribe( peer->publisher, tag );
1887}
1888
1889int
1890tr_peerMsgsIsPieceFastAllowed( const tr_peermsgs * peer,
1891                               uint32_t            index )
1892{
1893    return tr_bitfieldHas( peer->clientAllowedPieces, index );
1894}
1895
1896int
1897tr_peerMsgsIsPieceSuggested( const tr_peermsgs * peer,
1898                             uint32_t            index )
1899{
1900    return tr_bitfieldHas( peer->clientSuggestedPieces, index );
1901}
1902
Note: See TracBrowser for help on using the repository browser.