source: trunk/libtransmission/peer-msgs.c @ 5913

Last change on this file since 5913 was 5913, checked in by charles, 14 years ago

sine we now have two public ports (peer and rpc), rename "publicPort" as "peerPort"

  • Property svn:keywords set to Date Rev Author Id
File size: 55.3 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 5913 2008-05-23 16:18:58Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19#include <libgen.h> /* basename */
20
21#include <event.h>
22
23#include "transmission.h"
24#include "bencode.h"
25#include "completion.h"
26#include "inout.h"
27#include "peer-io.h"
28#include "peer-mgr.h"
29#include "peer-mgr-private.h"
30#include "peer-msgs.h"
31#include "ratecontrol.h"
32#include "stats.h"
33#include "torrent.h"
34#include "trevent.h"
35#include "utils.h"
36
37/**
38***
39**/
40
41enum
42{
43    BT_CHOKE                = 0,
44    BT_UNCHOKE              = 1,
45    BT_INTERESTED           = 2,
46    BT_NOT_INTERESTED       = 3,
47    BT_HAVE                 = 4,
48    BT_BITFIELD             = 5,
49    BT_REQUEST              = 6,
50    BT_PIECE                = 7,
51    BT_CANCEL               = 8,
52    BT_PORT                 = 9,
53    BT_SUGGEST              = 13,
54    BT_HAVE_ALL             = 14,
55    BT_HAVE_NONE            = 15,
56    BT_REJECT               = 16,
57    BT_ALLOWED_FAST         = 17,
58    BT_LTEP                 = 20,
59
60    LTEP_HANDSHAKE          = 0,
61
62    TR_LTEP_PEX             = 1,
63
64    MIN_CHOKE_PERIOD_SEC    = (10),
65
66    /* idle seconds before we send a keepalive */
67    KEEPALIVE_INTERVAL_SECS = 90,
68
69    PEX_INTERVAL            = (90 * 1000), /* msec between sendPex() calls */
70    PEER_PULSE_INTERVAL     = (50),        /* msec between pulse() calls */
71    RATE_PULSE_INTERVAL     = (250),       /* msec between ratePulse() calls */
72
73    MAX_QUEUE_SIZE          = (100),
74     
75    /* (fast peers) max number of pieces we fast-allow to another peer */
76    MAX_FAST_ALLOWED_COUNT   = 10,
77
78    /* (fast peers) max threshold for allowing fast-pieces requests */
79    MAX_FAST_ALLOWED_THRESHOLD = 10,
80
81    /* how long an unsent request can stay queued before it's returned
82       back to the peer-mgr's pool of requests */
83    QUEUED_REQUEST_TTL_SECS = 20,
84
85    /* how long a sent request can stay queued before it's returned
86       back to the peer-mgr's pool of requests */
87    SENT_REQUEST_TTL_SECS = 120
88};
89
90/**
91***  REQUEST MANAGEMENT
92**/
93
94enum
95{
96    AWAITING_BT_LENGTH,
97    AWAITING_BT_ID,
98    AWAITING_BT_MESSAGE,
99    AWAITING_BT_PIECE
100};
101
102struct peer_request
103{
104    uint32_t index;
105    uint32_t offset;
106    uint32_t length;
107    time_t time_requested;
108};
109
110static int
111compareRequest( const void * va, const void * vb )
112{
113    int i;
114    const struct peer_request * a = va;
115    const struct peer_request * b = vb;
116    if(( i = tr_compareUint32( a->index, b->index ))) return i;
117    if(( i = tr_compareUint32( a->offset, b->offset ))) return i;
118    if(( i = tr_compareUint32( a->length, b->length ))) return i;
119    return 0;
120}
121
122struct request_list
123{
124    uint16_t count;
125    uint16_t max;
126    struct peer_request * requests;
127};
128
129static const struct request_list REQUEST_LIST_INIT = { 0, 0, NULL };
130
131static void
132reqListReserve( struct request_list * list, uint16_t max )
133{
134    if( list->max < max )
135    {
136        list->max = max;
137        list->requests = tr_renew( struct peer_request,
138                                   list->requests,
139                                   list->max );
140    }
141}
142
143static void
144reqListClear( struct request_list * list )
145{
146    tr_free( list->requests );
147    *list = REQUEST_LIST_INIT;
148}
149
150static void
151reqListCopy( struct request_list * dest, const struct request_list * src )
152{
153    dest->count = src->count;
154    dest->max = src->max;
155    dest->requests = tr_new( struct peer_request, dest->max );
156    memcpy( dest->requests, src->requests, sizeof( struct peer_request ) * dest->count );
157}
158
159static void
160reqListRemoveOne( struct request_list * list, int i )
161{
162    assert( 0<=i && i<list->count );
163
164    memmove( &list->requests[i],
165             &list->requests[i+1],
166             sizeof( struct peer_request ) * ( --list->count - i ) );
167}
168
169static void
170reqListAppend( struct request_list * list, const struct peer_request * req )
171{
172    if( ++list->count >= list->max )
173        reqListReserve( list, list->max + 8 );
174
175    list->requests[list->count-1] = *req;
176}
177
178static tr_errno
179reqListPop( struct request_list * list, struct peer_request * setme )
180{
181    tr_errno err;
182
183    if( !list->count )
184        err = TR_ERROR;
185    else {
186        *setme = list->requests[0];
187        reqListRemoveOne( list, 0 );
188        err = TR_OK;
189    }
190
191    return err;
192}
193
194static int
195reqListFind( struct request_list * list, const struct peer_request * key )
196{
197    uint16_t i;
198    for( i=0; i<list->count; ++i )
199        if( !compareRequest( key, list->requests+i ) )
200            return i;
201    return -1;
202}
203
204static tr_errno
205reqListRemove( struct request_list * list, const struct peer_request * key )
206{
207    tr_errno err;
208    const int i = reqListFind( list, key );
209
210    if( i < 0 )
211        err = TR_ERROR;
212    else {
213        err = TR_OK;
214        reqListRemoveOne( list, i );
215    }
216
217    return err;
218}
219
220/**
221***
222**/
223
224/* this is raw, unchanged data from the peer regarding
225 * the current message that it's sending us. */
226struct tr_incoming
227{
228    uint8_t id;
229    uint32_t length; /* includes the +1 for id length */
230    struct peer_request blockReq; /* metadata for incoming blocks */
231    struct evbuffer * block; /* piece data for incoming blocks */
232};
233
234struct tr_peermsgs
235{
236    unsigned int peerSentBitfield         : 1;
237    unsigned int peerSupportsPex          : 1;
238    unsigned int clientSentLtepHandshake  : 1;
239    unsigned int peerSentLtepHandshake    : 1;
240    unsigned int sendingBlock             : 1;
241   
242    uint8_t state;
243    uint8_t ut_pex_id;
244    uint16_t pexCount;
245    uint16_t maxActiveRequests;
246    uint16_t minActiveRequests;
247
248    tr_peer * info;
249
250    tr_handle * handle;
251    tr_torrent * torrent;
252    tr_peerIo * io;
253
254    tr_publisher_t * publisher;
255
256    struct evbuffer * outBlock;    /* buffer of the current piece message */
257    struct evbuffer * outMessages; /* buffer of all the non-piece messages */
258
259    struct request_list peerAskedFor;
260    struct request_list peerAskedForFast;
261    struct request_list clientAskedFor;
262    struct request_list clientWillAskFor;
263
264    tr_timer * rateTimer;
265    tr_timer * pulseTimer;
266    tr_timer * pexTimer;
267
268    time_t clientSentPexAt;
269    time_t clientSentAnythingAt;
270   
271    tr_bitfield * peerAllowedPieces;
272
273    struct tr_incoming incoming; 
274
275    tr_pex * pex;
276};
277
278/**
279***
280**/
281
282static void
283myDebug( const char * file, int line,
284         const struct tr_peermsgs * msgs,
285         const char * fmt, ... )
286{
287    FILE * fp = tr_getLog( );
288    if( fp != NULL )
289    {
290        va_list args;
291        char timestr[64];
292        struct evbuffer * buf = evbuffer_new( );
293        char * myfile = tr_strdup( file );
294
295        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
296                             tr_getLogTimeStr( timestr, sizeof(timestr) ),
297                             msgs->torrent->info.name,
298                             tr_peerIoGetAddrStr( msgs->io ),
299                             msgs->info->client );
300        va_start( args, fmt );
301        evbuffer_add_vprintf( buf, fmt, args );
302        va_end( args );
303        evbuffer_add_printf( buf, " (%s:%d)\n", basename(myfile), line );
304        fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
305
306        tr_free( myfile );
307        evbuffer_free( buf );
308    }
309}
310
311#define dbgmsg(msgs, fmt...) myDebug(__FILE__, __LINE__, msgs, ##fmt )
312
313/**
314***
315**/
316
317static void
318protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
319{
320    tr_peerIo * io = msgs->io;
321    struct evbuffer * out = msgs->outMessages;
322
323    dbgmsg( msgs, "requesting %u:%u->%u", req->index, req->offset, req->length );
324    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
325    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
326    tr_peerIoWriteUint32( io, out, req->index );
327    tr_peerIoWriteUint32( io, out, req->offset );
328    tr_peerIoWriteUint32( io, out, req->length );
329}
330
331static void
332protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
333{
334    tr_peerIo * io = msgs->io;
335    struct evbuffer * out = msgs->outMessages;
336
337    dbgmsg( msgs, "cancelling %u:%u->%u", req->index, req->offset, req->length );
338    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
339    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
340    tr_peerIoWriteUint32( io, out, req->index );
341    tr_peerIoWriteUint32( io, out, req->offset );
342    tr_peerIoWriteUint32( io, out, req->length );
343}
344
345static void
346protocolSendHave( tr_peermsgs * msgs, uint32_t index )
347{
348    tr_peerIo * io = msgs->io;
349    struct evbuffer * out = msgs->outMessages;
350
351    dbgmsg( msgs, "sending Have %u", index );
352    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
353    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
354    tr_peerIoWriteUint32( io, out, index );
355}
356
357static void
358protocolSendChoke( tr_peermsgs * msgs, int choke )
359{
360    tr_peerIo * io = msgs->io;
361    struct evbuffer * out = msgs->outMessages;
362
363    dbgmsg( msgs, "sending %s", (choke ? "Choke" : "Unchoke") );
364    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) );
365    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
366}
367
368/**
369***  EVENTS
370**/
371
372static const tr_peermsgs_event blankEvent = { 0, 0, 0, 0, 0.0f, 0 };
373
374static void
375publish( tr_peermsgs * msgs, tr_peermsgs_event * e )
376{
377    tr_publisherPublish( msgs->publisher, msgs->info, e );
378}
379
380static void
381fireError( tr_peermsgs * msgs, tr_errno err )
382{
383    tr_peermsgs_event e = blankEvent;
384    e.eventType = TR_PEERMSG_ERROR;
385    e.err = err;
386    publish( msgs, &e );
387}
388
389static void
390fireNeedReq( tr_peermsgs * msgs )
391{
392    tr_peermsgs_event e = blankEvent;
393    e.eventType = TR_PEERMSG_NEED_REQ;
394    publish( msgs, &e );
395}
396
397static void
398firePeerProgress( tr_peermsgs * msgs )
399{
400    tr_peermsgs_event e = blankEvent;
401    e.eventType = TR_PEERMSG_PEER_PROGRESS;
402    e.progress = msgs->info->progress;
403    publish( msgs, &e );
404}
405
406static void
407fireClientHave( tr_peermsgs * msgs, uint32_t pieceIndex )
408{
409    tr_peermsgs_event e = blankEvent;
410    e.eventType = TR_PEERMSG_CLIENT_HAVE;
411    e.pieceIndex = pieceIndex;
412    publish( msgs, &e );
413}
414
415static void
416fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
417{
418    tr_peermsgs_event e = blankEvent;
419    e.eventType = TR_PEERMSG_CLIENT_BLOCK;
420    e.pieceIndex = req->index;
421    e.offset = req->offset;
422    e.length = req->length;
423    publish( msgs, &e );
424}
425
426static void
427firePieceData( tr_peermsgs * msgs )
428{
429    tr_peermsgs_event e = blankEvent;
430    e.eventType = TR_PEERMSG_PIECE_DATA;
431    publish( msgs, &e );
432}
433
434static void
435fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
436{
437    tr_peermsgs_event e = blankEvent;
438    e.eventType = TR_PEERMSG_CANCEL;
439    e.pieceIndex = req->index;
440    e.offset = req->offset;
441    e.length = req->length;
442    publish( msgs, &e );
443}
444
445/**
446***  INTEREST
447**/
448
449static int
450isPieceInteresting( const tr_peermsgs   * peer,
451                    int                   piece )
452{
453    const tr_torrent * torrent = peer->torrent;
454
455    return ( ( !torrent->info.pieces[piece].dnd )               /* we want it */
456          && ( !tr_cpPieceIsComplete( torrent->completion, piece ) ) /* !have */
457          && ( tr_bitfieldHas( peer->info->have, piece ) ) );  /* peer has it */
458}
459
460/* "interested" means we'll ask for piece data if they unchoke us */
461static int
462isPeerInteresting( const tr_peermsgs * msgs )
463{
464    tr_piece_index_t i;
465    const tr_torrent * torrent;
466    const tr_bitfield * bitfield;
467    const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
468
469    if( clientIsSeed )
470        return FALSE;
471
472    torrent = msgs->torrent;
473    bitfield = tr_cpPieceBitfield( torrent->completion );
474
475    if( !msgs->info->have )
476        return TRUE;
477
478    assert( bitfield->len == msgs->info->have->len );
479    for( i=0; i<torrent->info.pieceCount; ++i )
480        if( isPieceInteresting( msgs, i ) )
481            return TRUE;
482
483    return FALSE;
484}
485
486static void
487sendInterest( tr_peermsgs * msgs, int weAreInterested )
488{
489    assert( msgs != NULL );
490    assert( weAreInterested==0 || weAreInterested==1 );
491
492    msgs->info->clientIsInterested = weAreInterested;
493    dbgmsg( msgs, "Sending %s",
494            weAreInterested ? "Interested" : "Not Interested");
495
496    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
497    tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages,
498                   weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
499}
500
501static void
502updateInterest( tr_peermsgs * msgs )
503{
504    const int i = isPeerInteresting( msgs );
505    if( i != msgs->info->clientIsInterested )
506        sendInterest( msgs, i );
507    if( i )
508        fireNeedReq( msgs );
509}
510
511static void
512cancelAllRequestsToClientExceptFast( tr_peermsgs * msgs )
513{
514    reqListClear( &msgs->peerAskedFor );
515}
516
517void
518tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
519{
520    const time_t fibrillationTime = time(NULL) - MIN_CHOKE_PERIOD_SEC;
521
522    assert( msgs != NULL );
523    assert( msgs->info != NULL );
524    assert( choke==0 || choke==1 );
525
526    if( msgs->info->chokeChangedAt > fibrillationTime )
527    {
528        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
529    }
530    else if( msgs->info->peerIsChoked != choke )
531    {
532        msgs->info->peerIsChoked = choke;
533        if( choke )
534            cancelAllRequestsToClientExceptFast( msgs );
535        protocolSendChoke( msgs, choke );
536        msgs->info->chokeChangedAt = time( NULL );
537    }
538}
539
540/**
541***
542**/
543
544void
545tr_peerMsgsHave( tr_peermsgs * msgs,
546                 uint32_t      index )
547{
548    protocolSendHave( msgs, index );
549
550    /* since we have more pieces now, we might not be interested in this peer */
551    updateInterest( msgs );
552}
553#if 0
554static void
555sendFastSuggest( tr_peermsgs * msgs,
556                 uint32_t      pieceIndex )
557{
558    assert( msgs != NULL );
559   
560    if( tr_peerIoSupportsFEXT( msgs->io ) )
561    {
562        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
563        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_SUGGEST );
564        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
565    }
566}
567static void
568sendFastHave( tr_peermsgs * msgs, int all )
569{
570    assert( msgs != NULL );
571   
572    if( tr_peerIoSupportsFEXT( msgs->io ) )
573    {
574        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
575        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, ( all ? BT_HAVE_ALL
576                                                                : BT_HAVE_NONE ) );
577        updateInterest( msgs );
578    }
579}
580#endif
581
582static void
583sendFastReject( tr_peermsgs * msgs,
584                uint32_t      pieceIndex,
585                uint32_t      offset,
586                uint32_t      length )
587{
588    assert( msgs != NULL );
589
590    if( tr_peerIoSupportsFEXT( msgs->io ) )
591    {
592        const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
593        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
594        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_REJECT );
595        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
596        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
597        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
598    }
599}
600
601static tr_bitfield*
602getPeerAllowedPieces( tr_peermsgs * msgs )
603{
604    if( !msgs->peerAllowedPieces && tr_peerIoSupportsFEXT( msgs->io ) )
605    {
606        msgs->peerAllowedPieces = tr_peerMgrGenerateAllowedSet(
607            MAX_FAST_ALLOWED_COUNT,
608            msgs->torrent->info.pieceCount,
609            msgs->torrent->info.hash,
610            tr_peerIoGetAddress( msgs->io, NULL ) );
611    }
612
613    return msgs->peerAllowedPieces;
614}
615
616static void
617sendFastAllowed( tr_peermsgs * msgs,
618                 uint32_t      pieceIndex)
619{
620    assert( msgs != NULL );
621   
622    if( tr_peerIoSupportsFEXT( msgs->io ) )
623    {
624        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
625        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_ALLOWED_FAST );
626        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
627    }
628}
629
630static void
631sendFastAllowedSet( tr_peermsgs * msgs )
632{
633    tr_piece_index_t i = 0;
634
635    while (i <= msgs->torrent->info.pieceCount )
636    {
637        if ( tr_bitfieldHas( getPeerAllowedPieces( msgs ), i) )
638            sendFastAllowed( msgs, i );
639        i++;
640    }
641}
642
643static void
644maybeSendFastAllowedSet( tr_peermsgs * msgs )
645{
646    if( tr_bitfieldCountTrueBits( msgs->info->have ) <= MAX_FAST_ALLOWED_THRESHOLD )
647        sendFastAllowedSet( msgs );
648}
649
650
651/**
652***
653**/
654
655static int
656reqIsValid( const tr_peermsgs   * msgs,
657            uint32_t              index,
658            uint32_t              offset,
659            uint32_t              length )
660{
661    return tr_torrentReqIsValid( msgs->torrent, index, offset, length );
662}
663
664static int
665requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
666{
667    return reqIsValid( msgs, req->index, req->offset, req->length );
668}
669
670static void
671expireOldRequests( tr_peermsgs * msgs )
672{
673    int i;
674    time_t oldestAllowed;
675    struct request_list tmp = REQUEST_LIST_INIT;
676
677    /* cancel requests that have been queued for too long */
678    oldestAllowed = time( NULL ) - QUEUED_REQUEST_TTL_SECS;
679    reqListCopy( &tmp, &msgs->clientWillAskFor );
680    for( i=0; i<tmp.count; ++i ) {
681        const struct peer_request * req = &tmp.requests[i];
682        if( req->time_requested < oldestAllowed )
683            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
684    }
685    reqListClear( &tmp );
686
687    /* cancel requests that were sent too long ago */
688    oldestAllowed = time( NULL ) - SENT_REQUEST_TTL_SECS;
689    reqListCopy( &tmp, &msgs->clientAskedFor );
690    for( i=0; i<tmp.count; ++i ) {
691        const struct peer_request * req = &tmp.requests[i];
692        if( req->time_requested < oldestAllowed )
693            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
694    }
695    reqListClear( &tmp );
696}
697
698static void
699pumpRequestQueue( tr_peermsgs * msgs )
700{
701    const int max = msgs->maxActiveRequests;
702    const int min = msgs->minActiveRequests;
703    const time_t now = time( NULL );
704    int sent = 0;
705    int count = msgs->clientAskedFor.count;
706    struct peer_request req;
707
708    if( count > min )
709        return;
710    if( msgs->info->clientIsChoked )
711        return;
712
713    while( ( count < max ) && !reqListPop( &msgs->clientWillAskFor, &req ) )
714    {
715        assert( requestIsValid( msgs, &req ) );
716        assert( tr_bitfieldHas( msgs->info->have, req.index ) );
717
718        protocolSendRequest( msgs, &req );
719        req.time_requested = now;
720        reqListAppend( &msgs->clientAskedFor, &req );
721
722        ++count;
723        ++sent;
724    }
725
726    if( sent )
727        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
728                sent,
729                msgs->clientAskedFor.count,
730                msgs->clientWillAskFor.count );
731
732    if( count < max )
733        fireNeedReq( msgs );
734}
735
736static int
737pulse( void * vmsgs );
738
739int
740tr_peerMsgsAddRequest( tr_peermsgs * msgs,
741                       uint32_t      index, 
742                       uint32_t      offset, 
743                       uint32_t      length )
744{
745    const int req_max = msgs->maxActiveRequests;
746    struct peer_request req;
747
748    assert( msgs != NULL );
749    assert( msgs->torrent != NULL );
750    assert( reqIsValid( msgs, index, offset, length ) );
751
752    /**
753    ***  Reasons to decline the request
754    **/
755
756    /* don't send requests to choked clients */
757    if( msgs->info->clientIsChoked ) {
758        dbgmsg( msgs, "declining request because they're choking us" );
759        return TR_ADDREQ_CLIENT_CHOKED;
760    }
761
762    /* peer doesn't have this piece */
763    if( !tr_bitfieldHas( msgs->info->have, index ) )
764        return TR_ADDREQ_MISSING;
765
766    /* peer's queue is full */
767    if( msgs->clientWillAskFor.count >= req_max ) {
768        dbgmsg( msgs, "declining request because we're full" );
769        return TR_ADDREQ_FULL;
770    }
771
772    /* have we already asked for this piece? */
773    req.index = index;
774    req.offset = offset;
775    req.length = length;
776    if( reqListFind( &msgs->clientAskedFor, &req ) != -1 ) {
777        dbgmsg( msgs, "declining because it's a duplicate" );
778        return TR_ADDREQ_DUPLICATE;
779    }
780    if( reqListFind( &msgs->clientWillAskFor, &req ) != -1 ) {
781        dbgmsg( msgs, "declining because it's a duplicate" );
782        return TR_ADDREQ_DUPLICATE;
783    }
784
785    /**
786    ***  Accept this request
787    **/
788
789    dbgmsg( msgs, "added req for piece %d, offset %d", (int)index, (int)offset );
790    req.time_requested = time( NULL );
791    reqListAppend( &msgs->clientWillAskFor, &req );
792    return TR_ADDREQ_OK;
793}
794
795static void
796cancelAllRequestsToPeer( tr_peermsgs * msgs )
797{
798    int i;
799    struct request_list a = msgs->clientWillAskFor;
800    struct request_list b = msgs->clientAskedFor;
801
802    msgs->clientAskedFor = REQUEST_LIST_INIT;
803    msgs->clientWillAskFor = REQUEST_LIST_INIT;
804
805    for( i=0; i<a.count; ++i )
806        fireCancelledReq( msgs, &a.requests[i] );
807
808    for( i=0; i<b.count; ++i ) {
809        fireCancelledReq( msgs, &b.requests[i] );
810        protocolSendCancel( msgs, &b.requests[i] );
811    }
812
813    reqListClear( &a );
814    reqListClear( &b );
815}
816
817void
818tr_peerMsgsCancel( tr_peermsgs * msgs,
819                   uint32_t      pieceIndex,
820                   uint32_t      offset,
821                   uint32_t      length )
822{
823    struct peer_request req;
824
825    assert( msgs != NULL );
826    assert( length > 0 );
827
828    /* have we asked the peer for this piece? */
829    req.index = pieceIndex;
830    req.offset = offset;
831    req.length = length;
832
833    /* if it's only in the queue and hasn't been sent yet, free it */
834    if( !reqListRemove( &msgs->clientWillAskFor, &req ) )
835        fireCancelledReq( msgs, &req );
836
837    /* if it's already been sent, send a cancel message too */
838    if( !reqListRemove( &msgs->clientAskedFor, &req ) ) {
839        protocolSendCancel( msgs, &req );
840        fireCancelledReq( msgs, &req );
841    }
842}
843
844/**
845***
846**/
847
848static void
849sendLtepHandshake( tr_peermsgs * msgs )
850{
851    tr_benc val, *m;
852    char * buf;
853    int len;
854    int pex;
855    struct evbuffer * outbuf;
856
857    if( msgs->clientSentLtepHandshake )
858        return;
859
860    outbuf = evbuffer_new( );
861    dbgmsg( msgs, "sending an ltep handshake" );
862    msgs->clientSentLtepHandshake = 1;
863
864    /* decide if we want to advertise pex support */
865    if( !tr_torrentAllowsPex( msgs->torrent ) )
866        pex = 0;
867    else if( msgs->peerSentLtepHandshake )
868        pex = msgs->peerSupportsPex ? 1 : 0;
869    else
870        pex = 1;
871
872    tr_bencInitDict( &val, 4 );
873    tr_bencDictAddInt( &val, "e", msgs->handle->encryptionMode != TR_PLAINTEXT_PREFERRED );
874    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( msgs->handle ) );
875    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
876    m  = tr_bencDictAddDict( &val, "m", 1 );
877    if( pex )
878        tr_bencDictAddInt( m, "ut_pex", TR_LTEP_PEX );
879    buf = tr_bencSave( &val, &len );
880
881    tr_peerIoWriteUint32( msgs->io, outbuf, 2*sizeof(uint8_t) + len );
882    tr_peerIoWriteUint8 ( msgs->io, outbuf, BT_LTEP );
883    tr_peerIoWriteUint8 ( msgs->io, outbuf, LTEP_HANDSHAKE );
884    tr_peerIoWriteBytes ( msgs->io, outbuf, buf, len );
885
886    tr_peerIoWriteBuf( msgs->io, outbuf );
887
888    dbgmsg( msgs, "here is the ltep handshake we sent [%*.*s]", len, len, buf );
889
890    /* cleanup */
891    tr_bencFree( &val );
892    tr_free( buf );
893    evbuffer_free( outbuf );
894}
895
896static void
897parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
898{
899    tr_benc val, * sub;
900    uint8_t * tmp = tr_new( uint8_t, len );
901
902    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
903    msgs->peerSentLtepHandshake = 1;
904
905    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type!=TYPE_DICT ) {
906        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
907        tr_free( tmp );
908        return;
909    }
910
911    dbgmsg( msgs, "here is the ltep handshake we got [%*.*s]", len, len, tmp );
912
913    /* does the peer prefer encrypted connections? */
914    if(( sub = tr_bencDictFindType( &val, "e", TYPE_INT )))
915        msgs->info->encryption_preference = sub->val.i
916                                      ? ENCRYPTION_PREFERENCE_YES
917                                      : ENCRYPTION_PREFERENCE_NO;
918
919    /* check supported messages for utorrent pex */
920    msgs->peerSupportsPex = 0;
921    if(( sub = tr_bencDictFindType( &val, "m", TYPE_DICT ))) {
922        if(( sub = tr_bencDictFindType( sub, "ut_pex", TYPE_INT ))) {
923            msgs->ut_pex_id = (uint8_t) sub->val.i;
924            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
925            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
926        }
927    }
928
929    /* get peer's listening port */
930    if(( sub = tr_bencDictFindType( &val, "p", TYPE_INT ))) {
931        msgs->info->port = htons( (uint16_t)sub->val.i );
932        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
933    }
934
935    tr_bencFree( &val );
936    tr_free( tmp );
937}
938
939static void
940parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
941{
942    int loaded = 0;
943    uint8_t * tmp = tr_new( uint8_t, msglen );
944    tr_benc val, *added;
945    const tr_torrent * tor = msgs->torrent;
946    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
947
948    if( tr_torrentAllowsPex( tor )
949        && (( loaded = !tr_bencLoad( tmp, msglen, &val, NULL )))
950        && (( added = tr_bencDictFindType( &val, "added", TYPE_STR ))))
951    {
952        const char * added_f = NULL;
953        tr_pex * pex;
954        size_t i, n;
955        tr_bencDictFindStr( &val, "added.f", &added_f );
956        pex = tr_peerMgrCompactToPex( added->val.s.s, added->val.s.i, added_f, &n );
957        for( i=0; i<n; ++i )
958            tr_peerMgrAddPex( msgs->handle->peerMgr, tor->info.hash,
959                              TR_PEER_FROM_PEX, pex+i );
960        tr_free( pex );
961    }
962
963    if( loaded )
964        tr_bencFree( &val );
965    tr_free( tmp );
966}
967
968static void
969sendPex( tr_peermsgs * msgs );
970
971static void
972parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
973{
974    uint8_t ltep_msgid;
975
976    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
977    msglen--;
978
979    if( ltep_msgid == LTEP_HANDSHAKE )
980    {
981        dbgmsg( msgs, "got ltep handshake" );
982        parseLtepHandshake( msgs, msglen, inbuf );
983        if( tr_peerIoSupportsLTEP( msgs->io ) )
984        {
985            sendLtepHandshake( msgs );
986            sendPex( msgs );
987        }
988    }
989    else if( ltep_msgid == TR_LTEP_PEX )
990    {
991        dbgmsg( msgs, "got ut pex" );
992        msgs->peerSupportsPex = 1;
993        parseUtPex( msgs, msglen, inbuf );
994    }
995    else
996    {
997        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
998        evbuffer_drain( inbuf, msglen );
999    }
1000}
1001
1002static int
1003readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1004{
1005    uint32_t len;
1006
1007    if( inlen < sizeof(len) )
1008        return READ_MORE;
1009
1010    tr_peerIoReadUint32( msgs->io, inbuf, &len );
1011
1012    if( len == 0 ) /* peer sent us a keepalive message */
1013        dbgmsg( msgs, "got KeepAlive" );
1014    else {
1015        msgs->incoming.length = len;
1016        msgs->state = AWAITING_BT_ID;
1017    }
1018
1019    return READ_AGAIN;
1020}
1021
1022static int
1023readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen );
1024
1025static int
1026readBtId( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1027{
1028    uint8_t id;
1029
1030    if( inlen < sizeof(uint8_t) )
1031        return READ_MORE;
1032
1033    tr_peerIoReadUint8( msgs->io, inbuf, &id );
1034    msgs->incoming.id = id;
1035
1036    if( id==BT_PIECE )
1037    {
1038        msgs->state = AWAITING_BT_PIECE;
1039        return READ_AGAIN;
1040    }
1041    else if( msgs->incoming.length != 1 )
1042    {
1043        msgs->state = AWAITING_BT_MESSAGE;
1044        return READ_AGAIN;
1045    }
1046    else return readBtMessage( msgs, inbuf, inlen-1 );
1047}
1048
1049static void
1050updatePeerProgress( tr_peermsgs * msgs )
1051{
1052    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have )
1053                         / (float)msgs->torrent->info.pieceCount;
1054    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
1055    updateInterest( msgs );
1056    firePeerProgress( msgs );
1057}
1058
1059static int
1060clientCanSendFastBlock( const tr_peermsgs * msgs UNUSED )
1061{
1062    /* don't send a fast piece if peer has MAX_FAST_ALLOWED_THRESHOLD pieces */
1063    if( tr_bitfieldCountTrueBits( msgs->info->have ) > MAX_FAST_ALLOWED_THRESHOLD )
1064        return FALSE;
1065   
1066    /* ...or if we don't have ourself enough pieces */
1067    if( tr_bitfieldCountTrueBits( tr_cpPieceBitfield( msgs->torrent->completion ) ) < MAX_FAST_ALLOWED_THRESHOLD )
1068        return FALSE;
1069
1070    /* Maybe a bandwidth limit ? */
1071    return TRUE;
1072}
1073
1074static void
1075peerMadeRequest( tr_peermsgs * msgs, const struct peer_request * req )
1076{
1077    const int reqIsValid = requestIsValid( msgs, req );
1078    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( msgs->torrent->completion, req->index );
1079    const int peerIsChoked = msgs->info->peerIsChoked;
1080    const int peerIsFast = tr_peerIoSupportsFEXT( msgs->io );
1081    const int pieceIsFast = reqIsValid && tr_bitfieldHas( getPeerAllowedPieces( msgs ), req->index );
1082    const int canSendFast = clientCanSendFastBlock( msgs );
1083
1084    if( !reqIsValid ) /* bad request */
1085    {
1086        dbgmsg( msgs, "rejecting an invalid request." );
1087        sendFastReject( msgs, req->index, req->offset, req->length );
1088    }
1089    else if( !clientHasPiece ) /* we don't have it */
1090    {
1091        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1092        sendFastReject( msgs, req->index, req->offset, req->length );
1093    }
1094    else if( peerIsChoked && !peerIsFast ) /* doesn't he know he's choked? */
1095    {
1096        tr_peerMsgsSetChoke( msgs, 1 );
1097        sendFastReject( msgs, req->index, req->offset, req->length );
1098    }
1099    else if( peerIsChoked && peerIsFast && ( !pieceIsFast || !canSendFast ) )
1100    {
1101        sendFastReject( msgs, req->index, req->offset, req->length );
1102    }
1103    else /* YAY */
1104    {
1105        if( peerIsFast && pieceIsFast )
1106            reqListAppend( &msgs->peerAskedForFast, req );
1107        else
1108            reqListAppend( &msgs->peerAskedFor, req );
1109    }
1110}
1111
1112static int
1113messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1114{
1115    switch( id )
1116    {
1117        case BT_CHOKE:
1118        case BT_UNCHOKE:
1119        case BT_INTERESTED:
1120        case BT_NOT_INTERESTED:
1121        case BT_HAVE_ALL:
1122        case BT_HAVE_NONE:
1123            return len==1;
1124
1125        case BT_HAVE:
1126        case BT_SUGGEST:
1127        case BT_ALLOWED_FAST:
1128            return len==5;
1129
1130        case BT_BITFIELD:
1131            return len == (msg->torrent->info.pieceCount+7u)/8u + 1u;
1132       
1133        case BT_REQUEST:
1134        case BT_CANCEL:
1135        case BT_REJECT:
1136            return len==13;
1137
1138        case BT_PIECE:
1139            return len>9 && len<=16393;
1140
1141        case BT_PORT:
1142            return len==3;
1143
1144        case BT_LTEP:
1145            return len >= 2;
1146
1147        default:
1148            return FALSE;
1149    }
1150}
1151
1152static int
1153clientGotBlock( tr_peermsgs * msgs, const uint8_t * block, const struct peer_request * req );
1154
1155static void
1156clientGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1157{
1158    const time_t now = time( NULL );
1159    tr_torrent * tor = msgs->torrent;
1160    tor->activityDate = tr_date( );
1161    tor->downloadedCur += byteCount;
1162    msgs->info->pieceDataActivityDate = now;
1163    msgs->info->credit += (int)(byteCount * SWIFT_REPAYMENT_RATIO);
1164    tr_rcTransferred( msgs->info->rcToClient, byteCount );
1165    tr_rcTransferred( tor->download, byteCount );
1166    tr_rcTransferred( tor->handle->download, byteCount );
1167    tr_statsAddDownloaded( msgs->handle, byteCount );
1168    firePieceData( msgs );
1169}
1170
1171static int
1172readBtPiece( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1173{
1174    struct peer_request * req = &msgs->incoming.blockReq;
1175    assert( EVBUFFER_LENGTH(inbuf) >= inlen );
1176    dbgmsg( msgs, "In readBtPiece" );
1177
1178    if( !req->length )
1179    {
1180        if( inlen < 8 )
1181            return READ_MORE;
1182
1183        tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
1184        tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
1185        req->length = msgs->incoming.length - 9;
1186        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1187        return READ_AGAIN;
1188    }
1189    else
1190    {
1191        int err;
1192
1193        /* read in another chunk of data */
1194        const size_t nLeft = req->length - EVBUFFER_LENGTH(msgs->incoming.block);
1195        size_t n = MIN( nLeft, inlen );
1196        uint8_t * buf = tr_new( uint8_t, n );
1197        assert( EVBUFFER_LENGTH(inbuf) >= n );
1198        tr_peerIoReadBytes( msgs->io, inbuf, buf, n );
1199        evbuffer_add( msgs->incoming.block, buf, n );
1200        clientGotBytes( msgs, n );
1201        tr_free( buf );
1202        dbgmsg( msgs, "got %d bytes for block %u:%u->%u ... %d remain",
1203               (int)n, req->index, req->offset, req->length,
1204               (int)( req->length - EVBUFFER_LENGTH(msgs->incoming.block) ) );
1205        if( EVBUFFER_LENGTH(msgs->incoming.block) < req->length )
1206            return READ_MORE;
1207
1208        /* we've got the whole block ... process it */
1209        err = clientGotBlock( msgs, EVBUFFER_DATA(msgs->incoming.block), req );
1210
1211        /* cleanup */
1212        evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH(msgs->incoming.block) );
1213        req->length = 0;
1214        msgs->state = AWAITING_BT_LENGTH;
1215        if( !err )
1216            return READ_AGAIN;
1217        else {
1218            fireError( msgs, err );
1219            return READ_DONE;
1220        }
1221    }
1222}
1223
1224static int
1225readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1226{
1227    uint32_t ui32;
1228    uint32_t msglen = msgs->incoming.length;
1229    const uint8_t id = msgs->incoming.id;
1230    const size_t startBufLen = EVBUFFER_LENGTH( inbuf );
1231
1232    --msglen; /* id length */
1233
1234    if( inlen < msglen )
1235        return READ_MORE;
1236
1237    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %d", (int)id, (int)msglen, (int)inlen );
1238
1239    if( !messageLengthIsCorrect( msgs, id, msglen+1 ) )
1240    {
1241        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1242        fireError( msgs, TR_ERROR );
1243        return READ_DONE;
1244    }
1245
1246    switch( id )
1247    {
1248        case BT_CHOKE:
1249            dbgmsg( msgs, "got Choke" );
1250            msgs->info->clientIsChoked = 1;
1251            cancelAllRequestsToPeer( msgs );
1252            cancelAllRequestsToClientExceptFast( msgs );
1253            break;
1254
1255        case BT_UNCHOKE:
1256            dbgmsg( msgs, "got Unchoke" );
1257            msgs->info->clientIsChoked = 0;
1258            fireNeedReq( msgs );
1259            break;
1260
1261        case BT_INTERESTED:
1262            dbgmsg( msgs, "got Interested" );
1263            msgs->info->peerIsInterested = 1;
1264            tr_peerMsgsSetChoke( msgs, 0 );
1265            break;
1266
1267        case BT_NOT_INTERESTED:
1268            dbgmsg( msgs, "got Not Interested" );
1269            msgs->info->peerIsInterested = 0;
1270            break;
1271           
1272        case BT_HAVE:
1273            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1274            dbgmsg( msgs, "got Have: %u", ui32 );
1275            if( tr_bitfieldAdd( msgs->info->have, ui32 ) )
1276                fireError( msgs, TR_ERROR_PEER_MESSAGE );
1277            updatePeerProgress( msgs );
1278            tr_rcTransferred( msgs->torrent->swarmSpeed, msgs->torrent->info.pieceSize );
1279            break;
1280
1281        case BT_BITFIELD: {
1282            dbgmsg( msgs, "got a bitfield" );
1283            msgs->peerSentBitfield = 1;
1284            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
1285            updatePeerProgress( msgs );
1286            maybeSendFastAllowedSet( msgs );
1287            fireNeedReq( msgs );
1288            break;
1289        }
1290
1291        case BT_REQUEST: {
1292            struct peer_request r;
1293            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1294            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1295            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1296            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1297            peerMadeRequest( msgs, &r );
1298            break;
1299        }
1300
1301        case BT_CANCEL: {
1302            struct peer_request r;
1303            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1304            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1305            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1306            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1307            reqListRemove( &msgs->peerAskedForFast, &r );
1308            reqListRemove( &msgs->peerAskedFor, &r );
1309            break;
1310        }
1311
1312        case BT_PIECE:
1313            assert( 0 ); /* handled elsewhere! */
1314            break;
1315       
1316        case BT_PORT:
1317            dbgmsg( msgs, "Got a BT_PORT" );
1318            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1319            break;
1320           
1321        case BT_SUGGEST: {
1322            dbgmsg( msgs, "Got a BT_SUGGEST" );
1323            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1324            /* we don't do anything with this yet */
1325            break;
1326        }
1327           
1328        case BT_HAVE_ALL:
1329            dbgmsg( msgs, "Got a BT_HAVE_ALL" );
1330            tr_bitfieldAddRange( msgs->info->have, 0, msgs->torrent->info.pieceCount );
1331            updatePeerProgress( msgs );
1332            maybeSendFastAllowedSet( msgs );
1333            break;
1334       
1335       
1336        case BT_HAVE_NONE:
1337            dbgmsg( msgs, "Got a BT_HAVE_NONE" );
1338            tr_bitfieldClear( msgs->info->have );
1339            updatePeerProgress( msgs );
1340            maybeSendFastAllowedSet( msgs );
1341            break;
1342       
1343        case BT_REJECT: {
1344            struct peer_request r;
1345            dbgmsg( msgs, "Got a BT_REJECT" );
1346            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1347            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1348            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1349            reqListRemove( &msgs->clientAskedFor, &r );
1350            break;
1351        }
1352
1353        case BT_ALLOWED_FAST: {
1354            dbgmsg( msgs, "Got a BT_ALLOWED_FAST" );
1355            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1356            /* we don't do anything with this yet */
1357            break;
1358        }
1359
1360        case BT_LTEP:
1361            dbgmsg( msgs, "Got a BT_LTEP" );
1362            parseLtep( msgs, msglen, inbuf );
1363            break;
1364
1365        default:
1366            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1367            tr_peerIoDrain( msgs->io, inbuf, msglen );
1368            break;
1369    }
1370
1371    assert( msglen + 1 == msgs->incoming.length );
1372    assert( EVBUFFER_LENGTH(inbuf) == startBufLen - msglen );
1373
1374    msgs->state = AWAITING_BT_LENGTH;
1375    return READ_AGAIN;
1376}
1377
1378static void
1379peerGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1380{
1381    const time_t now = time( NULL );
1382    tr_torrent * tor = msgs->torrent;
1383    tor->activityDate = tr_date( );
1384    tor->uploadedCur += byteCount;
1385    msgs->info->pieceDataActivityDate = now;
1386    msgs->info->credit -= byteCount;
1387    tr_rcTransferred( msgs->info->rcToPeer, byteCount );
1388    tr_rcTransferred( tor->upload, byteCount );
1389    tr_rcTransferred( tor->handle->upload, byteCount );
1390    tr_statsAddUploaded( msgs->handle, byteCount );
1391    firePieceData( msgs );
1392}
1393
1394static size_t
1395getDownloadMax( const tr_peermsgs * msgs )
1396{
1397    static const size_t maxval = ~0;
1398    const tr_torrent * tor = msgs->torrent;
1399
1400    if( tor->downloadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1401        return tor->handle->useDownloadLimit
1402            ? tr_rcBytesLeft( tor->handle->download ) : maxval;
1403
1404    if( tor->downloadLimitMode == TR_SPEEDLIMIT_SINGLE )
1405        return tr_rcBytesLeft( tor->download );
1406
1407    return maxval;
1408}
1409
1410static void
1411decrementDownloadedCount( tr_peermsgs * msgs, uint32_t byteCount )
1412{
1413    tr_torrent * tor = msgs->torrent;
1414    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1415}
1416
1417static void
1418reassignBytesToCorrupt( tr_peermsgs * msgs, uint32_t byteCount )
1419{
1420    tr_torrent * tor = msgs->torrent;
1421
1422    tor->corruptCur += byteCount;
1423
1424    decrementDownloadedCount( msgs, byteCount );
1425}
1426
1427static void
1428gotBadPiece( tr_peermsgs * msgs, tr_piece_index_t pieceIndex )
1429{
1430    const uint32_t byteCount =
1431        tr_torPieceCountBytes( msgs->torrent, pieceIndex );
1432    reassignBytesToCorrupt( msgs, byteCount );
1433}
1434
1435static void
1436clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1437{
1438    decrementDownloadedCount( msgs, req->length );
1439}
1440
1441static void
1442addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1443{
1444    if( !msgs->info->blame )
1445         msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1446    tr_bitfieldAdd( msgs->info->blame, index );
1447}
1448
1449static tr_errno
1450clientGotBlock( tr_peermsgs                * msgs,
1451                const uint8_t              * data,
1452                const struct peer_request  * req )
1453{
1454    int err;
1455    tr_torrent * tor = msgs->torrent;
1456    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1457
1458    assert( msgs != NULL );
1459    assert( req != NULL );
1460
1461    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) )
1462    {
1463        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1464                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1465        return TR_ERROR;
1466    }
1467
1468    /* save the block */
1469    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1470
1471    /**
1472    *** Remove the block from our `we asked for this' list
1473    **/
1474
1475    if( reqListRemove( &msgs->clientAskedFor, req ) )
1476    {
1477        clientGotUnwantedBlock( msgs, req );
1478        dbgmsg( msgs, "we didn't ask for this message..." );
1479        return 0;
1480    }
1481
1482    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1483                  msgs->clientAskedFor.count );
1484
1485    /**
1486    *** Error checks
1487    **/
1488
1489    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
1490        dbgmsg( msgs, "we have this block already..." );
1491        clientGotUnwantedBlock( msgs, req );
1492        return 0;
1493    }
1494
1495    /**
1496    ***  Save the block
1497    **/
1498
1499    msgs->info->peerSentPieceDataAt = time( NULL );
1500    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1501        return err;
1502
1503    tr_cpBlockAdd( tor->completion, block );
1504
1505    addPeerToBlamefield( msgs, req->index );
1506
1507    fireGotBlock( msgs, req );
1508
1509    /**
1510    ***  Handle if this was the last block in the piece
1511    **/
1512
1513    if( tr_cpPieceIsComplete( tor->completion, req->index ) )
1514    {
1515        const tr_errno err = tr_ioTestPiece( tor, req->index );
1516
1517        if( err )
1518            tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test: %s" ),
1519                       (unsigned long)req->index,
1520                       tr_errorString( err ) );
1521
1522        tr_torrentSetHasPiece( tor, req->index, !err );
1523        tr_torrentSetPieceChecked( tor, req->index, TRUE );
1524        tr_peerMgrSetBlame( tor->handle->peerMgr, tor->info.hash, req->index, !err );
1525
1526        if( !err )
1527            fireClientHave( msgs, req->index );
1528        else
1529            gotBadPiece( msgs, req->index );
1530    }
1531
1532    return 0;
1533}
1534
1535static void
1536didWrite( struct bufferevent * evin UNUSED, void * vmsgs )
1537{
1538    pulse( vmsgs );
1539}
1540
1541static ReadState
1542canRead( struct bufferevent * evin, void * vmsgs )
1543{
1544    ReadState ret;
1545    tr_peermsgs * msgs = vmsgs;
1546    struct evbuffer * in = EVBUFFER_INPUT ( evin );
1547    const size_t inlen = EVBUFFER_LENGTH( in );
1548
1549    if( !inlen )
1550    {
1551        ret = READ_DONE;
1552    }
1553    else if( msgs->state == AWAITING_BT_PIECE )
1554    {
1555        const size_t downloadMax = getDownloadMax( msgs );
1556        const size_t n = MIN( inlen, downloadMax );
1557        ret = n ? readBtPiece( msgs, in, n ) : READ_DONE;
1558    }
1559    else switch( msgs->state )
1560    {
1561        case AWAITING_BT_LENGTH:  ret = readBtLength ( msgs, in, inlen ); break;
1562        case AWAITING_BT_ID:      ret = readBtId     ( msgs, in, inlen ); break;
1563        case AWAITING_BT_MESSAGE: ret = readBtMessage( msgs, in, inlen ); break;
1564        default:                  assert( 0 );
1565    }
1566
1567    return ret;
1568}
1569
1570static void
1571sendKeepalive( tr_peermsgs * msgs )
1572{
1573    dbgmsg( msgs, "sending a keepalive message" );
1574    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1575}
1576
1577/**
1578***
1579**/
1580
1581static int
1582isSwiftEnabled( const tr_peermsgs * msgs )
1583{
1584    /* rationale: SWIFT is good for getting rid of deadbeats, but most
1585     * private trackers have ratios where you _want_ to feed deadbeats
1586     * as much as possible.  So we disable SWIFT on private torrents */
1587    return SWIFT_ENABLED
1588        && !tr_torrentIsSeed( msgs->torrent )
1589        && !tr_torrentIsPrivate( msgs->torrent );
1590}
1591
1592static size_t
1593getUploadMax( const tr_peermsgs * msgs )
1594{
1595    static const size_t maxval = ~0;
1596    const tr_torrent * tor = msgs->torrent;
1597    const int useSwift = isSwiftEnabled( msgs );
1598    const size_t swiftLeft = msgs->info->credit;
1599    int speedLeft;
1600    int bufLeft;
1601    size_t ret;
1602
1603    if( tor->uploadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1604        speedLeft = tor->handle->useUploadLimit ? tr_rcBytesLeft( tor->handle->upload ) : maxval;
1605    else if( tor->uploadLimitMode == TR_SPEEDLIMIT_SINGLE )
1606        speedLeft = tr_rcBytesLeft( tor->upload );
1607    else
1608        speedLeft = INT_MAX;
1609
1610    /* this basically says the outbuf shouldn't have more than one block
1611     * queued up in it... blocksize, +13 for the size of the BT protocol's
1612     * block message overhead */
1613    bufLeft = tor->blockSize + 13 - tr_peerIoWriteBytesWaiting( msgs->io );
1614    ret = MIN( speedLeft, bufLeft );
1615    if( useSwift)
1616        ret = MIN( ret, swiftLeft );
1617    return ret;
1618}
1619
1620static int
1621ratePulse( void * vmsgs )
1622{
1623    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1624    msgs->info->rateToClient = tr_rcRate( msgs->info->rcToClient );
1625    msgs->info->rateToPeer = tr_rcRate( msgs->info->rcToPeer );
1626    msgs->maxActiveRequests = MIN( 4 + (int)(msgs->info->rateToClient/4), MAX_QUEUE_SIZE );
1627    msgs->minActiveRequests = msgs->maxActiveRequests / 3;
1628    return TRUE;
1629}
1630
1631static tr_errno
1632popNextRequest( tr_peermsgs * msgs, struct peer_request * setme )
1633{
1634    if( !reqListPop( &msgs->peerAskedForFast, setme ) )
1635        return 0;
1636    if( !reqListPop( &msgs->peerAskedFor, setme ) )
1637        return 0;
1638
1639    return TR_ERROR;
1640}
1641
1642static int
1643pulse( void * vmsgs )
1644{
1645    const time_t now = time( NULL );
1646    tr_peermsgs * msgs = vmsgs;
1647
1648    tr_peerIoTryRead( msgs->io );
1649    pumpRequestQueue( msgs );
1650    expireOldRequests( msgs );
1651
1652    if( msgs->sendingBlock )
1653    {
1654        const size_t uploadMax = getUploadMax( msgs );
1655        size_t len = EVBUFFER_LENGTH( msgs->outBlock );
1656        const size_t outlen = MIN( len, uploadMax );
1657
1658        assert( len );
1659
1660        if( outlen )
1661        {
1662            tr_peerIoWrite( msgs->io, EVBUFFER_DATA( msgs->outBlock ), outlen );
1663            evbuffer_drain( msgs->outBlock, outlen );
1664            peerGotBytes( msgs, outlen );
1665
1666            len -= outlen;
1667            msgs->clientSentAnythingAt = now;
1668            msgs->sendingBlock = len!=0;
1669
1670            dbgmsg( msgs, "wrote %d bytes; %d left in block", (int)outlen, (int)len );
1671        }
1672        else dbgmsg( msgs, "stalled writing block... uploadMax %lu, outlen %lu", uploadMax, outlen );
1673    }
1674
1675    if( !msgs->sendingBlock )
1676    {
1677        struct peer_request req;
1678
1679        if(( EVBUFFER_LENGTH( msgs->outMessages ) ))
1680        {
1681            dbgmsg( msgs, "flushing outMessages..." );
1682            tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
1683            msgs->clientSentAnythingAt = now;
1684        }
1685        else if( !EVBUFFER_LENGTH( msgs->outBlock )
1686            && !popNextRequest( msgs, &req )
1687            && requestIsValid( msgs, &req )
1688            && tr_cpPieceIsComplete( msgs->torrent->completion, req.index ) )
1689        {
1690            uint8_t * buf = tr_new( uint8_t, req.length );
1691
1692            if( !tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf ) )
1693            {
1694                tr_peerIo * io = msgs->io;
1695                struct evbuffer * out = msgs->outBlock;
1696
1697                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1698                tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 2*sizeof(uint32_t) + req.length );
1699                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1700                tr_peerIoWriteUint32( io, out, req.index );
1701                tr_peerIoWriteUint32( io, out, req.offset );
1702                tr_peerIoWriteBytes ( io, out, buf, req.length );
1703                msgs->sendingBlock = 1;
1704            }
1705
1706            tr_free( buf );
1707        }
1708        else if( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS )
1709        {
1710            sendKeepalive( msgs );
1711        }
1712    }
1713
1714    return TRUE; /* loop forever */
1715}
1716
1717static void
1718gotError( struct bufferevent * evbuf UNUSED, short what, void * vmsgs )
1719{
1720    if( what & EVBUFFER_TIMEOUT )
1721        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1722    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1723        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1724                what, errno, tr_strerror(errno) );
1725    fireError( vmsgs, TR_ERROR );
1726}
1727
1728static void
1729sendBitfield( tr_peermsgs * msgs )
1730{
1731    const tr_bitfield * bitfield = tr_cpPieceBitfield( msgs->torrent->completion );
1732    struct evbuffer * out = msgs->outMessages;
1733
1734    dbgmsg( msgs, "sending peer a bitfield message" );
1735    tr_peerIoWriteUint32( msgs->io, out, sizeof(uint8_t) + bitfield->len );
1736    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1737    tr_peerIoWriteBytes ( msgs->io, out, bitfield->bits, bitfield->len );
1738}
1739
1740/**
1741***
1742**/
1743
1744/* some peers give us error messages if we send
1745   more than this many peers in a single pex message
1746   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
1747#define MAX_PEX_ADDED 50
1748#define MAX_PEX_DROPPED 50
1749
1750typedef struct
1751{
1752    tr_pex * added;
1753    tr_pex * dropped;
1754    tr_pex * elements;
1755    int addedCount;
1756    int droppedCount;
1757    int elementCount;
1758}
1759PexDiffs;
1760
1761static void
1762pexAddedCb( void * vpex, void * userData )
1763{
1764    PexDiffs * diffs = userData;
1765    tr_pex * pex = vpex;
1766    if( diffs->addedCount < MAX_PEX_ADDED )
1767    {
1768        diffs->added[diffs->addedCount++] = *pex;
1769        diffs->elements[diffs->elementCount++] = *pex;
1770    }
1771}
1772
1773static void
1774pexDroppedCb( void * vpex, void * userData )
1775{
1776    PexDiffs * diffs = userData;
1777    tr_pex * pex = vpex;
1778    if( diffs->droppedCount < MAX_PEX_DROPPED )
1779    {
1780        diffs->dropped[diffs->droppedCount++] = *pex;
1781    }
1782}
1783
1784static void
1785pexElementCb( void * vpex, void * userData )
1786{
1787    PexDiffs * diffs = userData;
1788    tr_pex * pex = vpex;
1789    diffs->elements[diffs->elementCount++] = *pex;
1790}
1791
1792static void
1793sendPex( tr_peermsgs * msgs )
1794{
1795    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
1796    {
1797        int i;
1798        tr_pex * newPex = NULL;
1799        const int newCount = tr_peerMgrGetPeers( msgs->handle->peerMgr, msgs->torrent->info.hash, &newPex );
1800        PexDiffs diffs;
1801        tr_benc val, *added, *dropped;
1802        uint8_t *tmp, *walk;
1803        char * benc;
1804        int bencLen;
1805
1806        /* build the diffs */
1807        diffs.added = tr_new( tr_pex, newCount );
1808        diffs.addedCount = 0;
1809        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1810        diffs.droppedCount = 0;
1811        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1812        diffs.elementCount = 0;
1813        tr_set_compare( msgs->pex, msgs->pexCount,
1814                        newPex, newCount,
1815                        tr_pexCompare, sizeof(tr_pex),
1816                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
1817        dbgmsg( msgs, "pex: old peer count %d, new peer count %d, added %d, removed %d", msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1818
1819        /* update peer */
1820        tr_free( msgs->pex );
1821        msgs->pex = diffs.elements;
1822        msgs->pexCount = diffs.elementCount;
1823
1824        /* build the pex payload */
1825        tr_bencInitDict( &val, 3 );
1826
1827        /* "added" */
1828        added = tr_bencDictAdd( &val, "added" );
1829        tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1830        for( i=0; i<diffs.addedCount; ++i ) {
1831            memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
1832            memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1833        }
1834        assert( ( walk - tmp ) == diffs.addedCount * 6 );
1835        tr_bencInitStr( added, tmp, walk-tmp, FALSE );
1836
1837        /* "added.f" */
1838        tmp = walk = tr_new( uint8_t, diffs.addedCount );
1839        for( i=0; i<diffs.addedCount; ++i )
1840            *walk++ = diffs.added[i].flags;
1841        assert( ( walk - tmp ) == diffs.addedCount );
1842        tr_bencDictAddRaw( &val, "added.f", tmp, walk-tmp );
1843
1844        /* "dropped" */
1845        dropped = tr_bencDictAdd( &val, "dropped" );
1846        tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
1847        for( i=0; i<diffs.droppedCount; ++i ) {
1848            memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
1849            memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
1850        }
1851        assert( ( walk - tmp ) == diffs.droppedCount * 6 );
1852        tr_bencInitStr( dropped, tmp, walk-tmp, FALSE );
1853
1854        /* write the pex message */
1855        benc = tr_bencSave( &val, &bencLen );
1856        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 2*sizeof(uint8_t) + bencLen );
1857        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_LTEP );
1858        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, msgs->ut_pex_id );
1859        tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, benc, bencLen );
1860
1861        /* cleanup */
1862        tr_free( benc );
1863        tr_bencFree( &val );
1864        tr_free( diffs.added );
1865        tr_free( diffs.dropped );
1866        tr_free( newPex );
1867
1868        msgs->clientSentPexAt = time( NULL );
1869    }
1870}
1871
1872static int
1873pexPulse( void * vpeer )
1874{
1875    sendPex( vpeer );
1876    return TRUE;
1877}
1878
1879/**
1880***
1881**/
1882
1883tr_peermsgs*
1884tr_peerMsgsNew( struct tr_torrent * torrent,
1885                struct tr_peer    * info,
1886                tr_delivery_func    func,
1887                void              * userData,
1888                tr_publisher_tag  * setme )
1889{
1890    tr_peermsgs * m;
1891
1892    assert( info != NULL );
1893    assert( info->io != NULL );
1894
1895    m = tr_new0( tr_peermsgs, 1 );
1896    m->publisher = tr_publisherNew( );
1897    m->info = info;
1898    m->handle = torrent->handle;
1899    m->torrent = torrent;
1900    m->io = info->io;
1901    m->info->clientIsChoked = 1;
1902    m->info->peerIsChoked = 1;
1903    m->info->clientIsInterested = 0;
1904    m->info->peerIsInterested = 0;
1905    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
1906    m->state = AWAITING_BT_LENGTH;
1907    m->pulseTimer = tr_timerNew( m->handle, pulse, m, PEER_PULSE_INTERVAL );
1908    m->rateTimer = tr_timerNew( m->handle, ratePulse, m, RATE_PULSE_INTERVAL );
1909    m->pexTimer = tr_timerNew( m->handle, pexPulse, m, PEX_INTERVAL );
1910    m->outMessages = evbuffer_new( );
1911    m->incoming.block = evbuffer_new( );
1912    m->outBlock = evbuffer_new( );
1913    m->peerAllowedPieces = NULL;
1914    m->peerAskedFor = REQUEST_LIST_INIT;
1915    m->peerAskedForFast = REQUEST_LIST_INIT;
1916    m->clientAskedFor = REQUEST_LIST_INIT;
1917    m->clientWillAskFor = REQUEST_LIST_INIT;
1918    *setme = tr_publisherSubscribe( m->publisher, func, userData );
1919
1920    if ( tr_peerIoSupportsLTEP( m->io ) )
1921        sendLtepHandshake( m );
1922
1923    sendBitfield( m );
1924   
1925    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of inactivity */
1926    tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
1927    ratePulse( m );
1928
1929    return m;
1930}
1931
1932void
1933tr_peerMsgsFree( tr_peermsgs* msgs )
1934{
1935    if( msgs != NULL )
1936    {
1937        tr_timerFree( &msgs->pulseTimer );
1938        tr_timerFree( &msgs->rateTimer );
1939        tr_timerFree( &msgs->pexTimer );
1940        tr_publisherFree( &msgs->publisher );
1941        reqListClear( &msgs->clientWillAskFor );
1942        reqListClear( &msgs->clientAskedFor );
1943        reqListClear( &msgs->peerAskedForFast );
1944        reqListClear( &msgs->peerAskedFor );
1945        tr_bitfieldFree( msgs->peerAllowedPieces );
1946        evbuffer_free( msgs->incoming.block );
1947        evbuffer_free( msgs->outMessages );
1948        evbuffer_free( msgs->outBlock );
1949        tr_free( msgs->pex );
1950
1951        memset( msgs, ~0, sizeof( tr_peermsgs ) );
1952        tr_free( msgs );
1953    }
1954}
1955
1956tr_publisher_tag
1957tr_peerMsgsSubscribe( tr_peermsgs       * peer,
1958                      tr_delivery_func    func,
1959                      void              * userData )
1960{
1961    return tr_publisherSubscribe( peer->publisher, func, userData );
1962}
1963
1964void
1965tr_peerMsgsUnsubscribe( tr_peermsgs       * peer,
1966                        tr_publisher_tag    tag )
1967{
1968    tr_publisherUnsubscribe( peer->publisher, tag );
1969}
Note: See TracBrowser for help on using the repository browser.