source: trunk/libtransmission/peer-msgs.c @ 5651

Last change on this file since 5651 was 5651, checked in by charles, 14 years ago

add #include <limits.h> to pick up a definition of INT_MAX to make Hudson happy

  • Property svn:keywords set to Date Rev Author Id
File size: 55.5 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 5651 2008-04-19 19:39:39Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19#include <libgen.h> /* basename */
20
21#include <event.h>
22
23#include "transmission.h"
24#include "bencode.h"
25#include "completion.h"
26#include "inout.h"
27#include "peer-io.h"
28#include "peer-mgr.h"
29#include "peer-mgr-private.h"
30#include "peer-msgs.h"
31#include "ratecontrol.h"
32#include "stats.h"
33#include "torrent.h"
34#include "trevent.h"
35#include "utils.h"
36
37/**
38***
39**/
40
41enum
42{
43    BT_CHOKE                = 0,
44    BT_UNCHOKE              = 1,
45    BT_INTERESTED           = 2,
46    BT_NOT_INTERESTED       = 3,
47    BT_HAVE                 = 4,
48    BT_BITFIELD             = 5,
49    BT_REQUEST              = 6,
50    BT_PIECE                = 7,
51    BT_CANCEL               = 8,
52    BT_PORT                 = 9,
53    BT_SUGGEST              = 13,
54    BT_HAVE_ALL             = 14,
55    BT_HAVE_NONE            = 15,
56    BT_REJECT               = 16,
57    BT_ALLOWED_FAST         = 17,
58    BT_LTEP                 = 20,
59
60    LTEP_HANDSHAKE          = 0,
61
62    TR_LTEP_PEX             = 1,
63
64    MIN_CHOKE_PERIOD_SEC    = (10),
65
66    /* idle seconds before we send a keepalive */
67    KEEPALIVE_INTERVAL_SECS = 90,
68
69    PEX_INTERVAL            = (90 * 1000), /* msec between sendPex() calls */
70    PEER_PULSE_INTERVAL     = (100),       /* msec between pulse() calls */
71    RATE_PULSE_INTERVAL     = (250),       /* msec between ratePulse() calls */
72
73    MAX_QUEUE_SIZE          = (100),
74     
75    /* (fast peers) max number of pieces we fast-allow to another peer */
76    MAX_FAST_ALLOWED_COUNT   = 10,
77
78    /* (fast peers) max threshold for allowing fast-pieces requests */
79    MAX_FAST_ALLOWED_THRESHOLD = 10,
80
81    /* how long an unsent request can stay queued before it's returned
82       back to the peer-mgr's pool of requests */
83    QUEUED_REQUEST_TTL_SECS = 20,
84
85    /* how long a sent request can stay queued before it's returned
86       back to the peer-mgr's pool of requests */
87    SENT_REQUEST_TTL_SECS = 120
88};
89
90/**
91***  REQUEST MANAGEMENT
92**/
93
94enum
95{
96    AWAITING_BT_LENGTH,
97    AWAITING_BT_ID,
98    AWAITING_BT_MESSAGE,
99    AWAITING_BT_PIECE
100};
101
102struct peer_request
103{
104    uint32_t index;
105    uint32_t offset;
106    uint32_t length;
107    time_t time_requested;
108};
109
110static int
111compareRequest( const void * va, const void * vb )
112{
113    int i;
114    const struct peer_request * a = va;
115    const struct peer_request * b = vb;
116    if(( i = tr_compareUint32( a->index, b->index ))) return i;
117    if(( i = tr_compareUint32( a->offset, b->offset ))) return i;
118    if(( i = tr_compareUint32( a->length, b->length ))) return i;
119    return 0;
120}
121
122struct request_list
123{
124    uint16_t count;
125    uint16_t max;
126    struct peer_request * requests;
127};
128
129static const struct request_list REQUEST_LIST_INIT = { 0, 0, NULL };
130
131static void
132reqListReserve( struct request_list * list, uint16_t max )
133{
134    if( list->max < max )
135    {
136        list->max = max;
137        list->requests = tr_renew( struct peer_request,
138                                   list->requests,
139                                   list->max );
140    }
141}
142
143static void
144reqListClear( struct request_list * list )
145{
146    tr_free( list->requests );
147    *list = REQUEST_LIST_INIT;
148}
149
150static void
151reqListCopy( struct request_list * dest, const struct request_list * src )
152{
153    dest->count = src->count;
154    dest->max = src->max;
155    dest->requests = tr_new( struct peer_request, dest->max );
156    memcpy( dest->requests, src->requests, sizeof( struct peer_request ) * dest->count );
157}
158
159static void
160reqListRemoveOne( struct request_list * list, int i )
161{
162    assert( 0<=i && i<list->count );
163
164    memmove( &list->requests[i],
165             &list->requests[i+1],
166             sizeof( struct peer_request ) * ( --list->count - i ) );
167}
168
169static void
170reqListAppend( struct request_list * list, const struct peer_request * req )
171{
172    if( ++list->count >= list->max )
173        reqListReserve( list, list->max + 8 );
174
175    list->requests[list->count-1] = *req;
176}
177
178static tr_errno
179reqListPop( struct request_list * list, struct peer_request * setme )
180{
181    tr_errno err;
182
183    if( !list->count )
184        err = TR_ERROR;
185    else {
186        *setme = list->requests[0];
187        reqListRemoveOne( list, 0 );
188        err = TR_OK;
189    }
190
191    return err;
192}
193
194static int
195reqListFind( struct request_list * list, const struct peer_request * key )
196{
197    uint16_t i;
198    for( i=0; i<list->count; ++i )
199        if( !compareRequest( key, list->requests+i ) )
200            return i;
201    return -1;
202}
203
204static tr_errno
205reqListRemove( struct request_list * list, const struct peer_request * key )
206{
207    tr_errno err;
208    const int i = reqListFind( list, key );
209
210    if( i < 0 )
211        err = TR_ERROR;
212    else {
213        err = TR_OK;
214        reqListRemoveOne( list, i );
215    }
216
217    return err;
218}
219
220/**
221***
222**/
223
224/* this is raw, unchanged data from the peer regarding
225 * the current message that it's sending us. */
226struct tr_incoming
227{
228    uint8_t id;
229    uint32_t length; /* includes the +1 for id length */
230    struct peer_request blockReq; /* metadata for incoming blocks */
231    struct evbuffer * block; /* piece data for incoming blocks */
232};
233
234struct tr_peermsgs
235{
236    unsigned int peerSentBitfield         : 1;
237    unsigned int peerSupportsPex          : 1;
238    unsigned int clientSentLtepHandshake  : 1;
239    unsigned int peerSentLtepHandshake    : 1;
240    unsigned int sendingBlock             : 1;
241   
242    uint8_t state;
243    uint8_t ut_pex_id;
244    uint16_t pexCount;
245    uint16_t maxActiveRequests;
246    uint16_t minActiveRequests;
247
248    tr_peer * info;
249
250    tr_handle * handle;
251    tr_torrent * torrent;
252    tr_peerIo * io;
253
254    tr_publisher_t * publisher;
255
256    struct evbuffer * outBlock;    /* buffer of the current piece message */
257    struct evbuffer * outMessages; /* buffer of all the non-piece messages */
258
259    struct request_list peerAskedFor;
260    struct request_list peerAskedForFast;
261    struct request_list clientAskedFor;
262    struct request_list clientWillAskFor;
263
264    tr_timer * rateTimer;
265    tr_timer * pulseTimer;
266    tr_timer * pexTimer;
267
268    time_t clientSentPexAt;
269    time_t clientSentAnythingAt;
270   
271    tr_bitfield * peerAllowedPieces;
272
273    struct tr_incoming incoming; 
274
275    tr_pex * pex;
276};
277
278/**
279***
280**/
281
282static void
283myDebug( const char * file, int line,
284         const struct tr_peermsgs * msgs,
285         const char * fmt, ... )
286{
287    FILE * fp = tr_getLog( );
288    if( fp != NULL )
289    {
290        va_list args;
291        char timestr[64];
292        struct evbuffer * buf = evbuffer_new( );
293        char * myfile = tr_strdup( file );
294
295        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
296                             tr_getLogTimeStr( timestr, sizeof(timestr) ),
297                             msgs->torrent->info.name,
298                             tr_peerIoGetAddrStr( msgs->io ),
299                             msgs->info->client );
300        va_start( args, fmt );
301        evbuffer_add_vprintf( buf, fmt, args );
302        va_end( args );
303        evbuffer_add_printf( buf, " (%s:%d)\n", basename(myfile), line );
304        fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
305
306        tr_free( myfile );
307        evbuffer_free( buf );
308    }
309}
310
311#define dbgmsg(msgs, fmt...) myDebug(__FILE__, __LINE__, msgs, ##fmt )
312
313/**
314***
315**/
316
317static void
318protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
319{
320    tr_peerIo * io = msgs->io;
321    struct evbuffer * out = msgs->outMessages;
322
323    dbgmsg( msgs, "requesting %u:%u->%u", req->index, req->offset, req->length );
324    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
325    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
326    tr_peerIoWriteUint32( io, out, req->index );
327    tr_peerIoWriteUint32( io, out, req->offset );
328    tr_peerIoWriteUint32( io, out, req->length );
329}
330
331static void
332protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
333{
334    tr_peerIo * io = msgs->io;
335    struct evbuffer * out = msgs->outMessages;
336
337    dbgmsg( msgs, "cancelling %u:%u->%u", req->index, req->offset, req->length );
338    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
339    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
340    tr_peerIoWriteUint32( io, out, req->index );
341    tr_peerIoWriteUint32( io, out, req->offset );
342    tr_peerIoWriteUint32( io, out, req->length );
343}
344
345static void
346protocolSendHave( tr_peermsgs * msgs, uint32_t index )
347{
348    tr_peerIo * io = msgs->io;
349    struct evbuffer * out = msgs->outMessages;
350
351    dbgmsg( msgs, "sending Have %u", index );
352    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
353    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
354    tr_peerIoWriteUint32( io, out, index );
355}
356
357static void
358protocolSendChoke( tr_peermsgs * msgs, int choke )
359{
360    tr_peerIo * io = msgs->io;
361    struct evbuffer * out = msgs->outMessages;
362
363    dbgmsg( msgs, "sending %s", (choke ? "Choke" : "Unchoke") );
364    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) );
365    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
366}
367
368/**
369***  EVENTS
370**/
371
372static const tr_peermsgs_event blankEvent = { 0, 0, 0, 0, 0.0f, 0 };
373
374static void
375publish( tr_peermsgs * msgs, tr_peermsgs_event * e )
376{
377    tr_publisherPublish( msgs->publisher, msgs->info, e );
378}
379
380static void
381fireError( tr_peermsgs * msgs, tr_errno err )
382{
383    tr_peermsgs_event e = blankEvent;
384    e.eventType = TR_PEERMSG_ERROR;
385    e.err = err;
386    publish( msgs, &e );
387}
388
389static void
390fireNeedReq( tr_peermsgs * msgs )
391{
392    tr_peermsgs_event e = blankEvent;
393    e.eventType = TR_PEERMSG_NEED_REQ;
394    publish( msgs, &e );
395}
396
397static void
398firePeerProgress( tr_peermsgs * msgs )
399{
400    tr_peermsgs_event e = blankEvent;
401    e.eventType = TR_PEERMSG_PEER_PROGRESS;
402    e.progress = msgs->info->progress;
403    publish( msgs, &e );
404}
405
406static void
407fireClientHave( tr_peermsgs * msgs, uint32_t pieceIndex )
408{
409    tr_peermsgs_event e = blankEvent;
410    e.eventType = TR_PEERMSG_CLIENT_HAVE;
411    e.pieceIndex = pieceIndex;
412    publish( msgs, &e );
413}
414
415static void
416fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
417{
418    tr_peermsgs_event e = blankEvent;
419    e.eventType = TR_PEERMSG_CLIENT_BLOCK;
420    e.pieceIndex = req->index;
421    e.offset = req->offset;
422    e.length = req->length;
423    publish( msgs, &e );
424}
425
426static void
427firePieceData( tr_peermsgs * msgs )
428{
429    tr_peermsgs_event e = blankEvent;
430    e.eventType = TR_PEERMSG_PIECE_DATA;
431    publish( msgs, &e );
432}
433
434static void
435fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
436{
437    tr_peermsgs_event e = blankEvent;
438    e.eventType = TR_PEERMSG_CANCEL;
439    e.pieceIndex = req->index;
440    e.offset = req->offset;
441    e.length = req->length;
442    publish( msgs, &e );
443}
444
445/**
446***  INTEREST
447**/
448
449static int
450isPieceInteresting( const tr_peermsgs   * peer,
451                    int                   piece )
452{
453    const tr_torrent * torrent = peer->torrent;
454
455    return ( ( !torrent->info.pieces[piece].dnd )               /* we want it */
456          && ( !tr_cpPieceIsComplete( torrent->completion, piece ) ) /* !have */
457          && ( tr_bitfieldHas( peer->info->have, piece ) ) );  /* peer has it */
458}
459
460/* "interested" means we'll ask for piece data if they unchoke us */
461static int
462isPeerInteresting( const tr_peermsgs * msgs )
463{
464    tr_piece_index_t i;
465    const tr_torrent * torrent;
466    const tr_bitfield * bitfield;
467    const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
468
469    if( clientIsSeed )
470        return FALSE;
471
472    torrent = msgs->torrent;
473    bitfield = tr_cpPieceBitfield( torrent->completion );
474
475    if( !msgs->info->have )
476        return TRUE;
477
478    assert( bitfield->len == msgs->info->have->len );
479    for( i=0; i<torrent->info.pieceCount; ++i )
480        if( isPieceInteresting( msgs, i ) )
481            return TRUE;
482
483    return FALSE;
484}
485
486static void
487sendInterest( tr_peermsgs * msgs, int weAreInterested )
488{
489    assert( msgs != NULL );
490    assert( weAreInterested==0 || weAreInterested==1 );
491
492    msgs->info->clientIsInterested = weAreInterested;
493    dbgmsg( msgs, "Sending %s",
494            weAreInterested ? "Interested" : "Not Interested");
495
496    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
497    tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages,
498                   weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
499}
500
501static void
502updateInterest( tr_peermsgs * msgs )
503{
504    const int i = isPeerInteresting( msgs );
505    if( i != msgs->info->clientIsInterested )
506        sendInterest( msgs, i );
507    if( i )
508        fireNeedReq( msgs );
509}
510
511static void
512cancelAllRequestsToClientExceptFast( tr_peermsgs * msgs )
513{
514    reqListClear( &msgs->peerAskedFor );
515}
516
517void
518tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
519{
520    const time_t fibrillationTime = time(NULL) - MIN_CHOKE_PERIOD_SEC;
521
522    assert( msgs != NULL );
523    assert( msgs->info != NULL );
524    assert( choke==0 || choke==1 );
525
526    if( msgs->info->chokeChangedAt > fibrillationTime )
527    {
528        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
529    }
530    else if( msgs->info->peerIsChoked != choke )
531    {
532        msgs->info->peerIsChoked = choke;
533        if( choke )
534            cancelAllRequestsToClientExceptFast( msgs );
535        protocolSendChoke( msgs, choke );
536        msgs->info->chokeChangedAt = time( NULL );
537    }
538}
539
540/**
541***
542**/
543
544void
545tr_peerMsgsHave( tr_peermsgs * msgs,
546                 uint32_t      index )
547{
548    protocolSendHave( msgs, index );
549
550    /* since we have more pieces now, we might not be interested in this peer */
551    updateInterest( msgs );
552}
553#if 0
554static void
555sendFastSuggest( tr_peermsgs * msgs,
556                 uint32_t      pieceIndex )
557{
558    assert( msgs != NULL );
559   
560    if( tr_peerIoSupportsFEXT( msgs->io ) )
561    {
562        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
563        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_SUGGEST );
564        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
565    }
566}
567static void
568sendFastHave( tr_peermsgs * msgs, int all )
569{
570    assert( msgs != NULL );
571   
572    if( tr_peerIoSupportsFEXT( msgs->io ) )
573    {
574        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
575        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, ( all ? BT_HAVE_ALL
576                                                                : BT_HAVE_NONE ) );
577        updateInterest( msgs );
578    }
579}
580#endif
581
582static void
583sendFastReject( tr_peermsgs * msgs,
584                uint32_t      pieceIndex,
585                uint32_t      offset,
586                uint32_t      length )
587{
588    assert( msgs != NULL );
589
590    if( tr_peerIoSupportsFEXT( msgs->io ) )
591    {
592        const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
593        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
594        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_REJECT );
595        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
596        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
597        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
598    }
599}
600
601static tr_bitfield*
602getPeerAllowedPieces( tr_peermsgs * msgs )
603{
604    if( !msgs->peerAllowedPieces && tr_peerIoSupportsFEXT( msgs->io ) )
605    {
606        msgs->peerAllowedPieces = tr_peerMgrGenerateAllowedSet(
607            MAX_FAST_ALLOWED_COUNT,
608            msgs->torrent->info.pieceCount,
609            msgs->torrent->info.hash,
610            tr_peerIoGetAddress( msgs->io, NULL ) );
611    }
612
613    return msgs->peerAllowedPieces;
614}
615
616static void
617sendFastAllowed( tr_peermsgs * msgs,
618                 uint32_t      pieceIndex)
619{
620    assert( msgs != NULL );
621   
622    if( tr_peerIoSupportsFEXT( msgs->io ) )
623    {
624        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
625        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_ALLOWED_FAST );
626        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
627    }
628}
629
630static void
631sendFastAllowedSet( tr_peermsgs * msgs )
632{
633    tr_piece_index_t i = 0;
634
635    while (i <= msgs->torrent->info.pieceCount )
636    {
637        if ( tr_bitfieldHas( getPeerAllowedPieces( msgs ), i) )
638            sendFastAllowed( msgs, i );
639        i++;
640    }
641}
642
643static void
644maybeSendFastAllowedSet( tr_peermsgs * msgs )
645{
646    if( tr_bitfieldCountTrueBits( msgs->info->have ) <= MAX_FAST_ALLOWED_THRESHOLD )
647        sendFastAllowedSet( msgs );
648}
649
650
651/**
652***
653**/
654
655static int
656reqIsValid( const tr_peermsgs   * msgs,
657            uint32_t              index,
658            uint32_t              offset,
659            uint32_t              length )
660{
661    return tr_torrentReqIsValid( msgs->torrent, index, offset, length );
662}
663
664static int
665requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
666{
667    return reqIsValid( msgs, req->index, req->offset, req->length );
668}
669
670static void
671expireOldRequests( tr_peermsgs * msgs )
672{
673    int i;
674    time_t oldestAllowed;
675    struct request_list tmp = REQUEST_LIST_INIT;
676
677    /* cancel requests that have been queued for too long */
678    oldestAllowed = time( NULL ) - QUEUED_REQUEST_TTL_SECS;
679    reqListCopy( &tmp, &msgs->clientWillAskFor );
680    for( i=0; i<tmp.count; ++i ) {
681        const struct peer_request * req = &tmp.requests[i];
682        if( req->time_requested < oldestAllowed )
683            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
684    }
685    reqListClear( &tmp );
686
687    /* cancel requests that were sent too long ago */
688    oldestAllowed = time( NULL ) - SENT_REQUEST_TTL_SECS;
689    reqListCopy( &tmp, &msgs->clientAskedFor );
690    for( i=0; i<tmp.count; ++i ) {
691        const struct peer_request * req = &tmp.requests[i];
692        if( req->time_requested < oldestAllowed )
693            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
694    }
695    reqListClear( &tmp );
696}
697
698static void
699pumpRequestQueue( tr_peermsgs * msgs )
700{
701    const int max = msgs->maxActiveRequests;
702    const int min = msgs->minActiveRequests;
703    const time_t now = time( NULL );
704    int sent = 0;
705    int count = msgs->clientAskedFor.count;
706    struct peer_request req;
707
708    if( count > min )
709        return;
710    if( msgs->info->clientIsChoked )
711        return;
712
713    while( ( count < max ) && !reqListPop( &msgs->clientWillAskFor, &req ) )
714    {
715        assert( requestIsValid( msgs, &req ) );
716        assert( tr_bitfieldHas( msgs->info->have, req.index ) );
717
718        protocolSendRequest( msgs, &req );
719        req.time_requested = now;
720        reqListAppend( &msgs->clientAskedFor, &req );
721
722        ++count;
723        ++sent;
724    }
725
726    if( sent )
727        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
728                sent,
729                msgs->clientAskedFor.count,
730                msgs->clientWillAskFor.count );
731
732    if( count < max )
733        fireNeedReq( msgs );
734}
735
736static int
737pulse( void * vmsgs );
738
739int
740tr_peerMsgsAddRequest( tr_peermsgs * msgs,
741                       uint32_t      index, 
742                       uint32_t      offset, 
743                       uint32_t      length )
744{
745    const int req_max = msgs->maxActiveRequests;
746    struct peer_request req;
747
748    assert( msgs != NULL );
749    assert( msgs->torrent != NULL );
750    assert( reqIsValid( msgs, index, offset, length ) );
751
752    /**
753    ***  Reasons to decline the request
754    **/
755
756    /* don't send requests to choked clients */
757    if( msgs->info->clientIsChoked ) {
758        dbgmsg( msgs, "declining request because they're choking us" );
759        return TR_ADDREQ_CLIENT_CHOKED;
760    }
761
762    /* peer doesn't have this piece */
763    if( !tr_bitfieldHas( msgs->info->have, index ) )
764        return TR_ADDREQ_MISSING;
765
766    /* peer's queue is full */
767    if( msgs->clientWillAskFor.count >= req_max ) {
768        dbgmsg( msgs, "declining request because we're full" );
769        return TR_ADDREQ_FULL;
770    }
771
772    /* have we already asked for this piece? */
773    req.index = index;
774    req.offset = offset;
775    req.length = length;
776    if( reqListFind( &msgs->clientAskedFor, &req ) != -1 ) {
777        dbgmsg( msgs, "declining because it's a duplicate" );
778        return TR_ADDREQ_DUPLICATE;
779    }
780    if( reqListFind( &msgs->clientWillAskFor, &req ) != -1 ) {
781        dbgmsg( msgs, "declining because it's a duplicate" );
782        return TR_ADDREQ_DUPLICATE;
783    }
784
785    /**
786    ***  Accept this request
787    **/
788
789    dbgmsg( msgs, "added req for piece %d, offset %d", (int)index, (int)offset );
790    req.time_requested = time( NULL );
791    reqListAppend( &msgs->clientWillAskFor, &req );
792    return TR_ADDREQ_OK;
793}
794
795static void
796cancelAllRequestsToPeer( tr_peermsgs * msgs )
797{
798    int i;
799    struct request_list a = msgs->clientWillAskFor;
800    struct request_list b = msgs->clientAskedFor;
801
802    msgs->clientAskedFor = REQUEST_LIST_INIT;
803    msgs->clientWillAskFor = REQUEST_LIST_INIT;
804
805    for( i=0; i<a.count; ++i )
806        fireCancelledReq( msgs, &a.requests[i] );
807
808    for( i=0; i<b.count; ++i ) {
809        fireCancelledReq( msgs, &b.requests[i] );
810        protocolSendCancel( msgs, &b.requests[i] );
811    }
812
813    reqListClear( &a );
814    reqListClear( &b );
815}
816
817void
818tr_peerMsgsCancel( tr_peermsgs * msgs,
819                   uint32_t      pieceIndex,
820                   uint32_t      offset,
821                   uint32_t      length )
822{
823    struct peer_request req;
824
825    assert( msgs != NULL );
826    assert( length > 0 );
827
828    /* have we asked the peer for this piece? */
829    req.index = pieceIndex;
830    req.offset = offset;
831    req.length = length;
832
833    /* if it's only in the queue and hasn't been sent yet, free it */
834    if( !reqListRemove( &msgs->clientWillAskFor, &req ) )
835        fireCancelledReq( msgs, &req );
836
837    /* if it's already been sent, send a cancel message too */
838    if( !reqListRemove( &msgs->clientAskedFor, &req ) ) {
839        protocolSendCancel( msgs, &req );
840        fireCancelledReq( msgs, &req );
841    }
842}
843
844/**
845***
846**/
847
848static void
849sendLtepHandshake( tr_peermsgs * msgs )
850{
851    tr_benc val, *m;
852    char * buf;
853    int len;
854    int pex;
855    struct evbuffer * outbuf;
856
857    if( msgs->clientSentLtepHandshake )
858        return;
859
860    outbuf = evbuffer_new( );
861    dbgmsg( msgs, "sending an ltep handshake" );
862    msgs->clientSentLtepHandshake = 1;
863
864    /* decide if we want to advertise pex support */
865    if( !tr_torrentAllowsPex( msgs->torrent ) )
866        pex = 0;
867    else if( msgs->peerSentLtepHandshake )
868        pex = msgs->peerSupportsPex ? 1 : 0;
869    else
870        pex = 1;
871
872    tr_bencInitDict( &val, 4 );
873    tr_bencDictAddInt( &val, "e", msgs->handle->encryptionMode != TR_PLAINTEXT_PREFERRED );
874    tr_bencDictAddInt( &val, "p", tr_getPublicPort( msgs->handle ) );
875    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
876    m  = tr_bencDictAddDict( &val, "m", 1 );
877    if( pex )
878        tr_bencDictAddInt( m, "ut_pex", TR_LTEP_PEX );
879    buf = tr_bencSave( &val, &len );
880
881    tr_peerIoWriteUint32( msgs->io, outbuf, 2*sizeof(uint8_t) + len );
882    tr_peerIoWriteUint8 ( msgs->io, outbuf, BT_LTEP );
883    tr_peerIoWriteUint8 ( msgs->io, outbuf, LTEP_HANDSHAKE );
884    tr_peerIoWriteBytes ( msgs->io, outbuf, buf, len );
885
886    tr_peerIoWriteBuf( msgs->io, outbuf );
887
888    dbgmsg( msgs, "here is the ltep handshake we sent [%*.*s]", len, len, buf );
889
890    /* cleanup */
891    tr_bencFree( &val );
892    tr_free( buf );
893    evbuffer_free( outbuf );
894}
895
896static void
897parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
898{
899    tr_benc val, * sub;
900    uint8_t * tmp = tr_new( uint8_t, len );
901
902    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
903    msgs->peerSentLtepHandshake = 1;
904
905    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type!=TYPE_DICT ) {
906        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
907        tr_free( tmp );
908        return;
909    }
910
911    dbgmsg( msgs, "here is the ltep handshake we got [%*.*s]", len, len, tmp );
912
913    /* does the peer prefer encrypted connections? */
914    if(( sub = tr_bencDictFindType( &val, "e", TYPE_INT )))
915        msgs->info->encryption_preference = sub->val.i
916                                      ? ENCRYPTION_PREFERENCE_YES
917                                      : ENCRYPTION_PREFERENCE_NO;
918
919    /* check supported messages for utorrent pex */
920    msgs->peerSupportsPex = 0;
921    if(( sub = tr_bencDictFindType( &val, "m", TYPE_DICT ))) {
922        if(( sub = tr_bencDictFindType( sub, "ut_pex", TYPE_INT ))) {
923            msgs->ut_pex_id = (uint8_t) sub->val.i;
924            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
925            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
926        }
927    }
928
929    /* get peer's listening port */
930    if(( sub = tr_bencDictFindType( &val, "p", TYPE_INT ))) {
931        msgs->info->port = htons( (uint16_t)sub->val.i );
932        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
933    }
934
935    tr_bencFree( &val );
936    tr_free( tmp );
937}
938
939static void
940parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
941{
942    int loaded = 0;
943    uint8_t * tmp = tr_new( uint8_t, msglen );
944    tr_benc val, *added;
945    const tr_torrent * tor = msgs->torrent;
946    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
947
948    if( tr_torrentAllowsPex( tor )
949        && (( loaded = !tr_bencLoad( tmp, msglen, &val, NULL )))
950        && (( added = tr_bencDictFindType( &val, "added", TYPE_STR ))))
951    {
952        const char * added_f = NULL;
953        tr_pex * pex;
954        size_t i, n;
955        tr_bencDictFindStr( &val, "added.f", &added_f );
956        pex = tr_peerMgrCompactToPex( added->val.s.s, added->val.s.i, added_f, &n );
957        for( i=0; i<n; ++i )
958            tr_peerMgrAddPex( msgs->handle->peerMgr, tor->info.hash,
959                              TR_PEER_FROM_PEX, pex+i );
960        tr_free( pex );
961    }
962
963    if( loaded )
964        tr_bencFree( &val );
965    tr_free( tmp );
966}
967
968static void
969sendPex( tr_peermsgs * msgs );
970
971static void
972parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
973{
974    uint8_t ltep_msgid;
975
976    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
977    msglen--;
978
979    if( ltep_msgid == LTEP_HANDSHAKE )
980    {
981        dbgmsg( msgs, "got ltep handshake" );
982        parseLtepHandshake( msgs, msglen, inbuf );
983        if( tr_peerIoSupportsLTEP( msgs->io ) )
984        {
985            sendLtepHandshake( msgs );
986            sendPex( msgs );
987        }
988    }
989    else if( ltep_msgid == TR_LTEP_PEX )
990    {
991        dbgmsg( msgs, "got ut pex" );
992        msgs->peerSupportsPex = 1;
993        parseUtPex( msgs, msglen, inbuf );
994    }
995    else
996    {
997        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
998        evbuffer_drain( inbuf, msglen );
999    }
1000}
1001
1002static int
1003readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1004{
1005    uint32_t len;
1006
1007    if( inlen < sizeof(len) )
1008        return READ_MORE;
1009
1010    tr_peerIoReadUint32( msgs->io, inbuf, &len );
1011
1012    if( len == 0 ) /* peer sent us a keepalive message */
1013        dbgmsg( msgs, "got KeepAlive" );
1014    else {
1015        msgs->incoming.length = len;
1016        msgs->state = AWAITING_BT_ID;
1017    }
1018
1019    return READ_AGAIN;
1020}
1021
1022static int
1023readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen );
1024
1025static int
1026readBtId( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1027{
1028    uint8_t id;
1029
1030    if( inlen < sizeof(uint8_t) )
1031        return READ_MORE;
1032
1033    tr_peerIoReadUint8( msgs->io, inbuf, &id );
1034    msgs->incoming.id = id;
1035
1036    if( id==BT_PIECE )
1037    {
1038        msgs->state = AWAITING_BT_PIECE;
1039        return READ_AGAIN;
1040    }
1041    else if( msgs->incoming.length != 1 )
1042    {
1043        msgs->state = AWAITING_BT_MESSAGE;
1044        return READ_AGAIN;
1045    }
1046    else return readBtMessage( msgs, inbuf, inlen-1 );
1047}
1048
1049static void
1050updatePeerProgress( tr_peermsgs * msgs )
1051{
1052    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have )
1053                         / (float)msgs->torrent->info.pieceCount;
1054    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
1055    updateInterest( msgs );
1056    firePeerProgress( msgs );
1057}
1058
1059static int
1060clientCanSendFastBlock( const tr_peermsgs * msgs UNUSED )
1061{
1062    /* don't send a fast piece if peer has MAX_FAST_ALLOWED_THRESHOLD pieces */
1063    if( tr_bitfieldCountTrueBits( msgs->info->have ) > MAX_FAST_ALLOWED_THRESHOLD )
1064        return FALSE;
1065   
1066    /* ...or if we don't have ourself enough pieces */
1067    if( tr_bitfieldCountTrueBits( tr_cpPieceBitfield( msgs->torrent->completion ) ) < MAX_FAST_ALLOWED_THRESHOLD )
1068        return FALSE;
1069
1070    /* Maybe a bandwidth limit ? */
1071    return TRUE;
1072}
1073
1074static void
1075peerMadeRequest( tr_peermsgs * msgs, const struct peer_request * req )
1076{
1077    const int reqIsValid = requestIsValid( msgs, req );
1078    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( msgs->torrent->completion, req->index );
1079    const int peerIsChoked = msgs->info->peerIsChoked;
1080    const int peerIsFast = tr_peerIoSupportsFEXT( msgs->io );
1081    const int pieceIsFast = reqIsValid && tr_bitfieldHas( getPeerAllowedPieces( msgs ), req->index );
1082    const int canSendFast = clientCanSendFastBlock( msgs );
1083
1084    if( !reqIsValid ) /* bad request */
1085    {
1086        dbgmsg( msgs, "rejecting an invalid request." );
1087        sendFastReject( msgs, req->index, req->offset, req->length );
1088    }
1089    else if( !clientHasPiece ) /* we don't have it */
1090    {
1091        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1092        sendFastReject( msgs, req->index, req->offset, req->length );
1093    }
1094    else if( peerIsChoked && !peerIsFast ) /* doesn't he know he's choked? */
1095    {
1096        tr_peerMsgsSetChoke( msgs, 1 );
1097        sendFastReject( msgs, req->index, req->offset, req->length );
1098    }
1099    else if( peerIsChoked && peerIsFast && ( !pieceIsFast || !canSendFast ) )
1100    {
1101        sendFastReject( msgs, req->index, req->offset, req->length );
1102    }
1103    else /* YAY */
1104    {
1105        if( peerIsFast && pieceIsFast )
1106            reqListAppend( &msgs->peerAskedForFast, req );
1107        else
1108            reqListAppend( &msgs->peerAskedFor, req );
1109    }
1110}
1111
1112static int
1113messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1114{
1115    switch( id )
1116    {
1117        case BT_CHOKE:
1118        case BT_UNCHOKE:
1119        case BT_INTERESTED:
1120        case BT_NOT_INTERESTED:
1121        case BT_HAVE_ALL:
1122        case BT_HAVE_NONE:
1123            return len==1;
1124
1125        case BT_HAVE:
1126        case BT_SUGGEST:
1127        case BT_ALLOWED_FAST:
1128            return len==5;
1129
1130        case BT_BITFIELD:
1131            return len == (msg->torrent->info.pieceCount+7u)/8u + 1u;
1132       
1133        case BT_REQUEST:
1134        case BT_CANCEL:
1135        case BT_REJECT:
1136            return len==13;
1137
1138        case BT_PIECE:
1139            return len>9 && len<=16393;
1140
1141        case BT_PORT:
1142            return len==3;
1143
1144        case BT_LTEP:
1145            return len >= 2;
1146
1147        default:
1148            return FALSE;
1149    }
1150}
1151
1152static int
1153clientGotBlock( tr_peermsgs * msgs, const uint8_t * block, const struct peer_request * req );
1154
1155static void
1156clientGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1157{
1158    const time_t now = time( NULL );
1159    tr_torrent * tor = msgs->torrent;
1160    tor->activityDate = tr_date( );
1161    tor->downloadedCur += byteCount;
1162    msgs->info->pieceDataActivityDate = now;
1163    msgs->info->credit += (int)(byteCount * SWIFT_REPAYMENT_RATIO);
1164    tr_rcTransferred( msgs->info->rcToClient, byteCount );
1165    tr_rcTransferred( tor->download, byteCount );
1166    tr_rcTransferred( tor->handle->download, byteCount );
1167    tr_statsAddDownloaded( msgs->handle, byteCount );
1168    firePieceData( msgs );
1169}
1170
1171static int
1172readBtPiece( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1173{
1174    struct peer_request * req = &msgs->incoming.blockReq;
1175    assert( EVBUFFER_LENGTH(inbuf) >= inlen );
1176    dbgmsg( msgs, "In readBtPiece" );
1177
1178    if( !req->length )
1179    {
1180        if( inlen < 8 )
1181            return READ_MORE;
1182
1183        tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
1184        tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
1185        req->length = msgs->incoming.length - 9;
1186        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1187        return READ_AGAIN;
1188    }
1189    else
1190    {
1191        int err;
1192
1193        /* read in another chunk of data */
1194        const size_t nLeft = req->length - EVBUFFER_LENGTH(msgs->incoming.block);
1195        size_t n = MIN( nLeft, inlen );
1196        uint8_t * buf = tr_new( uint8_t, n );
1197        assert( EVBUFFER_LENGTH(inbuf) >= n );
1198        tr_peerIoReadBytes( msgs->io, inbuf, buf, n );
1199        evbuffer_add( msgs->incoming.block, buf, n );
1200        clientGotBytes( msgs, n );
1201        tr_free( buf );
1202        dbgmsg( msgs, "got %d bytes for block %u:%u->%u ... %d remain",
1203               (int)n, req->index, req->offset, req->length,
1204               (int)( req->length - EVBUFFER_LENGTH(msgs->incoming.block) ) );
1205        if( EVBUFFER_LENGTH(msgs->incoming.block) < req->length )
1206            return READ_MORE;
1207
1208        /* we've got the whole block ... process it */
1209        err = clientGotBlock( msgs, EVBUFFER_DATA(msgs->incoming.block), req );
1210
1211        /* cleanup */
1212        evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH(msgs->incoming.block) );
1213        req->length = 0;
1214        msgs->state = AWAITING_BT_LENGTH;
1215        if( !err )
1216            return READ_AGAIN;
1217        else {
1218            fireError( msgs, err );
1219            return READ_DONE;
1220        }
1221    }
1222}
1223
1224static int
1225readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1226{
1227    uint32_t ui32;
1228    uint32_t msglen = msgs->incoming.length;
1229    const uint8_t id = msgs->incoming.id;
1230    const size_t startBufLen = EVBUFFER_LENGTH( inbuf );
1231
1232    --msglen; /* id length */
1233
1234    if( inlen < msglen )
1235        return READ_MORE;
1236
1237    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %d", (int)id, (int)msglen, (int)inlen );
1238
1239    if( !messageLengthIsCorrect( msgs, id, msglen+1 ) )
1240    {
1241        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1242        fireError( msgs, TR_ERROR );
1243        return READ_DONE;
1244    }
1245
1246    switch( id )
1247    {
1248        case BT_CHOKE:
1249            dbgmsg( msgs, "got Choke" );
1250            msgs->info->clientIsChoked = 1;
1251            cancelAllRequestsToPeer( msgs );
1252            cancelAllRequestsToClientExceptFast( msgs );
1253            break;
1254
1255        case BT_UNCHOKE:
1256            dbgmsg( msgs, "got Unchoke" );
1257            msgs->info->clientIsChoked = 0;
1258            fireNeedReq( msgs );
1259            break;
1260
1261        case BT_INTERESTED:
1262            dbgmsg( msgs, "got Interested" );
1263            msgs->info->peerIsInterested = 1;
1264            tr_peerMsgsSetChoke( msgs, 0 );
1265            break;
1266
1267        case BT_NOT_INTERESTED:
1268            dbgmsg( msgs, "got Not Interested" );
1269            msgs->info->peerIsInterested = 0;
1270            break;
1271           
1272        case BT_HAVE:
1273            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1274            dbgmsg( msgs, "got Have: %u", ui32 );
1275            if( tr_bitfieldAdd( msgs->info->have, ui32 ) )
1276                fireError( msgs, TR_ERROR_PEER_MESSAGE );
1277            updatePeerProgress( msgs );
1278            tr_rcTransferred( msgs->torrent->swarmspeed, msgs->torrent->info.pieceSize );
1279            break;
1280
1281        case BT_BITFIELD: {
1282            const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
1283            int peerIsSeed;
1284            dbgmsg( msgs, "got a bitfield" );
1285            msgs->peerSentBitfield = 1;
1286            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
1287            updatePeerProgress( msgs );
1288            maybeSendFastAllowedSet( msgs );
1289            peerIsSeed = msgs->info->progress >= 1.0;
1290            tr_peerMsgsSetChoke( msgs, clientIsSeed && peerIsSeed );
1291            fireNeedReq( msgs );
1292            break;
1293        }
1294
1295        case BT_REQUEST: {
1296            struct peer_request r;
1297            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1298            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1299            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1300            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1301            peerMadeRequest( msgs, &r );
1302            break;
1303        }
1304
1305        case BT_CANCEL: {
1306            struct peer_request r;
1307            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1308            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1309            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1310            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1311            reqListRemove( &msgs->peerAskedForFast, &r );
1312            reqListRemove( &msgs->peerAskedFor, &r );
1313            break;
1314        }
1315
1316        case BT_PIECE:
1317            assert( 0 ); /* handled elsewhere! */
1318            break;
1319       
1320        case BT_PORT:
1321            dbgmsg( msgs, "Got a BT_PORT" );
1322            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1323            break;
1324           
1325        case BT_SUGGEST: {
1326            dbgmsg( msgs, "Got a BT_SUGGEST" );
1327            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1328            /* we don't do anything with this yet */
1329            break;
1330        }
1331           
1332        case BT_HAVE_ALL:
1333            dbgmsg( msgs, "Got a BT_HAVE_ALL" );
1334            tr_bitfieldAddRange( msgs->info->have, 0, msgs->torrent->info.pieceCount );
1335            updatePeerProgress( msgs );
1336            maybeSendFastAllowedSet( msgs );
1337            break;
1338       
1339       
1340        case BT_HAVE_NONE:
1341            dbgmsg( msgs, "Got a BT_HAVE_NONE" );
1342            tr_bitfieldClear( msgs->info->have );
1343            updatePeerProgress( msgs );
1344            maybeSendFastAllowedSet( msgs );
1345            break;
1346       
1347        case BT_REJECT: {
1348            struct peer_request r;
1349            dbgmsg( msgs, "Got a BT_REJECT" );
1350            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1351            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1352            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1353            reqListRemove( &msgs->clientAskedFor, &r );
1354            break;
1355        }
1356
1357        case BT_ALLOWED_FAST: {
1358            dbgmsg( msgs, "Got a BT_ALLOWED_FAST" );
1359            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1360            /* we don't do anything with this yet */
1361            break;
1362        }
1363
1364        case BT_LTEP:
1365            dbgmsg( msgs, "Got a BT_LTEP" );
1366            parseLtep( msgs, msglen, inbuf );
1367            break;
1368
1369        default:
1370            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1371            tr_peerIoDrain( msgs->io, inbuf, msglen );
1372            break;
1373    }
1374
1375    assert( msglen + 1 == msgs->incoming.length );
1376    assert( EVBUFFER_LENGTH(inbuf) == startBufLen - msglen );
1377
1378    msgs->state = AWAITING_BT_LENGTH;
1379    return READ_AGAIN;
1380}
1381
1382static void
1383peerGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1384{
1385    const time_t now = time( NULL );
1386    tr_torrent * tor = msgs->torrent;
1387    tor->activityDate = tr_date( );
1388    tor->uploadedCur += byteCount;
1389    msgs->info->pieceDataActivityDate = now;
1390    msgs->info->credit -= byteCount;
1391    tr_rcTransferred( msgs->info->rcToPeer, byteCount );
1392    tr_rcTransferred( tor->upload, byteCount );
1393    tr_rcTransferred( tor->handle->upload, byteCount );
1394    tr_statsAddUploaded( msgs->handle, byteCount );
1395    firePieceData( msgs );
1396}
1397
1398static size_t
1399getDownloadMax( const tr_peermsgs * msgs )
1400{
1401    static const size_t maxval = ~0;
1402    const tr_torrent * tor = msgs->torrent;
1403
1404    if( tor->downloadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1405        return tor->handle->useDownloadLimit
1406            ? tr_rcBytesLeft( tor->handle->download ) : maxval;
1407
1408    if( tor->downloadLimitMode == TR_SPEEDLIMIT_SINGLE )
1409        return tr_rcBytesLeft( tor->download );
1410
1411    return maxval;
1412}
1413
1414static void
1415decrementDownloadedCount( tr_peermsgs * msgs, uint32_t byteCount )
1416{
1417    tr_torrent * tor = msgs->torrent;
1418    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1419}
1420
1421static void
1422reassignBytesToCorrupt( tr_peermsgs * msgs, uint32_t byteCount )
1423{
1424    tr_torrent * tor = msgs->torrent;
1425
1426    tor->corruptCur += byteCount;
1427
1428    decrementDownloadedCount( msgs, byteCount );
1429}
1430
1431static void
1432gotBadPiece( tr_peermsgs * msgs, tr_piece_index_t pieceIndex )
1433{
1434    const uint32_t byteCount =
1435        tr_torPieceCountBytes( msgs->torrent, pieceIndex );
1436    reassignBytesToCorrupt( msgs, byteCount );
1437}
1438
1439static void
1440clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1441{
1442    decrementDownloadedCount( msgs, req->length );
1443}
1444
1445static void
1446addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1447{
1448    if( !msgs->info->blame )
1449         msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1450    tr_bitfieldAdd( msgs->info->blame, index );
1451}
1452
1453static tr_errno
1454clientGotBlock( tr_peermsgs                * msgs,
1455                const uint8_t              * data,
1456                const struct peer_request  * req )
1457{
1458    int err;
1459    tr_torrent * tor = msgs->torrent;
1460    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1461
1462    assert( msgs != NULL );
1463    assert( req != NULL );
1464
1465    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) )
1466    {
1467        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1468                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1469        return TR_ERROR;
1470    }
1471
1472    /* save the block */
1473    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1474
1475    /**
1476    *** Remove the block from our `we asked for this' list
1477    **/
1478
1479    if( reqListRemove( &msgs->clientAskedFor, req ) )
1480    {
1481        clientGotUnwantedBlock( msgs, req );
1482        dbgmsg( msgs, "we didn't ask for this message..." );
1483        return 0;
1484    }
1485
1486    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1487                  msgs->clientAskedFor.count );
1488
1489    /**
1490    *** Error checks
1491    **/
1492
1493    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
1494        dbgmsg( msgs, "we have this block already..." );
1495        clientGotUnwantedBlock( msgs, req );
1496        return 0;
1497    }
1498
1499    /**
1500    ***  Save the block
1501    **/
1502
1503    msgs->info->peerSentPieceDataAt = time( NULL );
1504    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1505        return err;
1506
1507    tr_cpBlockAdd( tor->completion, block );
1508
1509    addPeerToBlamefield( msgs, req->index );
1510
1511    fireGotBlock( msgs, req );
1512
1513    /**
1514    ***  Handle if this was the last block in the piece
1515    **/
1516
1517    if( tr_cpPieceIsComplete( tor->completion, req->index ) )
1518    {
1519        const tr_errno err = tr_ioTestPiece( tor, req->index );
1520
1521        if( err )
1522            tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test: %s" ),
1523                       (unsigned long)req->index,
1524                       tr_errorString( err ) );
1525
1526        tr_torrentSetHasPiece( tor, req->index, !err );
1527        tr_torrentSetPieceChecked( tor, req->index, TRUE );
1528        tr_peerMgrSetBlame( tor->handle->peerMgr, tor->info.hash, req->index, !err );
1529
1530        if( !err )
1531            fireClientHave( msgs, req->index );
1532        else
1533            gotBadPiece( msgs, req->index );
1534    }
1535
1536    return 0;
1537}
1538
1539static void
1540didWrite( struct bufferevent * evin UNUSED, void * vmsgs )
1541{
1542    pulse( vmsgs );
1543}
1544
1545static ReadState
1546canRead( struct bufferevent * evin, void * vmsgs )
1547{
1548    ReadState ret;
1549    tr_peermsgs * msgs = vmsgs;
1550    struct evbuffer * in = EVBUFFER_INPUT ( evin );
1551    const size_t inlen = EVBUFFER_LENGTH( in );
1552
1553    if( !inlen )
1554    {
1555        ret = READ_DONE;
1556    }
1557    else if( msgs->state == AWAITING_BT_PIECE )
1558    {
1559        const size_t downloadMax = getDownloadMax( msgs );
1560        const size_t n = MIN( inlen, downloadMax );
1561        ret = n ? readBtPiece( msgs, in, n ) : READ_DONE;
1562    }
1563    else switch( msgs->state )
1564    {
1565        case AWAITING_BT_LENGTH:  ret = readBtLength ( msgs, in, inlen ); break;
1566        case AWAITING_BT_ID:      ret = readBtId     ( msgs, in, inlen ); break;
1567        case AWAITING_BT_MESSAGE: ret = readBtMessage( msgs, in, inlen ); break;
1568        default:                  assert( 0 );
1569    }
1570
1571    return ret;
1572}
1573
1574static void
1575sendKeepalive( tr_peermsgs * msgs )
1576{
1577    dbgmsg( msgs, "sending a keepalive message" );
1578    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1579}
1580
1581/**
1582***
1583**/
1584
1585static int
1586isSwiftEnabled( const tr_peermsgs * msgs )
1587{
1588    /* rationale: SWIFT is good for getting rid of deadbeats, but most
1589     * private trackers have ratios where you _want_ to feed deadbeats
1590     * as much as possible.  So we disable SWIFT on private torrents */
1591    return SWIFT_ENABLED
1592        && !tr_torrentIsSeed( msgs->torrent )
1593        && !tr_torrentIsPrivate( msgs->torrent );
1594}
1595
1596static size_t
1597getUploadMax( const tr_peermsgs * msgs )
1598{
1599    static const size_t maxval = ~0;
1600    const tr_torrent * tor = msgs->torrent;
1601    const int useSwift = isSwiftEnabled( msgs );
1602    const size_t swiftLeft = msgs->info->credit;
1603    int speedLeft;
1604    int bufLeft;
1605    size_t ret;
1606
1607    if( tor->uploadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1608        speedLeft = tor->handle->useUploadLimit ? tr_rcBytesLeft( tor->handle->upload ) : maxval;
1609    else if( tor->uploadLimitMode == TR_SPEEDLIMIT_SINGLE )
1610        speedLeft = tr_rcBytesLeft( tor->upload );
1611    else
1612        speedLeft = INT_MAX;
1613
1614    /* this basically says the outbuf shouldn't have more than one block
1615     * queued up in it... blocksize, +13 for the size of the BT protocol's
1616     * block message overhead */
1617    bufLeft = tor->blockSize + 13 - tr_peerIoWriteBytesWaiting( msgs->io );
1618    ret = MIN( speedLeft, bufLeft );
1619    if( useSwift)
1620        ret = MIN( ret, swiftLeft );
1621    return ret;
1622}
1623
1624static int
1625ratePulse( void * vmsgs )
1626{
1627    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1628    msgs->info->rateToClient = tr_rcRate( msgs->info->rcToClient );
1629    msgs->info->rateToPeer = tr_rcRate( msgs->info->rcToPeer );
1630    msgs->maxActiveRequests = MIN( 4 + (int)(msgs->info->rateToClient/4), MAX_QUEUE_SIZE );
1631    msgs->minActiveRequests = msgs->maxActiveRequests / 3;
1632    return TRUE;
1633}
1634
1635static tr_errno
1636popNextRequest( tr_peermsgs * msgs, struct peer_request * setme )
1637{
1638    if( !reqListPop( &msgs->peerAskedForFast, setme ) )
1639        return 0;
1640    if( !reqListPop( &msgs->peerAskedFor, setme ) )
1641        return 0;
1642
1643    return TR_ERROR;
1644}
1645
1646static int
1647pulse( void * vmsgs )
1648{
1649    const time_t now = time( NULL );
1650    tr_peermsgs * msgs = vmsgs;
1651
1652    tr_peerIoTryRead( msgs->io );
1653    pumpRequestQueue( msgs );
1654    expireOldRequests( msgs );
1655
1656    if( msgs->sendingBlock )
1657    {
1658        const size_t uploadMax = getUploadMax( msgs );
1659        size_t len = EVBUFFER_LENGTH( msgs->outBlock );
1660        const size_t outlen = MIN( len, uploadMax );
1661
1662        assert( len );
1663
1664        if( outlen )
1665        {
1666            tr_peerIoWrite( msgs->io, EVBUFFER_DATA( msgs->outBlock ), outlen );
1667            evbuffer_drain( msgs->outBlock, outlen );
1668            peerGotBytes( msgs, outlen );
1669
1670            len -= outlen;
1671            msgs->clientSentAnythingAt = now;
1672            msgs->sendingBlock = len!=0;
1673
1674            dbgmsg( msgs, "wrote %d bytes; %d left in block", (int)outlen, (int)len );
1675        }
1676        else dbgmsg( msgs, "stalled writing block... uploadMax %lu, outlen %lu", uploadMax, outlen );
1677    }
1678
1679    if( !msgs->sendingBlock )
1680    {
1681        struct peer_request req;
1682
1683        if(( EVBUFFER_LENGTH( msgs->outMessages ) ))
1684        {
1685            dbgmsg( msgs, "flushing outMessages..." );
1686            tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
1687            msgs->clientSentAnythingAt = now;
1688        }
1689        else if( !EVBUFFER_LENGTH( msgs->outBlock )
1690            && !popNextRequest( msgs, &req )
1691            && requestIsValid( msgs, &req )
1692            && tr_cpPieceIsComplete( msgs->torrent->completion, req.index ) )
1693        {
1694            uint8_t * buf = tr_new( uint8_t, req.length );
1695
1696            if( !tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf ) )
1697            {
1698                tr_peerIo * io = msgs->io;
1699                struct evbuffer * out = msgs->outBlock;
1700
1701                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1702                tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 2*sizeof(uint32_t) + req.length );
1703                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1704                tr_peerIoWriteUint32( io, out, req.index );
1705                tr_peerIoWriteUint32( io, out, req.offset );
1706                tr_peerIoWriteBytes ( io, out, buf, req.length );
1707                msgs->sendingBlock = 1;
1708            }
1709
1710            tr_free( buf );
1711        }
1712        else if( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS )
1713        {
1714            sendKeepalive( msgs );
1715        }
1716    }
1717
1718    return TRUE; /* loop forever */
1719}
1720
1721static void
1722gotError( struct bufferevent * evbuf UNUSED, short what, void * vmsgs )
1723{
1724    if( what & EVBUFFER_TIMEOUT )
1725        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1726    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1727        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1728                what, errno, tr_strerror(errno) );
1729    fireError( vmsgs, TR_ERROR );
1730}
1731
1732static void
1733sendBitfield( tr_peermsgs * msgs )
1734{
1735    const tr_bitfield * bitfield = tr_cpPieceBitfield( msgs->torrent->completion );
1736    struct evbuffer * out = msgs->outMessages;
1737
1738    dbgmsg( msgs, "sending peer a bitfield message" );
1739    tr_peerIoWriteUint32( msgs->io, out, sizeof(uint8_t) + bitfield->len );
1740    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1741    tr_peerIoWriteBytes ( msgs->io, out, bitfield->bits, bitfield->len );
1742}
1743
1744/**
1745***
1746**/
1747
1748/* some peers give us error messages if we send
1749   more than this many peers in a single pex message
1750   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
1751#define MAX_PEX_ADDED 50
1752#define MAX_PEX_DROPPED 50
1753
1754typedef struct
1755{
1756    tr_pex * added;
1757    tr_pex * dropped;
1758    tr_pex * elements;
1759    int addedCount;
1760    int droppedCount;
1761    int elementCount;
1762}
1763PexDiffs;
1764
1765static void
1766pexAddedCb( void * vpex, void * userData )
1767{
1768    PexDiffs * diffs = userData;
1769    tr_pex * pex = vpex;
1770    if( diffs->addedCount < MAX_PEX_ADDED )
1771    {
1772        diffs->added[diffs->addedCount++] = *pex;
1773        diffs->elements[diffs->elementCount++] = *pex;
1774    }
1775}
1776
1777static void
1778pexDroppedCb( void * vpex, void * userData )
1779{
1780    PexDiffs * diffs = userData;
1781    tr_pex * pex = vpex;
1782    if( diffs->droppedCount < MAX_PEX_DROPPED )
1783    {
1784        diffs->dropped[diffs->droppedCount++] = *pex;
1785    }
1786}
1787
1788static void
1789pexElementCb( void * vpex, void * userData )
1790{
1791    PexDiffs * diffs = userData;
1792    tr_pex * pex = vpex;
1793    diffs->elements[diffs->elementCount++] = *pex;
1794}
1795
1796static void
1797sendPex( tr_peermsgs * msgs )
1798{
1799    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
1800    {
1801        int i;
1802        tr_pex * newPex = NULL;
1803        const int newCount = tr_peerMgrGetPeers( msgs->handle->peerMgr, msgs->torrent->info.hash, &newPex );
1804        PexDiffs diffs;
1805        tr_benc val, *added, *dropped;
1806        uint8_t *tmp, *walk;
1807        char * benc;
1808        int bencLen;
1809
1810        /* build the diffs */
1811        diffs.added = tr_new( tr_pex, newCount );
1812        diffs.addedCount = 0;
1813        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1814        diffs.droppedCount = 0;
1815        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1816        diffs.elementCount = 0;
1817        tr_set_compare( msgs->pex, msgs->pexCount,
1818                        newPex, newCount,
1819                        tr_pexCompare, sizeof(tr_pex),
1820                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
1821        dbgmsg( msgs, "pex: old peer count %d, new peer count %d, added %d, removed %d", msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1822
1823        /* update peer */
1824        tr_free( msgs->pex );
1825        msgs->pex = diffs.elements;
1826        msgs->pexCount = diffs.elementCount;
1827
1828        /* build the pex payload */
1829        tr_bencInitDict( &val, 3 );
1830
1831        /* "added" */
1832        added = tr_bencDictAdd( &val, "added" );
1833        tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1834        for( i=0; i<diffs.addedCount; ++i ) {
1835            memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
1836            memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1837        }
1838        assert( ( walk - tmp ) == diffs.addedCount * 6 );
1839        tr_bencInitStr( added, tmp, walk-tmp, FALSE );
1840
1841        /* "added.f" */
1842        tmp = walk = tr_new( uint8_t, diffs.addedCount );
1843        for( i=0; i<diffs.addedCount; ++i )
1844            *walk++ = diffs.added[i].flags;
1845        assert( ( walk - tmp ) == diffs.addedCount );
1846        tr_bencDictAddRaw( &val, "added.f", tmp, walk-tmp );
1847
1848        /* "dropped" */
1849        dropped = tr_bencDictAdd( &val, "dropped" );
1850        tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
1851        for( i=0; i<diffs.droppedCount; ++i ) {
1852            memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
1853            memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
1854        }
1855        assert( ( walk - tmp ) == diffs.droppedCount * 6 );
1856        tr_bencInitStr( dropped, tmp, walk-tmp, FALSE );
1857
1858        /* write the pex message */
1859        benc = tr_bencSave( &val, &bencLen );
1860        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 2*sizeof(uint8_t) + bencLen );
1861        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_LTEP );
1862        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, msgs->ut_pex_id );
1863        tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, benc, bencLen );
1864
1865        /* cleanup */
1866        tr_free( benc );
1867        tr_bencFree( &val );
1868        tr_free( diffs.added );
1869        tr_free( diffs.dropped );
1870        tr_free( newPex );
1871
1872        msgs->clientSentPexAt = time( NULL );
1873    }
1874}
1875
1876static int
1877pexPulse( void * vpeer )
1878{
1879    sendPex( vpeer );
1880    return TRUE;
1881}
1882
1883/**
1884***
1885**/
1886
1887tr_peermsgs*
1888tr_peerMsgsNew( struct tr_torrent * torrent,
1889                struct tr_peer    * info,
1890                tr_delivery_func    func,
1891                void              * userData,
1892                tr_publisher_tag  * setme )
1893{
1894    tr_peermsgs * m;
1895
1896    assert( info != NULL );
1897    assert( info->io != NULL );
1898
1899    m = tr_new0( tr_peermsgs, 1 );
1900    m->publisher = tr_publisherNew( );
1901    m->info = info;
1902    m->handle = torrent->handle;
1903    m->torrent = torrent;
1904    m->io = info->io;
1905    m->info->clientIsChoked = 1;
1906    m->info->peerIsChoked = 1;
1907    m->info->clientIsInterested = 0;
1908    m->info->peerIsInterested = 0;
1909    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
1910    m->state = AWAITING_BT_LENGTH;
1911    m->pulseTimer = tr_timerNew( m->handle, pulse, m, PEER_PULSE_INTERVAL );
1912    m->rateTimer = tr_timerNew( m->handle, ratePulse, m, RATE_PULSE_INTERVAL );
1913    m->pexTimer = tr_timerNew( m->handle, pexPulse, m, PEX_INTERVAL );
1914    m->outMessages = evbuffer_new( );
1915    m->incoming.block = evbuffer_new( );
1916    m->outBlock = evbuffer_new( );
1917    m->peerAllowedPieces = NULL;
1918    m->peerAskedFor = REQUEST_LIST_INIT;
1919    m->peerAskedForFast = REQUEST_LIST_INIT;
1920    m->clientAskedFor = REQUEST_LIST_INIT;
1921    m->clientWillAskFor = REQUEST_LIST_INIT;
1922    *setme = tr_publisherSubscribe( m->publisher, func, userData );
1923
1924    if ( tr_peerIoSupportsLTEP( m->io ) )
1925        sendLtepHandshake( m );
1926
1927    sendBitfield( m );
1928   
1929    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of inactivity */
1930    tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
1931    ratePulse( m );
1932
1933    return m;
1934}
1935
1936void
1937tr_peerMsgsFree( tr_peermsgs* msgs )
1938{
1939    if( msgs != NULL )
1940    {
1941        tr_timerFree( &msgs->pulseTimer );
1942        tr_timerFree( &msgs->rateTimer );
1943        tr_timerFree( &msgs->pexTimer );
1944        tr_publisherFree( &msgs->publisher );
1945        reqListClear( &msgs->clientWillAskFor );
1946        reqListClear( &msgs->clientAskedFor );
1947        reqListClear( &msgs->peerAskedForFast );
1948        reqListClear( &msgs->peerAskedFor );
1949        tr_bitfieldFree( msgs->peerAllowedPieces );
1950        evbuffer_free( msgs->incoming.block );
1951        evbuffer_free( msgs->outMessages );
1952        evbuffer_free( msgs->outBlock );
1953        tr_free( msgs->pex );
1954
1955        memset( msgs, ~0, sizeof( tr_peermsgs ) );
1956        tr_free( msgs );
1957    }
1958}
1959
1960tr_publisher_tag
1961tr_peerMsgsSubscribe( tr_peermsgs       * peer,
1962                      tr_delivery_func    func,
1963                      void              * userData )
1964{
1965    return tr_publisherSubscribe( peer->publisher, func, userData );
1966}
1967
1968void
1969tr_peerMsgsUnsubscribe( tr_peermsgs       * peer,
1970                        tr_publisher_tag    tag )
1971{
1972    tr_publisherUnsubscribe( peer->publisher, tag );
1973}
Note: See TracBrowser for help on using the repository browser.