source: trunk/libtransmission/peer-msgs.c @ 6073

Last change on this file since 6073 was 6073, checked in by charles, 13 years ago

#800 initial support for GetRight?-style fetching of data through http and ftp servers specified in the .torrent's "url-list" tag

  • Property svn:keywords set to Date Rev Author Id
File size: 54.9 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 6073 2008-06-07 21:26:41Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19#include <libgen.h> /* basename */
20
21#include <event.h>
22
23#include "transmission.h"
24#include "bencode.h"
25#include "completion.h"
26#include "inout.h"
27#include "peer-io.h"
28#include "peer-mgr.h"
29#include "peer-mgr-private.h"
30#include "peer-msgs.h"
31#include "ratecontrol.h"
32#include "stats.h"
33#include "torrent.h"
34#include "trevent.h"
35#include "utils.h"
36
37/**
38***
39**/
40
41enum
42{
43    BT_CHOKE                = 0,
44    BT_UNCHOKE              = 1,
45    BT_INTERESTED           = 2,
46    BT_NOT_INTERESTED       = 3,
47    BT_HAVE                 = 4,
48    BT_BITFIELD             = 5,
49    BT_REQUEST              = 6,
50    BT_PIECE                = 7,
51    BT_CANCEL               = 8,
52    BT_PORT                 = 9,
53    BT_SUGGEST              = 13,
54    BT_HAVE_ALL             = 14,
55    BT_HAVE_NONE            = 15,
56    BT_REJECT               = 16,
57    BT_ALLOWED_FAST         = 17,
58    BT_LTEP                 = 20,
59
60    LTEP_HANDSHAKE          = 0,
61
62    TR_LTEP_PEX             = 1,
63
64    MIN_CHOKE_PERIOD_SEC    = (10),
65
66    /* idle seconds before we send a keepalive */
67    KEEPALIVE_INTERVAL_SECS = 100,
68
69    PEX_INTERVAL            = (90 * 1000), /* msec between sendPex() calls */
70    PEER_PULSE_INTERVAL     = (50),        /* msec between pulse() calls */
71    RATE_PULSE_INTERVAL     = (250),       /* msec between ratePulse() calls */
72
73    MAX_QUEUE_SIZE          = (100),
74     
75    /* (fast peers) max number of pieces we fast-allow to another peer */
76    MAX_FAST_ALLOWED_COUNT   = 10,
77
78    /* (fast peers) max threshold for allowing fast-pieces requests */
79    MAX_FAST_ALLOWED_THRESHOLD = 10,
80
81    /* how long an unsent request can stay queued before it's returned
82       back to the peer-mgr's pool of requests */
83    QUEUED_REQUEST_TTL_SECS = 20,
84
85    /* how long a sent request can stay queued before it's returned
86       back to the peer-mgr's pool of requests */
87    SENT_REQUEST_TTL_SECS = 120,
88
89    /* used in lowering the outMessages queue period */
90    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
91    HIGH_PRIORITY_INTERVAL_SECS = 5,
92    LOW_PRIORITY_INTERVAL_SECS = 30
93};
94
95/**
96***  REQUEST MANAGEMENT
97**/
98
99enum
100{
101    AWAITING_BT_LENGTH,
102    AWAITING_BT_ID,
103    AWAITING_BT_MESSAGE,
104    AWAITING_BT_PIECE
105};
106
107struct peer_request
108{
109    uint32_t index;
110    uint32_t offset;
111    uint32_t length;
112    time_t time_requested;
113};
114
115static int
116compareRequest( const void * va, const void * vb )
117{
118    int i;
119    const struct peer_request * a = va;
120    const struct peer_request * b = vb;
121    if(( i = tr_compareUint32( a->index, b->index ))) return i;
122    if(( i = tr_compareUint32( a->offset, b->offset ))) return i;
123    if(( i = tr_compareUint32( a->length, b->length ))) return i;
124    return 0;
125}
126
127struct request_list
128{
129    uint16_t count;
130    uint16_t max;
131    struct peer_request * requests;
132};
133
134static const struct request_list REQUEST_LIST_INIT = { 0, 0, NULL };
135
136static void
137reqListReserve( struct request_list * list, uint16_t max )
138{
139    if( list->max < max )
140    {
141        list->max = max;
142        list->requests = tr_renew( struct peer_request,
143                                   list->requests,
144                                   list->max );
145    }
146}
147
148static void
149reqListClear( struct request_list * list )
150{
151    tr_free( list->requests );
152    *list = REQUEST_LIST_INIT;
153}
154
155static void
156reqListCopy( struct request_list * dest, const struct request_list * src )
157{
158    dest->count = dest->max = src->count;
159    dest->requests = tr_memdup( src->requests, dest->count * sizeof( struct peer_request ) );
160}
161
162static void
163reqListRemoveOne( struct request_list * list, int i )
164{
165    assert( 0<=i && i<list->count );
166
167    memmove( &list->requests[i],
168             &list->requests[i+1],
169             sizeof( struct peer_request ) * ( --list->count - i ) );
170}
171
172static void
173reqListAppend( struct request_list * list, const struct peer_request * req )
174{
175    if( ++list->count >= list->max )
176        reqListReserve( list, list->max + 8 );
177
178    list->requests[list->count-1] = *req;
179}
180
181static tr_errno
182reqListPop( struct request_list * list, struct peer_request * setme )
183{
184    tr_errno err;
185
186    if( !list->count )
187        err = TR_ERROR;
188    else {
189        *setme = list->requests[0];
190        reqListRemoveOne( list, 0 );
191        err = TR_OK;
192    }
193
194    return err;
195}
196
197static int
198reqListFind( struct request_list * list, const struct peer_request * key )
199{
200    uint16_t i;
201    for( i=0; i<list->count; ++i )
202        if( !compareRequest( key, list->requests+i ) )
203            return i;
204    return -1;
205}
206
207static tr_errno
208reqListRemove( struct request_list * list, const struct peer_request * key )
209{
210    tr_errno err;
211    const int i = reqListFind( list, key );
212
213    if( i < 0 )
214        err = TR_ERROR;
215    else {
216        err = TR_OK;
217        reqListRemoveOne( list, i );
218    }
219
220    return err;
221}
222
223/**
224***
225**/
226
227/* this is raw, unchanged data from the peer regarding
228 * the current message that it's sending us. */
229struct tr_incoming
230{
231    uint8_t id;
232    uint32_t length; /* includes the +1 for id length */
233    struct peer_request blockReq; /* metadata for incoming blocks */
234    struct evbuffer * block; /* piece data for incoming blocks */
235};
236
237struct tr_peermsgs
238{
239    unsigned int peerSentBitfield         : 1;
240    unsigned int peerSupportsPex          : 1;
241    unsigned int clientSentLtepHandshake  : 1;
242    unsigned int peerSentLtepHandshake    : 1;
243    unsigned int sendingBlock             : 1;
244   
245    uint8_t state;
246    uint8_t ut_pex_id;
247    uint16_t pexCount;
248    uint16_t maxActiveRequests;
249    uint16_t minActiveRequests;
250
251    tr_peer * info;
252
253    tr_handle * handle;
254    tr_torrent * torrent;
255    tr_peerIo * io;
256
257    tr_publisher_t * publisher;
258
259    struct evbuffer * outBlock;    /* buffer of the current piece message */
260    struct evbuffer * outMessages; /* buffer of all the non-piece messages */
261
262    struct request_list peerAskedFor;
263    struct request_list peerAskedForFast;
264    struct request_list clientAskedFor;
265    struct request_list clientWillAskFor;
266
267    tr_timer * rateTimer;
268    tr_timer * pulseTimer;
269    tr_timer * pexTimer;
270
271    time_t clientSentPexAt;
272    time_t clientSentAnythingAt;
273
274    /* when we started batching the outMessages */
275    time_t outMessagesBatchedAt;
276
277    /* how long the outMessages batch should be allowed to grow before
278     * it's flushed -- some messages (like requests >:) should be sent
279     * very quickly; others aren't as urgent. */
280    int outMessagesBatchPeriod;
281   
282    tr_bitfield * peerAllowedPieces;
283
284    struct tr_incoming incoming; 
285
286    tr_pex * pex;
287};
288
289/**
290***
291**/
292
293static void
294myDebug( const char * file, int line,
295         const struct tr_peermsgs * msgs,
296         const char * fmt, ... )
297{
298    FILE * fp = tr_getLog( );
299    if( fp != NULL )
300    {
301        va_list args;
302        char timestr[64];
303        struct evbuffer * buf = evbuffer_new( );
304        char * myfile = tr_strdup( file );
305
306        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
307                             tr_getLogTimeStr( timestr, sizeof(timestr) ),
308                             msgs->torrent->info.name,
309                             tr_peerIoGetAddrStr( msgs->io ),
310                             msgs->info->client );
311        va_start( args, fmt );
312        evbuffer_add_vprintf( buf, fmt, args );
313        va_end( args );
314        evbuffer_add_printf( buf, " (%s:%d)\n", basename(myfile), line );
315        fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
316
317        tr_free( myfile );
318        evbuffer_free( buf );
319    }
320}
321
322#define dbgmsg(msgs, fmt...) myDebug(__FILE__, __LINE__, msgs, ##fmt )
323
324/**
325***
326**/
327
328static void
329pokeBatchPeriod( tr_peermsgs * msgs, int interval )
330{
331    if( msgs->outMessagesBatchPeriod > interval )
332    {
333        msgs->outMessagesBatchPeriod = interval;
334        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
335    }
336}
337
338static void
339protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
340{
341    tr_peerIo * io = msgs->io;
342    struct evbuffer * out = msgs->outMessages;
343
344    dbgmsg( msgs, "requesting %u:%u->%u", req->index, req->offset, req->length );
345    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
346    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
347    tr_peerIoWriteUint32( io, out, req->index );
348    tr_peerIoWriteUint32( io, out, req->offset );
349    tr_peerIoWriteUint32( io, out, req->length );
350    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
351}
352
353static void
354protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
355{
356    tr_peerIo * io = msgs->io;
357    struct evbuffer * out = msgs->outMessages;
358
359    dbgmsg( msgs, "cancelling %u:%u->%u", req->index, req->offset, req->length );
360    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
361    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
362    tr_peerIoWriteUint32( io, out, req->index );
363    tr_peerIoWriteUint32( io, out, req->offset );
364    tr_peerIoWriteUint32( io, out, req->length );
365    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
366}
367
368static void
369protocolSendHave( tr_peermsgs * msgs, uint32_t index )
370{
371    tr_peerIo * io = msgs->io;
372    struct evbuffer * out = msgs->outMessages;
373
374    dbgmsg( msgs, "sending Have %u", index );
375    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
376    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
377    tr_peerIoWriteUint32( io, out, index );
378    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
379}
380
381static void
382protocolSendChoke( tr_peermsgs * msgs, int choke )
383{
384    tr_peerIo * io = msgs->io;
385    struct evbuffer * out = msgs->outMessages;
386
387    dbgmsg( msgs, "sending %s", (choke ? "Choke" : "Unchoke") );
388    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) );
389    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
390    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
391}
392
393/**
394***  EVENTS
395**/
396
397static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0 };
398
399static void
400publish( tr_peermsgs * msgs, tr_peer_event * e )
401{
402    tr_publisherPublish( msgs->publisher, msgs->info, e );
403}
404
405static void
406fireError( tr_peermsgs * msgs, tr_errno err )
407{
408    tr_peer_event e = blankEvent;
409    e.eventType = TR_PEER_ERROR;
410    e.err = err;
411    publish( msgs, &e );
412}
413
414static void
415fireNeedReq( tr_peermsgs * msgs )
416{
417    tr_peer_event e = blankEvent;
418    e.eventType = TR_PEER_NEED_REQ;
419    publish( msgs, &e );
420}
421
422static void
423firePeerProgress( tr_peermsgs * msgs )
424{
425    tr_peer_event e = blankEvent;
426    e.eventType = TR_PEER_PEER_PROGRESS;
427    e.progress = msgs->info->progress;
428    publish( msgs, &e );
429}
430
431static void
432fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
433{
434    tr_peer_event e = blankEvent;
435    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
436    e.pieceIndex = req->index;
437    e.offset = req->offset;
438    e.length = req->length;
439    publish( msgs, &e );
440}
441
442static void
443fireClientGotData( tr_peermsgs * msgs, uint32_t length )
444{
445    tr_peer_event e = blankEvent;
446    e.length = length;
447    e.eventType = TR_PEER_CLIENT_GOT_DATA;
448    publish( msgs, &e );
449}
450
451static void
452firePeerGotData( tr_peermsgs * msgs, uint32_t length )
453{
454    tr_peer_event e = blankEvent;
455    e.length = length;
456    e.eventType = TR_PEER_PEER_GOT_DATA;
457    publish( msgs, &e );
458}
459
460static void
461fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
462{
463    tr_peer_event e = blankEvent;
464    e.eventType = TR_PEER_CANCEL;
465    e.pieceIndex = req->index;
466    e.offset = req->offset;
467    e.length = req->length;
468    publish( msgs, &e );
469}
470
471/**
472***  INTEREST
473**/
474
475static int
476isPieceInteresting( const tr_peermsgs   * peer,
477                    int                   piece )
478{
479    const tr_torrent * torrent = peer->torrent;
480
481    return ( ( !torrent->info.pieces[piece].dnd )               /* we want it */
482          && ( !tr_cpPieceIsComplete( torrent->completion, piece ) ) /* !have */
483          && ( tr_bitfieldHas( peer->info->have, piece ) ) );  /* peer has it */
484}
485
486/* "interested" means we'll ask for piece data if they unchoke us */
487static int
488isPeerInteresting( const tr_peermsgs * msgs )
489{
490    tr_piece_index_t i;
491    const tr_torrent * torrent;
492    const tr_bitfield * bitfield;
493    const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
494
495    if( clientIsSeed )
496        return FALSE;
497
498    torrent = msgs->torrent;
499    bitfield = tr_cpPieceBitfield( torrent->completion );
500
501    if( !msgs->info->have )
502        return TRUE;
503
504    assert( bitfield->byteCount == msgs->info->have->byteCount );
505
506    for( i=0; i<torrent->info.pieceCount; ++i )
507        if( isPieceInteresting( msgs, i ) )
508            return TRUE;
509
510    return FALSE;
511}
512
513static void
514sendInterest( tr_peermsgs * msgs, int weAreInterested )
515{
516    assert( msgs != NULL );
517    assert( weAreInterested==0 || weAreInterested==1 );
518
519    msgs->info->clientIsInterested = weAreInterested;
520    dbgmsg( msgs, "Sending %s",
521            weAreInterested ? "Interested" : "Not Interested");
522
523    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
524    tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages,
525                   weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
526    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
527}
528
529static void
530updateInterest( tr_peermsgs * msgs )
531{
532    const int i = isPeerInteresting( msgs );
533    if( i != msgs->info->clientIsInterested )
534        sendInterest( msgs, i );
535    if( i )
536        fireNeedReq( msgs );
537}
538
539static void
540cancelAllRequestsToClientExceptFast( tr_peermsgs * msgs )
541{
542    reqListClear( &msgs->peerAskedFor );
543}
544
545void
546tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
547{
548    const time_t fibrillationTime = time(NULL) - MIN_CHOKE_PERIOD_SEC;
549
550    assert( msgs != NULL );
551    assert( msgs->info != NULL );
552    assert( choke==0 || choke==1 );
553
554    if( msgs->info->chokeChangedAt > fibrillationTime )
555    {
556        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
557    }
558    else if( msgs->info->peerIsChoked != choke )
559    {
560        msgs->info->peerIsChoked = choke;
561        if( choke )
562            cancelAllRequestsToClientExceptFast( msgs );
563        protocolSendChoke( msgs, choke );
564        msgs->info->chokeChangedAt = time( NULL );
565    }
566}
567
568/**
569***
570**/
571
572void
573tr_peerMsgsHave( tr_peermsgs * msgs,
574                 uint32_t      index )
575{
576    protocolSendHave( msgs, index );
577
578    /* since we have more pieces now, we might not be interested in this peer */
579    updateInterest( msgs );
580}
581#if 0
582static void
583sendFastSuggest( tr_peermsgs * msgs,
584                 uint32_t      pieceIndex )
585{
586    assert( msgs != NULL );
587   
588    if( tr_peerIoSupportsFEXT( msgs->io ) )
589    {
590        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
591        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_SUGGEST );
592        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
593    }
594}
595static void
596sendFastHave( tr_peermsgs * msgs, int all )
597{
598    assert( msgs != NULL );
599   
600    if( tr_peerIoSupportsFEXT( msgs->io ) )
601    {
602        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
603        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, ( all ? BT_HAVE_ALL
604                                                                : BT_HAVE_NONE ) );
605        updateInterest( msgs );
606    }
607}
608#endif
609
610static void
611sendFastReject( tr_peermsgs * msgs,
612                uint32_t      pieceIndex,
613                uint32_t      offset,
614                uint32_t      length )
615{
616    assert( msgs != NULL );
617
618    if( tr_peerIoSupportsFEXT( msgs->io ) )
619    {
620        const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
621        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
622        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_REJECT );
623        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
624        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
625        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
626        pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
627    }
628}
629
630static tr_bitfield*
631getPeerAllowedPieces( tr_peermsgs * msgs )
632{
633    if( !msgs->peerAllowedPieces && tr_peerIoSupportsFEXT( msgs->io ) )
634    {
635        msgs->peerAllowedPieces = tr_peerMgrGenerateAllowedSet(
636            MAX_FAST_ALLOWED_COUNT,
637            msgs->torrent->info.pieceCount,
638            msgs->torrent->info.hash,
639            tr_peerIoGetAddress( msgs->io, NULL ) );
640    }
641
642    return msgs->peerAllowedPieces;
643}
644
645static void
646sendFastAllowed( tr_peermsgs * msgs,
647                 uint32_t      pieceIndex)
648{
649    assert( msgs != NULL );
650   
651    if( tr_peerIoSupportsFEXT( msgs->io ) )
652    {
653        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
654        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_ALLOWED_FAST );
655        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
656        pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
657    }
658}
659
660static void
661sendFastAllowedSet( tr_peermsgs * msgs )
662{
663    tr_piece_index_t i = 0;
664
665    while (i <= msgs->torrent->info.pieceCount )
666    {
667        if ( tr_bitfieldHas( getPeerAllowedPieces( msgs ), i) )
668            sendFastAllowed( msgs, i );
669        i++;
670    }
671}
672
673static void
674maybeSendFastAllowedSet( tr_peermsgs * msgs )
675{
676    if( tr_bitfieldCountTrueBits( msgs->info->have ) <= MAX_FAST_ALLOWED_THRESHOLD )
677        sendFastAllowedSet( msgs );
678}
679
680
681/**
682***
683**/
684
685static int
686reqIsValid( const tr_peermsgs   * msgs,
687            uint32_t              index,
688            uint32_t              offset,
689            uint32_t              length )
690{
691    return tr_torrentReqIsValid( msgs->torrent, index, offset, length );
692}
693
694static int
695requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
696{
697    return reqIsValid( msgs, req->index, req->offset, req->length );
698}
699
700static void
701expireOldRequests( tr_peermsgs * msgs )
702{
703    int i;
704    time_t oldestAllowed;
705    struct request_list tmp = REQUEST_LIST_INIT;
706
707    /* cancel requests that have been queued for too long */
708    oldestAllowed = time( NULL ) - QUEUED_REQUEST_TTL_SECS;
709    reqListCopy( &tmp, &msgs->clientWillAskFor );
710    for( i=0; i<tmp.count; ++i ) {
711        const struct peer_request * req = &tmp.requests[i];
712        if( req->time_requested < oldestAllowed )
713            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
714    }
715    reqListClear( &tmp );
716
717    /* cancel requests that were sent too long ago */
718    oldestAllowed = time( NULL ) - SENT_REQUEST_TTL_SECS;
719    reqListCopy( &tmp, &msgs->clientAskedFor );
720    for( i=0; i<tmp.count; ++i ) {
721        const struct peer_request * req = &tmp.requests[i];
722        if( req->time_requested < oldestAllowed )
723            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
724    }
725    reqListClear( &tmp );
726}
727
728static void
729pumpRequestQueue( tr_peermsgs * msgs )
730{
731    const int max = msgs->maxActiveRequests;
732    const int min = msgs->minActiveRequests;
733    const time_t now = time( NULL );
734    int sent = 0;
735    int count = msgs->clientAskedFor.count;
736    struct peer_request req;
737
738    if( count > min )
739        return;
740    if( msgs->info->clientIsChoked )
741        return;
742
743    while( ( count < max ) && !reqListPop( &msgs->clientWillAskFor, &req ) )
744    {
745        assert( requestIsValid( msgs, &req ) );
746        assert( tr_bitfieldHas( msgs->info->have, req.index ) );
747
748        protocolSendRequest( msgs, &req );
749        req.time_requested = now;
750        reqListAppend( &msgs->clientAskedFor, &req );
751
752        ++count;
753        ++sent;
754    }
755
756    if( sent )
757        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
758                sent,
759                msgs->clientAskedFor.count,
760                msgs->clientWillAskFor.count );
761
762    if( count < max )
763        fireNeedReq( msgs );
764}
765
766static int
767pulse( void * vmsgs );
768
769int
770tr_peerMsgsAddRequest( tr_peermsgs * msgs,
771                       uint32_t      index, 
772                       uint32_t      offset, 
773                       uint32_t      length )
774{
775    const int req_max = msgs->maxActiveRequests;
776    struct peer_request req;
777
778    assert( msgs != NULL );
779    assert( msgs->torrent != NULL );
780    assert( reqIsValid( msgs, index, offset, length ) );
781
782    /**
783    ***  Reasons to decline the request
784    **/
785
786    /* don't send requests to choked clients */
787    if( msgs->info->clientIsChoked ) {
788        dbgmsg( msgs, "declining request because they're choking us" );
789        return TR_ADDREQ_CLIENT_CHOKED;
790    }
791
792    /* peer doesn't have this piece */
793    if( !tr_bitfieldHas( msgs->info->have, index ) )
794        return TR_ADDREQ_MISSING;
795
796    /* peer's queue is full */
797    if( msgs->clientWillAskFor.count >= req_max ) {
798        dbgmsg( msgs, "declining request because we're full" );
799        return TR_ADDREQ_FULL;
800    }
801
802    /* have we already asked for this piece? */
803    req.index = index;
804    req.offset = offset;
805    req.length = length;
806    if( reqListFind( &msgs->clientAskedFor, &req ) != -1 ) {
807        dbgmsg( msgs, "declining because it's a duplicate" );
808        return TR_ADDREQ_DUPLICATE;
809    }
810    if( reqListFind( &msgs->clientWillAskFor, &req ) != -1 ) {
811        dbgmsg( msgs, "declining because it's a duplicate" );
812        return TR_ADDREQ_DUPLICATE;
813    }
814
815    /**
816    ***  Accept this request
817    **/
818
819    dbgmsg( msgs, "added req for piece %d, offset %d", (int)index, (int)offset );
820    req.time_requested = time( NULL );
821    reqListAppend( &msgs->clientWillAskFor, &req );
822    return TR_ADDREQ_OK;
823}
824
825static void
826cancelAllRequestsToPeer( tr_peermsgs * msgs )
827{
828    int i;
829    struct request_list a = msgs->clientWillAskFor;
830    struct request_list b = msgs->clientAskedFor;
831
832    msgs->clientAskedFor = REQUEST_LIST_INIT;
833    msgs->clientWillAskFor = REQUEST_LIST_INIT;
834
835    for( i=0; i<a.count; ++i )
836        fireCancelledReq( msgs, &a.requests[i] );
837
838    for( i=0; i<b.count; ++i ) {
839        fireCancelledReq( msgs, &b.requests[i] );
840        protocolSendCancel( msgs, &b.requests[i] );
841    }
842
843    reqListClear( &a );
844    reqListClear( &b );
845}
846
847void
848tr_peerMsgsCancel( tr_peermsgs * msgs,
849                   uint32_t      pieceIndex,
850                   uint32_t      offset,
851                   uint32_t      length )
852{
853    struct peer_request req;
854
855    assert( msgs != NULL );
856    assert( length > 0 );
857
858    /* have we asked the peer for this piece? */
859    req.index = pieceIndex;
860    req.offset = offset;
861    req.length = length;
862
863    /* if it's only in the queue and hasn't been sent yet, free it */
864    if( !reqListRemove( &msgs->clientWillAskFor, &req ) )
865        fireCancelledReq( msgs, &req );
866
867    /* if it's already been sent, send a cancel message too */
868    if( !reqListRemove( &msgs->clientAskedFor, &req ) ) {
869        protocolSendCancel( msgs, &req );
870        fireCancelledReq( msgs, &req );
871    }
872}
873
874/**
875***
876**/
877
878static void
879sendLtepHandshake( tr_peermsgs * msgs )
880{
881    tr_benc val, *m;
882    char * buf;
883    int len;
884    int pex;
885    struct evbuffer * outbuf;
886
887    if( msgs->clientSentLtepHandshake )
888        return;
889
890    outbuf = evbuffer_new( );
891    dbgmsg( msgs, "sending an ltep handshake" );
892    msgs->clientSentLtepHandshake = 1;
893
894    /* decide if we want to advertise pex support */
895    if( !tr_torrentAllowsPex( msgs->torrent ) )
896        pex = 0;
897    else if( msgs->peerSentLtepHandshake )
898        pex = msgs->peerSupportsPex ? 1 : 0;
899    else
900        pex = 1;
901
902    tr_bencInitDict( &val, 4 );
903    tr_bencDictAddInt( &val, "e", msgs->handle->encryptionMode != TR_PLAINTEXT_PREFERRED );
904    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( msgs->handle ) );
905    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
906    m  = tr_bencDictAddDict( &val, "m", 1 );
907    if( pex )
908        tr_bencDictAddInt( m, "ut_pex", TR_LTEP_PEX );
909    buf = tr_bencSave( &val, &len );
910
911    tr_peerIoWriteUint32( msgs->io, outbuf, 2*sizeof(uint8_t) + len );
912    tr_peerIoWriteUint8 ( msgs->io, outbuf, BT_LTEP );
913    tr_peerIoWriteUint8 ( msgs->io, outbuf, LTEP_HANDSHAKE );
914    tr_peerIoWriteBytes ( msgs->io, outbuf, buf, len );
915
916    tr_peerIoWriteBuf( msgs->io, outbuf );
917
918    dbgmsg( msgs, "here is the ltep handshake we sent [%*.*s]", len, len, buf );
919
920    /* cleanup */
921    tr_bencFree( &val );
922    tr_free( buf );
923    evbuffer_free( outbuf );
924}
925
926static void
927parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
928{
929    tr_benc val, * sub;
930    uint8_t * tmp = tr_new( uint8_t, len );
931
932    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
933    msgs->peerSentLtepHandshake = 1;
934
935    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type!=TYPE_DICT ) {
936        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
937        tr_free( tmp );
938        return;
939    }
940
941    dbgmsg( msgs, "here is the ltep handshake we got [%*.*s]", len, len, tmp );
942
943    /* does the peer prefer encrypted connections? */
944    if(( sub = tr_bencDictFindType( &val, "e", TYPE_INT )))
945        msgs->info->encryption_preference = sub->val.i
946                                      ? ENCRYPTION_PREFERENCE_YES
947                                      : ENCRYPTION_PREFERENCE_NO;
948
949    /* check supported messages for utorrent pex */
950    msgs->peerSupportsPex = 0;
951    if(( sub = tr_bencDictFindType( &val, "m", TYPE_DICT ))) {
952        if(( sub = tr_bencDictFindType( sub, "ut_pex", TYPE_INT ))) {
953            msgs->ut_pex_id = (uint8_t) sub->val.i;
954            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
955            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
956        }
957    }
958
959    /* get peer's listening port */
960    if(( sub = tr_bencDictFindType( &val, "p", TYPE_INT ))) {
961        msgs->info->port = htons( (uint16_t)sub->val.i );
962        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
963    }
964
965    tr_bencFree( &val );
966    tr_free( tmp );
967}
968
969static void
970parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
971{
972    int loaded = 0;
973    uint8_t * tmp = tr_new( uint8_t, msglen );
974    tr_benc val, *added;
975    const tr_torrent * tor = msgs->torrent;
976    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
977
978    if( tr_torrentAllowsPex( tor )
979        && (( loaded = !tr_bencLoad( tmp, msglen, &val, NULL )))
980        && (( added = tr_bencDictFindType( &val, "added", TYPE_STR ))))
981    {
982        const char * added_f = NULL;
983        tr_pex * pex;
984        size_t i, n;
985        tr_bencDictFindStr( &val, "added.f", &added_f );
986        pex = tr_peerMgrCompactToPex( added->val.s.s, added->val.s.i, added_f, &n );
987        for( i=0; i<n; ++i )
988            tr_peerMgrAddPex( msgs->handle->peerMgr, tor->info.hash,
989                              TR_PEER_FROM_PEX, pex+i );
990        tr_free( pex );
991    }
992
993    if( loaded )
994        tr_bencFree( &val );
995    tr_free( tmp );
996}
997
998static void
999sendPex( tr_peermsgs * msgs );
1000
1001static void
1002parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1003{
1004    uint8_t ltep_msgid;
1005
1006    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
1007    msglen--;
1008
1009    if( ltep_msgid == LTEP_HANDSHAKE )
1010    {
1011        dbgmsg( msgs, "got ltep handshake" );
1012        parseLtepHandshake( msgs, msglen, inbuf );
1013        if( tr_peerIoSupportsLTEP( msgs->io ) )
1014        {
1015            sendLtepHandshake( msgs );
1016            sendPex( msgs );
1017        }
1018    }
1019    else if( ltep_msgid == TR_LTEP_PEX )
1020    {
1021        dbgmsg( msgs, "got ut pex" );
1022        msgs->peerSupportsPex = 1;
1023        parseUtPex( msgs, msglen, inbuf );
1024    }
1025    else
1026    {
1027        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1028        evbuffer_drain( inbuf, msglen );
1029    }
1030}
1031
1032static int
1033readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1034{
1035    uint32_t len;
1036
1037    if( inlen < sizeof(len) )
1038        return READ_MORE;
1039
1040    tr_peerIoReadUint32( msgs->io, inbuf, &len );
1041
1042    if( len == 0 ) /* peer sent us a keepalive message */
1043        dbgmsg( msgs, "got KeepAlive" );
1044    else {
1045        msgs->incoming.length = len;
1046        msgs->state = AWAITING_BT_ID;
1047    }
1048
1049    return READ_AGAIN;
1050}
1051
1052static int
1053readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen );
1054
1055static int
1056readBtId( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1057{
1058    uint8_t id;
1059
1060    if( inlen < sizeof(uint8_t) )
1061        return READ_MORE;
1062
1063    tr_peerIoReadUint8( msgs->io, inbuf, &id );
1064    msgs->incoming.id = id;
1065
1066    if( id==BT_PIECE )
1067    {
1068        msgs->state = AWAITING_BT_PIECE;
1069        return READ_AGAIN;
1070    }
1071    else if( msgs->incoming.length != 1 )
1072    {
1073        msgs->state = AWAITING_BT_MESSAGE;
1074        return READ_AGAIN;
1075    }
1076    else return readBtMessage( msgs, inbuf, inlen-1 );
1077}
1078
1079static void
1080updatePeerProgress( tr_peermsgs * msgs )
1081{
1082    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have )
1083                         / (float)msgs->torrent->info.pieceCount;
1084    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
1085    updateInterest( msgs );
1086    firePeerProgress( msgs );
1087}
1088
1089static int
1090clientCanSendFastBlock( const tr_peermsgs * msgs UNUSED )
1091{
1092    /* don't send a fast piece if peer has MAX_FAST_ALLOWED_THRESHOLD pieces */
1093    if( tr_bitfieldCountTrueBits( msgs->info->have ) > MAX_FAST_ALLOWED_THRESHOLD )
1094        return FALSE;
1095   
1096    /* ...or if we don't have ourself enough pieces */
1097    if( tr_bitfieldCountTrueBits( tr_cpPieceBitfield( msgs->torrent->completion ) ) < MAX_FAST_ALLOWED_THRESHOLD )
1098        return FALSE;
1099
1100    /* Maybe a bandwidth limit ? */
1101    return TRUE;
1102}
1103
1104static void
1105peerMadeRequest( tr_peermsgs * msgs, const struct peer_request * req )
1106{
1107    const int reqIsValid = requestIsValid( msgs, req );
1108    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( msgs->torrent->completion, req->index );
1109    const int peerIsChoked = msgs->info->peerIsChoked;
1110    const int peerIsFast = tr_peerIoSupportsFEXT( msgs->io );
1111    const int pieceIsFast = reqIsValid && tr_bitfieldHas( getPeerAllowedPieces( msgs ), req->index );
1112    const int canSendFast = clientCanSendFastBlock( msgs );
1113
1114    if( !reqIsValid ) /* bad request */
1115    {
1116        dbgmsg( msgs, "rejecting an invalid request." );
1117        sendFastReject( msgs, req->index, req->offset, req->length );
1118    }
1119    else if( !clientHasPiece ) /* we don't have it */
1120    {
1121        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1122        sendFastReject( msgs, req->index, req->offset, req->length );
1123    }
1124    else if( peerIsChoked && !peerIsFast ) /* doesn't he know he's choked? */
1125    {
1126        tr_peerMsgsSetChoke( msgs, 1 );
1127        sendFastReject( msgs, req->index, req->offset, req->length );
1128    }
1129    else if( peerIsChoked && peerIsFast && ( !pieceIsFast || !canSendFast ) )
1130    {
1131        sendFastReject( msgs, req->index, req->offset, req->length );
1132    }
1133    else /* YAY */
1134    {
1135        if( peerIsFast && pieceIsFast )
1136            reqListAppend( &msgs->peerAskedForFast, req );
1137        else
1138            reqListAppend( &msgs->peerAskedFor, req );
1139    }
1140}
1141
1142static int
1143messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1144{
1145    switch( id )
1146    {
1147        case BT_CHOKE:
1148        case BT_UNCHOKE:
1149        case BT_INTERESTED:
1150        case BT_NOT_INTERESTED:
1151        case BT_HAVE_ALL:
1152        case BT_HAVE_NONE:
1153            return len==1;
1154
1155        case BT_HAVE:
1156        case BT_SUGGEST:
1157        case BT_ALLOWED_FAST:
1158            return len==5;
1159
1160        case BT_BITFIELD:
1161            return len == (msg->torrent->info.pieceCount+7u)/8u + 1u;
1162       
1163        case BT_REQUEST:
1164        case BT_CANCEL:
1165        case BT_REJECT:
1166            return len==13;
1167
1168        case BT_PIECE:
1169            return len>9 && len<=16393;
1170
1171        case BT_PORT:
1172            return len==3;
1173
1174        case BT_LTEP:
1175            return len >= 2;
1176
1177        default:
1178            return FALSE;
1179    }
1180}
1181
1182static int
1183clientGotBlock( tr_peermsgs * msgs, const uint8_t * block, const struct peer_request * req );
1184
1185static void
1186clientGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1187{
1188    msgs->info->pieceDataActivityDate = time( NULL );
1189    tr_rcTransferred( msgs->info->rcToClient, byteCount );
1190    fireClientGotData( msgs, byteCount );
1191}
1192
1193static int
1194readBtPiece( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1195{
1196    struct peer_request * req = &msgs->incoming.blockReq;
1197    assert( EVBUFFER_LENGTH(inbuf) >= inlen );
1198    dbgmsg( msgs, "In readBtPiece" );
1199
1200    if( !req->length )
1201    {
1202        if( inlen < 8 )
1203            return READ_MORE;
1204
1205        tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
1206        tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
1207        req->length = msgs->incoming.length - 9;
1208        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1209        return READ_AGAIN;
1210    }
1211    else
1212    {
1213        int err;
1214
1215        /* read in another chunk of data */
1216        const size_t nLeft = req->length - EVBUFFER_LENGTH(msgs->incoming.block);
1217        size_t n = MIN( nLeft, inlen );
1218        uint8_t * buf = tr_new( uint8_t, n );
1219        assert( EVBUFFER_LENGTH(inbuf) >= n );
1220        tr_peerIoReadBytes( msgs->io, inbuf, buf, n );
1221        evbuffer_add( msgs->incoming.block, buf, n );
1222        clientGotBytes( msgs, n );
1223        tr_free( buf );
1224        dbgmsg( msgs, "got %d bytes for block %u:%u->%u ... %d remain",
1225               (int)n, req->index, req->offset, req->length,
1226               (int)( req->length - EVBUFFER_LENGTH(msgs->incoming.block) ) );
1227        if( EVBUFFER_LENGTH(msgs->incoming.block) < req->length )
1228            return READ_MORE;
1229
1230        /* we've got the whole block ... process it */
1231        err = clientGotBlock( msgs, EVBUFFER_DATA(msgs->incoming.block), req );
1232
1233        /* cleanup */
1234        evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH(msgs->incoming.block) );
1235        req->length = 0;
1236        msgs->state = AWAITING_BT_LENGTH;
1237        if( !err )
1238            return READ_AGAIN;
1239        else {
1240            fireError( msgs, err );
1241            return READ_DONE;
1242        }
1243    }
1244}
1245
1246static int
1247readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1248{
1249    uint32_t ui32;
1250    uint32_t msglen = msgs->incoming.length;
1251    const uint8_t id = msgs->incoming.id;
1252    const size_t startBufLen = EVBUFFER_LENGTH( inbuf );
1253
1254    --msglen; /* id length */
1255
1256    if( inlen < msglen )
1257        return READ_MORE;
1258
1259    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %d", (int)id, (int)msglen, (int)inlen );
1260
1261    if( !messageLengthIsCorrect( msgs, id, msglen+1 ) )
1262    {
1263        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1264        fireError( msgs, TR_ERROR );
1265        return READ_DONE;
1266    }
1267
1268    switch( id )
1269    {
1270        case BT_CHOKE:
1271            dbgmsg( msgs, "got Choke" );
1272            msgs->info->clientIsChoked = 1;
1273            cancelAllRequestsToPeer( msgs );
1274            cancelAllRequestsToClientExceptFast( msgs );
1275            break;
1276
1277        case BT_UNCHOKE:
1278            dbgmsg( msgs, "got Unchoke" );
1279            msgs->info->clientIsChoked = 0;
1280            fireNeedReq( msgs );
1281            break;
1282
1283        case BT_INTERESTED:
1284            dbgmsg( msgs, "got Interested" );
1285            msgs->info->peerIsInterested = 1;
1286            tr_peerMsgsSetChoke( msgs, 0 );
1287            break;
1288
1289        case BT_NOT_INTERESTED:
1290            dbgmsg( msgs, "got Not Interested" );
1291            msgs->info->peerIsInterested = 0;
1292            break;
1293           
1294        case BT_HAVE:
1295            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1296            dbgmsg( msgs, "got Have: %u", ui32 );
1297            if( tr_bitfieldAdd( msgs->info->have, ui32 ) )
1298                fireError( msgs, TR_ERROR_PEER_MESSAGE );
1299            updatePeerProgress( msgs );
1300            tr_rcTransferred( msgs->torrent->swarmSpeed, msgs->torrent->info.pieceSize );
1301            break;
1302
1303        case BT_BITFIELD: {
1304            dbgmsg( msgs, "got a bitfield" );
1305            msgs->peerSentBitfield = 1;
1306            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
1307            updatePeerProgress( msgs );
1308            maybeSendFastAllowedSet( msgs );
1309            fireNeedReq( msgs );
1310            break;
1311        }
1312
1313        case BT_REQUEST: {
1314            struct peer_request r;
1315            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1316            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1317            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1318            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1319            peerMadeRequest( msgs, &r );
1320            break;
1321        }
1322
1323        case BT_CANCEL: {
1324            struct peer_request r;
1325            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1326            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1327            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1328            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1329            reqListRemove( &msgs->peerAskedForFast, &r );
1330            reqListRemove( &msgs->peerAskedFor, &r );
1331            break;
1332        }
1333
1334        case BT_PIECE:
1335            assert( 0 ); /* handled elsewhere! */
1336            break;
1337       
1338        case BT_PORT:
1339            dbgmsg( msgs, "Got a BT_PORT" );
1340            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1341            break;
1342           
1343        case BT_SUGGEST: {
1344            dbgmsg( msgs, "Got a BT_SUGGEST" );
1345            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1346            /* we don't do anything with this yet */
1347            break;
1348        }
1349           
1350        case BT_HAVE_ALL:
1351            dbgmsg( msgs, "Got a BT_HAVE_ALL" );
1352            tr_bitfieldAddRange( msgs->info->have, 0, msgs->torrent->info.pieceCount );
1353            updatePeerProgress( msgs );
1354            maybeSendFastAllowedSet( msgs );
1355            break;
1356       
1357       
1358        case BT_HAVE_NONE:
1359            dbgmsg( msgs, "Got a BT_HAVE_NONE" );
1360            tr_bitfieldClear( msgs->info->have );
1361            updatePeerProgress( msgs );
1362            maybeSendFastAllowedSet( msgs );
1363            break;
1364       
1365        case BT_REJECT: {
1366            struct peer_request r;
1367            dbgmsg( msgs, "Got a BT_REJECT" );
1368            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1369            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1370            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1371            reqListRemove( &msgs->clientAskedFor, &r );
1372            break;
1373        }
1374
1375        case BT_ALLOWED_FAST: {
1376            dbgmsg( msgs, "Got a BT_ALLOWED_FAST" );
1377            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1378            /* we don't do anything with this yet */
1379            break;
1380        }
1381
1382        case BT_LTEP:
1383            dbgmsg( msgs, "Got a BT_LTEP" );
1384            parseLtep( msgs, msglen, inbuf );
1385            break;
1386
1387        default:
1388            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1389            tr_peerIoDrain( msgs->io, inbuf, msglen );
1390            break;
1391    }
1392
1393    assert( msglen + 1 == msgs->incoming.length );
1394    assert( EVBUFFER_LENGTH(inbuf) == startBufLen - msglen );
1395
1396    msgs->state = AWAITING_BT_LENGTH;
1397    return READ_AGAIN;
1398}
1399
1400static void
1401peerGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1402{
1403    msgs->info->pieceDataActivityDate = time( NULL );
1404    tr_rcTransferred( msgs->info->rcToPeer, byteCount );
1405    firePeerGotData( msgs, byteCount );
1406}
1407
1408static size_t
1409getDownloadMax( const tr_peermsgs * msgs )
1410{
1411    static const size_t maxval = ~0;
1412    const tr_torrent * tor = msgs->torrent;
1413
1414    if( tor->downloadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1415        return tor->handle->useDownloadLimit
1416            ? tr_rcBytesLeft( tor->handle->download ) : maxval;
1417
1418    if( tor->downloadLimitMode == TR_SPEEDLIMIT_SINGLE )
1419        return tr_rcBytesLeft( tor->download );
1420
1421    return maxval;
1422}
1423
1424static void
1425decrementDownloadedCount( tr_peermsgs * msgs, uint32_t byteCount )
1426{
1427    tr_torrent * tor = msgs->torrent;
1428    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1429}
1430
1431static void
1432clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1433{
1434    decrementDownloadedCount( msgs, req->length );
1435}
1436
1437static void
1438addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1439{
1440    if( !msgs->info->blame )
1441         msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1442    tr_bitfieldAdd( msgs->info->blame, index );
1443}
1444
1445static tr_errno
1446clientGotBlock( tr_peermsgs                * msgs,
1447                const uint8_t              * data,
1448                const struct peer_request  * req )
1449{
1450    int err;
1451    tr_torrent * tor = msgs->torrent;
1452    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1453
1454    assert( msgs != NULL );
1455    assert( req != NULL );
1456
1457    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) )
1458    {
1459        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1460                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1461        return TR_ERROR;
1462    }
1463
1464    /* save the block */
1465    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1466
1467    /**
1468    *** Remove the block from our `we asked for this' list
1469    **/
1470
1471    if( reqListRemove( &msgs->clientAskedFor, req ) )
1472    {
1473        clientGotUnwantedBlock( msgs, req );
1474        dbgmsg( msgs, "we didn't ask for this message..." );
1475        return 0;
1476    }
1477
1478    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1479                  msgs->clientAskedFor.count );
1480
1481    /**
1482    *** Error checks
1483    **/
1484
1485    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
1486        dbgmsg( msgs, "we have this block already..." );
1487        clientGotUnwantedBlock( msgs, req );
1488        return 0;
1489    }
1490
1491    /**
1492    ***  Save the block
1493    **/
1494
1495    msgs->info->peerSentPieceDataAt = time( NULL );
1496    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1497        return err;
1498
1499    addPeerToBlamefield( msgs, req->index );
1500    fireGotBlock( msgs, req );
1501    return 0;
1502}
1503
1504static void
1505didWrite( struct bufferevent * evin UNUSED, void * vmsgs )
1506{
1507    pulse( vmsgs );
1508}
1509
1510static ReadState
1511canRead( struct bufferevent * evin, void * vmsgs )
1512{
1513    ReadState ret;
1514    tr_peermsgs * msgs = vmsgs;
1515    struct evbuffer * in = EVBUFFER_INPUT ( evin );
1516    const size_t inlen = EVBUFFER_LENGTH( in );
1517
1518    if( !inlen )
1519    {
1520        ret = READ_DONE;
1521    }
1522    else if( msgs->state == AWAITING_BT_PIECE )
1523    {
1524        const size_t downloadMax = getDownloadMax( msgs );
1525        const size_t n = MIN( inlen, downloadMax );
1526        ret = n ? readBtPiece( msgs, in, n ) : READ_DONE;
1527    }
1528    else switch( msgs->state )
1529    {
1530        case AWAITING_BT_LENGTH:  ret = readBtLength ( msgs, in, inlen ); break;
1531        case AWAITING_BT_ID:      ret = readBtId     ( msgs, in, inlen ); break;
1532        case AWAITING_BT_MESSAGE: ret = readBtMessage( msgs, in, inlen ); break;
1533        default:                  assert( 0 );
1534    }
1535
1536    return ret;
1537}
1538
1539static void
1540sendKeepalive( tr_peermsgs * msgs )
1541{
1542    dbgmsg( msgs, "sending a keepalive message" );
1543    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1544    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1545}
1546
1547/**
1548***
1549**/
1550
1551static size_t
1552getUploadMax( const tr_peermsgs * msgs )
1553{
1554    static const size_t maxval = ~0;
1555    const tr_torrent * tor = msgs->torrent;
1556    int speedLeft;
1557    int bufLeft;
1558
1559    if( tor->uploadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1560        speedLeft = tor->handle->useUploadLimit ? tr_rcBytesLeft( tor->handle->upload ) : maxval;
1561    else if( tor->uploadLimitMode == TR_SPEEDLIMIT_SINGLE )
1562        speedLeft = tr_rcBytesLeft( tor->upload );
1563    else
1564        speedLeft = INT_MAX;
1565
1566    /* this basically says the outbuf shouldn't have more than one block
1567     * queued up in it... blocksize, +13 for the size of the BT protocol's
1568     * block message overhead */
1569    bufLeft = tor->blockSize + 13 - tr_peerIoWriteBytesWaiting( msgs->io );
1570    return MIN( speedLeft, bufLeft );
1571}
1572
1573static int
1574ratePulse( void * vmsgs )
1575{
1576    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1577    msgs->info->rateToClient = tr_rcRate( msgs->info->rcToClient );
1578    msgs->info->rateToPeer = tr_rcRate( msgs->info->rcToPeer );
1579    msgs->maxActiveRequests = MIN( 4 + (int)(msgs->info->rateToClient/4), MAX_QUEUE_SIZE );
1580    msgs->minActiveRequests = msgs->maxActiveRequests / 3;
1581    return TRUE;
1582}
1583
1584static tr_errno
1585popNextRequest( tr_peermsgs * msgs, struct peer_request * setme )
1586{
1587    if( !reqListPop( &msgs->peerAskedForFast, setme ) )
1588        return 0;
1589    if( !reqListPop( &msgs->peerAskedFor, setme ) )
1590        return 0;
1591
1592    return TR_ERROR;
1593}
1594
1595static int
1596pulse( void * vmsgs )
1597{
1598    const time_t now = time( NULL );
1599    tr_peermsgs * msgs = vmsgs;
1600
1601    tr_peerIoTryRead( msgs->io );
1602    pumpRequestQueue( msgs );
1603    expireOldRequests( msgs );
1604
1605    if( msgs->sendingBlock )
1606    {
1607        const size_t uploadMax = getUploadMax( msgs );
1608        size_t len = EVBUFFER_LENGTH( msgs->outBlock );
1609        const size_t outlen = MIN( len, uploadMax );
1610
1611        assert( len );
1612
1613        if( outlen )
1614        {
1615            tr_peerIoWrite( msgs->io, EVBUFFER_DATA( msgs->outBlock ), outlen );
1616            evbuffer_drain( msgs->outBlock, outlen );
1617            peerGotBytes( msgs, outlen );
1618
1619            len -= outlen;
1620            msgs->clientSentAnythingAt = now;
1621            msgs->sendingBlock = len!=0;
1622
1623            dbgmsg( msgs, "wrote %d bytes; %d left in block", (int)outlen, (int)len );
1624        }
1625        else dbgmsg( msgs, "stalled writing block... uploadMax %lu, outlen %lu", uploadMax, outlen );
1626    }
1627
1628    if( !msgs->sendingBlock )
1629    {
1630        struct peer_request req;
1631        int haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1632
1633        if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1634        {
1635            dbgmsg( msgs, "started an outMessages batch (length is %d)", (int)EVBUFFER_LENGTH( msgs->outMessages ) );
1636            msgs->outMessagesBatchedAt = now;
1637        }
1638        else if( haveMessages
1639                 && ( ( now - msgs->outMessagesBatchedAt ) > msgs->outMessagesBatchPeriod ) )
1640        {
1641            dbgmsg( msgs, "flushing outMessages... (length is %d)", (int)EVBUFFER_LENGTH( msgs->outMessages ) );
1642            tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
1643            msgs->clientSentAnythingAt = now;
1644            msgs->outMessagesBatchedAt = 0;
1645            msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1646        }
1647        else if( !EVBUFFER_LENGTH( msgs->outBlock )
1648            && !popNextRequest( msgs, &req )
1649            && requestIsValid( msgs, &req )
1650            && tr_cpPieceIsComplete( msgs->torrent->completion, req.index ) )
1651        {
1652            uint8_t * buf = tr_new( uint8_t, req.length );
1653
1654            if( !tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf ) )
1655            {
1656                tr_peerIo * io = msgs->io;
1657                struct evbuffer * out = msgs->outBlock;
1658
1659                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1660                tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 2*sizeof(uint32_t) + req.length );
1661                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1662                tr_peerIoWriteUint32( io, out, req.index );
1663                tr_peerIoWriteUint32( io, out, req.offset );
1664                tr_peerIoWriteBytes ( io, out, buf, req.length );
1665                msgs->sendingBlock = 1;
1666            }
1667
1668            tr_free( buf );
1669        }
1670        else if(    ( !haveMessages )
1671                 && ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS )
1672        {
1673            sendKeepalive( msgs );
1674        }
1675    }
1676
1677    return TRUE; /* loop forever */
1678}
1679
1680static void
1681gotError( struct bufferevent * evbuf UNUSED, short what, void * vmsgs )
1682{
1683    if( what & EVBUFFER_TIMEOUT )
1684        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1685    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1686        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1687                what, errno, tr_strerror(errno) );
1688    fireError( vmsgs, TR_ERROR );
1689}
1690
1691static void
1692sendBitfield( tr_peermsgs * msgs )
1693{
1694    const tr_bitfield * bitfield = tr_cpPieceBitfield( msgs->torrent->completion );
1695    struct evbuffer * out = msgs->outMessages;
1696
1697    dbgmsg( msgs, "sending peer a bitfield message" );
1698    tr_peerIoWriteUint32( msgs->io, out, sizeof(uint8_t) + bitfield->byteCount );
1699    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1700    tr_peerIoWriteBytes ( msgs->io, out, bitfield->bits, bitfield->byteCount );
1701    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1702}
1703
1704/**
1705***
1706**/
1707
1708/* some peers give us error messages if we send
1709   more than this many peers in a single pex message
1710   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
1711#define MAX_PEX_ADDED 50
1712#define MAX_PEX_DROPPED 50
1713
1714typedef struct
1715{
1716    tr_pex * added;
1717    tr_pex * dropped;
1718    tr_pex * elements;
1719    int addedCount;
1720    int droppedCount;
1721    int elementCount;
1722}
1723PexDiffs;
1724
1725static void
1726pexAddedCb( void * vpex, void * userData )
1727{
1728    PexDiffs * diffs = userData;
1729    tr_pex * pex = vpex;
1730    if( diffs->addedCount < MAX_PEX_ADDED )
1731    {
1732        diffs->added[diffs->addedCount++] = *pex;
1733        diffs->elements[diffs->elementCount++] = *pex;
1734    }
1735}
1736
1737static void
1738pexDroppedCb( void * vpex, void * userData )
1739{
1740    PexDiffs * diffs = userData;
1741    tr_pex * pex = vpex;
1742    if( diffs->droppedCount < MAX_PEX_DROPPED )
1743    {
1744        diffs->dropped[diffs->droppedCount++] = *pex;
1745    }
1746}
1747
1748static void
1749pexElementCb( void * vpex, void * userData )
1750{
1751    PexDiffs * diffs = userData;
1752    tr_pex * pex = vpex;
1753    diffs->elements[diffs->elementCount++] = *pex;
1754}
1755
1756static void
1757sendPex( tr_peermsgs * msgs )
1758{
1759    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
1760    {
1761        int i;
1762        tr_pex * newPex = NULL;
1763        const int newCount = tr_peerMgrGetPeers( msgs->handle->peerMgr, msgs->torrent->info.hash, &newPex );
1764        PexDiffs diffs;
1765        tr_benc val, *added, *dropped;
1766        uint8_t *tmp, *walk;
1767        char * benc;
1768        int bencLen;
1769
1770        /* build the diffs */
1771        diffs.added = tr_new( tr_pex, newCount );
1772        diffs.addedCount = 0;
1773        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1774        diffs.droppedCount = 0;
1775        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1776        diffs.elementCount = 0;
1777        tr_set_compare( msgs->pex, msgs->pexCount,
1778                        newPex, newCount,
1779                        tr_pexCompare, sizeof(tr_pex),
1780                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
1781        dbgmsg( msgs, "pex: old peer count %d, new peer count %d, added %d, removed %d", msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1782
1783        /* update peer */
1784        tr_free( msgs->pex );
1785        msgs->pex = diffs.elements;
1786        msgs->pexCount = diffs.elementCount;
1787
1788        /* build the pex payload */
1789        tr_bencInitDict( &val, 3 );
1790
1791        /* "added" */
1792        added = tr_bencDictAdd( &val, "added" );
1793        tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1794        for( i=0; i<diffs.addedCount; ++i ) {
1795            memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
1796            memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1797        }
1798        assert( ( walk - tmp ) == diffs.addedCount * 6 );
1799        tr_bencInitStr( added, tmp, walk-tmp, FALSE );
1800
1801        /* "added.f" */
1802        tmp = walk = tr_new( uint8_t, diffs.addedCount );
1803        for( i=0; i<diffs.addedCount; ++i )
1804            *walk++ = diffs.added[i].flags;
1805        assert( ( walk - tmp ) == diffs.addedCount );
1806        tr_bencDictAddRaw( &val, "added.f", tmp, walk-tmp );
1807
1808        /* "dropped" */
1809        dropped = tr_bencDictAdd( &val, "dropped" );
1810        tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
1811        for( i=0; i<diffs.droppedCount; ++i ) {
1812            memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
1813            memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
1814        }
1815        assert( ( walk - tmp ) == diffs.droppedCount * 6 );
1816        tr_bencInitStr( dropped, tmp, walk-tmp, FALSE );
1817
1818        /* write the pex message */
1819        benc = tr_bencSave( &val, &bencLen );
1820        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 2*sizeof(uint8_t) + bencLen );
1821        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_LTEP );
1822        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, msgs->ut_pex_id );
1823        tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, benc, bencLen );
1824        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1825
1826        /* cleanup */
1827        tr_free( benc );
1828        tr_bencFree( &val );
1829        tr_free( diffs.added );
1830        tr_free( diffs.dropped );
1831        tr_free( newPex );
1832
1833        msgs->clientSentPexAt = time( NULL );
1834    }
1835}
1836
1837static int
1838pexPulse( void * vpeer )
1839{
1840    sendPex( vpeer );
1841    return TRUE;
1842}
1843
1844/**
1845***
1846**/
1847
1848tr_peermsgs*
1849tr_peerMsgsNew( struct tr_torrent * torrent,
1850                struct tr_peer    * info,
1851                tr_delivery_func    func,
1852                void              * userData,
1853                tr_publisher_tag  * setme )
1854{
1855    tr_peermsgs * m;
1856
1857    assert( info != NULL );
1858    assert( info->io != NULL );
1859
1860    m = tr_new0( tr_peermsgs, 1 );
1861    m->publisher = tr_publisherNew( );
1862    m->info = info;
1863    m->handle = torrent->handle;
1864    m->torrent = torrent;
1865    m->io = info->io;
1866    m->info->clientIsChoked = 1;
1867    m->info->peerIsChoked = 1;
1868    m->info->clientIsInterested = 0;
1869    m->info->peerIsInterested = 0;
1870    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
1871    m->state = AWAITING_BT_LENGTH;
1872    m->pulseTimer = tr_timerNew( m->handle, pulse, m, PEER_PULSE_INTERVAL );
1873    m->rateTimer = tr_timerNew( m->handle, ratePulse, m, RATE_PULSE_INTERVAL );
1874    m->pexTimer = tr_timerNew( m->handle, pexPulse, m, PEX_INTERVAL );
1875    m->outMessages = evbuffer_new( );
1876    m->outMessagesBatchedAt = 0;
1877    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1878    m->incoming.block = evbuffer_new( );
1879    m->outBlock = evbuffer_new( );
1880    m->peerAllowedPieces = NULL;
1881    m->peerAskedFor = REQUEST_LIST_INIT;
1882    m->peerAskedForFast = REQUEST_LIST_INIT;
1883    m->clientAskedFor = REQUEST_LIST_INIT;
1884    m->clientWillAskFor = REQUEST_LIST_INIT;
1885    *setme = tr_publisherSubscribe( m->publisher, func, userData );
1886
1887    if ( tr_peerIoSupportsLTEP( m->io ) )
1888        sendLtepHandshake( m );
1889
1890    sendBitfield( m );
1891   
1892    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of inactivity */
1893    tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
1894    ratePulse( m );
1895
1896    return m;
1897}
1898
1899void
1900tr_peerMsgsFree( tr_peermsgs* msgs )
1901{
1902    if( msgs != NULL )
1903    {
1904        tr_timerFree( &msgs->pulseTimer );
1905        tr_timerFree( &msgs->rateTimer );
1906        tr_timerFree( &msgs->pexTimer );
1907        tr_publisherFree( &msgs->publisher );
1908        reqListClear( &msgs->clientWillAskFor );
1909        reqListClear( &msgs->clientAskedFor );
1910        reqListClear( &msgs->peerAskedForFast );
1911        reqListClear( &msgs->peerAskedFor );
1912        tr_bitfieldFree( msgs->peerAllowedPieces );
1913        evbuffer_free( msgs->incoming.block );
1914        evbuffer_free( msgs->outMessages );
1915        evbuffer_free( msgs->outBlock );
1916        tr_free( msgs->pex );
1917
1918        memset( msgs, ~0, sizeof( tr_peermsgs ) );
1919        tr_free( msgs );
1920    }
1921}
1922
1923tr_publisher_tag
1924tr_peerMsgsSubscribe( tr_peermsgs       * peer,
1925                      tr_delivery_func    func,
1926                      void              * userData )
1927{
1928    return tr_publisherSubscribe( peer->publisher, func, userData );
1929}
1930
1931void
1932tr_peerMsgsUnsubscribe( tr_peermsgs       * peer,
1933                        tr_publisher_tag    tag )
1934{
1935    tr_publisherUnsubscribe( peer->publisher, tag );
1936}
Note: See TracBrowser for help on using the repository browser.