source: trunk/libtransmission/peer-msgs.c @ 6496

Last change on this file since 6496 was 6496, checked in by charles, 13 years ago

readability improvments #1, #2, #3, #4. (muks)

  • Property svn:keywords set to Date Rev Author Id
File size: 56.0 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 6496 2008-08-12 13:51:11Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19#include <libgen.h> /* basename */
20
21#include <event.h>
22
23#include "transmission.h"
24#include "bencode.h"
25#include "completion.h"
26#include "inout.h"
27#include "peer-io.h"
28#include "peer-mgr.h"
29#include "peer-mgr-private.h"
30#include "peer-msgs.h"
31#include "ratecontrol.h"
32#include "stats.h"
33#include "torrent.h"
34#include "trevent.h"
35#include "utils.h"
36
37/**
38***
39**/
40
41enum
42{
43    BT_CHOKE                = 0,
44    BT_UNCHOKE              = 1,
45    BT_INTERESTED           = 2,
46    BT_NOT_INTERESTED       = 3,
47    BT_HAVE                 = 4,
48    BT_BITFIELD             = 5,
49    BT_REQUEST              = 6,
50    BT_PIECE                = 7,
51    BT_CANCEL               = 8,
52    BT_PORT                 = 9,
53    BT_SUGGEST              = 13,
54    BT_HAVE_ALL             = 14,
55    BT_HAVE_NONE            = 15,
56    BT_REJECT               = 16,
57    BT_ALLOWED_FAST         = 17,
58    BT_LTEP                 = 20,
59
60    LTEP_HANDSHAKE          = 0,
61
62    TR_LTEP_PEX             = 1,
63
64    MIN_CHOKE_PERIOD_SEC    = (10),
65
66    /* idle seconds before we send a keepalive */
67    KEEPALIVE_INTERVAL_SECS = 100,
68
69    PEX_INTERVAL            = (90 * 1000), /* msec between sendPex() calls */
70    PEER_PULSE_INTERVAL     = (50),        /* msec between peerPulse() calls */
71    RATE_PULSE_INTERVAL     = (250),       /* msec between ratePulse() calls */
72
73    MAX_QUEUE_SIZE          = (100),
74     
75    /* (fast peers) max number of pieces we fast-allow to another peer */
76    MAX_FAST_ALLOWED_COUNT   = 10,
77
78    /* (fast peers) max threshold for allowing fast-pieces requests */
79    MAX_FAST_ALLOWED_THRESHOLD = 10,
80
81    /* how long an unsent request can stay queued before it's returned
82       back to the peer-mgr's pool of requests */
83    QUEUED_REQUEST_TTL_SECS = 20,
84
85    /* how long a sent request can stay queued before it's returned
86       back to the peer-mgr's pool of requests */
87    SENT_REQUEST_TTL_SECS = 120,
88
89    /* used in lowering the outMessages queue period */
90    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
91    HIGH_PRIORITY_INTERVAL_SECS = 2,
92    LOW_PRIORITY_INTERVAL_SECS = 20
93};
94
95/**
96***  REQUEST MANAGEMENT
97**/
98
99enum
100{
101    AWAITING_BT_LENGTH,
102    AWAITING_BT_ID,
103    AWAITING_BT_MESSAGE,
104    AWAITING_BT_PIECE
105};
106
107struct peer_request
108{
109    uint32_t index;
110    uint32_t offset;
111    uint32_t length;
112    time_t time_requested;
113};
114
115static int
116compareRequest( const void * va, const void * vb )
117{
118    int i;
119    const struct peer_request * a = va;
120    const struct peer_request * b = vb;
121    if(( i = tr_compareUint32( a->index, b->index ))) return i;
122    if(( i = tr_compareUint32( a->offset, b->offset ))) return i;
123    if(( i = tr_compareUint32( a->length, b->length ))) return i;
124    return 0;
125}
126
127struct request_list
128{
129    uint16_t count;
130    uint16_t max;
131    struct peer_request * requests;
132};
133
134static const struct request_list REQUEST_LIST_INIT = { 0, 0, NULL };
135
136static void
137reqListReserve( struct request_list * list, uint16_t max )
138{
139    if( list->max < max )
140    {
141        list->max = max;
142        list->requests = tr_renew( struct peer_request,
143                                   list->requests,
144                                   list->max );
145    }
146}
147
148static void
149reqListClear( struct request_list * list )
150{
151    tr_free( list->requests );
152    *list = REQUEST_LIST_INIT;
153}
154
155static void
156reqListCopy( struct request_list * dest, const struct request_list * src )
157{
158    dest->count = dest->max = src->count;
159    dest->requests = tr_memdup( src->requests, dest->count * sizeof( struct peer_request ) );
160}
161
162static void
163reqListRemoveOne( struct request_list * list, int i )
164{
165    assert( 0<=i && i<list->count );
166
167    memmove( &list->requests[i],
168             &list->requests[i+1],
169             sizeof( struct peer_request ) * ( --list->count - i ) );
170}
171
172static void
173reqListAppend( struct request_list * list, const struct peer_request * req )
174{
175    if( ++list->count >= list->max )
176        reqListReserve( list, list->max + 8 );
177
178    list->requests[list->count-1] = *req;
179}
180
181static void
182reqListAppendPiece( const tr_torrent     * tor,
183                    struct request_list  * list,
184                    tr_piece_index_t       piece )
185{
186    const time_t now = time( NULL );
187    const size_t n = tr_torPieceCountBlocks( tor, piece );
188    const tr_block_index_t begin = tr_torPieceFirstBlock( tor, piece );
189    const tr_block_index_t end = begin + n;
190    tr_block_index_t i;
191
192    if( list->count + n >= list->max )
193        reqListReserve( list, list->max + n );
194
195    for( i=begin; i<end; ++i ) {
196        if( !tr_cpBlockIsComplete( tor->completion, i ) ) {
197            struct peer_request * req = list->requests + list->count++;
198            req->index = piece;
199            req->offset = (i * tor->blockSize) - (piece * tor->info.pieceSize);
200            req->length = tr_torBlockCountBytes( tor, i );
201            req->time_requested = now;
202            assert( tr_torrentReqIsValid( tor, req->index, req->offset, req->length ) );
203        }
204    }
205}
206
207static tr_errno
208reqListPop( struct request_list * list, struct peer_request * setme )
209{
210    tr_errno err;
211
212    if( !list->count )
213        err = TR_ERROR;
214    else {
215        *setme = list->requests[0];
216        reqListRemoveOne( list, 0 );
217        err = TR_OK;
218    }
219
220    return err;
221}
222
223static int
224reqListHasPiece( struct request_list * list, const tr_piece_index_t piece )
225{
226    uint16_t i;
227
228    for( i=0; i<list->count; ++i )
229        if( list->requests[i].index == piece )
230            return 1;
231
232    return 0;
233}
234
235static int
236reqListFind( struct request_list * list, const struct peer_request * key )
237{
238    uint16_t i;
239
240    for( i=0; i<list->count; ++i )
241        if( !compareRequest( key, list->requests+i ) )
242            return i;
243
244    return -1;
245}
246
247static tr_errno
248reqListRemove( struct request_list * list, const struct peer_request * key )
249{
250    tr_errno err;
251    const int i = reqListFind( list, key );
252
253    if( i < 0 )
254        err = TR_ERROR;
255    else {
256        err = TR_OK;
257        reqListRemoveOne( list, i );
258    }
259
260    return err;
261}
262
263/**
264***
265**/
266
267/* this is raw, unchanged data from the peer regarding
268 * the current message that it's sending us. */
269struct tr_incoming
270{
271    uint8_t id;
272    uint32_t length; /* includes the +1 for id length */
273    struct peer_request blockReq; /* metadata for incoming blocks */
274    struct evbuffer * block; /* piece data for incoming blocks */
275};
276
277struct tr_peermsgs
278{
279    unsigned int peerSentBitfield         : 1;
280    unsigned int peerSupportsPex          : 1;
281    unsigned int clientSentLtepHandshake  : 1;
282    unsigned int peerSentLtepHandshake    : 1;
283    unsigned int sendingBlock             : 1;
284   
285    uint8_t state;
286    uint8_t ut_pex_id;
287    uint16_t pexCount;
288    uint16_t minActiveRequests;
289    uint16_t maxActiveRequests;
290
291    /* how long the outMessages batch should be allowed to grow before
292     * it's flushed -- some messages (like requests >:) should be sent
293     * very quickly; others aren't as urgent. */
294    int outMessagesBatchPeriod;
295
296    tr_peer * info;
297
298    tr_handle * handle;
299    tr_torrent * torrent;
300    tr_peerIo * io;
301
302    tr_publisher_t * publisher;
303
304    struct evbuffer * outBlock;    /* buffer of the current piece message */
305    struct evbuffer * outMessages; /* buffer of all the non-piece messages */
306
307    struct request_list peerAskedFor;
308    struct request_list peerAskedForFast;
309    struct request_list clientAskedFor;
310    struct request_list clientWillAskFor;
311
312    tr_timer * rateTimer;
313    tr_timer * pulseTimer;
314    tr_timer * pexTimer;
315
316    time_t clientSentPexAt;
317    time_t clientSentAnythingAt;
318
319    /* when we started batching the outMessages */
320    time_t outMessagesBatchedAt;
321   
322    tr_bitfield * peerAllowedPieces;
323
324    struct tr_incoming incoming; 
325
326    tr_pex * pex;
327};
328
329/**
330***
331**/
332
333static void
334myDebug( const char * file, int line,
335         const struct tr_peermsgs * msgs,
336         const char * fmt, ... )
337{
338    FILE * fp = tr_getLog( );
339    if( fp )
340    {
341        va_list args;
342        char timestr[64];
343        struct evbuffer * buf = evbuffer_new( );
344        char * myfile = tr_strdup( file );
345
346        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
347                             tr_getLogTimeStr( timestr, sizeof(timestr) ),
348                             msgs->torrent->info.name,
349                             tr_peerIoGetAddrStr( msgs->io ),
350                             msgs->info->client );
351        va_start( args, fmt );
352        evbuffer_add_vprintf( buf, fmt, args );
353        va_end( args );
354        evbuffer_add_printf( buf, " (%s:%d)\n", basename(myfile), line );
355        fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
356
357        tr_free( myfile );
358        evbuffer_free( buf );
359    }
360}
361
362#define dbgmsg(msgs, fmt...) myDebug(__FILE__, __LINE__, msgs, ##fmt )
363
364/**
365***
366**/
367
368static void
369pokeBatchPeriod( tr_peermsgs * msgs, int interval )
370{
371    if( msgs->outMessagesBatchPeriod > interval )
372    {
373        msgs->outMessagesBatchPeriod = interval;
374        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
375    }
376}
377
378static void
379protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
380{
381    tr_peerIo * io = msgs->io;
382    struct evbuffer * out = msgs->outMessages;
383
384    dbgmsg( msgs, "requesting %u:%u->%u", req->index, req->offset, req->length );
385    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
386    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
387    tr_peerIoWriteUint32( io, out, req->index );
388    tr_peerIoWriteUint32( io, out, req->offset );
389    tr_peerIoWriteUint32( io, out, req->length );
390    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
391}
392
393static void
394protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
395{
396    tr_peerIo * io = msgs->io;
397    struct evbuffer * out = msgs->outMessages;
398
399    dbgmsg( msgs, "cancelling %u:%u->%u", req->index, req->offset, req->length );
400    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
401    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
402    tr_peerIoWriteUint32( io, out, req->index );
403    tr_peerIoWriteUint32( io, out, req->offset );
404    tr_peerIoWriteUint32( io, out, req->length );
405    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
406}
407
408static void
409protocolSendHave( tr_peermsgs * msgs, uint32_t index )
410{
411    tr_peerIo * io = msgs->io;
412    struct evbuffer * out = msgs->outMessages;
413
414    dbgmsg( msgs, "sending Have %u", index );
415    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
416    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
417    tr_peerIoWriteUint32( io, out, index );
418    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
419}
420
421static void
422protocolSendChoke( tr_peermsgs * msgs, int choke )
423{
424    tr_peerIo * io = msgs->io;
425    struct evbuffer * out = msgs->outMessages;
426
427    dbgmsg( msgs, "sending %s", (choke ? "Choke" : "Unchoke") );
428    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) );
429    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
430    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
431}
432
433/**
434***  EVENTS
435**/
436
437static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0 };
438
439static void
440publish( tr_peermsgs * msgs, tr_peer_event * e )
441{
442    tr_publisherPublish( msgs->publisher, msgs->info, e );
443}
444
445static void
446fireError( tr_peermsgs * msgs, tr_errno err )
447{
448    tr_peer_event e = blankEvent;
449    e.eventType = TR_PEER_ERROR;
450    e.err = err;
451    publish( msgs, &e );
452}
453
454static void
455fireNeedReq( tr_peermsgs * msgs )
456{
457    tr_peer_event e = blankEvent;
458    e.eventType = TR_PEER_NEED_REQ;
459    publish( msgs, &e );
460}
461
462static void
463firePeerProgress( tr_peermsgs * msgs )
464{
465    tr_peer_event e = blankEvent;
466    e.eventType = TR_PEER_PEER_PROGRESS;
467    e.progress = msgs->info->progress;
468    publish( msgs, &e );
469}
470
471static void
472fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
473{
474    tr_peer_event e = blankEvent;
475    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
476    e.pieceIndex = req->index;
477    e.offset = req->offset;
478    e.length = req->length;
479    publish( msgs, &e );
480}
481
482static void
483fireClientGotData( tr_peermsgs * msgs, uint32_t length )
484{
485    tr_peer_event e = blankEvent;
486    e.length = length;
487    e.eventType = TR_PEER_CLIENT_GOT_DATA;
488    publish( msgs, &e );
489}
490
491static void
492firePeerGotData( tr_peermsgs * msgs, uint32_t length )
493{
494    tr_peer_event e = blankEvent;
495    e.length = length;
496    e.eventType = TR_PEER_PEER_GOT_DATA;
497    publish( msgs, &e );
498}
499
500static void
501fireCancelledReq( tr_peermsgs * msgs, const tr_piece_index_t pieceIndex )
502{
503    tr_peer_event e = blankEvent;
504    e.eventType = TR_PEER_CANCEL;
505    e.pieceIndex = pieceIndex;
506    publish( msgs, &e );
507}
508
509/**
510***  INTEREST
511**/
512
513static int
514isPieceInteresting( const tr_peermsgs   * peer,
515                    int                   piece )
516{
517    const tr_torrent * torrent = peer->torrent;
518
519    return ( ( !torrent->info.pieces[piece].dnd )               /* we want it */
520          && ( !tr_cpPieceIsComplete( torrent->completion, piece ) ) /* !have */
521          && ( tr_bitfieldHas( peer->info->have, piece ) ) );  /* peer has it */
522}
523
524/* "interested" means we'll ask for piece data if they unchoke us */
525static int
526isPeerInteresting( const tr_peermsgs * msgs )
527{
528    tr_piece_index_t i;
529    const tr_torrent * torrent;
530    const tr_bitfield * bitfield;
531    const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
532
533    if( clientIsSeed )
534        return FALSE;
535
536    torrent = msgs->torrent;
537    bitfield = tr_cpPieceBitfield( torrent->completion );
538
539    if( !msgs->info->have )
540        return TRUE;
541
542    assert( bitfield->byteCount == msgs->info->have->byteCount );
543
544    for( i=0; i<torrent->info.pieceCount; ++i )
545        if( isPieceInteresting( msgs, i ) )
546            return TRUE;
547
548    return FALSE;
549}
550
551static void
552sendInterest( tr_peermsgs * msgs, int weAreInterested )
553{
554    assert( msgs );
555    assert( weAreInterested==0 || weAreInterested==1 );
556
557    msgs->info->clientIsInterested = weAreInterested;
558    dbgmsg( msgs, "Sending %s",
559            weAreInterested ? "Interested" : "Not Interested");
560
561    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
562    tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages,
563                   weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
564    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
565}
566
567static void
568updateInterest( tr_peermsgs * msgs )
569{
570    const int i = isPeerInteresting( msgs );
571    if( i != msgs->info->clientIsInterested )
572        sendInterest( msgs, i );
573    if( i )
574        fireNeedReq( msgs );
575}
576
577static void
578cancelAllRequestsToClientExceptFast( tr_peermsgs * msgs )
579{
580    reqListClear( &msgs->peerAskedFor );
581}
582
583void
584tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
585{
586    const time_t fibrillationTime = time(NULL) - MIN_CHOKE_PERIOD_SEC;
587
588    assert( msgs );
589    assert( msgs->info );
590    assert( choke==0 || choke==1 );
591
592    if( msgs->info->chokeChangedAt > fibrillationTime )
593    {
594        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
595    }
596    else if( msgs->info->peerIsChoked != choke )
597    {
598        msgs->info->peerIsChoked = choke;
599        if( choke )
600            cancelAllRequestsToClientExceptFast( msgs );
601        protocolSendChoke( msgs, choke );
602        msgs->info->chokeChangedAt = time( NULL );
603    }
604}
605
606/**
607***
608**/
609
610void
611tr_peerMsgsHave( tr_peermsgs * msgs,
612                 uint32_t      index )
613{
614    protocolSendHave( msgs, index );
615
616    /* since we have more pieces now, we might not be interested in this peer */
617    updateInterest( msgs );
618}
619#if 0
620static void
621sendFastSuggest( tr_peermsgs * msgs,
622                 uint32_t      pieceIndex )
623{
624    assert( msgs );
625   
626    if( tr_peerIoSupportsFEXT( msgs->io ) )
627    {
628        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
629        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_SUGGEST );
630        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
631    }
632}
633static void
634sendFastHave( tr_peermsgs * msgs, int all )
635{
636    assert( msgs );
637   
638    if( tr_peerIoSupportsFEXT( msgs->io ) )
639    {
640        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
641        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, ( all ? BT_HAVE_ALL
642                                                                : BT_HAVE_NONE ) );
643        updateInterest( msgs );
644    }
645}
646#endif
647
648static void
649sendFastReject( tr_peermsgs * msgs,
650                uint32_t      pieceIndex,
651                uint32_t      offset,
652                uint32_t      length )
653{
654    assert( msgs );
655
656    if( tr_peerIoSupportsFEXT( msgs->io ) )
657    {
658        const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
659        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
660        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_REJECT );
661        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
662        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
663        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
664        pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
665    }
666}
667
668static tr_bitfield*
669getPeerAllowedPieces( tr_peermsgs * msgs )
670{
671    if( !msgs->peerAllowedPieces && tr_peerIoSupportsFEXT( msgs->io ) )
672    {
673        msgs->peerAllowedPieces = tr_peerMgrGenerateAllowedSet(
674            MAX_FAST_ALLOWED_COUNT,
675            msgs->torrent->info.pieceCount,
676            msgs->torrent->info.hash,
677            tr_peerIoGetAddress( msgs->io, NULL ) );
678    }
679
680    return msgs->peerAllowedPieces;
681}
682
683static void
684sendFastAllowed( tr_peermsgs * msgs,
685                 uint32_t      pieceIndex)
686{
687    assert( msgs );
688   
689    if( tr_peerIoSupportsFEXT( msgs->io ) )
690    {
691        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
692        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_ALLOWED_FAST );
693        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
694        pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
695    }
696}
697
698static void
699sendFastAllowedSet( tr_peermsgs * msgs )
700{
701    tr_piece_index_t i = 0;
702
703    while (i <= msgs->torrent->info.pieceCount )
704    {
705        if ( tr_bitfieldHas( getPeerAllowedPieces( msgs ), i) )
706            sendFastAllowed( msgs, i );
707        i++;
708    }
709}
710
711static void
712maybeSendFastAllowedSet( tr_peermsgs * msgs )
713{
714    if( tr_bitfieldCountTrueBits( msgs->info->have ) <= MAX_FAST_ALLOWED_THRESHOLD )
715        sendFastAllowedSet( msgs );
716}
717
718
719/**
720***
721**/
722
723static int
724reqIsValid( const tr_peermsgs   * peer,
725            uint32_t              index,
726            uint32_t              offset,
727            uint32_t              length )
728{
729    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
730}
731
732static int
733requestIsValid( const tr_peermsgs * peer, const struct peer_request * req )
734{
735    return reqIsValid( peer, req->index, req->offset, req->length );
736}
737
738static void
739tr_peerMsgsCancel( tr_peermsgs * msgs,
740                   uint32_t      pieceIndex )
741{
742    uint16_t i;
743    struct request_list tmp = REQUEST_LIST_INIT;
744    struct request_list * src;
745
746    src = &msgs->clientWillAskFor;
747    for( i=0; i<src->count; ++i )
748        if( src->requests[i].index != pieceIndex )
749            reqListAppend( &tmp, src->requests + i );
750
751    /* swap */
752    reqListClear( &msgs->clientWillAskFor );
753    msgs->clientWillAskFor = tmp;
754    tmp = REQUEST_LIST_INIT;
755
756    src = &msgs->clientAskedFor;
757    for( i=0; i<src->count; ++i )
758        if( src->requests[i].index == pieceIndex )
759            protocolSendCancel( msgs, src->requests + i );
760        else
761            reqListAppend( &tmp, src->requests + i );
762
763    /* swap */
764    reqListClear( &msgs->clientAskedFor );
765    msgs->clientAskedFor = tmp;
766    tmp = REQUEST_LIST_INIT;
767
768    fireCancelledReq( msgs, pieceIndex );
769}
770
771static void
772expireOldRequests( tr_peermsgs * msgs )
773{
774    int i;
775    time_t oldestAllowed;
776    struct request_list tmp = REQUEST_LIST_INIT;
777
778    /* cancel requests that have been queued for too long */
779    oldestAllowed = time( NULL ) - QUEUED_REQUEST_TTL_SECS;
780    reqListCopy( &tmp, &msgs->clientWillAskFor );
781    for( i=0; i<tmp.count; ++i ) {
782        const struct peer_request * req = &tmp.requests[i];
783        if( req->time_requested < oldestAllowed )
784            tr_peerMsgsCancel( msgs, req->index );
785    }
786    reqListClear( &tmp );
787
788    /* cancel requests that were sent too long ago */
789    oldestAllowed = time( NULL ) - SENT_REQUEST_TTL_SECS;
790    reqListCopy( &tmp, &msgs->clientAskedFor );
791    for( i=0; i<tmp.count; ++i ) {
792        const struct peer_request * req = &tmp.requests[i];
793        if( req->time_requested < oldestAllowed )
794            tr_peerMsgsCancel( msgs, req->index );
795    }
796    reqListClear( &tmp );
797}
798
799static void
800pumpRequestQueue( tr_peermsgs * msgs )
801{
802    const int max = msgs->maxActiveRequests;
803    const int min = msgs->minActiveRequests;
804    const time_t now = time( NULL );
805    int sent = 0;
806    int count = msgs->clientAskedFor.count;
807    struct peer_request req;
808
809    if( count > min )
810        return;
811    if( msgs->info->clientIsChoked )
812        return;
813
814    while( ( count < max ) && !reqListPop( &msgs->clientWillAskFor, &req ) )
815    {
816        assert( requestIsValid( msgs, &req ) );
817        assert( tr_bitfieldHas( msgs->info->have, req.index ) );
818
819        protocolSendRequest( msgs, &req );
820        req.time_requested = now;
821        reqListAppend( &msgs->clientAskedFor, &req );
822
823        ++count;
824        ++sent;
825    }
826
827    if( sent )
828        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
829                sent,
830                msgs->clientAskedFor.count,
831                msgs->clientWillAskFor.count );
832
833    if( count < max )
834        fireNeedReq( msgs );
835}
836
837static int
838peerPulse( void * vmsgs );
839
840static int
841requestQueueIsFull( const tr_peermsgs * msgs )
842{
843    const int req_max = msgs->maxActiveRequests;
844    return msgs->clientWillAskFor.count >= req_max;
845}
846
847int
848tr_peerMsgsAddRequest( tr_peermsgs       * msgs,
849                       tr_piece_index_t    piece )
850{
851    struct peer_request req;
852
853    assert( msgs );
854    assert( msgs->torrent );
855    assert( piece < msgs->torrent->info.pieceCount );
856
857    /**
858    ***  Reasons to decline the request
859    **/
860
861    /* don't send requests to choked clients */
862    if( msgs->info->clientIsChoked ) {
863        dbgmsg( msgs, "declining request because they're choking us" );
864        return TR_ADDREQ_CLIENT_CHOKED;
865    }
866
867    /* peer doesn't have this piece */
868    if( !tr_bitfieldHas( msgs->info->have, piece ) )
869        return TR_ADDREQ_MISSING;
870
871    /* peer's queue is full */
872    if( requestQueueIsFull( msgs ) ) {
873        dbgmsg( msgs, "declining request because we're full" );
874        return TR_ADDREQ_FULL;
875    }
876
877    /* have we already asked for this piece? */
878    if( reqListHasPiece( &msgs->clientAskedFor, piece ) || 
879        reqListHasPiece( &msgs->clientWillAskFor, piece ) ) {
880        dbgmsg( msgs, "declining because it's a duplicate" );
881        return TR_ADDREQ_DUPLICATE;
882    }
883
884    /**
885    ***  Accept this request
886    **/
887
888    dbgmsg( msgs, "added req for piece %lu", (unsigned long)piece );
889    req.time_requested = time( NULL );
890    reqListAppendPiece( msgs->torrent, &msgs->clientWillAskFor, piece );
891    return TR_ADDREQ_OK;
892}
893
894static void
895cancelAllRequestsToPeer( tr_peermsgs * msgs )
896{
897    int i;
898    struct request_list a = msgs->clientWillAskFor;
899    struct request_list b = msgs->clientAskedFor;
900
901    msgs->clientAskedFor = REQUEST_LIST_INIT;
902    msgs->clientWillAskFor = REQUEST_LIST_INIT;
903
904    for( i=0; i<a.count; ++i )
905        fireCancelledReq( msgs, a.requests[i].index );
906
907    for( i=0; i<b.count; ++i ) {
908        fireCancelledReq( msgs, b.requests[i].index );
909        protocolSendCancel( msgs, &b.requests[i] );
910    }
911
912    reqListClear( &a );
913    reqListClear( &b );
914}
915
916/**
917***
918**/
919
920static void
921sendLtepHandshake( tr_peermsgs * msgs )
922{
923    tr_benc val, *m;
924    char * buf;
925    int len;
926    int pex;
927    struct evbuffer * outbuf;
928
929    if( msgs->clientSentLtepHandshake )
930        return;
931
932    outbuf = evbuffer_new( );
933    dbgmsg( msgs, "sending an ltep handshake" );
934    msgs->clientSentLtepHandshake = 1;
935
936    /* decide if we want to advertise pex support */
937    if( !tr_torrentAllowsPex( msgs->torrent ) )
938        pex = 0;
939    else if( msgs->peerSentLtepHandshake )
940        pex = msgs->peerSupportsPex ? 1 : 0;
941    else
942        pex = 1;
943
944    tr_bencInitDict( &val, 4 );
945    tr_bencDictAddInt( &val, "e", msgs->handle->encryptionMode != TR_CLEAR_PREFERRED );
946    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( msgs->handle ) );
947    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
948    m  = tr_bencDictAddDict( &val, "m", 1 );
949    if( pex )
950        tr_bencDictAddInt( m, "ut_pex", TR_LTEP_PEX );
951    buf = tr_bencSave( &val, &len );
952
953    tr_peerIoWriteUint32( msgs->io, outbuf, 2*sizeof(uint8_t) + len );
954    tr_peerIoWriteUint8 ( msgs->io, outbuf, BT_LTEP );
955    tr_peerIoWriteUint8 ( msgs->io, outbuf, LTEP_HANDSHAKE );
956    tr_peerIoWriteBytes ( msgs->io, outbuf, buf, len );
957
958    tr_peerIoWriteBuf( msgs->io, outbuf );
959
960    dbgmsg( msgs, "here is the ltep handshake we sent [%*.*s]", len, len, buf );
961
962    /* cleanup */
963    tr_bencFree( &val );
964    tr_free( buf );
965    evbuffer_free( outbuf );
966}
967
968static void
969parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
970{
971    tr_benc val, * sub;
972    uint8_t * tmp = tr_new( uint8_t, len );
973
974    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
975    msgs->peerSentLtepHandshake = 1;
976
977    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type!=TYPE_DICT ) {
978        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
979        tr_free( tmp );
980        return;
981    }
982
983    dbgmsg( msgs, "here is the ltep handshake we got [%*.*s]", len, len, tmp );
984
985    /* does the peer prefer encrypted connections? */
986    if(( sub = tr_bencDictFindType( &val, "e", TYPE_INT )))
987        msgs->info->encryption_preference = sub->val.i
988                                      ? ENCRYPTION_PREFERENCE_YES
989                                      : ENCRYPTION_PREFERENCE_NO;
990
991    /* check supported messages for utorrent pex */
992    msgs->peerSupportsPex = 0;
993    if(( sub = tr_bencDictFindType( &val, "m", TYPE_DICT ))) {
994        if(( sub = tr_bencDictFindType( sub, "ut_pex", TYPE_INT ))) {
995            msgs->ut_pex_id = (uint8_t) sub->val.i;
996            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
997            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
998        }
999    }
1000
1001    /* get peer's listening port */
1002    if(( sub = tr_bencDictFindType( &val, "p", TYPE_INT ))) {
1003        msgs->info->port = htons( (uint16_t)sub->val.i );
1004        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
1005    }
1006
1007    tr_bencFree( &val );
1008    tr_free( tmp );
1009}
1010
1011static void
1012parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1013{
1014    int loaded = 0;
1015    uint8_t * tmp = tr_new( uint8_t, msglen );
1016    tr_benc val, *added;
1017    const tr_torrent * tor = msgs->torrent;
1018    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
1019
1020    if( tr_torrentAllowsPex( tor )
1021        && (( loaded = !tr_bencLoad( tmp, msglen, &val, NULL )))
1022        && (( added = tr_bencDictFindType( &val, "added", TYPE_STR ))))
1023    {
1024        const uint8_t * added_f = NULL;
1025        tr_pex * pex;
1026        size_t i, n;
1027        size_t added_f_len = 0;
1028        tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1029        pex = tr_peerMgrCompactToPex( added->val.s.s, added->val.s.i, added_f, added_f_len, &n );
1030        for( i=0; i<n; ++i )
1031            tr_peerMgrAddPex( msgs->handle->peerMgr, tor->info.hash,
1032                              TR_PEER_FROM_PEX, pex+i );
1033        tr_free( pex );
1034    }
1035
1036    if( loaded )
1037        tr_bencFree( &val );
1038    tr_free( tmp );
1039}
1040
1041static void
1042sendPex( tr_peermsgs * msgs );
1043
1044static void
1045parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1046{
1047    uint8_t ltep_msgid;
1048
1049    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
1050    msglen--;
1051
1052    if( ltep_msgid == LTEP_HANDSHAKE )
1053    {
1054        dbgmsg( msgs, "got ltep handshake" );
1055        parseLtepHandshake( msgs, msglen, inbuf );
1056        if( tr_peerIoSupportsLTEP( msgs->io ) )
1057        {
1058            sendLtepHandshake( msgs );
1059            sendPex( msgs );
1060        }
1061    }
1062    else if( ltep_msgid == TR_LTEP_PEX )
1063    {
1064        dbgmsg( msgs, "got ut pex" );
1065        msgs->peerSupportsPex = 1;
1066        parseUtPex( msgs, msglen, inbuf );
1067    }
1068    else
1069    {
1070        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1071        evbuffer_drain( inbuf, msglen );
1072    }
1073}
1074
1075static int
1076readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1077{
1078    uint32_t len;
1079
1080    if( inlen < sizeof(len) )
1081        return READ_MORE;
1082
1083    tr_peerIoReadUint32( msgs->io, inbuf, &len );
1084
1085    if( len == 0 ) /* peer sent us a keepalive message */
1086        dbgmsg( msgs, "got KeepAlive" );
1087    else {
1088        msgs->incoming.length = len;
1089        msgs->state = AWAITING_BT_ID;
1090    }
1091
1092    return READ_AGAIN;
1093}
1094
1095static int
1096readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen );
1097
1098static int
1099readBtId( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1100{
1101    uint8_t id;
1102
1103    if( inlen < sizeof(uint8_t) )
1104        return READ_MORE;
1105
1106    tr_peerIoReadUint8( msgs->io, inbuf, &id );
1107    msgs->incoming.id = id;
1108
1109    if( id==BT_PIECE )
1110    {
1111        msgs->state = AWAITING_BT_PIECE;
1112        return READ_AGAIN;
1113    }
1114    else if( msgs->incoming.length != 1 )
1115    {
1116        msgs->state = AWAITING_BT_MESSAGE;
1117        return READ_AGAIN;
1118    }
1119    else return readBtMessage( msgs, inbuf, inlen-1 );
1120}
1121
1122static void
1123updatePeerProgress( tr_peermsgs * msgs )
1124{
1125    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have )
1126                         / (float)msgs->torrent->info.pieceCount;
1127    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
1128    updateInterest( msgs );
1129    firePeerProgress( msgs );
1130}
1131
1132static int
1133clientCanSendFastBlock( const tr_peermsgs * msgs UNUSED )
1134{
1135    /* don't send a fast piece if peer has MAX_FAST_ALLOWED_THRESHOLD pieces */
1136    if( tr_bitfieldCountTrueBits( msgs->info->have ) > MAX_FAST_ALLOWED_THRESHOLD )
1137        return FALSE;
1138   
1139    /* ...or if we don't have ourself enough pieces */
1140    if( tr_bitfieldCountTrueBits( tr_cpPieceBitfield( msgs->torrent->completion ) ) < MAX_FAST_ALLOWED_THRESHOLD )
1141        return FALSE;
1142
1143    /* Maybe a bandwidth limit ? */
1144    return TRUE;
1145}
1146
1147static void
1148peerMadeRequest( tr_peermsgs * msgs, const struct peer_request * req )
1149{
1150    const int reqIsValid = requestIsValid( msgs, req );
1151    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( msgs->torrent->completion, req->index );
1152    const int peerIsChoked = msgs->info->peerIsChoked;
1153    const int peerIsFast = tr_peerIoSupportsFEXT( msgs->io );
1154    const int pieceIsFast = reqIsValid && tr_bitfieldHas( getPeerAllowedPieces( msgs ), req->index );
1155    const int canSendFast = clientCanSendFastBlock( msgs );
1156
1157    if( !reqIsValid ) /* bad request */
1158    {
1159        dbgmsg( msgs, "rejecting an invalid request." );
1160        sendFastReject( msgs, req->index, req->offset, req->length );
1161    }
1162    else if( !clientHasPiece ) /* we don't have it */
1163    {
1164        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1165        sendFastReject( msgs, req->index, req->offset, req->length );
1166    }
1167    else if( peerIsChoked && !peerIsFast ) /* doesn't he know he's choked? */
1168    {
1169        tr_peerMsgsSetChoke( msgs, 1 );
1170        sendFastReject( msgs, req->index, req->offset, req->length );
1171    }
1172    else if( peerIsChoked && peerIsFast && ( !pieceIsFast || !canSendFast ) )
1173    {
1174        sendFastReject( msgs, req->index, req->offset, req->length );
1175    }
1176    else /* YAY */
1177    {
1178        if( peerIsFast && pieceIsFast )
1179            reqListAppend( &msgs->peerAskedForFast, req );
1180        else
1181            reqListAppend( &msgs->peerAskedFor, req );
1182    }
1183}
1184
1185static int
1186messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1187{
1188    switch( id )
1189    {
1190        case BT_CHOKE:
1191        case BT_UNCHOKE:
1192        case BT_INTERESTED:
1193        case BT_NOT_INTERESTED:
1194        case BT_HAVE_ALL:
1195        case BT_HAVE_NONE:
1196            return len==1;
1197
1198        case BT_HAVE:
1199        case BT_SUGGEST:
1200        case BT_ALLOWED_FAST:
1201            return len==5;
1202
1203        case BT_BITFIELD:
1204            return len == (msg->torrent->info.pieceCount+7u)/8u + 1u;
1205       
1206        case BT_REQUEST:
1207        case BT_CANCEL:
1208        case BT_REJECT:
1209            return len==13;
1210
1211        case BT_PIECE:
1212            return len>9 && len<=16393;
1213
1214        case BT_PORT:
1215            return len==3;
1216
1217        case BT_LTEP:
1218            return len >= 2;
1219
1220        default:
1221            return FALSE;
1222    }
1223}
1224
1225static int
1226clientGotBlock( tr_peermsgs * msgs, const uint8_t * block, const struct peer_request * req );
1227
1228static void
1229clientGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1230{
1231    msgs->info->pieceDataActivityDate = time( NULL );
1232    tr_rcTransferred( msgs->info->rcToClient, byteCount );
1233    fireClientGotData( msgs, byteCount );
1234}
1235
1236static int
1237readBtPiece( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1238{
1239    struct peer_request * req = &msgs->incoming.blockReq;
1240    assert( EVBUFFER_LENGTH(inbuf) >= inlen );
1241    dbgmsg( msgs, "In readBtPiece" );
1242
1243    if( !req->length )
1244    {
1245        if( inlen < 8 )
1246            return READ_MORE;
1247
1248        tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
1249        tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
1250        req->length = msgs->incoming.length - 9;
1251        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1252        return READ_AGAIN;
1253    }
1254    else
1255    {
1256        int err;
1257
1258        /* read in another chunk of data */
1259        const size_t nLeft = req->length - EVBUFFER_LENGTH(msgs->incoming.block);
1260        size_t n = MIN( nLeft, inlen );
1261        uint8_t * buf = tr_new( uint8_t, n );
1262        assert( EVBUFFER_LENGTH(inbuf) >= n );
1263        tr_peerIoReadBytes( msgs->io, inbuf, buf, n );
1264        evbuffer_add( msgs->incoming.block, buf, n );
1265        clientGotBytes( msgs, n );
1266        tr_free( buf );
1267        dbgmsg( msgs, "got %d bytes for block %u:%u->%u ... %d remain",
1268               (int)n, req->index, req->offset, req->length,
1269               (int)( req->length - EVBUFFER_LENGTH(msgs->incoming.block) ) );
1270        if( EVBUFFER_LENGTH(msgs->incoming.block) < req->length )
1271            return READ_MORE;
1272
1273        /* we've got the whole block ... process it */
1274        err = clientGotBlock( msgs, EVBUFFER_DATA(msgs->incoming.block), req );
1275
1276        /* cleanup */
1277        evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH(msgs->incoming.block) );
1278        req->length = 0;
1279        msgs->state = AWAITING_BT_LENGTH;
1280        if( !err )
1281            return READ_AGAIN;
1282        else {
1283            fireError( msgs, err );
1284            return READ_DONE;
1285        }
1286    }
1287}
1288
1289static int
1290readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1291{
1292    uint32_t ui32;
1293    uint32_t msglen = msgs->incoming.length;
1294    const uint8_t id = msgs->incoming.id;
1295    const size_t startBufLen = EVBUFFER_LENGTH( inbuf );
1296
1297    --msglen; /* id length */
1298
1299    if( inlen < msglen )
1300        return READ_MORE;
1301
1302    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %d", (int)id, (int)msglen, (int)inlen );
1303
1304    if( !messageLengthIsCorrect( msgs, id, msglen+1 ) )
1305    {
1306        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1307        fireError( msgs, TR_ERROR );
1308        return READ_DONE;
1309    }
1310
1311    switch( id )
1312    {
1313        case BT_CHOKE:
1314            dbgmsg( msgs, "got Choke" );
1315            msgs->info->clientIsChoked = 1;
1316            cancelAllRequestsToPeer( msgs );
1317            cancelAllRequestsToClientExceptFast( msgs );
1318            break;
1319
1320        case BT_UNCHOKE:
1321            dbgmsg( msgs, "got Unchoke" );
1322            msgs->info->clientIsChoked = 0;
1323            fireNeedReq( msgs );
1324            break;
1325
1326        case BT_INTERESTED:
1327            dbgmsg( msgs, "got Interested" );
1328            msgs->info->peerIsInterested = 1;
1329            break;
1330
1331        case BT_NOT_INTERESTED:
1332            dbgmsg( msgs, "got Not Interested" );
1333            msgs->info->peerIsInterested = 0;
1334            break;
1335           
1336        case BT_HAVE:
1337            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1338            dbgmsg( msgs, "got Have: %u", ui32 );
1339            if( tr_bitfieldAdd( msgs->info->have, ui32 ) )
1340                fireError( msgs, TR_ERROR_PEER_MESSAGE );
1341            updatePeerProgress( msgs );
1342            tr_rcTransferred( msgs->torrent->swarmSpeed, msgs->torrent->info.pieceSize );
1343            break;
1344
1345        case BT_BITFIELD: {
1346            dbgmsg( msgs, "got a bitfield" );
1347            msgs->peerSentBitfield = 1;
1348            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
1349            updatePeerProgress( msgs );
1350            maybeSendFastAllowedSet( msgs );
1351            fireNeedReq( msgs );
1352            break;
1353        }
1354
1355        case BT_REQUEST: {
1356            struct peer_request r;
1357            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1358            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1359            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1360            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1361            peerMadeRequest( msgs, &r );
1362            break;
1363        }
1364
1365        case BT_CANCEL: {
1366            struct peer_request r;
1367            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1368            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1369            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1370            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1371            reqListRemove( &msgs->peerAskedForFast, &r );
1372            reqListRemove( &msgs->peerAskedFor, &r );
1373            break;
1374        }
1375
1376        case BT_PIECE:
1377            assert( 0 ); /* handled elsewhere! */
1378            break;
1379       
1380        case BT_PORT:
1381            dbgmsg( msgs, "Got a BT_PORT" );
1382            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1383            break;
1384           
1385        case BT_SUGGEST: {
1386            dbgmsg( msgs, "Got a BT_SUGGEST" );
1387            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1388            /* we don't do anything with this yet */
1389            break;
1390        }
1391           
1392        case BT_HAVE_ALL:
1393            dbgmsg( msgs, "Got a BT_HAVE_ALL" );
1394            tr_bitfieldAddRange( msgs->info->have, 0, msgs->torrent->info.pieceCount );
1395            updatePeerProgress( msgs );
1396            maybeSendFastAllowedSet( msgs );
1397            break;
1398       
1399       
1400        case BT_HAVE_NONE:
1401            dbgmsg( msgs, "Got a BT_HAVE_NONE" );
1402            tr_bitfieldClear( msgs->info->have );
1403            updatePeerProgress( msgs );
1404            maybeSendFastAllowedSet( msgs );
1405            break;
1406       
1407        case BT_REJECT: {
1408            struct peer_request r;
1409            dbgmsg( msgs, "Got a BT_REJECT" );
1410            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1411            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1412            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1413            reqListRemove( &msgs->clientAskedFor, &r );
1414            break;
1415        }
1416
1417        case BT_ALLOWED_FAST: {
1418            dbgmsg( msgs, "Got a BT_ALLOWED_FAST" );
1419            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1420            /* we don't do anything with this yet */
1421            break;
1422        }
1423
1424        case BT_LTEP:
1425            dbgmsg( msgs, "Got a BT_LTEP" );
1426            parseLtep( msgs, msglen, inbuf );
1427            break;
1428
1429        default:
1430            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1431            tr_peerIoDrain( msgs->io, inbuf, msglen );
1432            break;
1433    }
1434
1435    assert( msglen + 1 == msgs->incoming.length );
1436    assert( EVBUFFER_LENGTH(inbuf) == startBufLen - msglen );
1437
1438    msgs->state = AWAITING_BT_LENGTH;
1439    return READ_AGAIN;
1440}
1441
1442static void
1443peerGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1444{
1445    msgs->info->pieceDataActivityDate = time( NULL );
1446    tr_rcTransferred( msgs->info->rcToPeer, byteCount );
1447    firePeerGotData( msgs, byteCount );
1448}
1449
1450static size_t
1451getDownloadMax( const tr_peermsgs * msgs )
1452{
1453    static const size_t maxval = ~0;
1454    const tr_torrent * tor = msgs->torrent;
1455
1456    if( tor->downloadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1457        return tor->handle->useDownloadLimit
1458            ? tr_rcBytesLeft( tor->handle->download ) : maxval;
1459
1460    if( tor->downloadLimitMode == TR_SPEEDLIMIT_SINGLE )
1461        return tr_rcBytesLeft( tor->download );
1462
1463    return maxval;
1464}
1465
1466static void
1467decrementDownloadedCount( tr_peermsgs * msgs, uint32_t byteCount )
1468{
1469    tr_torrent * tor = msgs->torrent;
1470    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1471}
1472
1473static void
1474clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1475{
1476    decrementDownloadedCount( msgs, req->length );
1477}
1478
1479static void
1480addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1481{
1482    if( !msgs->info->blame )
1483         msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1484    tr_bitfieldAdd( msgs->info->blame, index );
1485}
1486
1487static tr_errno
1488clientGotBlock( tr_peermsgs                * msgs,
1489                const uint8_t              * data,
1490                const struct peer_request  * req )
1491{
1492    int err;
1493    tr_torrent * tor = msgs->torrent;
1494    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1495
1496    assert( msgs );
1497    assert( req );
1498
1499    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) )
1500    {
1501        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1502                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1503        return TR_ERROR;
1504    }
1505
1506    /* save the block */
1507    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1508
1509    /**
1510    *** Remove the block from our `we asked for this' list
1511    **/
1512
1513    if( reqListRemove( &msgs->clientAskedFor, req ) )
1514    {
1515        clientGotUnwantedBlock( msgs, req );
1516        dbgmsg( msgs, "we didn't ask for this message..." );
1517        return 0;
1518    }
1519
1520    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1521                  msgs->clientAskedFor.count );
1522
1523    /**
1524    *** Error checks
1525    **/
1526
1527    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
1528        dbgmsg( msgs, "we have this block already..." );
1529        clientGotUnwantedBlock( msgs, req );
1530        return 0;
1531    }
1532
1533    /**
1534    ***  Save the block
1535    **/
1536
1537    msgs->info->peerSentPieceDataAt = time( NULL );
1538    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1539        return err;
1540
1541    addPeerToBlamefield( msgs, req->index );
1542    fireGotBlock( msgs, req );
1543    return 0;
1544}
1545
1546static void
1547didWrite( struct bufferevent * evin UNUSED, void * vmsgs )
1548{
1549    peerPulse( vmsgs );
1550}
1551
1552static ReadState
1553canRead( struct bufferevent * evin, void * vmsgs )
1554{
1555    ReadState ret;
1556    tr_peermsgs * msgs = vmsgs;
1557    struct evbuffer * in = EVBUFFER_INPUT ( evin );
1558    const size_t inlen = EVBUFFER_LENGTH( in );
1559
1560    if( !inlen )
1561    {
1562        ret = READ_DONE;
1563    }
1564    else if( msgs->state == AWAITING_BT_PIECE )
1565    {
1566        const size_t downloadMax = getDownloadMax( msgs );
1567        const size_t n = MIN( inlen, downloadMax );
1568        ret = n ? readBtPiece( msgs, in, n ) : READ_DONE;
1569    }
1570    else switch( msgs->state )
1571    {
1572        case AWAITING_BT_LENGTH:  ret = readBtLength ( msgs, in, inlen ); break;
1573        case AWAITING_BT_ID:      ret = readBtId     ( msgs, in, inlen ); break;
1574        case AWAITING_BT_MESSAGE: ret = readBtMessage( msgs, in, inlen ); break;
1575        default:                  assert( 0 );
1576    }
1577
1578    return ret;
1579}
1580
1581static void
1582sendKeepalive( tr_peermsgs * msgs )
1583{
1584    dbgmsg( msgs, "sending a keepalive message" );
1585    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1586    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1587}
1588
1589/**
1590***
1591**/
1592
1593static size_t
1594getUploadMax( const tr_peermsgs * msgs )
1595{
1596    static const size_t maxval = ~0;
1597    const tr_torrent * tor = msgs->torrent;
1598    int speedLeft;
1599    int bufLeft;
1600
1601    if( tor->uploadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1602        speedLeft = tor->handle->useUploadLimit ? tr_rcBytesLeft( tor->handle->upload ) : maxval;
1603    else if( tor->uploadLimitMode == TR_SPEEDLIMIT_SINGLE )
1604        speedLeft = tr_rcBytesLeft( tor->upload );
1605    else
1606        speedLeft = INT_MAX;
1607
1608    /* this basically says the outbuf shouldn't have more than one block
1609     * queued up in it... blocksize, +13 for the size of the BT protocol's
1610     * block message overhead */
1611    bufLeft = tor->blockSize + 13 - tr_peerIoWriteBytesWaiting( msgs->io );
1612    return MIN( speedLeft, bufLeft );
1613}
1614
1615static int
1616getMaxBlocksFromPeerSoon( const tr_peermsgs * peer )
1617{
1618    const double seconds = 30;
1619    const double bytesPerSecond = peer->info->rateToClient * 1024;
1620    const double totalBytes = bytesPerSecond * seconds;
1621    const int blockCount = totalBytes / peer->torrent->blockSize;
1622    /*fprintf( stderr, "rate %f -- blockCount %d\n", peer->info->rateToClient, blockCount );*/
1623    return blockCount;
1624}
1625
1626static int
1627ratePulse( void * vpeer )
1628{
1629    tr_peermsgs * peer = vpeer;
1630    peer->info->rateToClient = tr_rcRate( peer->info->rcToClient );
1631    peer->info->rateToPeer = tr_rcRate( peer->info->rcToPeer );
1632    peer->minActiveRequests = 4;
1633    peer->maxActiveRequests = peer->minActiveRequests + getMaxBlocksFromPeerSoon( peer );
1634    return TRUE;
1635}
1636
1637static tr_errno
1638popNextRequest( tr_peermsgs * msgs, struct peer_request * setme )
1639{
1640    if( !reqListPop( &msgs->peerAskedForFast, setme ) )
1641        return 0;
1642    if( !reqListPop( &msgs->peerAskedFor, setme ) )
1643        return 0;
1644
1645    return TR_ERROR;
1646}
1647
1648static int
1649peerPulse( void * vmsgs )
1650{
1651    const time_t now = time( NULL );
1652    tr_peermsgs * msgs = vmsgs;
1653
1654    tr_peerIoTryRead( msgs->io );
1655    pumpRequestQueue( msgs );
1656    expireOldRequests( msgs );
1657
1658    if( msgs->sendingBlock )
1659    {
1660        const size_t uploadMax = getUploadMax( msgs );
1661        size_t len = EVBUFFER_LENGTH( msgs->outBlock );
1662        const size_t outlen = MIN( len, uploadMax );
1663
1664        assert( len );
1665
1666        if( outlen )
1667        {
1668            tr_peerIoWrite( msgs->io, EVBUFFER_DATA( msgs->outBlock ), outlen );
1669            evbuffer_drain( msgs->outBlock, outlen );
1670            peerGotBytes( msgs, outlen );
1671
1672            len -= outlen;
1673            msgs->clientSentAnythingAt = now;
1674            msgs->sendingBlock = len != 0;
1675
1676            dbgmsg( msgs, "wrote %d bytes; %d left in block", (int)outlen, (int)len );
1677        }
1678        else dbgmsg( msgs, "stalled writing block... uploadMax %lu, outlen %lu", uploadMax, outlen );
1679    }
1680
1681    if( !msgs->sendingBlock )
1682    {
1683        struct peer_request req;
1684        int haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1685
1686        if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1687        {
1688            dbgmsg( msgs, "started an outMessages batch (length is %d)", (int)EVBUFFER_LENGTH( msgs->outMessages ) );
1689            msgs->outMessagesBatchedAt = now;
1690        }
1691        else if( haveMessages
1692                 && ( ( now - msgs->outMessagesBatchedAt ) > msgs->outMessagesBatchPeriod ) )
1693        {
1694            dbgmsg( msgs, "flushing outMessages... (length is %d)", (int)EVBUFFER_LENGTH( msgs->outMessages ) );
1695            tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
1696            msgs->clientSentAnythingAt = now;
1697            msgs->outMessagesBatchedAt = 0;
1698            msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1699        }
1700        else if( !EVBUFFER_LENGTH( msgs->outBlock )
1701            && !popNextRequest( msgs, &req )
1702            && requestIsValid( msgs, &req )
1703            && tr_cpPieceIsComplete( msgs->torrent->completion, req.index ) )
1704        {
1705            uint8_t * buf = tr_new( uint8_t, req.length );
1706
1707            if( !tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf ) )
1708            {
1709                tr_peerIo * io = msgs->io;
1710                struct evbuffer * out = msgs->outBlock;
1711
1712                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1713                tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 2*sizeof(uint32_t) + req.length );
1714                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1715                tr_peerIoWriteUint32( io, out, req.index );
1716                tr_peerIoWriteUint32( io, out, req.offset );
1717                tr_peerIoWriteBytes ( io, out, buf, req.length );
1718                msgs->sendingBlock = 1;
1719            }
1720
1721            tr_free( buf );
1722        }
1723        else if(    ( !haveMessages )
1724                 && ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS )
1725        {
1726            sendKeepalive( msgs );
1727        }
1728    }
1729
1730    return TRUE; /* loop forever */
1731}
1732
1733static void
1734gotError( struct bufferevent * evbuf UNUSED, short what, void * vmsgs )
1735{
1736    if( what & EVBUFFER_TIMEOUT )
1737        dbgmsg( vmsgs, "libevent got a timeout, what=%hd, secs=%d", what, evbuf->timeout_read );
1738    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1739        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1740                what, errno, tr_strerror(errno) );
1741    fireError( vmsgs, TR_ERROR );
1742}
1743
1744static void
1745sendBitfield( tr_peermsgs * msgs )
1746{
1747    const tr_bitfield * bitfield = tr_cpPieceBitfield( msgs->torrent->completion );
1748    struct evbuffer * out = msgs->outMessages;
1749
1750    dbgmsg( msgs, "sending peer a bitfield message" );
1751    tr_peerIoWriteUint32( msgs->io, out, sizeof(uint8_t) + bitfield->byteCount );
1752    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1753    tr_peerIoWriteBytes ( msgs->io, out, bitfield->bits, bitfield->byteCount );
1754    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1755}
1756
1757/**
1758***
1759**/
1760
1761/* some peers give us error messages if we send
1762   more than this many peers in a single pex message
1763   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
1764#define MAX_PEX_ADDED 50
1765#define MAX_PEX_DROPPED 50
1766
1767typedef struct
1768{
1769    tr_pex * added;
1770    tr_pex * dropped;
1771    tr_pex * elements;
1772    int addedCount;
1773    int droppedCount;
1774    int elementCount;
1775}
1776PexDiffs;
1777
1778static void
1779pexAddedCb( void * vpex, void * userData )
1780{
1781    PexDiffs * diffs = userData;
1782    tr_pex * pex = vpex;
1783    if( diffs->addedCount < MAX_PEX_ADDED )
1784    {
1785        diffs->added[diffs->addedCount++] = *pex;
1786        diffs->elements[diffs->elementCount++] = *pex;
1787    }
1788}
1789
1790static void
1791pexDroppedCb( void * vpex, void * userData )
1792{
1793    PexDiffs * diffs = userData;
1794    tr_pex * pex = vpex;
1795    if( diffs->droppedCount < MAX_PEX_DROPPED )
1796    {
1797        diffs->dropped[diffs->droppedCount++] = *pex;
1798    }
1799}
1800
1801static void
1802pexElementCb( void * vpex, void * userData )
1803{
1804    PexDiffs * diffs = userData;
1805    tr_pex * pex = vpex;
1806    diffs->elements[diffs->elementCount++] = *pex;
1807}
1808
1809static void
1810sendPex( tr_peermsgs * msgs )
1811{
1812    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
1813    {
1814        int i;
1815        tr_pex * newPex = NULL;
1816        const int newCount = tr_peerMgrGetPeers( msgs->handle->peerMgr, msgs->torrent->info.hash, &newPex );
1817        PexDiffs diffs;
1818        tr_benc val, *added, *dropped;
1819        uint8_t *tmp, *walk;
1820        char * benc;
1821        int bencLen;
1822
1823        /* build the diffs */
1824        diffs.added = tr_new( tr_pex, newCount );
1825        diffs.addedCount = 0;
1826        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1827        diffs.droppedCount = 0;
1828        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1829        diffs.elementCount = 0;
1830        tr_set_compare( msgs->pex, msgs->pexCount,
1831                        newPex, newCount,
1832                        tr_pexCompare, sizeof(tr_pex),
1833                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
1834        dbgmsg( msgs, "pex: old peer count %d, new peer count %d, added %d, removed %d", msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1835
1836        /* update peer */
1837        tr_free( msgs->pex );
1838        msgs->pex = diffs.elements;
1839        msgs->pexCount = diffs.elementCount;
1840
1841        /* build the pex payload */
1842        tr_bencInitDict( &val, 3 );
1843
1844        /* "added" */
1845        added = tr_bencDictAdd( &val, "added" );
1846        tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1847        for( i=0; i<diffs.addedCount; ++i ) {
1848            memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
1849            memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1850        }
1851        assert( ( walk - tmp ) == diffs.addedCount * 6 );
1852        tr_bencInitStr( added, tmp, walk-tmp, FALSE );
1853
1854        /* "added.f" */
1855        tmp = walk = tr_new( uint8_t, diffs.addedCount );
1856        for( i=0; i<diffs.addedCount; ++i )
1857            *walk++ = diffs.added[i].flags;
1858        assert( ( walk - tmp ) == diffs.addedCount );
1859        tr_bencDictAddRaw( &val, "added.f", tmp, walk-tmp );
1860        tr_free( tmp );
1861
1862        /* "dropped" */
1863        dropped = tr_bencDictAdd( &val, "dropped" );
1864        tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
1865        for( i=0; i<diffs.droppedCount; ++i ) {
1866            memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
1867            memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
1868        }
1869        assert( ( walk - tmp ) == diffs.droppedCount * 6 );
1870        tr_bencInitStr( dropped, tmp, walk-tmp, FALSE );
1871
1872        /* write the pex message */
1873        benc = tr_bencSave( &val, &bencLen );
1874        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 2*sizeof(uint8_t) + bencLen );
1875        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_LTEP );
1876        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, msgs->ut_pex_id );
1877        tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, benc, bencLen );
1878        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1879
1880        /* cleanup */
1881        tr_free( benc );
1882        tr_bencFree( &val );
1883        tr_free( diffs.added );
1884        tr_free( diffs.dropped );
1885        tr_free( newPex );
1886
1887        msgs->clientSentPexAt = time( NULL );
1888    }
1889}
1890
1891static int
1892pexPulse( void * vpeer )
1893{
1894    sendPex( vpeer );
1895    return TRUE;
1896}
1897
1898/**
1899***
1900**/
1901
1902tr_peermsgs*
1903tr_peerMsgsNew( struct tr_torrent * torrent,
1904                struct tr_peer    * info,
1905                tr_delivery_func    func,
1906                void              * userData,
1907                tr_publisher_tag  * setme )
1908{
1909    tr_peermsgs * m;
1910
1911    assert( info );
1912    assert( info->io );
1913
1914    m = tr_new0( tr_peermsgs, 1 );
1915    m->publisher = tr_publisherNew( );
1916    m->info = info;
1917    m->handle = torrent->handle;
1918    m->torrent = torrent;
1919    m->io = info->io;
1920    m->info->clientIsChoked = 1;
1921    m->info->peerIsChoked = 1;
1922    m->info->clientIsInterested = 0;
1923    m->info->peerIsInterested = 0;
1924    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
1925    m->state = AWAITING_BT_LENGTH;
1926    m->pulseTimer = tr_timerNew( m->handle, peerPulse, m, PEER_PULSE_INTERVAL );
1927    m->rateTimer = tr_timerNew( m->handle, ratePulse, m, RATE_PULSE_INTERVAL );
1928    m->pexTimer = tr_timerNew( m->handle, pexPulse, m, PEX_INTERVAL );
1929    m->outMessages = evbuffer_new( );
1930    m->outMessagesBatchedAt = 0;
1931    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1932    m->incoming.block = evbuffer_new( );
1933    m->outBlock = evbuffer_new( );
1934    m->peerAllowedPieces = NULL;
1935    m->peerAskedFor = REQUEST_LIST_INIT;
1936    m->peerAskedForFast = REQUEST_LIST_INIT;
1937    m->clientAskedFor = REQUEST_LIST_INIT;
1938    m->clientWillAskFor = REQUEST_LIST_INIT;
1939    *setme = tr_publisherSubscribe( m->publisher, func, userData );
1940
1941    if ( tr_peerIoSupportsLTEP( m->io ) )
1942        sendLtepHandshake( m );
1943
1944    sendBitfield( m );
1945   
1946    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of inactivity */
1947    tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
1948    ratePulse( m );
1949
1950    return m;
1951}
1952
1953void
1954tr_peerMsgsFree( tr_peermsgs* msgs )
1955{
1956    if( msgs )
1957    {
1958        tr_timerFree( &msgs->pulseTimer );
1959        tr_timerFree( &msgs->rateTimer );
1960        tr_timerFree( &msgs->pexTimer );
1961        tr_publisherFree( &msgs->publisher );
1962        reqListClear( &msgs->clientWillAskFor );
1963        reqListClear( &msgs->clientAskedFor );
1964        reqListClear( &msgs->peerAskedForFast );
1965        reqListClear( &msgs->peerAskedFor );
1966        tr_bitfieldFree( msgs->peerAllowedPieces );
1967        evbuffer_free( msgs->incoming.block );
1968        evbuffer_free( msgs->outMessages );
1969        evbuffer_free( msgs->outBlock );
1970        tr_free( msgs->pex );
1971
1972        memset( msgs, ~0, sizeof( tr_peermsgs ) );
1973        tr_free( msgs );
1974    }
1975}
1976
1977void
1978tr_peerMsgsUnsubscribe( tr_peermsgs       * peer,
1979                        tr_publisher_tag    tag )
1980{
1981    tr_publisherUnsubscribe( peer->publisher, tag );
1982}
Note: See TracBrowser for help on using the repository browser.