source: trunk/libtransmission/peer-msgs.c @ 3906

Last change on this file since 3906 was 3906, checked in by charles, 15 years ago

tweak the peer limits a bit for faster uploading

  • Property svn:keywords set to Date Rev Author Id
File size: 54.9 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 3906 2007-11-20 17:29:56Z charles $
11 */
12
13#include <assert.h>
14#include <ctype.h>
15#include <errno.h>
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19#include <libgen.h> /* basename */
20
21#include <netinet/in.h> /* struct in_addr */
22
23#include <event.h>
24
25#include "transmission.h"
26#include "bencode.h"
27#include "completion.h"
28#include "inout.h"
29#include "list.h"
30#include "peer-io.h"
31#include "peer-mgr.h"
32#include "peer-mgr-private.h"
33#include "peer-msgs.h"
34#include "ratecontrol.h"
35#include "trevent.h"
36#include "utils.h"
37
38/**
39***
40**/
41
42enum
43{
44    BT_CHOKE                = 0,
45    BT_UNCHOKE              = 1,
46    BT_INTERESTED           = 2,
47    BT_NOT_INTERESTED       = 3,
48    BT_HAVE                 = 4,
49    BT_BITFIELD             = 5,
50    BT_REQUEST              = 6,
51    BT_PIECE                = 7,
52    BT_CANCEL               = 8,
53    BT_PORT                 = 9,
54    BT_SUGGEST              = 13,
55    BT_HAVE_ALL             = 14,
56    BT_HAVE_NONE            = 15,
57    BT_REJECT               = 16,
58    BT_ALLOWED_FAST         = 17,
59    BT_LTEP                 = 20,
60
61    LTEP_HANDSHAKE          = 0,
62
63    OUR_LTEP_PEX            = 1,
64
65    MAX_REQUEST_BYTE_COUNT  = (16 * 1024), /* drop requests who want too much */
66
67    /* idle seconds before we send a keepalive */
68    KEEPALIVE_INTERVAL_SECS = 90,
69
70    PEX_INTERVAL            = (60 * 1000), /* msec between calls to sendPex() */
71    PEER_PULSE_INTERVAL     = (100),       /* msec between calls to pulse() */
72    RATE_PULSE_INTERVAL     = (250),       /* msec between calls to ratePulse() */
73
74    MAX_OUTBUF_SIZE         = 4096,
75     
76    /* Fast Peers Extension constants */
77    MAX_FAST_ALLOWED_COUNT   = 10,          /* max. number of pieces we fast-allow to another peer */
78    MAX_FAST_ALLOWED_THRESHOLD = 10,        /* max threshold for allowing fast-pieces requests */
79
80};
81
82enum
83{
84    AWAITING_BT_LENGTH,
85    AWAITING_BT_ID,
86    AWAITING_BT_MESSAGE,
87    AWAITING_BT_PIECE
88};
89
90struct peer_request
91{
92    uint32_t index;
93    uint32_t offset;
94    uint32_t length;
95    time_t time_requested;
96};
97
98static int
99compareRequest( const void * va, const void * vb )
100{
101    int i;
102    const struct peer_request * a = va;
103    const struct peer_request * b = vb;
104    if(( i = tr_compareUint32( a->index, b->index ))) return i;
105    if(( i = tr_compareUint32( a->offset, b->offset ))) return i;
106    if(( i = tr_compareUint32( a->length, b->length ))) return i;
107    return 0;
108}
109
110/* this is raw, unchanged data from the peer regarding
111 * the current message that it's sending us. */
112struct tr_incoming
113{
114    uint32_t length; /* includes the +1 for id length */
115    uint8_t id;
116    struct peer_request blockReq; /* metadata for incoming blocks */
117    struct evbuffer * block; /* piece data for incoming blocks */
118};
119
120struct tr_peermsgs
121{
122    tr_peer * info;
123
124    tr_handle * handle;
125    tr_torrent * torrent;
126    tr_peerIo * io;
127
128    tr_publisher_t * publisher;
129
130    struct evbuffer * outBlock;    /* buffer of all the current piece message */
131    struct evbuffer * outMessages; /* buffer of all the non-piece messages */
132    tr_list * peerAskedFor;
133    tr_list * peerAskedForFast;
134    tr_list * clientAskedFor;
135    tr_list * clientWillAskFor;
136
137    tr_timer * rateTimer;
138    tr_timer * pulseTimer;
139    tr_timer * pexTimer;
140
141    time_t lastReqAddedAt;
142    time_t clientSentPexAt;
143    time_t clientSentAnythingAt;
144
145    unsigned int peerSentBitfield         : 1;
146    unsigned int peerSupportsPex          : 1;
147    unsigned int clientSentLtepHandshake  : 1;
148    unsigned int peerSentLtepHandshake    : 1;
149    unsigned int sendingBlock             : 1;
150   
151    tr_bitfield * clientAllowedPieces;
152    tr_bitfield * peerAllowedPieces;
153   
154    tr_bitfield * clientSuggestedPieces;
155   
156    uint8_t state;
157    uint8_t ut_pex_id;
158    uint16_t pexCount;
159    uint32_t maxActiveRequests;
160    uint32_t minActiveRequests;
161
162    struct tr_incoming incoming; 
163
164    tr_pex * pex;
165};
166
167/**
168***
169**/
170
171static void
172myDebug( const char * file, int line,
173         const struct tr_peermsgs * msgs,
174         const char * fmt, ... )
175{
176    FILE * fp = tr_getLog( );
177    if( fp != NULL )
178    {
179        va_list args;
180        char timestr[64];
181        struct evbuffer * buf = evbuffer_new( );
182        char * myfile = tr_strdup( file );
183
184        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
185                             tr_getLogTimeStr( timestr, sizeof(timestr) ),
186                             msgs->torrent->info.name,
187                             tr_peerIoGetAddrStr( msgs->io ),
188                             msgs->info->client );
189        va_start( args, fmt );
190        evbuffer_add_vprintf( buf, fmt, args );
191        va_end( args );
192        evbuffer_add_printf( buf, " (%s:%d)\n", basename(myfile), line );
193        fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
194
195        tr_free( myfile );
196        evbuffer_free( buf );
197    }
198}
199
200#define dbgmsg(msgs, fmt...) myDebug(__FILE__, __LINE__, msgs, ##fmt )
201
202/**
203***
204**/
205
206static void
207protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
208{
209    tr_peerIo * io = msgs->io;
210    struct evbuffer * out = msgs->outMessages;
211
212    dbgmsg( msgs, "requesting %u:%u->%u", req->index, req->offset, req->length );
213    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
214    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
215    tr_peerIoWriteUint32( io, out, req->index );
216    tr_peerIoWriteUint32( io, out, req->offset );
217    tr_peerIoWriteUint32( io, out, req->length );
218}
219
220static void
221protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
222{
223    tr_peerIo * io = msgs->io;
224    struct evbuffer * out = msgs->outMessages;
225
226    dbgmsg( msgs, "cancelling %u:%u->%u", req->index, req->offset, req->length );
227    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
228    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
229    tr_peerIoWriteUint32( io, out, req->index );
230    tr_peerIoWriteUint32( io, out, req->offset );
231    tr_peerIoWriteUint32( io, out, req->length );
232}
233
234static void
235protocolSendHave( tr_peermsgs * msgs, uint32_t index )
236{
237    tr_peerIo * io = msgs->io;
238    struct evbuffer * out = msgs->outMessages;
239
240    dbgmsg( msgs, "sending Have %u", index );
241    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
242    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
243    tr_peerIoWriteUint32( io, out, index );
244}
245
246static void
247protocolSendChoke( tr_peermsgs * msgs, int choke )
248{
249    tr_peerIo * io = msgs->io;
250    struct evbuffer * out = msgs->outMessages;
251
252    dbgmsg( msgs, "sending %s", (choke ? "Choke" : "Unchoke") );
253    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) );
254    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
255}
256
257/**
258***  EVENTS
259**/
260
261static const tr_peermsgs_event blankEvent = { 0, 0, 0, 0, 0.0f };
262
263static void
264publish( tr_peermsgs * msgs, tr_peermsgs_event * e )
265{
266    tr_publisherPublish( msgs->publisher, msgs->info, e );
267}
268
269static void
270fireGotAssertError( tr_peermsgs * msgs )
271{
272    tr_peermsgs_event e = blankEvent;
273    e.eventType = TR_PEERMSG_GOT_ASSERT_ERROR;
274    publish( msgs, &e );
275}
276
277static void
278fireGotError( tr_peermsgs * msgs )
279{
280    tr_peermsgs_event e = blankEvent;
281    e.eventType = TR_PEERMSG_GOT_ERROR;
282    publish( msgs, &e );
283}
284
285static void
286fireNeedReq( tr_peermsgs * msgs )
287{
288    tr_peermsgs_event e = blankEvent;
289    e.eventType = TR_PEERMSG_NEED_REQ;
290    publish( msgs, &e );
291}
292
293static void
294firePeerProgress( tr_peermsgs * msgs )
295{
296    tr_peermsgs_event e = blankEvent;
297    e.eventType = TR_PEERMSG_PEER_PROGRESS;
298    e.progress = msgs->info->progress;
299    publish( msgs, &e );
300}
301
302static void
303fireClientHave( tr_peermsgs * msgs, uint32_t pieceIndex )
304{
305    tr_peermsgs_event e = blankEvent;
306    e.eventType = TR_PEERMSG_CLIENT_HAVE;
307    e.pieceIndex = pieceIndex;
308    publish( msgs, &e );
309}
310
311static void
312fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
313{
314    tr_peermsgs_event e = blankEvent;
315    e.eventType = TR_PEERMSG_CLIENT_BLOCK;
316    e.pieceIndex = req->index;
317    e.offset = req->offset;
318    e.length = req->length;
319    publish( msgs, &e );
320}
321
322static void
323fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
324{
325    tr_peermsgs_event e = blankEvent;
326    e.eventType = TR_PEERMSG_CANCEL;
327    e.pieceIndex = req->index;
328    e.offset = req->offset;
329    e.length = req->length;
330    publish( msgs, &e );
331}
332
333/**
334***  INTEREST
335**/
336
337static int
338isPieceInteresting( const tr_peermsgs   * peer,
339                    int                   piece )
340{
341    const tr_torrent * torrent = peer->torrent;
342    if( torrent->info.pieces[piece].dnd ) /* we don't want it */
343        return FALSE;
344    if( tr_cpPieceIsComplete( torrent->completion, piece ) ) /* we have it */
345        return FALSE;
346    if( !tr_bitfieldHas( peer->info->have, piece ) ) /* peer doesn't have it */
347        return FALSE;
348    if( tr_bitfieldHas( peer->info->banned, piece ) ) /* peer is banned */
349        return FALSE;
350    return TRUE;
351}
352
353/* "interested" means we'll ask for piece data if they unchoke us */
354static int
355isPeerInteresting( const tr_peermsgs * msgs )
356{
357    int i;
358    const tr_torrent * torrent;
359    const tr_bitfield * bitfield;
360    const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
361
362    if( clientIsSeed )
363        return FALSE;
364
365    torrent = msgs->torrent;
366    bitfield = tr_cpPieceBitfield( torrent->completion );
367
368    if( !msgs->info->have )
369        return TRUE;
370
371    assert( bitfield->len == msgs->info->have->len );
372    for( i=0; i<torrent->info.pieceCount; ++i )
373        if( isPieceInteresting( msgs, i ) )
374            return TRUE;
375
376    return FALSE;
377}
378
379static void
380sendInterest( tr_peermsgs * msgs, int weAreInterested )
381{
382    assert( msgs != NULL );
383    assert( weAreInterested==0 || weAreInterested==1 );
384
385    msgs->info->clientIsInterested = weAreInterested;
386    dbgmsg( msgs, "Sending %s",
387            weAreInterested ? "Interested" : "Not Interested");
388
389    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
390    tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages,
391                   weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
392}
393
394static void
395updateInterest( tr_peermsgs * msgs )
396{
397    const int i = isPeerInteresting( msgs );
398    if( i != msgs->info->clientIsInterested )
399        sendInterest( msgs, i );
400    if( i )
401        fireNeedReq( msgs );
402}
403
404#define MIN_CHOKE_PERIOD_SEC 10
405
406static void
407cancelAllRequestsToClientExceptFast( tr_peermsgs * msgs )
408{
409    tr_list_free( &msgs->peerAskedFor, tr_free );
410}
411
412void
413tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
414{
415    const time_t fibrillationTime = time(NULL) - MIN_CHOKE_PERIOD_SEC;
416
417    assert( msgs != NULL );
418    assert( msgs->info != NULL );
419    assert( choke==0 || choke==1 );
420
421    if( msgs->info->chokeChangedAt > fibrillationTime )
422    {
423        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
424    }
425    else if( msgs->info->peerIsChoked != choke )
426    {
427        msgs->info->peerIsChoked = choke;
428        if( choke )
429            cancelAllRequestsToClientExceptFast( msgs );
430        protocolSendChoke( msgs, choke );
431        msgs->info->chokeChangedAt = time( NULL );
432    }
433}
434
435/**
436***
437**/
438
439void
440tr_peerMsgsHave( tr_peermsgs * msgs,
441                 uint32_t      index )
442{
443    protocolSendHave( msgs, index );
444
445    /* since we have more pieces now, we might not be interested in this peer */
446    updateInterest( msgs );
447}
448#if 0
449static void
450sendFastSuggest( tr_peermsgs * msgs,
451                 uint32_t      pieceIndex )
452{
453    assert( msgs != NULL );
454   
455    if( tr_peerIoSupportsFEXT( msgs->io ) )
456    {
457        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
458        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_SUGGEST );
459        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
460    }
461}
462#endif
463static void
464sendFastHave( tr_peermsgs * msgs, int all )
465{
466    assert( msgs != NULL );
467   
468    if( tr_peerIoSupportsFEXT( msgs->io ) )
469    {
470        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
471        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, ( all ? BT_HAVE_ALL
472                                                                : BT_HAVE_NONE ) );
473        updateInterest( msgs );
474    }
475}
476
477static void
478sendFastReject( tr_peermsgs * msgs,
479                uint32_t      pieceIndex,
480                uint32_t      offset,
481                uint32_t      length )
482{
483    assert( msgs != NULL );
484
485    if( tr_peerIoSupportsFEXT( msgs->io ) )
486    {
487        const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
488        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
489        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_REJECT );
490        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
491        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
492        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
493    }
494}
495
496static void
497sendFastAllowed( tr_peermsgs * msgs,
498                 uint32_t      pieceIndex)
499{
500    assert( msgs != NULL );
501   
502    if( tr_peerIoSupportsFEXT( msgs->io ) )
503    {
504        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
505        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_ALLOWED_FAST );
506        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
507    }
508}
509
510
511
512static void
513sendFastAllowedSet( tr_peermsgs * msgs )
514{
515    int i = 0;
516    while (i <= msgs->torrent->info.pieceCount )
517    {
518        if ( tr_bitfieldHas( msgs->peerAllowedPieces, i) )
519            sendFastAllowed( msgs, i );
520        i++;
521    }
522}
523
524
525/**
526***
527**/
528
529static int
530reqIsValid( const tr_peermsgs   * msgs,
531            uint32_t              index,
532            uint32_t              offset,
533            uint32_t              length )
534{
535    const tr_torrent * tor = msgs->torrent;
536
537    if( index >= (uint32_t) tor->info.pieceCount )
538        return FALSE;
539    if ( (int)offset >= tr_torPieceCountBytes( tor, (int)index ) )
540        return FALSE;
541    if( length > MAX_REQUEST_BYTE_COUNT )
542        return FALSE;
543    if( tr_pieceOffset( tor, index, offset, length ) > tor->info.totalSize )
544        return FALSE;
545
546    return TRUE;
547}
548
549static int
550requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
551{
552    return reqIsValid( msgs, req->index, req->offset, req->length );
553}
554
555static void
556pumpRequestQueue( tr_peermsgs * msgs )
557{
558    const int max = msgs->maxActiveRequests;
559    const int min = msgs->minActiveRequests;
560    int count = tr_list_size( msgs->clientAskedFor );
561    int sent = 0;
562
563    if( count > min )
564        return;
565    if( msgs->info->clientIsChoked )
566        return;
567
568    while( ( count < max ) && ( msgs->clientWillAskFor != NULL ) )
569    {
570        struct peer_request * r = tr_list_pop_front( &msgs->clientWillAskFor );
571        assert( requestIsValid( msgs, r ) );
572        assert( tr_bitfieldHas( msgs->info->have, r->index ) );
573        protocolSendRequest( msgs, r );
574        r->time_requested = msgs->lastReqAddedAt = time( NULL );
575        tr_list_append( &msgs->clientAskedFor, r );
576        ++count;
577        ++sent;
578    }
579
580    if( sent )
581        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
582                sent,
583                tr_list_size(msgs->clientAskedFor),
584                tr_list_size(msgs->clientWillAskFor) );
585
586    if( count < max )
587        fireNeedReq( msgs );
588}
589
590static int
591pulse( void * vmsgs );
592
593int
594tr_peerMsgsAddRequest( tr_peermsgs * msgs,
595                       uint32_t      index, 
596                       uint32_t      offset, 
597                       uint32_t      length )
598{
599    const int req_max = msgs->maxActiveRequests;
600    struct peer_request tmp, *req;
601
602    assert( msgs != NULL );
603    assert( msgs->torrent != NULL );
604    assert( reqIsValid( msgs, index, offset, length ) );
605
606    /**
607    ***  Reasons to decline the request
608    **/
609
610    /* don't send requests to choked clients */
611    if( msgs->info->clientIsChoked ) {
612        dbgmsg( msgs, "declining request because they're choking us" );
613        return TR_ADDREQ_CLIENT_CHOKED;
614    }
615
616    /* peer doesn't have this piece */
617    if( !tr_bitfieldHas( msgs->info->have, index ) )
618        return TR_ADDREQ_MISSING;
619
620    /* peer's queue is full */
621    if( tr_list_size( msgs->clientWillAskFor ) >= req_max ) {
622        dbgmsg( msgs, "declining request because we're full" );
623        return TR_ADDREQ_FULL;
624    }
625
626    /* have we already asked for this piece? */
627    tmp.index = index;
628    tmp.offset = offset;
629    tmp.length = length;
630    if( tr_list_find( msgs->clientAskedFor, &tmp, compareRequest ) ) {
631        dbgmsg( msgs, "declining because it's a duplicate" );
632        return TR_ADDREQ_DUPLICATE;
633    }
634    if( tr_list_find( msgs->clientWillAskFor, &tmp, compareRequest ) ) {
635        dbgmsg( msgs, "declining because it's a duplicate" );
636        return TR_ADDREQ_DUPLICATE;
637    }
638
639    /**
640    ***  Accept this request
641    **/
642
643    dbgmsg( msgs, "added req for piece %d, offset %d", (int)index, (int)offset );
644    req = tr_new0( struct peer_request, 1 );
645    *req = tmp;
646    tr_list_append( &msgs->clientWillAskFor, req );
647    return TR_ADDREQ_OK;
648}
649
650static void
651cancelAllRequestsToPeer( tr_peermsgs * msgs )
652{
653    struct peer_request * req;
654
655    while(( req = tr_list_pop_front( &msgs->clientWillAskFor ) ))
656    {
657        fireCancelledReq( msgs, req );
658        tr_free( req );
659    }
660
661    while(( req = tr_list_pop_front( &msgs->clientAskedFor ) ))
662    {
663        fireCancelledReq( msgs, req );
664        protocolSendCancel( msgs, req );
665        tr_free( req );
666    }
667}
668
669void
670tr_peerMsgsCancel( tr_peermsgs * msgs,
671                   uint32_t      pieceIndex,
672                   uint32_t      offset,
673                   uint32_t      length )
674{
675    struct peer_request *req, tmp;
676
677    assert( msgs != NULL );
678    assert( length > 0 );
679
680    /* have we asked the peer for this piece? */
681    tmp.index = pieceIndex;
682    tmp.offset = offset;
683    tmp.length = length;
684
685    /* if it's only in the queue and hasn't been sent yet, free it */
686    if(( req = tr_list_remove( &msgs->clientWillAskFor, &tmp, compareRequest ) ))
687    {
688        fireCancelledReq( msgs, req );
689        tr_free( req );
690    }
691
692    /* if it's already been sent, send a cancel message too */
693    if(( req = tr_list_remove( &msgs->clientAskedFor, &tmp, compareRequest ) ))
694    {
695        protocolSendCancel( msgs, req );
696        fireCancelledReq( msgs, req );
697        tr_free( req );
698    }
699}
700
701/**
702***
703**/
704
705static void
706sendLtepHandshake( tr_peermsgs * msgs )
707{
708    benc_val_t val, *m;
709    char * buf;
710    int len;
711    int pex;
712    const char * v = TR_NAME " " USERAGENT_PREFIX;
713    const int port = tr_getPublicPort( msgs->handle );
714    struct evbuffer * outbuf;
715
716    if( msgs->clientSentLtepHandshake )
717        return;
718
719    outbuf = evbuffer_new( );
720    dbgmsg( msgs, "sending an ltep handshake" );
721    msgs->clientSentLtepHandshake = 1;
722
723    /* decide if we want to advertise pex support */
724    if( !tr_torrentIsPexEnabled( msgs->torrent ) )
725        pex = 0;
726    else if( msgs->peerSentLtepHandshake )
727        pex = msgs->peerSupportsPex ? 1 : 0;
728    else
729        pex = 1;
730
731    tr_bencInit( &val, TYPE_DICT );
732    tr_bencDictReserve( &val, 4 );
733    tr_bencInitInt( tr_bencDictAdd( &val, "e" ), 1 );
734    m  = tr_bencDictAdd( &val, "m" );
735    tr_bencInit( m, TYPE_DICT );
736    if( pex ) {
737        tr_bencDictReserve( m, 1 );
738        tr_bencInitInt( tr_bencDictAdd( m, "ut_pex" ), OUR_LTEP_PEX );
739    }
740    if( port > 0 )
741        tr_bencInitInt( tr_bencDictAdd( &val, "p" ), port );
742    tr_bencInitStr( tr_bencDictAdd( &val, "v" ), v, 0, 1 );
743    buf = tr_bencSave( &val, &len );
744
745    tr_peerIoWriteUint32( msgs->io, outbuf, 2*sizeof(uint8_t) + len );
746    tr_peerIoWriteUint8 ( msgs->io, outbuf, BT_LTEP );
747    tr_peerIoWriteUint8 ( msgs->io, outbuf, LTEP_HANDSHAKE );
748    tr_peerIoWriteBytes ( msgs->io, outbuf, buf, len );
749
750    tr_peerIoWriteBuf( msgs->io, outbuf );
751
752#if 0
753    dbgmsg( msgs, "here is the ltep handshake we sent:" );
754    tr_bencPrint( &val );
755    dbgmsg( msgs, "here is the ltep handshake we read [%s]:", tr_bencSave( &val, NULL ) );
756#endif
757
758    /* cleanup */
759    tr_bencFree( &val );
760    tr_free( buf );
761    evbuffer_free( outbuf );
762}
763
764static void
765parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
766{
767    benc_val_t val, * sub;
768    uint8_t * tmp = tr_new( uint8_t, len );
769
770    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
771    msgs->peerSentLtepHandshake = 1;
772
773    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type!=TYPE_DICT ) {
774        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
775        tr_free( tmp );
776        return;
777    }
778
779#if 0
780    dbgmsg( msgs, "here is the ltep handshake we read:" );
781    tr_bencPrint( &val );
782    dbgmsg( msgs, "here is the ltep handshake we read [%s]:", tr_bencSave( &val, NULL ) );
783#endif
784
785    /* does the peer prefer encrypted connections? */
786    sub = tr_bencDictFind( &val, "e" );
787    if( tr_bencIsInt( sub ) )
788        msgs->info->encryption_preference = sub->val.i
789                                      ? ENCRYPTION_PREFERENCE_YES
790                                      : ENCRYPTION_PREFERENCE_NO;
791
792    /* check supported messages for utorrent pex */
793    sub = tr_bencDictFind( &val, "m" );
794    if( tr_bencIsDict( sub ) ) {
795        sub = tr_bencDictFind( sub, "ut_pex" );
796        if( tr_bencIsInt( sub ) ) {
797            msgs->peerSupportsPex = 1;
798            msgs->ut_pex_id = (uint8_t) sub->val.i;
799            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
800        }
801    }
802
803    /* get peer's listening port */
804    sub = tr_bencDictFind( &val, "p" );
805    if( tr_bencIsInt( sub ) ) {
806        msgs->info->port = htons( (uint16_t)sub->val.i );
807        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
808    }
809
810    tr_bencFree( &val );
811    tr_free( tmp );
812}
813
814static void
815parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
816{
817    benc_val_t val, * sub;
818    uint8_t * tmp;
819
820    if( !tr_torrentIsPexEnabled( msgs->torrent ) ) /* no sharing! */
821        return;
822
823    tmp = tr_new( uint8_t, msglen );
824    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
825
826    if( tr_bencLoad( tmp, msglen, &val, NULL ) || !tr_bencIsDict( &val ) ) {
827        dbgmsg( msgs, "GET can't read extended-pex dictionary" );
828        tr_free( tmp );
829        return;
830    }
831
832    sub = tr_bencDictFind( &val, "added" );
833    if( tr_bencIsStr(sub) && ((sub->val.s.i % 6) == 0)) {
834        const int n = sub->val.s.i / 6 ;
835        dbgmsg( msgs, "got %d peers from uT pex", n );
836        tr_peerMgrAddPeers( msgs->handle->peerMgr,
837                            msgs->torrent->info.hash,
838                            TR_PEER_FROM_PEX,
839                            (uint8_t*)sub->val.s.s, n );
840    }
841
842    tr_bencFree( &val );
843    tr_free( tmp );
844}
845
846static void
847sendPex( tr_peermsgs * msgs );
848
849static void
850parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
851{
852    uint8_t ltep_msgid;
853
854    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
855    msglen--;
856
857    if( ltep_msgid == LTEP_HANDSHAKE )
858    {
859        dbgmsg( msgs, "got ltep handshake" );
860        parseLtepHandshake( msgs, msglen, inbuf );
861        if( tr_peerIoSupportsLTEP( msgs->io ) )
862        {
863            sendLtepHandshake( msgs );
864            sendPex( msgs );
865        }
866    }
867    else if( ltep_msgid == msgs->ut_pex_id )
868    {
869        dbgmsg( msgs, "got ut pex" );
870        msgs->peerSupportsPex = 1;
871        parseUtPex( msgs, msglen, inbuf );
872    }
873    else
874    {
875        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
876        evbuffer_drain( inbuf, msglen );
877    }
878}
879
880static int
881readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
882{
883    uint32_t len;
884
885    if( inlen < sizeof(len) )
886        return READ_MORE;
887
888    tr_peerIoReadUint32( msgs->io, inbuf, &len );
889
890    if( len == 0 ) /* peer sent us a keepalive message */
891        dbgmsg( msgs, "got KeepAlive" );
892    else {
893        msgs->incoming.length = len;
894        msgs->state = AWAITING_BT_ID;
895    }
896
897    return READ_AGAIN;
898}
899
900static int
901readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen );
902
903static int
904readBtId( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
905{
906    uint8_t id;
907
908    if( inlen < sizeof(uint8_t) )
909        return READ_MORE;
910
911    tr_peerIoReadUint8( msgs->io, inbuf, &id );
912    msgs->incoming.id = id;
913
914    if( id==BT_PIECE )
915    {
916        msgs->state = AWAITING_BT_PIECE;
917        return READ_AGAIN;
918    }
919    else if( msgs->incoming.length != 1 )
920    {
921        msgs->state = AWAITING_BT_MESSAGE;
922        return READ_AGAIN;
923    }
924    else return readBtMessage( msgs, inbuf, inlen-1 );
925}
926
927static void
928updatePeerProgress( tr_peermsgs * msgs )
929{
930    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have ) / (float)msgs->torrent->info.pieceCount;
931    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
932    updateInterest( msgs );
933    firePeerProgress( msgs );
934}
935
936static int
937clientCanSendFastBlock( const tr_peermsgs * msgs UNUSED )
938{
939    /* We can't send a fast piece if a peer has more than MAX_FAST_ALLOWED_THRESHOLD pieces */
940    if( tr_bitfieldCountTrueBits( msgs->info->have ) > MAX_FAST_ALLOWED_THRESHOLD )
941        return FALSE;
942   
943    /* ...or if we don't have ourself enough pieces */
944    if( tr_bitfieldCountTrueBits( tr_cpPieceBitfield( msgs->torrent->completion ) ) < MAX_FAST_ALLOWED_THRESHOLD )
945        return FALSE;
946
947    /* Maybe a bandwidth limit ? */
948    return TRUE;
949}
950
951static void
952peerMadeRequest( tr_peermsgs * msgs, const struct peer_request * req )
953{
954    const int reqIsValid = requestIsValid( msgs, req );
955    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( msgs->torrent->completion, req->index );
956    const int peerIsChoked = msgs->info->peerIsChoked;
957    const int peerIsFast = tr_peerIoSupportsFEXT( msgs->io );
958    const int pieceIsFast = reqIsValid && tr_bitfieldHas( msgs->peerAllowedPieces, req->index );
959    const int canSendFast = clientCanSendFastBlock( msgs );
960
961    if( !reqIsValid ) /* bad request */
962    {
963        dbgmsg( msgs, "rejecting an invalid request." );
964        sendFastReject( msgs, req->index, req->offset, req->length );
965    }
966    else if( !clientHasPiece ) /* we don't have it */
967    {
968        dbgmsg( msgs, "rejecting request for a piece we don't have." );
969        sendFastReject( msgs, req->index, req->offset, req->length );
970    }
971    else if( peerIsChoked && !peerIsFast ) /* doesn't he know he's choked? */
972    {
973        tr_peerMsgsSetChoke( msgs, 1 );
974        sendFastReject( msgs, req->index, req->offset, req->length );
975    }
976    else if( peerIsChoked && peerIsFast && ( !pieceIsFast || !canSendFast ) )
977    {
978        sendFastReject( msgs, req->index, req->offset, req->length );
979    }
980    else /* YAY */
981    {
982        struct peer_request * tmp = tr_new( struct peer_request, 1 );
983        *tmp = *req;
984        if( peerIsFast && pieceIsFast )
985            tr_list_append( &msgs->peerAskedForFast, tmp );
986        else
987            tr_list_append( &msgs->peerAskedFor, tmp );
988    }
989}
990
991static int
992messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
993{
994    switch( id )
995    {
996        case BT_CHOKE:
997        case BT_UNCHOKE:
998        case BT_INTERESTED:
999        case BT_NOT_INTERESTED:
1000        case BT_HAVE_ALL:
1001        case BT_HAVE_NONE:
1002            return len==1;
1003
1004        case BT_HAVE:
1005        case BT_SUGGEST:
1006        case BT_ALLOWED_FAST:
1007            return len==5;
1008
1009        case BT_BITFIELD:
1010            return len == (msg->torrent->info.pieceCount+7u)/8u + 1u;
1011       
1012        case BT_REQUEST:
1013        case BT_CANCEL:
1014        case BT_REJECT:
1015            return len==13;
1016
1017        case BT_PIECE:
1018            return len>9 && len<=16393;
1019
1020        case BT_PORT:
1021            return len==3;
1022
1023        case BT_LTEP:
1024            return len >= 2;
1025
1026        default:
1027            return FALSE;
1028    }
1029}
1030
1031static int
1032clientGotBlock( tr_peermsgs * msgs, const uint8_t * block, const struct peer_request * req );
1033
1034static void
1035clientGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1036{
1037    tr_torrent * tor = msgs->torrent;
1038    tor->activityDate = tr_date( );
1039    tor->downloadedCur += byteCount;
1040    msgs->info->pieceDataActivityDate = time( NULL );
1041    msgs->info->credit += (int)(byteCount * SWIFT_REPAYMENT_RATIO);
1042    tr_rcTransferred( msgs->info->rcToClient, byteCount );
1043    tr_rcTransferred( tor->download, byteCount );
1044    tr_rcTransferred( tor->handle->download, byteCount );
1045}
1046
1047
1048static int
1049readBtPiece( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1050{
1051    struct peer_request * req = &msgs->incoming.blockReq;
1052    assert( EVBUFFER_LENGTH(inbuf) >= inlen );
1053    dbgmsg( msgs, "In readBtPiece" );
1054
1055    if( !req->length )
1056    {
1057        if( inlen < 8 )
1058            return READ_MORE;
1059
1060        tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
1061        tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
1062        req->length = msgs->incoming.length - 9;
1063        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1064        return READ_AGAIN;
1065    }
1066    else
1067    {
1068        int err;
1069
1070        /* read in another chunk of data */
1071        const size_t nLeft = req->length - EVBUFFER_LENGTH(msgs->incoming.block);
1072        size_t n = MIN( nLeft, inlen );
1073        uint8_t * buf = tr_new( uint8_t, n );
1074        assert( EVBUFFER_LENGTH(inbuf) >= n );
1075        tr_peerIoReadBytes( msgs->io, inbuf, buf, n );
1076        evbuffer_add( msgs->incoming.block, buf, n );
1077        clientGotBytes( msgs, n );
1078        tr_free( buf );
1079        dbgmsg( msgs, "got %d bytes for block %u:%u->%u ... %d remain",
1080               (int)n, req->index, req->offset, req->length,
1081               (int)( req->length - EVBUFFER_LENGTH(msgs->incoming.block) ) );
1082        if( EVBUFFER_LENGTH(msgs->incoming.block) < req->length )
1083            return READ_MORE;
1084
1085        /* we've got the whole block ... process it */
1086        err = clientGotBlock( msgs, EVBUFFER_DATA(msgs->incoming.block), req );
1087
1088        /* cleanup */
1089        evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH(msgs->incoming.block) );
1090        req->length = 0;
1091        msgs->state = AWAITING_BT_LENGTH;
1092        if( !err )
1093            return READ_AGAIN;
1094        else {
1095            fireGotAssertError( msgs );
1096            return READ_DONE;
1097        }
1098    }
1099}
1100
1101static int
1102readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1103{
1104    uint32_t ui32;
1105    uint32_t msglen = msgs->incoming.length;
1106    const uint8_t id = msgs->incoming.id;
1107    const size_t startBufLen = EVBUFFER_LENGTH( inbuf );
1108
1109    --msglen; // id length
1110
1111    if( inlen < msglen )
1112        return READ_MORE;
1113
1114    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %d", (int)id, (int)msglen, (int)inlen );
1115
1116    if( !messageLengthIsCorrect( msgs, id, msglen+1 ) )
1117    {
1118        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1119        fireGotError( msgs );
1120        return READ_DONE;
1121    }
1122
1123    switch( id )
1124    {
1125        case BT_CHOKE:
1126            dbgmsg( msgs, "got Choke" );
1127            msgs->info->clientIsChoked = 1;
1128            cancelAllRequestsToPeer( msgs );
1129            cancelAllRequestsToClientExceptFast( msgs );
1130            break;
1131
1132        case BT_UNCHOKE:
1133            dbgmsg( msgs, "got Unchoke" );
1134            msgs->info->clientIsChoked = 0;
1135            fireNeedReq( msgs );
1136            break;
1137
1138        case BT_INTERESTED:
1139            dbgmsg( msgs, "got Interested" );
1140            msgs->info->peerIsInterested = 1;
1141            tr_peerMsgsSetChoke( msgs, 0 );
1142            break;
1143
1144        case BT_NOT_INTERESTED:
1145            dbgmsg( msgs, "got Not Interested" );
1146            msgs->info->peerIsInterested = 0;
1147            break;
1148           
1149        case BT_HAVE:
1150            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1151            dbgmsg( msgs, "got Have: %u", ui32 );
1152            tr_bitfieldAdd( msgs->info->have, ui32 );
1153            /* If this is a fast-allowed piece for this peer, mark it as normal now */
1154            if( msgs->clientAllowedPieces != NULL && tr_bitfieldHas( msgs->clientAllowedPieces, ui32 ) )
1155                tr_bitfieldRem( msgs->clientAllowedPieces, ui32 );
1156            updatePeerProgress( msgs );
1157            tr_rcTransferred( msgs->torrent->swarmspeed, msgs->torrent->info.pieceSize );
1158            break;
1159
1160        case BT_BITFIELD: {
1161            const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
1162            dbgmsg( msgs, "got a bitfield" );
1163            msgs->peerSentBitfield = 1;
1164            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
1165            updatePeerProgress( msgs );
1166            tr_peerMsgsSetChoke( msgs, !clientIsSeed || (msgs->info->progress<1.0) );
1167            fireNeedReq( msgs );
1168            break;
1169        }
1170
1171        case BT_REQUEST: {
1172            struct peer_request req;
1173            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1174            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1175            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1176            dbgmsg( msgs, "got Request: %u:%u->%u", req.index, req.offset, req.length );
1177            peerMadeRequest( msgs, &req );
1178            break;
1179        }
1180
1181        case BT_CANCEL: {
1182            struct peer_request req;
1183            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1184            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1185            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1186            dbgmsg( msgs, "got a Cancel %u:%u->%u", req.index, req.offset, req.length );
1187            tr_free( tr_list_remove( &msgs->peerAskedForFast, &req, compareRequest ) );
1188            tr_free( tr_list_remove( &msgs->peerAskedFor, &req, compareRequest ) );
1189            break;
1190        }
1191
1192        case BT_PIECE:
1193            assert( 0 ); /* should be handled elsewhere! */
1194            break;
1195       
1196        case BT_PORT:
1197            dbgmsg( msgs, "Got a BT_PORT" );
1198            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1199            break;
1200           
1201        case BT_SUGGEST: {
1202            dbgmsg( msgs, "Got a BT_SUGGEST" );
1203            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1204            const tr_bitfield * bt = tr_cpPieceBitfield( msgs->torrent->completion );
1205            if( tr_bitfieldHas( bt, ui32 ) )
1206                tr_bitfieldAdd( msgs->clientSuggestedPieces, ui32 );
1207            break;
1208        }
1209           
1210        case BT_HAVE_ALL:
1211            dbgmsg( msgs, "Got a BT_HAVE_ALL" );
1212            tr_bitfieldAddRange( msgs->info->have, 0, msgs->torrent->info.pieceCount );
1213            updatePeerProgress( msgs );
1214            break;
1215       
1216       
1217        case BT_HAVE_NONE:
1218            dbgmsg( msgs, "Got a BT_HAVE_NONE" );
1219            tr_bitfieldClear( msgs->info->have );
1220            updatePeerProgress( msgs );
1221            break;
1222       
1223        case BT_REJECT: {
1224            struct peer_request req;
1225            dbgmsg( msgs, "Got a BT_REJECT" );
1226            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1227            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1228            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1229            tr_free( tr_list_remove( &msgs->clientAskedFor, &req, compareRequest ) );
1230            break;
1231        }
1232
1233        case BT_ALLOWED_FAST: {
1234            dbgmsg( msgs, "Got a BT_ALLOWED_FAST" );
1235            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1236            tr_bitfieldAdd( msgs->clientAllowedPieces, ui32 );
1237            break;
1238        }
1239
1240        case BT_LTEP:
1241            dbgmsg( msgs, "Got a BT_LTEP" );
1242            parseLtep( msgs, msglen, inbuf );
1243            break;
1244
1245        default:
1246            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1247            tr_peerIoDrain( msgs->io, inbuf, msglen );
1248            break;
1249    }
1250
1251    assert( msglen + 1 == msgs->incoming.length );
1252    assert( EVBUFFER_LENGTH(inbuf) == startBufLen - msglen );
1253
1254    msgs->state = AWAITING_BT_LENGTH;
1255    return READ_AGAIN;
1256}
1257
1258static void
1259peerGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1260{
1261    tr_torrent * tor = msgs->torrent;
1262    tor->activityDate = tr_date( );
1263    tor->uploadedCur += byteCount;
1264    msgs->info->pieceDataActivityDate = time( NULL );
1265    msgs->info->credit -= byteCount;
1266    tr_rcTransferred( msgs->info->rcToPeer, byteCount );
1267    tr_rcTransferred( tor->upload, byteCount );
1268    tr_rcTransferred( tor->handle->upload, byteCount );
1269}
1270
1271static size_t
1272getDownloadMax( const tr_peermsgs * msgs )
1273{
1274    static const size_t maxval = ~0;
1275    const tr_torrent * tor = msgs->torrent;
1276
1277    if( tor->downloadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1278        return tor->handle->useDownloadLimit
1279            ? tr_rcBytesLeft( tor->handle->download ) : maxval;
1280
1281    if( tor->downloadLimitMode == TR_SPEEDLIMIT_SINGLE )
1282        return tr_rcBytesLeft( tor->download );
1283
1284    return maxval;
1285}
1286
1287static void
1288reassignBytesToCorrupt( tr_peermsgs * msgs, uint32_t byteCount )
1289{
1290    tr_torrent * tor = msgs->torrent;
1291
1292    /* increment the `corrupt' field */
1293    tor->corruptCur += byteCount;
1294
1295    /* decrement the `downloaded' field */
1296    if( tor->downloadedCur >= byteCount )
1297        tor->downloadedCur -= byteCount;
1298    else
1299        tor->downloadedCur = 0;
1300}
1301
1302
1303static void
1304gotBadPiece( tr_peermsgs * msgs, uint32_t pieceIndex )
1305{
1306    const uint32_t byteCount =
1307        tr_torPieceCountBytes( msgs->torrent, (int)pieceIndex );
1308    reassignBytesToCorrupt( msgs, byteCount );
1309}
1310
1311static void
1312clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1313{
1314    reassignBytesToCorrupt( msgs, req->length );
1315}
1316
1317static void
1318addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1319{
1320    if( !msgs->info->blame )
1321         msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1322    tr_bitfieldAdd( msgs->info->blame, index );
1323}
1324
1325static int
1326clientGotBlock( tr_peermsgs                * msgs,
1327                const uint8_t              * data,
1328                const struct peer_request  * req )
1329{
1330    int i;
1331    tr_torrent * tor = msgs->torrent;
1332    const int block = _tr_block( tor, req->index, req->offset );
1333    struct peer_request *myreq;
1334
1335    assert( msgs != NULL );
1336    assert( req != NULL );
1337
1338    if( req->length != (uint32_t)tr_torBlockCountBytes( msgs->torrent, block ) )
1339    {
1340        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1341                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1342        return TR_ERROR_ASSERT;
1343    }
1344
1345    /* save the block */
1346    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1347
1348    /**
1349    *** Remove the block from our `we asked for this' list
1350    **/
1351
1352    myreq = tr_list_remove( &msgs->clientAskedFor, req, compareRequest );
1353    if( myreq == NULL ) {
1354        clientGotUnwantedBlock( msgs, req );
1355        dbgmsg( msgs, "we didn't ask for this message..." );
1356        return 0;
1357    }
1358
1359    dbgmsg( msgs, "got block %u:%u->%u (turnaround time %d secs)", 
1360                  myreq->index, myreq->offset, myreq->length,
1361                  (int)(time(NULL) - myreq->time_requested) );
1362    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1363                  tr_list_size(msgs->clientAskedFor));
1364
1365    tr_free( myreq );
1366    myreq = NULL;
1367
1368    /**
1369    *** Error checks
1370    **/
1371
1372    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
1373        dbgmsg( msgs, "we have this block already..." );
1374        clientGotUnwantedBlock( msgs, req );
1375        return 0;
1376    }
1377
1378    /**
1379    ***  Save the block
1380    **/
1381
1382    msgs->info->peerSentPieceDataAt = time( NULL );
1383    i = tr_ioWrite( tor, req->index, req->offset, req->length, data );
1384    if( i )
1385        return 0;
1386
1387    tr_cpBlockAdd( tor->completion, block );
1388
1389    addPeerToBlamefield( msgs, req->index );
1390
1391    fireGotBlock( msgs, req );
1392
1393    /**
1394    ***  Handle if this was the last block in the piece
1395    **/
1396
1397    if( tr_cpPieceIsComplete( tor->completion, req->index ) )
1398    {
1399        if( tr_ioHash( tor, req->index ) )
1400        {
1401            gotBadPiece( msgs, req->index );
1402            return 0;
1403        }
1404
1405        fireClientHave( msgs, req->index );
1406    }
1407
1408    return 0;
1409}
1410
1411static ReadState
1412canRead( struct bufferevent * evin, void * vmsgs )
1413{
1414    ReadState ret;
1415    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1416    struct evbuffer * inbuf = EVBUFFER_INPUT ( evin );
1417    const size_t buflen = EVBUFFER_LENGTH( inbuf );
1418    const size_t downloadMax = getDownloadMax( msgs );
1419    const size_t n = MIN( buflen, downloadMax );
1420
1421    if( !n )
1422    {
1423        ret = READ_DONE;
1424    }
1425    else switch( msgs->state )
1426    {
1427        case AWAITING_BT_LENGTH:  ret = readBtLength ( msgs, inbuf, n ); break;
1428        case AWAITING_BT_ID:      ret = readBtId     ( msgs, inbuf, n ); break;
1429        case AWAITING_BT_MESSAGE: ret = readBtMessage( msgs, inbuf, n ); break;
1430        case AWAITING_BT_PIECE:   ret = readBtPiece  ( msgs, inbuf, n ); break;
1431        default: assert( 0 );
1432    }
1433
1434    return ret;
1435}
1436
1437static void
1438sendKeepalive( tr_peermsgs * msgs )
1439{
1440    dbgmsg( msgs, "sending a keepalive message" );
1441    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1442}
1443
1444/**
1445***
1446**/
1447
1448static size_t
1449getUploadMax( const tr_peermsgs * msgs )
1450{
1451    static const size_t maxval = ~0;
1452    const tr_torrent * tor = msgs->torrent;
1453    const int useSwift = SWIFT_ENABLED && !tr_torrentIsSeed( msgs->torrent );
1454    const size_t swiftLeft = msgs->info->credit;
1455    size_t speedLeft;
1456    size_t bufLeft;
1457    size_t ret;
1458
1459    if( tor->uploadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1460        speedLeft = tor->handle->useUploadLimit ? tr_rcBytesLeft( tor->handle->upload ) : maxval;
1461    else if( tor->uploadLimitMode == TR_SPEEDLIMIT_SINGLE )
1462        speedLeft = tr_rcBytesLeft( tor->upload );
1463    else
1464        speedLeft = ~0;
1465
1466    bufLeft = MAX_OUTBUF_SIZE - tr_peerIoWriteBytesWaiting( msgs->io );
1467    ret = MIN( speedLeft, bufLeft );
1468    if( useSwift)
1469        ret = MIN( ret, swiftLeft );
1470    return ret;
1471}
1472
1473static int
1474ratePulse( void * vmsgs )
1475{
1476    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1477    msgs->info->rateToClient = tr_rcRate( msgs->info->rcToClient );
1478    msgs->info->rateToPeer = tr_rcRate( msgs->info->rcToPeer );
1479    msgs->maxActiveRequests = MIN( 8 + (int)(msgs->info->rateToClient/5), 100 );
1480    msgs->minActiveRequests = msgs->maxActiveRequests / 3;
1481    return TRUE;
1482}
1483
1484static struct peer_request*
1485popNextRequest( tr_peermsgs * msgs )
1486{
1487    struct peer_request * ret;
1488    ret = tr_list_pop_front( &msgs->peerAskedForFast );
1489    if( !ret )
1490        ret = tr_list_pop_front( &msgs->peerAskedFor);
1491    return ret;
1492}
1493
1494static void
1495updatePeerStatus( tr_peermsgs * msgs )
1496{
1497    tr_peer * peer = msgs->info;
1498
1499    if( !msgs->peerSentBitfield )
1500        peer->status = TR_PEER_STATUS_HANDSHAKE;
1501
1502    else if( ( time(NULL) - peer->pieceDataActivityDate ) < 3 )
1503        peer->status = peer->clientIsChoked
1504                       ? TR_PEER_STATUS_ACTIVE_AND_CHOKED
1505                       : TR_PEER_STATUS_ACTIVE;
1506
1507    else if( peer->peerIsChoked )
1508        peer->status = TR_PEER_STATUS_PEER_IS_CHOKED;
1509
1510    else if( peer->clientIsChoked )
1511        peer->status = peer->clientIsInterested
1512                       ? TR_PEER_STATUS_CLIENT_IS_INTERESTED
1513                       : TR_PEER_STATUS_CLIENT_IS_CHOKED;
1514
1515    else if( msgs->clientAskedFor != NULL )
1516        peer->status = TR_PEER_STATUS_REQUEST_SENT;
1517
1518    else
1519        peer->status = TR_PEER_STATUS_READY;
1520}
1521
1522static int
1523pulse( void * vmsgs )
1524{
1525    const time_t now = time( NULL );
1526    tr_peermsgs * msgs = vmsgs;
1527    struct peer_request * r;
1528
1529    tr_peerIoTryRead( msgs->io );
1530    pumpRequestQueue( msgs );
1531    updatePeerStatus( msgs );
1532
1533    if( msgs->sendingBlock )
1534    {
1535        const size_t uploadMax = getUploadMax( msgs );
1536        size_t len = EVBUFFER_LENGTH( msgs->outBlock );
1537        const size_t outlen = MIN( len, uploadMax );
1538
1539        assert( len );
1540
1541        if( outlen )
1542        {
1543            tr_peerIoWrite( msgs->io, EVBUFFER_DATA( msgs->outBlock ), outlen );
1544            evbuffer_drain( msgs->outBlock, outlen );
1545            peerGotBytes( msgs, outlen );
1546
1547            len -= outlen;
1548            msgs->clientSentAnythingAt = now;
1549            msgs->sendingBlock = len!=0;
1550
1551            dbgmsg( msgs, "wrote %d bytes; %d left in block", (int)outlen, (int)len );
1552        }
1553        else dbgmsg( msgs, "stalled writing block... uploadMax %lu, outlen %lu", uploadMax, outlen );
1554    }
1555
1556    if( !msgs->sendingBlock )
1557    {
1558        if(( EVBUFFER_LENGTH( msgs->outMessages ) ))
1559        {
1560            dbgmsg( msgs, "flushing outMessages..." );
1561            tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
1562            msgs->clientSentAnythingAt = now;
1563        }
1564        else if( !EVBUFFER_LENGTH( msgs->outBlock )
1565            && (( r = popNextRequest( msgs )))
1566            && requestIsValid( msgs, r )
1567            && tr_cpPieceIsComplete( msgs->torrent->completion, r->index ) )
1568        {
1569            uint8_t * buf = tr_new( uint8_t, r->length );
1570
1571            if( !tr_ioRead( msgs->torrent, r->index, r->offset, r->length, buf ) )
1572            {
1573                tr_peerIo * io = msgs->io;
1574                struct evbuffer * out = msgs->outBlock;
1575
1576                dbgmsg( msgs, "sending block %u:%u->%u", r->index, r->offset, r->length );
1577                tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 2*sizeof(uint32_t) + r->length );
1578                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1579                tr_peerIoWriteUint32( io, out, r->index );
1580                tr_peerIoWriteUint32( io, out, r->offset );
1581                tr_peerIoWriteBytes ( io, out, buf, r->length );
1582                msgs->sendingBlock = 1;
1583            }
1584
1585            tr_free( buf );
1586            tr_free( r );
1587        }
1588        else if( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS )
1589        {
1590            sendKeepalive( msgs );
1591        }
1592    }
1593
1594    return TRUE; /* loop forever */
1595}
1596
1597static void
1598gotError( struct bufferevent * evbuf UNUSED, short what, void * vmsgs )
1599{
1600    if( what & EVBUFFER_TIMEOUT )
1601        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1602
1603    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) ) {
1604        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1605                what, errno, strerror(errno) );
1606        fireGotError( vmsgs );
1607    }
1608}
1609
1610static void
1611sendBitfield( tr_peermsgs * msgs )
1612{
1613    const tr_bitfield * bitfield = tr_cpPieceBitfield( msgs->torrent->completion );
1614    struct evbuffer * out = msgs->outMessages;
1615
1616    dbgmsg( msgs, "sending peer a bitfield message" );
1617    tr_peerIoWriteUint32( msgs->io, out, sizeof(uint8_t) + bitfield->len );
1618    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1619    tr_peerIoWriteBytes ( msgs->io, out, bitfield->bits, bitfield->len );
1620}
1621
1622/**
1623***
1624**/
1625
1626/* some peers give us error messages if we send
1627   more than this many peers in a single pex message */
1628#define MAX_PEX_DIFFS 200
1629
1630typedef struct
1631{
1632    tr_pex * added;
1633    tr_pex * dropped;
1634    tr_pex * elements;
1635    int addedCount;
1636    int droppedCount;
1637    int elementCount;
1638    int diffCount;
1639}
1640PexDiffs;
1641
1642static void
1643pexAddedCb( void * vpex, void * userData )
1644{
1645    PexDiffs * diffs = (PexDiffs *) userData;
1646    tr_pex * pex = (tr_pex *) vpex;
1647    if( diffs->diffCount < MAX_PEX_DIFFS )
1648    {
1649        diffs->diffCount++;
1650        diffs->added[diffs->addedCount++] = *pex;
1651        diffs->elements[diffs->elementCount++] = *pex;
1652    }
1653}
1654
1655static void
1656pexRemovedCb( void * vpex, void * userData )
1657{
1658    PexDiffs * diffs = (PexDiffs *) userData;
1659    tr_pex * pex = (tr_pex *) vpex;
1660    if( diffs->diffCount < MAX_PEX_DIFFS )
1661    {
1662        diffs->diffCount++;
1663        diffs->dropped[diffs->droppedCount++] = *pex;
1664    }
1665}
1666
1667static void
1668pexElementCb( void * vpex, void * userData )
1669{
1670    PexDiffs * diffs = (PexDiffs *) userData;
1671    tr_pex * pex = (tr_pex *) vpex;
1672    if( diffs->diffCount < MAX_PEX_DIFFS )
1673    {
1674        diffs->diffCount++;
1675        diffs->elements[diffs->elementCount++] = *pex;
1676    }
1677}
1678
1679static void
1680sendPex( tr_peermsgs * msgs )
1681{
1682    if( msgs->peerSupportsPex && tr_torrentIsPexEnabled( msgs->torrent ) )
1683    {
1684        int i;
1685        tr_pex * newPex = NULL;
1686        const int newCount = tr_peerMgrGetPeers( msgs->handle->peerMgr, msgs->torrent->info.hash, &newPex );
1687        PexDiffs diffs;
1688        benc_val_t val, *added, *dropped, *flags;
1689        uint8_t *tmp, *walk;
1690        char * benc;
1691        int bencLen;
1692
1693        /* build the diffs */
1694        diffs.added = tr_new( tr_pex, newCount );
1695        diffs.addedCount = 0;
1696        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1697        diffs.droppedCount = 0;
1698        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1699        diffs.elementCount = 0;
1700        diffs.diffCount = 0;
1701        tr_set_compare( msgs->pex, msgs->pexCount,
1702                        newPex, newCount,
1703                        tr_pexCompare, sizeof(tr_pex),
1704                        pexRemovedCb, pexAddedCb, pexElementCb, &diffs );
1705        dbgmsg( msgs, "pex: old peer count %d, new peer count %d, added %d, removed %d", msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1706
1707        /* update peer */
1708        tr_free( msgs->pex );
1709        msgs->pex = diffs.elements;
1710        msgs->pexCount = diffs.elementCount;
1711
1712        /* build the pex payload */
1713        tr_bencInit( &val, TYPE_DICT );
1714        tr_bencDictReserve( &val, 3 );
1715
1716        /* "added" */
1717        added = tr_bencDictAdd( &val, "added" );
1718        tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1719        for( i=0; i<diffs.addedCount; ++i ) {
1720            memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
1721            memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1722        }
1723        assert( ( walk - tmp ) == diffs.addedCount * 6 );
1724        tr_bencInitStr( added, tmp, walk-tmp, FALSE );
1725
1726        /* "added.f" */
1727        flags = tr_bencDictAdd( &val, "added.f" );
1728        tmp = walk = tr_new( uint8_t, diffs.addedCount );
1729        for( i=0; i<diffs.addedCount; ++i )
1730            *walk++ = diffs.added[i].flags;
1731        assert( ( walk - tmp ) == diffs.addedCount );
1732        tr_bencInitStr( flags, tmp, walk-tmp, FALSE );
1733
1734        /* "dropped" */
1735        dropped = tr_bencDictAdd( &val, "dropped" );
1736        tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
1737        for( i=0; i<diffs.droppedCount; ++i ) {
1738            memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
1739            memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
1740        }
1741        assert( ( walk - tmp ) == diffs.droppedCount * 6 );
1742        tr_bencInitStr( dropped, tmp, walk-tmp, FALSE );
1743
1744        /* write the pex message */
1745        benc = tr_bencSave( &val, &bencLen );
1746        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 2*sizeof(uint8_t) + bencLen );
1747        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_LTEP );
1748        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, OUR_LTEP_PEX );
1749        tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, benc, bencLen );
1750
1751        /* cleanup */
1752        tr_free( benc );
1753        tr_bencFree( &val );
1754        tr_free( diffs.added );
1755        tr_free( diffs.dropped );
1756        tr_free( newPex );
1757
1758        msgs->clientSentPexAt = time( NULL );
1759    }
1760}
1761
1762static int
1763pexPulse( void * vpeer )
1764{
1765    sendPex( vpeer );
1766    return TRUE;
1767}
1768
1769/**
1770***
1771**/
1772
1773tr_peermsgs*
1774tr_peerMsgsNew( struct tr_torrent * torrent,
1775                struct tr_peer    * info,
1776                tr_delivery_func    func,
1777                void              * userData,
1778                tr_publisher_tag  * setme )
1779{
1780    tr_peermsgs * m;
1781
1782    assert( info != NULL );
1783    assert( info->io != NULL );
1784
1785    m = tr_new0( tr_peermsgs, 1 );
1786    m->publisher = tr_publisherNew( );
1787    m->info = info;
1788    m->handle = torrent->handle;
1789    m->torrent = torrent;
1790    m->io = info->io;
1791    m->info->status = TR_PEER_STATUS_HANDSHAKE;
1792    m->info->clientIsChoked = 1;
1793    m->info->peerIsChoked = 1;
1794    m->info->clientIsInterested = 0;
1795    m->info->peerIsInterested = 0;
1796    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
1797    m->state = AWAITING_BT_LENGTH;
1798    m->pulseTimer = tr_timerNew( m->handle, pulse, m, PEER_PULSE_INTERVAL );
1799    m->rateTimer = tr_timerNew( m->handle, ratePulse, m, RATE_PULSE_INTERVAL );
1800    m->pexTimer = tr_timerNew( m->handle, pexPulse, m, PEX_INTERVAL );
1801    m->outMessages = evbuffer_new( );
1802    m->incoming.block = evbuffer_new( );
1803    m->outBlock = evbuffer_new( );
1804    m->peerAllowedPieces = NULL;
1805    m->clientAllowedPieces = NULL;
1806    m->clientSuggestedPieces = NULL;
1807    *setme = tr_publisherSubscribe( m->publisher, func, userData );
1808   
1809    if ( tr_peerIoSupportsFEXT( m->io ) )
1810    {
1811        /* This peer is fastpeer-enabled, generate its allowed set
1812         * (before registering our callbacks) */
1813        if ( !m->peerAllowedPieces ) {
1814            const struct in_addr *peerAddr = tr_peerIoGetAddress( m->io, NULL );
1815           
1816            m->peerAllowedPieces = tr_peerMgrGenerateAllowedSet( MAX_FAST_ALLOWED_COUNT,
1817                                                                 m->torrent->info.pieceCount,
1818                                                                 m->torrent->info.hash,
1819                                                                 peerAddr );
1820        }
1821        m->clientAllowedPieces = tr_bitfieldNew( m->torrent->info.pieceCount );
1822       
1823        m->clientSuggestedPieces = tr_bitfieldNew( m->torrent->info.pieceCount );
1824    }
1825   
1826    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of inactivity */
1827    tr_peerIoSetIOFuncs( m->io, canRead, gotError, m );
1828    ratePulse( m );
1829
1830    if ( tr_peerIoSupportsLTEP( m->io ) )
1831        sendLtepHandshake( m );
1832
1833    if ( !tr_peerIoSupportsFEXT( m->io ) )
1834        sendBitfield( m );
1835    else {
1836        /* This peer is fastpeer-enabled, send it have-all or have-none if appropriate */
1837        uint32_t peerProgress;
1838        float completion = tr_cpPercentComplete( m->torrent->completion );
1839        if ( completion == 0.0f ) {
1840            sendFastHave( m, 0 );
1841        } else if ( completion == 1.0f ) {
1842            sendFastHave( m, 1 );
1843        } else {
1844            sendBitfield( m );
1845        }
1846        peerProgress = m->torrent->info.pieceCount * m->info->progress;
1847       
1848        if ( peerProgress < MAX_FAST_ALLOWED_COUNT )
1849            sendFastAllowedSet( m );
1850    }
1851
1852    return m;
1853}
1854
1855void
1856tr_peerMsgsFree( tr_peermsgs* msgs )
1857{
1858    if( msgs != NULL )
1859    {
1860        tr_timerFree( &msgs->pulseTimer );
1861        tr_timerFree( &msgs->rateTimer );
1862        tr_timerFree( &msgs->pexTimer );
1863        tr_publisherFree( &msgs->publisher );
1864        tr_list_free( &msgs->clientWillAskFor, tr_free );
1865        tr_list_free( &msgs->clientAskedFor, tr_free );
1866        tr_list_free( &msgs->peerAskedForFast, tr_free );
1867        tr_list_free( &msgs->peerAskedFor, tr_free );
1868        tr_bitfieldFree( msgs->peerAllowedPieces );
1869        tr_bitfieldFree( msgs->clientAllowedPieces );
1870        tr_bitfieldFree( msgs->clientSuggestedPieces );
1871        evbuffer_free( msgs->incoming.block );
1872        evbuffer_free( msgs->outMessages );
1873        evbuffer_free( msgs->outBlock );
1874        tr_free( msgs->pex );
1875
1876        memset( msgs, ~0, sizeof( tr_peermsgs ) );
1877        tr_free( msgs );
1878    }
1879}
1880
1881tr_publisher_tag
1882tr_peerMsgsSubscribe( tr_peermsgs       * peer,
1883                      tr_delivery_func    func,
1884                      void              * userData )
1885{
1886    return tr_publisherSubscribe( peer->publisher, func, userData );
1887}
1888
1889void
1890tr_peerMsgsUnsubscribe( tr_peermsgs       * peer,
1891                        tr_publisher_tag    tag )
1892{
1893    tr_publisherUnsubscribe( peer->publisher, tag );
1894}
1895
1896int
1897tr_peerMsgsIsPieceFastAllowed( const tr_peermsgs * peer,
1898                               uint32_t            index )
1899{
1900    return tr_bitfieldHas( peer->clientAllowedPieces, index );
1901}
1902
1903int
1904tr_peerMsgsIsPieceSuggested( const tr_peermsgs * peer,
1905                             uint32_t            index )
1906{
1907    return tr_bitfieldHas( peer->clientSuggestedPieces, index );
1908}
1909
Note: See TracBrowser for help on using the repository browser.