source: trunk/libtransmission/peer-msgs.c @ 3832

Last change on this file since 3832 was 3832, checked in by charles, 15 years ago

if we haven't advertised that we support pex, then even if a peer sends us a pex message, don't send one back.

  • Property svn:keywords set to Date Rev Author Id
File size: 51.1 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 3832 2007-11-16 04:42:51Z charles $
11 */
12
13#include <assert.h>
14#include <ctype.h>
15#include <errno.h>
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19#include <libgen.h> /* basename */
20
21#include <netinet/in.h> /* struct in_addr */
22
23#include <event.h>
24
25#include "transmission.h"
26#include "bencode.h"
27#include "completion.h"
28#include "inout.h"
29#include "list.h"
30#include "peer-io.h"
31#include "peer-mgr.h"
32#include "peer-mgr-private.h"
33#include "peer-msgs.h"
34#include "ratecontrol.h"
35#include "trevent.h"
36#include "utils.h"
37
38/**
39***
40**/
41
42enum
43{
44    BT_CHOKE                = 0,
45    BT_UNCHOKE              = 1,
46    BT_INTERESTED           = 2,
47    BT_NOT_INTERESTED       = 3,
48    BT_HAVE                 = 4,
49    BT_BITFIELD             = 5,
50    BT_REQUEST              = 6,
51    BT_PIECE                = 7,
52    BT_CANCEL               = 8,
53    BT_PORT                 = 9,
54    BT_SUGGEST              = 13,
55    BT_HAVE_ALL             = 14,
56    BT_HAVE_NONE            = 15,
57    BT_REJECT               = 16,
58    BT_ALLOWED_FAST         = 17,
59    BT_LTEP                 = 20,
60
61    LTEP_HANDSHAKE          = 0,
62
63    OUR_LTEP_PEX            = 1,
64
65    MAX_REQUEST_BYTE_COUNT  = (16 * 1024), /* drop requests who want too much */
66
67    /* idle seconds before we send a keepalive */
68    KEEPALIVE_INTERVAL_SECS = 90,
69
70    PEX_INTERVAL            = (60 * 1000), /* msec between calls to sendPex() */
71    PEER_PULSE_INTERVAL     = (100),       /* msec between calls to pulse() */
72    RATE_PULSE_INTERVAL     = (333),       /* msec between calls to ratePulse() */
73     
74    /* Fast Peers Extension constants */
75    MAX_FAST_ALLOWED_COUNT   = 10,          /* max. number of pieces we fast-allow to another peer */
76    MAX_FAST_ALLOWED_THRESHOLD = 10,        /* max threshold for allowing fast-pieces requests */
77
78};
79
80enum
81{
82    AWAITING_BT_LENGTH,
83    AWAITING_BT_MESSAGE
84};
85
86struct peer_request
87{
88    uint32_t index;
89    uint32_t offset;
90    uint32_t length;
91    time_t time_requested;
92};
93
94static int
95compareRequest( const void * va, const void * vb )
96{
97    int i;
98    const struct peer_request * a = va;
99    const struct peer_request * b = vb;
100    if(( i = tr_compareUint32( a->index, b->index ))) return i;
101    if(( i = tr_compareUint32( a->offset, b->offset ))) return i;
102    if(( i = tr_compareUint32( a->length, b->length ))) return i;
103    return 0;
104}
105
106struct tr_peermsgs
107{
108    tr_peer * info;
109
110    tr_handle * handle;
111    tr_torrent * torrent;
112    tr_peerIo * io;
113
114    tr_publisher_t * publisher;
115
116    struct evbuffer * outBlock;    /* buffer of all the current piece message */
117    struct evbuffer * outMessages; /* buffer of all the non-piece messages */
118    tr_list * peerAskedFor;
119    tr_list * peerAskedForFast;
120    tr_list * clientAskedFor;
121    tr_list * clientWillAskFor;
122
123    tr_timer * rateTimer;
124    tr_timer * pulseTimer;
125    tr_timer * pexTimer;
126
127    time_t lastReqAddedAt;
128    time_t clientSentPexAt;
129    time_t clientSentAnythingAt;
130
131    unsigned int notListening             : 1;
132    unsigned int peerSupportsPex          : 1;
133    unsigned int clientSentLtepHandshake  : 1;
134    unsigned int peerSentLtepHandshake    : 1;
135   
136    tr_bitfield * clientAllowedPieces;
137    tr_bitfield * peerAllowedPieces;
138   
139    tr_bitfield * clientSuggestedPieces;
140   
141    uint8_t state;
142    uint8_t ut_pex_id;
143    uint16_t pexCount;
144    uint32_t incomingMessageLength;
145    uint32_t maxActiveRequests;
146    uint32_t minActiveRequests;
147
148    tr_pex * pex;
149};
150
151/**
152***
153**/
154
155static void
156myDebug( const char * file, int line,
157         const struct tr_peermsgs * msgs,
158         const char * fmt, ... )
159{
160    FILE * fp = tr_getLog( );
161    if( fp != NULL )
162    {
163        va_list args;
164        char timestr[64];
165        struct evbuffer * buf = evbuffer_new( );
166        char * myfile = tr_strdup( file );
167
168        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
169                             tr_getLogTimeStr( timestr, sizeof(timestr) ),
170                             msgs->torrent->info.name,
171                             tr_peerIoGetAddrStr( msgs->io ),
172                             msgs->info->client );
173        va_start( args, fmt );
174        evbuffer_add_vprintf( buf, fmt, args );
175        va_end( args );
176        evbuffer_add_printf( buf, " (%s:%d)\n", basename(myfile), line );
177        fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
178
179        tr_free( myfile );
180        evbuffer_free( buf );
181    }
182}
183
184#define dbgmsg(msgs, fmt...) myDebug(__FILE__, __LINE__, msgs, ##fmt )
185
186/**
187***
188**/
189
190static void
191protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
192{
193    tr_peerIo * io = msgs->io;
194    struct evbuffer * out = msgs->outMessages;
195
196    dbgmsg( msgs, "requesting %u:%u->%u", req->index, req->offset, req->length );
197    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
198    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
199    tr_peerIoWriteUint32( io, out, req->index );
200    tr_peerIoWriteUint32( io, out, req->offset );
201    tr_peerIoWriteUint32( io, out, req->length );
202}
203
204static void
205protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
206{
207    tr_peerIo * io = msgs->io;
208    struct evbuffer * out = msgs->outMessages;
209
210    dbgmsg( msgs, "cancelling %u:%u->%u", req->index, req->offset, req->length );
211    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
212    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
213    tr_peerIoWriteUint32( io, out, req->index );
214    tr_peerIoWriteUint32( io, out, req->offset );
215    tr_peerIoWriteUint32( io, out, req->length );
216}
217
218static void
219protocolSendHave( tr_peermsgs * msgs, uint32_t index )
220{
221    tr_peerIo * io = msgs->io;
222    struct evbuffer * out = msgs->outMessages;
223
224    dbgmsg( msgs, "sending Have %u", index );
225    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
226    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
227    tr_peerIoWriteUint32( io, out, index );
228}
229
230static void
231protocolSendChoke( tr_peermsgs * msgs, int choke )
232{
233    tr_peerIo * io = msgs->io;
234    struct evbuffer * out = msgs->outMessages;
235
236    dbgmsg( msgs, "sending %s", (choke ? "Choke" : "Unchoke") );
237    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) );
238    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
239}
240
241/**
242***  EVENTS
243**/
244
245static const tr_peermsgs_event blankEvent = { 0, 0, 0, 0, 0.0f };
246
247static void
248publish( tr_peermsgs * msgs, tr_peermsgs_event * e )
249{
250    tr_publisherPublish( msgs->publisher, msgs->info, e );
251}
252
253static void
254fireGotAssertError( tr_peermsgs * msgs )
255{
256    tr_peermsgs_event e = blankEvent;
257    e.eventType = TR_PEERMSG_GOT_ASSERT_ERROR;
258    publish( msgs, &e );
259}
260
261static void
262fireGotError( tr_peermsgs * msgs )
263{
264    tr_peermsgs_event e = blankEvent;
265    e.eventType = TR_PEERMSG_GOT_ERROR;
266    publish( msgs, &e );
267}
268
269static void
270fireNeedReq( tr_peermsgs * msgs )
271{
272    tr_peermsgs_event e = blankEvent;
273    e.eventType = TR_PEERMSG_NEED_REQ;
274    publish( msgs, &e );
275}
276
277static void
278firePeerProgress( tr_peermsgs * msgs )
279{
280    tr_peermsgs_event e = blankEvent;
281    e.eventType = TR_PEERMSG_PEER_PROGRESS;
282    e.progress = msgs->info->progress;
283    publish( msgs, &e );
284}
285
286static void
287fireClientHave( tr_peermsgs * msgs, uint32_t pieceIndex )
288{
289    tr_peermsgs_event e = blankEvent;
290    e.eventType = TR_PEERMSG_CLIENT_HAVE;
291    e.pieceIndex = pieceIndex;
292    publish( msgs, &e );
293}
294
295static void
296fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
297{
298    tr_peermsgs_event e = blankEvent;
299    e.eventType = TR_PEERMSG_CLIENT_BLOCK;
300    e.pieceIndex = req->index;
301    e.offset = req->offset;
302    e.length = req->length;
303    publish( msgs, &e );
304}
305
306static void
307fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
308{
309    tr_peermsgs_event e = blankEvent;
310    e.eventType = TR_PEERMSG_CANCEL;
311    e.pieceIndex = req->index;
312    e.offset = req->offset;
313    e.length = req->length;
314    publish( msgs, &e );
315}
316
317/**
318***  INTEREST
319**/
320
321static int
322isPieceInteresting( const tr_peermsgs   * peer,
323                    int                   piece )
324{
325    const tr_torrent * torrent = peer->torrent;
326    if( torrent->info.pieces[piece].dnd ) /* we don't want it */
327        return FALSE;
328    if( tr_cpPieceIsComplete( torrent->completion, piece ) ) /* we have it */
329        return FALSE;
330    if( !tr_bitfieldHas( peer->info->have, piece ) ) /* peer doesn't have it */
331        return FALSE;
332    if( tr_bitfieldHas( peer->info->banned, piece ) ) /* peer is banned */
333        return FALSE;
334    return TRUE;
335}
336
337/* "interested" means we'll ask for piece data if they unchoke us */
338static int
339isPeerInteresting( const tr_peermsgs * msgs )
340{
341    int i;
342    const tr_torrent * torrent;
343    const tr_bitfield * bitfield;
344    const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
345
346    if( clientIsSeed )
347        return FALSE;
348
349    torrent = msgs->torrent;
350    bitfield = tr_cpPieceBitfield( torrent->completion );
351
352    if( !msgs->info->have )
353        return TRUE;
354
355    assert( bitfield->len == msgs->info->have->len );
356    for( i=0; i<torrent->info.pieceCount; ++i )
357        if( isPieceInteresting( msgs, i ) )
358            return TRUE;
359
360    return FALSE;
361}
362
363static void
364sendInterest( tr_peermsgs * msgs, int weAreInterested )
365{
366    assert( msgs != NULL );
367    assert( weAreInterested==0 || weAreInterested==1 );
368
369    msgs->info->clientIsInterested = weAreInterested;
370    dbgmsg( msgs, "Sending %s",
371            weAreInterested ? "Interested" : "Not Interested");
372
373    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
374    tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages,
375                   weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
376}
377
378static void
379updateInterest( tr_peermsgs * msgs )
380{
381    const int i = isPeerInteresting( msgs );
382    if( i != msgs->info->clientIsInterested )
383        sendInterest( msgs, i );
384    if( i )
385        fireNeedReq( msgs );
386}
387
388#define MIN_CHOKE_PERIOD_SEC 10
389
390static void
391cancelAllRequestsToClientExceptFast( tr_peermsgs * msgs )
392{
393    tr_list_free( &msgs->peerAskedFor, tr_free );
394}
395
396void
397tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
398{
399    const time_t fibrillationTime = time(NULL) - MIN_CHOKE_PERIOD_SEC;
400
401    assert( msgs != NULL );
402    assert( msgs->info != NULL );
403    assert( choke==0 || choke==1 );
404
405    if( msgs->info->chokeChangedAt > fibrillationTime )
406    {
407        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
408    }
409    else if( msgs->info->peerIsChoked != choke )
410    {
411        msgs->info->peerIsChoked = choke;
412        if( choke )
413            cancelAllRequestsToClientExceptFast( msgs );
414        protocolSendChoke( msgs, choke );
415        msgs->info->chokeChangedAt = time( NULL );
416    }
417}
418
419/**
420***
421**/
422
423void
424tr_peerMsgsHave( tr_peermsgs * msgs,
425                 uint32_t      index )
426{
427    protocolSendHave( msgs, index );
428
429    /* since we have more pieces now, we might not be interested in this peer */
430    updateInterest( msgs );
431}
432#if 0
433static void
434sendFastSuggest( tr_peermsgs * msgs,
435                 uint32_t      pieceIndex )
436{
437    assert( msgs != NULL );
438   
439    if( tr_peerIoSupportsFEXT( msgs->io ) )
440    {
441        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
442        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_SUGGEST );
443        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
444    }
445}
446#endif
447static void
448sendFastHave( tr_peermsgs * msgs, int all )
449{
450    assert( msgs != NULL );
451   
452    if( tr_peerIoSupportsFEXT( msgs->io ) )
453    {
454        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
455        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, ( all ? BT_HAVE_ALL
456                                                                : BT_HAVE_NONE ) );
457        updateInterest( msgs );
458    }
459}
460
461static void
462sendFastReject( tr_peermsgs * msgs,
463                uint32_t      pieceIndex,
464                uint32_t      offset,
465                uint32_t      length )
466{
467    assert( msgs != NULL );
468
469    if( tr_peerIoSupportsFEXT( msgs->io ) )
470    {
471        const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
472        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
473        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_REJECT );
474        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
475        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
476        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
477    }
478}
479
480static void
481sendFastAllowed( tr_peermsgs * msgs,
482                 uint32_t      pieceIndex)
483{
484    assert( msgs != NULL );
485   
486    if( tr_peerIoSupportsFEXT( msgs->io ) )
487    {
488        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
489        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_ALLOWED_FAST );
490        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
491    }
492}
493
494
495
496static void
497sendFastAllowedSet( tr_peermsgs * msgs )
498{
499    int i = 0;
500    while (i <= msgs->torrent->info.pieceCount )
501    {
502        if ( tr_bitfieldHas( msgs->peerAllowedPieces, i) )
503            sendFastAllowed( msgs, i );
504        i++;
505    }
506}
507
508
509/**
510***
511**/
512
513static int
514reqIsValid( const tr_peermsgs   * msgs,
515            uint32_t              index,
516            uint32_t              offset,
517            uint32_t              length )
518{
519    const tr_torrent * tor = msgs->torrent;
520
521    if( index >= (uint32_t) tor->info.pieceCount )
522        return FALSE;
523    if ( (int)offset >= tr_torPieceCountBytes( tor, (int)index ) )
524        return FALSE;
525    if( length > MAX_REQUEST_BYTE_COUNT )
526        return FALSE;
527    if( tr_pieceOffset( tor, index, offset, length ) > tor->info.totalSize )
528        return FALSE;
529
530    return TRUE;
531}
532
533static int
534requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
535{
536    return reqIsValid( msgs, req->index, req->offset, req->length );
537}
538
539static void
540pumpRequestQueue( tr_peermsgs * msgs )
541{
542    const int max = msgs->maxActiveRequests;
543    const int min = msgs->minActiveRequests;
544    int count = tr_list_size( msgs->clientAskedFor );
545    int sent = 0;
546
547    if( count > min )
548        return;
549    if( msgs->info->clientIsChoked )
550        return;
551
552    while( ( count < max ) && ( msgs->clientWillAskFor != NULL ) )
553    {
554        struct peer_request * r = tr_list_pop_front( &msgs->clientWillAskFor );
555        protocolSendRequest( msgs, r );
556        r->time_requested = msgs->lastReqAddedAt = time( NULL );
557        tr_list_append( &msgs->clientAskedFor, r );
558        ++count;
559        ++sent;
560    }
561
562    if( sent )
563        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
564                sent,
565                tr_list_size(msgs->clientAskedFor),
566                tr_list_size(msgs->clientWillAskFor) );
567
568    if( count < max )
569        fireNeedReq( msgs );
570}
571
572static int
573pulse( void * vmsgs );
574
575int
576tr_peerMsgsAddRequest( tr_peermsgs * msgs,
577                       uint32_t      index, 
578                       uint32_t      offset, 
579                       uint32_t      length )
580{
581    const int req_max = msgs->maxActiveRequests;
582    struct peer_request tmp, *req;
583
584    assert( msgs != NULL );
585    assert( msgs->torrent != NULL );
586    assert( reqIsValid( msgs, index, offset, length ) );
587
588    /**
589    ***  Reasons to decline the request
590    **/
591
592    /* don't send requests to choked clients */
593    if( msgs->info->clientIsChoked ) {
594        dbgmsg( msgs, "declining request because they're choking us" );
595        return TR_ADDREQ_CLIENT_CHOKED;
596    }
597
598    /* peer doesn't have this piece */
599    if( !tr_bitfieldHas( msgs->info->have, index ) )
600        return TR_ADDREQ_MISSING;
601
602    /* peer's queue is full */
603    if( tr_list_size( msgs->clientWillAskFor ) >= req_max ) {
604        dbgmsg( msgs, "declining request because we're full" );
605        return TR_ADDREQ_FULL;
606    }
607
608    /* have we already asked for this piece? */
609    tmp.index = index;
610    tmp.offset = offset;
611    tmp.length = length;
612    if( tr_list_find( msgs->clientAskedFor, &tmp, compareRequest ) ) {
613        dbgmsg( msgs, "declining because it's a duplicate" );
614        return TR_ADDREQ_DUPLICATE;
615    }
616    if( tr_list_find( msgs->clientWillAskFor, &tmp, compareRequest ) ) {
617        dbgmsg( msgs, "declining because it's a duplicate" );
618        return TR_ADDREQ_DUPLICATE;
619    }
620
621    /**
622    ***  Accept this request
623    **/
624
625    dbgmsg( msgs, "added req for piece %d, offset %d", (int)index, (int)offset );
626    req = tr_new0( struct peer_request, 1 );
627    *req = tmp;
628    tr_list_append( &msgs->clientWillAskFor, req );
629    pulse( msgs );
630    return TR_ADDREQ_OK;
631}
632
633static void
634cancelAllRequestsToPeer( tr_peermsgs * msgs )
635{
636    struct peer_request * req;
637
638    while(( req = tr_list_pop_front( &msgs->clientWillAskFor ) ))
639    {
640        fireCancelledReq( msgs, req );
641        tr_free( req );
642    }
643
644    while(( req = tr_list_pop_front( &msgs->clientAskedFor ) ))
645    {
646        fireCancelledReq( msgs, req );
647        protocolSendCancel( msgs, req );
648        tr_free( req );
649    }
650}
651
652void
653tr_peerMsgsCancel( tr_peermsgs * msgs,
654                   uint32_t      pieceIndex,
655                   uint32_t      offset,
656                   uint32_t      length )
657{
658    struct peer_request *req, tmp;
659
660    assert( msgs != NULL );
661    assert( length > 0 );
662
663    /* have we asked the peer for this piece? */
664    tmp.index = pieceIndex;
665    tmp.offset = offset;
666    tmp.length = length;
667
668    /* if it's only in the queue and hasn't been sent yet, free it */
669    if(( req = tr_list_remove( &msgs->clientWillAskFor, &tmp, compareRequest ) ))
670    {
671        fireCancelledReq( msgs, req );
672        tr_free( req );
673    }
674
675    /* if it's already been sent, send a cancel message too */
676    if(( req = tr_list_remove( &msgs->clientAskedFor, &tmp, compareRequest ) ))
677    {
678        protocolSendCancel( msgs, req );
679        fireCancelledReq( msgs, req );
680        tr_free( req );
681    }
682}
683
684/**
685***
686**/
687
688static void
689sendLtepHandshake( tr_peermsgs * msgs )
690{
691    benc_val_t val, *m;
692    char * buf;
693    int len;
694    int pex;
695    const char * v = TR_NAME " " USERAGENT_PREFIX;
696    const int port = tr_getPublicPort( msgs->handle );
697    struct evbuffer * outbuf;
698
699    if( msgs->clientSentLtepHandshake )
700        return;
701
702    outbuf = evbuffer_new( );
703    dbgmsg( msgs, "sending an ltep handshake" );
704    msgs->clientSentLtepHandshake = 1;
705
706    /* decide if we want to advertise pex support */
707    if( !tr_torrentIsPexEnabled( msgs->torrent ) )
708        pex = 0;
709    else if( msgs->peerSentLtepHandshake )
710        pex = msgs->peerSupportsPex ? 1 : 0;
711    else
712        pex = 1;
713
714    tr_bencInit( &val, TYPE_DICT );
715    tr_bencDictReserve( &val, 4 );
716    tr_bencInitInt( tr_bencDictAdd( &val, "e" ), 1 );
717    m  = tr_bencDictAdd( &val, "m" );
718    tr_bencInit( m, TYPE_DICT );
719    if( pex ) {
720        tr_bencDictReserve( m, 1 );
721        tr_bencInitInt( tr_bencDictAdd( m, "ut_pex" ), OUR_LTEP_PEX );
722    }
723    if( port > 0 )
724        tr_bencInitInt( tr_bencDictAdd( &val, "p" ), port );
725    tr_bencInitStr( tr_bencDictAdd( &val, "v" ), v, 0, 1 );
726    buf = tr_bencSave( &val, &len );
727
728    tr_peerIoWriteUint32( msgs->io, outbuf, 2*sizeof(uint8_t) + len );
729    tr_peerIoWriteUint8 ( msgs->io, outbuf, BT_LTEP );
730    tr_peerIoWriteUint8 ( msgs->io, outbuf, LTEP_HANDSHAKE );
731    tr_peerIoWriteBytes ( msgs->io, outbuf, buf, len );
732
733    tr_peerIoWriteBuf( msgs->io, outbuf );
734
735#if 0
736    dbgmsg( msgs, "here is the ltep handshake we sent:" );
737    tr_bencPrint( &val );
738    dbgmsg( msgs, "here is the ltep handshake we read [%s]:", tr_bencSave( &val, NULL ) );
739#endif
740
741    /* cleanup */
742    tr_bencFree( &val );
743    tr_free( buf );
744    evbuffer_free( outbuf );
745}
746
747static void
748parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
749{
750    benc_val_t val, * sub;
751    uint8_t * tmp = tr_new( uint8_t, len );
752
753    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
754    msgs->peerSentLtepHandshake = 1;
755
756    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type!=TYPE_DICT ) {
757        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
758        tr_free( tmp );
759        return;
760    }
761
762#if 0
763    dbgmsg( msgs, "here is the ltep handshake we read:" );
764    tr_bencPrint( &val );
765    dbgmsg( msgs, "here is the ltep handshake we read [%s]:", tr_bencSave( &val, NULL ) );
766#endif
767
768    /* does the peer prefer encrypted connections? */
769    sub = tr_bencDictFind( &val, "e" );
770    if( tr_bencIsInt( sub ) )
771        msgs->info->encryption_preference = sub->val.i
772                                      ? ENCRYPTION_PREFERENCE_YES
773                                      : ENCRYPTION_PREFERENCE_NO;
774
775    /* check supported messages for utorrent pex */
776    sub = tr_bencDictFind( &val, "m" );
777    if( tr_bencIsDict( sub ) ) {
778        sub = tr_bencDictFind( sub, "ut_pex" );
779        if( tr_bencIsInt( sub ) ) {
780            msgs->peerSupportsPex = 1;
781            msgs->ut_pex_id = (uint8_t) sub->val.i;
782            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
783        }
784    }
785
786    /* get peer's listening port */
787    sub = tr_bencDictFind( &val, "p" );
788    if( tr_bencIsInt( sub ) ) {
789        msgs->info->port = htons( (uint16_t)sub->val.i );
790        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
791    }
792
793    tr_bencFree( &val );
794    tr_free( tmp );
795}
796
797static void
798parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
799{
800    benc_val_t val, * sub;
801    uint8_t * tmp;
802
803    if( !tr_torrentIsPexEnabled( msgs->torrent ) ) /* no sharing! */
804        return;
805
806    tmp = tr_new( uint8_t, msglen );
807    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
808
809    if( tr_bencLoad( tmp, msglen, &val, NULL ) || !tr_bencIsDict( &val ) ) {
810        dbgmsg( msgs, "GET can't read extended-pex dictionary" );
811        tr_free( tmp );
812        return;
813    }
814
815    sub = tr_bencDictFind( &val, "added" );
816    if( tr_bencIsStr(sub) && ((sub->val.s.i % 6) == 0)) {
817        const int n = sub->val.s.i / 6 ;
818        dbgmsg( msgs, "got %d peers from uT pex", n );
819        tr_peerMgrAddPeers( msgs->handle->peerMgr,
820                            msgs->torrent->info.hash,
821                            TR_PEER_FROM_PEX,
822                            (uint8_t*)sub->val.s.s, n );
823    }
824
825    tr_bencFree( &val );
826    tr_free( tmp );
827}
828
829static void
830sendPex( tr_peermsgs * msgs );
831
832static void
833parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
834{
835    uint8_t ltep_msgid;
836
837    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
838    msglen--;
839
840    if( ltep_msgid == LTEP_HANDSHAKE )
841    {
842        dbgmsg( msgs, "got ltep handshake" );
843        parseLtepHandshake( msgs, msglen, inbuf );
844        if( tr_peerIoSupportsLTEP( msgs->io ) )
845        {
846            sendLtepHandshake( msgs );
847            sendPex( msgs );
848        }
849    }
850    else if( ltep_msgid == msgs->ut_pex_id )
851    {
852        dbgmsg( msgs, "got ut pex" );
853        msgs->peerSupportsPex = 1;
854        parseUtPex( msgs, msglen, inbuf );
855    }
856    else
857    {
858        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
859        evbuffer_drain( inbuf, msglen );
860    }
861}
862
863static int
864readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf )
865{
866    uint32_t len;
867    const size_t needlen = sizeof(uint32_t);
868
869    if( EVBUFFER_LENGTH(inbuf) < needlen )
870        return READ_MORE;
871
872    tr_peerIoReadUint32( msgs->io, inbuf, &len );
873
874    if( len == 0 ) /* peer sent us a keepalive message */
875        dbgmsg( msgs, "got KeepAlive" );
876    else {
877        msgs->incomingMessageLength = len;
878        msgs->state = AWAITING_BT_MESSAGE;
879    }
880
881    return READ_AGAIN;
882}
883
884static void
885updatePeerProgress( tr_peermsgs * msgs )
886{
887    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have ) / (float)msgs->torrent->info.pieceCount;
888    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
889    updateInterest( msgs );
890    firePeerProgress( msgs );
891}
892
893static int
894clientCanSendFastBlock( const tr_peermsgs * msgs UNUSED )
895{
896    /* We can't send a fast piece if a peer has more than MAX_FAST_ALLOWED_THRESHOLD pieces */
897    if( tr_bitfieldCountTrueBits( msgs->info->have ) > MAX_FAST_ALLOWED_THRESHOLD )
898        return FALSE;
899   
900    /* ...or if we don't have ourself enough pieces */
901    if( tr_bitfieldCountTrueBits( tr_cpPieceBitfield( msgs->torrent->completion ) ) < MAX_FAST_ALLOWED_THRESHOLD )
902        return FALSE;
903
904    /* Maybe a bandwidth limit ? */
905    return TRUE;
906}
907
908static void
909peerMadeRequest( tr_peermsgs * msgs, const struct peer_request * req )
910{
911    const int reqIsValid = requestIsValid( msgs, req );
912    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( msgs->torrent->completion, req->index );
913    const int peerIsChoked = msgs->info->peerIsChoked;
914    const int peerIsFast = tr_peerIoSupportsFEXT( msgs->io );
915    const int pieceIsFast = reqIsValid && tr_bitfieldHas( msgs->peerAllowedPieces, req->index );
916    const int canSendFast = clientCanSendFastBlock( msgs );
917
918    if( !reqIsValid ) /* bad request */
919    {
920        dbgmsg( msgs, "rejecting an invalid request." );
921        sendFastReject( msgs, req->index, req->offset, req->length );
922    }
923    else if( !clientHasPiece ) /* we don't have it */
924    {
925        dbgmsg( msgs, "rejecting request for a piece we don't have." );
926        sendFastReject( msgs, req->index, req->offset, req->length );
927    }
928    else if( peerIsChoked && !peerIsFast ) /* doesn't he know he's choked? */
929    {
930        tr_peerMsgsSetChoke( msgs, 1 );
931        sendFastReject( msgs, req->index, req->offset, req->length );
932    }
933    else if( peerIsChoked && peerIsFast && ( !pieceIsFast || !canSendFast ) )
934    {
935        sendFastReject( msgs, req->index, req->offset, req->length );
936    }
937    else /* YAY */
938    {
939        struct peer_request * tmp = tr_new( struct peer_request, 1 );
940        *tmp = *req;
941        if( peerIsFast && pieceIsFast )
942            tr_list_append( &msgs->peerAskedForFast, tmp );
943        else
944            tr_list_append( &msgs->peerAskedFor, tmp );
945    }
946}
947
948static int
949messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
950{
951    switch( id )
952    {
953        case BT_CHOKE:
954        case BT_UNCHOKE:
955        case BT_INTERESTED:
956        case BT_NOT_INTERESTED:
957        case BT_HAVE_ALL:
958        case BT_HAVE_NONE:
959            return len==1;
960
961        case BT_HAVE:
962        case BT_SUGGEST:
963        case BT_ALLOWED_FAST:
964            return len==5;
965
966        case BT_BITFIELD:
967            return len == (msg->torrent->info.pieceCount+7u)/8u + 1u;
968       
969        case BT_REQUEST:
970        case BT_CANCEL:
971        case BT_REJECT:
972            return len==13;
973
974        case BT_PIECE:
975            return len>9 && len<=16393;
976
977        case BT_PORT:
978            return len==3;
979
980        case BT_LTEP:
981            return len >= 2;
982
983        default:
984            return FALSE;
985    }
986}
987
988static int
989clientGotBlock( tr_peermsgs * msgs, const uint8_t * block, const struct peer_request * req );
990
991static int
992readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf )
993{
994    int ret;
995    uint8_t id;
996    uint32_t ui32;
997    uint32_t msglen = msgs->incomingMessageLength;
998    const size_t startBufLen = EVBUFFER_LENGTH( inbuf );
999
1000    if( EVBUFFER_LENGTH(inbuf) < msglen )
1001        return READ_MORE;
1002
1003    tr_peerIoReadUint8( msgs->io, inbuf, &id );
1004    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %d", (int)id, (int)msglen, (int)EVBUFFER_LENGTH(inbuf) );
1005
1006    if( !messageLengthIsCorrect( msgs, id, msglen ) )
1007    {
1008        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1009        fireGotError( msgs );
1010        return READ_DONE;
1011    }
1012
1013    --msglen;
1014    ret = 0;
1015    switch( id )
1016    {
1017        case BT_CHOKE:
1018            dbgmsg( msgs, "got Choke" );
1019            msgs->info->clientIsChoked = 1;
1020            cancelAllRequestsToPeer( msgs );
1021            cancelAllRequestsToClientExceptFast( msgs );
1022            break;
1023
1024        case BT_UNCHOKE:
1025            dbgmsg( msgs, "got Unchoke" );
1026            msgs->info->clientIsChoked = 0;
1027            fireNeedReq( msgs );
1028            break;
1029
1030        case BT_INTERESTED:
1031            dbgmsg( msgs, "got Interested" );
1032            msgs->info->peerIsInterested = 1;
1033            tr_peerMsgsSetChoke( msgs, 0 );
1034            break;
1035
1036        case BT_NOT_INTERESTED:
1037            dbgmsg( msgs, "got Not Interested" );
1038            msgs->info->peerIsInterested = 0;
1039            break;
1040           
1041        case BT_HAVE:
1042            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1043            dbgmsg( msgs, "got Have: %u", ui32 );
1044            tr_bitfieldAdd( msgs->info->have, ui32 );
1045            /* If this is a fast-allowed piece for this peer, mark it as normal now */
1046            if( msgs->clientAllowedPieces != NULL && tr_bitfieldHas( msgs->clientAllowedPieces, ui32 ) )
1047                tr_bitfieldRem( msgs->clientAllowedPieces, ui32 );
1048            updatePeerProgress( msgs );
1049            tr_rcTransferred( msgs->torrent->swarmspeed, msgs->torrent->info.pieceSize );
1050            break;
1051
1052        case BT_BITFIELD: {
1053            const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
1054            dbgmsg( msgs, "got a bitfield" );
1055            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
1056            updatePeerProgress( msgs );
1057            tr_peerMsgsSetChoke( msgs, !clientIsSeed || (msgs->info->progress<1.0) );
1058            fireNeedReq( msgs );
1059            break;
1060        }
1061
1062        case BT_REQUEST: {
1063            struct peer_request req;
1064            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1065            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1066            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1067            dbgmsg( msgs, "got Request: %u:%u->%u", req.index, req.offset, req.length );
1068            peerMadeRequest( msgs, &req );
1069            break;
1070        }
1071
1072        case BT_CANCEL: {
1073            struct peer_request req;
1074            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1075            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1076            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1077            dbgmsg( msgs, "got a Cancel %u:%u->%u", req.index, req.offset, req.length );
1078            tr_free( tr_list_remove( &msgs->peerAskedForFast, &req, compareRequest ) );
1079            tr_free( tr_list_remove( &msgs->peerAskedFor, &req, compareRequest ) );
1080            break;
1081        }
1082
1083        case BT_PIECE: {
1084            uint8_t * block;
1085            struct peer_request req;
1086            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1087            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1088            req.length = msglen - 8;
1089            block = tr_new( uint8_t, req.length );
1090            tr_peerIoReadBytes( msgs->io, inbuf, block, req.length );
1091            dbgmsg( msgs, "got a Block %u:%u->%u", req.index, req.offset, req.length );
1092            ret = clientGotBlock( msgs, block, &req );
1093            tr_free( block );
1094            break;
1095        }
1096       
1097        case BT_PORT:
1098            dbgmsg( msgs, "Got a BT_PORT" );
1099            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1100            break;
1101           
1102        case BT_SUGGEST: {
1103            dbgmsg( msgs, "Got a BT_SUGGEST" );
1104            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1105            const tr_bitfield * bt = tr_cpPieceBitfield( msgs->torrent->completion );
1106            if( tr_bitfieldHas( bt, ui32 ) )
1107                tr_bitfieldAdd( msgs->clientSuggestedPieces, ui32 );
1108            break;
1109        }
1110           
1111        case BT_HAVE_ALL:
1112            dbgmsg( msgs, "Got a BT_HAVE_ALL" );
1113            tr_bitfieldAddRange( msgs->info->have, 0, msgs->torrent->info.pieceCount );
1114            updatePeerProgress( msgs );
1115            break;
1116       
1117       
1118        case BT_HAVE_NONE:
1119            dbgmsg( msgs, "Got a BT_HAVE_NONE" );
1120            tr_bitfieldClear( msgs->info->have );
1121            updatePeerProgress( msgs );
1122            break;
1123       
1124        case BT_REJECT: {
1125            struct peer_request req;
1126            dbgmsg( msgs, "Got a BT_REJECT" );
1127            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1128            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1129            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1130            tr_free( tr_list_remove( &msgs->clientAskedFor, &req, compareRequest ) );
1131            break;
1132        }
1133
1134        case BT_ALLOWED_FAST: {
1135            dbgmsg( msgs, "Got a BT_ALLOWED_FAST" );
1136            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1137            tr_bitfieldAdd( msgs->clientAllowedPieces, ui32 );
1138            break;
1139        }
1140
1141        case BT_LTEP:
1142            dbgmsg( msgs, "Got a BT_LTEP" );
1143            parseLtep( msgs, msglen, inbuf );
1144            break;
1145
1146        default:
1147            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1148            tr_peerIoDrain( msgs->io, inbuf, msglen );
1149assert( 0 );
1150            break;
1151    }
1152
1153    dbgmsg( msgs, "startBufLen was %d, msglen was %d, current inbuf len is %d", (int)startBufLen, (int)(msglen+1), (int)EVBUFFER_LENGTH(inbuf) );
1154
1155    if( ret == (int)TR_ERROR_ASSERT )
1156    {
1157        fireGotAssertError( msgs );
1158        return READ_DONE;
1159    }
1160    else if( ret == TR_OK )
1161    {
1162        assert( msglen + 1 == msgs->incomingMessageLength );
1163        assert( EVBUFFER_LENGTH(inbuf) == startBufLen - msgs->incomingMessageLength );
1164
1165        msgs->incomingMessageLength = -1;
1166        msgs->state = AWAITING_BT_LENGTH;
1167        return READ_AGAIN;
1168    }
1169    else
1170    {
1171        fireGotError( msgs );
1172        return READ_DONE;
1173    }
1174}
1175
1176static void
1177clientGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1178{
1179    tr_torrent * tor = msgs->torrent;
1180    tor->activityDate = tr_date( );
1181    tor->downloadedCur += byteCount;
1182    msgs->info->pieceDataActivityDate = time( NULL );
1183    tr_rcTransferred( msgs->info->rcToClient, byteCount );
1184    tr_rcTransferred( tor->download, byteCount );
1185    tr_rcTransferred( tor->handle->download, byteCount );
1186}
1187
1188static void
1189peerGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1190{
1191    tr_torrent * tor = msgs->torrent;
1192    tor->activityDate = tr_date( );
1193    tor->uploadedCur += byteCount;
1194    msgs->info->pieceDataActivityDate = time( NULL );
1195    tr_rcTransferred( msgs->info->rcToPeer, byteCount );
1196    tr_rcTransferred( tor->upload, byteCount );
1197    tr_rcTransferred( tor->handle->upload, byteCount );
1198}
1199
1200static int
1201canDownload( const tr_peermsgs * msgs )
1202{
1203    tr_torrent * tor = msgs->torrent;
1204
1205    if( tor->downloadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1206        return !tor->handle->useDownloadLimit || tr_rcCanTransfer( tor->handle->download );
1207
1208    if( tor->downloadLimitMode == TR_SPEEDLIMIT_SINGLE )
1209        return tr_rcCanTransfer( tor->download );
1210
1211    return TRUE;
1212}
1213
1214static void
1215reassignBytesToCorrupt( tr_peermsgs * msgs, uint32_t byteCount )
1216{
1217    tr_torrent * tor = msgs->torrent;
1218
1219    /* increment the `corrupt' field */
1220    tor->corruptCur += byteCount;
1221
1222    /* decrement the `downloaded' field */
1223    if( tor->downloadedCur >= byteCount )
1224        tor->downloadedCur -= byteCount;
1225    else
1226        tor->downloadedCur = 0;
1227}
1228
1229
1230static void
1231gotBadPiece( tr_peermsgs * msgs, uint32_t pieceIndex )
1232{
1233    const uint32_t byteCount = tr_torPieceCountBytes( msgs->torrent, (int)pieceIndex );
1234    reassignBytesToCorrupt( msgs, byteCount );
1235}
1236
1237static void
1238clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1239{
1240    reassignBytesToCorrupt( msgs, req->length );
1241}
1242
1243static void
1244addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1245{
1246    if( !msgs->info->blame )
1247         msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1248    tr_bitfieldAdd( msgs->info->blame, index );
1249}
1250
1251static int
1252clientGotBlock( tr_peermsgs * msgs, const uint8_t * data, const struct peer_request * req )
1253{
1254    int i;
1255    tr_torrent * tor = msgs->torrent;
1256    const int block = _tr_block( tor, req->index, req->offset );
1257    struct peer_request *myreq;
1258
1259    assert( msgs != NULL );
1260    assert( req != NULL );
1261
1262    if( req->length != (uint32_t)tr_torBlockCountBytes( msgs->torrent, block ) )
1263    {
1264        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1265                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1266        return TR_ERROR_ASSERT;
1267    }
1268
1269    /* save the block */
1270    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1271
1272    /**
1273    *** Remove the block from our `we asked for this' list
1274    **/
1275
1276    myreq = tr_list_remove( &msgs->clientAskedFor, req, compareRequest );
1277    if( myreq == NULL ) {
1278        clientGotUnwantedBlock( msgs, req );
1279        dbgmsg( msgs, "we didn't ask for this message..." );
1280        return 0;
1281    }
1282
1283    dbgmsg( msgs, "got block %u:%u->%u (turnaround time %d secs)", 
1284                  myreq->index, myreq->offset, myreq->length,
1285                  (int)(time(NULL) - myreq->time_requested) );
1286    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1287                  tr_list_size(msgs->clientAskedFor));
1288
1289    /**
1290    *** Error checks
1291    **/
1292
1293    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
1294        dbgmsg( msgs, "we have this block already..." );
1295        clientGotUnwantedBlock( msgs, req );
1296        return 0;
1297    }
1298
1299    /**
1300    ***  Save the block
1301    **/
1302
1303    msgs->info->peerSentPieceDataAt = time( NULL );
1304    clientGotBytes( msgs, req->length );
1305    i = tr_ioWrite( tor, req->index, req->offset, req->length, data );
1306    if( i )
1307        return 0;
1308
1309    tr_cpBlockAdd( tor->completion, block );
1310
1311    addPeerToBlamefield( msgs, req->index );
1312
1313    fireGotBlock( msgs, req );
1314
1315    /**
1316    ***  Handle if this was the last block in the piece
1317    **/
1318
1319    if( tr_cpPieceIsComplete( tor->completion, req->index ) )
1320    {
1321        if( tr_ioHash( tor, req->index ) )
1322        {
1323            gotBadPiece( msgs, req->index );
1324            return 0;
1325        }
1326
1327        fireClientHave( msgs, req->index );
1328    }
1329
1330    return 0;
1331}
1332
1333static ReadState
1334canRead( struct bufferevent * evin, void * vmsgs )
1335{
1336    ReadState ret;
1337    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1338    struct evbuffer * inbuf = EVBUFFER_INPUT ( evin );
1339
1340    if( !canDownload( msgs ) )
1341    {
1342        msgs->notListening = 1;
1343        tr_peerIoSetIOMode ( msgs->io, 0, EV_READ );
1344        ret = READ_DONE;
1345    }
1346    else switch( msgs->state )
1347    {
1348        case AWAITING_BT_LENGTH:  ret = readBtLength  ( msgs, inbuf ); break;
1349        case AWAITING_BT_MESSAGE: ret = readBtMessage ( msgs, inbuf ); break;
1350        default: assert( 0 );
1351    }
1352
1353    return ret;
1354}
1355
1356static void
1357sendKeepalive( tr_peermsgs * msgs )
1358{
1359    dbgmsg( msgs, "sending a keepalive message" );
1360    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1361}
1362
1363/**
1364***
1365**/
1366
1367static int
1368canWrite( const tr_peermsgs * msgs )
1369{
1370    /* don't let our outbuffer get too large */
1371    if( tr_peerIoWriteBytesWaiting( msgs->io ) > 4096 )
1372        return FALSE;
1373
1374    return TRUE;
1375}
1376
1377static size_t
1378getUploadMax( const tr_peermsgs * msgs )
1379{
1380    static const size_t maxval = ~0;
1381    const tr_torrent * tor = msgs->torrent;
1382
1383    if( !canWrite( msgs ) )
1384        return 0;
1385
1386    if( tor->uploadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1387        return tor->handle->useUploadLimit
1388            ? tr_rcBytesLeft( tor->handle->upload ) : maxval;
1389
1390    if( tor->uploadLimitMode == TR_SPEEDLIMIT_SINGLE )
1391        return tr_rcBytesLeft( tor->upload );
1392
1393    return maxval;
1394}
1395
1396static int
1397ratePulse( void * vmsgs )
1398{
1399    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1400    msgs->info->rateToClient = tr_rcRate( msgs->info->rcToClient );
1401    msgs->info->rateToPeer = tr_rcRate( msgs->info->rcToPeer );
1402    msgs->maxActiveRequests = MIN( 8 + (int)(msgs->info->rateToClient/5), 100 );
1403    msgs->minActiveRequests = msgs->maxActiveRequests / 3;
1404    return TRUE;
1405}
1406
1407static struct peer_request*
1408popNextRequest( tr_peermsgs * msgs )
1409{
1410    struct peer_request * ret;
1411    ret = tr_list_pop_front( &msgs->peerAskedForFast );
1412    if( !ret )
1413        ret = tr_list_pop_front( &msgs->peerAskedFor);
1414    return ret;
1415}
1416
1417static int
1418pulse( void * vmsgs )
1419{
1420    const time_t now = time( NULL );
1421    tr_peermsgs * msgs = vmsgs;
1422    struct peer_request * r;
1423    size_t len;
1424
1425    /* if we froze out a downloaded block because of speed limits,
1426       start listening to the peer again */
1427    if( msgs->notListening && canDownload( msgs ) )
1428    {
1429        msgs->notListening = 0;
1430        tr_peerIoSetIOMode ( msgs->io, EV_READ, 0 );
1431    }
1432
1433    pumpRequestQueue( msgs );
1434
1435    if( !canWrite( msgs ) )
1436    {
1437    }
1438    else if(( len = EVBUFFER_LENGTH( msgs->outBlock ) ))
1439    {
1440        const size_t uploadMax = getUploadMax( msgs );
1441        const size_t outlen = MIN( len, uploadMax );
1442        tr_peerIoWrite( msgs->io, EVBUFFER_DATA( msgs->outBlock ), outlen );
1443        evbuffer_drain( msgs->outBlock, outlen );
1444        msgs->clientSentAnythingAt = now;
1445        peerGotBytes( msgs, outlen );
1446        len -= outlen;
1447        dbgmsg( msgs, "wrote %d bytes; %d left in block", (int)outlen, (int)len );
1448        fflush( stdout );
1449    }
1450    else if(( len = EVBUFFER_LENGTH( msgs->outMessages ) ))
1451    {
1452        tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
1453        msgs->clientSentAnythingAt = now;
1454    }
1455    else if( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS )
1456    {
1457        sendKeepalive( msgs );
1458    }
1459
1460    if( !EVBUFFER_LENGTH( msgs->outBlock )
1461        && (( r = popNextRequest( msgs )))
1462        && requestIsValid( msgs, r )
1463        && tr_cpPieceIsComplete( msgs->torrent->completion, r->index ) )
1464    {
1465        uint8_t * buf = tr_new( uint8_t, r->length );
1466
1467        if( !tr_ioRead( msgs->torrent, r->index, r->offset, r->length, buf ) )
1468        {
1469            tr_peerIo * io = msgs->io;
1470            struct evbuffer * out = msgs->outBlock;
1471
1472            dbgmsg( msgs, "sending block %u:%u->%u", r->index, r->offset, r->length );
1473            tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 2*sizeof(uint32_t) + r->length );
1474            tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1475            tr_peerIoWriteUint32( io, out, r->index );
1476            tr_peerIoWriteUint32( io, out, r->offset );
1477            tr_peerIoWriteBytes ( io, out, buf, r->length );
1478        }
1479
1480        tr_free( buf );
1481        tr_free( r );
1482
1483        pulse( msgs ); /* start sending it right away */
1484    }
1485
1486    return TRUE; /* loop forever */
1487}
1488
1489static void
1490didWrite( struct bufferevent * evin UNUSED, void * vmsgs )
1491{
1492    pulse( vmsgs );
1493}
1494
1495static void
1496gotError( struct bufferevent * evbuf UNUSED, short what, void * vmsgs )
1497{
1498    dbgmsg( vmsgs, "libevent got an error! what=%d, errno=%d (%s)",
1499            (int)what, errno, strerror(errno) );
1500    fireGotError( vmsgs );
1501}
1502
1503static void
1504sendBitfield( tr_peermsgs * msgs )
1505{
1506    const tr_bitfield * bitfield = tr_cpPieceBitfield( msgs->torrent->completion );
1507    struct evbuffer * out = msgs->outMessages;
1508
1509    dbgmsg( msgs, "sending peer a bitfield message" );
1510    tr_peerIoWriteUint32( msgs->io, out, sizeof(uint8_t) + bitfield->len );
1511    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1512    tr_peerIoWriteBytes ( msgs->io, out, bitfield->bits, bitfield->len );
1513}
1514
1515/**
1516***
1517**/
1518
1519/* some peers give us error messages if we send
1520   more than this many peers in a single pex message */
1521#define MAX_PEX_DIFFS 200
1522
1523typedef struct
1524{
1525    tr_pex * added;
1526    tr_pex * dropped;
1527    tr_pex * elements;
1528    int addedCount;
1529    int droppedCount;
1530    int elementCount;
1531    int diffCount;
1532}
1533PexDiffs;
1534
1535static void
1536pexAddedCb( void * vpex, void * userData )
1537{
1538    PexDiffs * diffs = (PexDiffs *) userData;
1539    tr_pex * pex = (tr_pex *) vpex;
1540    if( diffs->diffCount < MAX_PEX_DIFFS )
1541    {
1542        diffs->diffCount++;
1543        diffs->added[diffs->addedCount++] = *pex;
1544        diffs->elements[diffs->elementCount++] = *pex;
1545    }
1546}
1547
1548static void
1549pexRemovedCb( void * vpex, void * userData )
1550{
1551    PexDiffs * diffs = (PexDiffs *) userData;
1552    tr_pex * pex = (tr_pex *) vpex;
1553    if( diffs->diffCount < MAX_PEX_DIFFS )
1554    {
1555        diffs->diffCount++;
1556        diffs->dropped[diffs->droppedCount++] = *pex;
1557    }
1558}
1559
1560static void
1561pexElementCb( void * vpex, void * userData )
1562{
1563    PexDiffs * diffs = (PexDiffs *) userData;
1564    tr_pex * pex = (tr_pex *) vpex;
1565    if( diffs->diffCount < MAX_PEX_DIFFS )
1566    {
1567        diffs->diffCount++;
1568        diffs->elements[diffs->elementCount++] = *pex;
1569    }
1570}
1571
1572static void
1573sendPex( tr_peermsgs * msgs )
1574{
1575    if( msgs->peerSupportsPex && tr_torrentIsPexEnabled( msgs->torrent ) )
1576    {
1577        int i;
1578        tr_pex * newPex = NULL;
1579        const int newCount = tr_peerMgrGetPeers( msgs->handle->peerMgr, msgs->torrent->info.hash, &newPex );
1580        PexDiffs diffs;
1581        benc_val_t val, *added, *dropped, *flags;
1582        uint8_t *tmp, *walk;
1583        char * benc;
1584        int bencLen;
1585
1586        /* build the diffs */
1587        diffs.added = tr_new( tr_pex, newCount );
1588        diffs.addedCount = 0;
1589        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1590        diffs.droppedCount = 0;
1591        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1592        diffs.elementCount = 0;
1593        diffs.diffCount = 0;
1594        tr_set_compare( msgs->pex, msgs->pexCount,
1595                        newPex, newCount,
1596                        tr_pexCompare, sizeof(tr_pex),
1597                        pexRemovedCb, pexAddedCb, pexElementCb, &diffs );
1598        dbgmsg( msgs, "pex: old peer count %d, new peer count %d, added %d, removed %d", msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1599
1600        /* update peer */
1601        tr_free( msgs->pex );
1602        msgs->pex = diffs.elements;
1603        msgs->pexCount = diffs.elementCount;
1604
1605        /* build the pex payload */
1606        tr_bencInit( &val, TYPE_DICT );
1607        tr_bencDictReserve( &val, 3 );
1608
1609        /* "added" */
1610        added = tr_bencDictAdd( &val, "added" );
1611        tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1612        for( i=0; i<diffs.addedCount; ++i ) {
1613            memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
1614            memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1615        }
1616        assert( ( walk - tmp ) == diffs.addedCount * 6 );
1617        tr_bencInitStr( added, tmp, walk-tmp, FALSE );
1618
1619        /* "added.f" */
1620        flags = tr_bencDictAdd( &val, "added.f" );
1621        tmp = walk = tr_new( uint8_t, diffs.addedCount );
1622        for( i=0; i<diffs.addedCount; ++i )
1623            *walk++ = diffs.added[i].flags;
1624        assert( ( walk - tmp ) == diffs.addedCount );
1625        tr_bencInitStr( flags, tmp, walk-tmp, FALSE );
1626
1627        /* "dropped" */
1628        dropped = tr_bencDictAdd( &val, "dropped" );
1629        tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
1630        for( i=0; i<diffs.droppedCount; ++i ) {
1631            memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
1632            memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
1633        }
1634        assert( ( walk - tmp ) == diffs.droppedCount * 6 );
1635        tr_bencInitStr( dropped, tmp, walk-tmp, FALSE );
1636
1637        /* write the pex message */
1638        benc = tr_bencSave( &val, &bencLen );
1639        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 2*sizeof(uint8_t) + bencLen );
1640        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_LTEP );
1641        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, OUR_LTEP_PEX );
1642        tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, benc, bencLen );
1643
1644        /* cleanup */
1645        tr_free( benc );
1646        tr_bencFree( &val );
1647        tr_free( diffs.added );
1648        tr_free( diffs.dropped );
1649        tr_free( newPex );
1650
1651        msgs->clientSentPexAt = time( NULL );
1652    }
1653}
1654
1655static int
1656pexPulse( void * vpeer )
1657{
1658    sendPex( vpeer );
1659    return TRUE;
1660}
1661
1662/**
1663***
1664**/
1665
1666tr_peermsgs*
1667tr_peerMsgsNew( struct tr_torrent * torrent,
1668                struct tr_peer    * info,
1669                tr_delivery_func    func,
1670                void              * userData,
1671                tr_publisher_tag  * setme )
1672{
1673    tr_peermsgs * m;
1674
1675    assert( info != NULL );
1676    assert( info->io != NULL );
1677
1678    m = tr_new0( tr_peermsgs, 1 );
1679    m->publisher = tr_publisherNew( );
1680    m->info = info;
1681    m->handle = torrent->handle;
1682    m->torrent = torrent;
1683    m->io = info->io;
1684    m->info->clientIsChoked = 1;
1685    m->info->peerIsChoked = 1;
1686    m->info->clientIsInterested = 0;
1687    m->info->peerIsInterested = 0;
1688    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
1689    m->pulseTimer = tr_timerNew( m->handle, pulse, m, PEER_PULSE_INTERVAL );
1690    m->rateTimer = tr_timerNew( m->handle, ratePulse, m, RATE_PULSE_INTERVAL );
1691    m->pexTimer = tr_timerNew( m->handle, pexPulse, m, PEX_INTERVAL );
1692    m->outMessages = evbuffer_new( );
1693    m->outBlock = evbuffer_new( );
1694    m->peerAllowedPieces = NULL;
1695    m->clientAllowedPieces = NULL;
1696    m->clientSuggestedPieces = NULL;
1697    *setme = tr_publisherSubscribe( m->publisher, func, userData );
1698   
1699    if ( tr_peerIoSupportsFEXT( m->io ) )
1700    {
1701        /* This peer is fastpeer-enabled, generate its allowed set
1702         * (before registering our callbacks) */
1703        if ( !m->peerAllowedPieces ) {
1704            const struct in_addr *peerAddr = tr_peerIoGetAddress( m->io, NULL );
1705           
1706            m->peerAllowedPieces = tr_peerMgrGenerateAllowedSet( MAX_FAST_ALLOWED_COUNT,
1707                                                                 m->torrent->info.pieceCount,
1708                                                                 m->torrent->info.hash,
1709                                                                 peerAddr );
1710        }
1711        m->clientAllowedPieces = tr_bitfieldNew( m->torrent->info.pieceCount );
1712       
1713        m->clientSuggestedPieces = tr_bitfieldNew( m->torrent->info.pieceCount );
1714    }
1715   
1716    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of inactivity */
1717    tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
1718    tr_peerIoSetIOMode( m->io, EV_READ|EV_WRITE, 0 );
1719    ratePulse( m );
1720
1721    if ( tr_peerIoSupportsLTEP( m->io ) )
1722        sendLtepHandshake( m );
1723
1724    if ( !tr_peerIoSupportsFEXT( m->io ) )
1725        sendBitfield( m );
1726    else {
1727        /* This peer is fastpeer-enabled, send it have-all or have-none if appropriate */
1728        uint32_t peerProgress;
1729        float completion = tr_cpPercentComplete( m->torrent->completion );
1730        if ( completion == 0.0f ) {
1731            sendFastHave( m, 0 );
1732        } else if ( completion == 1.0f ) {
1733            sendFastHave( m, 1 );
1734        } else {
1735            sendBitfield( m );
1736        }
1737        peerProgress = m->torrent->info.pieceCount * m->info->progress;
1738       
1739        if ( peerProgress < MAX_FAST_ALLOWED_COUNT )
1740            sendFastAllowedSet( m );
1741    }
1742
1743    return m;
1744}
1745
1746void
1747tr_peerMsgsFree( tr_peermsgs* msgs )
1748{
1749    if( msgs != NULL )
1750    {
1751        tr_timerFree( &msgs->pulseTimer );
1752        tr_timerFree( &msgs->rateTimer );
1753        tr_timerFree( &msgs->pexTimer );
1754        tr_publisherFree( &msgs->publisher );
1755        tr_list_free( &msgs->clientWillAskFor, tr_free );
1756        tr_list_free( &msgs->clientAskedFor, tr_free );
1757        tr_list_free( &msgs->peerAskedForFast, tr_free );
1758        tr_list_free( &msgs->peerAskedFor, tr_free );
1759        evbuffer_free( msgs->outMessages );
1760        evbuffer_free( msgs->outBlock );
1761        tr_free( msgs->pex );
1762        msgs->pexCount = 0;
1763        tr_free( msgs );
1764    }
1765}
1766
1767tr_publisher_tag
1768tr_peerMsgsSubscribe( tr_peermsgs       * peer,
1769                      tr_delivery_func    func,
1770                      void              * userData )
1771{
1772    return tr_publisherSubscribe( peer->publisher, func, userData );
1773}
1774
1775void
1776tr_peerMsgsUnsubscribe( tr_peermsgs       * peer,
1777                        tr_publisher_tag    tag )
1778{
1779    tr_publisherUnsubscribe( peer->publisher, tag );
1780}
1781
1782int
1783tr_peerMsgsIsPieceFastAllowed( const tr_peermsgs * peer,
1784                               uint32_t            index )
1785{
1786    return tr_bitfieldHas( peer->clientAllowedPieces, index );
1787}
1788
1789int
1790tr_peerMsgsIsPieceSuggested( const tr_peermsgs * peer,
1791                             uint32_t            index )
1792{
1793    return tr_bitfieldHas( peer->clientSuggestedPieces, index );
1794}
1795
Note: See TracBrowser for help on using the repository browser.