source: trunk/libtransmission/peer-msgs.c @ 3871

Last change on this file since 3871 was 3871, checked in by charles, 15 years ago

this should improve download speeds. test it!

  • Property svn:keywords set to Date Rev Author Id
File size: 54.7 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 3871 2007-11-18 03:18:26Z charles $
11 */
12
13#include <assert.h>
14#include <ctype.h>
15#include <errno.h>
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19#include <libgen.h> /* basename */
20
21#include <netinet/in.h> /* struct in_addr */
22
23#include <event.h>
24
25#include "transmission.h"
26#include "bencode.h"
27#include "completion.h"
28#include "inout.h"
29#include "list.h"
30#include "peer-io.h"
31#include "peer-mgr.h"
32#include "peer-mgr-private.h"
33#include "peer-msgs.h"
34#include "ratecontrol.h"
35#include "trevent.h"
36#include "utils.h"
37
38/**
39***
40**/
41
42enum
43{
44    BT_CHOKE                = 0,
45    BT_UNCHOKE              = 1,
46    BT_INTERESTED           = 2,
47    BT_NOT_INTERESTED       = 3,
48    BT_HAVE                 = 4,
49    BT_BITFIELD             = 5,
50    BT_REQUEST              = 6,
51    BT_PIECE                = 7,
52    BT_CANCEL               = 8,
53    BT_PORT                 = 9,
54    BT_SUGGEST              = 13,
55    BT_HAVE_ALL             = 14,
56    BT_HAVE_NONE            = 15,
57    BT_REJECT               = 16,
58    BT_ALLOWED_FAST         = 17,
59    BT_LTEP                 = 20,
60
61    LTEP_HANDSHAKE          = 0,
62
63    OUR_LTEP_PEX            = 1,
64
65    MAX_REQUEST_BYTE_COUNT  = (16 * 1024), /* drop requests who want too much */
66
67    /* idle seconds before we send a keepalive */
68    KEEPALIVE_INTERVAL_SECS = 90,
69
70    PEX_INTERVAL            = (60 * 1000), /* msec between calls to sendPex() */
71    PEER_PULSE_INTERVAL     = (250),       /* msec between calls to pulse() */
72    RATE_PULSE_INTERVAL     = (333),       /* msec between calls to ratePulse() */
73     
74    /* Fast Peers Extension constants */
75    MAX_FAST_ALLOWED_COUNT   = 10,          /* max. number of pieces we fast-allow to another peer */
76    MAX_FAST_ALLOWED_THRESHOLD = 10,        /* max threshold for allowing fast-pieces requests */
77
78};
79
80enum
81{
82    AWAITING_BT_LENGTH,
83    AWAITING_BT_ID,
84    AWAITING_BT_MESSAGE,
85    AWAITING_BT_PIECE
86};
87
88struct peer_request
89{
90    uint32_t index;
91    uint32_t offset;
92    uint32_t length;
93    time_t time_requested;
94};
95
96static int
97compareRequest( const void * va, const void * vb )
98{
99    int i;
100    const struct peer_request * a = va;
101    const struct peer_request * b = vb;
102    if(( i = tr_compareUint32( a->index, b->index ))) return i;
103    if(( i = tr_compareUint32( a->offset, b->offset ))) return i;
104    if(( i = tr_compareUint32( a->length, b->length ))) return i;
105    return 0;
106}
107
108/* this is raw, unchanged data from the peer regarding
109 * the current message that it's sending us. */
110struct tr_incoming
111{
112    uint32_t length; /* includes the +1 for id length */
113    uint8_t id;
114    struct peer_request blockReq; /* metadata for incoming blocks */
115    struct evbuffer * block; /* piece data for incoming blocks */
116};
117
118struct tr_peermsgs
119{
120    tr_peer * info;
121
122    tr_handle * handle;
123    tr_torrent * torrent;
124    tr_peerIo * io;
125
126    tr_publisher_t * publisher;
127
128    struct evbuffer * outBlock;    /* buffer of all the current piece message */
129    struct evbuffer * outMessages; /* buffer of all the non-piece messages */
130    tr_list * peerAskedFor;
131    tr_list * peerAskedForFast;
132    tr_list * clientAskedFor;
133    tr_list * clientWillAskFor;
134
135    tr_timer * rateTimer;
136    tr_timer * pulseTimer;
137    tr_timer * pexTimer;
138
139    time_t lastReqAddedAt;
140    time_t clientSentPexAt;
141    time_t clientSentAnythingAt;
142
143    unsigned int peerSentBitfield         : 1;
144    unsigned int peerSupportsPex          : 1;
145    unsigned int clientSentLtepHandshake  : 1;
146    unsigned int peerSentLtepHandshake    : 1;
147    unsigned int sendingBlock             : 1;
148   
149    tr_bitfield * clientAllowedPieces;
150    tr_bitfield * peerAllowedPieces;
151   
152    tr_bitfield * clientSuggestedPieces;
153   
154    uint8_t state;
155    uint8_t ut_pex_id;
156    uint16_t pexCount;
157    uint32_t maxActiveRequests;
158    uint32_t minActiveRequests;
159
160    struct tr_incoming incoming; 
161
162    tr_pex * pex;
163};
164
165/**
166***
167**/
168
169static void
170myDebug( const char * file, int line,
171         const struct tr_peermsgs * msgs,
172         const char * fmt, ... )
173{
174    FILE * fp = tr_getLog( );
175    if( fp != NULL )
176    {
177        va_list args;
178        char timestr[64];
179        struct evbuffer * buf = evbuffer_new( );
180        char * myfile = tr_strdup( file );
181
182        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
183                             tr_getLogTimeStr( timestr, sizeof(timestr) ),
184                             msgs->torrent->info.name,
185                             tr_peerIoGetAddrStr( msgs->io ),
186                             msgs->info->client );
187        va_start( args, fmt );
188        evbuffer_add_vprintf( buf, fmt, args );
189        va_end( args );
190        evbuffer_add_printf( buf, " (%s:%d)\n", basename(myfile), line );
191        fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
192
193        tr_free( myfile );
194        evbuffer_free( buf );
195    }
196}
197
198#define dbgmsg(msgs, fmt...) myDebug(__FILE__, __LINE__, msgs, ##fmt )
199
200/**
201***
202**/
203
204static void
205protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
206{
207    tr_peerIo * io = msgs->io;
208    struct evbuffer * out = msgs->outMessages;
209
210    dbgmsg( msgs, "requesting %u:%u->%u", req->index, req->offset, req->length );
211    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
212    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
213    tr_peerIoWriteUint32( io, out, req->index );
214    tr_peerIoWriteUint32( io, out, req->offset );
215    tr_peerIoWriteUint32( io, out, req->length );
216}
217
218static void
219protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
220{
221    tr_peerIo * io = msgs->io;
222    struct evbuffer * out = msgs->outMessages;
223
224    dbgmsg( msgs, "cancelling %u:%u->%u", req->index, req->offset, req->length );
225    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
226    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
227    tr_peerIoWriteUint32( io, out, req->index );
228    tr_peerIoWriteUint32( io, out, req->offset );
229    tr_peerIoWriteUint32( io, out, req->length );
230}
231
232static void
233protocolSendHave( tr_peermsgs * msgs, uint32_t index )
234{
235    tr_peerIo * io = msgs->io;
236    struct evbuffer * out = msgs->outMessages;
237
238    dbgmsg( msgs, "sending Have %u", index );
239    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
240    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
241    tr_peerIoWriteUint32( io, out, index );
242}
243
244static void
245protocolSendChoke( tr_peermsgs * msgs, int choke )
246{
247    tr_peerIo * io = msgs->io;
248    struct evbuffer * out = msgs->outMessages;
249
250    dbgmsg( msgs, "sending %s", (choke ? "Choke" : "Unchoke") );
251    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) );
252    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
253}
254
255/**
256***  EVENTS
257**/
258
259static const tr_peermsgs_event blankEvent = { 0, 0, 0, 0, 0.0f };
260
261static void
262publish( tr_peermsgs * msgs, tr_peermsgs_event * e )
263{
264    tr_publisherPublish( msgs->publisher, msgs->info, e );
265}
266
267static void
268fireGotAssertError( tr_peermsgs * msgs )
269{
270    tr_peermsgs_event e = blankEvent;
271    e.eventType = TR_PEERMSG_GOT_ASSERT_ERROR;
272    publish( msgs, &e );
273}
274
275static void
276fireGotError( tr_peermsgs * msgs )
277{
278    tr_peermsgs_event e = blankEvent;
279    e.eventType = TR_PEERMSG_GOT_ERROR;
280    publish( msgs, &e );
281}
282
283static void
284fireNeedReq( tr_peermsgs * msgs )
285{
286    tr_peermsgs_event e = blankEvent;
287    e.eventType = TR_PEERMSG_NEED_REQ;
288    publish( msgs, &e );
289}
290
291static void
292firePeerProgress( tr_peermsgs * msgs )
293{
294    tr_peermsgs_event e = blankEvent;
295    e.eventType = TR_PEERMSG_PEER_PROGRESS;
296    e.progress = msgs->info->progress;
297    publish( msgs, &e );
298}
299
300static void
301fireClientHave( tr_peermsgs * msgs, uint32_t pieceIndex )
302{
303    tr_peermsgs_event e = blankEvent;
304    e.eventType = TR_PEERMSG_CLIENT_HAVE;
305    e.pieceIndex = pieceIndex;
306    publish( msgs, &e );
307}
308
309static void
310fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
311{
312    tr_peermsgs_event e = blankEvent;
313    e.eventType = TR_PEERMSG_CLIENT_BLOCK;
314    e.pieceIndex = req->index;
315    e.offset = req->offset;
316    e.length = req->length;
317    publish( msgs, &e );
318}
319
320static void
321fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
322{
323    tr_peermsgs_event e = blankEvent;
324    e.eventType = TR_PEERMSG_CANCEL;
325    e.pieceIndex = req->index;
326    e.offset = req->offset;
327    e.length = req->length;
328    publish( msgs, &e );
329}
330
331/**
332***  INTEREST
333**/
334
335static int
336isPieceInteresting( const tr_peermsgs   * peer,
337                    int                   piece )
338{
339    const tr_torrent * torrent = peer->torrent;
340    if( torrent->info.pieces[piece].dnd ) /* we don't want it */
341        return FALSE;
342    if( tr_cpPieceIsComplete( torrent->completion, piece ) ) /* we have it */
343        return FALSE;
344    if( !tr_bitfieldHas( peer->info->have, piece ) ) /* peer doesn't have it */
345        return FALSE;
346    if( tr_bitfieldHas( peer->info->banned, piece ) ) /* peer is banned */
347        return FALSE;
348    return TRUE;
349}
350
351/* "interested" means we'll ask for piece data if they unchoke us */
352static int
353isPeerInteresting( const tr_peermsgs * msgs )
354{
355    int i;
356    const tr_torrent * torrent;
357    const tr_bitfield * bitfield;
358    const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
359
360    if( clientIsSeed )
361        return FALSE;
362
363    torrent = msgs->torrent;
364    bitfield = tr_cpPieceBitfield( torrent->completion );
365
366    if( !msgs->info->have )
367        return TRUE;
368
369    assert( bitfield->len == msgs->info->have->len );
370    for( i=0; i<torrent->info.pieceCount; ++i )
371        if( isPieceInteresting( msgs, i ) )
372            return TRUE;
373
374    return FALSE;
375}
376
377static void
378sendInterest( tr_peermsgs * msgs, int weAreInterested )
379{
380    assert( msgs != NULL );
381    assert( weAreInterested==0 || weAreInterested==1 );
382
383    msgs->info->clientIsInterested = weAreInterested;
384    dbgmsg( msgs, "Sending %s",
385            weAreInterested ? "Interested" : "Not Interested");
386
387    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
388    tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages,
389                   weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
390}
391
392static void
393updateInterest( tr_peermsgs * msgs )
394{
395    const int i = isPeerInteresting( msgs );
396    if( i != msgs->info->clientIsInterested )
397        sendInterest( msgs, i );
398    if( i )
399        fireNeedReq( msgs );
400}
401
402#define MIN_CHOKE_PERIOD_SEC 10
403
404static void
405cancelAllRequestsToClientExceptFast( tr_peermsgs * msgs )
406{
407    tr_list_free( &msgs->peerAskedFor, tr_free );
408}
409
410void
411tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
412{
413    const time_t fibrillationTime = time(NULL) - MIN_CHOKE_PERIOD_SEC;
414
415    assert( msgs != NULL );
416    assert( msgs->info != NULL );
417    assert( choke==0 || choke==1 );
418
419    if( msgs->info->chokeChangedAt > fibrillationTime )
420    {
421        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
422    }
423    else if( msgs->info->peerIsChoked != choke )
424    {
425        msgs->info->peerIsChoked = choke;
426        if( choke )
427            cancelAllRequestsToClientExceptFast( msgs );
428        protocolSendChoke( msgs, choke );
429        msgs->info->chokeChangedAt = time( NULL );
430    }
431}
432
433/**
434***
435**/
436
437void
438tr_peerMsgsHave( tr_peermsgs * msgs,
439                 uint32_t      index )
440{
441    protocolSendHave( msgs, index );
442
443    /* since we have more pieces now, we might not be interested in this peer */
444    updateInterest( msgs );
445}
446#if 0
447static void
448sendFastSuggest( tr_peermsgs * msgs,
449                 uint32_t      pieceIndex )
450{
451    assert( msgs != NULL );
452   
453    if( tr_peerIoSupportsFEXT( msgs->io ) )
454    {
455        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
456        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_SUGGEST );
457        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
458    }
459}
460#endif
461static void
462sendFastHave( tr_peermsgs * msgs, int all )
463{
464    assert( msgs != NULL );
465   
466    if( tr_peerIoSupportsFEXT( msgs->io ) )
467    {
468        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
469        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, ( all ? BT_HAVE_ALL
470                                                                : BT_HAVE_NONE ) );
471        updateInterest( msgs );
472    }
473}
474
475static void
476sendFastReject( tr_peermsgs * msgs,
477                uint32_t      pieceIndex,
478                uint32_t      offset,
479                uint32_t      length )
480{
481    assert( msgs != NULL );
482
483    if( tr_peerIoSupportsFEXT( msgs->io ) )
484    {
485        const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
486        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
487        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_REJECT );
488        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
489        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
490        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
491    }
492}
493
494static void
495sendFastAllowed( tr_peermsgs * msgs,
496                 uint32_t      pieceIndex)
497{
498    assert( msgs != NULL );
499   
500    if( tr_peerIoSupportsFEXT( msgs->io ) )
501    {
502        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
503        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_ALLOWED_FAST );
504        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
505    }
506}
507
508
509
510static void
511sendFastAllowedSet( tr_peermsgs * msgs )
512{
513    int i = 0;
514    while (i <= msgs->torrent->info.pieceCount )
515    {
516        if ( tr_bitfieldHas( msgs->peerAllowedPieces, i) )
517            sendFastAllowed( msgs, i );
518        i++;
519    }
520}
521
522
523/**
524***
525**/
526
527static int
528reqIsValid( const tr_peermsgs   * msgs,
529            uint32_t              index,
530            uint32_t              offset,
531            uint32_t              length )
532{
533    const tr_torrent * tor = msgs->torrent;
534
535    if( index >= (uint32_t) tor->info.pieceCount )
536        return FALSE;
537    if ( (int)offset >= tr_torPieceCountBytes( tor, (int)index ) )
538        return FALSE;
539    if( length > MAX_REQUEST_BYTE_COUNT )
540        return FALSE;
541    if( tr_pieceOffset( tor, index, offset, length ) > tor->info.totalSize )
542        return FALSE;
543
544    return TRUE;
545}
546
547static int
548requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
549{
550    return reqIsValid( msgs, req->index, req->offset, req->length );
551}
552
553static void
554pumpRequestQueue( tr_peermsgs * msgs )
555{
556    const int max = msgs->maxActiveRequests;
557    const int min = msgs->minActiveRequests;
558    int count = tr_list_size( msgs->clientAskedFor );
559    int sent = 0;
560
561    if( count > min )
562        return;
563    if( msgs->info->clientIsChoked )
564        return;
565
566    while( ( count < max ) && ( msgs->clientWillAskFor != NULL ) )
567    {
568        struct peer_request * r = tr_list_pop_front( &msgs->clientWillAskFor );
569        assert( requestIsValid( msgs, r ) );
570        assert( tr_bitfieldHas( msgs->info->have, r->index ) );
571        protocolSendRequest( msgs, r );
572        r->time_requested = msgs->lastReqAddedAt = time( NULL );
573        tr_list_append( &msgs->clientAskedFor, r );
574        ++count;
575        ++sent;
576    }
577
578    if( sent )
579        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
580                sent,
581                tr_list_size(msgs->clientAskedFor),
582                tr_list_size(msgs->clientWillAskFor) );
583
584    if( count < max )
585        fireNeedReq( msgs );
586}
587
588static int
589pulse( void * vmsgs );
590
591int
592tr_peerMsgsAddRequest( tr_peermsgs * msgs,
593                       uint32_t      index, 
594                       uint32_t      offset, 
595                       uint32_t      length )
596{
597    const int req_max = msgs->maxActiveRequests;
598    struct peer_request tmp, *req;
599
600    assert( msgs != NULL );
601    assert( msgs->torrent != NULL );
602    assert( reqIsValid( msgs, index, offset, length ) );
603
604    /**
605    ***  Reasons to decline the request
606    **/
607
608    /* don't send requests to choked clients */
609    if( msgs->info->clientIsChoked ) {
610        dbgmsg( msgs, "declining request because they're choking us" );
611        return TR_ADDREQ_CLIENT_CHOKED;
612    }
613
614    /* peer doesn't have this piece */
615    if( !tr_bitfieldHas( msgs->info->have, index ) )
616        return TR_ADDREQ_MISSING;
617
618    /* peer's queue is full */
619    if( tr_list_size( msgs->clientWillAskFor ) >= req_max ) {
620        dbgmsg( msgs, "declining request because we're full" );
621        return TR_ADDREQ_FULL;
622    }
623
624    /* have we already asked for this piece? */
625    tmp.index = index;
626    tmp.offset = offset;
627    tmp.length = length;
628    if( tr_list_find( msgs->clientAskedFor, &tmp, compareRequest ) ) {
629        dbgmsg( msgs, "declining because it's a duplicate" );
630        return TR_ADDREQ_DUPLICATE;
631    }
632    if( tr_list_find( msgs->clientWillAskFor, &tmp, compareRequest ) ) {
633        dbgmsg( msgs, "declining because it's a duplicate" );
634        return TR_ADDREQ_DUPLICATE;
635    }
636
637    /**
638    ***  Accept this request
639    **/
640
641    dbgmsg( msgs, "added req for piece %d, offset %d", (int)index, (int)offset );
642    req = tr_new0( struct peer_request, 1 );
643    *req = tmp;
644    tr_list_append( &msgs->clientWillAskFor, req );
645    return TR_ADDREQ_OK;
646}
647
648static void
649cancelAllRequestsToPeer( tr_peermsgs * msgs )
650{
651    struct peer_request * req;
652
653    while(( req = tr_list_pop_front( &msgs->clientWillAskFor ) ))
654    {
655        fireCancelledReq( msgs, req );
656        tr_free( req );
657    }
658
659    while(( req = tr_list_pop_front( &msgs->clientAskedFor ) ))
660    {
661        fireCancelledReq( msgs, req );
662        protocolSendCancel( msgs, req );
663        tr_free( req );
664    }
665}
666
667void
668tr_peerMsgsCancel( tr_peermsgs * msgs,
669                   uint32_t      pieceIndex,
670                   uint32_t      offset,
671                   uint32_t      length )
672{
673    struct peer_request *req, tmp;
674
675    assert( msgs != NULL );
676    assert( length > 0 );
677
678    /* have we asked the peer for this piece? */
679    tmp.index = pieceIndex;
680    tmp.offset = offset;
681    tmp.length = length;
682
683    /* if it's only in the queue and hasn't been sent yet, free it */
684    if(( req = tr_list_remove( &msgs->clientWillAskFor, &tmp, compareRequest ) ))
685    {
686        fireCancelledReq( msgs, req );
687        tr_free( req );
688    }
689
690    /* if it's already been sent, send a cancel message too */
691    if(( req = tr_list_remove( &msgs->clientAskedFor, &tmp, compareRequest ) ))
692    {
693        protocolSendCancel( msgs, req );
694        fireCancelledReq( msgs, req );
695        tr_free( req );
696    }
697}
698
699/**
700***
701**/
702
703static void
704sendLtepHandshake( tr_peermsgs * msgs )
705{
706    benc_val_t val, *m;
707    char * buf;
708    int len;
709    int pex;
710    const char * v = TR_NAME " " USERAGENT_PREFIX;
711    const int port = tr_getPublicPort( msgs->handle );
712    struct evbuffer * outbuf;
713
714    if( msgs->clientSentLtepHandshake )
715        return;
716
717    outbuf = evbuffer_new( );
718    dbgmsg( msgs, "sending an ltep handshake" );
719    msgs->clientSentLtepHandshake = 1;
720
721    /* decide if we want to advertise pex support */
722    if( !tr_torrentIsPexEnabled( msgs->torrent ) )
723        pex = 0;
724    else if( msgs->peerSentLtepHandshake )
725        pex = msgs->peerSupportsPex ? 1 : 0;
726    else
727        pex = 1;
728
729    tr_bencInit( &val, TYPE_DICT );
730    tr_bencDictReserve( &val, 4 );
731    tr_bencInitInt( tr_bencDictAdd( &val, "e" ), 1 );
732    m  = tr_bencDictAdd( &val, "m" );
733    tr_bencInit( m, TYPE_DICT );
734    if( pex ) {
735        tr_bencDictReserve( m, 1 );
736        tr_bencInitInt( tr_bencDictAdd( m, "ut_pex" ), OUR_LTEP_PEX );
737    }
738    if( port > 0 )
739        tr_bencInitInt( tr_bencDictAdd( &val, "p" ), port );
740    tr_bencInitStr( tr_bencDictAdd( &val, "v" ), v, 0, 1 );
741    buf = tr_bencSave( &val, &len );
742
743    tr_peerIoWriteUint32( msgs->io, outbuf, 2*sizeof(uint8_t) + len );
744    tr_peerIoWriteUint8 ( msgs->io, outbuf, BT_LTEP );
745    tr_peerIoWriteUint8 ( msgs->io, outbuf, LTEP_HANDSHAKE );
746    tr_peerIoWriteBytes ( msgs->io, outbuf, buf, len );
747
748    tr_peerIoWriteBuf( msgs->io, outbuf );
749
750#if 0
751    dbgmsg( msgs, "here is the ltep handshake we sent:" );
752    tr_bencPrint( &val );
753    dbgmsg( msgs, "here is the ltep handshake we read [%s]:", tr_bencSave( &val, NULL ) );
754#endif
755
756    /* cleanup */
757    tr_bencFree( &val );
758    tr_free( buf );
759    evbuffer_free( outbuf );
760}
761
762static void
763parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
764{
765    benc_val_t val, * sub;
766    uint8_t * tmp = tr_new( uint8_t, len );
767
768    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
769    msgs->peerSentLtepHandshake = 1;
770
771    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type!=TYPE_DICT ) {
772        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
773        tr_free( tmp );
774        return;
775    }
776
777#if 0
778    dbgmsg( msgs, "here is the ltep handshake we read:" );
779    tr_bencPrint( &val );
780    dbgmsg( msgs, "here is the ltep handshake we read [%s]:", tr_bencSave( &val, NULL ) );
781#endif
782
783    /* does the peer prefer encrypted connections? */
784    sub = tr_bencDictFind( &val, "e" );
785    if( tr_bencIsInt( sub ) )
786        msgs->info->encryption_preference = sub->val.i
787                                      ? ENCRYPTION_PREFERENCE_YES
788                                      : ENCRYPTION_PREFERENCE_NO;
789
790    /* check supported messages for utorrent pex */
791    sub = tr_bencDictFind( &val, "m" );
792    if( tr_bencIsDict( sub ) ) {
793        sub = tr_bencDictFind( sub, "ut_pex" );
794        if( tr_bencIsInt( sub ) ) {
795            msgs->peerSupportsPex = 1;
796            msgs->ut_pex_id = (uint8_t) sub->val.i;
797            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
798        }
799    }
800
801    /* get peer's listening port */
802    sub = tr_bencDictFind( &val, "p" );
803    if( tr_bencIsInt( sub ) ) {
804        msgs->info->port = htons( (uint16_t)sub->val.i );
805        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
806    }
807
808    tr_bencFree( &val );
809    tr_free( tmp );
810}
811
812static void
813parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
814{
815    benc_val_t val, * sub;
816    uint8_t * tmp;
817
818    if( !tr_torrentIsPexEnabled( msgs->torrent ) ) /* no sharing! */
819        return;
820
821    tmp = tr_new( uint8_t, msglen );
822    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
823
824    if( tr_bencLoad( tmp, msglen, &val, NULL ) || !tr_bencIsDict( &val ) ) {
825        dbgmsg( msgs, "GET can't read extended-pex dictionary" );
826        tr_free( tmp );
827        return;
828    }
829
830    sub = tr_bencDictFind( &val, "added" );
831    if( tr_bencIsStr(sub) && ((sub->val.s.i % 6) == 0)) {
832        const int n = sub->val.s.i / 6 ;
833        dbgmsg( msgs, "got %d peers from uT pex", n );
834        tr_peerMgrAddPeers( msgs->handle->peerMgr,
835                            msgs->torrent->info.hash,
836                            TR_PEER_FROM_PEX,
837                            (uint8_t*)sub->val.s.s, n );
838    }
839
840    tr_bencFree( &val );
841    tr_free( tmp );
842}
843
844static void
845sendPex( tr_peermsgs * msgs );
846
847static void
848parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
849{
850    uint8_t ltep_msgid;
851
852    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
853    msglen--;
854
855    if( ltep_msgid == LTEP_HANDSHAKE )
856    {
857        dbgmsg( msgs, "got ltep handshake" );
858        parseLtepHandshake( msgs, msglen, inbuf );
859        if( tr_peerIoSupportsLTEP( msgs->io ) )
860        {
861            sendLtepHandshake( msgs );
862            sendPex( msgs );
863        }
864    }
865    else if( ltep_msgid == msgs->ut_pex_id )
866    {
867        dbgmsg( msgs, "got ut pex" );
868        msgs->peerSupportsPex = 1;
869        parseUtPex( msgs, msglen, inbuf );
870    }
871    else
872    {
873        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
874        evbuffer_drain( inbuf, msglen );
875    }
876}
877
878static int
879readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
880{
881    uint32_t len;
882
883    if( inlen < sizeof(len) )
884        return READ_MORE;
885
886    tr_peerIoReadUint32( msgs->io, inbuf, &len );
887
888    if( len == 0 ) /* peer sent us a keepalive message */
889        dbgmsg( msgs, "got KeepAlive" );
890    else {
891        msgs->incoming.length = len;
892        msgs->state = AWAITING_BT_ID;
893    }
894
895    return READ_AGAIN;
896}
897
898static int
899readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen );
900
901static int
902readBtId( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
903{
904    uint8_t id;
905
906    if( inlen < sizeof(uint8_t) )
907        return READ_MORE;
908
909    tr_peerIoReadUint8( msgs->io, inbuf, &id );
910    msgs->incoming.id = id;
911
912    if( id==BT_PIECE )
913    {
914        msgs->state = AWAITING_BT_PIECE;
915        return READ_AGAIN;
916    }
917    else if( msgs->incoming.length != 1 )
918    {
919        msgs->state = AWAITING_BT_MESSAGE;
920        return READ_AGAIN;
921    }
922    else return readBtMessage( msgs, inbuf, inlen-1 );
923}
924
925static void
926updatePeerProgress( tr_peermsgs * msgs )
927{
928    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have ) / (float)msgs->torrent->info.pieceCount;
929    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
930    updateInterest( msgs );
931    firePeerProgress( msgs );
932}
933
934static int
935clientCanSendFastBlock( const tr_peermsgs * msgs UNUSED )
936{
937    /* We can't send a fast piece if a peer has more than MAX_FAST_ALLOWED_THRESHOLD pieces */
938    if( tr_bitfieldCountTrueBits( msgs->info->have ) > MAX_FAST_ALLOWED_THRESHOLD )
939        return FALSE;
940   
941    /* ...or if we don't have ourself enough pieces */
942    if( tr_bitfieldCountTrueBits( tr_cpPieceBitfield( msgs->torrent->completion ) ) < MAX_FAST_ALLOWED_THRESHOLD )
943        return FALSE;
944
945    /* Maybe a bandwidth limit ? */
946    return TRUE;
947}
948
949static void
950peerMadeRequest( tr_peermsgs * msgs, const struct peer_request * req )
951{
952    const int reqIsValid = requestIsValid( msgs, req );
953    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( msgs->torrent->completion, req->index );
954    const int peerIsChoked = msgs->info->peerIsChoked;
955    const int peerIsFast = tr_peerIoSupportsFEXT( msgs->io );
956    const int pieceIsFast = reqIsValid && tr_bitfieldHas( msgs->peerAllowedPieces, req->index );
957    const int canSendFast = clientCanSendFastBlock( msgs );
958
959    if( !reqIsValid ) /* bad request */
960    {
961        dbgmsg( msgs, "rejecting an invalid request." );
962        sendFastReject( msgs, req->index, req->offset, req->length );
963    }
964    else if( !clientHasPiece ) /* we don't have it */
965    {
966        dbgmsg( msgs, "rejecting request for a piece we don't have." );
967        sendFastReject( msgs, req->index, req->offset, req->length );
968    }
969    else if( peerIsChoked && !peerIsFast ) /* doesn't he know he's choked? */
970    {
971        tr_peerMsgsSetChoke( msgs, 1 );
972        sendFastReject( msgs, req->index, req->offset, req->length );
973    }
974    else if( peerIsChoked && peerIsFast && ( !pieceIsFast || !canSendFast ) )
975    {
976        sendFastReject( msgs, req->index, req->offset, req->length );
977    }
978    else /* YAY */
979    {
980        struct peer_request * tmp = tr_new( struct peer_request, 1 );
981        *tmp = *req;
982        if( peerIsFast && pieceIsFast )
983            tr_list_append( &msgs->peerAskedForFast, tmp );
984        else
985            tr_list_append( &msgs->peerAskedFor, tmp );
986    }
987}
988
989static int
990messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
991{
992    switch( id )
993    {
994        case BT_CHOKE:
995        case BT_UNCHOKE:
996        case BT_INTERESTED:
997        case BT_NOT_INTERESTED:
998        case BT_HAVE_ALL:
999        case BT_HAVE_NONE:
1000            return len==1;
1001
1002        case BT_HAVE:
1003        case BT_SUGGEST:
1004        case BT_ALLOWED_FAST:
1005            return len==5;
1006
1007        case BT_BITFIELD:
1008            return len == (msg->torrent->info.pieceCount+7u)/8u + 1u;
1009       
1010        case BT_REQUEST:
1011        case BT_CANCEL:
1012        case BT_REJECT:
1013            return len==13;
1014
1015        case BT_PIECE:
1016            return len>9 && len<=16393;
1017
1018        case BT_PORT:
1019            return len==3;
1020
1021        case BT_LTEP:
1022            return len >= 2;
1023
1024        default:
1025            return FALSE;
1026    }
1027}
1028
1029static int
1030clientGotBlock( tr_peermsgs * msgs, const uint8_t * block, const struct peer_request * req );
1031
1032static void
1033clientGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1034{
1035    tr_torrent * tor = msgs->torrent;
1036    tor->activityDate = tr_date( );
1037    tor->downloadedCur += byteCount;
1038    msgs->info->pieceDataActivityDate = time( NULL );
1039    msgs->info->credit += (int)(byteCount * SWIFT_REPAYMENT_RATIO);
1040    tr_rcTransferred( msgs->info->rcToClient, byteCount );
1041    tr_rcTransferred( tor->download, byteCount );
1042    tr_rcTransferred( tor->handle->download, byteCount );
1043}
1044
1045
1046static int
1047readBtPiece( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1048{
1049    struct peer_request * req = &msgs->incoming.blockReq;
1050    dbgmsg( msgs, "In readBtPiece" );
1051
1052    if( !req->length )
1053    {
1054        if( inlen < 8 )
1055            return READ_MORE;
1056
1057        tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
1058        tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
1059        req->length = msgs->incoming.length - 9;
1060        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1061        return READ_AGAIN;
1062    }
1063    else
1064    {
1065        int err;
1066
1067        /* read in another chunk of data */
1068        const size_t nLeft = req->length - EVBUFFER_LENGTH(msgs->incoming.block);
1069        size_t n = MIN( nLeft, inlen );
1070        uint8_t * buf = tr_new( uint8_t, n );
1071        tr_peerIoReadBytes( msgs->io, inbuf, buf, n );
1072        evbuffer_add( msgs->incoming.block, buf, n );
1073        clientGotBytes( msgs, n );
1074        tr_free( buf );
1075        dbgmsg( msgs, "got %d bytes for block %u:%u->%u ... %d remain",
1076               (int)n, req->index, req->offset, req->length,
1077               (int)( req->length - EVBUFFER_LENGTH(msgs->incoming.block) ) );
1078        if( EVBUFFER_LENGTH(msgs->incoming.block) < req->length )
1079            return READ_MORE;
1080
1081        /* we've got the whole block ... process it */
1082        err = clientGotBlock( msgs, EVBUFFER_DATA(msgs->incoming.block), req );
1083
1084        /* cleanup */
1085        evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH(msgs->incoming.block) );
1086        req->length = 0;
1087        msgs->state = AWAITING_BT_LENGTH;
1088        if( !err )
1089            return READ_AGAIN;
1090        else {
1091            fireGotAssertError( msgs );
1092            return READ_DONE;
1093        }
1094    }
1095}
1096
1097static int
1098readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1099{
1100    uint32_t ui32;
1101    uint32_t msglen = msgs->incoming.length;
1102    const uint8_t id = msgs->incoming.id;
1103    const size_t startBufLen = EVBUFFER_LENGTH( inbuf );
1104
1105    --msglen; // id length
1106
1107    if( inlen < msglen )
1108        return READ_MORE;
1109
1110    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %d", (int)id, (int)msglen, (int)inlen );
1111
1112    if( !messageLengthIsCorrect( msgs, id, msglen+1 ) )
1113    {
1114        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1115        fireGotError( msgs );
1116        return READ_DONE;
1117    }
1118
1119    switch( id )
1120    {
1121        case BT_CHOKE:
1122            dbgmsg( msgs, "got Choke" );
1123            msgs->info->clientIsChoked = 1;
1124            cancelAllRequestsToPeer( msgs );
1125            cancelAllRequestsToClientExceptFast( msgs );
1126            break;
1127
1128        case BT_UNCHOKE:
1129            dbgmsg( msgs, "got Unchoke" );
1130            msgs->info->clientIsChoked = 0;
1131            fireNeedReq( msgs );
1132            break;
1133
1134        case BT_INTERESTED:
1135            dbgmsg( msgs, "got Interested" );
1136            msgs->info->peerIsInterested = 1;
1137            tr_peerMsgsSetChoke( msgs, 0 );
1138            break;
1139
1140        case BT_NOT_INTERESTED:
1141            dbgmsg( msgs, "got Not Interested" );
1142            msgs->info->peerIsInterested = 0;
1143            break;
1144           
1145        case BT_HAVE:
1146            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1147            dbgmsg( msgs, "got Have: %u", ui32 );
1148            tr_bitfieldAdd( msgs->info->have, ui32 );
1149            /* If this is a fast-allowed piece for this peer, mark it as normal now */
1150            if( msgs->clientAllowedPieces != NULL && tr_bitfieldHas( msgs->clientAllowedPieces, ui32 ) )
1151                tr_bitfieldRem( msgs->clientAllowedPieces, ui32 );
1152            updatePeerProgress( msgs );
1153            tr_rcTransferred( msgs->torrent->swarmspeed, msgs->torrent->info.pieceSize );
1154            break;
1155
1156        case BT_BITFIELD: {
1157            const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
1158            dbgmsg( msgs, "got a bitfield" );
1159            msgs->peerSentBitfield = 1;
1160            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
1161            updatePeerProgress( msgs );
1162            tr_peerMsgsSetChoke( msgs, !clientIsSeed || (msgs->info->progress<1.0) );
1163            fireNeedReq( msgs );
1164            break;
1165        }
1166
1167        case BT_REQUEST: {
1168            struct peer_request req;
1169            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1170            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1171            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1172            dbgmsg( msgs, "got Request: %u:%u->%u", req.index, req.offset, req.length );
1173            peerMadeRequest( msgs, &req );
1174            break;
1175        }
1176
1177        case BT_CANCEL: {
1178            struct peer_request req;
1179            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1180            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1181            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1182            dbgmsg( msgs, "got a Cancel %u:%u->%u", req.index, req.offset, req.length );
1183            tr_free( tr_list_remove( &msgs->peerAskedForFast, &req, compareRequest ) );
1184            tr_free( tr_list_remove( &msgs->peerAskedFor, &req, compareRequest ) );
1185            break;
1186        }
1187
1188        case BT_PIECE:
1189            assert( 0 ); /* should be handled elsewhere! */
1190            break;
1191       
1192        case BT_PORT:
1193            dbgmsg( msgs, "Got a BT_PORT" );
1194            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1195            break;
1196           
1197        case BT_SUGGEST: {
1198            dbgmsg( msgs, "Got a BT_SUGGEST" );
1199            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1200            const tr_bitfield * bt = tr_cpPieceBitfield( msgs->torrent->completion );
1201            if( tr_bitfieldHas( bt, ui32 ) )
1202                tr_bitfieldAdd( msgs->clientSuggestedPieces, ui32 );
1203            break;
1204        }
1205           
1206        case BT_HAVE_ALL:
1207            dbgmsg( msgs, "Got a BT_HAVE_ALL" );
1208            tr_bitfieldAddRange( msgs->info->have, 0, msgs->torrent->info.pieceCount );
1209            updatePeerProgress( msgs );
1210            break;
1211       
1212       
1213        case BT_HAVE_NONE:
1214            dbgmsg( msgs, "Got a BT_HAVE_NONE" );
1215            tr_bitfieldClear( msgs->info->have );
1216            updatePeerProgress( msgs );
1217            break;
1218       
1219        case BT_REJECT: {
1220            struct peer_request req;
1221            dbgmsg( msgs, "Got a BT_REJECT" );
1222            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1223            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1224            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1225            tr_free( tr_list_remove( &msgs->clientAskedFor, &req, compareRequest ) );
1226            break;
1227        }
1228
1229        case BT_ALLOWED_FAST: {
1230            dbgmsg( msgs, "Got a BT_ALLOWED_FAST" );
1231            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1232            tr_bitfieldAdd( msgs->clientAllowedPieces, ui32 );
1233            break;
1234        }
1235
1236        case BT_LTEP:
1237            dbgmsg( msgs, "Got a BT_LTEP" );
1238            parseLtep( msgs, msglen, inbuf );
1239            break;
1240
1241        default:
1242            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1243            tr_peerIoDrain( msgs->io, inbuf, msglen );
1244            break;
1245    }
1246
1247    assert( msglen + 1 == msgs->incoming.length );
1248    assert( EVBUFFER_LENGTH(inbuf) == startBufLen - msglen );
1249
1250    msgs->state = AWAITING_BT_LENGTH;
1251    return READ_AGAIN;
1252}
1253
1254static void
1255peerGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1256{
1257    tr_torrent * tor = msgs->torrent;
1258    tor->activityDate = tr_date( );
1259    tor->uploadedCur += byteCount;
1260    msgs->info->pieceDataActivityDate = time( NULL );
1261    msgs->info->credit -= byteCount;
1262    tr_rcTransferred( msgs->info->rcToPeer, byteCount );
1263    tr_rcTransferred( tor->upload, byteCount );
1264    tr_rcTransferred( tor->handle->upload, byteCount );
1265}
1266
1267static size_t
1268getDownloadMax( const tr_peermsgs * msgs )
1269{
1270    static const size_t maxval = ~0;
1271    const tr_torrent * tor = msgs->torrent;
1272
1273    if( tor->downloadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1274        return tor->handle->useDownloadLimit
1275            ? tr_rcBytesLeft( tor->handle->download ) : maxval;
1276
1277    if( tor->downloadLimitMode == TR_SPEEDLIMIT_SINGLE )
1278        return tr_rcBytesLeft( tor->download );
1279
1280    return maxval;
1281}
1282
1283static void
1284reassignBytesToCorrupt( tr_peermsgs * msgs, uint32_t byteCount )
1285{
1286    tr_torrent * tor = msgs->torrent;
1287
1288    /* increment the `corrupt' field */
1289    tor->corruptCur += byteCount;
1290
1291    /* decrement the `downloaded' field */
1292    if( tor->downloadedCur >= byteCount )
1293        tor->downloadedCur -= byteCount;
1294    else
1295        tor->downloadedCur = 0;
1296}
1297
1298
1299static void
1300gotBadPiece( tr_peermsgs * msgs, uint32_t pieceIndex )
1301{
1302    const uint32_t byteCount =
1303        tr_torPieceCountBytes( msgs->torrent, (int)pieceIndex );
1304    reassignBytesToCorrupt( msgs, byteCount );
1305}
1306
1307static void
1308clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1309{
1310    reassignBytesToCorrupt( msgs, req->length );
1311}
1312
1313static void
1314addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1315{
1316    if( !msgs->info->blame )
1317         msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1318    tr_bitfieldAdd( msgs->info->blame, index );
1319}
1320
1321static int
1322clientGotBlock( tr_peermsgs                * msgs,
1323                const uint8_t              * data,
1324                const struct peer_request  * req )
1325{
1326    int i;
1327    tr_torrent * tor = msgs->torrent;
1328    const int block = _tr_block( tor, req->index, req->offset );
1329    struct peer_request *myreq;
1330
1331    assert( msgs != NULL );
1332    assert( req != NULL );
1333
1334    if( req->length != (uint32_t)tr_torBlockCountBytes( msgs->torrent, block ) )
1335    {
1336        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1337                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1338        return TR_ERROR_ASSERT;
1339    }
1340
1341    /* save the block */
1342    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1343
1344    /**
1345    *** Remove the block from our `we asked for this' list
1346    **/
1347
1348    myreq = tr_list_remove( &msgs->clientAskedFor, req, compareRequest );
1349    if( myreq == NULL ) {
1350        clientGotUnwantedBlock( msgs, req );
1351        dbgmsg( msgs, "we didn't ask for this message..." );
1352        return 0;
1353    }
1354
1355    dbgmsg( msgs, "got block %u:%u->%u (turnaround time %d secs)", 
1356                  myreq->index, myreq->offset, myreq->length,
1357                  (int)(time(NULL) - myreq->time_requested) );
1358    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1359                  tr_list_size(msgs->clientAskedFor));
1360
1361    tr_free( myreq );
1362    myreq = NULL;
1363
1364    /**
1365    *** Error checks
1366    **/
1367
1368    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
1369        dbgmsg( msgs, "we have this block already..." );
1370        clientGotUnwantedBlock( msgs, req );
1371        return 0;
1372    }
1373
1374    /**
1375    ***  Save the block
1376    **/
1377
1378    msgs->info->peerSentPieceDataAt = time( NULL );
1379    i = tr_ioWrite( tor, req->index, req->offset, req->length, data );
1380    if( i )
1381        return 0;
1382
1383    tr_cpBlockAdd( tor->completion, block );
1384
1385    addPeerToBlamefield( msgs, req->index );
1386
1387    fireGotBlock( msgs, req );
1388
1389    /**
1390    ***  Handle if this was the last block in the piece
1391    **/
1392
1393    if( tr_cpPieceIsComplete( tor->completion, req->index ) )
1394    {
1395        if( tr_ioHash( tor, req->index ) )
1396        {
1397            gotBadPiece( msgs, req->index );
1398            return 0;
1399        }
1400
1401        fireClientHave( msgs, req->index );
1402    }
1403
1404    return 0;
1405}
1406
1407static ReadState
1408canRead( struct bufferevent * evin, void * vmsgs )
1409{
1410    ReadState ret;
1411    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1412    struct evbuffer * inbuf = EVBUFFER_INPUT ( evin );
1413    const size_t buflen = EVBUFFER_LENGTH( inbuf );
1414    const size_t n = MIN( buflen, getDownloadMax( msgs ) );
1415
1416    if( !n )
1417    {
1418        ret = READ_DONE;
1419    }
1420    else switch( msgs->state )
1421    {
1422        case AWAITING_BT_LENGTH:  ret = readBtLength ( msgs, inbuf, n ); break;
1423        case AWAITING_BT_ID:      ret = readBtId     ( msgs, inbuf, n ); break;
1424        case AWAITING_BT_MESSAGE: ret = readBtMessage( msgs, inbuf, n ); break;
1425        case AWAITING_BT_PIECE:   ret = readBtPiece  ( msgs, inbuf, n ); break;
1426        default: assert( 0 );
1427    }
1428
1429    return ret;
1430}
1431
1432static void
1433sendKeepalive( tr_peermsgs * msgs )
1434{
1435    dbgmsg( msgs, "sending a keepalive message" );
1436    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1437}
1438
1439/**
1440***
1441**/
1442
1443static int
1444canWrite( const tr_peermsgs * msgs )
1445{
1446    /* don't let our outbuffer get too large */
1447    return tr_peerIoWriteBytesWaiting( msgs->io ) < 4096;
1448}
1449
1450static size_t
1451getUploadMax( const tr_peermsgs * msgs )
1452{
1453    static const size_t maxval = ~0;
1454    const tr_torrent * tor = msgs->torrent;
1455    const int useSwift = SWIFT_ENABLED && !tr_torrentIsSeed( msgs->torrent );
1456    const size_t swiftBytes = msgs->info->credit;
1457    size_t speedBytes;
1458
1459    if( !canWrite( msgs ) )
1460        return 0;
1461
1462    if( tor->uploadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1463        speedBytes = tor->handle->useUploadLimit ? tr_rcBytesLeft( tor->handle->upload ) : maxval;
1464    else if( tor->uploadLimitMode == TR_SPEEDLIMIT_SINGLE )
1465        speedBytes = tr_rcBytesLeft( tor->upload );
1466    else
1467        speedBytes = ~0;
1468
1469    return useSwift ? MIN( speedBytes, swiftBytes )
1470                    : speedBytes;
1471}
1472
1473static int
1474ratePulse( void * vmsgs )
1475{
1476    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1477    msgs->info->rateToClient = tr_rcRate( msgs->info->rcToClient );
1478    msgs->info->rateToPeer = tr_rcRate( msgs->info->rcToPeer );
1479    msgs->maxActiveRequests = MIN( 8 + (int)(msgs->info->rateToClient/5), 100 );
1480    msgs->minActiveRequests = msgs->maxActiveRequests / 3;
1481    return TRUE;
1482}
1483
1484static struct peer_request*
1485popNextRequest( tr_peermsgs * msgs )
1486{
1487    struct peer_request * ret;
1488    ret = tr_list_pop_front( &msgs->peerAskedForFast );
1489    if( !ret )
1490        ret = tr_list_pop_front( &msgs->peerAskedFor);
1491    return ret;
1492}
1493
1494static void
1495updatePeerStatus( tr_peermsgs * msgs )
1496{
1497    tr_peer * peer = msgs->info;
1498
1499    if( !msgs->peerSentBitfield )
1500        peer->status = TR_PEER_STATUS_HANDSHAKE;
1501
1502    else if( ( time(NULL) - peer->pieceDataActivityDate ) < 3 )
1503        peer->status = peer->clientIsChoked
1504                       ? TR_PEER_STATUS_ACTIVE_AND_CHOKED
1505                       : TR_PEER_STATUS_ACTIVE;
1506
1507    else if( peer->peerIsChoked )
1508        peer->status = TR_PEER_STATUS_PEER_IS_CHOKED;
1509
1510    else if( peer->clientIsChoked )
1511        peer->status = peer->clientIsInterested
1512                       ? TR_PEER_STATUS_CLIENT_IS_INTERESTED
1513                       : TR_PEER_STATUS_CLIENT_IS_CHOKED;
1514
1515    else if( msgs->clientAskedFor != NULL )
1516        peer->status = TR_PEER_STATUS_REQUEST_SENT;
1517
1518    else
1519        peer->status = TR_PEER_STATUS_READY;
1520}
1521
1522static int
1523pulse( void * vmsgs )
1524{
1525    const time_t now = time( NULL );
1526    tr_peermsgs * msgs = vmsgs;
1527    struct peer_request * r;
1528    size_t len;
1529
1530    tr_peerIoTryRead( msgs->io );
1531    pumpRequestQueue( msgs );
1532    updatePeerStatus( msgs );
1533
1534    if( msgs->sendingBlock )
1535    {
1536        const size_t uploadMax = getUploadMax( msgs );
1537        size_t len = EVBUFFER_LENGTH( msgs->outBlock );
1538        const size_t outlen = MIN( len, uploadMax );
1539
1540        assert( len );
1541
1542        if( outlen && canWrite( msgs ) )
1543        {
1544            tr_peerIoWrite( msgs->io, EVBUFFER_DATA( msgs->outBlock ), outlen );
1545            evbuffer_drain( msgs->outBlock, outlen );
1546            peerGotBytes( msgs, outlen );
1547
1548            len -= outlen;
1549            msgs->clientSentAnythingAt = now;
1550            msgs->sendingBlock = len!=0;
1551
1552            dbgmsg( msgs, "wrote %d bytes; %d left in block", (int)outlen, (int)len );
1553        }
1554    }
1555
1556    if( !msgs->sendingBlock )
1557    {
1558        if(( len = EVBUFFER_LENGTH( msgs->outMessages ) ))
1559        {
1560            tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
1561            msgs->clientSentAnythingAt = now;
1562        }
1563        else if( !EVBUFFER_LENGTH( msgs->outBlock )
1564            && (( r = popNextRequest( msgs )))
1565            && requestIsValid( msgs, r )
1566            && tr_cpPieceIsComplete( msgs->torrent->completion, r->index ) )
1567        {
1568            uint8_t * buf = tr_new( uint8_t, r->length );
1569
1570            if( !tr_ioRead( msgs->torrent, r->index, r->offset, r->length, buf ) )
1571            {
1572                tr_peerIo * io = msgs->io;
1573                struct evbuffer * out = msgs->outBlock;
1574
1575                dbgmsg( msgs, "sending block %u:%u->%u", r->index, r->offset, r->length );
1576                tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 2*sizeof(uint32_t) + r->length );
1577                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1578                tr_peerIoWriteUint32( io, out, r->index );
1579                tr_peerIoWriteUint32( io, out, r->offset );
1580                tr_peerIoWriteBytes ( io, out, buf, r->length );
1581                msgs->sendingBlock = 1;
1582            }
1583
1584            tr_free( buf );
1585            tr_free( r );
1586        }
1587        else if( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS )
1588        {
1589            sendKeepalive( msgs );
1590        }
1591    }
1592
1593    return TRUE; /* loop forever */
1594}
1595
1596static void
1597gotError( struct bufferevent * evbuf UNUSED, short what, void * vmsgs )
1598{
1599    if( what & EVBUFFER_TIMEOUT )
1600        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1601
1602    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) ) {
1603        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1604                what, errno, strerror(errno) );
1605        fireGotError( vmsgs );
1606    }
1607}
1608
1609static void
1610sendBitfield( tr_peermsgs * msgs )
1611{
1612    const tr_bitfield * bitfield = tr_cpPieceBitfield( msgs->torrent->completion );
1613    struct evbuffer * out = msgs->outMessages;
1614
1615    dbgmsg( msgs, "sending peer a bitfield message" );
1616    tr_peerIoWriteUint32( msgs->io, out, sizeof(uint8_t) + bitfield->len );
1617    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1618    tr_peerIoWriteBytes ( msgs->io, out, bitfield->bits, bitfield->len );
1619}
1620
1621/**
1622***
1623**/
1624
1625/* some peers give us error messages if we send
1626   more than this many peers in a single pex message */
1627#define MAX_PEX_DIFFS 200
1628
1629typedef struct
1630{
1631    tr_pex * added;
1632    tr_pex * dropped;
1633    tr_pex * elements;
1634    int addedCount;
1635    int droppedCount;
1636    int elementCount;
1637    int diffCount;
1638}
1639PexDiffs;
1640
1641static void
1642pexAddedCb( void * vpex, void * userData )
1643{
1644    PexDiffs * diffs = (PexDiffs *) userData;
1645    tr_pex * pex = (tr_pex *) vpex;
1646    if( diffs->diffCount < MAX_PEX_DIFFS )
1647    {
1648        diffs->diffCount++;
1649        diffs->added[diffs->addedCount++] = *pex;
1650        diffs->elements[diffs->elementCount++] = *pex;
1651    }
1652}
1653
1654static void
1655pexRemovedCb( void * vpex, void * userData )
1656{
1657    PexDiffs * diffs = (PexDiffs *) userData;
1658    tr_pex * pex = (tr_pex *) vpex;
1659    if( diffs->diffCount < MAX_PEX_DIFFS )
1660    {
1661        diffs->diffCount++;
1662        diffs->dropped[diffs->droppedCount++] = *pex;
1663    }
1664}
1665
1666static void
1667pexElementCb( void * vpex, void * userData )
1668{
1669    PexDiffs * diffs = (PexDiffs *) userData;
1670    tr_pex * pex = (tr_pex *) vpex;
1671    if( diffs->diffCount < MAX_PEX_DIFFS )
1672    {
1673        diffs->diffCount++;
1674        diffs->elements[diffs->elementCount++] = *pex;
1675    }
1676}
1677
1678static void
1679sendPex( tr_peermsgs * msgs )
1680{
1681    if( msgs->peerSupportsPex && tr_torrentIsPexEnabled( msgs->torrent ) )
1682    {
1683        int i;
1684        tr_pex * newPex = NULL;
1685        const int newCount = tr_peerMgrGetPeers( msgs->handle->peerMgr, msgs->torrent->info.hash, &newPex );
1686        PexDiffs diffs;
1687        benc_val_t val, *added, *dropped, *flags;
1688        uint8_t *tmp, *walk;
1689        char * benc;
1690        int bencLen;
1691
1692        /* build the diffs */
1693        diffs.added = tr_new( tr_pex, newCount );
1694        diffs.addedCount = 0;
1695        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1696        diffs.droppedCount = 0;
1697        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1698        diffs.elementCount = 0;
1699        diffs.diffCount = 0;
1700        tr_set_compare( msgs->pex, msgs->pexCount,
1701                        newPex, newCount,
1702                        tr_pexCompare, sizeof(tr_pex),
1703                        pexRemovedCb, pexAddedCb, pexElementCb, &diffs );
1704        dbgmsg( msgs, "pex: old peer count %d, new peer count %d, added %d, removed %d", msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1705
1706        /* update peer */
1707        tr_free( msgs->pex );
1708        msgs->pex = diffs.elements;
1709        msgs->pexCount = diffs.elementCount;
1710
1711        /* build the pex payload */
1712        tr_bencInit( &val, TYPE_DICT );
1713        tr_bencDictReserve( &val, 3 );
1714
1715        /* "added" */
1716        added = tr_bencDictAdd( &val, "added" );
1717        tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1718        for( i=0; i<diffs.addedCount; ++i ) {
1719            memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
1720            memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1721        }
1722        assert( ( walk - tmp ) == diffs.addedCount * 6 );
1723        tr_bencInitStr( added, tmp, walk-tmp, FALSE );
1724
1725        /* "added.f" */
1726        flags = tr_bencDictAdd( &val, "added.f" );
1727        tmp = walk = tr_new( uint8_t, diffs.addedCount );
1728        for( i=0; i<diffs.addedCount; ++i )
1729            *walk++ = diffs.added[i].flags;
1730        assert( ( walk - tmp ) == diffs.addedCount );
1731        tr_bencInitStr( flags, tmp, walk-tmp, FALSE );
1732
1733        /* "dropped" */
1734        dropped = tr_bencDictAdd( &val, "dropped" );
1735        tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
1736        for( i=0; i<diffs.droppedCount; ++i ) {
1737            memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
1738            memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
1739        }
1740        assert( ( walk - tmp ) == diffs.droppedCount * 6 );
1741        tr_bencInitStr( dropped, tmp, walk-tmp, FALSE );
1742
1743        /* write the pex message */
1744        benc = tr_bencSave( &val, &bencLen );
1745        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 2*sizeof(uint8_t) + bencLen );
1746        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_LTEP );
1747        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, OUR_LTEP_PEX );
1748        tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, benc, bencLen );
1749
1750        /* cleanup */
1751        tr_free( benc );
1752        tr_bencFree( &val );
1753        tr_free( diffs.added );
1754        tr_free( diffs.dropped );
1755        tr_free( newPex );
1756
1757        msgs->clientSentPexAt = time( NULL );
1758    }
1759}
1760
1761static int
1762pexPulse( void * vpeer )
1763{
1764    sendPex( vpeer );
1765    return TRUE;
1766}
1767
1768/**
1769***
1770**/
1771
1772tr_peermsgs*
1773tr_peerMsgsNew( struct tr_torrent * torrent,
1774                struct tr_peer    * info,
1775                tr_delivery_func    func,
1776                void              * userData,
1777                tr_publisher_tag  * setme )
1778{
1779    tr_peermsgs * m;
1780
1781    assert( info != NULL );
1782    assert( info->io != NULL );
1783
1784    m = tr_new0( tr_peermsgs, 1 );
1785    m->publisher = tr_publisherNew( );
1786    m->info = info;
1787    m->handle = torrent->handle;
1788    m->torrent = torrent;
1789    m->io = info->io;
1790    m->info->status = TR_PEER_STATUS_HANDSHAKE;
1791    m->info->clientIsChoked = 1;
1792    m->info->peerIsChoked = 1;
1793    m->info->clientIsInterested = 0;
1794    m->info->peerIsInterested = 0;
1795    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
1796    m->state = AWAITING_BT_LENGTH;
1797    m->pulseTimer = tr_timerNew( m->handle, pulse, m, PEER_PULSE_INTERVAL );
1798    m->rateTimer = tr_timerNew( m->handle, ratePulse, m, RATE_PULSE_INTERVAL );
1799    m->pexTimer = tr_timerNew( m->handle, pexPulse, m, PEX_INTERVAL );
1800    m->outMessages = evbuffer_new( );
1801    m->incoming.block = evbuffer_new( );
1802    m->outBlock = evbuffer_new( );
1803    m->peerAllowedPieces = NULL;
1804    m->clientAllowedPieces = NULL;
1805    m->clientSuggestedPieces = NULL;
1806    *setme = tr_publisherSubscribe( m->publisher, func, userData );
1807   
1808    if ( tr_peerIoSupportsFEXT( m->io ) )
1809    {
1810        /* This peer is fastpeer-enabled, generate its allowed set
1811         * (before registering our callbacks) */
1812        if ( !m->peerAllowedPieces ) {
1813            const struct in_addr *peerAddr = tr_peerIoGetAddress( m->io, NULL );
1814           
1815            m->peerAllowedPieces = tr_peerMgrGenerateAllowedSet( MAX_FAST_ALLOWED_COUNT,
1816                                                                 m->torrent->info.pieceCount,
1817                                                                 m->torrent->info.hash,
1818                                                                 peerAddr );
1819        }
1820        m->clientAllowedPieces = tr_bitfieldNew( m->torrent->info.pieceCount );
1821       
1822        m->clientSuggestedPieces = tr_bitfieldNew( m->torrent->info.pieceCount );
1823    }
1824   
1825    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of inactivity */
1826    tr_peerIoSetIOFuncs( m->io, canRead, gotError, m );
1827    ratePulse( m );
1828
1829    if ( tr_peerIoSupportsLTEP( m->io ) )
1830        sendLtepHandshake( m );
1831
1832    if ( !tr_peerIoSupportsFEXT( m->io ) )
1833        sendBitfield( m );
1834    else {
1835        /* This peer is fastpeer-enabled, send it have-all or have-none if appropriate */
1836        uint32_t peerProgress;
1837        float completion = tr_cpPercentComplete( m->torrent->completion );
1838        if ( completion == 0.0f ) {
1839            sendFastHave( m, 0 );
1840        } else if ( completion == 1.0f ) {
1841            sendFastHave( m, 1 );
1842        } else {
1843            sendBitfield( m );
1844        }
1845        peerProgress = m->torrent->info.pieceCount * m->info->progress;
1846       
1847        if ( peerProgress < MAX_FAST_ALLOWED_COUNT )
1848            sendFastAllowedSet( m );
1849    }
1850
1851    return m;
1852}
1853
1854void
1855tr_peerMsgsFree( tr_peermsgs* msgs )
1856{
1857    if( msgs != NULL )
1858    {
1859        tr_timerFree( &msgs->pulseTimer );
1860        tr_timerFree( &msgs->rateTimer );
1861        tr_timerFree( &msgs->pexTimer );
1862        tr_publisherFree( &msgs->publisher );
1863        tr_list_free( &msgs->clientWillAskFor, tr_free );
1864        tr_list_free( &msgs->clientAskedFor, tr_free );
1865        tr_list_free( &msgs->peerAskedForFast, tr_free );
1866        tr_list_free( &msgs->peerAskedFor, tr_free );
1867        tr_bitfieldFree( msgs->peerAllowedPieces );
1868        tr_bitfieldFree( msgs->clientAllowedPieces );
1869        tr_bitfieldFree( msgs->clientSuggestedPieces );
1870        evbuffer_free( msgs->incoming.block );
1871        evbuffer_free( msgs->outMessages );
1872        evbuffer_free( msgs->outBlock );
1873        tr_free( msgs->pex );
1874
1875        memset( msgs, ~0, sizeof( tr_peermsgs ) );
1876        tr_free( msgs );
1877    }
1878}
1879
1880tr_publisher_tag
1881tr_peerMsgsSubscribe( tr_peermsgs       * peer,
1882                      tr_delivery_func    func,
1883                      void              * userData )
1884{
1885    return tr_publisherSubscribe( peer->publisher, func, userData );
1886}
1887
1888void
1889tr_peerMsgsUnsubscribe( tr_peermsgs       * peer,
1890                        tr_publisher_tag    tag )
1891{
1892    tr_publisherUnsubscribe( peer->publisher, tag );
1893}
1894
1895int
1896tr_peerMsgsIsPieceFastAllowed( const tr_peermsgs * peer,
1897                               uint32_t            index )
1898{
1899    return tr_bitfieldHas( peer->clientAllowedPieces, index );
1900}
1901
1902int
1903tr_peerMsgsIsPieceSuggested( const tr_peermsgs * peer,
1904                             uint32_t            index )
1905{
1906    return tr_bitfieldHas( peer->clientSuggestedPieces, index );
1907}
1908
Note: See TracBrowser for help on using the repository browser.