source: trunk/libtransmission/peer-msgs.c @ 5640

Last change on this file since 5640 was 5640, checked in by charles, 14 years ago

#881: undo r5631

  • Property svn:keywords set to Date Rev Author Id
File size: 55.3 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 5640 2008-04-18 00:02:04Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <stdio.h>
16#include <stdlib.h>
17#include <string.h>
18#include <libgen.h> /* basename */
19
20#include <event.h>
21
22#include "transmission.h"
23#include "bencode.h"
24#include "completion.h"
25#include "inout.h"
26#include "peer-io.h"
27#include "peer-mgr.h"
28#include "peer-mgr-private.h"
29#include "peer-msgs.h"
30#include "ratecontrol.h"
31#include "stats.h"
32#include "torrent.h"
33#include "trevent.h"
34#include "utils.h"
35
36/**
37***
38**/
39
40enum
41{
42    BT_CHOKE                = 0,
43    BT_UNCHOKE              = 1,
44    BT_INTERESTED           = 2,
45    BT_NOT_INTERESTED       = 3,
46    BT_HAVE                 = 4,
47    BT_BITFIELD             = 5,
48    BT_REQUEST              = 6,
49    BT_PIECE                = 7,
50    BT_CANCEL               = 8,
51    BT_PORT                 = 9,
52    BT_SUGGEST              = 13,
53    BT_HAVE_ALL             = 14,
54    BT_HAVE_NONE            = 15,
55    BT_REJECT               = 16,
56    BT_ALLOWED_FAST         = 17,
57    BT_LTEP                 = 20,
58
59    LTEP_HANDSHAKE          = 0,
60
61    TR_LTEP_PEX             = 1,
62
63    MIN_CHOKE_PERIOD_SEC    = (10),
64
65    /* idle seconds before we send a keepalive */
66    KEEPALIVE_INTERVAL_SECS = 90,
67
68    PEX_INTERVAL            = (90 * 1000), /* msec between sendPex() calls */
69    PEER_PULSE_INTERVAL     = (100),       /* msec between pulse() calls */
70    RATE_PULSE_INTERVAL     = (250),       /* msec between ratePulse() calls */
71
72    MAX_QUEUE_SIZE          = (100),
73    MAX_OUTBUF_SIZE         = (1024),
74     
75    /* (fast peers) max number of pieces we fast-allow to another peer */
76    MAX_FAST_ALLOWED_COUNT   = 10,
77
78    /* (fast peers) max threshold for allowing fast-pieces requests */
79    MAX_FAST_ALLOWED_THRESHOLD = 10,
80
81    /* how long an unsent request can stay queued before it's returned
82       back to the peer-mgr's pool of requests */
83    QUEUED_REQUEST_TTL_SECS = 20,
84
85    /* how long a sent request can stay queued before it's returned
86       back to the peer-mgr's pool of requests */
87    SENT_REQUEST_TTL_SECS = 120
88};
89
90/**
91***  REQUEST MANAGEMENT
92**/
93
94enum
95{
96    AWAITING_BT_LENGTH,
97    AWAITING_BT_ID,
98    AWAITING_BT_MESSAGE,
99    AWAITING_BT_PIECE
100};
101
102struct peer_request
103{
104    uint32_t index;
105    uint32_t offset;
106    uint32_t length;
107    time_t time_requested;
108};
109
110static int
111compareRequest( const void * va, const void * vb )
112{
113    int i;
114    const struct peer_request * a = va;
115    const struct peer_request * b = vb;
116    if(( i = tr_compareUint32( a->index, b->index ))) return i;
117    if(( i = tr_compareUint32( a->offset, b->offset ))) return i;
118    if(( i = tr_compareUint32( a->length, b->length ))) return i;
119    return 0;
120}
121
122struct request_list
123{
124    uint16_t count;
125    uint16_t max;
126    struct peer_request * requests;
127};
128
129static const struct request_list REQUEST_LIST_INIT = { 0, 0, NULL };
130
131static void
132reqListReserve( struct request_list * list, uint16_t max )
133{
134    if( list->max < max )
135    {
136        list->max = max;
137        list->requests = tr_renew( struct peer_request,
138                                   list->requests,
139                                   list->max );
140    }
141}
142
143static void
144reqListClear( struct request_list * list )
145{
146    tr_free( list->requests );
147    *list = REQUEST_LIST_INIT;
148}
149
150static void
151reqListCopy( struct request_list * dest, const struct request_list * src )
152{
153    dest->count = src->count;
154    dest->max = src->max;
155    dest->requests = tr_new( struct peer_request, dest->max );
156    memcpy( dest->requests, src->requests, sizeof( struct peer_request ) * dest->count );
157}
158
159static void
160reqListRemoveOne( struct request_list * list, int i )
161{
162    assert( 0<=i && i<list->count );
163
164    memmove( &list->requests[i],
165             &list->requests[i+1],
166             sizeof( struct peer_request ) * ( --list->count - i ) );
167}
168
169static void
170reqListAppend( struct request_list * list, const struct peer_request * req )
171{
172    if( ++list->count >= list->max )
173        reqListReserve( list, list->max + 8 );
174
175    list->requests[list->count-1] = *req;
176}
177
178static tr_errno
179reqListPop( struct request_list * list, struct peer_request * setme )
180{
181    tr_errno err;
182
183    if( !list->count )
184        err = TR_ERROR;
185    else {
186        *setme = list->requests[0];
187        reqListRemoveOne( list, 0 );
188        err = TR_OK;
189    }
190
191    return err;
192}
193
194static int
195reqListFind( struct request_list * list, const struct peer_request * key )
196{
197    uint16_t i;
198    for( i=0; i<list->count; ++i )
199        if( !compareRequest( key, list->requests+i ) )
200            return i;
201    return -1;
202}
203
204static tr_errno
205reqListRemove( struct request_list * list, const struct peer_request * key )
206{
207    tr_errno err;
208    const int i = reqListFind( list, key );
209
210    if( i < 0 )
211        err = TR_ERROR;
212    else {
213        err = TR_OK;
214        reqListRemoveOne( list, i );
215    }
216
217    return err;
218}
219
220/**
221***
222**/
223
224/* this is raw, unchanged data from the peer regarding
225 * the current message that it's sending us. */
226struct tr_incoming
227{
228    uint8_t id;
229    uint32_t length; /* includes the +1 for id length */
230    struct peer_request blockReq; /* metadata for incoming blocks */
231    struct evbuffer * block; /* piece data for incoming blocks */
232};
233
234struct tr_peermsgs
235{
236    unsigned int peerSentBitfield         : 1;
237    unsigned int peerSupportsPex          : 1;
238    unsigned int clientSentLtepHandshake  : 1;
239    unsigned int peerSentLtepHandshake    : 1;
240    unsigned int sendingBlock             : 1;
241   
242    uint8_t state;
243    uint8_t ut_pex_id;
244    uint16_t pexCount;
245    uint16_t maxActiveRequests;
246    uint16_t minActiveRequests;
247
248    tr_peer * info;
249
250    tr_handle * handle;
251    tr_torrent * torrent;
252    tr_peerIo * io;
253
254    tr_publisher_t * publisher;
255
256    struct evbuffer * outBlock;    /* buffer of the current piece message */
257    struct evbuffer * outMessages; /* buffer of all the non-piece messages */
258
259    struct request_list peerAskedFor;
260    struct request_list peerAskedForFast;
261    struct request_list clientAskedFor;
262    struct request_list clientWillAskFor;
263
264    tr_timer * rateTimer;
265    tr_timer * pulseTimer;
266    tr_timer * pexTimer;
267
268    time_t clientSentPexAt;
269    time_t clientSentAnythingAt;
270   
271    tr_bitfield * peerAllowedPieces;
272
273    struct tr_incoming incoming; 
274
275    tr_pex * pex;
276};
277
278/**
279***
280**/
281
282static void
283myDebug( const char * file, int line,
284         const struct tr_peermsgs * msgs,
285         const char * fmt, ... )
286{
287    FILE * fp = tr_getLog( );
288    if( fp != NULL )
289    {
290        va_list args;
291        char timestr[64];
292        struct evbuffer * buf = evbuffer_new( );
293        char * myfile = tr_strdup( file );
294
295        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
296                             tr_getLogTimeStr( timestr, sizeof(timestr) ),
297                             msgs->torrent->info.name,
298                             tr_peerIoGetAddrStr( msgs->io ),
299                             msgs->info->client );
300        va_start( args, fmt );
301        evbuffer_add_vprintf( buf, fmt, args );
302        va_end( args );
303        evbuffer_add_printf( buf, " (%s:%d)\n", basename(myfile), line );
304        fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
305
306        tr_free( myfile );
307        evbuffer_free( buf );
308    }
309}
310
311#define dbgmsg(msgs, fmt...) myDebug(__FILE__, __LINE__, msgs, ##fmt )
312
313/**
314***
315**/
316
317static void
318protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
319{
320    tr_peerIo * io = msgs->io;
321    struct evbuffer * out = msgs->outMessages;
322
323    dbgmsg( msgs, "requesting %u:%u->%u", req->index, req->offset, req->length );
324    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
325    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
326    tr_peerIoWriteUint32( io, out, req->index );
327    tr_peerIoWriteUint32( io, out, req->offset );
328    tr_peerIoWriteUint32( io, out, req->length );
329}
330
331static void
332protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
333{
334    tr_peerIo * io = msgs->io;
335    struct evbuffer * out = msgs->outMessages;
336
337    dbgmsg( msgs, "cancelling %u:%u->%u", req->index, req->offset, req->length );
338    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
339    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
340    tr_peerIoWriteUint32( io, out, req->index );
341    tr_peerIoWriteUint32( io, out, req->offset );
342    tr_peerIoWriteUint32( io, out, req->length );
343}
344
345static void
346protocolSendHave( tr_peermsgs * msgs, uint32_t index )
347{
348    tr_peerIo * io = msgs->io;
349    struct evbuffer * out = msgs->outMessages;
350
351    dbgmsg( msgs, "sending Have %u", index );
352    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
353    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
354    tr_peerIoWriteUint32( io, out, index );
355}
356
357static void
358protocolSendChoke( tr_peermsgs * msgs, int choke )
359{
360    tr_peerIo * io = msgs->io;
361    struct evbuffer * out = msgs->outMessages;
362
363    dbgmsg( msgs, "sending %s", (choke ? "Choke" : "Unchoke") );
364    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) );
365    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
366}
367
368/**
369***  EVENTS
370**/
371
372static const tr_peermsgs_event blankEvent = { 0, 0, 0, 0, 0.0f, 0 };
373
374static void
375publish( tr_peermsgs * msgs, tr_peermsgs_event * e )
376{
377    tr_publisherPublish( msgs->publisher, msgs->info, e );
378}
379
380static void
381fireError( tr_peermsgs * msgs, tr_errno err )
382{
383    tr_peermsgs_event e = blankEvent;
384    e.eventType = TR_PEERMSG_ERROR;
385    e.err = err;
386    publish( msgs, &e );
387}
388
389static void
390fireNeedReq( tr_peermsgs * msgs )
391{
392    tr_peermsgs_event e = blankEvent;
393    e.eventType = TR_PEERMSG_NEED_REQ;
394    publish( msgs, &e );
395}
396
397static void
398firePeerProgress( tr_peermsgs * msgs )
399{
400    tr_peermsgs_event e = blankEvent;
401    e.eventType = TR_PEERMSG_PEER_PROGRESS;
402    e.progress = msgs->info->progress;
403    publish( msgs, &e );
404}
405
406static void
407fireClientHave( tr_peermsgs * msgs, uint32_t pieceIndex )
408{
409    tr_peermsgs_event e = blankEvent;
410    e.eventType = TR_PEERMSG_CLIENT_HAVE;
411    e.pieceIndex = pieceIndex;
412    publish( msgs, &e );
413}
414
415static void
416fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
417{
418    tr_peermsgs_event e = blankEvent;
419    e.eventType = TR_PEERMSG_CLIENT_BLOCK;
420    e.pieceIndex = req->index;
421    e.offset = req->offset;
422    e.length = req->length;
423    publish( msgs, &e );
424}
425
426static void
427firePieceData( tr_peermsgs * msgs )
428{
429    tr_peermsgs_event e = blankEvent;
430    e.eventType = TR_PEERMSG_PIECE_DATA;
431    publish( msgs, &e );
432}
433
434static void
435fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
436{
437    tr_peermsgs_event e = blankEvent;
438    e.eventType = TR_PEERMSG_CANCEL;
439    e.pieceIndex = req->index;
440    e.offset = req->offset;
441    e.length = req->length;
442    publish( msgs, &e );
443}
444
445/**
446***  INTEREST
447**/
448
449static int
450isPieceInteresting( const tr_peermsgs   * peer,
451                    int                   piece )
452{
453    const tr_torrent * torrent = peer->torrent;
454
455    return ( ( !torrent->info.pieces[piece].dnd )               /* we want it */
456          && ( !tr_cpPieceIsComplete( torrent->completion, piece ) ) /* !have */
457          && ( tr_bitfieldHas( peer->info->have, piece ) ) );  /* peer has it */
458}
459
460/* "interested" means we'll ask for piece data if they unchoke us */
461static int
462isPeerInteresting( const tr_peermsgs * msgs )
463{
464    tr_piece_index_t i;
465    const tr_torrent * torrent;
466    const tr_bitfield * bitfield;
467    const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
468
469    if( clientIsSeed )
470        return FALSE;
471
472    torrent = msgs->torrent;
473    bitfield = tr_cpPieceBitfield( torrent->completion );
474
475    if( !msgs->info->have )
476        return TRUE;
477
478    assert( bitfield->len == msgs->info->have->len );
479    for( i=0; i<torrent->info.pieceCount; ++i )
480        if( isPieceInteresting( msgs, i ) )
481            return TRUE;
482
483    return FALSE;
484}
485
486static void
487sendInterest( tr_peermsgs * msgs, int weAreInterested )
488{
489    assert( msgs != NULL );
490    assert( weAreInterested==0 || weAreInterested==1 );
491
492    msgs->info->clientIsInterested = weAreInterested;
493    dbgmsg( msgs, "Sending %s",
494            weAreInterested ? "Interested" : "Not Interested");
495
496    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
497    tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages,
498                   weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
499}
500
501static void
502updateInterest( tr_peermsgs * msgs )
503{
504    const int i = isPeerInteresting( msgs );
505    if( i != msgs->info->clientIsInterested )
506        sendInterest( msgs, i );
507    if( i )
508        fireNeedReq( msgs );
509}
510
511static void
512cancelAllRequestsToClientExceptFast( tr_peermsgs * msgs )
513{
514    reqListClear( &msgs->peerAskedFor );
515}
516
517void
518tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
519{
520    const time_t fibrillationTime = time(NULL) - MIN_CHOKE_PERIOD_SEC;
521
522    assert( msgs != NULL );
523    assert( msgs->info != NULL );
524    assert( choke==0 || choke==1 );
525
526    if( msgs->info->chokeChangedAt > fibrillationTime )
527    {
528        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
529    }
530    else if( msgs->info->peerIsChoked != choke )
531    {
532        msgs->info->peerIsChoked = choke;
533        if( choke )
534            cancelAllRequestsToClientExceptFast( msgs );
535        protocolSendChoke( msgs, choke );
536        msgs->info->chokeChangedAt = time( NULL );
537    }
538}
539
540/**
541***
542**/
543
544void
545tr_peerMsgsHave( tr_peermsgs * msgs,
546                 uint32_t      index )
547{
548    protocolSendHave( msgs, index );
549
550    /* since we have more pieces now, we might not be interested in this peer */
551    updateInterest( msgs );
552}
553#if 0
554static void
555sendFastSuggest( tr_peermsgs * msgs,
556                 uint32_t      pieceIndex )
557{
558    assert( msgs != NULL );
559   
560    if( tr_peerIoSupportsFEXT( msgs->io ) )
561    {
562        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
563        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_SUGGEST );
564        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
565    }
566}
567static void
568sendFastHave( tr_peermsgs * msgs, int all )
569{
570    assert( msgs != NULL );
571   
572    if( tr_peerIoSupportsFEXT( msgs->io ) )
573    {
574        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
575        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, ( all ? BT_HAVE_ALL
576                                                                : BT_HAVE_NONE ) );
577        updateInterest( msgs );
578    }
579}
580#endif
581
582static void
583sendFastReject( tr_peermsgs * msgs,
584                uint32_t      pieceIndex,
585                uint32_t      offset,
586                uint32_t      length )
587{
588    assert( msgs != NULL );
589
590    if( tr_peerIoSupportsFEXT( msgs->io ) )
591    {
592        const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
593        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
594        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_REJECT );
595        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
596        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
597        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
598    }
599}
600
601static tr_bitfield*
602getPeerAllowedPieces( tr_peermsgs * msgs )
603{
604    if( !msgs->peerAllowedPieces && tr_peerIoSupportsFEXT( msgs->io ) )
605    {
606        msgs->peerAllowedPieces = tr_peerMgrGenerateAllowedSet(
607            MAX_FAST_ALLOWED_COUNT,
608            msgs->torrent->info.pieceCount,
609            msgs->torrent->info.hash,
610            tr_peerIoGetAddress( msgs->io, NULL ) );
611    }
612
613    return msgs->peerAllowedPieces;
614}
615
616static void
617sendFastAllowed( tr_peermsgs * msgs,
618                 uint32_t      pieceIndex)
619{
620    assert( msgs != NULL );
621   
622    if( tr_peerIoSupportsFEXT( msgs->io ) )
623    {
624        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
625        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_ALLOWED_FAST );
626        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
627    }
628}
629
630static void
631sendFastAllowedSet( tr_peermsgs * msgs )
632{
633    tr_piece_index_t i = 0;
634
635    while (i <= msgs->torrent->info.pieceCount )
636    {
637        if ( tr_bitfieldHas( getPeerAllowedPieces( msgs ), i) )
638            sendFastAllowed( msgs, i );
639        i++;
640    }
641}
642
643static void
644maybeSendFastAllowedSet( tr_peermsgs * msgs )
645{
646    if( tr_bitfieldCountTrueBits( msgs->info->have ) <= MAX_FAST_ALLOWED_THRESHOLD )
647        sendFastAllowedSet( msgs );
648}
649
650
651/**
652***
653**/
654
655static int
656reqIsValid( const tr_peermsgs   * msgs,
657            uint32_t              index,
658            uint32_t              offset,
659            uint32_t              length )
660{
661    return tr_torrentReqIsValid( msgs->torrent, index, offset, length );
662}
663
664static int
665requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
666{
667    return reqIsValid( msgs, req->index, req->offset, req->length );
668}
669
670static void
671expireOldRequests( tr_peermsgs * msgs )
672{
673    int i;
674    time_t oldestAllowed;
675    struct request_list tmp = REQUEST_LIST_INIT;
676
677    /* cancel requests that have been queued for too long */
678    oldestAllowed = time( NULL ) - QUEUED_REQUEST_TTL_SECS;
679    reqListCopy( &tmp, &msgs->clientWillAskFor );
680    for( i=0; i<tmp.count; ++i ) {
681        const struct peer_request * req = &tmp.requests[i];
682        if( req->time_requested < oldestAllowed )
683            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
684    }
685    reqListClear( &tmp );
686
687    /* cancel requests that were sent too long ago */
688    oldestAllowed = time( NULL ) - SENT_REQUEST_TTL_SECS;
689    reqListCopy( &tmp, &msgs->clientAskedFor );
690    for( i=0; i<tmp.count; ++i ) {
691        const struct peer_request * req = &tmp.requests[i];
692        if( req->time_requested < oldestAllowed )
693            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
694    }
695    reqListClear( &tmp );
696}
697
698static void
699pumpRequestQueue( tr_peermsgs * msgs )
700{
701    const int max = msgs->maxActiveRequests;
702    const int min = msgs->minActiveRequests;
703    const time_t now = time( NULL );
704    int sent = 0;
705    int count = msgs->clientAskedFor.count;
706    struct peer_request req;
707
708    if( count > min )
709        return;
710    if( msgs->info->clientIsChoked )
711        return;
712
713    while( ( count < max ) && !reqListPop( &msgs->clientWillAskFor, &req ) )
714    {
715        assert( requestIsValid( msgs, &req ) );
716        assert( tr_bitfieldHas( msgs->info->have, req.index ) );
717
718        protocolSendRequest( msgs, &req );
719        req.time_requested = now;
720        reqListAppend( &msgs->clientAskedFor, &req );
721
722        ++count;
723        ++sent;
724    }
725
726    if( sent )
727        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
728                sent,
729                msgs->clientAskedFor.count,
730                msgs->clientWillAskFor.count );
731
732    if( count < max )
733        fireNeedReq( msgs );
734}
735
736static int
737pulse( void * vmsgs );
738
739int
740tr_peerMsgsAddRequest( tr_peermsgs * msgs,
741                       uint32_t      index, 
742                       uint32_t      offset, 
743                       uint32_t      length )
744{
745    const int req_max = msgs->maxActiveRequests;
746    struct peer_request req;
747
748    assert( msgs != NULL );
749    assert( msgs->torrent != NULL );
750    assert( reqIsValid( msgs, index, offset, length ) );
751
752    /**
753    ***  Reasons to decline the request
754    **/
755
756    /* don't send requests to choked clients */
757    if( msgs->info->clientIsChoked ) {
758        dbgmsg( msgs, "declining request because they're choking us" );
759        return TR_ADDREQ_CLIENT_CHOKED;
760    }
761
762    /* peer doesn't have this piece */
763    if( !tr_bitfieldHas( msgs->info->have, index ) )
764        return TR_ADDREQ_MISSING;
765
766    /* peer's queue is full */
767    if( msgs->clientWillAskFor.count >= req_max ) {
768        dbgmsg( msgs, "declining request because we're full" );
769        return TR_ADDREQ_FULL;
770    }
771
772    /* have we already asked for this piece? */
773    req.index = index;
774    req.offset = offset;
775    req.length = length;
776    if( reqListFind( &msgs->clientAskedFor, &req ) != -1 ) {
777        dbgmsg( msgs, "declining because it's a duplicate" );
778        return TR_ADDREQ_DUPLICATE;
779    }
780    if( reqListFind( &msgs->clientWillAskFor, &req ) != -1 ) {
781        dbgmsg( msgs, "declining because it's a duplicate" );
782        return TR_ADDREQ_DUPLICATE;
783    }
784
785    /**
786    ***  Accept this request
787    **/
788
789    dbgmsg( msgs, "added req for piece %d, offset %d", (int)index, (int)offset );
790    req.time_requested = time( NULL );
791    reqListAppend( &msgs->clientWillAskFor, &req );
792    return TR_ADDREQ_OK;
793}
794
795static void
796cancelAllRequestsToPeer( tr_peermsgs * msgs )
797{
798    int i;
799    struct request_list a = msgs->clientWillAskFor;
800    struct request_list b = msgs->clientAskedFor;
801
802    msgs->clientAskedFor = REQUEST_LIST_INIT;
803    msgs->clientWillAskFor = REQUEST_LIST_INIT;
804
805    for( i=0; i<a.count; ++i )
806        fireCancelledReq( msgs, &a.requests[i] );
807
808    for( i=0; i<b.count; ++i ) {
809        fireCancelledReq( msgs, &b.requests[i] );
810        protocolSendCancel( msgs, &b.requests[i] );
811    }
812
813    reqListClear( &a );
814    reqListClear( &b );
815}
816
817void
818tr_peerMsgsCancel( tr_peermsgs * msgs,
819                   uint32_t      pieceIndex,
820                   uint32_t      offset,
821                   uint32_t      length )
822{
823    struct peer_request req;
824
825    assert( msgs != NULL );
826    assert( length > 0 );
827
828    /* have we asked the peer for this piece? */
829    req.index = pieceIndex;
830    req.offset = offset;
831    req.length = length;
832
833    /* if it's only in the queue and hasn't been sent yet, free it */
834    if( !reqListRemove( &msgs->clientWillAskFor, &req ) )
835        fireCancelledReq( msgs, &req );
836
837    /* if it's already been sent, send a cancel message too */
838    if( !reqListRemove( &msgs->clientAskedFor, &req ) ) {
839        protocolSendCancel( msgs, &req );
840        fireCancelledReq( msgs, &req );
841    }
842}
843
844/**
845***
846**/
847
848static void
849sendLtepHandshake( tr_peermsgs * msgs )
850{
851    tr_benc val, *m;
852    char * buf;
853    int len;
854    int pex;
855    struct evbuffer * outbuf;
856
857    if( msgs->clientSentLtepHandshake )
858        return;
859
860    outbuf = evbuffer_new( );
861    dbgmsg( msgs, "sending an ltep handshake" );
862    msgs->clientSentLtepHandshake = 1;
863
864    /* decide if we want to advertise pex support */
865    if( !tr_torrentAllowsPex( msgs->torrent ) )
866        pex = 0;
867    else if( msgs->peerSentLtepHandshake )
868        pex = msgs->peerSupportsPex ? 1 : 0;
869    else
870        pex = 1;
871
872    tr_bencInitDict( &val, 4 );
873    tr_bencDictAddInt( &val, "e", msgs->handle->encryptionMode != TR_PLAINTEXT_PREFERRED );
874    tr_bencDictAddInt( &val, "p", tr_getPublicPort( msgs->handle ) );
875    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
876    m  = tr_bencDictAddDict( &val, "m", 1 );
877    if( pex )
878        tr_bencDictAddInt( m, "ut_pex", TR_LTEP_PEX );
879    buf = tr_bencSave( &val, &len );
880
881    tr_peerIoWriteUint32( msgs->io, outbuf, 2*sizeof(uint8_t) + len );
882    tr_peerIoWriteUint8 ( msgs->io, outbuf, BT_LTEP );
883    tr_peerIoWriteUint8 ( msgs->io, outbuf, LTEP_HANDSHAKE );
884    tr_peerIoWriteBytes ( msgs->io, outbuf, buf, len );
885
886    tr_peerIoWriteBuf( msgs->io, outbuf );
887
888    dbgmsg( msgs, "here is the ltep handshake we sent [%*.*s]", len, len, buf );
889
890    /* cleanup */
891    tr_bencFree( &val );
892    tr_free( buf );
893    evbuffer_free( outbuf );
894}
895
896static void
897parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
898{
899    tr_benc val, * sub;
900    uint8_t * tmp = tr_new( uint8_t, len );
901
902    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
903    msgs->peerSentLtepHandshake = 1;
904
905    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type!=TYPE_DICT ) {
906        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
907        tr_free( tmp );
908        return;
909    }
910
911    dbgmsg( msgs, "here is the ltep handshake we got [%*.*s]", len, len, tmp );
912
913    /* does the peer prefer encrypted connections? */
914    if(( sub = tr_bencDictFindType( &val, "e", TYPE_INT )))
915        msgs->info->encryption_preference = sub->val.i
916                                      ? ENCRYPTION_PREFERENCE_YES
917                                      : ENCRYPTION_PREFERENCE_NO;
918
919    /* check supported messages for utorrent pex */
920    msgs->peerSupportsPex = 0;
921    if(( sub = tr_bencDictFindType( &val, "m", TYPE_DICT ))) {
922        if(( sub = tr_bencDictFindType( sub, "ut_pex", TYPE_INT ))) {
923            msgs->ut_pex_id = (uint8_t) sub->val.i;
924            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
925            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
926        }
927    }
928
929    /* get peer's listening port */
930    if(( sub = tr_bencDictFindType( &val, "p", TYPE_INT ))) {
931        msgs->info->port = htons( (uint16_t)sub->val.i );
932        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
933    }
934
935    tr_bencFree( &val );
936    tr_free( tmp );
937}
938
939static void
940parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
941{
942    int loaded = 0;
943    uint8_t * tmp = tr_new( uint8_t, msglen );
944    tr_benc val, *sub;
945    const tr_torrent * tor = msgs->torrent;
946    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
947
948    if( tr_torrentAllowsPex( tor )
949        && (( loaded = !tr_bencLoad( tmp, msglen, &val, NULL )))
950        && (( sub = tr_bencDictFindType( &val, "added", TYPE_STR ))))
951    {
952        const int n = sub->val.s.i / 6 ;
953        if( n )
954            tr_tordbg( tor, "Got %d peers from peer exchange", n );
955        tr_peerMgrAddPeers( msgs->handle->peerMgr,
956                            tor->info.hash,
957                            TR_PEER_FROM_PEX,
958                            (uint8_t*)sub->val.s.s, n );
959    }
960
961    if( loaded )
962        tr_bencFree( &val );
963    tr_free( tmp );
964}
965
966static void
967sendPex( tr_peermsgs * msgs );
968
969static void
970parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
971{
972    uint8_t ltep_msgid;
973
974    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
975    msglen--;
976
977    if( ltep_msgid == LTEP_HANDSHAKE )
978    {
979        dbgmsg( msgs, "got ltep handshake" );
980        parseLtepHandshake( msgs, msglen, inbuf );
981        if( tr_peerIoSupportsLTEP( msgs->io ) )
982        {
983            sendLtepHandshake( msgs );
984            sendPex( msgs );
985        }
986    }
987    else if( ltep_msgid == TR_LTEP_PEX )
988    {
989        dbgmsg( msgs, "got ut pex" );
990        msgs->peerSupportsPex = 1;
991        parseUtPex( msgs, msglen, inbuf );
992    }
993    else
994    {
995        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
996        evbuffer_drain( inbuf, msglen );
997    }
998}
999
1000static int
1001readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1002{
1003    uint32_t len;
1004
1005    if( inlen < sizeof(len) )
1006        return READ_MORE;
1007
1008    tr_peerIoReadUint32( msgs->io, inbuf, &len );
1009
1010    if( len == 0 ) /* peer sent us a keepalive message */
1011        dbgmsg( msgs, "got KeepAlive" );
1012    else {
1013        msgs->incoming.length = len;
1014        msgs->state = AWAITING_BT_ID;
1015    }
1016
1017    return READ_AGAIN;
1018}
1019
1020static int
1021readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen );
1022
1023static int
1024readBtId( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1025{
1026    uint8_t id;
1027
1028    if( inlen < sizeof(uint8_t) )
1029        return READ_MORE;
1030
1031    tr_peerIoReadUint8( msgs->io, inbuf, &id );
1032    msgs->incoming.id = id;
1033
1034    if( id==BT_PIECE )
1035    {
1036        msgs->state = AWAITING_BT_PIECE;
1037        return READ_AGAIN;
1038    }
1039    else if( msgs->incoming.length != 1 )
1040    {
1041        msgs->state = AWAITING_BT_MESSAGE;
1042        return READ_AGAIN;
1043    }
1044    else return readBtMessage( msgs, inbuf, inlen-1 );
1045}
1046
1047static void
1048updatePeerProgress( tr_peermsgs * msgs )
1049{
1050    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have )
1051                         / (float)msgs->torrent->info.pieceCount;
1052    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
1053    updateInterest( msgs );
1054    firePeerProgress( msgs );
1055}
1056
1057static int
1058clientCanSendFastBlock( const tr_peermsgs * msgs UNUSED )
1059{
1060    /* don't send a fast piece if peer has MAX_FAST_ALLOWED_THRESHOLD pieces */
1061    if( tr_bitfieldCountTrueBits( msgs->info->have ) > MAX_FAST_ALLOWED_THRESHOLD )
1062        return FALSE;
1063   
1064    /* ...or if we don't have ourself enough pieces */
1065    if( tr_bitfieldCountTrueBits( tr_cpPieceBitfield( msgs->torrent->completion ) ) < MAX_FAST_ALLOWED_THRESHOLD )
1066        return FALSE;
1067
1068    /* Maybe a bandwidth limit ? */
1069    return TRUE;
1070}
1071
1072static void
1073peerMadeRequest( tr_peermsgs * msgs, const struct peer_request * req )
1074{
1075    const int reqIsValid = requestIsValid( msgs, req );
1076    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( msgs->torrent->completion, req->index );
1077    const int peerIsChoked = msgs->info->peerIsChoked;
1078    const int peerIsFast = tr_peerIoSupportsFEXT( msgs->io );
1079    const int pieceIsFast = reqIsValid && tr_bitfieldHas( getPeerAllowedPieces( msgs ), req->index );
1080    const int canSendFast = clientCanSendFastBlock( msgs );
1081
1082    if( !reqIsValid ) /* bad request */
1083    {
1084        dbgmsg( msgs, "rejecting an invalid request." );
1085        sendFastReject( msgs, req->index, req->offset, req->length );
1086    }
1087    else if( !clientHasPiece ) /* we don't have it */
1088    {
1089        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1090        sendFastReject( msgs, req->index, req->offset, req->length );
1091    }
1092    else if( peerIsChoked && !peerIsFast ) /* doesn't he know he's choked? */
1093    {
1094        tr_peerMsgsSetChoke( msgs, 1 );
1095        sendFastReject( msgs, req->index, req->offset, req->length );
1096    }
1097    else if( peerIsChoked && peerIsFast && ( !pieceIsFast || !canSendFast ) )
1098    {
1099        sendFastReject( msgs, req->index, req->offset, req->length );
1100    }
1101    else /* YAY */
1102    {
1103        if( peerIsFast && pieceIsFast )
1104            reqListAppend( &msgs->peerAskedForFast, req );
1105        else
1106            reqListAppend( &msgs->peerAskedFor, req );
1107    }
1108}
1109
1110static int
1111messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1112{
1113    switch( id )
1114    {
1115        case BT_CHOKE:
1116        case BT_UNCHOKE:
1117        case BT_INTERESTED:
1118        case BT_NOT_INTERESTED:
1119        case BT_HAVE_ALL:
1120        case BT_HAVE_NONE:
1121            return len==1;
1122
1123        case BT_HAVE:
1124        case BT_SUGGEST:
1125        case BT_ALLOWED_FAST:
1126            return len==5;
1127
1128        case BT_BITFIELD:
1129            return len == (msg->torrent->info.pieceCount+7u)/8u + 1u;
1130       
1131        case BT_REQUEST:
1132        case BT_CANCEL:
1133        case BT_REJECT:
1134            return len==13;
1135
1136        case BT_PIECE:
1137            return len>9 && len<=16393;
1138
1139        case BT_PORT:
1140            return len==3;
1141
1142        case BT_LTEP:
1143            return len >= 2;
1144
1145        default:
1146            return FALSE;
1147    }
1148}
1149
1150static int
1151clientGotBlock( tr_peermsgs * msgs, const uint8_t * block, const struct peer_request * req );
1152
1153static void
1154clientGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1155{
1156    const time_t now = time( NULL );
1157    tr_torrent * tor = msgs->torrent;
1158    tor->activityDate = tr_date( );
1159    tor->downloadedCur += byteCount;
1160    msgs->info->pieceDataActivityDate = now;
1161    msgs->info->credit += (int)(byteCount * SWIFT_REPAYMENT_RATIO);
1162    tr_rcTransferred( msgs->info->rcToClient, byteCount );
1163    tr_rcTransferred( tor->download, byteCount );
1164    tr_rcTransferred( tor->handle->download, byteCount );
1165    tr_statsAddDownloaded( msgs->handle, byteCount );
1166    firePieceData( msgs );
1167}
1168
1169static int
1170readBtPiece( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1171{
1172    struct peer_request * req = &msgs->incoming.blockReq;
1173    assert( EVBUFFER_LENGTH(inbuf) >= inlen );
1174    dbgmsg( msgs, "In readBtPiece" );
1175
1176    if( !req->length )
1177    {
1178        if( inlen < 8 )
1179            return READ_MORE;
1180
1181        tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
1182        tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
1183        req->length = msgs->incoming.length - 9;
1184        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1185        return READ_AGAIN;
1186    }
1187    else
1188    {
1189        int err;
1190
1191        /* read in another chunk of data */
1192        const size_t nLeft = req->length - EVBUFFER_LENGTH(msgs->incoming.block);
1193        size_t n = MIN( nLeft, inlen );
1194        uint8_t * buf = tr_new( uint8_t, n );
1195        assert( EVBUFFER_LENGTH(inbuf) >= n );
1196        tr_peerIoReadBytes( msgs->io, inbuf, buf, n );
1197        evbuffer_add( msgs->incoming.block, buf, n );
1198        clientGotBytes( msgs, n );
1199        tr_free( buf );
1200        dbgmsg( msgs, "got %d bytes for block %u:%u->%u ... %d remain",
1201               (int)n, req->index, req->offset, req->length,
1202               (int)( req->length - EVBUFFER_LENGTH(msgs->incoming.block) ) );
1203        if( EVBUFFER_LENGTH(msgs->incoming.block) < req->length )
1204            return READ_MORE;
1205
1206        /* we've got the whole block ... process it */
1207        err = clientGotBlock( msgs, EVBUFFER_DATA(msgs->incoming.block), req );
1208
1209        /* cleanup */
1210        evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH(msgs->incoming.block) );
1211        req->length = 0;
1212        msgs->state = AWAITING_BT_LENGTH;
1213        if( !err )
1214            return READ_AGAIN;
1215        else {
1216            fireError( msgs, err );
1217            return READ_DONE;
1218        }
1219    }
1220}
1221
1222static int
1223readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1224{
1225    uint32_t ui32;
1226    uint32_t msglen = msgs->incoming.length;
1227    const uint8_t id = msgs->incoming.id;
1228    const size_t startBufLen = EVBUFFER_LENGTH( inbuf );
1229
1230    --msglen; /* id length */
1231
1232    if( inlen < msglen )
1233        return READ_MORE;
1234
1235    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %d", (int)id, (int)msglen, (int)inlen );
1236
1237    if( !messageLengthIsCorrect( msgs, id, msglen+1 ) )
1238    {
1239        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1240        fireError( msgs, TR_ERROR );
1241        return READ_DONE;
1242    }
1243
1244    switch( id )
1245    {
1246        case BT_CHOKE:
1247            dbgmsg( msgs, "got Choke" );
1248            msgs->info->clientIsChoked = 1;
1249            cancelAllRequestsToPeer( msgs );
1250            cancelAllRequestsToClientExceptFast( msgs );
1251            break;
1252
1253        case BT_UNCHOKE:
1254            dbgmsg( msgs, "got Unchoke" );
1255            msgs->info->clientIsChoked = 0;
1256            fireNeedReq( msgs );
1257            break;
1258
1259        case BT_INTERESTED:
1260            dbgmsg( msgs, "got Interested" );
1261            msgs->info->peerIsInterested = 1;
1262            tr_peerMsgsSetChoke( msgs, 0 );
1263            break;
1264
1265        case BT_NOT_INTERESTED:
1266            dbgmsg( msgs, "got Not Interested" );
1267            msgs->info->peerIsInterested = 0;
1268            break;
1269           
1270        case BT_HAVE:
1271            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1272            dbgmsg( msgs, "got Have: %u", ui32 );
1273            if( tr_bitfieldAdd( msgs->info->have, ui32 ) )
1274                fireError( msgs, TR_ERROR_PEER_MESSAGE );
1275            updatePeerProgress( msgs );
1276            tr_rcTransferred( msgs->torrent->swarmspeed, msgs->torrent->info.pieceSize );
1277            break;
1278
1279        case BT_BITFIELD: {
1280            const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
1281            int peerIsSeed;
1282            dbgmsg( msgs, "got a bitfield" );
1283            msgs->peerSentBitfield = 1;
1284            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
1285            updatePeerProgress( msgs );
1286            maybeSendFastAllowedSet( msgs );
1287            peerIsSeed = msgs->info->progress >= 1.0;
1288            tr_peerMsgsSetChoke( msgs, clientIsSeed && peerIsSeed );
1289            fireNeedReq( msgs );
1290            break;
1291        }
1292
1293        case BT_REQUEST: {
1294            struct peer_request r;
1295            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1296            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1297            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1298            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1299            peerMadeRequest( msgs, &r );
1300            break;
1301        }
1302
1303        case BT_CANCEL: {
1304            struct peer_request r;
1305            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1306            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1307            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1308            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1309            reqListRemove( &msgs->peerAskedForFast, &r );
1310            reqListRemove( &msgs->peerAskedFor, &r );
1311            break;
1312        }
1313
1314        case BT_PIECE:
1315            assert( 0 ); /* handled elsewhere! */
1316            break;
1317       
1318        case BT_PORT:
1319            dbgmsg( msgs, "Got a BT_PORT" );
1320            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1321            break;
1322           
1323        case BT_SUGGEST: {
1324            dbgmsg( msgs, "Got a BT_SUGGEST" );
1325            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1326            /* we don't do anything with this yet */
1327            break;
1328        }
1329           
1330        case BT_HAVE_ALL:
1331            dbgmsg( msgs, "Got a BT_HAVE_ALL" );
1332            tr_bitfieldAddRange( msgs->info->have, 0, msgs->torrent->info.pieceCount );
1333            updatePeerProgress( msgs );
1334            maybeSendFastAllowedSet( msgs );
1335            break;
1336       
1337       
1338        case BT_HAVE_NONE:
1339            dbgmsg( msgs, "Got a BT_HAVE_NONE" );
1340            tr_bitfieldClear( msgs->info->have );
1341            updatePeerProgress( msgs );
1342            maybeSendFastAllowedSet( msgs );
1343            break;
1344       
1345        case BT_REJECT: {
1346            struct peer_request r;
1347            dbgmsg( msgs, "Got a BT_REJECT" );
1348            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1349            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1350            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1351            reqListRemove( &msgs->clientAskedFor, &r );
1352            break;
1353        }
1354
1355        case BT_ALLOWED_FAST: {
1356            dbgmsg( msgs, "Got a BT_ALLOWED_FAST" );
1357            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1358            /* we don't do anything with this yet */
1359            break;
1360        }
1361
1362        case BT_LTEP:
1363            dbgmsg( msgs, "Got a BT_LTEP" );
1364            parseLtep( msgs, msglen, inbuf );
1365            break;
1366
1367        default:
1368            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1369            tr_peerIoDrain( msgs->io, inbuf, msglen );
1370            break;
1371    }
1372
1373    assert( msglen + 1 == msgs->incoming.length );
1374    assert( EVBUFFER_LENGTH(inbuf) == startBufLen - msglen );
1375
1376    msgs->state = AWAITING_BT_LENGTH;
1377    return READ_AGAIN;
1378}
1379
1380static void
1381peerGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1382{
1383    const time_t now = time( NULL );
1384    tr_torrent * tor = msgs->torrent;
1385    tor->activityDate = tr_date( );
1386    tor->uploadedCur += byteCount;
1387    msgs->info->pieceDataActivityDate = now;
1388    msgs->info->credit -= byteCount;
1389    tr_rcTransferred( msgs->info->rcToPeer, byteCount );
1390    tr_rcTransferred( tor->upload, byteCount );
1391    tr_rcTransferred( tor->handle->upload, byteCount );
1392    tr_statsAddUploaded( msgs->handle, byteCount );
1393    firePieceData( msgs );
1394}
1395
1396static size_t
1397getDownloadMax( const tr_peermsgs * msgs )
1398{
1399    static const size_t maxval = ~0;
1400    const tr_torrent * tor = msgs->torrent;
1401
1402    if( tor->downloadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1403        return tor->handle->useDownloadLimit
1404            ? tr_rcBytesLeft( tor->handle->download ) : maxval;
1405
1406    if( tor->downloadLimitMode == TR_SPEEDLIMIT_SINGLE )
1407        return tr_rcBytesLeft( tor->download );
1408
1409    return maxval;
1410}
1411
1412static void
1413decrementDownloadedCount( tr_peermsgs * msgs, uint32_t byteCount )
1414{
1415    tr_torrent * tor = msgs->torrent;
1416    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1417}
1418
1419static void
1420reassignBytesToCorrupt( tr_peermsgs * msgs, uint32_t byteCount )
1421{
1422    tr_torrent * tor = msgs->torrent;
1423
1424    tor->corruptCur += byteCount;
1425
1426    decrementDownloadedCount( msgs, byteCount );
1427}
1428
1429static void
1430gotBadPiece( tr_peermsgs * msgs, tr_piece_index_t pieceIndex )
1431{
1432    const uint32_t byteCount =
1433        tr_torPieceCountBytes( msgs->torrent, pieceIndex );
1434    reassignBytesToCorrupt( msgs, byteCount );
1435}
1436
1437static void
1438clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1439{
1440    decrementDownloadedCount( msgs, req->length );
1441}
1442
1443static void
1444addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1445{
1446    if( !msgs->info->blame )
1447         msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1448    tr_bitfieldAdd( msgs->info->blame, index );
1449}
1450
1451static tr_errno
1452clientGotBlock( tr_peermsgs                * msgs,
1453                const uint8_t              * data,
1454                const struct peer_request  * req )
1455{
1456    int err;
1457    tr_torrent * tor = msgs->torrent;
1458    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1459
1460    assert( msgs != NULL );
1461    assert( req != NULL );
1462
1463    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) )
1464    {
1465        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1466                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1467        return TR_ERROR;
1468    }
1469
1470    /* save the block */
1471    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1472
1473    /**
1474    *** Remove the block from our `we asked for this' list
1475    **/
1476
1477    if( reqListRemove( &msgs->clientAskedFor, req ) )
1478    {
1479        clientGotUnwantedBlock( msgs, req );
1480        dbgmsg( msgs, "we didn't ask for this message..." );
1481        return 0;
1482    }
1483
1484    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1485                  msgs->clientAskedFor.count );
1486
1487    /**
1488    *** Error checks
1489    **/
1490
1491    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
1492        dbgmsg( msgs, "we have this block already..." );
1493        clientGotUnwantedBlock( msgs, req );
1494        return 0;
1495    }
1496
1497    /**
1498    ***  Save the block
1499    **/
1500
1501    msgs->info->peerSentPieceDataAt = time( NULL );
1502    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1503        return err;
1504
1505    tr_cpBlockAdd( tor->completion, block );
1506
1507    addPeerToBlamefield( msgs, req->index );
1508
1509    fireGotBlock( msgs, req );
1510
1511    /**
1512    ***  Handle if this was the last block in the piece
1513    **/
1514
1515    if( tr_cpPieceIsComplete( tor->completion, req->index ) )
1516    {
1517        const tr_errno err = tr_ioTestPiece( tor, req->index );
1518
1519        if( err )
1520            tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test: %s" ),
1521                       (unsigned long)req->index,
1522                       tr_errorString( err ) );
1523
1524        tr_torrentSetHasPiece( tor, req->index, !err );
1525        tr_torrentSetPieceChecked( tor, req->index, TRUE );
1526        tr_peerMgrSetBlame( tor->handle->peerMgr, tor->info.hash, req->index, !err );
1527
1528        if( !err )
1529            fireClientHave( msgs, req->index );
1530        else
1531            gotBadPiece( msgs, req->index );
1532    }
1533
1534    return 0;
1535}
1536
1537static void
1538didWrite( struct bufferevent * evin UNUSED, void * vmsgs )
1539{
1540    pulse( vmsgs );
1541}
1542
1543static ReadState
1544canRead( struct bufferevent * evin, void * vmsgs )
1545{
1546    ReadState ret;
1547    tr_peermsgs * msgs = vmsgs;
1548    struct evbuffer * in = EVBUFFER_INPUT ( evin );
1549    const size_t inlen = EVBUFFER_LENGTH( in );
1550
1551    if( !inlen )
1552    {
1553        ret = READ_DONE;
1554    }
1555    else if( msgs->state == AWAITING_BT_PIECE )
1556    {
1557        const size_t downloadMax = getDownloadMax( msgs );
1558        const size_t n = MIN( inlen, downloadMax );
1559        ret = n ? readBtPiece( msgs, in, n ) : READ_DONE;
1560    }
1561    else switch( msgs->state )
1562    {
1563        case AWAITING_BT_LENGTH:  ret = readBtLength ( msgs, in, inlen ); break;
1564        case AWAITING_BT_ID:      ret = readBtId     ( msgs, in, inlen ); break;
1565        case AWAITING_BT_MESSAGE: ret = readBtMessage( msgs, in, inlen ); break;
1566        default:                  assert( 0 );
1567    }
1568
1569    return ret;
1570}
1571
1572static void
1573sendKeepalive( tr_peermsgs * msgs )
1574{
1575    dbgmsg( msgs, "sending a keepalive message" );
1576    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1577}
1578
1579/**
1580***
1581**/
1582
1583static int
1584isSwiftEnabled( const tr_peermsgs * msgs )
1585{
1586    /* rationale: SWIFT is good for getting rid of deadbeats, but most
1587     * private trackers have ratios where you _want_ to feed deadbeats
1588     * as much as possible.  So we disable SWIFT on private torrents */
1589    return SWIFT_ENABLED
1590        && !tr_torrentIsSeed( msgs->torrent )
1591        && !tr_torrentIsPrivate( msgs->torrent );
1592}
1593
1594static size_t
1595getUploadMax( const tr_peermsgs * msgs )
1596{
1597    static const size_t maxval = ~0;
1598    const tr_torrent * tor = msgs->torrent;
1599    const int useSwift = isSwiftEnabled( msgs );
1600    const size_t swiftLeft = msgs->info->credit;
1601    size_t speedLeft;
1602    size_t bufLeft;
1603    size_t ret;
1604
1605    if( tor->uploadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1606        speedLeft = tor->handle->useUploadLimit ? tr_rcBytesLeft( tor->handle->upload ) : maxval;
1607    else if( tor->uploadLimitMode == TR_SPEEDLIMIT_SINGLE )
1608        speedLeft = tr_rcBytesLeft( tor->upload );
1609    else
1610        speedLeft = ~0;
1611
1612    bufLeft = MAX_OUTBUF_SIZE - tr_peerIoWriteBytesWaiting( msgs->io );
1613    ret = MIN( speedLeft, bufLeft );
1614    if( useSwift)
1615        ret = MIN( ret, swiftLeft );
1616    return ret;
1617}
1618
1619static int
1620ratePulse( void * vmsgs )
1621{
1622    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1623    msgs->info->rateToClient = tr_rcRate( msgs->info->rcToClient );
1624    msgs->info->rateToPeer = tr_rcRate( msgs->info->rcToPeer );
1625    msgs->maxActiveRequests = MIN( 4 + (int)(msgs->info->rateToClient/4), MAX_QUEUE_SIZE );
1626    msgs->minActiveRequests = msgs->maxActiveRequests / 3;
1627    return TRUE;
1628}
1629
1630static tr_errno
1631popNextRequest( tr_peermsgs * msgs, struct peer_request * setme )
1632{
1633    if( !reqListPop( &msgs->peerAskedForFast, setme ) )
1634        return 0;
1635    if( !reqListPop( &msgs->peerAskedFor, setme ) )
1636        return 0;
1637
1638    return TR_ERROR;
1639}
1640
1641static int
1642pulse( void * vmsgs )
1643{
1644    const time_t now = time( NULL );
1645    tr_peermsgs * msgs = vmsgs;
1646
1647    tr_peerIoTryRead( msgs->io );
1648    pumpRequestQueue( msgs );
1649    expireOldRequests( msgs );
1650
1651    if( msgs->sendingBlock )
1652    {
1653        const size_t uploadMax = getUploadMax( msgs );
1654        size_t len = EVBUFFER_LENGTH( msgs->outBlock );
1655        const size_t outlen = MIN( len, uploadMax );
1656
1657        assert( len );
1658
1659        if( outlen )
1660        {
1661            tr_peerIoWrite( msgs->io, EVBUFFER_DATA( msgs->outBlock ), outlen );
1662            evbuffer_drain( msgs->outBlock, outlen );
1663            peerGotBytes( msgs, outlen );
1664
1665            len -= outlen;
1666            msgs->clientSentAnythingAt = now;
1667            msgs->sendingBlock = len!=0;
1668
1669            dbgmsg( msgs, "wrote %d bytes; %d left in block", (int)outlen, (int)len );
1670        }
1671        else dbgmsg( msgs, "stalled writing block... uploadMax %lu, outlen %lu", uploadMax, outlen );
1672    }
1673
1674    if( !msgs->sendingBlock )
1675    {
1676        struct peer_request req;
1677
1678        if(( EVBUFFER_LENGTH( msgs->outMessages ) ))
1679        {
1680            dbgmsg( msgs, "flushing outMessages..." );
1681            tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
1682            msgs->clientSentAnythingAt = now;
1683        }
1684        else if( !EVBUFFER_LENGTH( msgs->outBlock )
1685            && !popNextRequest( msgs, &req )
1686            && requestIsValid( msgs, &req )
1687            && tr_cpPieceIsComplete( msgs->torrent->completion, req.index ) )
1688        {
1689            uint8_t * buf = tr_new( uint8_t, req.length );
1690
1691            if( !tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf ) )
1692            {
1693                tr_peerIo * io = msgs->io;
1694                struct evbuffer * out = msgs->outBlock;
1695
1696                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1697                tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 2*sizeof(uint32_t) + req.length );
1698                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1699                tr_peerIoWriteUint32( io, out, req.index );
1700                tr_peerIoWriteUint32( io, out, req.offset );
1701                tr_peerIoWriteBytes ( io, out, buf, req.length );
1702                msgs->sendingBlock = 1;
1703            }
1704
1705            tr_free( buf );
1706        }
1707        else if( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS )
1708        {
1709            sendKeepalive( msgs );
1710        }
1711    }
1712
1713    return TRUE; /* loop forever */
1714}
1715
1716static void
1717gotError( struct bufferevent * evbuf UNUSED, short what, void * vmsgs )
1718{
1719    if( what & EVBUFFER_TIMEOUT )
1720        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1721    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1722        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1723                what, errno, tr_strerror(errno) );
1724    fireError( vmsgs, TR_ERROR );
1725}
1726
1727static void
1728sendBitfield( tr_peermsgs * msgs )
1729{
1730    const tr_bitfield * bitfield = tr_cpPieceBitfield( msgs->torrent->completion );
1731    struct evbuffer * out = msgs->outMessages;
1732
1733    dbgmsg( msgs, "sending peer a bitfield message" );
1734    tr_peerIoWriteUint32( msgs->io, out, sizeof(uint8_t) + bitfield->len );
1735    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1736    tr_peerIoWriteBytes ( msgs->io, out, bitfield->bits, bitfield->len );
1737}
1738
1739/**
1740***
1741**/
1742
1743/* some peers give us error messages if we send
1744   more than this many peers in a single pex message
1745   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
1746#define MAX_PEX_ADDED 50
1747#define MAX_PEX_DROPPED 50
1748
1749typedef struct
1750{
1751    tr_pex * added;
1752    tr_pex * dropped;
1753    tr_pex * elements;
1754    int addedCount;
1755    int droppedCount;
1756    int elementCount;
1757}
1758PexDiffs;
1759
1760static void
1761pexAddedCb( void * vpex, void * userData )
1762{
1763    PexDiffs * diffs = userData;
1764    tr_pex * pex = vpex;
1765    if( diffs->addedCount < MAX_PEX_ADDED )
1766    {
1767        diffs->added[diffs->addedCount++] = *pex;
1768        diffs->elements[diffs->elementCount++] = *pex;
1769    }
1770}
1771
1772static void
1773pexDroppedCb( void * vpex, void * userData )
1774{
1775    PexDiffs * diffs = userData;
1776    tr_pex * pex = vpex;
1777    if( diffs->droppedCount < MAX_PEX_DROPPED )
1778    {
1779        diffs->dropped[diffs->droppedCount++] = *pex;
1780    }
1781}
1782
1783static void
1784pexElementCb( void * vpex, void * userData )
1785{
1786    PexDiffs * diffs = userData;
1787    tr_pex * pex = vpex;
1788    diffs->elements[diffs->elementCount++] = *pex;
1789}
1790
1791static void
1792sendPex( tr_peermsgs * msgs )
1793{
1794    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
1795    {
1796        int i;
1797        tr_pex * newPex = NULL;
1798        const int newCount = tr_peerMgrGetPeers( msgs->handle->peerMgr, msgs->torrent->info.hash, &newPex );
1799        PexDiffs diffs;
1800        tr_benc val, *added, *dropped, *flags;
1801        uint8_t *tmp, *walk;
1802        char * benc;
1803        int bencLen;
1804
1805        /* build the diffs */
1806        diffs.added = tr_new( tr_pex, newCount );
1807        diffs.addedCount = 0;
1808        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1809        diffs.droppedCount = 0;
1810        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1811        diffs.elementCount = 0;
1812        tr_set_compare( msgs->pex, msgs->pexCount,
1813                        newPex, newCount,
1814                        tr_pexCompare, sizeof(tr_pex),
1815                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
1816        dbgmsg( msgs, "pex: old peer count %d, new peer count %d, added %d, removed %d", msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1817
1818        /* update peer */
1819        tr_free( msgs->pex );
1820        msgs->pex = diffs.elements;
1821        msgs->pexCount = diffs.elementCount;
1822
1823        /* build the pex payload */
1824        tr_bencInitDict( &val, 3 );
1825
1826        /* "added" */
1827        added = tr_bencDictAdd( &val, "added" );
1828        tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1829        for( i=0; i<diffs.addedCount; ++i ) {
1830            memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
1831            memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1832        }
1833        assert( ( walk - tmp ) == diffs.addedCount * 6 );
1834        tr_bencInitStr( added, tmp, walk-tmp, FALSE );
1835
1836        /* "added.f" */
1837        flags = tr_bencDictAdd( &val, "added.f" );
1838        tmp = walk = tr_new( uint8_t, diffs.addedCount );
1839        for( i=0; i<diffs.addedCount; ++i )
1840            *walk++ = diffs.added[i].flags;
1841        assert( ( walk - tmp ) == diffs.addedCount );
1842        tr_bencInitStr( flags, tmp, walk-tmp, FALSE );
1843
1844        /* "dropped" */
1845        dropped = tr_bencDictAdd( &val, "dropped" );
1846        tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
1847        for( i=0; i<diffs.droppedCount; ++i ) {
1848            memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
1849            memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
1850        }
1851        assert( ( walk - tmp ) == diffs.droppedCount * 6 );
1852        tr_bencInitStr( dropped, tmp, walk-tmp, FALSE );
1853
1854        /* write the pex message */
1855        benc = tr_bencSave( &val, &bencLen );
1856        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 2*sizeof(uint8_t) + bencLen );
1857        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_LTEP );
1858        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, msgs->ut_pex_id );
1859        tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, benc, bencLen );
1860
1861        /* cleanup */
1862        tr_free( benc );
1863        tr_bencFree( &val );
1864        tr_free( diffs.added );
1865        tr_free( diffs.dropped );
1866        tr_free( newPex );
1867
1868        msgs->clientSentPexAt = time( NULL );
1869    }
1870}
1871
1872static int
1873pexPulse( void * vpeer )
1874{
1875    sendPex( vpeer );
1876    return TRUE;
1877}
1878
1879/**
1880***
1881**/
1882
1883tr_peermsgs*
1884tr_peerMsgsNew( struct tr_torrent * torrent,
1885                struct tr_peer    * info,
1886                tr_delivery_func    func,
1887                void              * userData,
1888                tr_publisher_tag  * setme )
1889{
1890    tr_peermsgs * m;
1891
1892    assert( info != NULL );
1893    assert( info->io != NULL );
1894
1895    m = tr_new0( tr_peermsgs, 1 );
1896    m->publisher = tr_publisherNew( );
1897    m->info = info;
1898    m->handle = torrent->handle;
1899    m->torrent = torrent;
1900    m->io = info->io;
1901    m->info->clientIsChoked = 1;
1902    m->info->peerIsChoked = 1;
1903    m->info->clientIsInterested = 0;
1904    m->info->peerIsInterested = 0;
1905    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
1906    m->state = AWAITING_BT_LENGTH;
1907    m->pulseTimer = tr_timerNew( m->handle, pulse, m, PEER_PULSE_INTERVAL );
1908    m->rateTimer = tr_timerNew( m->handle, ratePulse, m, RATE_PULSE_INTERVAL );
1909    m->pexTimer = tr_timerNew( m->handle, pexPulse, m, PEX_INTERVAL );
1910    m->outMessages = evbuffer_new( );
1911    m->incoming.block = evbuffer_new( );
1912    m->outBlock = evbuffer_new( );
1913    m->peerAllowedPieces = NULL;
1914    m->peerAskedFor = REQUEST_LIST_INIT;
1915    m->peerAskedForFast = REQUEST_LIST_INIT;
1916    m->clientAskedFor = REQUEST_LIST_INIT;
1917    m->clientWillAskFor = REQUEST_LIST_INIT;
1918    *setme = tr_publisherSubscribe( m->publisher, func, userData );
1919
1920    if ( tr_peerIoSupportsLTEP( m->io ) )
1921        sendLtepHandshake( m );
1922
1923    sendBitfield( m );
1924   
1925    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of inactivity */
1926    tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
1927    ratePulse( m );
1928
1929    return m;
1930}
1931
1932void
1933tr_peerMsgsFree( tr_peermsgs* msgs )
1934{
1935    if( msgs != NULL )
1936    {
1937        tr_timerFree( &msgs->pulseTimer );
1938        tr_timerFree( &msgs->rateTimer );
1939        tr_timerFree( &msgs->pexTimer );
1940        tr_publisherFree( &msgs->publisher );
1941        reqListClear( &msgs->clientWillAskFor );
1942        reqListClear( &msgs->clientAskedFor );
1943        reqListClear( &msgs->peerAskedForFast );
1944        reqListClear( &msgs->peerAskedFor );
1945        tr_bitfieldFree( msgs->peerAllowedPieces );
1946        evbuffer_free( msgs->incoming.block );
1947        evbuffer_free( msgs->outMessages );
1948        evbuffer_free( msgs->outBlock );
1949        tr_free( msgs->pex );
1950
1951        memset( msgs, ~0, sizeof( tr_peermsgs ) );
1952        tr_free( msgs );
1953    }
1954}
1955
1956tr_publisher_tag
1957tr_peerMsgsSubscribe( tr_peermsgs       * peer,
1958                      tr_delivery_func    func,
1959                      void              * userData )
1960{
1961    return tr_publisherSubscribe( peer->publisher, func, userData );
1962}
1963
1964void
1965tr_peerMsgsUnsubscribe( tr_peermsgs       * peer,
1966                        tr_publisher_tag    tag )
1967{
1968    tr_publisherUnsubscribe( peer->publisher, tag );
1969}
Note: See TracBrowser for help on using the repository browser.