source: trunk/libtransmission/peer-msgs.c @ 3416

Last change on this file since 3416 was 3416, checked in by charles, 15 years ago

fix choke fibrillation bug in new connections

  • Property svn:keywords set to Date Rev Author Id
File size: 50.7 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 3416 2007-10-15 16:01:42Z charles $
11 */
12
13#include <assert.h>
14#include <ctype.h>
15#include <errno.h>
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19
20#include <arpa/inet.h>
21
22#include <sys/types.h> /* event.h needs this */
23#include <event.h>
24
25#include "transmission.h"
26#include "bencode.h"
27#include "completion.h"
28#include "inout.h"
29#include "list.h"
30#include "peer-io.h"
31#include "peer-mgr.h"
32#include "peer-mgr-private.h"
33#include "peer-msgs.h"
34#include "ratecontrol.h"
35#include "trevent.h"
36#include "utils.h"
37
38/**
39***
40**/
41
42#define MAX_ALLOWED_SET_COUNT   10 /* number of pieces generated for allow-fast,
43                                      threshold for fast-allowing others */
44
45enum
46{
47    BT_CHOKE                = 0,
48    BT_UNCHOKE              = 1,
49    BT_INTERESTED           = 2,
50    BT_NOT_INTERESTED       = 3,
51    BT_HAVE                 = 4,
52    BT_BITFIELD             = 5,
53    BT_REQUEST              = 6,
54    BT_PIECE                = 7,
55    BT_CANCEL               = 8,
56    BT_PORT                 = 9,
57    BT_SUGGEST              = 13,
58    BT_HAVE_ALL             = 14,
59    BT_HAVE_NONE            = 15,
60    BT_REJECT               = 16,
61    BT_ALLOWED_FAST         = 17,
62    BT_LTEP                 = 20,
63
64    LTEP_HANDSHAKE          = 0,
65
66    OUR_LTEP_PEX            = 1,
67
68    MAX_REQUEST_BYTE_COUNT  = (16 * 1024), /* drop requests who want too much */
69
70    KEEPALIVE_INTERVAL_SECS = 90,          /* idle seconds before we send a keepalive */
71    PEX_INTERVAL            = (60 * 1000), /* msec between calls to sendPex() */
72    PEER_PULSE_INTERVAL     = (133),        /* msec between calls to pulse() */
73    RATE_PULSE_INTERVAL     = (333),       /* msec between calls to ratePulse() */
74};
75
76enum
77{
78    AWAITING_BT_LENGTH,
79    AWAITING_BT_MESSAGE,
80    READING_BT_PIECE
81};
82
83struct peer_request
84{
85    uint32_t index;
86    uint32_t offset;
87    uint32_t length;
88    time_t time_requested;
89};
90
91static int
92compareRequest( const void * va, const void * vb )
93{
94    struct peer_request * a = (struct peer_request*) va;
95    struct peer_request * b = (struct peer_request*) vb;
96    if( a->index != b->index ) return a->index - b->index;
97    if( a->offset != b->offset ) return a->offset - b->offset;
98    if( a->length != b->length ) return a->length - b->length;
99    return 0;
100}
101
102struct tr_peermsgs
103{
104    tr_peer * info;
105
106    tr_handle * handle;
107    tr_torrent * torrent;
108    tr_peerIo * io;
109
110    tr_publisher_t * publisher;
111
112    struct evbuffer * outMessages; /* buffer of all the non-piece messages */
113    struct evbuffer * inBlock;     /* the block we're currently receiving */
114    tr_list * peerAskedFor;
115    tr_list * clientAskedFor;
116    tr_list * clientWillAskFor;
117
118    tr_timer * rateTimer;
119    tr_timer * pulseTimer;
120    tr_timer * pexTimer;
121
122    struct peer_request blockToUs; /* the block currntly being sent to us */
123
124    time_t lastReqAddedAt;
125    time_t clientSentPexAt;
126    time_t clientSentAnythingAt;
127
128    unsigned int notListening             : 1;
129    unsigned int peerSupportsPex          : 1;
130    unsigned int clientSentLtepHandshake  : 1;
131    unsigned int peerSentLtepHandshake    : 1;
132   
133    tr_bitfield * clientAllowedPieces;
134    tr_bitfield * peerAllowedPieces;
135   
136    uint8_t state;
137    uint8_t ut_pex_id;
138    uint16_t pexCount;
139    uint32_t incomingMessageLength;
140    uint32_t maxActiveRequests;
141    uint32_t minActiveRequests;
142
143    tr_pex * pex;
144};
145
146/**
147***
148**/
149
150static void
151myDebug( const char * file, int line,
152         const struct tr_peermsgs * msgs,
153         const char * fmt, ... )
154{
155    FILE * fp = tr_getLog( );
156    if( fp != NULL )
157    {
158        va_list args;
159        struct evbuffer * buf = evbuffer_new( );
160        char timestr[64];
161        evbuffer_add_printf( buf, "[%s] %s [%s]: ",
162                             tr_getLogTimeStr( timestr, sizeof(timestr) ),
163                             tr_peerIoGetAddrStr( msgs->io ),
164                             msgs->info->client );
165        va_start( args, fmt );
166        evbuffer_add_vprintf( buf, fmt, args );
167        va_end( args );
168        evbuffer_add_printf( buf, " (%s:%d)\n", file, line );
169
170        fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
171        evbuffer_free( buf );
172    }
173}
174
175#define dbgmsg(msgs, fmt...) myDebug(__FILE__, __LINE__, msgs, ##fmt )
176
177/**
178***
179**/
180
181static void
182protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
183{
184    tr_peerIo * io = msgs->io;
185    struct evbuffer * out = msgs->outMessages;
186
187    dbgmsg( msgs, "requesting %u:%u->%u", req->index, req->offset, req->length );
188    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
189    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
190    tr_peerIoWriteUint32( io, out, req->index );
191    tr_peerIoWriteUint32( io, out, req->offset );
192    tr_peerIoWriteUint32( io, out, req->length );
193}
194
195static void
196protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
197{
198    tr_peerIo * io = msgs->io;
199    struct evbuffer * out = msgs->outMessages;
200
201    dbgmsg( msgs, "cancelling %u:%u->%u", req->index, req->offset, req->length );
202    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
203    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
204    tr_peerIoWriteUint32( io, out, req->index );
205    tr_peerIoWriteUint32( io, out, req->offset );
206    tr_peerIoWriteUint32( io, out, req->length );
207}
208
209static void
210protocolSendHave( tr_peermsgs * msgs, uint32_t index )
211{
212    tr_peerIo * io = msgs->io;
213    struct evbuffer * out = msgs->outMessages;
214
215    dbgmsg( msgs, "sending Have %u", index );
216    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
217    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
218    tr_peerIoWriteUint32( io, out, index );
219}
220
221static void
222protocolSendChoke( tr_peermsgs * msgs, int choke )
223{
224    tr_peerIo * io = msgs->io;
225    struct evbuffer * out = msgs->outMessages;
226
227    dbgmsg( msgs, "sending %s", (choke ? "Choke" : "Unchoke") );
228    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) );
229    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
230}
231
232static void
233protocolSendPiece( tr_peermsgs                * msgs,
234                   const struct peer_request  * r,
235                   const uint8_t              * pieceData )
236{
237    tr_peerIo * io = msgs->io;
238    struct evbuffer * out = evbuffer_new( );
239
240    dbgmsg( msgs, "sending block %u:%u->%u", r->index, r->offset, r->length );
241    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 2*sizeof(uint32_t) + r->length );
242    tr_peerIoWriteUint8 ( io, out, BT_PIECE );
243    tr_peerIoWriteUint32( io, out, r->index );
244    tr_peerIoWriteUint32( io, out, r->offset );
245    tr_peerIoWriteBytes ( io, out, pieceData, r->length );
246    tr_peerIoWriteBuf   ( io, out );
247
248    evbuffer_free( out );
249}
250
251/**
252***  EVENTS
253**/
254
255static const tr_peermsgs_event blankEvent = { 0, 0, 0, 0, 0.0f };
256
257static void
258publish( tr_peermsgs * msgs, tr_peermsgs_event * e )
259{
260    tr_publisherPublish( msgs->publisher, msgs->info, e );
261}
262
263static void
264fireGotError( tr_peermsgs * msgs )
265{
266    tr_peermsgs_event e = blankEvent;
267    e.eventType = TR_PEERMSG_GOT_ERROR;
268    publish( msgs, &e );
269}
270
271static void
272fireNeedReq( tr_peermsgs * msgs )
273{
274    tr_peermsgs_event e = blankEvent;
275    e.eventType = TR_PEERMSG_NEED_REQ;
276    publish( msgs, &e );
277}
278
279static void
280firePeerProgress( tr_peermsgs * msgs )
281{
282    tr_peermsgs_event e = blankEvent;
283    e.eventType = TR_PEERMSG_PEER_PROGRESS;
284    e.progress = msgs->info->progress;
285    publish( msgs, &e );
286}
287
288static void
289fireClientHave( tr_peermsgs * msgs, uint32_t pieceIndex )
290{
291    tr_peermsgs_event e = blankEvent;
292    e.eventType = TR_PEERMSG_CLIENT_HAVE;
293    e.pieceIndex = pieceIndex;
294    publish( msgs, &e );
295}
296
297static void
298fireGotBlock( tr_peermsgs * msgs, uint32_t pieceIndex, uint32_t offset, uint32_t length )
299{
300    tr_peermsgs_event e = blankEvent;
301    e.eventType = TR_PEERMSG_CLIENT_BLOCK;
302    e.pieceIndex = pieceIndex;
303    e.offset = offset;
304    e.length = length;
305    publish( msgs, &e );
306}
307
308static void
309fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
310{
311    tr_peermsgs_event e = blankEvent;
312    e.eventType = TR_PEERMSG_CANCEL;
313    e.pieceIndex = req->index;
314    e.offset = req->offset;
315    e.length = req->length;
316    publish( msgs, &e );
317}
318
319/**
320***  INTEREST
321**/
322
323static int
324isPieceInteresting( const tr_peermsgs   * peer,
325                    int                   piece )
326{
327    const tr_torrent * torrent = peer->torrent;
328    if( torrent->info.pieces[piece].dnd ) /* we don't want it */
329        return FALSE;
330    if( tr_cpPieceIsComplete( torrent->completion, piece ) ) /* we have it */
331        return FALSE;
332    if( !tr_bitfieldHas( peer->info->have, piece ) ) /* peer doesn't have it */
333        return FALSE;
334    if( tr_bitfieldHas( peer->info->banned, piece ) ) /* peer is banned */
335        return FALSE;
336    return TRUE;
337}
338
339/* "interested" means we'll ask for piece data if they unchoke us */
340static int
341isPeerInteresting( const tr_peermsgs * msgs )
342{
343    int i;
344    const tr_torrent * torrent;
345    const tr_bitfield * bitfield;
346    const int clientIsSeed =
347        tr_cpGetStatus( msgs->torrent->completion ) != TR_CP_INCOMPLETE;
348
349    if( clientIsSeed )
350        return FALSE;
351
352    torrent = msgs->torrent;
353    bitfield = tr_cpPieceBitfield( torrent->completion );
354
355    if( !msgs->info->have )
356        return TRUE;
357
358    assert( bitfield->len == msgs->info->have->len );
359    for( i=0; i<torrent->info.pieceCount; ++i )
360        if( isPieceInteresting( msgs, i ) )
361            return TRUE;
362
363    return FALSE;
364}
365
366static void
367sendInterest( tr_peermsgs * msgs, int weAreInterested )
368{
369    assert( msgs != NULL );
370    assert( weAreInterested==0 || weAreInterested==1 );
371
372    msgs->info->clientIsInterested = weAreInterested;
373    dbgmsg( msgs, "Sending %s",
374            weAreInterested ? "Interested" : "Not Interested");
375
376    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
377    tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages,
378                   weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
379}
380
381static void
382updateInterest( tr_peermsgs * msgs )
383{
384    const int i = isPeerInteresting( msgs );
385    if( i != msgs->info->clientIsInterested )
386        sendInterest( msgs, i );
387    if( i )
388        fireNeedReq( msgs );
389}
390
391#define MIN_CHOKE_PERIOD_SEC 10
392
393int
394tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
395{
396    int ret;
397    const time_t fibrillationTime = time(NULL) - MIN_CHOKE_PERIOD_SEC;
398
399    assert( msgs != NULL );
400    assert( msgs->info != NULL );
401    assert( choke==0 || choke==1 );
402
403    if( msgs->info->chokeChangedAt > fibrillationTime )
404    {
405        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
406        ret = FALSE;
407    }
408    else if( msgs->info->peerIsChoked != choke )
409    {
410        msgs->info->peerIsChoked = choke;
411       
412        if( choke )
413        {
414            tr_list * walk;
415            for( walk = msgs->peerAskedFor; walk != NULL; )
416            {
417                tr_list * next = walk->next;
418                /* don't reject a peer's fast allowed requests at choke */
419                struct peer_request *req = walk->data;
420                if ( !tr_bitfieldHas( msgs->peerAllowedPieces, req->index ) )
421                {
422                    tr_list_remove_data( &msgs->peerAskedFor, req );
423                    tr_free( req );
424                }
425                walk = next;
426            }
427        }
428
429        protocolSendChoke( msgs, choke );
430        msgs->info->chokeChangedAt = time( NULL );
431        ret = TRUE;
432    }
433
434    return ret;
435}
436
437/**
438***
439**/
440
441void
442tr_peerMsgsHave( tr_peermsgs * msgs,
443                 uint32_t      index )
444{
445    protocolSendHave( msgs, index );
446
447    /* since we have more pieces now, we might not be interested in this peer */
448    updateInterest( msgs );
449}
450#if 0
451static void
452sendFastSuggest( tr_peermsgs * msgs,
453                 uint32_t      pieceIndex )
454{
455    dbgmsg( msgs, "w00t SUGGESTing them piece #%d", pieceIndex );
456    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
457    tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_SUGGEST );
458    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
459   
460    updateInterest( msgs );
461}
462#endif
463static void
464sendFastHave( tr_peermsgs * msgs,
465              int           all)
466{
467    dbgmsg( msgs, "w00t telling them we %s pieces", (all ? "HAVE_ALL" : "HAVE_NONE" ) );
468    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
469    tr_peerIoWriteUint8( msgs->io, msgs->outMessages, ( all ? BT_HAVE_ALL : BT_HAVE_NONE ) );
470   
471    updateInterest( msgs );
472}
473
474static void
475sendFastReject( tr_peermsgs * msgs,
476                uint32_t      pieceIndex,
477                uint32_t      offset,
478                uint32_t      length )
479{
480    assert( msgs != NULL );
481    assert( length > 0 );
482   
483    /* reject the request */
484    const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
485    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
486    tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_REJECT );
487    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
488    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
489    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
490}
491
492static void
493sendFastAllowed( tr_peermsgs * msgs,
494                 uint32_t      pieceIndex)
495{
496    dbgmsg( msgs, "w00t telling them we ALLOW_FAST piece #%d", pieceIndex );
497    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
498    tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_ALLOWED_FAST );
499    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
500}
501
502
503
504static void
505sendFastAllowedSet( tr_peermsgs * msgs )
506{
507    int i = 0;
508    while (i <= msgs->torrent->info.pieceCount )
509    {
510        if ( tr_bitfieldHas( msgs->peerAllowedPieces, i) )
511            sendFastAllowed( msgs, i );
512        i++;
513    }
514}
515
516
517/**
518***
519**/
520
521static int
522reqIsValid( const tr_peermsgs * msgs, uint32_t index, uint32_t offset, uint32_t length )
523{
524    const tr_torrent * tor = msgs->torrent;
525
526    if( index >= (uint32_t) tor->info.pieceCount )
527        return FALSE;
528    if ( (int)offset >= tr_torPieceCountBytes( tor, (int)index ) )
529        return FALSE;
530    if( length > MAX_REQUEST_BYTE_COUNT )
531        return FALSE;
532    if( tr_pieceOffset( tor, index, offset, length ) > tor->info.totalSize )
533        return FALSE;
534
535    return TRUE;
536}
537
538static int
539requestIsValid( const tr_peermsgs * msgs, struct peer_request * req )
540{
541    return reqIsValid( msgs, req->index, req->offset, req->length );
542}
543
544static void
545pumpRequestQueue( tr_peermsgs * msgs )
546{
547    const int max = msgs->maxActiveRequests;
548    const int min = msgs->minActiveRequests;
549    int count = tr_list_size( msgs->clientAskedFor );
550    int sent = 0;
551
552    if( count > min )
553        return;
554    if( msgs->info->clientIsChoked )
555        return;
556
557    while( ( count < max ) && ( msgs->clientWillAskFor != NULL ) )
558    {
559        struct peer_request * req = tr_list_pop_front( &msgs->clientWillAskFor );
560        protocolSendRequest( msgs, req );
561        req->time_requested = msgs->lastReqAddedAt = time( NULL );
562        tr_list_append( &msgs->clientAskedFor, req );
563        ++count;
564        ++sent;
565    }
566
567    if( sent )
568        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
569                sent,
570                tr_list_size(msgs->clientAskedFor),
571                tr_list_size(msgs->clientWillAskFor) );
572
573    if( count < max )
574        fireNeedReq( msgs );
575}
576
577int
578tr_peerMsgsAddRequest( tr_peermsgs * msgs,
579                       uint32_t      index, 
580                       uint32_t      offset, 
581                       uint32_t      length )
582{
583    const int req_max = msgs->maxActiveRequests;
584    struct peer_request tmp, *req;
585
586    assert( msgs != NULL );
587    assert( msgs->torrent != NULL );
588    assert( reqIsValid( msgs, index, offset, length ) );
589
590    /**
591    ***  Reasons to decline the request
592    **/
593
594    /* don't send requests to choked clients */
595    if( msgs->info->clientIsChoked ) {
596        dbgmsg( msgs, "declining request because they're choking us" );
597        return TR_ADDREQ_CLIENT_CHOKED;
598    }
599
600    /* peer doesn't have this piece */
601    if( !tr_bitfieldHas( msgs->info->have, index ) )
602        return TR_ADDREQ_MISSING;
603
604    /* peer's queue is full */
605    if( tr_list_size( msgs->clientWillAskFor ) >= req_max ) {
606        dbgmsg( msgs, "declining request because we're full" );
607        return TR_ADDREQ_FULL;
608    }
609
610    /* have we already asked for this piece? */
611    tmp.index = index;
612    tmp.offset = offset;
613    tmp.length = length;
614    if( tr_list_find( msgs->clientAskedFor, &tmp, compareRequest ) ) {
615        dbgmsg( msgs, "declining because it's a duplicate" );
616        return TR_ADDREQ_DUPLICATE;
617    }
618    if( tr_list_find( msgs->clientWillAskFor, &tmp, compareRequest ) ) {
619        dbgmsg( msgs, "declining because it's a duplicate" );
620        return TR_ADDREQ_DUPLICATE;
621    }
622
623    /**
624    ***  Accept this request
625    **/
626
627    dbgmsg( msgs, "added req for piece %d, offset %d", (int)index, (int)offset );
628    req = tr_new0( struct peer_request, 1 );
629    *req = tmp;
630    tr_list_append( &msgs->clientWillAskFor, req );
631    return TR_ADDREQ_OK;
632}
633
634static void
635tr_peerMsgsCancelAllRequests( tr_peermsgs * msgs )
636{
637    struct peer_request * req;
638
639    while(( req = tr_list_pop_front( &msgs->clientWillAskFor ) ))
640    {
641        fireCancelledReq( msgs, req );
642        tr_free( req );
643    }
644
645    while(( req = tr_list_pop_front( &msgs->clientAskedFor ) ))
646    {
647        fireCancelledReq( msgs, req );
648        protocolSendCancel( msgs, req );
649        tr_free( req );
650    }
651}
652
653void
654tr_peerMsgsCancel( tr_peermsgs * msgs,
655                   uint32_t      pieceIndex,
656                   uint32_t      offset,
657                   uint32_t      length )
658{
659    struct peer_request *req, tmp;
660
661    assert( msgs != NULL );
662    assert( length > 0 );
663
664    /* have we asked the peer for this piece? */
665    tmp.index = pieceIndex;
666    tmp.offset = offset;
667    tmp.length = length;
668
669    /* if it's only in the queue and hasn't been sent yet, free it */
670    if(( req = tr_list_remove( &msgs->clientWillAskFor, &tmp, compareRequest ) ))
671    {
672        fireCancelledReq( msgs, req );
673        tr_free( req );
674    }
675
676    /* if it's already been sent, send a cancel message too */
677    if(( req = tr_list_remove( &msgs->clientAskedFor, &tmp, compareRequest ) ))
678    {
679        protocolSendCancel( msgs, req );
680        fireCancelledReq( msgs, req );
681        tr_free( req );
682    }
683}
684
685/**
686***
687**/
688
689static void
690sendLtepHandshake( tr_peermsgs * msgs )
691{
692    benc_val_t val, *m;
693    char * buf;
694    int len;
695    int pex;
696    const char * v = TR_NAME " " USERAGENT_PREFIX;
697    const int port = tr_getPublicPort( msgs->handle );
698    struct evbuffer * outbuf;
699
700    if( msgs->clientSentLtepHandshake )
701        return;
702
703    outbuf = evbuffer_new( );
704    dbgmsg( msgs, "sending an ltep handshake" );
705    msgs->clientSentLtepHandshake = 1;
706
707    /* decide if we want to advertise pex support */
708    if( !tr_torrentIsPexEnabled( msgs->torrent ) )
709        pex = 0;
710    else if( msgs->peerSentLtepHandshake )
711        pex = msgs->peerSupportsPex ? 1 : 0;
712    else
713        pex = 1;
714
715    tr_bencInit( &val, TYPE_DICT );
716    tr_bencDictReserve( &val, 4 );
717    tr_bencInitInt( tr_bencDictAdd( &val, "e" ), 1 );
718    m  = tr_bencDictAdd( &val, "m" );
719    tr_bencInit( m, TYPE_DICT );
720    if( pex ) {
721        tr_bencDictReserve( m, 1 );
722        tr_bencInitInt( tr_bencDictAdd( m, "ut_pex" ), OUR_LTEP_PEX );
723    }
724    if( port > 0 )
725        tr_bencInitInt( tr_bencDictAdd( &val, "p" ), port );
726    tr_bencInitStr( tr_bencDictAdd( &val, "v" ), v, 0, 1 );
727    buf = tr_bencSaveMalloc( &val,  &len );
728
729    tr_peerIoWriteUint32( msgs->io, outbuf, 2*sizeof(uint8_t) + len );
730    tr_peerIoWriteUint8 ( msgs->io, outbuf, BT_LTEP );
731    tr_peerIoWriteUint8 ( msgs->io, outbuf, LTEP_HANDSHAKE );
732    tr_peerIoWriteBytes ( msgs->io, outbuf, buf, len );
733
734    tr_peerIoWriteBuf( msgs->io, outbuf );
735
736#if 0
737    dbgmsg( msgs, "here is the ltep handshake we sent:" );
738    tr_bencPrint( &val );
739#endif
740
741    /* cleanup */
742    tr_bencFree( &val );
743    tr_free( buf );
744    evbuffer_free( outbuf );
745}
746
747static void
748parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
749{
750    benc_val_t val, * sub;
751    uint8_t * tmp = tr_new( uint8_t, len );
752
753    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
754    msgs->peerSentLtepHandshake = 1;
755
756    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type!=TYPE_DICT ) {
757        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
758        tr_free( tmp );
759        return;
760    }
761
762#if 0
763    dbgmsg( msgs, "here is the ltep handshake we read:" );
764    tr_bencPrint( &val );
765#endif
766
767    /* does the peer prefer encrypted connections? */
768    sub = tr_bencDictFind( &val, "e" );
769    if( tr_bencIsInt( sub ) )
770        msgs->info->encryption_preference = sub->val.i
771                                      ? ENCRYPTION_PREFERENCE_YES
772                                      : ENCRYPTION_PREFERENCE_NO;
773
774    /* check supported messages for utorrent pex */
775    sub = tr_bencDictFind( &val, "m" );
776    if( tr_bencIsDict( sub ) ) {
777        sub = tr_bencDictFind( sub, "ut_pex" );
778        if( tr_bencIsInt( sub ) ) {
779            msgs->peerSupportsPex = 1;
780            msgs->ut_pex_id = (uint8_t) sub->val.i;
781            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
782        }
783    }
784
785    /* get peer's listening port */
786    sub = tr_bencDictFind( &val, "p" );
787    if( tr_bencIsInt( sub ) ) {
788        msgs->info->port = htons( (uint16_t)sub->val.i );
789        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
790    }
791
792    tr_bencFree( &val );
793    tr_free( tmp );
794}
795
796static void
797parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
798{
799    benc_val_t val, * sub;
800    uint8_t * tmp;
801
802    if( !tr_torrentIsPexEnabled( msgs->torrent ) ) /* no sharing! */
803        return;
804
805    tmp = tr_new( uint8_t, msglen );
806    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
807
808    if( tr_bencLoad( tmp, msglen, &val, NULL ) || !tr_bencIsDict( &val ) ) {
809        dbgmsg( msgs, "GET can't read extended-pex dictionary" );
810        tr_free( tmp );
811        return;
812    }
813
814    sub = tr_bencDictFind( &val, "added" );
815    if( tr_bencIsStr(sub) && ((sub->val.s.i % 6) == 0)) {
816        const int n = sub->val.s.i / 6 ;
817        dbgmsg( msgs, "got %d peers from uT pex", n );
818        tr_peerMgrAddPeers( msgs->handle->peerMgr,
819                            msgs->torrent->info.hash,
820                            TR_PEER_FROM_PEX,
821                            (uint8_t*)sub->val.s.s, n );
822    }
823
824    tr_bencFree( &val );
825    tr_free( tmp );
826}
827
828static void
829sendPex( tr_peermsgs * msgs );
830
831static void
832parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
833{
834    uint8_t ltep_msgid;
835
836    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
837    msglen--;
838
839    if( ltep_msgid == LTEP_HANDSHAKE )
840    {
841        dbgmsg( msgs, "got ltep handshake" );
842        parseLtepHandshake( msgs, msglen, inbuf );
843        sendLtepHandshake( msgs );
844        sendPex( msgs );
845    }
846    else if( ltep_msgid == msgs->ut_pex_id )
847    {
848        dbgmsg( msgs, "got ut pex" );
849        msgs->peerSupportsPex = 1;
850        parseUtPex( msgs, msglen, inbuf );
851    }
852    else
853    {
854        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
855        evbuffer_drain( inbuf, msglen );
856    }
857}
858
859static int
860readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf )
861{
862    uint32_t len;
863    const size_t needlen = sizeof(uint32_t);
864
865    if( EVBUFFER_LENGTH(inbuf) < needlen )
866        return READ_MORE;
867
868    tr_peerIoReadUint32( msgs->io, inbuf, &len );
869
870    if( len == 0 ) /* peer sent us a keepalive message */
871        dbgmsg( msgs, "got KeepAlive" );
872    else {
873        msgs->incomingMessageLength = len;
874        msgs->state = AWAITING_BT_MESSAGE;
875    }
876
877    return READ_AGAIN;
878}
879
880static void
881updatePeerProgress( tr_peermsgs * msgs )
882{
883    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have ) / (float)msgs->torrent->info.pieceCount;
884    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
885    updateInterest( msgs );
886    firePeerProgress( msgs );
887}
888
889static int
890readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf )
891{
892    uint8_t id;
893    uint32_t ui32;
894    uint32_t msglen = msgs->incomingMessageLength;
895
896    if( EVBUFFER_LENGTH(inbuf) < msglen )
897        return READ_MORE;
898
899    tr_peerIoReadUint8( msgs->io, inbuf, &id );
900    msglen--;
901    dbgmsg( msgs, "got BT id %d, len %d", (int)id, (int)msglen );
902
903    switch( id )
904    {
905        case BT_CHOKE:
906            dbgmsg( msgs, "got Choke" );
907            assert( msglen == 0 );
908            msgs->info->clientIsChoked = 1;
909#if 0
910            tr_list * walk;
911            for( walk = msgs->peerAskedFor; walk != NULL; )
912            {
913                tr_list * next = walk->next;
914                /* We shouldn't reject a peer's fast allowed requests at choke */
915                struct peer_request *req = walk->data;
916                if ( !tr_bitfieldHas( msgs->peerAllowedPieces, req->index ) )
917                {
918                    tr_list_remove_data( &msgs->peerAskedFor, req );
919                    tr_free( req );
920                }
921                walk = next;
922            }
923#endif
924            tr_peerMsgsCancelAllRequests( msgs );
925            break;
926
927        case BT_UNCHOKE:
928            dbgmsg( msgs, "got Unchoke" );
929            assert( msglen == 0 );
930            msgs->info->clientIsChoked = 0;
931            fireNeedReq( msgs );
932            break;
933
934        case BT_INTERESTED:
935            dbgmsg( msgs, "got Interested" );
936            assert( msglen == 0 );
937            msgs->info->peerIsInterested = 1;
938            tr_peerMsgsSetChoke( msgs, 0 );
939            break;
940
941        case BT_NOT_INTERESTED:
942            dbgmsg( msgs, "got Not Interested" );
943            assert( msglen == 0 );
944            msgs->info->peerIsInterested = 0;
945            break;
946
947        case BT_HAVE:
948            assert( msglen == 4 );
949            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
950            tr_bitfieldAdd( msgs->info->have, ui32 );
951            updatePeerProgress( msgs );
952            tr_rcTransferred( msgs->torrent->swarmspeed, msgs->torrent->info.pieceSize );
953            dbgmsg( msgs, "got Have: %u", ui32 );
954            break;
955
956        case BT_BITFIELD: {
957            const int clientIsSeed = tr_cpGetStatus( msgs->torrent->completion ) != TR_CP_INCOMPLETE;
958            dbgmsg( msgs, "got a bitfield" );
959            assert( msglen == msgs->info->have->len );
960            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
961            updatePeerProgress( msgs );
962            tr_peerMsgsSetChoke( msgs, !clientIsSeed || (msgs->info->progress<1.0) );
963            fireNeedReq( msgs );
964            break;
965        }
966
967        case BT_REQUEST: {
968            struct peer_request * req;
969            assert( msglen == 12 );
970            req = tr_new( struct peer_request, 1 );
971            tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
972            tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
973            tr_peerIoReadUint32( msgs->io, inbuf, &req->length );
974            dbgmsg( msgs, "got Request: %u:%u->%u", req->index, req->offset, req->length );
975           
976            if ( !requestIsValid( msgs, req ) )
977            {
978                dbgmsg( msgs, "BT_REQUEST: invalid request, ignoring" );
979                tr_free( req );
980                break;
981            }
982            /*
983                If we're not choking him -> continue
984                If we're choking him
985                    it doesn't support FPE -> He's deaf, reCHOKE and bail...
986                    it support FPE
987                        If the asked piece is not allowed
988                            OR he's above our threshold
989                            OR we don't have the requested piece -> Reject
990                        Else
991                        Asked piece allowed AND he's below our threshold -> continue...
992             */
993   
994
995            if ( msgs->info->peerIsChoked )
996            {
997                if ( !tr_peerIoSupportsFEXT( msgs->io ) )
998                {
999                    dbgmsg( msgs, "BT_REQUEST: peer is choked, ignoring" );
1000                    /* Didn't he get it? */
1001                    tr_peerMsgsSetChoke( msgs, 1 );
1002                    tr_free( req );
1003                    break;
1004                }
1005                else
1006                {
1007                    if ( !tr_bitfieldHas( msgs->peerAllowedPieces, req->index )
1008                         || ( msgs->info->progress * (float)msgs->torrent->info.pieceCount) >= MAX_ALLOWED_SET_COUNT
1009                         || !tr_cpPieceIsComplete( msgs->torrent->completion, req->index ) )
1010                    {
1011                        dbgmsg( msgs, "BT_REQUEST: peer requests an un-fastallowed piece" );
1012                        sendFastReject( msgs, req->index, req->offset, req->length );
1013                        tr_free( req );
1014                        break;
1015                    }
1016                    dbgmsg( msgs, "BT_REQUEST: fast allowed piece, accepting request" );
1017                }   
1018            }
1019           
1020            tr_list_append( &msgs->peerAskedFor, req );
1021            break;
1022        }
1023
1024        case BT_CANCEL: {
1025            struct peer_request req;
1026            void * data;
1027            assert( msglen == 12 );
1028            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1029            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1030            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1031            dbgmsg( msgs, "got a Cancel %u:%u->%u", req.index, req.offset, req.length );
1032            data = tr_list_remove( &msgs->peerAskedFor, &req, compareRequest );
1033            tr_free( data );
1034            break;
1035        }
1036
1037        case BT_PIECE: {
1038            dbgmsg( msgs, "got a Piece!" );
1039            assert( msgs->blockToUs.length == 0 );
1040            tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.index );
1041            tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.offset );
1042            msgs->blockToUs.length = msglen - 8;
1043            assert( EVBUFFER_LENGTH(msgs->inBlock) == 0 );
1044            msgs->state = msgs->blockToUs.length ? READING_BT_PIECE : AWAITING_BT_LENGTH;
1045            return READ_AGAIN;
1046            break;
1047        }
1048
1049        case BT_PORT: {
1050            dbgmsg( msgs, "Got a BT_PORT" );
1051            assert( msglen == 2 );
1052            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1053            break;
1054        }
1055       
1056        case BT_SUGGEST: {
1057            /* tiennou TODO */
1058            break;
1059        }
1060           
1061        case BT_HAVE_ALL: {
1062            assert( msglen == 0 );
1063            dbgmsg( msgs, "Got a BT_HAVE_ALL" );
1064            memset( msgs->info->have->bits, 1, msgs->info->have->len );
1065            updatePeerProgress( msgs );
1066            break;
1067        }
1068           
1069        case BT_HAVE_NONE: {
1070            assert( msglen == 0 );
1071            dbgmsg( msgs, "Got a BT_HAVE_NONE" );
1072            memset( msgs->info->have->bits, 1, msgs->info->have->len );
1073            updatePeerProgress( msgs );
1074            break;
1075        }
1076           
1077        case BT_REJECT: {
1078            struct peer_request req;
1079            tr_list * node;
1080            assert( msglen == 12 );
1081            dbgmsg( msgs, "Got a BT_REJECT" );
1082            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1083            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1084            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1085            node = tr_list_find( msgs->peerAskedFor, &req, compareRequest );
1086            if( node != NULL ) {
1087                void * data = node->data;
1088                tr_list_remove_data( &msgs->peerAskedFor, data );
1089                tr_free( data );
1090                dbgmsg( msgs, "found the req that peer has rejected... cancelled." );
1091            }
1092            break;
1093        }
1094           
1095        case BT_ALLOWED_FAST: {
1096            assert( msglen == 4 );
1097            dbgmsg( msgs, "Got a BT_ALLOWED_FAST" );
1098            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1099            tr_bitfieldAdd( msgs->clientAllowedPieces, ui32 );
1100            break;
1101        }
1102           
1103        case BT_LTEP:
1104            dbgmsg( msgs, "Got a BT_LTEP" );
1105            parseLtep( msgs, msglen, inbuf );
1106            break;
1107
1108        default:
1109            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1110            tr_peerIoDrain( msgs->io, inbuf, msglen );
1111            assert( 0 );
1112    }
1113
1114    msgs->incomingMessageLength = -1;
1115    msgs->state = AWAITING_BT_LENGTH;
1116    return READ_AGAIN;
1117}
1118
1119static void
1120clientGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1121{
1122    tr_torrent * tor = msgs->torrent;
1123    tor->activityDate = tr_date( );
1124    tor->downloadedCur += byteCount;
1125    msgs->info->pieceDataActivityDate = time( NULL );
1126    tr_rcTransferred( msgs->info->rcToClient, byteCount );
1127    tr_rcTransferred( tor->download, byteCount );
1128    tr_rcTransferred( tor->handle->download, byteCount );
1129}
1130
1131static void
1132peerGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1133{
1134    tr_torrent * tor = msgs->torrent;
1135    tor->activityDate = tr_date( );
1136    tor->uploadedCur += byteCount;
1137    msgs->info->pieceDataActivityDate = time( NULL );
1138    tr_rcTransferred( msgs->info->rcToPeer, byteCount );
1139    tr_rcTransferred( tor->upload, byteCount );
1140    tr_rcTransferred( tor->handle->upload, byteCount );
1141}
1142
1143static int
1144canDownload( const tr_peermsgs * msgs )
1145{
1146    tr_torrent * tor = msgs->torrent;
1147
1148    if( tor->downloadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1149        return !tor->handle->useDownloadLimit || tr_rcCanTransfer( tor->handle->download );
1150
1151    if( tor->downloadLimitMode == TR_SPEEDLIMIT_SINGLE )
1152        return tr_rcCanTransfer( tor->download );
1153
1154    return TRUE;
1155}
1156
1157static void
1158reassignBytesToCorrupt( tr_peermsgs * msgs, uint32_t byteCount )
1159{
1160    tr_torrent * tor = msgs->torrent;
1161
1162    /* increment the `corrupt' field */
1163    tor->corruptCur += byteCount;
1164
1165    /* decrement the `downloaded' field */
1166    if( tor->downloadedCur >= byteCount )
1167        tor->downloadedCur -= byteCount;
1168    else
1169        tor->downloadedCur = 0;
1170}
1171
1172
1173static void
1174gotBadPiece( tr_peermsgs * msgs, uint32_t pieceIndex )
1175{
1176    const uint32_t byteCount = tr_torPieceCountBytes( msgs->torrent, (int)pieceIndex );
1177    reassignBytesToCorrupt( msgs, byteCount );
1178}
1179
1180static void
1181gotUnwantedBlock( tr_peermsgs * msgs,
1182                  uint32_t      index UNUSED,
1183                  uint32_t      offset UNUSED,
1184                  uint32_t      length )
1185{
1186    reassignBytesToCorrupt( msgs, length );
1187}
1188
1189static void
1190addUsToBlamefield( tr_peermsgs * msgs, uint32_t index )
1191{
1192    if( !msgs->info->blame )
1193         msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1194    tr_bitfieldAdd( msgs->info->blame, index );
1195}
1196
1197static void
1198gotBlock( tr_peermsgs      * msgs,
1199          struct evbuffer  * inbuf,
1200          uint32_t           index,
1201          uint32_t           offset,
1202          uint32_t           length )
1203{
1204    tr_torrent * tor = msgs->torrent;
1205    const int block = _tr_block( tor, index, offset );
1206    struct peer_request key, *req;
1207
1208    /**
1209    *** Remove the block from our `we asked for this' list
1210    **/
1211
1212    key.index = index;
1213    key.offset = offset;
1214    key.length = length;
1215    req = (struct peer_request*) tr_list_remove( &msgs->clientAskedFor, &key,
1216                                                 compareRequest );
1217    if( req == NULL ) {
1218        gotUnwantedBlock( msgs, index, offset, length );
1219        dbgmsg( msgs, "we didn't ask for this message..." );
1220        return;
1221    }
1222    dbgmsg( msgs, "Got block %u:%u->%u (turnaround time %d secs)", 
1223                     req->index, req->offset, req->length,
1224                     (int)(time(NULL) - req->time_requested) );
1225    tr_free( req );
1226    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1227                  tr_list_size(msgs->clientAskedFor));
1228
1229    /**
1230    *** Error checks
1231    **/
1232
1233    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
1234        dbgmsg( msgs, "have this block already..." );
1235        tr_dbg( "have this block already..." );
1236        gotUnwantedBlock( msgs, index, offset, length );
1237        return;
1238    }
1239
1240    if( (int)length != tr_torBlockCountBytes( tor, block ) ) {
1241        dbgmsg( msgs, "block is the wrong length..." );
1242        tr_dbg( "block is the wrong length..." );
1243        gotUnwantedBlock( msgs, index, offset, length );
1244        return;
1245    }
1246
1247    /**
1248    ***  Write the block
1249    **/
1250
1251    if( tr_ioWrite( tor, index, offset, length, EVBUFFER_DATA( inbuf )))
1252        return;
1253
1254    tr_cpBlockAdd( tor->completion, block );
1255
1256    addUsToBlamefield( msgs, index );
1257
1258    fireGotBlock( msgs, index, offset, length );
1259
1260    /**
1261    ***  Handle if this was the last block in the piece
1262    **/
1263
1264    if( tr_cpPieceIsComplete( tor->completion, index ) )
1265    {
1266        if( tr_ioHash( tor, index ) )
1267        {
1268            gotBadPiece( msgs, index );
1269            return;
1270        }
1271
1272        fireClientHave( msgs, index );
1273    }
1274}
1275
1276
1277static ReadState
1278readBtPiece( tr_peermsgs * msgs, struct evbuffer * inbuf )
1279{
1280    uint32_t inlen;
1281    uint8_t * tmp;
1282
1283    assert( msgs != NULL );
1284    assert( msgs->blockToUs.length > 0 );
1285    assert( inbuf != NULL );
1286    assert( EVBUFFER_LENGTH( inbuf ) > 0 );
1287
1288    /* read from the inbuf into our block buffer */
1289    inlen = MIN( EVBUFFER_LENGTH(inbuf), msgs->blockToUs.length );
1290    tmp = tr_new( uint8_t, inlen );
1291    tr_peerIoReadBytes( msgs->io, inbuf, tmp, inlen );
1292    evbuffer_add( msgs->inBlock, tmp, inlen );
1293
1294    /* update our tables accordingly */
1295    assert( inlen >= msgs->blockToUs.length );
1296    msgs->blockToUs.length -= inlen;
1297    msgs->info->peerSentPieceDataAt = time( NULL );
1298    clientGotBytes( msgs, inlen );
1299
1300    /* if this was the entire block, save it */
1301    if( !msgs->blockToUs.length )
1302    {
1303        dbgmsg( msgs, "got block %u:%u", msgs->blockToUs.index, msgs->blockToUs.offset );
1304        assert( (int)EVBUFFER_LENGTH( msgs->inBlock ) == tr_torBlockCountBytes( msgs->torrent, _tr_block(msgs->torrent,msgs->blockToUs.index, msgs->blockToUs.offset) ) );
1305        gotBlock( msgs, msgs->inBlock,
1306                        msgs->blockToUs.index,
1307                        msgs->blockToUs.offset,
1308                        EVBUFFER_LENGTH( msgs->inBlock ) );
1309        evbuffer_drain( msgs->inBlock, ~0 );
1310        msgs->state = AWAITING_BT_LENGTH;
1311    }
1312
1313    /* cleanup */
1314    tr_free( tmp );
1315    return READ_AGAIN;
1316}
1317
1318static ReadState
1319canRead( struct bufferevent * evin, void * vmsgs )
1320{
1321    ReadState ret;
1322    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1323    struct evbuffer * inbuf = EVBUFFER_INPUT ( evin );
1324
1325    if( !canDownload( msgs ) )
1326    {
1327        msgs->notListening = 1;
1328        tr_peerIoSetIOMode ( msgs->io, 0, EV_READ );
1329        ret = READ_DONE;
1330    }
1331    else switch( msgs->state )
1332    {
1333        case AWAITING_BT_LENGTH:  ret = readBtLength  ( msgs, inbuf ); break;
1334        case AWAITING_BT_MESSAGE: ret = readBtMessage ( msgs, inbuf ); break;
1335        case READING_BT_PIECE:    ret = readBtPiece   ( msgs, inbuf ); break;
1336        default: assert( 0 );
1337    }
1338
1339    return ret;
1340}
1341
1342static void
1343sendKeepalive( tr_peermsgs * msgs )
1344{
1345    dbgmsg( msgs, "sending a keepalive message" );
1346    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1347}
1348
1349/**
1350***
1351**/
1352
1353static int
1354canWrite( const tr_peermsgs * msgs )
1355{
1356    /* don't let our outbuffer get too large */
1357    if( tr_peerIoWriteBytesWaiting( msgs->io ) > 8192 )
1358        return FALSE;
1359
1360    return TRUE;
1361}
1362
1363static int
1364canUpload( const tr_peermsgs * msgs )
1365{
1366    const tr_torrent * tor = msgs->torrent;
1367
1368    if( !canWrite( msgs ) )
1369        return FALSE;
1370
1371    if( tor->uploadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1372        return !tor->handle->useUploadLimit || tr_rcCanTransfer( tor->handle->upload );
1373
1374    if( tor->uploadLimitMode == TR_SPEEDLIMIT_SINGLE )
1375        return tr_rcCanTransfer( tor->upload );
1376
1377    return TRUE;
1378}
1379
1380static int
1381ratePulse( void * vmsgs )
1382{
1383    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1384    msgs->info->rateToClient = tr_rcRate( msgs->info->rcToClient );
1385    msgs->info->rateToPeer = tr_rcRate( msgs->info->rcToPeer );
1386    msgs->maxActiveRequests = MIN( 8 + (int)(msgs->info->rateToClient/10), 100 );
1387    msgs->minActiveRequests = msgs->maxActiveRequests / 2;
1388    return TRUE;
1389}
1390
1391static int
1392pulse( void * vmsgs )
1393{
1394    const time_t now = time( NULL );
1395    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1396    size_t len;
1397
1398    /* if we froze out a downloaded block because of speed limits,
1399       start listening to the peer again */
1400    if( msgs->notListening && canDownload( msgs ) )
1401    {
1402        msgs->notListening = 0;
1403        tr_peerIoSetIOMode ( msgs->io, EV_READ, 0 );
1404    }
1405
1406    pumpRequestQueue( msgs );
1407
1408    if( !canWrite( msgs ) )
1409    {
1410    }
1411    else if(( len = EVBUFFER_LENGTH( msgs->outMessages ) ))
1412    {
1413        tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
1414        msgs->clientSentAnythingAt = now;
1415    }
1416    else if(( msgs->peerAskedFor ))
1417    {
1418        if( canUpload( msgs ) )
1419        {
1420            struct peer_request * r = tr_list_pop_front( &msgs->peerAskedFor );
1421            uint8_t * buf = tr_new( uint8_t, r->length );
1422
1423            if( requestIsValid( msgs, r )
1424                && tr_cpPieceIsComplete( msgs->torrent->completion, r->index )
1425                && !tr_ioRead( msgs->torrent, r->index, r->offset, r->length, buf ) )
1426            {
1427                protocolSendPiece( msgs, r, buf );
1428                peerGotBytes( msgs, r->length );
1429                msgs->clientSentAnythingAt = now;
1430            }
1431
1432            tr_free( buf );
1433            tr_free( r );
1434        }
1435    }
1436    else if( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS )
1437    {
1438        sendKeepalive( msgs );
1439    }
1440
1441    return TRUE; /* loop forever */
1442}
1443
1444static void
1445didWrite( struct bufferevent * evin UNUSED, void * vmsgs )
1446{
1447    pulse( vmsgs );
1448}
1449
1450static void
1451gotError( struct bufferevent * evbuf UNUSED, short what, void * vmsgs )
1452{
1453    dbgmsg( vmsgs, "libevent got an error! what=%d, errno=%d (%s)",
1454            (int)what, errno, strerror(errno) );
1455    fireGotError( vmsgs );
1456}
1457
1458static void
1459sendBitfield( tr_peermsgs * msgs )
1460{
1461    const tr_bitfield * bitfield = tr_cpPieceBitfield( msgs->torrent->completion );
1462    struct evbuffer * out = msgs->outMessages;
1463
1464    dbgmsg( msgs, "sending peer a bitfield message" );
1465    tr_peerIoWriteUint32( msgs->io, out, sizeof(uint8_t) + bitfield->len );
1466    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1467    tr_peerIoWriteBytes ( msgs->io, out, bitfield->bits, bitfield->len );
1468}
1469
1470/**
1471***
1472**/
1473
1474/* some peers give us error messages if we send
1475   more than this many peers in a single pex message */
1476#define MAX_PEX_DIFFS 200
1477
1478typedef struct
1479{
1480    tr_pex * added;
1481    tr_pex * dropped;
1482    tr_pex * elements;
1483    int addedCount;
1484    int droppedCount;
1485    int elementCount;
1486    int diffCount;
1487}
1488PexDiffs;
1489
1490static void
1491pexAddedCb( void * vpex, void * userData )
1492{
1493    PexDiffs * diffs = (PexDiffs *) userData;
1494    tr_pex * pex = (tr_pex *) vpex;
1495    if( diffs->diffCount < MAX_PEX_DIFFS )
1496    {
1497        diffs->diffCount++;
1498        diffs->added[diffs->addedCount++] = *pex;
1499        diffs->elements[diffs->elementCount++] = *pex;
1500    }
1501}
1502
1503static void
1504pexRemovedCb( void * vpex, void * userData )
1505{
1506    PexDiffs * diffs = (PexDiffs *) userData;
1507    tr_pex * pex = (tr_pex *) vpex;
1508    if( diffs->diffCount < MAX_PEX_DIFFS )
1509    {
1510        diffs->diffCount++;
1511        diffs->dropped[diffs->droppedCount++] = *pex;
1512    }
1513}
1514
1515static void
1516pexElementCb( void * vpex, void * userData )
1517{
1518    PexDiffs * diffs = (PexDiffs *) userData;
1519    tr_pex * pex = (tr_pex *) vpex;
1520    if( diffs->diffCount < MAX_PEX_DIFFS )
1521    {
1522        diffs->diffCount++;
1523        diffs->elements[diffs->elementCount++] = *pex;
1524    }
1525}
1526
1527static void
1528sendPex( tr_peermsgs * msgs )
1529{
1530    if( msgs->peerSupportsPex && tr_torrentIsPexEnabled( msgs->torrent ) )
1531    {
1532        int i;
1533        tr_pex * newPex = NULL;
1534        const int newCount = tr_peerMgrGetPeers( msgs->handle->peerMgr, msgs->torrent->info.hash, &newPex );
1535        PexDiffs diffs;
1536        benc_val_t val, *added, *dropped, *flags;
1537        uint8_t *tmp, *walk;
1538        char * benc;
1539        int bencLen;
1540
1541        /* build the diffs */
1542        diffs.added = tr_new( tr_pex, newCount );
1543        diffs.addedCount = 0;
1544        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1545        diffs.droppedCount = 0;
1546        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1547        diffs.elementCount = 0;
1548        diffs.diffCount = 0;
1549        tr_set_compare( msgs->pex, msgs->pexCount,
1550                        newPex, newCount,
1551                        tr_pexCompare, sizeof(tr_pex),
1552                        pexRemovedCb, pexAddedCb, pexElementCb, &diffs );
1553        dbgmsg( msgs, "pex: old peer count %d, new peer count %d, added %d, removed %d", msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1554
1555        /* update peer */
1556        tr_free( msgs->pex );
1557        msgs->pex = diffs.elements;
1558        msgs->pexCount = diffs.elementCount;
1559
1560        /* build the pex payload */
1561        tr_bencInit( &val, TYPE_DICT );
1562        tr_bencDictReserve( &val, 3 );
1563
1564        /* "added" */
1565        added = tr_bencDictAdd( &val, "added" );
1566        tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1567        for( i=0; i<diffs.addedCount; ++i ) {
1568            memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
1569            memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1570        }
1571        assert( ( walk - tmp ) == diffs.addedCount * 6 );
1572        tr_bencInitStr( added, tmp, walk-tmp, FALSE );
1573
1574        /* "added.f" */
1575        flags = tr_bencDictAdd( &val, "added.f" );
1576        tmp = walk = tr_new( uint8_t, diffs.addedCount );
1577        for( i=0; i<diffs.addedCount; ++i )
1578            *walk++ = diffs.added[i].flags;
1579        assert( ( walk - tmp ) == diffs.addedCount );
1580        tr_bencInitStr( flags, tmp, walk-tmp, FALSE );
1581
1582        /* "dropped" */
1583        dropped = tr_bencDictAdd( &val, "dropped" );
1584        tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
1585        for( i=0; i<diffs.droppedCount; ++i ) {
1586            memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
1587            memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
1588        }
1589        assert( ( walk - tmp ) == diffs.droppedCount * 6 );
1590        tr_bencInitStr( dropped, tmp, walk-tmp, FALSE );
1591
1592        /* write the pex message */
1593        benc = tr_bencSaveMalloc( &val, &bencLen );
1594        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 2*sizeof(uint8_t) + bencLen );
1595        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_LTEP );
1596        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, OUR_LTEP_PEX );
1597        tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, benc, bencLen );
1598
1599        /* cleanup */
1600        tr_free( benc );
1601        tr_bencFree( &val );
1602        tr_free( diffs.added );
1603        tr_free( diffs.dropped );
1604        tr_free( newPex );
1605
1606        msgs->clientSentPexAt = time( NULL );
1607    }
1608}
1609
1610static int
1611pexPulse( void * vpeer )
1612{
1613    sendPex( vpeer );
1614    return TRUE;
1615}
1616
1617/**
1618***
1619**/
1620
1621tr_peermsgs*
1622tr_peerMsgsNew( struct tr_torrent * torrent,
1623                struct tr_peer    * info,
1624                tr_delivery_func    func,
1625                void              * userData,
1626                tr_publisher_tag  * setme )
1627{
1628    tr_peermsgs * m;
1629
1630    assert( info != NULL );
1631    assert( info->io != NULL );
1632
1633    m = tr_new0( tr_peermsgs, 1 );
1634    m->publisher = tr_publisherNew( );
1635    m->info = info;
1636    m->handle = torrent->handle;
1637    m->torrent = torrent;
1638    m->io = info->io;
1639    m->info->clientIsChoked = 1;
1640    m->info->peerIsChoked = 1;
1641    m->info->clientIsInterested = 0;
1642    m->info->peerIsInterested = 0;
1643    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
1644    m->pulseTimer = tr_timerNew( m->handle, pulse, m, PEER_PULSE_INTERVAL );
1645    m->rateTimer = tr_timerNew( m->handle, ratePulse, m, RATE_PULSE_INTERVAL );
1646    m->pexTimer = tr_timerNew( m->handle, pexPulse, m, PEX_INTERVAL );
1647    m->outMessages = evbuffer_new( );
1648    m->inBlock = evbuffer_new( );
1649    m->peerAllowedPieces = NULL;
1650    m->clientAllowedPieces = NULL;
1651    setme = tr_publisherSubscribe( m->publisher, func, userData );
1652   
1653    if ( tr_peerIoSupportsFEXT( m->io ) )
1654    {
1655        /* This peer is fastpeer-enabled, generate its allowed set
1656         * (before registering our callbacks) */
1657        if ( !m->peerAllowedPieces ) {
1658            const struct in_addr *peerAddr = tr_peerIoGetAddress( m->io, NULL );
1659           
1660            m->peerAllowedPieces = tr_peerMgrGenerateAllowedSet( MAX_ALLOWED_SET_COUNT,
1661                                                                 m->torrent->info.pieceCount,
1662                                                                 m->torrent->info.hash,
1663                                                                 peerAddr );
1664        }
1665        m->clientAllowedPieces = tr_bitfieldNew( m->torrent->info.pieceCount );
1666    }
1667   
1668    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* error if we don't read or write for 2.5 minutes */
1669    tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
1670    tr_peerIoSetIOMode( m->io, EV_READ|EV_WRITE, 0 );
1671    ratePulse( m );
1672
1673    /**
1674    ***  If we initiated this connection,
1675    ***  we may need to send LTEP/AZMP handshakes.
1676    ***  Otherwise we'll wait for the peer to send theirs first.
1677    **/
1678    if( !tr_peerIoIsIncoming( m->io ) )
1679    {
1680        if ( tr_peerIoSupportsLTEP( m->io ) ) {
1681            sendLtepHandshake( m );
1682           
1683        } else if ( tr_peerIoSupportsAZMP( m->io ) ) {
1684            dbgmsg( m, "FIXME: need to send AZMP handshake" );
1685           
1686        } else {
1687            /* no-op */
1688        }
1689    }
1690   
1691    if ( tr_peerIoSupportsFEXT( m->io ) )
1692    {
1693        /* This peer is fastpeer-enabled, send it have-all or have-none if appropriate */
1694        float completion = tr_cpPercentComplete( m->torrent->completion );
1695        if ( completion == 0.0f ) {
1696            sendFastHave( m, 0 );
1697        } else if ( completion == 1.0f ) {
1698            sendFastHave( m, 1 );
1699        } else {
1700            sendBitfield( m );
1701        }
1702        uint32_t peerProgress = m->torrent->info.pieceCount * m->info->progress;
1703       
1704        if ( peerProgress < MAX_ALLOWED_SET_COUNT )
1705            sendFastAllowedSet( m );
1706    } else {
1707        sendBitfield( m );
1708    }
1709    return m;
1710}
1711
1712void
1713tr_peerMsgsFree( tr_peermsgs* msgs )
1714{
1715    if( msgs != NULL )
1716    {
1717        tr_timerFree( &msgs->pulseTimer );
1718        tr_timerFree( &msgs->rateTimer );
1719        tr_timerFree( &msgs->pexTimer );
1720        tr_publisherFree( &msgs->publisher );
1721        tr_list_free( &msgs->clientWillAskFor, tr_free );
1722        tr_list_free( &msgs->clientAskedFor, tr_free );
1723        tr_list_free( &msgs->peerAskedFor, tr_free );
1724        evbuffer_free( msgs->outMessages );
1725        evbuffer_free( msgs->inBlock );
1726        tr_free( msgs->pex );
1727        msgs->pexCount = 0;
1728        tr_free( msgs );
1729    }
1730}
1731
1732tr_publisher_tag
1733tr_peerMsgsSubscribe( tr_peermsgs       * peer,
1734                      tr_delivery_func    func,
1735                      void              * userData )
1736{
1737    return tr_publisherSubscribe( peer->publisher, func, userData );
1738}
1739
1740void
1741tr_peerMsgsUnsubscribe( tr_peermsgs       * peer,
1742                        tr_publisher_tag    tag )
1743{
1744    tr_publisherUnsubscribe( peer->publisher, tag );
1745}
1746
1747int
1748tr_peerMsgIsPieceFastAllowed( const tr_peermsgs * peer,
1749                              uint32_t            index )
1750{
1751    return tr_bitfieldHas( peer->clientAllowedPieces, index );
1752}
1753
Note: See TracBrowser for help on using the repository browser.