source: trunk/libtransmission/peer-msgs.c @ 5650

Last change on this file since 5650 was 5650, checked in by charles, 14 years ago

peer-msgs: faster upload speeds in situations with few peers. this patch needs wider testing for side-effects wrt speed limits.

  • Property svn:keywords set to Date Rev Author Id
File size: 55.5 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 5650 2008-04-19 19:37:05Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <stdio.h>
16#include <stdlib.h>
17#include <string.h>
18#include <libgen.h> /* basename */
19
20#include <event.h>
21
22#include "transmission.h"
23#include "bencode.h"
24#include "completion.h"
25#include "inout.h"
26#include "peer-io.h"
27#include "peer-mgr.h"
28#include "peer-mgr-private.h"
29#include "peer-msgs.h"
30#include "ratecontrol.h"
31#include "stats.h"
32#include "torrent.h"
33#include "trevent.h"
34#include "utils.h"
35
36/**
37***
38**/
39
40enum
41{
42    BT_CHOKE                = 0,
43    BT_UNCHOKE              = 1,
44    BT_INTERESTED           = 2,
45    BT_NOT_INTERESTED       = 3,
46    BT_HAVE                 = 4,
47    BT_BITFIELD             = 5,
48    BT_REQUEST              = 6,
49    BT_PIECE                = 7,
50    BT_CANCEL               = 8,
51    BT_PORT                 = 9,
52    BT_SUGGEST              = 13,
53    BT_HAVE_ALL             = 14,
54    BT_HAVE_NONE            = 15,
55    BT_REJECT               = 16,
56    BT_ALLOWED_FAST         = 17,
57    BT_LTEP                 = 20,
58
59    LTEP_HANDSHAKE          = 0,
60
61    TR_LTEP_PEX             = 1,
62
63    MIN_CHOKE_PERIOD_SEC    = (10),
64
65    /* idle seconds before we send a keepalive */
66    KEEPALIVE_INTERVAL_SECS = 90,
67
68    PEX_INTERVAL            = (90 * 1000), /* msec between sendPex() calls */
69    PEER_PULSE_INTERVAL     = (100),       /* msec between pulse() calls */
70    RATE_PULSE_INTERVAL     = (250),       /* msec between ratePulse() calls */
71
72    MAX_QUEUE_SIZE          = (100),
73     
74    /* (fast peers) max number of pieces we fast-allow to another peer */
75    MAX_FAST_ALLOWED_COUNT   = 10,
76
77    /* (fast peers) max threshold for allowing fast-pieces requests */
78    MAX_FAST_ALLOWED_THRESHOLD = 10,
79
80    /* how long an unsent request can stay queued before it's returned
81       back to the peer-mgr's pool of requests */
82    QUEUED_REQUEST_TTL_SECS = 20,
83
84    /* how long a sent request can stay queued before it's returned
85       back to the peer-mgr's pool of requests */
86    SENT_REQUEST_TTL_SECS = 120
87};
88
89/**
90***  REQUEST MANAGEMENT
91**/
92
93enum
94{
95    AWAITING_BT_LENGTH,
96    AWAITING_BT_ID,
97    AWAITING_BT_MESSAGE,
98    AWAITING_BT_PIECE
99};
100
101struct peer_request
102{
103    uint32_t index;
104    uint32_t offset;
105    uint32_t length;
106    time_t time_requested;
107};
108
109static int
110compareRequest( const void * va, const void * vb )
111{
112    int i;
113    const struct peer_request * a = va;
114    const struct peer_request * b = vb;
115    if(( i = tr_compareUint32( a->index, b->index ))) return i;
116    if(( i = tr_compareUint32( a->offset, b->offset ))) return i;
117    if(( i = tr_compareUint32( a->length, b->length ))) return i;
118    return 0;
119}
120
121struct request_list
122{
123    uint16_t count;
124    uint16_t max;
125    struct peer_request * requests;
126};
127
128static const struct request_list REQUEST_LIST_INIT = { 0, 0, NULL };
129
130static void
131reqListReserve( struct request_list * list, uint16_t max )
132{
133    if( list->max < max )
134    {
135        list->max = max;
136        list->requests = tr_renew( struct peer_request,
137                                   list->requests,
138                                   list->max );
139    }
140}
141
142static void
143reqListClear( struct request_list * list )
144{
145    tr_free( list->requests );
146    *list = REQUEST_LIST_INIT;
147}
148
149static void
150reqListCopy( struct request_list * dest, const struct request_list * src )
151{
152    dest->count = src->count;
153    dest->max = src->max;
154    dest->requests = tr_new( struct peer_request, dest->max );
155    memcpy( dest->requests, src->requests, sizeof( struct peer_request ) * dest->count );
156}
157
158static void
159reqListRemoveOne( struct request_list * list, int i )
160{
161    assert( 0<=i && i<list->count );
162
163    memmove( &list->requests[i],
164             &list->requests[i+1],
165             sizeof( struct peer_request ) * ( --list->count - i ) );
166}
167
168static void
169reqListAppend( struct request_list * list, const struct peer_request * req )
170{
171    if( ++list->count >= list->max )
172        reqListReserve( list, list->max + 8 );
173
174    list->requests[list->count-1] = *req;
175}
176
177static tr_errno
178reqListPop( struct request_list * list, struct peer_request * setme )
179{
180    tr_errno err;
181
182    if( !list->count )
183        err = TR_ERROR;
184    else {
185        *setme = list->requests[0];
186        reqListRemoveOne( list, 0 );
187        err = TR_OK;
188    }
189
190    return err;
191}
192
193static int
194reqListFind( struct request_list * list, const struct peer_request * key )
195{
196    uint16_t i;
197    for( i=0; i<list->count; ++i )
198        if( !compareRequest( key, list->requests+i ) )
199            return i;
200    return -1;
201}
202
203static tr_errno
204reqListRemove( struct request_list * list, const struct peer_request * key )
205{
206    tr_errno err;
207    const int i = reqListFind( list, key );
208
209    if( i < 0 )
210        err = TR_ERROR;
211    else {
212        err = TR_OK;
213        reqListRemoveOne( list, i );
214    }
215
216    return err;
217}
218
219/**
220***
221**/
222
223/* this is raw, unchanged data from the peer regarding
224 * the current message that it's sending us. */
225struct tr_incoming
226{
227    uint8_t id;
228    uint32_t length; /* includes the +1 for id length */
229    struct peer_request blockReq; /* metadata for incoming blocks */
230    struct evbuffer * block; /* piece data for incoming blocks */
231};
232
233struct tr_peermsgs
234{
235    unsigned int peerSentBitfield         : 1;
236    unsigned int peerSupportsPex          : 1;
237    unsigned int clientSentLtepHandshake  : 1;
238    unsigned int peerSentLtepHandshake    : 1;
239    unsigned int sendingBlock             : 1;
240   
241    uint8_t state;
242    uint8_t ut_pex_id;
243    uint16_t pexCount;
244    uint16_t maxActiveRequests;
245    uint16_t minActiveRequests;
246
247    tr_peer * info;
248
249    tr_handle * handle;
250    tr_torrent * torrent;
251    tr_peerIo * io;
252
253    tr_publisher_t * publisher;
254
255    struct evbuffer * outBlock;    /* buffer of the current piece message */
256    struct evbuffer * outMessages; /* buffer of all the non-piece messages */
257
258    struct request_list peerAskedFor;
259    struct request_list peerAskedForFast;
260    struct request_list clientAskedFor;
261    struct request_list clientWillAskFor;
262
263    tr_timer * rateTimer;
264    tr_timer * pulseTimer;
265    tr_timer * pexTimer;
266
267    time_t clientSentPexAt;
268    time_t clientSentAnythingAt;
269   
270    tr_bitfield * peerAllowedPieces;
271
272    struct tr_incoming incoming; 
273
274    tr_pex * pex;
275};
276
277/**
278***
279**/
280
281static void
282myDebug( const char * file, int line,
283         const struct tr_peermsgs * msgs,
284         const char * fmt, ... )
285{
286    FILE * fp = tr_getLog( );
287    if( fp != NULL )
288    {
289        va_list args;
290        char timestr[64];
291        struct evbuffer * buf = evbuffer_new( );
292        char * myfile = tr_strdup( file );
293
294        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
295                             tr_getLogTimeStr( timestr, sizeof(timestr) ),
296                             msgs->torrent->info.name,
297                             tr_peerIoGetAddrStr( msgs->io ),
298                             msgs->info->client );
299        va_start( args, fmt );
300        evbuffer_add_vprintf( buf, fmt, args );
301        va_end( args );
302        evbuffer_add_printf( buf, " (%s:%d)\n", basename(myfile), line );
303        fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
304
305        tr_free( myfile );
306        evbuffer_free( buf );
307    }
308}
309
310#define dbgmsg(msgs, fmt...) myDebug(__FILE__, __LINE__, msgs, ##fmt )
311
312/**
313***
314**/
315
316static void
317protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
318{
319    tr_peerIo * io = msgs->io;
320    struct evbuffer * out = msgs->outMessages;
321
322    dbgmsg( msgs, "requesting %u:%u->%u", req->index, req->offset, req->length );
323    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
324    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
325    tr_peerIoWriteUint32( io, out, req->index );
326    tr_peerIoWriteUint32( io, out, req->offset );
327    tr_peerIoWriteUint32( io, out, req->length );
328}
329
330static void
331protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
332{
333    tr_peerIo * io = msgs->io;
334    struct evbuffer * out = msgs->outMessages;
335
336    dbgmsg( msgs, "cancelling %u:%u->%u", req->index, req->offset, req->length );
337    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
338    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
339    tr_peerIoWriteUint32( io, out, req->index );
340    tr_peerIoWriteUint32( io, out, req->offset );
341    tr_peerIoWriteUint32( io, out, req->length );
342}
343
344static void
345protocolSendHave( tr_peermsgs * msgs, uint32_t index )
346{
347    tr_peerIo * io = msgs->io;
348    struct evbuffer * out = msgs->outMessages;
349
350    dbgmsg( msgs, "sending Have %u", index );
351    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
352    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
353    tr_peerIoWriteUint32( io, out, index );
354}
355
356static void
357protocolSendChoke( tr_peermsgs * msgs, int choke )
358{
359    tr_peerIo * io = msgs->io;
360    struct evbuffer * out = msgs->outMessages;
361
362    dbgmsg( msgs, "sending %s", (choke ? "Choke" : "Unchoke") );
363    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) );
364    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
365}
366
367/**
368***  EVENTS
369**/
370
371static const tr_peermsgs_event blankEvent = { 0, 0, 0, 0, 0.0f, 0 };
372
373static void
374publish( tr_peermsgs * msgs, tr_peermsgs_event * e )
375{
376    tr_publisherPublish( msgs->publisher, msgs->info, e );
377}
378
379static void
380fireError( tr_peermsgs * msgs, tr_errno err )
381{
382    tr_peermsgs_event e = blankEvent;
383    e.eventType = TR_PEERMSG_ERROR;
384    e.err = err;
385    publish( msgs, &e );
386}
387
388static void
389fireNeedReq( tr_peermsgs * msgs )
390{
391    tr_peermsgs_event e = blankEvent;
392    e.eventType = TR_PEERMSG_NEED_REQ;
393    publish( msgs, &e );
394}
395
396static void
397firePeerProgress( tr_peermsgs * msgs )
398{
399    tr_peermsgs_event e = blankEvent;
400    e.eventType = TR_PEERMSG_PEER_PROGRESS;
401    e.progress = msgs->info->progress;
402    publish( msgs, &e );
403}
404
405static void
406fireClientHave( tr_peermsgs * msgs, uint32_t pieceIndex )
407{
408    tr_peermsgs_event e = blankEvent;
409    e.eventType = TR_PEERMSG_CLIENT_HAVE;
410    e.pieceIndex = pieceIndex;
411    publish( msgs, &e );
412}
413
414static void
415fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
416{
417    tr_peermsgs_event e = blankEvent;
418    e.eventType = TR_PEERMSG_CLIENT_BLOCK;
419    e.pieceIndex = req->index;
420    e.offset = req->offset;
421    e.length = req->length;
422    publish( msgs, &e );
423}
424
425static void
426firePieceData( tr_peermsgs * msgs )
427{
428    tr_peermsgs_event e = blankEvent;
429    e.eventType = TR_PEERMSG_PIECE_DATA;
430    publish( msgs, &e );
431}
432
433static void
434fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
435{
436    tr_peermsgs_event e = blankEvent;
437    e.eventType = TR_PEERMSG_CANCEL;
438    e.pieceIndex = req->index;
439    e.offset = req->offset;
440    e.length = req->length;
441    publish( msgs, &e );
442}
443
444/**
445***  INTEREST
446**/
447
448static int
449isPieceInteresting( const tr_peermsgs   * peer,
450                    int                   piece )
451{
452    const tr_torrent * torrent = peer->torrent;
453
454    return ( ( !torrent->info.pieces[piece].dnd )               /* we want it */
455          && ( !tr_cpPieceIsComplete( torrent->completion, piece ) ) /* !have */
456          && ( tr_bitfieldHas( peer->info->have, piece ) ) );  /* peer has it */
457}
458
459/* "interested" means we'll ask for piece data if they unchoke us */
460static int
461isPeerInteresting( const tr_peermsgs * msgs )
462{
463    tr_piece_index_t i;
464    const tr_torrent * torrent;
465    const tr_bitfield * bitfield;
466    const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
467
468    if( clientIsSeed )
469        return FALSE;
470
471    torrent = msgs->torrent;
472    bitfield = tr_cpPieceBitfield( torrent->completion );
473
474    if( !msgs->info->have )
475        return TRUE;
476
477    assert( bitfield->len == msgs->info->have->len );
478    for( i=0; i<torrent->info.pieceCount; ++i )
479        if( isPieceInteresting( msgs, i ) )
480            return TRUE;
481
482    return FALSE;
483}
484
485static void
486sendInterest( tr_peermsgs * msgs, int weAreInterested )
487{
488    assert( msgs != NULL );
489    assert( weAreInterested==0 || weAreInterested==1 );
490
491    msgs->info->clientIsInterested = weAreInterested;
492    dbgmsg( msgs, "Sending %s",
493            weAreInterested ? "Interested" : "Not Interested");
494
495    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
496    tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages,
497                   weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
498}
499
500static void
501updateInterest( tr_peermsgs * msgs )
502{
503    const int i = isPeerInteresting( msgs );
504    if( i != msgs->info->clientIsInterested )
505        sendInterest( msgs, i );
506    if( i )
507        fireNeedReq( msgs );
508}
509
510static void
511cancelAllRequestsToClientExceptFast( tr_peermsgs * msgs )
512{
513    reqListClear( &msgs->peerAskedFor );
514}
515
516void
517tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
518{
519    const time_t fibrillationTime = time(NULL) - MIN_CHOKE_PERIOD_SEC;
520
521    assert( msgs != NULL );
522    assert( msgs->info != NULL );
523    assert( choke==0 || choke==1 );
524
525    if( msgs->info->chokeChangedAt > fibrillationTime )
526    {
527        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
528    }
529    else if( msgs->info->peerIsChoked != choke )
530    {
531        msgs->info->peerIsChoked = choke;
532        if( choke )
533            cancelAllRequestsToClientExceptFast( msgs );
534        protocolSendChoke( msgs, choke );
535        msgs->info->chokeChangedAt = time( NULL );
536    }
537}
538
539/**
540***
541**/
542
543void
544tr_peerMsgsHave( tr_peermsgs * msgs,
545                 uint32_t      index )
546{
547    protocolSendHave( msgs, index );
548
549    /* since we have more pieces now, we might not be interested in this peer */
550    updateInterest( msgs );
551}
552#if 0
553static void
554sendFastSuggest( tr_peermsgs * msgs,
555                 uint32_t      pieceIndex )
556{
557    assert( msgs != NULL );
558   
559    if( tr_peerIoSupportsFEXT( msgs->io ) )
560    {
561        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
562        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_SUGGEST );
563        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
564    }
565}
566static void
567sendFastHave( tr_peermsgs * msgs, int all )
568{
569    assert( msgs != NULL );
570   
571    if( tr_peerIoSupportsFEXT( msgs->io ) )
572    {
573        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
574        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, ( all ? BT_HAVE_ALL
575                                                                : BT_HAVE_NONE ) );
576        updateInterest( msgs );
577    }
578}
579#endif
580
581static void
582sendFastReject( tr_peermsgs * msgs,
583                uint32_t      pieceIndex,
584                uint32_t      offset,
585                uint32_t      length )
586{
587    assert( msgs != NULL );
588
589    if( tr_peerIoSupportsFEXT( msgs->io ) )
590    {
591        const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
592        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
593        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_REJECT );
594        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
595        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
596        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
597    }
598}
599
600static tr_bitfield*
601getPeerAllowedPieces( tr_peermsgs * msgs )
602{
603    if( !msgs->peerAllowedPieces && tr_peerIoSupportsFEXT( msgs->io ) )
604    {
605        msgs->peerAllowedPieces = tr_peerMgrGenerateAllowedSet(
606            MAX_FAST_ALLOWED_COUNT,
607            msgs->torrent->info.pieceCount,
608            msgs->torrent->info.hash,
609            tr_peerIoGetAddress( msgs->io, NULL ) );
610    }
611
612    return msgs->peerAllowedPieces;
613}
614
615static void
616sendFastAllowed( tr_peermsgs * msgs,
617                 uint32_t      pieceIndex)
618{
619    assert( msgs != NULL );
620   
621    if( tr_peerIoSupportsFEXT( msgs->io ) )
622    {
623        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
624        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_ALLOWED_FAST );
625        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
626    }
627}
628
629static void
630sendFastAllowedSet( tr_peermsgs * msgs )
631{
632    tr_piece_index_t i = 0;
633
634    while (i <= msgs->torrent->info.pieceCount )
635    {
636        if ( tr_bitfieldHas( getPeerAllowedPieces( msgs ), i) )
637            sendFastAllowed( msgs, i );
638        i++;
639    }
640}
641
642static void
643maybeSendFastAllowedSet( tr_peermsgs * msgs )
644{
645    if( tr_bitfieldCountTrueBits( msgs->info->have ) <= MAX_FAST_ALLOWED_THRESHOLD )
646        sendFastAllowedSet( msgs );
647}
648
649
650/**
651***
652**/
653
654static int
655reqIsValid( const tr_peermsgs   * msgs,
656            uint32_t              index,
657            uint32_t              offset,
658            uint32_t              length )
659{
660    return tr_torrentReqIsValid( msgs->torrent, index, offset, length );
661}
662
663static int
664requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
665{
666    return reqIsValid( msgs, req->index, req->offset, req->length );
667}
668
669static void
670expireOldRequests( tr_peermsgs * msgs )
671{
672    int i;
673    time_t oldestAllowed;
674    struct request_list tmp = REQUEST_LIST_INIT;
675
676    /* cancel requests that have been queued for too long */
677    oldestAllowed = time( NULL ) - QUEUED_REQUEST_TTL_SECS;
678    reqListCopy( &tmp, &msgs->clientWillAskFor );
679    for( i=0; i<tmp.count; ++i ) {
680        const struct peer_request * req = &tmp.requests[i];
681        if( req->time_requested < oldestAllowed )
682            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
683    }
684    reqListClear( &tmp );
685
686    /* cancel requests that were sent too long ago */
687    oldestAllowed = time( NULL ) - SENT_REQUEST_TTL_SECS;
688    reqListCopy( &tmp, &msgs->clientAskedFor );
689    for( i=0; i<tmp.count; ++i ) {
690        const struct peer_request * req = &tmp.requests[i];
691        if( req->time_requested < oldestAllowed )
692            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
693    }
694    reqListClear( &tmp );
695}
696
697static void
698pumpRequestQueue( tr_peermsgs * msgs )
699{
700    const int max = msgs->maxActiveRequests;
701    const int min = msgs->minActiveRequests;
702    const time_t now = time( NULL );
703    int sent = 0;
704    int count = msgs->clientAskedFor.count;
705    struct peer_request req;
706
707    if( count > min )
708        return;
709    if( msgs->info->clientIsChoked )
710        return;
711
712    while( ( count < max ) && !reqListPop( &msgs->clientWillAskFor, &req ) )
713    {
714        assert( requestIsValid( msgs, &req ) );
715        assert( tr_bitfieldHas( msgs->info->have, req.index ) );
716
717        protocolSendRequest( msgs, &req );
718        req.time_requested = now;
719        reqListAppend( &msgs->clientAskedFor, &req );
720
721        ++count;
722        ++sent;
723    }
724
725    if( sent )
726        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
727                sent,
728                msgs->clientAskedFor.count,
729                msgs->clientWillAskFor.count );
730
731    if( count < max )
732        fireNeedReq( msgs );
733}
734
735static int
736pulse( void * vmsgs );
737
738int
739tr_peerMsgsAddRequest( tr_peermsgs * msgs,
740                       uint32_t      index, 
741                       uint32_t      offset, 
742                       uint32_t      length )
743{
744    const int req_max = msgs->maxActiveRequests;
745    struct peer_request req;
746
747    assert( msgs != NULL );
748    assert( msgs->torrent != NULL );
749    assert( reqIsValid( msgs, index, offset, length ) );
750
751    /**
752    ***  Reasons to decline the request
753    **/
754
755    /* don't send requests to choked clients */
756    if( msgs->info->clientIsChoked ) {
757        dbgmsg( msgs, "declining request because they're choking us" );
758        return TR_ADDREQ_CLIENT_CHOKED;
759    }
760
761    /* peer doesn't have this piece */
762    if( !tr_bitfieldHas( msgs->info->have, index ) )
763        return TR_ADDREQ_MISSING;
764
765    /* peer's queue is full */
766    if( msgs->clientWillAskFor.count >= req_max ) {
767        dbgmsg( msgs, "declining request because we're full" );
768        return TR_ADDREQ_FULL;
769    }
770
771    /* have we already asked for this piece? */
772    req.index = index;
773    req.offset = offset;
774    req.length = length;
775    if( reqListFind( &msgs->clientAskedFor, &req ) != -1 ) {
776        dbgmsg( msgs, "declining because it's a duplicate" );
777        return TR_ADDREQ_DUPLICATE;
778    }
779    if( reqListFind( &msgs->clientWillAskFor, &req ) != -1 ) {
780        dbgmsg( msgs, "declining because it's a duplicate" );
781        return TR_ADDREQ_DUPLICATE;
782    }
783
784    /**
785    ***  Accept this request
786    **/
787
788    dbgmsg( msgs, "added req for piece %d, offset %d", (int)index, (int)offset );
789    req.time_requested = time( NULL );
790    reqListAppend( &msgs->clientWillAskFor, &req );
791    return TR_ADDREQ_OK;
792}
793
794static void
795cancelAllRequestsToPeer( tr_peermsgs * msgs )
796{
797    int i;
798    struct request_list a = msgs->clientWillAskFor;
799    struct request_list b = msgs->clientAskedFor;
800
801    msgs->clientAskedFor = REQUEST_LIST_INIT;
802    msgs->clientWillAskFor = REQUEST_LIST_INIT;
803
804    for( i=0; i<a.count; ++i )
805        fireCancelledReq( msgs, &a.requests[i] );
806
807    for( i=0; i<b.count; ++i ) {
808        fireCancelledReq( msgs, &b.requests[i] );
809        protocolSendCancel( msgs, &b.requests[i] );
810    }
811
812    reqListClear( &a );
813    reqListClear( &b );
814}
815
816void
817tr_peerMsgsCancel( tr_peermsgs * msgs,
818                   uint32_t      pieceIndex,
819                   uint32_t      offset,
820                   uint32_t      length )
821{
822    struct peer_request req;
823
824    assert( msgs != NULL );
825    assert( length > 0 );
826
827    /* have we asked the peer for this piece? */
828    req.index = pieceIndex;
829    req.offset = offset;
830    req.length = length;
831
832    /* if it's only in the queue and hasn't been sent yet, free it */
833    if( !reqListRemove( &msgs->clientWillAskFor, &req ) )
834        fireCancelledReq( msgs, &req );
835
836    /* if it's already been sent, send a cancel message too */
837    if( !reqListRemove( &msgs->clientAskedFor, &req ) ) {
838        protocolSendCancel( msgs, &req );
839        fireCancelledReq( msgs, &req );
840    }
841}
842
843/**
844***
845**/
846
847static void
848sendLtepHandshake( tr_peermsgs * msgs )
849{
850    tr_benc val, *m;
851    char * buf;
852    int len;
853    int pex;
854    struct evbuffer * outbuf;
855
856    if( msgs->clientSentLtepHandshake )
857        return;
858
859    outbuf = evbuffer_new( );
860    dbgmsg( msgs, "sending an ltep handshake" );
861    msgs->clientSentLtepHandshake = 1;
862
863    /* decide if we want to advertise pex support */
864    if( !tr_torrentAllowsPex( msgs->torrent ) )
865        pex = 0;
866    else if( msgs->peerSentLtepHandshake )
867        pex = msgs->peerSupportsPex ? 1 : 0;
868    else
869        pex = 1;
870
871    tr_bencInitDict( &val, 4 );
872    tr_bencDictAddInt( &val, "e", msgs->handle->encryptionMode != TR_PLAINTEXT_PREFERRED );
873    tr_bencDictAddInt( &val, "p", tr_getPublicPort( msgs->handle ) );
874    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
875    m  = tr_bencDictAddDict( &val, "m", 1 );
876    if( pex )
877        tr_bencDictAddInt( m, "ut_pex", TR_LTEP_PEX );
878    buf = tr_bencSave( &val, &len );
879
880    tr_peerIoWriteUint32( msgs->io, outbuf, 2*sizeof(uint8_t) + len );
881    tr_peerIoWriteUint8 ( msgs->io, outbuf, BT_LTEP );
882    tr_peerIoWriteUint8 ( msgs->io, outbuf, LTEP_HANDSHAKE );
883    tr_peerIoWriteBytes ( msgs->io, outbuf, buf, len );
884
885    tr_peerIoWriteBuf( msgs->io, outbuf );
886
887    dbgmsg( msgs, "here is the ltep handshake we sent [%*.*s]", len, len, buf );
888
889    /* cleanup */
890    tr_bencFree( &val );
891    tr_free( buf );
892    evbuffer_free( outbuf );
893}
894
895static void
896parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
897{
898    tr_benc val, * sub;
899    uint8_t * tmp = tr_new( uint8_t, len );
900
901    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
902    msgs->peerSentLtepHandshake = 1;
903
904    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type!=TYPE_DICT ) {
905        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
906        tr_free( tmp );
907        return;
908    }
909
910    dbgmsg( msgs, "here is the ltep handshake we got [%*.*s]", len, len, tmp );
911
912    /* does the peer prefer encrypted connections? */
913    if(( sub = tr_bencDictFindType( &val, "e", TYPE_INT )))
914        msgs->info->encryption_preference = sub->val.i
915                                      ? ENCRYPTION_PREFERENCE_YES
916                                      : ENCRYPTION_PREFERENCE_NO;
917
918    /* check supported messages for utorrent pex */
919    msgs->peerSupportsPex = 0;
920    if(( sub = tr_bencDictFindType( &val, "m", TYPE_DICT ))) {
921        if(( sub = tr_bencDictFindType( sub, "ut_pex", TYPE_INT ))) {
922            msgs->ut_pex_id = (uint8_t) sub->val.i;
923            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
924            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
925        }
926    }
927
928    /* get peer's listening port */
929    if(( sub = tr_bencDictFindType( &val, "p", TYPE_INT ))) {
930        msgs->info->port = htons( (uint16_t)sub->val.i );
931        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
932    }
933
934    tr_bencFree( &val );
935    tr_free( tmp );
936}
937
938static void
939parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
940{
941    int loaded = 0;
942    uint8_t * tmp = tr_new( uint8_t, msglen );
943    tr_benc val, *added;
944    const tr_torrent * tor = msgs->torrent;
945    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
946
947    if( tr_torrentAllowsPex( tor )
948        && (( loaded = !tr_bencLoad( tmp, msglen, &val, NULL )))
949        && (( added = tr_bencDictFindType( &val, "added", TYPE_STR ))))
950    {
951        const char * added_f = NULL;
952        tr_pex * pex;
953        size_t i, n;
954        tr_bencDictFindStr( &val, "added.f", &added_f );
955        pex = tr_peerMgrCompactToPex( added->val.s.s, added->val.s.i, added_f, &n );
956        for( i=0; i<n; ++i )
957            tr_peerMgrAddPex( msgs->handle->peerMgr, tor->info.hash,
958                              TR_PEER_FROM_PEX, pex+i );
959        tr_free( pex );
960    }
961
962    if( loaded )
963        tr_bencFree( &val );
964    tr_free( tmp );
965}
966
967static void
968sendPex( tr_peermsgs * msgs );
969
970static void
971parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
972{
973    uint8_t ltep_msgid;
974
975    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
976    msglen--;
977
978    if( ltep_msgid == LTEP_HANDSHAKE )
979    {
980        dbgmsg( msgs, "got ltep handshake" );
981        parseLtepHandshake( msgs, msglen, inbuf );
982        if( tr_peerIoSupportsLTEP( msgs->io ) )
983        {
984            sendLtepHandshake( msgs );
985            sendPex( msgs );
986        }
987    }
988    else if( ltep_msgid == TR_LTEP_PEX )
989    {
990        dbgmsg( msgs, "got ut pex" );
991        msgs->peerSupportsPex = 1;
992        parseUtPex( msgs, msglen, inbuf );
993    }
994    else
995    {
996        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
997        evbuffer_drain( inbuf, msglen );
998    }
999}
1000
1001static int
1002readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1003{
1004    uint32_t len;
1005
1006    if( inlen < sizeof(len) )
1007        return READ_MORE;
1008
1009    tr_peerIoReadUint32( msgs->io, inbuf, &len );
1010
1011    if( len == 0 ) /* peer sent us a keepalive message */
1012        dbgmsg( msgs, "got KeepAlive" );
1013    else {
1014        msgs->incoming.length = len;
1015        msgs->state = AWAITING_BT_ID;
1016    }
1017
1018    return READ_AGAIN;
1019}
1020
1021static int
1022readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen );
1023
1024static int
1025readBtId( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1026{
1027    uint8_t id;
1028
1029    if( inlen < sizeof(uint8_t) )
1030        return READ_MORE;
1031
1032    tr_peerIoReadUint8( msgs->io, inbuf, &id );
1033    msgs->incoming.id = id;
1034
1035    if( id==BT_PIECE )
1036    {
1037        msgs->state = AWAITING_BT_PIECE;
1038        return READ_AGAIN;
1039    }
1040    else if( msgs->incoming.length != 1 )
1041    {
1042        msgs->state = AWAITING_BT_MESSAGE;
1043        return READ_AGAIN;
1044    }
1045    else return readBtMessage( msgs, inbuf, inlen-1 );
1046}
1047
1048static void
1049updatePeerProgress( tr_peermsgs * msgs )
1050{
1051    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have )
1052                         / (float)msgs->torrent->info.pieceCount;
1053    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
1054    updateInterest( msgs );
1055    firePeerProgress( msgs );
1056}
1057
1058static int
1059clientCanSendFastBlock( const tr_peermsgs * msgs UNUSED )
1060{
1061    /* don't send a fast piece if peer has MAX_FAST_ALLOWED_THRESHOLD pieces */
1062    if( tr_bitfieldCountTrueBits( msgs->info->have ) > MAX_FAST_ALLOWED_THRESHOLD )
1063        return FALSE;
1064   
1065    /* ...or if we don't have ourself enough pieces */
1066    if( tr_bitfieldCountTrueBits( tr_cpPieceBitfield( msgs->torrent->completion ) ) < MAX_FAST_ALLOWED_THRESHOLD )
1067        return FALSE;
1068
1069    /* Maybe a bandwidth limit ? */
1070    return TRUE;
1071}
1072
1073static void
1074peerMadeRequest( tr_peermsgs * msgs, const struct peer_request * req )
1075{
1076    const int reqIsValid = requestIsValid( msgs, req );
1077    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( msgs->torrent->completion, req->index );
1078    const int peerIsChoked = msgs->info->peerIsChoked;
1079    const int peerIsFast = tr_peerIoSupportsFEXT( msgs->io );
1080    const int pieceIsFast = reqIsValid && tr_bitfieldHas( getPeerAllowedPieces( msgs ), req->index );
1081    const int canSendFast = clientCanSendFastBlock( msgs );
1082
1083    if( !reqIsValid ) /* bad request */
1084    {
1085        dbgmsg( msgs, "rejecting an invalid request." );
1086        sendFastReject( msgs, req->index, req->offset, req->length );
1087    }
1088    else if( !clientHasPiece ) /* we don't have it */
1089    {
1090        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1091        sendFastReject( msgs, req->index, req->offset, req->length );
1092    }
1093    else if( peerIsChoked && !peerIsFast ) /* doesn't he know he's choked? */
1094    {
1095        tr_peerMsgsSetChoke( msgs, 1 );
1096        sendFastReject( msgs, req->index, req->offset, req->length );
1097    }
1098    else if( peerIsChoked && peerIsFast && ( !pieceIsFast || !canSendFast ) )
1099    {
1100        sendFastReject( msgs, req->index, req->offset, req->length );
1101    }
1102    else /* YAY */
1103    {
1104        if( peerIsFast && pieceIsFast )
1105            reqListAppend( &msgs->peerAskedForFast, req );
1106        else
1107            reqListAppend( &msgs->peerAskedFor, req );
1108    }
1109}
1110
1111static int
1112messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1113{
1114    switch( id )
1115    {
1116        case BT_CHOKE:
1117        case BT_UNCHOKE:
1118        case BT_INTERESTED:
1119        case BT_NOT_INTERESTED:
1120        case BT_HAVE_ALL:
1121        case BT_HAVE_NONE:
1122            return len==1;
1123
1124        case BT_HAVE:
1125        case BT_SUGGEST:
1126        case BT_ALLOWED_FAST:
1127            return len==5;
1128
1129        case BT_BITFIELD:
1130            return len == (msg->torrent->info.pieceCount+7u)/8u + 1u;
1131       
1132        case BT_REQUEST:
1133        case BT_CANCEL:
1134        case BT_REJECT:
1135            return len==13;
1136
1137        case BT_PIECE:
1138            return len>9 && len<=16393;
1139
1140        case BT_PORT:
1141            return len==3;
1142
1143        case BT_LTEP:
1144            return len >= 2;
1145
1146        default:
1147            return FALSE;
1148    }
1149}
1150
1151static int
1152clientGotBlock( tr_peermsgs * msgs, const uint8_t * block, const struct peer_request * req );
1153
1154static void
1155clientGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1156{
1157    const time_t now = time( NULL );
1158    tr_torrent * tor = msgs->torrent;
1159    tor->activityDate = tr_date( );
1160    tor->downloadedCur += byteCount;
1161    msgs->info->pieceDataActivityDate = now;
1162    msgs->info->credit += (int)(byteCount * SWIFT_REPAYMENT_RATIO);
1163    tr_rcTransferred( msgs->info->rcToClient, byteCount );
1164    tr_rcTransferred( tor->download, byteCount );
1165    tr_rcTransferred( tor->handle->download, byteCount );
1166    tr_statsAddDownloaded( msgs->handle, byteCount );
1167    firePieceData( msgs );
1168}
1169
1170static int
1171readBtPiece( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1172{
1173    struct peer_request * req = &msgs->incoming.blockReq;
1174    assert( EVBUFFER_LENGTH(inbuf) >= inlen );
1175    dbgmsg( msgs, "In readBtPiece" );
1176
1177    if( !req->length )
1178    {
1179        if( inlen < 8 )
1180            return READ_MORE;
1181
1182        tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
1183        tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
1184        req->length = msgs->incoming.length - 9;
1185        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1186        return READ_AGAIN;
1187    }
1188    else
1189    {
1190        int err;
1191
1192        /* read in another chunk of data */
1193        const size_t nLeft = req->length - EVBUFFER_LENGTH(msgs->incoming.block);
1194        size_t n = MIN( nLeft, inlen );
1195        uint8_t * buf = tr_new( uint8_t, n );
1196        assert( EVBUFFER_LENGTH(inbuf) >= n );
1197        tr_peerIoReadBytes( msgs->io, inbuf, buf, n );
1198        evbuffer_add( msgs->incoming.block, buf, n );
1199        clientGotBytes( msgs, n );
1200        tr_free( buf );
1201        dbgmsg( msgs, "got %d bytes for block %u:%u->%u ... %d remain",
1202               (int)n, req->index, req->offset, req->length,
1203               (int)( req->length - EVBUFFER_LENGTH(msgs->incoming.block) ) );
1204        if( EVBUFFER_LENGTH(msgs->incoming.block) < req->length )
1205            return READ_MORE;
1206
1207        /* we've got the whole block ... process it */
1208        err = clientGotBlock( msgs, EVBUFFER_DATA(msgs->incoming.block), req );
1209
1210        /* cleanup */
1211        evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH(msgs->incoming.block) );
1212        req->length = 0;
1213        msgs->state = AWAITING_BT_LENGTH;
1214        if( !err )
1215            return READ_AGAIN;
1216        else {
1217            fireError( msgs, err );
1218            return READ_DONE;
1219        }
1220    }
1221}
1222
1223static int
1224readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1225{
1226    uint32_t ui32;
1227    uint32_t msglen = msgs->incoming.length;
1228    const uint8_t id = msgs->incoming.id;
1229    const size_t startBufLen = EVBUFFER_LENGTH( inbuf );
1230
1231    --msglen; /* id length */
1232
1233    if( inlen < msglen )
1234        return READ_MORE;
1235
1236    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %d", (int)id, (int)msglen, (int)inlen );
1237
1238    if( !messageLengthIsCorrect( msgs, id, msglen+1 ) )
1239    {
1240        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1241        fireError( msgs, TR_ERROR );
1242        return READ_DONE;
1243    }
1244
1245    switch( id )
1246    {
1247        case BT_CHOKE:
1248            dbgmsg( msgs, "got Choke" );
1249            msgs->info->clientIsChoked = 1;
1250            cancelAllRequestsToPeer( msgs );
1251            cancelAllRequestsToClientExceptFast( msgs );
1252            break;
1253
1254        case BT_UNCHOKE:
1255            dbgmsg( msgs, "got Unchoke" );
1256            msgs->info->clientIsChoked = 0;
1257            fireNeedReq( msgs );
1258            break;
1259
1260        case BT_INTERESTED:
1261            dbgmsg( msgs, "got Interested" );
1262            msgs->info->peerIsInterested = 1;
1263            tr_peerMsgsSetChoke( msgs, 0 );
1264            break;
1265
1266        case BT_NOT_INTERESTED:
1267            dbgmsg( msgs, "got Not Interested" );
1268            msgs->info->peerIsInterested = 0;
1269            break;
1270           
1271        case BT_HAVE:
1272            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1273            dbgmsg( msgs, "got Have: %u", ui32 );
1274            if( tr_bitfieldAdd( msgs->info->have, ui32 ) )
1275                fireError( msgs, TR_ERROR_PEER_MESSAGE );
1276            updatePeerProgress( msgs );
1277            tr_rcTransferred( msgs->torrent->swarmspeed, msgs->torrent->info.pieceSize );
1278            break;
1279
1280        case BT_BITFIELD: {
1281            const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
1282            int peerIsSeed;
1283            dbgmsg( msgs, "got a bitfield" );
1284            msgs->peerSentBitfield = 1;
1285            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
1286            updatePeerProgress( msgs );
1287            maybeSendFastAllowedSet( msgs );
1288            peerIsSeed = msgs->info->progress >= 1.0;
1289            tr_peerMsgsSetChoke( msgs, clientIsSeed && peerIsSeed );
1290            fireNeedReq( msgs );
1291            break;
1292        }
1293
1294        case BT_REQUEST: {
1295            struct peer_request r;
1296            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1297            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1298            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1299            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1300            peerMadeRequest( msgs, &r );
1301            break;
1302        }
1303
1304        case BT_CANCEL: {
1305            struct peer_request r;
1306            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1307            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1308            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1309            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1310            reqListRemove( &msgs->peerAskedForFast, &r );
1311            reqListRemove( &msgs->peerAskedFor, &r );
1312            break;
1313        }
1314
1315        case BT_PIECE:
1316            assert( 0 ); /* handled elsewhere! */
1317            break;
1318       
1319        case BT_PORT:
1320            dbgmsg( msgs, "Got a BT_PORT" );
1321            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1322            break;
1323           
1324        case BT_SUGGEST: {
1325            dbgmsg( msgs, "Got a BT_SUGGEST" );
1326            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1327            /* we don't do anything with this yet */
1328            break;
1329        }
1330           
1331        case BT_HAVE_ALL:
1332            dbgmsg( msgs, "Got a BT_HAVE_ALL" );
1333            tr_bitfieldAddRange( msgs->info->have, 0, msgs->torrent->info.pieceCount );
1334            updatePeerProgress( msgs );
1335            maybeSendFastAllowedSet( msgs );
1336            break;
1337       
1338       
1339        case BT_HAVE_NONE:
1340            dbgmsg( msgs, "Got a BT_HAVE_NONE" );
1341            tr_bitfieldClear( msgs->info->have );
1342            updatePeerProgress( msgs );
1343            maybeSendFastAllowedSet( msgs );
1344            break;
1345       
1346        case BT_REJECT: {
1347            struct peer_request r;
1348            dbgmsg( msgs, "Got a BT_REJECT" );
1349            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1350            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1351            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1352            reqListRemove( &msgs->clientAskedFor, &r );
1353            break;
1354        }
1355
1356        case BT_ALLOWED_FAST: {
1357            dbgmsg( msgs, "Got a BT_ALLOWED_FAST" );
1358            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1359            /* we don't do anything with this yet */
1360            break;
1361        }
1362
1363        case BT_LTEP:
1364            dbgmsg( msgs, "Got a BT_LTEP" );
1365            parseLtep( msgs, msglen, inbuf );
1366            break;
1367
1368        default:
1369            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1370            tr_peerIoDrain( msgs->io, inbuf, msglen );
1371            break;
1372    }
1373
1374    assert( msglen + 1 == msgs->incoming.length );
1375    assert( EVBUFFER_LENGTH(inbuf) == startBufLen - msglen );
1376
1377    msgs->state = AWAITING_BT_LENGTH;
1378    return READ_AGAIN;
1379}
1380
1381static void
1382peerGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1383{
1384    const time_t now = time( NULL );
1385    tr_torrent * tor = msgs->torrent;
1386    tor->activityDate = tr_date( );
1387    tor->uploadedCur += byteCount;
1388    msgs->info->pieceDataActivityDate = now;
1389    msgs->info->credit -= byteCount;
1390    tr_rcTransferred( msgs->info->rcToPeer, byteCount );
1391    tr_rcTransferred( tor->upload, byteCount );
1392    tr_rcTransferred( tor->handle->upload, byteCount );
1393    tr_statsAddUploaded( msgs->handle, byteCount );
1394    firePieceData( msgs );
1395}
1396
1397static size_t
1398getDownloadMax( const tr_peermsgs * msgs )
1399{
1400    static const size_t maxval = ~0;
1401    const tr_torrent * tor = msgs->torrent;
1402
1403    if( tor->downloadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1404        return tor->handle->useDownloadLimit
1405            ? tr_rcBytesLeft( tor->handle->download ) : maxval;
1406
1407    if( tor->downloadLimitMode == TR_SPEEDLIMIT_SINGLE )
1408        return tr_rcBytesLeft( tor->download );
1409
1410    return maxval;
1411}
1412
1413static void
1414decrementDownloadedCount( tr_peermsgs * msgs, uint32_t byteCount )
1415{
1416    tr_torrent * tor = msgs->torrent;
1417    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1418}
1419
1420static void
1421reassignBytesToCorrupt( tr_peermsgs * msgs, uint32_t byteCount )
1422{
1423    tr_torrent * tor = msgs->torrent;
1424
1425    tor->corruptCur += byteCount;
1426
1427    decrementDownloadedCount( msgs, byteCount );
1428}
1429
1430static void
1431gotBadPiece( tr_peermsgs * msgs, tr_piece_index_t pieceIndex )
1432{
1433    const uint32_t byteCount =
1434        tr_torPieceCountBytes( msgs->torrent, pieceIndex );
1435    reassignBytesToCorrupt( msgs, byteCount );
1436}
1437
1438static void
1439clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1440{
1441    decrementDownloadedCount( msgs, req->length );
1442}
1443
1444static void
1445addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1446{
1447    if( !msgs->info->blame )
1448         msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1449    tr_bitfieldAdd( msgs->info->blame, index );
1450}
1451
1452static tr_errno
1453clientGotBlock( tr_peermsgs                * msgs,
1454                const uint8_t              * data,
1455                const struct peer_request  * req )
1456{
1457    int err;
1458    tr_torrent * tor = msgs->torrent;
1459    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1460
1461    assert( msgs != NULL );
1462    assert( req != NULL );
1463
1464    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) )
1465    {
1466        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1467                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1468        return TR_ERROR;
1469    }
1470
1471    /* save the block */
1472    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1473
1474    /**
1475    *** Remove the block from our `we asked for this' list
1476    **/
1477
1478    if( reqListRemove( &msgs->clientAskedFor, req ) )
1479    {
1480        clientGotUnwantedBlock( msgs, req );
1481        dbgmsg( msgs, "we didn't ask for this message..." );
1482        return 0;
1483    }
1484
1485    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1486                  msgs->clientAskedFor.count );
1487
1488    /**
1489    *** Error checks
1490    **/
1491
1492    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
1493        dbgmsg( msgs, "we have this block already..." );
1494        clientGotUnwantedBlock( msgs, req );
1495        return 0;
1496    }
1497
1498    /**
1499    ***  Save the block
1500    **/
1501
1502    msgs->info->peerSentPieceDataAt = time( NULL );
1503    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1504        return err;
1505
1506    tr_cpBlockAdd( tor->completion, block );
1507
1508    addPeerToBlamefield( msgs, req->index );
1509
1510    fireGotBlock( msgs, req );
1511
1512    /**
1513    ***  Handle if this was the last block in the piece
1514    **/
1515
1516    if( tr_cpPieceIsComplete( tor->completion, req->index ) )
1517    {
1518        const tr_errno err = tr_ioTestPiece( tor, req->index );
1519
1520        if( err )
1521            tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test: %s" ),
1522                       (unsigned long)req->index,
1523                       tr_errorString( err ) );
1524
1525        tr_torrentSetHasPiece( tor, req->index, !err );
1526        tr_torrentSetPieceChecked( tor, req->index, TRUE );
1527        tr_peerMgrSetBlame( tor->handle->peerMgr, tor->info.hash, req->index, !err );
1528
1529        if( !err )
1530            fireClientHave( msgs, req->index );
1531        else
1532            gotBadPiece( msgs, req->index );
1533    }
1534
1535    return 0;
1536}
1537
1538static void
1539didWrite( struct bufferevent * evin UNUSED, void * vmsgs )
1540{
1541    pulse( vmsgs );
1542}
1543
1544static ReadState
1545canRead( struct bufferevent * evin, void * vmsgs )
1546{
1547    ReadState ret;
1548    tr_peermsgs * msgs = vmsgs;
1549    struct evbuffer * in = EVBUFFER_INPUT ( evin );
1550    const size_t inlen = EVBUFFER_LENGTH( in );
1551
1552    if( !inlen )
1553    {
1554        ret = READ_DONE;
1555    }
1556    else if( msgs->state == AWAITING_BT_PIECE )
1557    {
1558        const size_t downloadMax = getDownloadMax( msgs );
1559        const size_t n = MIN( inlen, downloadMax );
1560        ret = n ? readBtPiece( msgs, in, n ) : READ_DONE;
1561    }
1562    else switch( msgs->state )
1563    {
1564        case AWAITING_BT_LENGTH:  ret = readBtLength ( msgs, in, inlen ); break;
1565        case AWAITING_BT_ID:      ret = readBtId     ( msgs, in, inlen ); break;
1566        case AWAITING_BT_MESSAGE: ret = readBtMessage( msgs, in, inlen ); break;
1567        default:                  assert( 0 );
1568    }
1569
1570    return ret;
1571}
1572
1573static void
1574sendKeepalive( tr_peermsgs * msgs )
1575{
1576    dbgmsg( msgs, "sending a keepalive message" );
1577    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1578}
1579
1580/**
1581***
1582**/
1583
1584static int
1585isSwiftEnabled( const tr_peermsgs * msgs )
1586{
1587    /* rationale: SWIFT is good for getting rid of deadbeats, but most
1588     * private trackers have ratios where you _want_ to feed deadbeats
1589     * as much as possible.  So we disable SWIFT on private torrents */
1590    return SWIFT_ENABLED
1591        && !tr_torrentIsSeed( msgs->torrent )
1592        && !tr_torrentIsPrivate( msgs->torrent );
1593}
1594
1595static size_t
1596getUploadMax( const tr_peermsgs * msgs )
1597{
1598    static const size_t maxval = ~0;
1599    const tr_torrent * tor = msgs->torrent;
1600    const int useSwift = isSwiftEnabled( msgs );
1601    const size_t swiftLeft = msgs->info->credit;
1602    int speedLeft;
1603    int bufLeft;
1604    size_t ret;
1605
1606    if( tor->uploadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1607        speedLeft = tor->handle->useUploadLimit ? tr_rcBytesLeft( tor->handle->upload ) : maxval;
1608    else if( tor->uploadLimitMode == TR_SPEEDLIMIT_SINGLE )
1609        speedLeft = tr_rcBytesLeft( tor->upload );
1610    else
1611        speedLeft = INT_MAX;
1612
1613    /* this basically says the outbuf shouldn't have more than one block
1614     * queued up in it... blocksize, +13 for the size of the BT protocol's
1615     * block message overhead */
1616    bufLeft = tor->blockSize + 13 - tr_peerIoWriteBytesWaiting( msgs->io );
1617    ret = MIN( speedLeft, bufLeft );
1618    if( useSwift)
1619        ret = MIN( ret, swiftLeft );
1620    return ret;
1621}
1622
1623static int
1624ratePulse( void * vmsgs )
1625{
1626    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1627    msgs->info->rateToClient = tr_rcRate( msgs->info->rcToClient );
1628    msgs->info->rateToPeer = tr_rcRate( msgs->info->rcToPeer );
1629    msgs->maxActiveRequests = MIN( 4 + (int)(msgs->info->rateToClient/4), MAX_QUEUE_SIZE );
1630    msgs->minActiveRequests = msgs->maxActiveRequests / 3;
1631    return TRUE;
1632}
1633
1634static tr_errno
1635popNextRequest( tr_peermsgs * msgs, struct peer_request * setme )
1636{
1637    if( !reqListPop( &msgs->peerAskedForFast, setme ) )
1638        return 0;
1639    if( !reqListPop( &msgs->peerAskedFor, setme ) )
1640        return 0;
1641
1642    return TR_ERROR;
1643}
1644
1645static int
1646pulse( void * vmsgs )
1647{
1648    const time_t now = time( NULL );
1649    tr_peermsgs * msgs = vmsgs;
1650
1651    tr_peerIoTryRead( msgs->io );
1652    pumpRequestQueue( msgs );
1653    expireOldRequests( msgs );
1654
1655    if( msgs->sendingBlock )
1656    {
1657        const size_t uploadMax = getUploadMax( msgs );
1658        size_t len = EVBUFFER_LENGTH( msgs->outBlock );
1659        const size_t outlen = MIN( len, uploadMax );
1660
1661        assert( len );
1662
1663        if( outlen )
1664        {
1665            tr_peerIoWrite( msgs->io, EVBUFFER_DATA( msgs->outBlock ), outlen );
1666            evbuffer_drain( msgs->outBlock, outlen );
1667            peerGotBytes( msgs, outlen );
1668
1669            len -= outlen;
1670            msgs->clientSentAnythingAt = now;
1671            msgs->sendingBlock = len!=0;
1672
1673            dbgmsg( msgs, "wrote %d bytes; %d left in block", (int)outlen, (int)len );
1674        }
1675        else dbgmsg( msgs, "stalled writing block... uploadMax %lu, outlen %lu", uploadMax, outlen );
1676    }
1677
1678    if( !msgs->sendingBlock )
1679    {
1680        struct peer_request req;
1681
1682        if(( EVBUFFER_LENGTH( msgs->outMessages ) ))
1683        {
1684            dbgmsg( msgs, "flushing outMessages..." );
1685            tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
1686            msgs->clientSentAnythingAt = now;
1687        }
1688        else if( !EVBUFFER_LENGTH( msgs->outBlock )
1689            && !popNextRequest( msgs, &req )
1690            && requestIsValid( msgs, &req )
1691            && tr_cpPieceIsComplete( msgs->torrent->completion, req.index ) )
1692        {
1693            uint8_t * buf = tr_new( uint8_t, req.length );
1694
1695            if( !tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf ) )
1696            {
1697                tr_peerIo * io = msgs->io;
1698                struct evbuffer * out = msgs->outBlock;
1699
1700                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1701                tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 2*sizeof(uint32_t) + req.length );
1702                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1703                tr_peerIoWriteUint32( io, out, req.index );
1704                tr_peerIoWriteUint32( io, out, req.offset );
1705                tr_peerIoWriteBytes ( io, out, buf, req.length );
1706                msgs->sendingBlock = 1;
1707            }
1708
1709            tr_free( buf );
1710        }
1711        else if( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS )
1712        {
1713            sendKeepalive( msgs );
1714        }
1715    }
1716
1717    return TRUE; /* loop forever */
1718}
1719
1720static void
1721gotError( struct bufferevent * evbuf UNUSED, short what, void * vmsgs )
1722{
1723    if( what & EVBUFFER_TIMEOUT )
1724        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1725    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1726        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1727                what, errno, tr_strerror(errno) );
1728    fireError( vmsgs, TR_ERROR );
1729}
1730
1731static void
1732sendBitfield( tr_peermsgs * msgs )
1733{
1734    const tr_bitfield * bitfield = tr_cpPieceBitfield( msgs->torrent->completion );
1735    struct evbuffer * out = msgs->outMessages;
1736
1737    dbgmsg( msgs, "sending peer a bitfield message" );
1738    tr_peerIoWriteUint32( msgs->io, out, sizeof(uint8_t) + bitfield->len );
1739    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1740    tr_peerIoWriteBytes ( msgs->io, out, bitfield->bits, bitfield->len );
1741}
1742
1743/**
1744***
1745**/
1746
1747/* some peers give us error messages if we send
1748   more than this many peers in a single pex message
1749   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
1750#define MAX_PEX_ADDED 50
1751#define MAX_PEX_DROPPED 50
1752
1753typedef struct
1754{
1755    tr_pex * added;
1756    tr_pex * dropped;
1757    tr_pex * elements;
1758    int addedCount;
1759    int droppedCount;
1760    int elementCount;
1761}
1762PexDiffs;
1763
1764static void
1765pexAddedCb( void * vpex, void * userData )
1766{
1767    PexDiffs * diffs = userData;
1768    tr_pex * pex = vpex;
1769    if( diffs->addedCount < MAX_PEX_ADDED )
1770    {
1771        diffs->added[diffs->addedCount++] = *pex;
1772        diffs->elements[diffs->elementCount++] = *pex;
1773    }
1774}
1775
1776static void
1777pexDroppedCb( void * vpex, void * userData )
1778{
1779    PexDiffs * diffs = userData;
1780    tr_pex * pex = vpex;
1781    if( diffs->droppedCount < MAX_PEX_DROPPED )
1782    {
1783        diffs->dropped[diffs->droppedCount++] = *pex;
1784    }
1785}
1786
1787static void
1788pexElementCb( void * vpex, void * userData )
1789{
1790    PexDiffs * diffs = userData;
1791    tr_pex * pex = vpex;
1792    diffs->elements[diffs->elementCount++] = *pex;
1793}
1794
1795static void
1796sendPex( tr_peermsgs * msgs )
1797{
1798    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
1799    {
1800        int i;
1801        tr_pex * newPex = NULL;
1802        const int newCount = tr_peerMgrGetPeers( msgs->handle->peerMgr, msgs->torrent->info.hash, &newPex );
1803        PexDiffs diffs;
1804        tr_benc val, *added, *dropped;
1805        uint8_t *tmp, *walk;
1806        char * benc;
1807        int bencLen;
1808
1809        /* build the diffs */
1810        diffs.added = tr_new( tr_pex, newCount );
1811        diffs.addedCount = 0;
1812        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1813        diffs.droppedCount = 0;
1814        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1815        diffs.elementCount = 0;
1816        tr_set_compare( msgs->pex, msgs->pexCount,
1817                        newPex, newCount,
1818                        tr_pexCompare, sizeof(tr_pex),
1819                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
1820        dbgmsg( msgs, "pex: old peer count %d, new peer count %d, added %d, removed %d", msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1821
1822        /* update peer */
1823        tr_free( msgs->pex );
1824        msgs->pex = diffs.elements;
1825        msgs->pexCount = diffs.elementCount;
1826
1827        /* build the pex payload */
1828        tr_bencInitDict( &val, 3 );
1829
1830        /* "added" */
1831        added = tr_bencDictAdd( &val, "added" );
1832        tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1833        for( i=0; i<diffs.addedCount; ++i ) {
1834            memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
1835            memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1836        }
1837        assert( ( walk - tmp ) == diffs.addedCount * 6 );
1838        tr_bencInitStr( added, tmp, walk-tmp, FALSE );
1839
1840        /* "added.f" */
1841        tmp = walk = tr_new( uint8_t, diffs.addedCount );
1842        for( i=0; i<diffs.addedCount; ++i )
1843            *walk++ = diffs.added[i].flags;
1844        assert( ( walk - tmp ) == diffs.addedCount );
1845        tr_bencDictAddRaw( &val, "added.f", tmp, walk-tmp );
1846
1847        /* "dropped" */
1848        dropped = tr_bencDictAdd( &val, "dropped" );
1849        tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
1850        for( i=0; i<diffs.droppedCount; ++i ) {
1851            memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
1852            memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
1853        }
1854        assert( ( walk - tmp ) == diffs.droppedCount * 6 );
1855        tr_bencInitStr( dropped, tmp, walk-tmp, FALSE );
1856
1857        /* write the pex message */
1858        benc = tr_bencSave( &val, &bencLen );
1859        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 2*sizeof(uint8_t) + bencLen );
1860        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_LTEP );
1861        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, msgs->ut_pex_id );
1862        tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, benc, bencLen );
1863
1864        /* cleanup */
1865        tr_free( benc );
1866        tr_bencFree( &val );
1867        tr_free( diffs.added );
1868        tr_free( diffs.dropped );
1869        tr_free( newPex );
1870
1871        msgs->clientSentPexAt = time( NULL );
1872    }
1873}
1874
1875static int
1876pexPulse( void * vpeer )
1877{
1878    sendPex( vpeer );
1879    return TRUE;
1880}
1881
1882/**
1883***
1884**/
1885
1886tr_peermsgs*
1887tr_peerMsgsNew( struct tr_torrent * torrent,
1888                struct tr_peer    * info,
1889                tr_delivery_func    func,
1890                void              * userData,
1891                tr_publisher_tag  * setme )
1892{
1893    tr_peermsgs * m;
1894
1895    assert( info != NULL );
1896    assert( info->io != NULL );
1897
1898    m = tr_new0( tr_peermsgs, 1 );
1899    m->publisher = tr_publisherNew( );
1900    m->info = info;
1901    m->handle = torrent->handle;
1902    m->torrent = torrent;
1903    m->io = info->io;
1904    m->info->clientIsChoked = 1;
1905    m->info->peerIsChoked = 1;
1906    m->info->clientIsInterested = 0;
1907    m->info->peerIsInterested = 0;
1908    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
1909    m->state = AWAITING_BT_LENGTH;
1910    m->pulseTimer = tr_timerNew( m->handle, pulse, m, PEER_PULSE_INTERVAL );
1911    m->rateTimer = tr_timerNew( m->handle, ratePulse, m, RATE_PULSE_INTERVAL );
1912    m->pexTimer = tr_timerNew( m->handle, pexPulse, m, PEX_INTERVAL );
1913    m->outMessages = evbuffer_new( );
1914    m->incoming.block = evbuffer_new( );
1915    m->outBlock = evbuffer_new( );
1916    m->peerAllowedPieces = NULL;
1917    m->peerAskedFor = REQUEST_LIST_INIT;
1918    m->peerAskedForFast = REQUEST_LIST_INIT;
1919    m->clientAskedFor = REQUEST_LIST_INIT;
1920    m->clientWillAskFor = REQUEST_LIST_INIT;
1921    *setme = tr_publisherSubscribe( m->publisher, func, userData );
1922
1923    if ( tr_peerIoSupportsLTEP( m->io ) )
1924        sendLtepHandshake( m );
1925
1926    sendBitfield( m );
1927   
1928    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of inactivity */
1929    tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
1930    ratePulse( m );
1931
1932    return m;
1933}
1934
1935void
1936tr_peerMsgsFree( tr_peermsgs* msgs )
1937{
1938    if( msgs != NULL )
1939    {
1940        tr_timerFree( &msgs->pulseTimer );
1941        tr_timerFree( &msgs->rateTimer );
1942        tr_timerFree( &msgs->pexTimer );
1943        tr_publisherFree( &msgs->publisher );
1944        reqListClear( &msgs->clientWillAskFor );
1945        reqListClear( &msgs->clientAskedFor );
1946        reqListClear( &msgs->peerAskedForFast );
1947        reqListClear( &msgs->peerAskedFor );
1948        tr_bitfieldFree( msgs->peerAllowedPieces );
1949        evbuffer_free( msgs->incoming.block );
1950        evbuffer_free( msgs->outMessages );
1951        evbuffer_free( msgs->outBlock );
1952        tr_free( msgs->pex );
1953
1954        memset( msgs, ~0, sizeof( tr_peermsgs ) );
1955        tr_free( msgs );
1956    }
1957}
1958
1959tr_publisher_tag
1960tr_peerMsgsSubscribe( tr_peermsgs       * peer,
1961                      tr_delivery_func    func,
1962                      void              * userData )
1963{
1964    return tr_publisherSubscribe( peer->publisher, func, userData );
1965}
1966
1967void
1968tr_peerMsgsUnsubscribe( tr_peermsgs       * peer,
1969                        tr_publisher_tag    tag )
1970{
1971    tr_publisherUnsubscribe( peer->publisher, tag );
1972}
Note: See TracBrowser for help on using the repository browser.