source: trunk/libtransmission/peer-msgs.c @ 4053

Last change on this file since 4053 was 4053, checked in by charles, 15 years ago

small tweak to play nice with old C compilers. Thanks to fizz for reporting & submitting a patch

  • Property svn:keywords set to Date Rev Author Id
File size: 55.7 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 4053 2007-12-03 19:52:55Z charles $
11 */
12
13#include <assert.h>
14#include <ctype.h>
15#include <errno.h>
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19#include <libgen.h> /* basename */
20
21#include <netinet/in.h> /* struct in_addr */
22
23#include <event.h>
24
25#include "transmission.h"
26#include "bencode.h"
27#include "completion.h"
28#include "inout.h"
29#include "list.h"
30#include "peer-io.h"
31#include "peer-mgr.h"
32#include "peer-mgr-private.h"
33#include "peer-msgs.h"
34#include "ratecontrol.h"
35#include "stats.h"
36#include "trevent.h"
37#include "utils.h"
38
39/**
40***
41**/
42
43enum
44{
45    BT_CHOKE                = 0,
46    BT_UNCHOKE              = 1,
47    BT_INTERESTED           = 2,
48    BT_NOT_INTERESTED       = 3,
49    BT_HAVE                 = 4,
50    BT_BITFIELD             = 5,
51    BT_REQUEST              = 6,
52    BT_PIECE                = 7,
53    BT_CANCEL               = 8,
54    BT_PORT                 = 9,
55    BT_SUGGEST              = 13,
56    BT_HAVE_ALL             = 14,
57    BT_HAVE_NONE            = 15,
58    BT_REJECT               = 16,
59    BT_ALLOWED_FAST         = 17,
60    BT_LTEP                 = 20,
61
62    LTEP_HANDSHAKE          = 0,
63
64    TR_LTEP_PEX             = 1,
65
66    MAX_REQUEST_BYTE_COUNT  = (16 * 1024), /* drop requests who want too much */
67
68    /* idle seconds before we send a keepalive */
69    KEEPALIVE_INTERVAL_SECS = 90,
70
71    PEX_INTERVAL            = (60 * 1000), /* msec between calls to sendPex() */
72    PEER_PULSE_INTERVAL     = (100),       /* msec between calls to pulse() */
73    RATE_PULSE_INTERVAL     = (250),       /* msec between calls to ratePulse() */
74
75    MAX_OUTBUF_SIZE         = 4096,
76     
77    /* Fast Peers Extension constants */
78    MAX_FAST_ALLOWED_COUNT   = 10,          /* max. number of pieces we fast-allow to another peer */
79    MAX_FAST_ALLOWED_THRESHOLD = 10,        /* max threshold for allowing fast-pieces requests */
80
81};
82
83enum
84{
85    AWAITING_BT_LENGTH,
86    AWAITING_BT_ID,
87    AWAITING_BT_MESSAGE,
88    AWAITING_BT_PIECE
89};
90
91struct peer_request
92{
93    uint32_t index;
94    uint32_t offset;
95    uint32_t length;
96    time_t time_requested;
97};
98
99static int
100compareRequest( const void * va, const void * vb )
101{
102    int i;
103    const struct peer_request * a = va;
104    const struct peer_request * b = vb;
105    if(( i = tr_compareUint32( a->index, b->index ))) return i;
106    if(( i = tr_compareUint32( a->offset, b->offset ))) return i;
107    if(( i = tr_compareUint32( a->length, b->length ))) return i;
108    return 0;
109}
110
111/* this is raw, unchanged data from the peer regarding
112 * the current message that it's sending us. */
113struct tr_incoming
114{
115    uint32_t length; /* includes the +1 for id length */
116    uint8_t id;
117    struct peer_request blockReq; /* metadata for incoming blocks */
118    struct evbuffer * block; /* piece data for incoming blocks */
119};
120
121struct tr_peermsgs
122{
123    tr_peer * info;
124
125    tr_handle * handle;
126    tr_torrent * torrent;
127    tr_peerIo * io;
128
129    tr_publisher_t * publisher;
130
131    struct evbuffer * outBlock;    /* buffer of all the current piece message */
132    struct evbuffer * outMessages; /* buffer of all the non-piece messages */
133    tr_list * peerAskedFor;
134    tr_list * peerAskedForFast;
135    tr_list * clientAskedFor;
136    tr_list * clientWillAskFor;
137
138    tr_timer * rateTimer;
139    tr_timer * pulseTimer;
140    tr_timer * pexTimer;
141
142    time_t lastReqAddedAt;
143    time_t clientSentPexAt;
144    time_t clientSentAnythingAt;
145
146    time_t clientSentPieceDataAt;
147    time_t peerSentPieceDataAt;
148
149    unsigned int peerSentBitfield         : 1;
150    unsigned int peerSupportsPex          : 1;
151    unsigned int clientSentLtepHandshake  : 1;
152    unsigned int peerSentLtepHandshake    : 1;
153    unsigned int sendingBlock             : 1;
154   
155    tr_bitfield * clientAllowedPieces;
156    tr_bitfield * peerAllowedPieces;
157   
158    tr_bitfield * clientSuggestedPieces;
159   
160    uint8_t state;
161    uint8_t ut_pex_id;
162    uint16_t pexCount;
163    uint32_t maxActiveRequests;
164    uint32_t minActiveRequests;
165
166    struct tr_incoming incoming; 
167
168    tr_pex * pex;
169};
170
171/**
172***
173**/
174
175static void
176myDebug( const char * file, int line,
177         const struct tr_peermsgs * msgs,
178         const char * fmt, ... )
179{
180    FILE * fp = tr_getLog( );
181    if( fp != NULL )
182    {
183        va_list args;
184        char timestr[64];
185        struct evbuffer * buf = evbuffer_new( );
186        char * myfile = tr_strdup( file );
187
188        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
189                             tr_getLogTimeStr( timestr, sizeof(timestr) ),
190                             msgs->torrent->info.name,
191                             tr_peerIoGetAddrStr( msgs->io ),
192                             msgs->info->client );
193        va_start( args, fmt );
194        evbuffer_add_vprintf( buf, fmt, args );
195        va_end( args );
196        evbuffer_add_printf( buf, " (%s:%d)\n", basename(myfile), line );
197        fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
198
199        tr_free( myfile );
200        evbuffer_free( buf );
201    }
202}
203
204#define dbgmsg(msgs, fmt...) myDebug(__FILE__, __LINE__, msgs, ##fmt )
205
206/**
207***
208**/
209
210static void
211protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
212{
213    tr_peerIo * io = msgs->io;
214    struct evbuffer * out = msgs->outMessages;
215
216    dbgmsg( msgs, "requesting %u:%u->%u", req->index, req->offset, req->length );
217    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
218    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
219    tr_peerIoWriteUint32( io, out, req->index );
220    tr_peerIoWriteUint32( io, out, req->offset );
221    tr_peerIoWriteUint32( io, out, req->length );
222}
223
224static void
225protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
226{
227    tr_peerIo * io = msgs->io;
228    struct evbuffer * out = msgs->outMessages;
229
230    dbgmsg( msgs, "cancelling %u:%u->%u", req->index, req->offset, req->length );
231    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
232    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
233    tr_peerIoWriteUint32( io, out, req->index );
234    tr_peerIoWriteUint32( io, out, req->offset );
235    tr_peerIoWriteUint32( io, out, req->length );
236}
237
238static void
239protocolSendHave( tr_peermsgs * msgs, uint32_t index )
240{
241    tr_peerIo * io = msgs->io;
242    struct evbuffer * out = msgs->outMessages;
243
244    dbgmsg( msgs, "sending Have %u", index );
245    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
246    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
247    tr_peerIoWriteUint32( io, out, index );
248}
249
250static void
251protocolSendChoke( tr_peermsgs * msgs, int choke )
252{
253    tr_peerIo * io = msgs->io;
254    struct evbuffer * out = msgs->outMessages;
255
256    dbgmsg( msgs, "sending %s", (choke ? "Choke" : "Unchoke") );
257    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) );
258    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
259}
260
261/**
262***  EVENTS
263**/
264
265static const tr_peermsgs_event blankEvent = { 0, 0, 0, 0, 0.0f };
266
267static void
268publish( tr_peermsgs * msgs, tr_peermsgs_event * e )
269{
270    tr_publisherPublish( msgs->publisher, msgs->info, e );
271}
272
273static void
274fireGotAssertError( tr_peermsgs * msgs )
275{
276    tr_peermsgs_event e = blankEvent;
277    e.eventType = TR_PEERMSG_GOT_ASSERT_ERROR;
278    publish( msgs, &e );
279}
280
281static void
282fireGotError( tr_peermsgs * msgs )
283{
284    tr_peermsgs_event e = blankEvent;
285    e.eventType = TR_PEERMSG_GOT_ERROR;
286    publish( msgs, &e );
287}
288
289static void
290fireNeedReq( tr_peermsgs * msgs )
291{
292    tr_peermsgs_event e = blankEvent;
293    e.eventType = TR_PEERMSG_NEED_REQ;
294    publish( msgs, &e );
295}
296
297static void
298firePeerProgress( tr_peermsgs * msgs )
299{
300    tr_peermsgs_event e = blankEvent;
301    e.eventType = TR_PEERMSG_PEER_PROGRESS;
302    e.progress = msgs->info->progress;
303    publish( msgs, &e );
304}
305
306static void
307fireClientHave( tr_peermsgs * msgs, uint32_t pieceIndex )
308{
309    tr_peermsgs_event e = blankEvent;
310    e.eventType = TR_PEERMSG_CLIENT_HAVE;
311    e.pieceIndex = pieceIndex;
312    publish( msgs, &e );
313}
314
315static void
316fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
317{
318    tr_peermsgs_event e = blankEvent;
319    e.eventType = TR_PEERMSG_CLIENT_BLOCK;
320    e.pieceIndex = req->index;
321    e.offset = req->offset;
322    e.length = req->length;
323    publish( msgs, &e );
324}
325
326static void
327fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
328{
329    tr_peermsgs_event e = blankEvent;
330    e.eventType = TR_PEERMSG_CANCEL;
331    e.pieceIndex = req->index;
332    e.offset = req->offset;
333    e.length = req->length;
334    publish( msgs, &e );
335}
336
337/**
338***  INTEREST
339**/
340
341static int
342isPieceInteresting( const tr_peermsgs   * peer,
343                    int                   piece )
344{
345    const tr_torrent * torrent = peer->torrent;
346    if( torrent->info.pieces[piece].dnd ) /* we don't want it */
347        return FALSE;
348    if( tr_cpPieceIsComplete( torrent->completion, piece ) ) /* we have it */
349        return FALSE;
350    if( !tr_bitfieldHas( peer->info->have, piece ) ) /* peer doesn't have it */
351        return FALSE;
352    if( tr_bitfieldHas( peer->info->banned, piece ) ) /* peer is banned */
353        return FALSE;
354    return TRUE;
355}
356
357/* "interested" means we'll ask for piece data if they unchoke us */
358static int
359isPeerInteresting( const tr_peermsgs * msgs )
360{
361    int i;
362    const tr_torrent * torrent;
363    const tr_bitfield * bitfield;
364    const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
365
366    if( clientIsSeed )
367        return FALSE;
368
369    torrent = msgs->torrent;
370    bitfield = tr_cpPieceBitfield( torrent->completion );
371
372    if( !msgs->info->have )
373        return TRUE;
374
375    assert( bitfield->len == msgs->info->have->len );
376    for( i=0; i<torrent->info.pieceCount; ++i )
377        if( isPieceInteresting( msgs, i ) )
378            return TRUE;
379
380    return FALSE;
381}
382
383static void
384sendInterest( tr_peermsgs * msgs, int weAreInterested )
385{
386    assert( msgs != NULL );
387    assert( weAreInterested==0 || weAreInterested==1 );
388
389    msgs->info->clientIsInterested = weAreInterested;
390    dbgmsg( msgs, "Sending %s",
391            weAreInterested ? "Interested" : "Not Interested");
392
393    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
394    tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages,
395                   weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
396}
397
398static void
399updateInterest( tr_peermsgs * msgs )
400{
401    const int i = isPeerInteresting( msgs );
402    if( i != msgs->info->clientIsInterested )
403        sendInterest( msgs, i );
404    if( i )
405        fireNeedReq( msgs );
406}
407
408#define MIN_CHOKE_PERIOD_SEC 10
409
410static void
411cancelAllRequestsToClientExceptFast( tr_peermsgs * msgs )
412{
413    tr_list_free( &msgs->peerAskedFor, tr_free );
414}
415
416void
417tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
418{
419    const time_t fibrillationTime = time(NULL) - MIN_CHOKE_PERIOD_SEC;
420
421    assert( msgs != NULL );
422    assert( msgs->info != NULL );
423    assert( choke==0 || choke==1 );
424
425    if( msgs->info->chokeChangedAt > fibrillationTime )
426    {
427        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
428    }
429    else if( msgs->info->peerIsChoked != choke )
430    {
431        msgs->info->peerIsChoked = choke;
432        if( choke )
433            cancelAllRequestsToClientExceptFast( msgs );
434        protocolSendChoke( msgs, choke );
435        msgs->info->chokeChangedAt = time( NULL );
436    }
437}
438
439/**
440***
441**/
442
443void
444tr_peerMsgsHave( tr_peermsgs * msgs,
445                 uint32_t      index )
446{
447    protocolSendHave( msgs, index );
448
449    /* since we have more pieces now, we might not be interested in this peer */
450    updateInterest( msgs );
451}
452#if 0
453static void
454sendFastSuggest( tr_peermsgs * msgs,
455                 uint32_t      pieceIndex )
456{
457    assert( msgs != NULL );
458   
459    if( tr_peerIoSupportsFEXT( msgs->io ) )
460    {
461        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
462        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_SUGGEST );
463        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
464    }
465}
466#endif
467static void
468sendFastHave( tr_peermsgs * msgs, int all )
469{
470    assert( msgs != NULL );
471   
472    if( tr_peerIoSupportsFEXT( msgs->io ) )
473    {
474        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
475        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, ( all ? BT_HAVE_ALL
476                                                                : BT_HAVE_NONE ) );
477        updateInterest( msgs );
478    }
479}
480
481static void
482sendFastReject( tr_peermsgs * msgs,
483                uint32_t      pieceIndex,
484                uint32_t      offset,
485                uint32_t      length )
486{
487    assert( msgs != NULL );
488
489    if( tr_peerIoSupportsFEXT( msgs->io ) )
490    {
491        const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
492        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
493        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_REJECT );
494        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
495        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
496        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
497    }
498}
499
500static void
501sendFastAllowed( tr_peermsgs * msgs,
502                 uint32_t      pieceIndex)
503{
504    assert( msgs != NULL );
505   
506    if( tr_peerIoSupportsFEXT( msgs->io ) )
507    {
508        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
509        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_ALLOWED_FAST );
510        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
511    }
512}
513
514
515
516static void
517sendFastAllowedSet( tr_peermsgs * msgs )
518{
519    int i = 0;
520    while (i <= msgs->torrent->info.pieceCount )
521    {
522        if ( tr_bitfieldHas( msgs->peerAllowedPieces, i) )
523            sendFastAllowed( msgs, i );
524        i++;
525    }
526}
527
528
529/**
530***
531**/
532
533static int
534reqIsValid( const tr_peermsgs   * msgs,
535            uint32_t              index,
536            uint32_t              offset,
537            uint32_t              length )
538{
539    const tr_torrent * tor = msgs->torrent;
540
541    if( index >= (uint32_t) tor->info.pieceCount )
542        return FALSE;
543    if ( (int)offset >= tr_torPieceCountBytes( tor, (int)index ) )
544        return FALSE;
545    if( length > MAX_REQUEST_BYTE_COUNT )
546        return FALSE;
547    if( tr_pieceOffset( tor, index, offset, length ) > tor->info.totalSize )
548        return FALSE;
549
550    return TRUE;
551}
552
553static int
554requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
555{
556    return reqIsValid( msgs, req->index, req->offset, req->length );
557}
558
559static void
560pumpRequestQueue( tr_peermsgs * msgs )
561{
562    const int max = msgs->maxActiveRequests;
563    const int min = msgs->minActiveRequests;
564    int count = tr_list_size( msgs->clientAskedFor );
565    int sent = 0;
566
567    if( count > min )
568        return;
569    if( msgs->info->clientIsChoked )
570        return;
571
572    while( ( count < max ) && ( msgs->clientWillAskFor != NULL ) )
573    {
574        struct peer_request * r = tr_list_pop_front( &msgs->clientWillAskFor );
575        assert( requestIsValid( msgs, r ) );
576        assert( tr_bitfieldHas( msgs->info->have, r->index ) );
577        protocolSendRequest( msgs, r );
578        r->time_requested = msgs->lastReqAddedAt = time( NULL );
579        tr_list_append( &msgs->clientAskedFor, r );
580        ++count;
581        ++sent;
582    }
583
584    if( sent )
585        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
586                sent,
587                tr_list_size(msgs->clientAskedFor),
588                tr_list_size(msgs->clientWillAskFor) );
589
590    if( count < max )
591        fireNeedReq( msgs );
592}
593
594static int
595pulse( void * vmsgs );
596
597int
598tr_peerMsgsAddRequest( tr_peermsgs * msgs,
599                       uint32_t      index, 
600                       uint32_t      offset, 
601                       uint32_t      length )
602{
603    const int req_max = msgs->maxActiveRequests;
604    struct peer_request tmp, *req;
605
606    assert( msgs != NULL );
607    assert( msgs->torrent != NULL );
608    assert( reqIsValid( msgs, index, offset, length ) );
609
610    /**
611    ***  Reasons to decline the request
612    **/
613
614    /* don't send requests to choked clients */
615    if( msgs->info->clientIsChoked ) {
616        dbgmsg( msgs, "declining request because they're choking us" );
617        return TR_ADDREQ_CLIENT_CHOKED;
618    }
619
620    /* peer doesn't have this piece */
621    if( !tr_bitfieldHas( msgs->info->have, index ) )
622        return TR_ADDREQ_MISSING;
623
624    /* peer's queue is full */
625    if( tr_list_size( msgs->clientWillAskFor ) >= req_max ) {
626        dbgmsg( msgs, "declining request because we're full" );
627        return TR_ADDREQ_FULL;
628    }
629
630    /* have we already asked for this piece? */
631    tmp.index = index;
632    tmp.offset = offset;
633    tmp.length = length;
634    if( tr_list_find( msgs->clientAskedFor, &tmp, compareRequest ) ) {
635        dbgmsg( msgs, "declining because it's a duplicate" );
636        return TR_ADDREQ_DUPLICATE;
637    }
638    if( tr_list_find( msgs->clientWillAskFor, &tmp, compareRequest ) ) {
639        dbgmsg( msgs, "declining because it's a duplicate" );
640        return TR_ADDREQ_DUPLICATE;
641    }
642
643    /**
644    ***  Accept this request
645    **/
646
647    dbgmsg( msgs, "added req for piece %d, offset %d", (int)index, (int)offset );
648    req = tr_new0( struct peer_request, 1 );
649    *req = tmp;
650    tr_list_append( &msgs->clientWillAskFor, req );
651    return TR_ADDREQ_OK;
652}
653
654static void
655cancelAllRequestsToPeer( tr_peermsgs * msgs )
656{
657    struct peer_request * req;
658
659    while(( req = tr_list_pop_front( &msgs->clientWillAskFor ) ))
660    {
661        fireCancelledReq( msgs, req );
662        tr_free( req );
663    }
664
665    while(( req = tr_list_pop_front( &msgs->clientAskedFor ) ))
666    {
667        fireCancelledReq( msgs, req );
668        protocolSendCancel( msgs, req );
669        tr_free( req );
670    }
671}
672
673void
674tr_peerMsgsCancel( tr_peermsgs * msgs,
675                   uint32_t      pieceIndex,
676                   uint32_t      offset,
677                   uint32_t      length )
678{
679    struct peer_request *req, tmp;
680
681    assert( msgs != NULL );
682    assert( length > 0 );
683
684    /* have we asked the peer for this piece? */
685    tmp.index = pieceIndex;
686    tmp.offset = offset;
687    tmp.length = length;
688
689    /* if it's only in the queue and hasn't been sent yet, free it */
690    if(( req = tr_list_remove( &msgs->clientWillAskFor, &tmp, compareRequest ) ))
691    {
692        fireCancelledReq( msgs, req );
693        tr_free( req );
694    }
695
696    /* if it's already been sent, send a cancel message too */
697    if(( req = tr_list_remove( &msgs->clientAskedFor, &tmp, compareRequest ) ))
698    {
699        protocolSendCancel( msgs, req );
700        fireCancelledReq( msgs, req );
701        tr_free( req );
702    }
703}
704
705/**
706***
707**/
708
709static void
710sendLtepHandshake( tr_peermsgs * msgs )
711{
712    benc_val_t val, *m;
713    char * buf;
714    int len;
715    int pex;
716    const char * v = TR_NAME " " USERAGENT_PREFIX;
717    const int port = tr_getPublicPort( msgs->handle );
718    struct evbuffer * outbuf;
719
720    if( msgs->clientSentLtepHandshake )
721        return;
722
723    outbuf = evbuffer_new( );
724    dbgmsg( msgs, "sending an ltep handshake" );
725    msgs->clientSentLtepHandshake = 1;
726
727    /* decide if we want to advertise pex support */
728    if( !tr_torrentIsPexEnabled( msgs->torrent ) )
729        pex = 0;
730    else if( msgs->peerSentLtepHandshake )
731        pex = msgs->peerSupportsPex ? 1 : 0;
732    else
733        pex = 1;
734
735    tr_bencInit( &val, TYPE_DICT );
736    tr_bencDictReserve( &val, 4 );
737    tr_bencInitInt( tr_bencDictAdd( &val, "e" ), 1 );
738    m  = tr_bencDictAdd( &val, "m" );
739    tr_bencInit( m, TYPE_DICT );
740    if( pex ) {
741        tr_bencDictReserve( m, 1 );
742        tr_bencInitInt( tr_bencDictAdd( m, "ut_pex" ), TR_LTEP_PEX );
743    }
744    if( port > 0 )
745        tr_bencInitInt( tr_bencDictAdd( &val, "p" ), port );
746    tr_bencInitStr( tr_bencDictAdd( &val, "v" ), v, 0, 1 );
747    buf = tr_bencSave( &val, &len );
748
749    tr_peerIoWriteUint32( msgs->io, outbuf, 2*sizeof(uint8_t) + len );
750    tr_peerIoWriteUint8 ( msgs->io, outbuf, BT_LTEP );
751    tr_peerIoWriteUint8 ( msgs->io, outbuf, LTEP_HANDSHAKE );
752    tr_peerIoWriteBytes ( msgs->io, outbuf, buf, len );
753
754    tr_peerIoWriteBuf( msgs->io, outbuf );
755
756#if 0
757    dbgmsg( msgs, "here is the ltep handshake we sent:" );
758    tr_bencPrint( &val );
759    dbgmsg( msgs, "here is the ltep handshake we read [%s]:", tr_bencSave( &val, NULL ) );
760#endif
761
762    /* cleanup */
763    tr_bencFree( &val );
764    tr_free( buf );
765    evbuffer_free( outbuf );
766}
767
768static void
769parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
770{
771    benc_val_t val, * sub;
772    uint8_t * tmp = tr_new( uint8_t, len );
773
774    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
775    msgs->peerSentLtepHandshake = 1;
776
777    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type!=TYPE_DICT ) {
778        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
779        tr_free( tmp );
780        return;
781    }
782
783#if 0
784    dbgmsg( msgs, "here is the ltep handshake we read:" );
785    tr_bencPrint( &val );
786    dbgmsg( msgs, "here is the ltep handshake we read [%s]:", tr_bencSave( &val, NULL ) );
787#endif
788
789    /* does the peer prefer encrypted connections? */
790    if(( sub = tr_bencDictFindType( &val, "e", TYPE_INT )))
791        msgs->info->encryption_preference = sub->val.i
792                                      ? ENCRYPTION_PREFERENCE_YES
793                                      : ENCRYPTION_PREFERENCE_NO;
794
795    /* check supported messages for utorrent pex */
796    if(( sub = tr_bencDictFindType( &val, "m", TYPE_DICT ))) {
797        if(( sub = tr_bencDictFindType( sub, "ut_pex", TYPE_INT ))) {
798            msgs->peerSupportsPex = 1;
799            msgs->ut_pex_id = (uint8_t) sub->val.i;
800            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
801        }
802    }
803
804    /* get peer's listening port */
805    if(( sub = tr_bencDictFindType( &val, "p", TYPE_INT ))) {
806        msgs->info->port = htons( (uint16_t)sub->val.i );
807        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
808    }
809
810    tr_bencFree( &val );
811    tr_free( tmp );
812}
813
814static void
815parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
816{
817    benc_val_t val, * sub;
818    uint8_t * tmp;
819
820    if( !tr_torrentIsPexEnabled( msgs->torrent ) ) /* no sharing! */
821        return;
822
823    tmp = tr_new( uint8_t, msglen );
824    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
825
826    if( tr_bencLoad( tmp, msglen, &val, NULL ) || ( val.type != TYPE_DICT ) ) {
827        dbgmsg( msgs, "GET can't read extended-pex dictionary" );
828        tr_free( tmp );
829        return;
830    }
831
832    if(( sub = tr_bencDictFindType( &val, "added", TYPE_STR ))) {
833        const int n = sub->val.s.i / 6 ;
834        dbgmsg( msgs, "got %d peers from uT pex", n );
835        tr_peerMgrAddPeers( msgs->handle->peerMgr,
836                            msgs->torrent->info.hash,
837                            TR_PEER_FROM_PEX,
838                            (uint8_t*)sub->val.s.s, n );
839    }
840
841    tr_bencFree( &val );
842    tr_free( tmp );
843}
844
845static void
846sendPex( tr_peermsgs * msgs );
847
848static void
849parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
850{
851    uint8_t ltep_msgid;
852
853    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
854    msglen--;
855
856    if( ltep_msgid == LTEP_HANDSHAKE )
857    {
858        dbgmsg( msgs, "got ltep handshake" );
859        parseLtepHandshake( msgs, msglen, inbuf );
860        if( tr_peerIoSupportsLTEP( msgs->io ) )
861        {
862            sendLtepHandshake( msgs );
863            sendPex( msgs );
864        }
865    }
866    else if( ltep_msgid == TR_LTEP_PEX )
867    {
868        dbgmsg( msgs, "got ut pex" );
869        msgs->peerSupportsPex = 1;
870        parseUtPex( msgs, msglen, inbuf );
871    }
872    else
873    {
874        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
875        evbuffer_drain( inbuf, msglen );
876    }
877}
878
879static int
880readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
881{
882    uint32_t len;
883
884    if( inlen < sizeof(len) )
885        return READ_MORE;
886
887    tr_peerIoReadUint32( msgs->io, inbuf, &len );
888
889    if( len == 0 ) /* peer sent us a keepalive message */
890        dbgmsg( msgs, "got KeepAlive" );
891    else {
892        msgs->incoming.length = len;
893        msgs->state = AWAITING_BT_ID;
894    }
895
896    return READ_AGAIN;
897}
898
899static int
900readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen );
901
902static int
903readBtId( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
904{
905    uint8_t id;
906
907    if( inlen < sizeof(uint8_t) )
908        return READ_MORE;
909
910    tr_peerIoReadUint8( msgs->io, inbuf, &id );
911    msgs->incoming.id = id;
912
913    if( id==BT_PIECE )
914    {
915        msgs->state = AWAITING_BT_PIECE;
916        return READ_AGAIN;
917    }
918    else if( msgs->incoming.length != 1 )
919    {
920        msgs->state = AWAITING_BT_MESSAGE;
921        return READ_AGAIN;
922    }
923    else return readBtMessage( msgs, inbuf, inlen-1 );
924}
925
926static void
927updatePeerProgress( tr_peermsgs * msgs )
928{
929    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have ) / (float)msgs->torrent->info.pieceCount;
930    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
931    updateInterest( msgs );
932    firePeerProgress( msgs );
933}
934
935static int
936clientCanSendFastBlock( const tr_peermsgs * msgs UNUSED )
937{
938    /* We can't send a fast piece if a peer has more than MAX_FAST_ALLOWED_THRESHOLD pieces */
939    if( tr_bitfieldCountTrueBits( msgs->info->have ) > MAX_FAST_ALLOWED_THRESHOLD )
940        return FALSE;
941   
942    /* ...or if we don't have ourself enough pieces */
943    if( tr_bitfieldCountTrueBits( tr_cpPieceBitfield( msgs->torrent->completion ) ) < MAX_FAST_ALLOWED_THRESHOLD )
944        return FALSE;
945
946    /* Maybe a bandwidth limit ? */
947    return TRUE;
948}
949
950static void
951peerMadeRequest( tr_peermsgs * msgs, const struct peer_request * req )
952{
953    const int reqIsValid = requestIsValid( msgs, req );
954    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( msgs->torrent->completion, req->index );
955    const int peerIsChoked = msgs->info->peerIsChoked;
956    const int peerIsFast = tr_peerIoSupportsFEXT( msgs->io );
957    const int pieceIsFast = reqIsValid && tr_bitfieldHas( msgs->peerAllowedPieces, req->index );
958    const int canSendFast = clientCanSendFastBlock( msgs );
959
960    if( !reqIsValid ) /* bad request */
961    {
962        dbgmsg( msgs, "rejecting an invalid request." );
963        sendFastReject( msgs, req->index, req->offset, req->length );
964    }
965    else if( !clientHasPiece ) /* we don't have it */
966    {
967        dbgmsg( msgs, "rejecting request for a piece we don't have." );
968        sendFastReject( msgs, req->index, req->offset, req->length );
969    }
970    else if( peerIsChoked && !peerIsFast ) /* doesn't he know he's choked? */
971    {
972        tr_peerMsgsSetChoke( msgs, 1 );
973        sendFastReject( msgs, req->index, req->offset, req->length );
974    }
975    else if( peerIsChoked && peerIsFast && ( !pieceIsFast || !canSendFast ) )
976    {
977        sendFastReject( msgs, req->index, req->offset, req->length );
978    }
979    else /* YAY */
980    {
981        struct peer_request * tmp = tr_new( struct peer_request, 1 );
982        *tmp = *req;
983        if( peerIsFast && pieceIsFast )
984            tr_list_append( &msgs->peerAskedForFast, tmp );
985        else
986            tr_list_append( &msgs->peerAskedFor, tmp );
987    }
988}
989
990static int
991messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
992{
993    switch( id )
994    {
995        case BT_CHOKE:
996        case BT_UNCHOKE:
997        case BT_INTERESTED:
998        case BT_NOT_INTERESTED:
999        case BT_HAVE_ALL:
1000        case BT_HAVE_NONE:
1001            return len==1;
1002
1003        case BT_HAVE:
1004        case BT_SUGGEST:
1005        case BT_ALLOWED_FAST:
1006            return len==5;
1007
1008        case BT_BITFIELD:
1009            return len == (msg->torrent->info.pieceCount+7u)/8u + 1u;
1010       
1011        case BT_REQUEST:
1012        case BT_CANCEL:
1013        case BT_REJECT:
1014            return len==13;
1015
1016        case BT_PIECE:
1017            return len>9 && len<=16393;
1018
1019        case BT_PORT:
1020            return len==3;
1021
1022        case BT_LTEP:
1023            return len >= 2;
1024
1025        default:
1026            return FALSE;
1027    }
1028}
1029
1030static int
1031clientGotBlock( tr_peermsgs * msgs, const uint8_t * block, const struct peer_request * req );
1032
1033static void
1034clientGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1035{
1036    tr_torrent * tor = msgs->torrent;
1037    tor->activityDate = tr_date( );
1038    tor->downloadedCur += byteCount;
1039    msgs->peerSentPieceDataAt = time( NULL );
1040    msgs->info->pieceDataActivityDate = time( NULL );
1041    msgs->info->credit += (int)(byteCount * SWIFT_REPAYMENT_RATIO);
1042    tr_rcTransferred( msgs->info->rcToClient, byteCount );
1043    tr_rcTransferred( tor->download, byteCount );
1044    tr_rcTransferred( tor->handle->download, byteCount );
1045    tr_statsAddDownloaded( msgs->handle, byteCount );
1046}
1047
1048static int
1049readBtPiece( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1050{
1051    struct peer_request * req = &msgs->incoming.blockReq;
1052    assert( EVBUFFER_LENGTH(inbuf) >= inlen );
1053    dbgmsg( msgs, "In readBtPiece" );
1054
1055    if( !req->length )
1056    {
1057        if( inlen < 8 )
1058            return READ_MORE;
1059
1060        tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
1061        tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
1062        req->length = msgs->incoming.length - 9;
1063        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1064        return READ_AGAIN;
1065    }
1066    else
1067    {
1068        int err;
1069
1070        /* read in another chunk of data */
1071        const size_t nLeft = req->length - EVBUFFER_LENGTH(msgs->incoming.block);
1072        size_t n = MIN( nLeft, inlen );
1073        uint8_t * buf = tr_new( uint8_t, n );
1074        assert( EVBUFFER_LENGTH(inbuf) >= n );
1075        tr_peerIoReadBytes( msgs->io, inbuf, buf, n );
1076        evbuffer_add( msgs->incoming.block, buf, n );
1077        clientGotBytes( msgs, n );
1078        tr_free( buf );
1079        dbgmsg( msgs, "got %d bytes for block %u:%u->%u ... %d remain",
1080               (int)n, req->index, req->offset, req->length,
1081               (int)( req->length - EVBUFFER_LENGTH(msgs->incoming.block) ) );
1082        if( EVBUFFER_LENGTH(msgs->incoming.block) < req->length )
1083            return READ_MORE;
1084
1085        /* we've got the whole block ... process it */
1086        err = clientGotBlock( msgs, EVBUFFER_DATA(msgs->incoming.block), req );
1087
1088        /* cleanup */
1089        evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH(msgs->incoming.block) );
1090        req->length = 0;
1091        msgs->state = AWAITING_BT_LENGTH;
1092        if( !err )
1093            return READ_AGAIN;
1094        else {
1095            fireGotAssertError( msgs );
1096            return READ_DONE;
1097        }
1098    }
1099}
1100
1101static int
1102readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1103{
1104    uint32_t ui32;
1105    uint32_t msglen = msgs->incoming.length;
1106    const uint8_t id = msgs->incoming.id;
1107    const size_t startBufLen = EVBUFFER_LENGTH( inbuf );
1108
1109    --msglen; // id length
1110
1111    if( inlen < msglen )
1112        return READ_MORE;
1113
1114    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %d", (int)id, (int)msglen, (int)inlen );
1115
1116    if( !messageLengthIsCorrect( msgs, id, msglen+1 ) )
1117    {
1118        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1119        fireGotError( msgs );
1120        return READ_DONE;
1121    }
1122
1123    switch( id )
1124    {
1125        case BT_CHOKE:
1126            dbgmsg( msgs, "got Choke" );
1127            msgs->info->clientIsChoked = 1;
1128            cancelAllRequestsToPeer( msgs );
1129            cancelAllRequestsToClientExceptFast( msgs );
1130            break;
1131
1132        case BT_UNCHOKE:
1133            dbgmsg( msgs, "got Unchoke" );
1134            msgs->info->clientIsChoked = 0;
1135            fireNeedReq( msgs );
1136            break;
1137
1138        case BT_INTERESTED:
1139            dbgmsg( msgs, "got Interested" );
1140            msgs->info->peerIsInterested = 1;
1141            tr_peerMsgsSetChoke( msgs, 0 );
1142            break;
1143
1144        case BT_NOT_INTERESTED:
1145            dbgmsg( msgs, "got Not Interested" );
1146            msgs->info->peerIsInterested = 0;
1147            break;
1148           
1149        case BT_HAVE:
1150            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1151            dbgmsg( msgs, "got Have: %u", ui32 );
1152            tr_bitfieldAdd( msgs->info->have, ui32 );
1153            /* If this is a fast-allowed piece for this peer, mark it as normal now */
1154            if( msgs->clientAllowedPieces != NULL && tr_bitfieldHas( msgs->clientAllowedPieces, ui32 ) )
1155                tr_bitfieldRem( msgs->clientAllowedPieces, ui32 );
1156            updatePeerProgress( msgs );
1157            tr_rcTransferred( msgs->torrent->swarmspeed, msgs->torrent->info.pieceSize );
1158            break;
1159
1160        case BT_BITFIELD: {
1161            const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
1162            dbgmsg( msgs, "got a bitfield" );
1163            msgs->peerSentBitfield = 1;
1164            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
1165            updatePeerProgress( msgs );
1166            tr_peerMsgsSetChoke( msgs, !clientIsSeed || (msgs->info->progress<1.0) );
1167            fireNeedReq( msgs );
1168            break;
1169        }
1170
1171        case BT_REQUEST: {
1172            struct peer_request req;
1173            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1174            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1175            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1176            dbgmsg( msgs, "got Request: %u:%u->%u", req.index, req.offset, req.length );
1177            peerMadeRequest( msgs, &req );
1178            break;
1179        }
1180
1181        case BT_CANCEL: {
1182            struct peer_request req;
1183            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1184            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1185            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1186            dbgmsg( msgs, "got a Cancel %u:%u->%u", req.index, req.offset, req.length );
1187            tr_free( tr_list_remove( &msgs->peerAskedForFast, &req, compareRequest ) );
1188            tr_free( tr_list_remove( &msgs->peerAskedFor, &req, compareRequest ) );
1189            break;
1190        }
1191
1192        case BT_PIECE:
1193            assert( 0 ); /* should be handled elsewhere! */
1194            break;
1195       
1196        case BT_PORT:
1197            dbgmsg( msgs, "Got a BT_PORT" );
1198            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1199            break;
1200           
1201        case BT_SUGGEST: {
1202            const tr_bitfield * bt = tr_cpPieceBitfield( msgs->torrent->completion );
1203            dbgmsg( msgs, "Got a BT_SUGGEST" );
1204            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1205            if( tr_bitfieldHas( bt, ui32 ) )
1206                tr_bitfieldAdd( msgs->clientSuggestedPieces, ui32 );
1207            break;
1208        }
1209           
1210        case BT_HAVE_ALL:
1211            dbgmsg( msgs, "Got a BT_HAVE_ALL" );
1212            tr_bitfieldAddRange( msgs->info->have, 0, msgs->torrent->info.pieceCount );
1213            updatePeerProgress( msgs );
1214            break;
1215       
1216       
1217        case BT_HAVE_NONE:
1218            dbgmsg( msgs, "Got a BT_HAVE_NONE" );
1219            tr_bitfieldClear( msgs->info->have );
1220            updatePeerProgress( msgs );
1221            break;
1222       
1223        case BT_REJECT: {
1224            struct peer_request req;
1225            dbgmsg( msgs, "Got a BT_REJECT" );
1226            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1227            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1228            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1229            tr_free( tr_list_remove( &msgs->clientAskedFor, &req, compareRequest ) );
1230            break;
1231        }
1232
1233        case BT_ALLOWED_FAST: {
1234            dbgmsg( msgs, "Got a BT_ALLOWED_FAST" );
1235            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1236            tr_bitfieldAdd( msgs->clientAllowedPieces, ui32 );
1237            break;
1238        }
1239
1240        case BT_LTEP:
1241            dbgmsg( msgs, "Got a BT_LTEP" );
1242            parseLtep( msgs, msglen, inbuf );
1243            break;
1244
1245        default:
1246            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1247            tr_peerIoDrain( msgs->io, inbuf, msglen );
1248            break;
1249    }
1250
1251    assert( msglen + 1 == msgs->incoming.length );
1252    assert( EVBUFFER_LENGTH(inbuf) == startBufLen - msglen );
1253
1254    msgs->state = AWAITING_BT_LENGTH;
1255    return READ_AGAIN;
1256}
1257
1258static void
1259peerGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1260{
1261    tr_torrent * tor = msgs->torrent;
1262    tor->activityDate = tr_date( );
1263    tor->uploadedCur += byteCount;
1264    msgs->clientSentPieceDataAt = time( NULL );
1265    msgs->info->pieceDataActivityDate = time( NULL );
1266    msgs->info->credit -= byteCount;
1267    tr_rcTransferred( msgs->info->rcToPeer, byteCount );
1268    tr_rcTransferred( tor->upload, byteCount );
1269    tr_rcTransferred( tor->handle->upload, byteCount );
1270    tr_statsAddUploaded( msgs->handle, byteCount );
1271}
1272
1273static size_t
1274getDownloadMax( const tr_peermsgs * msgs )
1275{
1276    static const size_t maxval = ~0;
1277    const tr_torrent * tor = msgs->torrent;
1278
1279    if( tor->downloadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1280        return tor->handle->useDownloadLimit
1281            ? tr_rcBytesLeft( tor->handle->download ) : maxval;
1282
1283    if( tor->downloadLimitMode == TR_SPEEDLIMIT_SINGLE )
1284        return tr_rcBytesLeft( tor->download );
1285
1286    return maxval;
1287}
1288
1289static void
1290reassignBytesToCorrupt( tr_peermsgs * msgs, uint32_t byteCount )
1291{
1292    tr_torrent * tor = msgs->torrent;
1293
1294    /* increment the `corrupt' field */
1295    tor->corruptCur += byteCount;
1296
1297    /* decrement the `downloaded' field */
1298    if( tor->downloadedCur >= byteCount )
1299        tor->downloadedCur -= byteCount;
1300    else
1301        tor->downloadedCur = 0;
1302}
1303
1304
1305static void
1306gotBadPiece( tr_peermsgs * msgs, uint32_t pieceIndex )
1307{
1308    const uint32_t byteCount =
1309        tr_torPieceCountBytes( msgs->torrent, (int)pieceIndex );
1310    reassignBytesToCorrupt( msgs, byteCount );
1311}
1312
1313static void
1314clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1315{
1316    reassignBytesToCorrupt( msgs, req->length );
1317}
1318
1319static void
1320addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1321{
1322    if( !msgs->info->blame )
1323         msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1324    tr_bitfieldAdd( msgs->info->blame, index );
1325}
1326
1327static int
1328clientGotBlock( tr_peermsgs                * msgs,
1329                const uint8_t              * data,
1330                const struct peer_request  * req )
1331{
1332    int i;
1333    tr_torrent * tor = msgs->torrent;
1334    const int block = _tr_block( tor, req->index, req->offset );
1335    struct peer_request *myreq;
1336
1337    assert( msgs != NULL );
1338    assert( req != NULL );
1339
1340    if( req->length != (uint32_t)tr_torBlockCountBytes( msgs->torrent, block ) )
1341    {
1342        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1343                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1344        return TR_ERROR_ASSERT;
1345    }
1346
1347    /* save the block */
1348    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1349
1350    /**
1351    *** Remove the block from our `we asked for this' list
1352    **/
1353
1354    myreq = tr_list_remove( &msgs->clientAskedFor, req, compareRequest );
1355    if( myreq == NULL ) {
1356        clientGotUnwantedBlock( msgs, req );
1357        dbgmsg( msgs, "we didn't ask for this message..." );
1358        return 0;
1359    }
1360
1361    dbgmsg( msgs, "got block %u:%u->%u (turnaround time %d secs)", 
1362                  myreq->index, myreq->offset, myreq->length,
1363                  (int)(time(NULL) - myreq->time_requested) );
1364    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1365                  tr_list_size(msgs->clientAskedFor));
1366
1367    tr_free( myreq );
1368    myreq = NULL;
1369
1370    /**
1371    *** Error checks
1372    **/
1373
1374    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
1375        dbgmsg( msgs, "we have this block already..." );
1376        clientGotUnwantedBlock( msgs, req );
1377        return 0;
1378    }
1379
1380    /**
1381    ***  Save the block
1382    **/
1383
1384    msgs->info->peerSentPieceDataAt = time( NULL );
1385    i = tr_ioWrite( tor, req->index, req->offset, req->length, data );
1386    if( i )
1387        return 0;
1388
1389    tr_cpBlockAdd( tor->completion, block );
1390
1391    addPeerToBlamefield( msgs, req->index );
1392
1393    fireGotBlock( msgs, req );
1394
1395    /**
1396    ***  Handle if this was the last block in the piece
1397    **/
1398
1399    if( tr_cpPieceIsComplete( tor->completion, req->index ) )
1400    {
1401        if( tr_ioHash( tor, req->index ) )
1402        {
1403            gotBadPiece( msgs, req->index );
1404            return 0;
1405        }
1406
1407        fireClientHave( msgs, req->index );
1408    }
1409
1410    return 0;
1411}
1412
1413static void
1414didWrite( struct bufferevent * evin UNUSED, void * vmsgs )
1415{
1416    pulse( vmsgs );
1417}
1418
1419static ReadState
1420canRead( struct bufferevent * evin, void * vmsgs )
1421{
1422    ReadState ret;
1423    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1424    struct evbuffer * inbuf = EVBUFFER_INPUT ( evin );
1425    const size_t buflen = EVBUFFER_LENGTH( inbuf );
1426    const size_t downloadMax = getDownloadMax( msgs );
1427    const size_t n = MIN( buflen, downloadMax );
1428
1429    if( !n )
1430    {
1431        ret = READ_DONE;
1432    }
1433    else switch( msgs->state )
1434    {
1435        case AWAITING_BT_LENGTH:  ret = readBtLength ( msgs, inbuf, n ); break;
1436        case AWAITING_BT_ID:      ret = readBtId     ( msgs, inbuf, n ); break;
1437        case AWAITING_BT_MESSAGE: ret = readBtMessage( msgs, inbuf, n ); break;
1438        case AWAITING_BT_PIECE:   ret = readBtPiece  ( msgs, inbuf, n ); break;
1439        default: assert( 0 );
1440    }
1441
1442    return ret;
1443}
1444
1445static void
1446sendKeepalive( tr_peermsgs * msgs )
1447{
1448    dbgmsg( msgs, "sending a keepalive message" );
1449    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1450}
1451
1452/**
1453***
1454**/
1455
1456static int
1457isSwiftEnabled( const tr_peermsgs * msgs )
1458{
1459    /* rationale: SWIFT is good for getting rid of deadbeats, but most
1460     * private trackers have ratios where you _want_ to feed deadbeats
1461     * as much as possible.  So we disable SWIFT on private torrents */
1462    return SWIFT_ENABLED
1463        && !tr_torrentIsSeed( msgs->torrent )
1464        && !tr_torrentIsPrivate( msgs->torrent );
1465}
1466
1467static size_t
1468getUploadMax( const tr_peermsgs * msgs )
1469{
1470    static const size_t maxval = ~0;
1471    const tr_torrent * tor = msgs->torrent;
1472    const int useSwift = isSwiftEnabled( msgs );
1473    const size_t swiftLeft = msgs->info->credit;
1474    size_t speedLeft;
1475    size_t bufLeft;
1476    size_t ret;
1477
1478    if( tor->uploadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1479        speedLeft = tor->handle->useUploadLimit ? tr_rcBytesLeft( tor->handle->upload ) : maxval;
1480    else if( tor->uploadLimitMode == TR_SPEEDLIMIT_SINGLE )
1481        speedLeft = tr_rcBytesLeft( tor->upload );
1482    else
1483        speedLeft = ~0;
1484
1485    bufLeft = MAX_OUTBUF_SIZE - tr_peerIoWriteBytesWaiting( msgs->io );
1486    ret = MIN( speedLeft, bufLeft );
1487    if( useSwift)
1488        ret = MIN( ret, swiftLeft );
1489    return ret;
1490}
1491
1492static int
1493ratePulse( void * vmsgs )
1494{
1495    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1496    msgs->info->rateToClient = tr_rcRate( msgs->info->rcToClient );
1497    msgs->info->rateToPeer = tr_rcRate( msgs->info->rcToPeer );
1498    msgs->maxActiveRequests = MIN( 8 + (int)(msgs->info->rateToClient/5), 100 );
1499    msgs->minActiveRequests = msgs->maxActiveRequests / 3;
1500    return TRUE;
1501}
1502
1503static struct peer_request*
1504popNextRequest( tr_peermsgs * msgs )
1505{
1506    struct peer_request * ret;
1507    ret = tr_list_pop_front( &msgs->peerAskedForFast );
1508    if( !ret )
1509        ret = tr_list_pop_front( &msgs->peerAskedFor);
1510    return ret;
1511}
1512
1513static void
1514updatePeerStatus( tr_peermsgs * msgs )
1515{
1516    const time_t now = time( NULL );
1517    tr_peer * peer = msgs->info;
1518    tr_peer_status status = 0;
1519
1520    if( !msgs->peerSentBitfield )
1521        status |= TR_PEER_STATUS_HANDSHAKE;
1522
1523    if( msgs->info->peerIsChoked )
1524        status |= TR_PEER_STATUS_PEER_IS_CHOKED;
1525
1526    if( msgs->info->peerIsInterested )
1527        status |= TR_PEER_STATUS_PEER_IS_INTERESTED;
1528
1529    if( msgs->info->clientIsChoked )
1530        status |= TR_PEER_STATUS_CLIENT_IS_CHOKED;
1531
1532    if( msgs->info->clientIsInterested )
1533        status |= TR_PEER_STATUS_CLIENT_IS_INTERESTED;
1534
1535    if( ( now - msgs->clientSentPieceDataAt ) < 3 )
1536        status |= TR_PEER_STATUS_CLIENT_IS_SENDING;
1537
1538    if( ( now - msgs->peerSentPieceDataAt ) < 3 )
1539        status |= TR_PEER_STATUS_PEER_IS_SENDING;
1540
1541    if( msgs->clientAskedFor != NULL )
1542        status |= TR_PEER_STATUS_CLIENT_SENT_REQUEST;
1543
1544    peer->status = status;
1545}
1546
1547static int
1548pulse( void * vmsgs )
1549{
1550    const time_t now = time( NULL );
1551    tr_peermsgs * msgs = vmsgs;
1552    struct peer_request * r;
1553
1554    tr_peerIoTryRead( msgs->io );
1555    pumpRequestQueue( msgs );
1556    updatePeerStatus( msgs );
1557
1558    if( msgs->sendingBlock )
1559    {
1560        const size_t uploadMax = getUploadMax( msgs );
1561        size_t len = EVBUFFER_LENGTH( msgs->outBlock );
1562        const size_t outlen = MIN( len, uploadMax );
1563
1564        assert( len );
1565
1566        if( outlen )
1567        {
1568            tr_peerIoWrite( msgs->io, EVBUFFER_DATA( msgs->outBlock ), outlen );
1569            evbuffer_drain( msgs->outBlock, outlen );
1570            peerGotBytes( msgs, outlen );
1571
1572            len -= outlen;
1573            msgs->clientSentAnythingAt = now;
1574            msgs->sendingBlock = len!=0;
1575
1576            dbgmsg( msgs, "wrote %d bytes; %d left in block", (int)outlen, (int)len );
1577        }
1578        else dbgmsg( msgs, "stalled writing block... uploadMax %lu, outlen %lu", uploadMax, outlen );
1579    }
1580
1581    if( !msgs->sendingBlock )
1582    {
1583        if(( EVBUFFER_LENGTH( msgs->outMessages ) ))
1584        {
1585            dbgmsg( msgs, "flushing outMessages..." );
1586            tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
1587            msgs->clientSentAnythingAt = now;
1588        }
1589        else if( !EVBUFFER_LENGTH( msgs->outBlock )
1590            && (( r = popNextRequest( msgs )))
1591            && requestIsValid( msgs, r )
1592            && tr_cpPieceIsComplete( msgs->torrent->completion, r->index ) )
1593        {
1594            uint8_t * buf = tr_new( uint8_t, r->length );
1595
1596            if( !tr_ioRead( msgs->torrent, r->index, r->offset, r->length, buf ) )
1597            {
1598                tr_peerIo * io = msgs->io;
1599                struct evbuffer * out = msgs->outBlock;
1600
1601                dbgmsg( msgs, "sending block %u:%u->%u", r->index, r->offset, r->length );
1602                tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 2*sizeof(uint32_t) + r->length );
1603                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1604                tr_peerIoWriteUint32( io, out, r->index );
1605                tr_peerIoWriteUint32( io, out, r->offset );
1606                tr_peerIoWriteBytes ( io, out, buf, r->length );
1607                msgs->sendingBlock = 1;
1608            }
1609
1610            tr_free( buf );
1611            tr_free( r );
1612        }
1613        else if( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS )
1614        {
1615            sendKeepalive( msgs );
1616        }
1617    }
1618
1619    return TRUE; /* loop forever */
1620}
1621
1622static void
1623gotError( struct bufferevent * evbuf UNUSED, short what, void * vmsgs )
1624{
1625    if( what & EVBUFFER_TIMEOUT )
1626        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1627
1628    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) ) {
1629        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1630                what, errno, strerror(errno) );
1631        fireGotError( vmsgs );
1632    }
1633}
1634
1635static void
1636sendBitfield( tr_peermsgs * msgs )
1637{
1638    const tr_bitfield * bitfield = tr_cpPieceBitfield( msgs->torrent->completion );
1639    struct evbuffer * out = msgs->outMessages;
1640
1641    dbgmsg( msgs, "sending peer a bitfield message" );
1642    tr_peerIoWriteUint32( msgs->io, out, sizeof(uint8_t) + bitfield->len );
1643    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1644    tr_peerIoWriteBytes ( msgs->io, out, bitfield->bits, bitfield->len );
1645}
1646
1647/**
1648***
1649**/
1650
1651/* some peers give us error messages if we send
1652   more than this many peers in a single pex message */
1653#define MAX_PEX_DIFFS 200
1654
1655typedef struct
1656{
1657    tr_pex * added;
1658    tr_pex * dropped;
1659    tr_pex * elements;
1660    int addedCount;
1661    int droppedCount;
1662    int elementCount;
1663    int diffCount;
1664}
1665PexDiffs;
1666
1667static void
1668pexAddedCb( void * vpex, void * userData )
1669{
1670    PexDiffs * diffs = (PexDiffs *) userData;
1671    tr_pex * pex = (tr_pex *) vpex;
1672    if( diffs->diffCount < MAX_PEX_DIFFS )
1673    {
1674        diffs->diffCount++;
1675        diffs->added[diffs->addedCount++] = *pex;
1676        diffs->elements[diffs->elementCount++] = *pex;
1677    }
1678}
1679
1680static void
1681pexRemovedCb( void * vpex, void * userData )
1682{
1683    PexDiffs * diffs = (PexDiffs *) userData;
1684    tr_pex * pex = (tr_pex *) vpex;
1685    if( diffs->diffCount < MAX_PEX_DIFFS )
1686    {
1687        diffs->diffCount++;
1688        diffs->dropped[diffs->droppedCount++] = *pex;
1689    }
1690}
1691
1692static void
1693pexElementCb( void * vpex, void * userData )
1694{
1695    PexDiffs * diffs = (PexDiffs *) userData;
1696    tr_pex * pex = (tr_pex *) vpex;
1697    if( diffs->diffCount < MAX_PEX_DIFFS )
1698    {
1699        diffs->diffCount++;
1700        diffs->elements[diffs->elementCount++] = *pex;
1701    }
1702}
1703
1704static void
1705sendPex( tr_peermsgs * msgs )
1706{
1707    if( msgs->peerSupportsPex && tr_torrentIsPexEnabled( msgs->torrent ) )
1708    {
1709        int i;
1710        tr_pex * newPex = NULL;
1711        const int newCount = tr_peerMgrGetPeers( msgs->handle->peerMgr, msgs->torrent->info.hash, &newPex );
1712        PexDiffs diffs;
1713        benc_val_t val, *added, *dropped, *flags;
1714        uint8_t *tmp, *walk;
1715        char * benc;
1716        int bencLen;
1717
1718        /* build the diffs */
1719        diffs.added = tr_new( tr_pex, newCount );
1720        diffs.addedCount = 0;
1721        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1722        diffs.droppedCount = 0;
1723        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1724        diffs.elementCount = 0;
1725        diffs.diffCount = 0;
1726        tr_set_compare( msgs->pex, msgs->pexCount,
1727                        newPex, newCount,
1728                        tr_pexCompare, sizeof(tr_pex),
1729                        pexRemovedCb, pexAddedCb, pexElementCb, &diffs );
1730        dbgmsg( msgs, "pex: old peer count %d, new peer count %d, added %d, removed %d", msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1731
1732        /* update peer */
1733        tr_free( msgs->pex );
1734        msgs->pex = diffs.elements;
1735        msgs->pexCount = diffs.elementCount;
1736
1737        /* build the pex payload */
1738        tr_bencInit( &val, TYPE_DICT );
1739        tr_bencDictReserve( &val, 3 );
1740
1741        /* "added" */
1742        added = tr_bencDictAdd( &val, "added" );
1743        tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1744        for( i=0; i<diffs.addedCount; ++i ) {
1745            memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
1746            memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1747        }
1748        assert( ( walk - tmp ) == diffs.addedCount * 6 );
1749        tr_bencInitStr( added, tmp, walk-tmp, FALSE );
1750
1751        /* "added.f" */
1752        flags = tr_bencDictAdd( &val, "added.f" );
1753        tmp = walk = tr_new( uint8_t, diffs.addedCount );
1754        for( i=0; i<diffs.addedCount; ++i )
1755            *walk++ = diffs.added[i].flags;
1756        assert( ( walk - tmp ) == diffs.addedCount );
1757        tr_bencInitStr( flags, tmp, walk-tmp, FALSE );
1758
1759        /* "dropped" */
1760        dropped = tr_bencDictAdd( &val, "dropped" );
1761        tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
1762        for( i=0; i<diffs.droppedCount; ++i ) {
1763            memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
1764            memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
1765        }
1766        assert( ( walk - tmp ) == diffs.droppedCount * 6 );
1767        tr_bencInitStr( dropped, tmp, walk-tmp, FALSE );
1768
1769        /* write the pex message */
1770        benc = tr_bencSave( &val, &bencLen );
1771        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 2*sizeof(uint8_t) + bencLen );
1772        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_LTEP );
1773        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, msgs->ut_pex_id );
1774        tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, benc, bencLen );
1775
1776        /* cleanup */
1777        tr_free( benc );
1778        tr_bencFree( &val );
1779        tr_free( diffs.added );
1780        tr_free( diffs.dropped );
1781        tr_free( newPex );
1782
1783        msgs->clientSentPexAt = time( NULL );
1784    }
1785}
1786
1787static int
1788pexPulse( void * vpeer )
1789{
1790    sendPex( vpeer );
1791    return TRUE;
1792}
1793
1794/**
1795***
1796**/
1797
1798tr_peermsgs*
1799tr_peerMsgsNew( struct tr_torrent * torrent,
1800                struct tr_peer    * info,
1801                tr_delivery_func    func,
1802                void              * userData,
1803                tr_publisher_tag  * setme )
1804{
1805    tr_peermsgs * m;
1806
1807    assert( info != NULL );
1808    assert( info->io != NULL );
1809
1810    m = tr_new0( tr_peermsgs, 1 );
1811    m->publisher = tr_publisherNew( );
1812    m->info = info;
1813    m->handle = torrent->handle;
1814    m->torrent = torrent;
1815    m->io = info->io;
1816    m->info->status = TR_PEER_STATUS_HANDSHAKE;
1817    m->info->clientIsChoked = 1;
1818    m->info->peerIsChoked = 1;
1819    m->info->clientIsInterested = 0;
1820    m->info->peerIsInterested = 0;
1821    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
1822    m->state = AWAITING_BT_LENGTH;
1823    m->pulseTimer = tr_timerNew( m->handle, pulse, m, PEER_PULSE_INTERVAL );
1824    m->rateTimer = tr_timerNew( m->handle, ratePulse, m, RATE_PULSE_INTERVAL );
1825    m->pexTimer = tr_timerNew( m->handle, pexPulse, m, PEX_INTERVAL );
1826    m->outMessages = evbuffer_new( );
1827    m->incoming.block = evbuffer_new( );
1828    m->outBlock = evbuffer_new( );
1829    m->peerAllowedPieces = NULL;
1830    m->clientAllowedPieces = NULL;
1831    m->clientSuggestedPieces = NULL;
1832    *setme = tr_publisherSubscribe( m->publisher, func, userData );
1833   
1834    if ( tr_peerIoSupportsFEXT( m->io ) )
1835    {
1836        /* This peer is fastpeer-enabled, generate its allowed set
1837         * (before registering our callbacks) */
1838        if ( !m->peerAllowedPieces ) {
1839            const struct in_addr *peerAddr = tr_peerIoGetAddress( m->io, NULL );
1840           
1841            m->peerAllowedPieces = tr_peerMgrGenerateAllowedSet( MAX_FAST_ALLOWED_COUNT,
1842                                                                 m->torrent->info.pieceCount,
1843                                                                 m->torrent->info.hash,
1844                                                                 peerAddr );
1845        }
1846        m->clientAllowedPieces = tr_bitfieldNew( m->torrent->info.pieceCount );
1847       
1848        m->clientSuggestedPieces = tr_bitfieldNew( m->torrent->info.pieceCount );
1849    }
1850   
1851    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of inactivity */
1852    tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
1853    ratePulse( m );
1854
1855    if ( tr_peerIoSupportsLTEP( m->io ) )
1856        sendLtepHandshake( m );
1857
1858    if ( !tr_peerIoSupportsFEXT( m->io ) )
1859        sendBitfield( m );
1860    else {
1861        /* This peer is fastpeer-enabled, send it have-all or have-none if appropriate */
1862        uint32_t peerProgress;
1863        float completion = tr_cpPercentComplete( m->torrent->completion );
1864        if ( completion == 0.0f ) {
1865            sendFastHave( m, 0 );
1866        } else if ( completion == 1.0f ) {
1867            sendFastHave( m, 1 );
1868        } else {
1869            sendBitfield( m );
1870        }
1871        peerProgress = m->torrent->info.pieceCount * m->info->progress;
1872       
1873        if ( peerProgress < MAX_FAST_ALLOWED_COUNT )
1874            sendFastAllowedSet( m );
1875    }
1876
1877    return m;
1878}
1879
1880void
1881tr_peerMsgsFree( tr_peermsgs* msgs )
1882{
1883    if( msgs != NULL )
1884    {
1885        tr_timerFree( &msgs->pulseTimer );
1886        tr_timerFree( &msgs->rateTimer );
1887        tr_timerFree( &msgs->pexTimer );
1888        tr_publisherFree( &msgs->publisher );
1889        tr_list_free( &msgs->clientWillAskFor, tr_free );
1890        tr_list_free( &msgs->clientAskedFor, tr_free );
1891        tr_list_free( &msgs->peerAskedForFast, tr_free );
1892        tr_list_free( &msgs->peerAskedFor, tr_free );
1893        tr_bitfieldFree( msgs->peerAllowedPieces );
1894        tr_bitfieldFree( msgs->clientAllowedPieces );
1895        tr_bitfieldFree( msgs->clientSuggestedPieces );
1896        evbuffer_free( msgs->incoming.block );
1897        evbuffer_free( msgs->outMessages );
1898        evbuffer_free( msgs->outBlock );
1899        tr_free( msgs->pex );
1900
1901        memset( msgs, ~0, sizeof( tr_peermsgs ) );
1902        tr_free( msgs );
1903    }
1904}
1905
1906tr_publisher_tag
1907tr_peerMsgsSubscribe( tr_peermsgs       * peer,
1908                      tr_delivery_func    func,
1909                      void              * userData )
1910{
1911    return tr_publisherSubscribe( peer->publisher, func, userData );
1912}
1913
1914void
1915tr_peerMsgsUnsubscribe( tr_peermsgs       * peer,
1916                        tr_publisher_tag    tag )
1917{
1918    tr_publisherUnsubscribe( peer->publisher, tag );
1919}
1920
1921int
1922tr_peerMsgsIsPieceFastAllowed( const tr_peermsgs * peer,
1923                               uint32_t            index )
1924{
1925    return tr_bitfieldHas( peer->clientAllowedPieces, index );
1926}
1927
1928int
1929tr_peerMsgsIsPieceSuggested( const tr_peermsgs * peer,
1930                             uint32_t            index )
1931{
1932    return tr_bitfieldHas( peer->clientSuggestedPieces, index );
1933}
1934
Note: See TracBrowser for help on using the repository browser.