source: trunk/libtransmission/peer-msgs.c @ 4177

Last change on this file since 4177 was 4177, checked in by charles, 15 years ago

raise the maximum connection limit a bit.

  • Property svn:keywords set to Date Rev Author Id
File size: 56.1 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 4177 2007-12-15 16:36:43Z charles $
11 */
12
13#include <assert.h>
14#include <ctype.h>
15#include <errno.h>
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19#include <libgen.h> /* basename */
20
21#include <netinet/in.h> /* struct in_addr */
22
23#include <event.h>
24
25#include "transmission.h"
26#include "bencode.h"
27#include "completion.h"
28#include "inout.h"
29#include "list.h"
30#include "peer-io.h"
31#include "peer-mgr.h"
32#include "peer-mgr-private.h"
33#include "peer-msgs.h"
34#include "ratecontrol.h"
35#include "stats.h"
36#include "trevent.h"
37#include "utils.h"
38
39/**
40***
41**/
42
43enum
44{
45    BT_CHOKE                = 0,
46    BT_UNCHOKE              = 1,
47    BT_INTERESTED           = 2,
48    BT_NOT_INTERESTED       = 3,
49    BT_HAVE                 = 4,
50    BT_BITFIELD             = 5,
51    BT_REQUEST              = 6,
52    BT_PIECE                = 7,
53    BT_CANCEL               = 8,
54    BT_PORT                 = 9,
55    BT_SUGGEST              = 13,
56    BT_HAVE_ALL             = 14,
57    BT_HAVE_NONE            = 15,
58    BT_REJECT               = 16,
59    BT_ALLOWED_FAST         = 17,
60    BT_LTEP                 = 20,
61
62    LTEP_HANDSHAKE          = 0,
63
64    TR_LTEP_PEX             = 1,
65
66    MAX_REQUEST_BYTE_COUNT  = (16 * 1024), /* drop requests who want too much */
67
68    /* idle seconds before we send a keepalive */
69    KEEPALIVE_INTERVAL_SECS = 90,
70
71    PEX_INTERVAL            = (60 * 1000), /* msec between calls to sendPex() */
72    PEER_PULSE_INTERVAL     = (100),       /* msec between calls to pulse() */
73    RATE_PULSE_INTERVAL     = (250),       /* msec between calls to ratePulse() */
74
75    /* 13 is the size of the BitTorrent overhead of a BT_PIECE message...
76     * since all (or nearly all) piece requests are 16 K, this typically
77     * means we'd fill a 4096 K outbuf 5 times (4096, 4096, 4096, 4096, 16)
78     * but by adding the 13 here we can shorten it to 4 refills... not sure
79     * if this actually makes any difference since the libevent layer smooths
80     * things out anyway, but it doesn't hurt either... */
81    MAX_OUTBUF_SIZE         = (4096 + 13),
82     
83    /* Fast Peers Extension constants */
84    MAX_FAST_ALLOWED_COUNT   = 10,          /* max. number of pieces we fast-allow to another peer */
85    MAX_FAST_ALLOWED_THRESHOLD = 10,        /* max threshold for allowing fast-pieces requests */
86
87};
88
89enum
90{
91    AWAITING_BT_LENGTH,
92    AWAITING_BT_ID,
93    AWAITING_BT_MESSAGE,
94    AWAITING_BT_PIECE
95};
96
97struct peer_request
98{
99    uint32_t index;
100    uint32_t offset;
101    uint32_t length;
102    time_t time_requested;
103};
104
105static int
106compareRequest( const void * va, const void * vb )
107{
108    int i;
109    const struct peer_request * a = va;
110    const struct peer_request * b = vb;
111    if(( i = tr_compareUint32( a->index, b->index ))) return i;
112    if(( i = tr_compareUint32( a->offset, b->offset ))) return i;
113    if(( i = tr_compareUint32( a->length, b->length ))) return i;
114    return 0;
115}
116
117/* this is raw, unchanged data from the peer regarding
118 * the current message that it's sending us. */
119struct tr_incoming
120{
121    uint32_t length; /* includes the +1 for id length */
122    uint8_t id;
123    struct peer_request blockReq; /* metadata for incoming blocks */
124    struct evbuffer * block; /* piece data for incoming blocks */
125};
126
127struct tr_peermsgs
128{
129    tr_peer * info;
130
131    tr_handle * handle;
132    tr_torrent * torrent;
133    tr_peerIo * io;
134
135    tr_publisher_t * publisher;
136
137    struct evbuffer * outBlock;    /* buffer of all the current piece message */
138    struct evbuffer * outMessages; /* buffer of all the non-piece messages */
139    tr_list * peerAskedFor;
140    tr_list * peerAskedForFast;
141    tr_list * clientAskedFor;
142    tr_list * clientWillAskFor;
143
144    tr_timer * rateTimer;
145    tr_timer * pulseTimer;
146    tr_timer * pexTimer;
147
148    time_t lastReqAddedAt;
149    time_t clientSentPexAt;
150    time_t clientSentAnythingAt;
151
152    time_t clientSentPieceDataAt;
153    time_t peerSentPieceDataAt;
154
155    unsigned int peerSentBitfield         : 1;
156    unsigned int peerSupportsPex          : 1;
157    unsigned int clientSentLtepHandshake  : 1;
158    unsigned int peerSentLtepHandshake    : 1;
159    unsigned int sendingBlock             : 1;
160   
161    tr_bitfield * clientAllowedPieces;
162    tr_bitfield * peerAllowedPieces;
163   
164    tr_bitfield * clientSuggestedPieces;
165   
166    uint8_t state;
167    uint8_t ut_pex_id;
168    uint16_t pexCount;
169    uint32_t maxActiveRequests;
170    uint32_t minActiveRequests;
171
172    struct tr_incoming incoming; 
173
174    tr_pex * pex;
175};
176
177/**
178***
179**/
180
181static void
182myDebug( const char * file, int line,
183         const struct tr_peermsgs * msgs,
184         const char * fmt, ... )
185{
186    FILE * fp = tr_getLog( );
187    if( fp != NULL )
188    {
189        va_list args;
190        char timestr[64];
191        struct evbuffer * buf = evbuffer_new( );
192        char * myfile = tr_strdup( file );
193
194        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
195                             tr_getLogTimeStr( timestr, sizeof(timestr) ),
196                             msgs->torrent->info.name,
197                             tr_peerIoGetAddrStr( msgs->io ),
198                             msgs->info->client );
199        va_start( args, fmt );
200        evbuffer_add_vprintf( buf, fmt, args );
201        va_end( args );
202        evbuffer_add_printf( buf, " (%s:%d)\n", basename(myfile), line );
203        fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
204
205        tr_free( myfile );
206        evbuffer_free( buf );
207    }
208}
209
210#define dbgmsg(msgs, fmt...) myDebug(__FILE__, __LINE__, msgs, ##fmt )
211
212/**
213***
214**/
215
216static void
217protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
218{
219    tr_peerIo * io = msgs->io;
220    struct evbuffer * out = msgs->outMessages;
221
222    dbgmsg( msgs, "requesting %u:%u->%u", req->index, req->offset, req->length );
223    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
224    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
225    tr_peerIoWriteUint32( io, out, req->index );
226    tr_peerIoWriteUint32( io, out, req->offset );
227    tr_peerIoWriteUint32( io, out, req->length );
228}
229
230static void
231protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
232{
233    tr_peerIo * io = msgs->io;
234    struct evbuffer * out = msgs->outMessages;
235
236    dbgmsg( msgs, "cancelling %u:%u->%u", req->index, req->offset, req->length );
237    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
238    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
239    tr_peerIoWriteUint32( io, out, req->index );
240    tr_peerIoWriteUint32( io, out, req->offset );
241    tr_peerIoWriteUint32( io, out, req->length );
242}
243
244static void
245protocolSendHave( tr_peermsgs * msgs, uint32_t index )
246{
247    tr_peerIo * io = msgs->io;
248    struct evbuffer * out = msgs->outMessages;
249
250    dbgmsg( msgs, "sending Have %u", index );
251    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
252    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
253    tr_peerIoWriteUint32( io, out, index );
254}
255
256static void
257protocolSendChoke( tr_peermsgs * msgs, int choke )
258{
259    tr_peerIo * io = msgs->io;
260    struct evbuffer * out = msgs->outMessages;
261
262    dbgmsg( msgs, "sending %s", (choke ? "Choke" : "Unchoke") );
263    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) );
264    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
265}
266
267/**
268***  EVENTS
269**/
270
271static const tr_peermsgs_event blankEvent = { 0, 0, 0, 0, 0.0f };
272
273static void
274publish( tr_peermsgs * msgs, tr_peermsgs_event * e )
275{
276    tr_publisherPublish( msgs->publisher, msgs->info, e );
277}
278
279static void
280fireGotAssertError( tr_peermsgs * msgs )
281{
282    tr_peermsgs_event e = blankEvent;
283    e.eventType = TR_PEERMSG_GOT_ASSERT_ERROR;
284    publish( msgs, &e );
285}
286
287static void
288fireGotError( tr_peermsgs * msgs )
289{
290    tr_peermsgs_event e = blankEvent;
291    e.eventType = TR_PEERMSG_GOT_ERROR;
292    publish( msgs, &e );
293}
294
295static void
296fireNeedReq( tr_peermsgs * msgs )
297{
298    tr_peermsgs_event e = blankEvent;
299    e.eventType = TR_PEERMSG_NEED_REQ;
300    publish( msgs, &e );
301}
302
303static void
304firePeerProgress( tr_peermsgs * msgs )
305{
306    tr_peermsgs_event e = blankEvent;
307    e.eventType = TR_PEERMSG_PEER_PROGRESS;
308    e.progress = msgs->info->progress;
309    publish( msgs, &e );
310}
311
312static void
313fireClientHave( tr_peermsgs * msgs, uint32_t pieceIndex )
314{
315    tr_peermsgs_event e = blankEvent;
316    e.eventType = TR_PEERMSG_CLIENT_HAVE;
317    e.pieceIndex = pieceIndex;
318    publish( msgs, &e );
319}
320
321static void
322fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
323{
324    tr_peermsgs_event e = blankEvent;
325    e.eventType = TR_PEERMSG_CLIENT_BLOCK;
326    e.pieceIndex = req->index;
327    e.offset = req->offset;
328    e.length = req->length;
329    publish( msgs, &e );
330}
331
332static void
333fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
334{
335    tr_peermsgs_event e = blankEvent;
336    e.eventType = TR_PEERMSG_CANCEL;
337    e.pieceIndex = req->index;
338    e.offset = req->offset;
339    e.length = req->length;
340    publish( msgs, &e );
341}
342
343/**
344***  INTEREST
345**/
346
347static int
348isPieceInteresting( const tr_peermsgs   * peer,
349                    int                   piece )
350{
351    const tr_torrent * torrent = peer->torrent;
352    if( torrent->info.pieces[piece].dnd ) /* we don't want it */
353        return FALSE;
354    if( tr_cpPieceIsComplete( torrent->completion, piece ) ) /* we have it */
355        return FALSE;
356    if( !tr_bitfieldHas( peer->info->have, piece ) ) /* peer doesn't have it */
357        return FALSE;
358    if( tr_bitfieldHas( peer->info->banned, piece ) ) /* peer is banned */
359        return FALSE;
360    return TRUE;
361}
362
363/* "interested" means we'll ask for piece data if they unchoke us */
364static int
365isPeerInteresting( const tr_peermsgs * msgs )
366{
367    int i;
368    const tr_torrent * torrent;
369    const tr_bitfield * bitfield;
370    const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
371
372    if( clientIsSeed )
373        return FALSE;
374
375    torrent = msgs->torrent;
376    bitfield = tr_cpPieceBitfield( torrent->completion );
377
378    if( !msgs->info->have )
379        return TRUE;
380
381    assert( bitfield->len == msgs->info->have->len );
382    for( i=0; i<torrent->info.pieceCount; ++i )
383        if( isPieceInteresting( msgs, i ) )
384            return TRUE;
385
386    return FALSE;
387}
388
389static void
390sendInterest( tr_peermsgs * msgs, int weAreInterested )
391{
392    assert( msgs != NULL );
393    assert( weAreInterested==0 || weAreInterested==1 );
394
395    msgs->info->clientIsInterested = weAreInterested;
396    dbgmsg( msgs, "Sending %s",
397            weAreInterested ? "Interested" : "Not Interested");
398
399    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
400    tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages,
401                   weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
402}
403
404static void
405updateInterest( tr_peermsgs * msgs )
406{
407    const int i = isPeerInteresting( msgs );
408    if( i != msgs->info->clientIsInterested )
409        sendInterest( msgs, i );
410    if( i )
411        fireNeedReq( msgs );
412}
413
414#define MIN_CHOKE_PERIOD_SEC 10
415
416static void
417cancelAllRequestsToClientExceptFast( tr_peermsgs * msgs )
418{
419    tr_list_free( &msgs->peerAskedFor, tr_free );
420}
421
422void
423tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
424{
425    const time_t fibrillationTime = time(NULL) - MIN_CHOKE_PERIOD_SEC;
426
427    assert( msgs != NULL );
428    assert( msgs->info != NULL );
429    assert( choke==0 || choke==1 );
430
431    if( msgs->info->chokeChangedAt > fibrillationTime )
432    {
433        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
434    }
435    else if( msgs->info->peerIsChoked != choke )
436    {
437        msgs->info->peerIsChoked = choke;
438        if( choke )
439            cancelAllRequestsToClientExceptFast( msgs );
440        protocolSendChoke( msgs, choke );
441        msgs->info->chokeChangedAt = time( NULL );
442    }
443}
444
445/**
446***
447**/
448
449void
450tr_peerMsgsHave( tr_peermsgs * msgs,
451                 uint32_t      index )
452{
453    protocolSendHave( msgs, index );
454
455    /* since we have more pieces now, we might not be interested in this peer */
456    updateInterest( msgs );
457}
458#if 0
459static void
460sendFastSuggest( tr_peermsgs * msgs,
461                 uint32_t      pieceIndex )
462{
463    assert( msgs != NULL );
464   
465    if( tr_peerIoSupportsFEXT( msgs->io ) )
466    {
467        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
468        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_SUGGEST );
469        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
470    }
471}
472#endif
473static void
474sendFastHave( tr_peermsgs * msgs, int all )
475{
476    assert( msgs != NULL );
477   
478    if( tr_peerIoSupportsFEXT( msgs->io ) )
479    {
480        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
481        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, ( all ? BT_HAVE_ALL
482                                                                : BT_HAVE_NONE ) );
483        updateInterest( msgs );
484    }
485}
486
487static void
488sendFastReject( tr_peermsgs * msgs,
489                uint32_t      pieceIndex,
490                uint32_t      offset,
491                uint32_t      length )
492{
493    assert( msgs != NULL );
494
495    if( tr_peerIoSupportsFEXT( msgs->io ) )
496    {
497        const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
498        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
499        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_REJECT );
500        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
501        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
502        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
503    }
504}
505
506static void
507sendFastAllowed( tr_peermsgs * msgs,
508                 uint32_t      pieceIndex)
509{
510    assert( msgs != NULL );
511   
512    if( tr_peerIoSupportsFEXT( msgs->io ) )
513    {
514        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
515        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_ALLOWED_FAST );
516        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
517    }
518}
519
520
521
522static void
523sendFastAllowedSet( tr_peermsgs * msgs )
524{
525    int i = 0;
526    while (i <= msgs->torrent->info.pieceCount )
527    {
528        if ( tr_bitfieldHas( msgs->peerAllowedPieces, i) )
529            sendFastAllowed( msgs, i );
530        i++;
531    }
532}
533
534
535/**
536***
537**/
538
539static int
540reqIsValid( const tr_peermsgs   * msgs,
541            uint32_t              index,
542            uint32_t              offset,
543            uint32_t              length )
544{
545    const tr_torrent * tor = msgs->torrent;
546
547    if( index >= (uint32_t) tor->info.pieceCount )
548        return FALSE;
549    if ( (int)offset >= tr_torPieceCountBytes( tor, (int)index ) )
550        return FALSE;
551    if( length > MAX_REQUEST_BYTE_COUNT )
552        return FALSE;
553    if( tr_pieceOffset( tor, index, offset, length ) > tor->info.totalSize )
554        return FALSE;
555
556    return TRUE;
557}
558
559static int
560requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
561{
562    return reqIsValid( msgs, req->index, req->offset, req->length );
563}
564
565static void
566pumpRequestQueue( tr_peermsgs * msgs )
567{
568    const int max = msgs->maxActiveRequests;
569    const int min = msgs->minActiveRequests;
570    int count = tr_list_size( msgs->clientAskedFor );
571    int sent = 0;
572
573    if( count > min )
574        return;
575    if( msgs->info->clientIsChoked )
576        return;
577
578    while( ( count < max ) && ( msgs->clientWillAskFor != NULL ) )
579    {
580        struct peer_request * r = tr_list_pop_front( &msgs->clientWillAskFor );
581        assert( requestIsValid( msgs, r ) );
582        assert( tr_bitfieldHas( msgs->info->have, r->index ) );
583        protocolSendRequest( msgs, r );
584        r->time_requested = msgs->lastReqAddedAt = time( NULL );
585        tr_list_append( &msgs->clientAskedFor, r );
586        ++count;
587        ++sent;
588    }
589
590    if( sent )
591        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
592                sent,
593                tr_list_size(msgs->clientAskedFor),
594                tr_list_size(msgs->clientWillAskFor) );
595
596    if( count < max )
597        fireNeedReq( msgs );
598}
599
600static int
601pulse( void * vmsgs );
602
603int
604tr_peerMsgsAddRequest( tr_peermsgs * msgs,
605                       uint32_t      index, 
606                       uint32_t      offset, 
607                       uint32_t      length )
608{
609    const int req_max = msgs->maxActiveRequests;
610    struct peer_request tmp, *req;
611
612    assert( msgs != NULL );
613    assert( msgs->torrent != NULL );
614    assert( reqIsValid( msgs, index, offset, length ) );
615
616    /**
617    ***  Reasons to decline the request
618    **/
619
620    /* don't send requests to choked clients */
621    if( msgs->info->clientIsChoked ) {
622        dbgmsg( msgs, "declining request because they're choking us" );
623        return TR_ADDREQ_CLIENT_CHOKED;
624    }
625
626    /* peer doesn't have this piece */
627    if( !tr_bitfieldHas( msgs->info->have, index ) )
628        return TR_ADDREQ_MISSING;
629
630    /* peer's queue is full */
631    if( tr_list_size( msgs->clientWillAskFor ) >= req_max ) {
632        dbgmsg( msgs, "declining request because we're full" );
633        return TR_ADDREQ_FULL;
634    }
635
636    /* have we already asked for this piece? */
637    tmp.index = index;
638    tmp.offset = offset;
639    tmp.length = length;
640    if( tr_list_find( msgs->clientAskedFor, &tmp, compareRequest ) ) {
641        dbgmsg( msgs, "declining because it's a duplicate" );
642        return TR_ADDREQ_DUPLICATE;
643    }
644    if( tr_list_find( msgs->clientWillAskFor, &tmp, compareRequest ) ) {
645        dbgmsg( msgs, "declining because it's a duplicate" );
646        return TR_ADDREQ_DUPLICATE;
647    }
648
649    /**
650    ***  Accept this request
651    **/
652
653    dbgmsg( msgs, "added req for piece %d, offset %d", (int)index, (int)offset );
654    req = tr_new0( struct peer_request, 1 );
655    *req = tmp;
656    tr_list_append( &msgs->clientWillAskFor, req );
657    return TR_ADDREQ_OK;
658}
659
660static void
661cancelAllRequestsToPeer( tr_peermsgs * msgs )
662{
663    struct peer_request * req;
664
665    while(( req = tr_list_pop_front( &msgs->clientWillAskFor ) ))
666    {
667        fireCancelledReq( msgs, req );
668        tr_free( req );
669    }
670
671    while(( req = tr_list_pop_front( &msgs->clientAskedFor ) ))
672    {
673        fireCancelledReq( msgs, req );
674        protocolSendCancel( msgs, req );
675        tr_free( req );
676    }
677}
678
679void
680tr_peerMsgsCancel( tr_peermsgs * msgs,
681                   uint32_t      pieceIndex,
682                   uint32_t      offset,
683                   uint32_t      length )
684{
685    struct peer_request *req, tmp;
686
687    assert( msgs != NULL );
688    assert( length > 0 );
689
690    /* have we asked the peer for this piece? */
691    tmp.index = pieceIndex;
692    tmp.offset = offset;
693    tmp.length = length;
694
695    /* if it's only in the queue and hasn't been sent yet, free it */
696    if(( req = tr_list_remove( &msgs->clientWillAskFor, &tmp, compareRequest ) ))
697    {
698        fireCancelledReq( msgs, req );
699        tr_free( req );
700    }
701
702    /* if it's already been sent, send a cancel message too */
703    if(( req = tr_list_remove( &msgs->clientAskedFor, &tmp, compareRequest ) ))
704    {
705        protocolSendCancel( msgs, req );
706        fireCancelledReq( msgs, req );
707        tr_free( req );
708    }
709}
710
711/**
712***
713**/
714
715static void
716sendLtepHandshake( tr_peermsgs * msgs )
717{
718    benc_val_t val, *m;
719    char * buf;
720    int len;
721    int pex;
722    const char * v = TR_NAME " " USERAGENT_PREFIX;
723    const int port = tr_getPublicPort( msgs->handle );
724    struct evbuffer * outbuf;
725
726    if( msgs->clientSentLtepHandshake )
727        return;
728
729    outbuf = evbuffer_new( );
730    dbgmsg( msgs, "sending an ltep handshake" );
731    msgs->clientSentLtepHandshake = 1;
732
733    /* decide if we want to advertise pex support */
734    if( !tr_torrentIsPexEnabled( msgs->torrent ) )
735        pex = 0;
736    else if( msgs->peerSentLtepHandshake )
737        pex = msgs->peerSupportsPex ? 1 : 0;
738    else
739        pex = 1;
740
741    tr_bencInit( &val, TYPE_DICT );
742    tr_bencDictReserve( &val, 4 );
743    tr_bencInitInt( tr_bencDictAdd( &val, "e" ), 1 );
744    m  = tr_bencDictAdd( &val, "m" );
745    tr_bencInit( m, TYPE_DICT );
746    if( pex ) {
747        tr_bencDictReserve( m, 1 );
748        tr_bencInitInt( tr_bencDictAdd( m, "ut_pex" ), TR_LTEP_PEX );
749    }
750    if( port > 0 )
751        tr_bencInitInt( tr_bencDictAdd( &val, "p" ), port );
752    tr_bencInitStr( tr_bencDictAdd( &val, "v" ), v, 0, 1 );
753    buf = tr_bencSave( &val, &len );
754
755    tr_peerIoWriteUint32( msgs->io, outbuf, 2*sizeof(uint8_t) + len );
756    tr_peerIoWriteUint8 ( msgs->io, outbuf, BT_LTEP );
757    tr_peerIoWriteUint8 ( msgs->io, outbuf, LTEP_HANDSHAKE );
758    tr_peerIoWriteBytes ( msgs->io, outbuf, buf, len );
759
760    tr_peerIoWriteBuf( msgs->io, outbuf );
761
762#if 0
763    dbgmsg( msgs, "here is the ltep handshake we sent:" );
764    tr_bencPrint( &val );
765    dbgmsg( msgs, "here is the ltep handshake we read [%s]:", tr_bencSave( &val, NULL ) );
766#endif
767
768    /* cleanup */
769    tr_bencFree( &val );
770    tr_free( buf );
771    evbuffer_free( outbuf );
772}
773
774static void
775parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
776{
777    benc_val_t val, * sub;
778    uint8_t * tmp = tr_new( uint8_t, len );
779
780    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
781    msgs->peerSentLtepHandshake = 1;
782
783    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type!=TYPE_DICT ) {
784        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
785        tr_free( tmp );
786        return;
787    }
788
789#if 0
790    dbgmsg( msgs, "here is the ltep handshake we read:" );
791    tr_bencPrint( &val );
792    dbgmsg( msgs, "here is the ltep handshake we read [%s]:", tr_bencSave( &val, NULL ) );
793#endif
794
795    /* does the peer prefer encrypted connections? */
796    if(( sub = tr_bencDictFindType( &val, "e", TYPE_INT )))
797        msgs->info->encryption_preference = sub->val.i
798                                      ? ENCRYPTION_PREFERENCE_YES
799                                      : ENCRYPTION_PREFERENCE_NO;
800
801    /* check supported messages for utorrent pex */
802    if(( sub = tr_bencDictFindType( &val, "m", TYPE_DICT ))) {
803        if(( sub = tr_bencDictFindType( sub, "ut_pex", TYPE_INT ))) {
804            msgs->peerSupportsPex = 1;
805            msgs->ut_pex_id = (uint8_t) sub->val.i;
806            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
807        }
808    }
809
810    /* get peer's listening port */
811    if(( sub = tr_bencDictFindType( &val, "p", TYPE_INT ))) {
812        msgs->info->port = htons( (uint16_t)sub->val.i );
813        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
814    }
815
816    tr_bencFree( &val );
817    tr_free( tmp );
818}
819
820static void
821parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
822{
823    benc_val_t val, * sub;
824    uint8_t * tmp;
825
826    if( !tr_torrentIsPexEnabled( msgs->torrent ) ) /* no sharing! */
827        return;
828
829    tmp = tr_new( uint8_t, msglen );
830    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
831
832    if( tr_bencLoad( tmp, msglen, &val, NULL ) || ( val.type != TYPE_DICT ) ) {
833        dbgmsg( msgs, "GET can't read extended-pex dictionary" );
834        tr_free( tmp );
835        return;
836    }
837
838    if(( sub = tr_bencDictFindType( &val, "added", TYPE_STR ))) {
839        const int n = sub->val.s.i / 6 ;
840        dbgmsg( msgs, "got %d peers from uT pex", n );
841        tr_peerMgrAddPeers( msgs->handle->peerMgr,
842                            msgs->torrent->info.hash,
843                            TR_PEER_FROM_PEX,
844                            (uint8_t*)sub->val.s.s, n );
845    }
846
847    tr_bencFree( &val );
848    tr_free( tmp );
849}
850
851static void
852sendPex( tr_peermsgs * msgs );
853
854static void
855parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
856{
857    uint8_t ltep_msgid;
858
859    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
860    msglen--;
861
862    if( ltep_msgid == LTEP_HANDSHAKE )
863    {
864        dbgmsg( msgs, "got ltep handshake" );
865        parseLtepHandshake( msgs, msglen, inbuf );
866        if( tr_peerIoSupportsLTEP( msgs->io ) )
867        {
868            sendLtepHandshake( msgs );
869            sendPex( msgs );
870        }
871    }
872    else if( ltep_msgid == TR_LTEP_PEX )
873    {
874        dbgmsg( msgs, "got ut pex" );
875        msgs->peerSupportsPex = 1;
876        parseUtPex( msgs, msglen, inbuf );
877    }
878    else
879    {
880        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
881        evbuffer_drain( inbuf, msglen );
882    }
883}
884
885static int
886readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
887{
888    uint32_t len;
889
890    if( inlen < sizeof(len) )
891        return READ_MORE;
892
893    tr_peerIoReadUint32( msgs->io, inbuf, &len );
894
895    if( len == 0 ) /* peer sent us a keepalive message */
896        dbgmsg( msgs, "got KeepAlive" );
897    else {
898        msgs->incoming.length = len;
899        msgs->state = AWAITING_BT_ID;
900    }
901
902    return READ_AGAIN;
903}
904
905static int
906readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen );
907
908static int
909readBtId( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
910{
911    uint8_t id;
912
913    if( inlen < sizeof(uint8_t) )
914        return READ_MORE;
915
916    tr_peerIoReadUint8( msgs->io, inbuf, &id );
917    msgs->incoming.id = id;
918
919    if( id==BT_PIECE )
920    {
921        msgs->state = AWAITING_BT_PIECE;
922        return READ_AGAIN;
923    }
924    else if( msgs->incoming.length != 1 )
925    {
926        msgs->state = AWAITING_BT_MESSAGE;
927        return READ_AGAIN;
928    }
929    else return readBtMessage( msgs, inbuf, inlen-1 );
930}
931
932static void
933updatePeerProgress( tr_peermsgs * msgs )
934{
935    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have ) / (float)msgs->torrent->info.pieceCount;
936    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
937    updateInterest( msgs );
938    firePeerProgress( msgs );
939}
940
941static int
942clientCanSendFastBlock( const tr_peermsgs * msgs UNUSED )
943{
944    /* We can't send a fast piece if a peer has more than MAX_FAST_ALLOWED_THRESHOLD pieces */
945    if( tr_bitfieldCountTrueBits( msgs->info->have ) > MAX_FAST_ALLOWED_THRESHOLD )
946        return FALSE;
947   
948    /* ...or if we don't have ourself enough pieces */
949    if( tr_bitfieldCountTrueBits( tr_cpPieceBitfield( msgs->torrent->completion ) ) < MAX_FAST_ALLOWED_THRESHOLD )
950        return FALSE;
951
952    /* Maybe a bandwidth limit ? */
953    return TRUE;
954}
955
956static void
957peerMadeRequest( tr_peermsgs * msgs, const struct peer_request * req )
958{
959    const int reqIsValid = requestIsValid( msgs, req );
960    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( msgs->torrent->completion, req->index );
961    const int peerIsChoked = msgs->info->peerIsChoked;
962    const int peerIsFast = tr_peerIoSupportsFEXT( msgs->io );
963    const int pieceIsFast = reqIsValid && tr_bitfieldHas( msgs->peerAllowedPieces, req->index );
964    const int canSendFast = clientCanSendFastBlock( msgs );
965
966    if( !reqIsValid ) /* bad request */
967    {
968        dbgmsg( msgs, "rejecting an invalid request." );
969        sendFastReject( msgs, req->index, req->offset, req->length );
970    }
971    else if( !clientHasPiece ) /* we don't have it */
972    {
973        dbgmsg( msgs, "rejecting request for a piece we don't have." );
974        sendFastReject( msgs, req->index, req->offset, req->length );
975    }
976    else if( peerIsChoked && !peerIsFast ) /* doesn't he know he's choked? */
977    {
978        tr_peerMsgsSetChoke( msgs, 1 );
979        sendFastReject( msgs, req->index, req->offset, req->length );
980    }
981    else if( peerIsChoked && peerIsFast && ( !pieceIsFast || !canSendFast ) )
982    {
983        sendFastReject( msgs, req->index, req->offset, req->length );
984    }
985    else /* YAY */
986    {
987        struct peer_request * tmp = tr_new( struct peer_request, 1 );
988        *tmp = *req;
989        if( peerIsFast && pieceIsFast )
990            tr_list_append( &msgs->peerAskedForFast, tmp );
991        else
992            tr_list_append( &msgs->peerAskedFor, tmp );
993    }
994}
995
996static int
997messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
998{
999    switch( id )
1000    {
1001        case BT_CHOKE:
1002        case BT_UNCHOKE:
1003        case BT_INTERESTED:
1004        case BT_NOT_INTERESTED:
1005        case BT_HAVE_ALL:
1006        case BT_HAVE_NONE:
1007            return len==1;
1008
1009        case BT_HAVE:
1010        case BT_SUGGEST:
1011        case BT_ALLOWED_FAST:
1012            return len==5;
1013
1014        case BT_BITFIELD:
1015            return len == (msg->torrent->info.pieceCount+7u)/8u + 1u;
1016       
1017        case BT_REQUEST:
1018        case BT_CANCEL:
1019        case BT_REJECT:
1020            return len==13;
1021
1022        case BT_PIECE:
1023            return len>9 && len<=16393;
1024
1025        case BT_PORT:
1026            return len==3;
1027
1028        case BT_LTEP:
1029            return len >= 2;
1030
1031        default:
1032            return FALSE;
1033    }
1034}
1035
1036static int
1037clientGotBlock( tr_peermsgs * msgs, const uint8_t * block, const struct peer_request * req );
1038
1039static void
1040clientGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1041{
1042    tr_torrent * tor = msgs->torrent;
1043    tor->activityDate = tr_date( );
1044    tor->downloadedCur += byteCount;
1045    msgs->peerSentPieceDataAt = time( NULL );
1046    msgs->info->pieceDataActivityDate = time( NULL );
1047    msgs->info->credit += (int)(byteCount * SWIFT_REPAYMENT_RATIO);
1048    tr_rcTransferred( msgs->info->rcToClient, byteCount );
1049    tr_rcTransferred( tor->download, byteCount );
1050    tr_rcTransferred( tor->handle->download, byteCount );
1051    tr_statsAddDownloaded( msgs->handle, byteCount );
1052}
1053
1054static int
1055readBtPiece( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1056{
1057    struct peer_request * req = &msgs->incoming.blockReq;
1058    assert( EVBUFFER_LENGTH(inbuf) >= inlen );
1059    dbgmsg( msgs, "In readBtPiece" );
1060
1061    if( !req->length )
1062    {
1063        if( inlen < 8 )
1064            return READ_MORE;
1065
1066        tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
1067        tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
1068        req->length = msgs->incoming.length - 9;
1069        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1070        return READ_AGAIN;
1071    }
1072    else
1073    {
1074        int err;
1075
1076        /* read in another chunk of data */
1077        const size_t nLeft = req->length - EVBUFFER_LENGTH(msgs->incoming.block);
1078        size_t n = MIN( nLeft, inlen );
1079        uint8_t * buf = tr_new( uint8_t, n );
1080        assert( EVBUFFER_LENGTH(inbuf) >= n );
1081        tr_peerIoReadBytes( msgs->io, inbuf, buf, n );
1082        evbuffer_add( msgs->incoming.block, buf, n );
1083        clientGotBytes( msgs, n );
1084        tr_free( buf );
1085        dbgmsg( msgs, "got %d bytes for block %u:%u->%u ... %d remain",
1086               (int)n, req->index, req->offset, req->length,
1087               (int)( req->length - EVBUFFER_LENGTH(msgs->incoming.block) ) );
1088        if( EVBUFFER_LENGTH(msgs->incoming.block) < req->length )
1089            return READ_MORE;
1090
1091        /* we've got the whole block ... process it */
1092        err = clientGotBlock( msgs, EVBUFFER_DATA(msgs->incoming.block), req );
1093
1094        /* cleanup */
1095        evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH(msgs->incoming.block) );
1096        req->length = 0;
1097        msgs->state = AWAITING_BT_LENGTH;
1098        if( !err )
1099            return READ_AGAIN;
1100        else {
1101            fireGotAssertError( msgs );
1102            return READ_DONE;
1103        }
1104    }
1105}
1106
1107static int
1108readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1109{
1110    uint32_t ui32;
1111    uint32_t msglen = msgs->incoming.length;
1112    const uint8_t id = msgs->incoming.id;
1113    const size_t startBufLen = EVBUFFER_LENGTH( inbuf );
1114
1115    --msglen; // id length
1116
1117    if( inlen < msglen )
1118        return READ_MORE;
1119
1120    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %d", (int)id, (int)msglen, (int)inlen );
1121
1122    if( !messageLengthIsCorrect( msgs, id, msglen+1 ) )
1123    {
1124        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1125        fireGotError( msgs );
1126        return READ_DONE;
1127    }
1128
1129    switch( id )
1130    {
1131        case BT_CHOKE:
1132            dbgmsg( msgs, "got Choke" );
1133            msgs->info->clientIsChoked = 1;
1134            cancelAllRequestsToPeer( msgs );
1135            cancelAllRequestsToClientExceptFast( msgs );
1136            break;
1137
1138        case BT_UNCHOKE:
1139            dbgmsg( msgs, "got Unchoke" );
1140            msgs->info->clientIsChoked = 0;
1141            fireNeedReq( msgs );
1142            break;
1143
1144        case BT_INTERESTED:
1145            dbgmsg( msgs, "got Interested" );
1146            msgs->info->peerIsInterested = 1;
1147            tr_peerMsgsSetChoke( msgs, 0 );
1148            break;
1149
1150        case BT_NOT_INTERESTED:
1151            dbgmsg( msgs, "got Not Interested" );
1152            msgs->info->peerIsInterested = 0;
1153            break;
1154           
1155        case BT_HAVE:
1156            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1157            dbgmsg( msgs, "got Have: %u", ui32 );
1158            tr_bitfieldAdd( msgs->info->have, ui32 );
1159            /* If this is a fast-allowed piece for this peer, mark it as normal now */
1160            if( msgs->clientAllowedPieces != NULL && tr_bitfieldHas( msgs->clientAllowedPieces, ui32 ) )
1161                tr_bitfieldRem( msgs->clientAllowedPieces, ui32 );
1162            updatePeerProgress( msgs );
1163            tr_rcTransferred( msgs->torrent->swarmspeed, msgs->torrent->info.pieceSize );
1164            break;
1165
1166        case BT_BITFIELD: {
1167            const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
1168            dbgmsg( msgs, "got a bitfield" );
1169            msgs->peerSentBitfield = 1;
1170            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
1171            updatePeerProgress( msgs );
1172            tr_peerMsgsSetChoke( msgs, !clientIsSeed || (msgs->info->progress<1.0) );
1173            fireNeedReq( msgs );
1174            break;
1175        }
1176
1177        case BT_REQUEST: {
1178            struct peer_request req;
1179            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1180            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1181            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1182            dbgmsg( msgs, "got Request: %u:%u->%u", req.index, req.offset, req.length );
1183            peerMadeRequest( msgs, &req );
1184            break;
1185        }
1186
1187        case BT_CANCEL: {
1188            struct peer_request req;
1189            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1190            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1191            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1192            dbgmsg( msgs, "got a Cancel %u:%u->%u", req.index, req.offset, req.length );
1193            tr_free( tr_list_remove( &msgs->peerAskedForFast, &req, compareRequest ) );
1194            tr_free( tr_list_remove( &msgs->peerAskedFor, &req, compareRequest ) );
1195            break;
1196        }
1197
1198        case BT_PIECE:
1199            assert( 0 ); /* should be handled elsewhere! */
1200            break;
1201       
1202        case BT_PORT:
1203            dbgmsg( msgs, "Got a BT_PORT" );
1204            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1205            break;
1206           
1207        case BT_SUGGEST: {
1208            const tr_bitfield * bt = tr_cpPieceBitfield( msgs->torrent->completion );
1209            dbgmsg( msgs, "Got a BT_SUGGEST" );
1210            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1211            if( tr_bitfieldHas( bt, ui32 ) )
1212                tr_bitfieldAdd( msgs->clientSuggestedPieces, ui32 );
1213            break;
1214        }
1215           
1216        case BT_HAVE_ALL:
1217            dbgmsg( msgs, "Got a BT_HAVE_ALL" );
1218            tr_bitfieldAddRange( msgs->info->have, 0, msgs->torrent->info.pieceCount );
1219            updatePeerProgress( msgs );
1220            break;
1221       
1222       
1223        case BT_HAVE_NONE:
1224            dbgmsg( msgs, "Got a BT_HAVE_NONE" );
1225            tr_bitfieldClear( msgs->info->have );
1226            updatePeerProgress( msgs );
1227            break;
1228       
1229        case BT_REJECT: {
1230            struct peer_request req;
1231            dbgmsg( msgs, "Got a BT_REJECT" );
1232            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1233            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1234            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1235            tr_free( tr_list_remove( &msgs->clientAskedFor, &req, compareRequest ) );
1236            break;
1237        }
1238
1239        case BT_ALLOWED_FAST: {
1240            dbgmsg( msgs, "Got a BT_ALLOWED_FAST" );
1241            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1242            tr_bitfieldAdd( msgs->clientAllowedPieces, ui32 );
1243            break;
1244        }
1245
1246        case BT_LTEP:
1247            dbgmsg( msgs, "Got a BT_LTEP" );
1248            parseLtep( msgs, msglen, inbuf );
1249            break;
1250
1251        default:
1252            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1253            tr_peerIoDrain( msgs->io, inbuf, msglen );
1254            break;
1255    }
1256
1257    assert( msglen + 1 == msgs->incoming.length );
1258    assert( EVBUFFER_LENGTH(inbuf) == startBufLen - msglen );
1259
1260    msgs->state = AWAITING_BT_LENGTH;
1261    return READ_AGAIN;
1262}
1263
1264static void
1265peerGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1266{
1267    tr_torrent * tor = msgs->torrent;
1268    tor->activityDate = tr_date( );
1269    tor->uploadedCur += byteCount;
1270    msgs->clientSentPieceDataAt = time( NULL );
1271    msgs->info->pieceDataActivityDate = time( NULL );
1272    msgs->info->credit -= byteCount;
1273    tr_rcTransferred( msgs->info->rcToPeer, byteCount );
1274    tr_rcTransferred( tor->upload, byteCount );
1275    tr_rcTransferred( tor->handle->upload, byteCount );
1276    tr_statsAddUploaded( msgs->handle, byteCount );
1277}
1278
1279static size_t
1280getDownloadMax( const tr_peermsgs * msgs )
1281{
1282    static const size_t maxval = ~0;
1283    const tr_torrent * tor = msgs->torrent;
1284
1285    if( tor->downloadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1286        return tor->handle->useDownloadLimit
1287            ? tr_rcBytesLeft( tor->handle->download ) : maxval;
1288
1289    if( tor->downloadLimitMode == TR_SPEEDLIMIT_SINGLE )
1290        return tr_rcBytesLeft( tor->download );
1291
1292    return maxval;
1293}
1294
1295static void
1296reassignBytesToCorrupt( tr_peermsgs * msgs, uint32_t byteCount )
1297{
1298    tr_torrent * tor = msgs->torrent;
1299
1300    /* increment the `corrupt' field */
1301    tor->corruptCur += byteCount;
1302
1303    /* decrement the `downloaded' field */
1304    if( tor->downloadedCur >= byteCount )
1305        tor->downloadedCur -= byteCount;
1306    else
1307        tor->downloadedCur = 0;
1308}
1309
1310
1311static void
1312gotBadPiece( tr_peermsgs * msgs, uint32_t pieceIndex )
1313{
1314    const uint32_t byteCount =
1315        tr_torPieceCountBytes( msgs->torrent, (int)pieceIndex );
1316    reassignBytesToCorrupt( msgs, byteCount );
1317}
1318
1319static void
1320clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1321{
1322    reassignBytesToCorrupt( msgs, req->length );
1323}
1324
1325static void
1326addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1327{
1328    if( !msgs->info->blame )
1329         msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1330    tr_bitfieldAdd( msgs->info->blame, index );
1331}
1332
1333static int
1334clientGotBlock( tr_peermsgs                * msgs,
1335                const uint8_t              * data,
1336                const struct peer_request  * req )
1337{
1338    int i;
1339    tr_torrent * tor = msgs->torrent;
1340    const int block = _tr_block( tor, req->index, req->offset );
1341    struct peer_request *myreq;
1342
1343    assert( msgs != NULL );
1344    assert( req != NULL );
1345
1346    if( req->length != (uint32_t)tr_torBlockCountBytes( msgs->torrent, block ) )
1347    {
1348        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1349                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1350        return TR_ERROR_ASSERT;
1351    }
1352
1353    /* save the block */
1354    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1355
1356    /**
1357    *** Remove the block from our `we asked for this' list
1358    **/
1359
1360    myreq = tr_list_remove( &msgs->clientAskedFor, req, compareRequest );
1361    if( myreq == NULL ) {
1362        clientGotUnwantedBlock( msgs, req );
1363        dbgmsg( msgs, "we didn't ask for this message..." );
1364        return 0;
1365    }
1366
1367    dbgmsg( msgs, "got block %u:%u->%u (turnaround time %d secs)", 
1368                  myreq->index, myreq->offset, myreq->length,
1369                  (int)(time(NULL) - myreq->time_requested) );
1370    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1371                  tr_list_size(msgs->clientAskedFor));
1372
1373    tr_free( myreq );
1374    myreq = NULL;
1375
1376    /**
1377    *** Error checks
1378    **/
1379
1380    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
1381        dbgmsg( msgs, "we have this block already..." );
1382        clientGotUnwantedBlock( msgs, req );
1383        return 0;
1384    }
1385
1386    /**
1387    ***  Save the block
1388    **/
1389
1390    msgs->info->peerSentPieceDataAt = time( NULL );
1391    i = tr_ioWrite( tor, req->index, req->offset, req->length, data );
1392    if( i )
1393        return 0;
1394
1395    tr_cpBlockAdd( tor->completion, block );
1396
1397    addPeerToBlamefield( msgs, req->index );
1398
1399    fireGotBlock( msgs, req );
1400
1401    /**
1402    ***  Handle if this was the last block in the piece
1403    **/
1404
1405    if( tr_cpPieceIsComplete( tor->completion, req->index ) )
1406    {
1407        if( tr_ioHash( tor, req->index ) )
1408        {
1409            gotBadPiece( msgs, req->index );
1410            return 0;
1411        }
1412
1413        fireClientHave( msgs, req->index );
1414    }
1415
1416    return 0;
1417}
1418
1419static void
1420didWrite( struct bufferevent * evin UNUSED, void * vmsgs )
1421{
1422    pulse( vmsgs );
1423}
1424
1425static ReadState
1426canRead( struct bufferevent * evin, void * vmsgs )
1427{
1428    ReadState ret;
1429    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1430    struct evbuffer * inbuf = EVBUFFER_INPUT ( evin );
1431    const size_t buflen = EVBUFFER_LENGTH( inbuf );
1432    const size_t downloadMax = getDownloadMax( msgs );
1433    const size_t n = MIN( buflen, downloadMax );
1434
1435    if( !n )
1436    {
1437        ret = READ_DONE;
1438    }
1439    else switch( msgs->state )
1440    {
1441        case AWAITING_BT_LENGTH:  ret = readBtLength ( msgs, inbuf, n ); break;
1442        case AWAITING_BT_ID:      ret = readBtId     ( msgs, inbuf, n ); break;
1443        case AWAITING_BT_MESSAGE: ret = readBtMessage( msgs, inbuf, n ); break;
1444        case AWAITING_BT_PIECE:   ret = readBtPiece  ( msgs, inbuf, n ); break;
1445        default: assert( 0 );
1446    }
1447
1448    return ret;
1449}
1450
1451static void
1452sendKeepalive( tr_peermsgs * msgs )
1453{
1454    dbgmsg( msgs, "sending a keepalive message" );
1455    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1456}
1457
1458/**
1459***
1460**/
1461
1462static int
1463isSwiftEnabled( const tr_peermsgs * msgs )
1464{
1465    /* rationale: SWIFT is good for getting rid of deadbeats, but most
1466     * private trackers have ratios where you _want_ to feed deadbeats
1467     * as much as possible.  So we disable SWIFT on private torrents */
1468    return SWIFT_ENABLED
1469        && !tr_torrentIsSeed( msgs->torrent )
1470        && !tr_torrentIsPrivate( msgs->torrent );
1471}
1472
1473static size_t
1474getUploadMax( const tr_peermsgs * msgs )
1475{
1476    static const size_t maxval = ~0;
1477    const tr_torrent * tor = msgs->torrent;
1478    const int useSwift = isSwiftEnabled( msgs );
1479    const size_t swiftLeft = msgs->info->credit;
1480    size_t speedLeft;
1481    size_t bufLeft;
1482    size_t ret;
1483
1484    if( tor->uploadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1485        speedLeft = tor->handle->useUploadLimit ? tr_rcBytesLeft( tor->handle->upload ) : maxval;
1486    else if( tor->uploadLimitMode == TR_SPEEDLIMIT_SINGLE )
1487        speedLeft = tr_rcBytesLeft( tor->upload );
1488    else
1489        speedLeft = ~0;
1490
1491    bufLeft = MAX_OUTBUF_SIZE - tr_peerIoWriteBytesWaiting( msgs->io );
1492    ret = MIN( speedLeft, bufLeft );
1493    if( useSwift)
1494        ret = MIN( ret, swiftLeft );
1495    return ret;
1496}
1497
1498static int
1499ratePulse( void * vmsgs )
1500{
1501    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1502    msgs->info->rateToClient = tr_rcRate( msgs->info->rcToClient );
1503    msgs->info->rateToPeer = tr_rcRate( msgs->info->rcToPeer );
1504    msgs->maxActiveRequests = MIN( 8 + (int)(msgs->info->rateToClient/5), 100 );
1505    msgs->minActiveRequests = msgs->maxActiveRequests / 3;
1506    return TRUE;
1507}
1508
1509static struct peer_request*
1510popNextRequest( tr_peermsgs * msgs )
1511{
1512    struct peer_request * ret;
1513    ret = tr_list_pop_front( &msgs->peerAskedForFast );
1514    if( !ret )
1515        ret = tr_list_pop_front( &msgs->peerAskedFor);
1516    return ret;
1517}
1518
1519static void
1520updatePeerStatus( tr_peermsgs * msgs )
1521{
1522    const time_t now = time( NULL );
1523    tr_peer * peer = msgs->info;
1524    tr_peer_status status = 0;
1525
1526    if( !msgs->peerSentBitfield )
1527        status |= TR_PEER_STATUS_HANDSHAKE;
1528
1529    if( msgs->info->peerIsChoked )
1530        status |= TR_PEER_STATUS_PEER_IS_CHOKED;
1531
1532    if( msgs->info->peerIsInterested )
1533        status |= TR_PEER_STATUS_PEER_IS_INTERESTED;
1534
1535    if( msgs->info->clientIsChoked )
1536        status |= TR_PEER_STATUS_CLIENT_IS_CHOKED;
1537
1538    if( msgs->info->clientIsInterested )
1539        status |= TR_PEER_STATUS_CLIENT_IS_INTERESTED;
1540
1541    if( ( now - msgs->clientSentPieceDataAt ) < 3 )
1542        status |= TR_PEER_STATUS_CLIENT_IS_SENDING;
1543
1544    if( ( now - msgs->peerSentPieceDataAt ) < 3 )
1545        status |= TR_PEER_STATUS_PEER_IS_SENDING;
1546
1547    if( msgs->clientAskedFor != NULL )
1548        status |= TR_PEER_STATUS_CLIENT_SENT_REQUEST;
1549
1550    peer->status = status;
1551}
1552
1553static int
1554pulse( void * vmsgs )
1555{
1556    const time_t now = time( NULL );
1557    tr_peermsgs * msgs = vmsgs;
1558    struct peer_request * r;
1559
1560    tr_peerIoTryRead( msgs->io );
1561    pumpRequestQueue( msgs );
1562    updatePeerStatus( msgs );
1563
1564    if( msgs->sendingBlock )
1565    {
1566        const size_t uploadMax = getUploadMax( msgs );
1567        size_t len = EVBUFFER_LENGTH( msgs->outBlock );
1568        const size_t outlen = MIN( len, uploadMax );
1569
1570        assert( len );
1571
1572        if( outlen )
1573        {
1574            tr_peerIoWrite( msgs->io, EVBUFFER_DATA( msgs->outBlock ), outlen );
1575            evbuffer_drain( msgs->outBlock, outlen );
1576            peerGotBytes( msgs, outlen );
1577
1578            len -= outlen;
1579            msgs->clientSentAnythingAt = now;
1580            msgs->sendingBlock = len!=0;
1581
1582            dbgmsg( msgs, "wrote %d bytes; %d left in block", (int)outlen, (int)len );
1583        }
1584        else dbgmsg( msgs, "stalled writing block... uploadMax %lu, outlen %lu", uploadMax, outlen );
1585    }
1586
1587    if( !msgs->sendingBlock )
1588    {
1589        if(( EVBUFFER_LENGTH( msgs->outMessages ) ))
1590        {
1591            dbgmsg( msgs, "flushing outMessages..." );
1592            tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
1593            msgs->clientSentAnythingAt = now;
1594        }
1595        else if( !EVBUFFER_LENGTH( msgs->outBlock )
1596            && (( r = popNextRequest( msgs )))
1597            && requestIsValid( msgs, r )
1598            && tr_cpPieceIsComplete( msgs->torrent->completion, r->index ) )
1599        {
1600            uint8_t * buf = tr_new( uint8_t, r->length );
1601
1602            if( !tr_ioRead( msgs->torrent, r->index, r->offset, r->length, buf ) )
1603            {
1604                tr_peerIo * io = msgs->io;
1605                struct evbuffer * out = msgs->outBlock;
1606
1607                dbgmsg( msgs, "sending block %u:%u->%u", r->index, r->offset, r->length );
1608                tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 2*sizeof(uint32_t) + r->length );
1609                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1610                tr_peerIoWriteUint32( io, out, r->index );
1611                tr_peerIoWriteUint32( io, out, r->offset );
1612                tr_peerIoWriteBytes ( io, out, buf, r->length );
1613                msgs->sendingBlock = 1;
1614            }
1615
1616            tr_free( buf );
1617            tr_free( r );
1618        }
1619        else if( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS )
1620        {
1621            sendKeepalive( msgs );
1622        }
1623    }
1624
1625    return TRUE; /* loop forever */
1626}
1627
1628static void
1629gotError( struct bufferevent * evbuf UNUSED, short what, void * vmsgs )
1630{
1631    if( what & EVBUFFER_TIMEOUT )
1632        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1633
1634    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) ) {
1635        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1636                what, errno, strerror(errno) );
1637        fireGotError( vmsgs );
1638    }
1639}
1640
1641static void
1642sendBitfield( tr_peermsgs * msgs )
1643{
1644    const tr_bitfield * bitfield = tr_cpPieceBitfield( msgs->torrent->completion );
1645    struct evbuffer * out = msgs->outMessages;
1646
1647    dbgmsg( msgs, "sending peer a bitfield message" );
1648    tr_peerIoWriteUint32( msgs->io, out, sizeof(uint8_t) + bitfield->len );
1649    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1650    tr_peerIoWriteBytes ( msgs->io, out, bitfield->bits, bitfield->len );
1651}
1652
1653/**
1654***
1655**/
1656
1657/* some peers give us error messages if we send
1658   more than this many peers in a single pex message */
1659#define MAX_PEX_DIFFS 200
1660
1661typedef struct
1662{
1663    tr_pex * added;
1664    tr_pex * dropped;
1665    tr_pex * elements;
1666    int addedCount;
1667    int droppedCount;
1668    int elementCount;
1669    int diffCount;
1670}
1671PexDiffs;
1672
1673static void
1674pexAddedCb( void * vpex, void * userData )
1675{
1676    PexDiffs * diffs = (PexDiffs *) userData;
1677    tr_pex * pex = (tr_pex *) vpex;
1678    if( diffs->diffCount < MAX_PEX_DIFFS )
1679    {
1680        diffs->diffCount++;
1681        diffs->added[diffs->addedCount++] = *pex;
1682        diffs->elements[diffs->elementCount++] = *pex;
1683    }
1684}
1685
1686static void
1687pexRemovedCb( void * vpex, void * userData )
1688{
1689    PexDiffs * diffs = (PexDiffs *) userData;
1690    tr_pex * pex = (tr_pex *) vpex;
1691    if( diffs->diffCount < MAX_PEX_DIFFS )
1692    {
1693        diffs->diffCount++;
1694        diffs->dropped[diffs->droppedCount++] = *pex;
1695    }
1696}
1697
1698static void
1699pexElementCb( void * vpex, void * userData )
1700{
1701    PexDiffs * diffs = (PexDiffs *) userData;
1702    tr_pex * pex = (tr_pex *) vpex;
1703    if( diffs->diffCount < MAX_PEX_DIFFS )
1704    {
1705        diffs->diffCount++;
1706        diffs->elements[diffs->elementCount++] = *pex;
1707    }
1708}
1709
1710static void
1711sendPex( tr_peermsgs * msgs )
1712{
1713    if( msgs->peerSupportsPex && tr_torrentIsPexEnabled( msgs->torrent ) )
1714    {
1715        int i;
1716        tr_pex * newPex = NULL;
1717        const int newCount = tr_peerMgrGetPeers( msgs->handle->peerMgr, msgs->torrent->info.hash, &newPex );
1718        PexDiffs diffs;
1719        benc_val_t val, *added, *dropped, *flags;
1720        uint8_t *tmp, *walk;
1721        char * benc;
1722        int bencLen;
1723
1724        /* build the diffs */
1725        diffs.added = tr_new( tr_pex, newCount );
1726        diffs.addedCount = 0;
1727        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1728        diffs.droppedCount = 0;
1729        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1730        diffs.elementCount = 0;
1731        diffs.diffCount = 0;
1732        tr_set_compare( msgs->pex, msgs->pexCount,
1733                        newPex, newCount,
1734                        tr_pexCompare, sizeof(tr_pex),
1735                        pexRemovedCb, pexAddedCb, pexElementCb, &diffs );
1736        dbgmsg( msgs, "pex: old peer count %d, new peer count %d, added %d, removed %d", msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1737
1738        /* update peer */
1739        tr_free( msgs->pex );
1740        msgs->pex = diffs.elements;
1741        msgs->pexCount = diffs.elementCount;
1742
1743        /* build the pex payload */
1744        tr_bencInit( &val, TYPE_DICT );
1745        tr_bencDictReserve( &val, 3 );
1746
1747        /* "added" */
1748        added = tr_bencDictAdd( &val, "added" );
1749        tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1750        for( i=0; i<diffs.addedCount; ++i ) {
1751            memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
1752            memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1753        }
1754        assert( ( walk - tmp ) == diffs.addedCount * 6 );
1755        tr_bencInitStr( added, tmp, walk-tmp, FALSE );
1756
1757        /* "added.f" */
1758        flags = tr_bencDictAdd( &val, "added.f" );
1759        tmp = walk = tr_new( uint8_t, diffs.addedCount );
1760        for( i=0; i<diffs.addedCount; ++i )
1761            *walk++ = diffs.added[i].flags;
1762        assert( ( walk - tmp ) == diffs.addedCount );
1763        tr_bencInitStr( flags, tmp, walk-tmp, FALSE );
1764
1765        /* "dropped" */
1766        dropped = tr_bencDictAdd( &val, "dropped" );
1767        tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
1768        for( i=0; i<diffs.droppedCount; ++i ) {
1769            memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
1770            memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
1771        }
1772        assert( ( walk - tmp ) == diffs.droppedCount * 6 );
1773        tr_bencInitStr( dropped, tmp, walk-tmp, FALSE );
1774
1775        /* write the pex message */
1776        benc = tr_bencSave( &val, &bencLen );
1777        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 2*sizeof(uint8_t) + bencLen );
1778        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_LTEP );
1779        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, msgs->ut_pex_id );
1780        tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, benc, bencLen );
1781
1782        /* cleanup */
1783        tr_free( benc );
1784        tr_bencFree( &val );
1785        tr_free( diffs.added );
1786        tr_free( diffs.dropped );
1787        tr_free( newPex );
1788
1789        msgs->clientSentPexAt = time( NULL );
1790    }
1791}
1792
1793static int
1794pexPulse( void * vpeer )
1795{
1796    sendPex( vpeer );
1797    return TRUE;
1798}
1799
1800/**
1801***
1802**/
1803
1804tr_peermsgs*
1805tr_peerMsgsNew( struct tr_torrent * torrent,
1806                struct tr_peer    * info,
1807                tr_delivery_func    func,
1808                void              * userData,
1809                tr_publisher_tag  * setme )
1810{
1811    tr_peermsgs * m;
1812
1813    assert( info != NULL );
1814    assert( info->io != NULL );
1815
1816    m = tr_new0( tr_peermsgs, 1 );
1817    m->publisher = tr_publisherNew( );
1818    m->info = info;
1819    m->handle = torrent->handle;
1820    m->torrent = torrent;
1821    m->io = info->io;
1822    m->info->status = TR_PEER_STATUS_HANDSHAKE;
1823    m->info->clientIsChoked = 1;
1824    m->info->peerIsChoked = 1;
1825    m->info->clientIsInterested = 0;
1826    m->info->peerIsInterested = 0;
1827    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
1828    m->state = AWAITING_BT_LENGTH;
1829    m->pulseTimer = tr_timerNew( m->handle, pulse, m, PEER_PULSE_INTERVAL );
1830    m->rateTimer = tr_timerNew( m->handle, ratePulse, m, RATE_PULSE_INTERVAL );
1831    m->pexTimer = tr_timerNew( m->handle, pexPulse, m, PEX_INTERVAL );
1832    m->outMessages = evbuffer_new( );
1833    m->incoming.block = evbuffer_new( );
1834    m->outBlock = evbuffer_new( );
1835    m->peerAllowedPieces = NULL;
1836    m->clientAllowedPieces = NULL;
1837    m->clientSuggestedPieces = NULL;
1838    *setme = tr_publisherSubscribe( m->publisher, func, userData );
1839   
1840    if ( tr_peerIoSupportsFEXT( m->io ) )
1841    {
1842        /* This peer is fastpeer-enabled, generate its allowed set
1843         * (before registering our callbacks) */
1844        if ( !m->peerAllowedPieces ) {
1845            const struct in_addr *peerAddr = tr_peerIoGetAddress( m->io, NULL );
1846           
1847            m->peerAllowedPieces = tr_peerMgrGenerateAllowedSet( MAX_FAST_ALLOWED_COUNT,
1848                                                                 m->torrent->info.pieceCount,
1849                                                                 m->torrent->info.hash,
1850                                                                 peerAddr );
1851        }
1852        m->clientAllowedPieces = tr_bitfieldNew( m->torrent->info.pieceCount );
1853       
1854        m->clientSuggestedPieces = tr_bitfieldNew( m->torrent->info.pieceCount );
1855    }
1856   
1857    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of inactivity */
1858    tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
1859    ratePulse( m );
1860
1861    if ( tr_peerIoSupportsLTEP( m->io ) )
1862        sendLtepHandshake( m );
1863
1864    if ( !tr_peerIoSupportsFEXT( m->io ) )
1865        sendBitfield( m );
1866    else {
1867        /* This peer is fastpeer-enabled, send it have-all or have-none if appropriate */
1868        uint32_t peerProgress;
1869        float completion = tr_cpPercentComplete( m->torrent->completion );
1870        if ( completion == 0.0f ) {
1871            sendFastHave( m, 0 );
1872        } else if ( completion == 1.0f ) {
1873            sendFastHave( m, 1 );
1874        } else {
1875            sendBitfield( m );
1876        }
1877        peerProgress = m->torrent->info.pieceCount * m->info->progress;
1878       
1879        if ( peerProgress < MAX_FAST_ALLOWED_COUNT )
1880            sendFastAllowedSet( m );
1881    }
1882
1883    return m;
1884}
1885
1886void
1887tr_peerMsgsFree( tr_peermsgs* msgs )
1888{
1889    if( msgs != NULL )
1890    {
1891        tr_timerFree( &msgs->pulseTimer );
1892        tr_timerFree( &msgs->rateTimer );
1893        tr_timerFree( &msgs->pexTimer );
1894        tr_publisherFree( &msgs->publisher );
1895        tr_list_free( &msgs->clientWillAskFor, tr_free );
1896        tr_list_free( &msgs->clientAskedFor, tr_free );
1897        tr_list_free( &msgs->peerAskedForFast, tr_free );
1898        tr_list_free( &msgs->peerAskedFor, tr_free );
1899        tr_bitfieldFree( msgs->peerAllowedPieces );
1900        tr_bitfieldFree( msgs->clientAllowedPieces );
1901        tr_bitfieldFree( msgs->clientSuggestedPieces );
1902        evbuffer_free( msgs->incoming.block );
1903        evbuffer_free( msgs->outMessages );
1904        evbuffer_free( msgs->outBlock );
1905        tr_free( msgs->pex );
1906
1907        memset( msgs, ~0, sizeof( tr_peermsgs ) );
1908        tr_free( msgs );
1909    }
1910}
1911
1912tr_publisher_tag
1913tr_peerMsgsSubscribe( tr_peermsgs       * peer,
1914                      tr_delivery_func    func,
1915                      void              * userData )
1916{
1917    return tr_publisherSubscribe( peer->publisher, func, userData );
1918}
1919
1920void
1921tr_peerMsgsUnsubscribe( tr_peermsgs       * peer,
1922                        tr_publisher_tag    tag )
1923{
1924    tr_publisherUnsubscribe( peer->publisher, tag );
1925}
1926
1927int
1928tr_peerMsgsIsPieceFastAllowed( const tr_peermsgs * peer,
1929                               uint32_t            index )
1930{
1931    return tr_bitfieldHas( peer->clientAllowedPieces, index );
1932}
1933
1934int
1935tr_peerMsgsIsPieceSuggested( const tr_peermsgs * peer,
1936                             uint32_t            index )
1937{
1938    return tr_bitfieldHas( peer->clientSuggestedPieces, index );
1939}
1940
Note: See TracBrowser for help on using the repository browser.