source: trunk/libtransmission/peer-msgs.c @ 3753

Last change on this file since 3753 was 3753, checked in by charles, 15 years ago

update our #includes now that libevent has cleaned up event.h

  • Property svn:keywords set to Date Rev Author Id
File size: 50.0 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 3753 2007-11-07 18:26:19Z charles $
11 */
12
13#include <assert.h>
14#include <ctype.h>
15#include <errno.h>
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19#include <libgen.h> /* basename */
20
21#include <netinet/in.h> /* struct in_addr */
22
23#include <event.h>
24
25#include "transmission.h"
26#include "bencode.h"
27#include "completion.h"
28#include "inout.h"
29#include "list.h"
30#include "peer-io.h"
31#include "peer-mgr.h"
32#include "peer-mgr-private.h"
33#include "peer-msgs.h"
34#include "ratecontrol.h"
35#include "trevent.h"
36#include "utils.h"
37
38/**
39***
40**/
41
42#define MAX_ALLOWED_SET_COUNT   10 /* number of pieces generated for allow-fast,
43                                      threshold for fast-allowing others */
44
45enum
46{
47    BT_CHOKE                = 0,
48    BT_UNCHOKE              = 1,
49    BT_INTERESTED           = 2,
50    BT_NOT_INTERESTED       = 3,
51    BT_HAVE                 = 4,
52    BT_BITFIELD             = 5,
53    BT_REQUEST              = 6,
54    BT_PIECE                = 7,
55    BT_CANCEL               = 8,
56    BT_PORT                 = 9,
57    BT_SUGGEST              = 13,
58    BT_HAVE_ALL             = 14,
59    BT_HAVE_NONE            = 15,
60    BT_REJECT               = 16,
61    BT_ALLOWED_FAST         = 17,
62    BT_LTEP                 = 20,
63
64    LTEP_HANDSHAKE          = 0,
65
66    OUR_LTEP_PEX            = 1,
67
68    MAX_REQUEST_BYTE_COUNT  = (16 * 1024), /* drop requests who want too much */
69
70    KEEPALIVE_INTERVAL_SECS = 90,          /* idle seconds before we send a keepalive */
71    PEX_INTERVAL            = (60 * 1000), /* msec between calls to sendPex() */
72    PEER_PULSE_INTERVAL     = (100),       /* msec between calls to pulse() */
73    RATE_PULSE_INTERVAL     = (333),       /* msec between calls to ratePulse() */
74};
75
76enum
77{
78    AWAITING_BT_LENGTH,
79    AWAITING_BT_MESSAGE,
80    READING_BT_PIECE
81};
82
83struct peer_request
84{
85    uint32_t index;
86    uint32_t offset;
87    uint32_t length;
88    time_t time_requested;
89};
90
91static int
92compareRequest( const void * va, const void * vb )
93{
94    struct peer_request * a = (struct peer_request*) va;
95    struct peer_request * b = (struct peer_request*) vb;
96    if( a->index != b->index ) return a->index - b->index;
97    if( a->offset != b->offset ) return a->offset - b->offset;
98    if( a->length != b->length ) return a->length - b->length;
99    return 0;
100}
101
102struct tr_peermsgs
103{
104    tr_peer * info;
105
106    tr_handle * handle;
107    tr_torrent * torrent;
108    tr_peerIo * io;
109
110    tr_publisher_t * publisher;
111
112    struct evbuffer * outBlock;    /* buffer of all the current piece message */
113    struct evbuffer * outMessages; /* buffer of all the non-piece messages */
114    struct evbuffer * inBlock;     /* the block we're currently receiving */
115    tr_list * peerAskedFor;
116    tr_list * peerAskedForFast;
117    tr_list * clientAskedFor;
118    tr_list * clientWillAskFor;
119
120    tr_timer * rateTimer;
121    tr_timer * pulseTimer;
122    tr_timer * pexTimer;
123
124    struct peer_request blockToUs; /* the block currntly being sent to us */
125
126    time_t lastReqAddedAt;
127    time_t clientSentPexAt;
128    time_t clientSentAnythingAt;
129
130    unsigned int notListening             : 1;
131    unsigned int peerSupportsPex          : 1;
132    unsigned int clientSentLtepHandshake  : 1;
133    unsigned int peerSentLtepHandshake    : 1;
134   
135    tr_bitfield * clientAllowedPieces;
136    tr_bitfield * peerAllowedPieces;
137   
138    uint8_t state;
139    uint8_t ut_pex_id;
140    uint16_t pexCount;
141    uint32_t incomingMessageLength;
142    uint32_t maxActiveRequests;
143    uint32_t minActiveRequests;
144
145    tr_pex * pex;
146};
147
148/**
149***
150**/
151
152static void
153myDebug( const char * file, int line,
154         const struct tr_peermsgs * msgs,
155         const char * fmt, ... )
156{
157    FILE * fp = tr_getLog( );
158    if( fp != NULL )
159    {
160        va_list args;
161        char timestr[64];
162        struct evbuffer * buf = evbuffer_new( );
163        char * myfile = tr_strdup( file );
164
165        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
166                             tr_getLogTimeStr( timestr, sizeof(timestr) ),
167                             msgs->torrent->info.name,
168                             tr_peerIoGetAddrStr( msgs->io ),
169                             msgs->info->client );
170        va_start( args, fmt );
171        evbuffer_add_vprintf( buf, fmt, args );
172        va_end( args );
173        evbuffer_add_printf( buf, " (%s:%d)\n", basename(myfile), line );
174        fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
175
176        tr_free( myfile );
177        evbuffer_free( buf );
178    }
179}
180
181#define dbgmsg(msgs, fmt...) myDebug(__FILE__, __LINE__, msgs, ##fmt )
182
183/**
184***
185**/
186
187static void
188protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
189{
190    tr_peerIo * io = msgs->io;
191    struct evbuffer * out = msgs->outMessages;
192
193    dbgmsg( msgs, "requesting %u:%u->%u", req->index, req->offset, req->length );
194    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
195    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
196    tr_peerIoWriteUint32( io, out, req->index );
197    tr_peerIoWriteUint32( io, out, req->offset );
198    tr_peerIoWriteUint32( io, out, req->length );
199}
200
201static void
202protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
203{
204    tr_peerIo * io = msgs->io;
205    struct evbuffer * out = msgs->outMessages;
206
207    dbgmsg( msgs, "cancelling %u:%u->%u", req->index, req->offset, req->length );
208    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
209    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
210    tr_peerIoWriteUint32( io, out, req->index );
211    tr_peerIoWriteUint32( io, out, req->offset );
212    tr_peerIoWriteUint32( io, out, req->length );
213}
214
215static void
216protocolSendHave( tr_peermsgs * msgs, uint32_t index )
217{
218    tr_peerIo * io = msgs->io;
219    struct evbuffer * out = msgs->outMessages;
220
221    dbgmsg( msgs, "sending Have %u", index );
222    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
223    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
224    tr_peerIoWriteUint32( io, out, index );
225}
226
227static void
228protocolSendChoke( tr_peermsgs * msgs, int choke )
229{
230    tr_peerIo * io = msgs->io;
231    struct evbuffer * out = msgs->outMessages;
232
233    dbgmsg( msgs, "sending %s", (choke ? "Choke" : "Unchoke") );
234    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) );
235    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
236}
237
238/**
239***  EVENTS
240**/
241
242static const tr_peermsgs_event blankEvent = { 0, 0, 0, 0, 0.0f };
243
244static void
245publish( tr_peermsgs * msgs, tr_peermsgs_event * e )
246{
247    tr_publisherPublish( msgs->publisher, msgs->info, e );
248}
249
250static void
251fireGotError( tr_peermsgs * msgs )
252{
253    tr_peermsgs_event e = blankEvent;
254    e.eventType = TR_PEERMSG_GOT_ERROR;
255    publish( msgs, &e );
256}
257
258static void
259fireNeedReq( tr_peermsgs * msgs )
260{
261    tr_peermsgs_event e = blankEvent;
262    e.eventType = TR_PEERMSG_NEED_REQ;
263    publish( msgs, &e );
264}
265
266static void
267firePeerProgress( tr_peermsgs * msgs )
268{
269    tr_peermsgs_event e = blankEvent;
270    e.eventType = TR_PEERMSG_PEER_PROGRESS;
271    e.progress = msgs->info->progress;
272    publish( msgs, &e );
273}
274
275static void
276fireClientHave( tr_peermsgs * msgs, uint32_t pieceIndex )
277{
278    tr_peermsgs_event e = blankEvent;
279    e.eventType = TR_PEERMSG_CLIENT_HAVE;
280    e.pieceIndex = pieceIndex;
281    publish( msgs, &e );
282}
283
284static void
285fireGotBlock( tr_peermsgs * msgs, uint32_t pieceIndex, uint32_t offset, uint32_t length )
286{
287    tr_peermsgs_event e = blankEvent;
288    e.eventType = TR_PEERMSG_CLIENT_BLOCK;
289    e.pieceIndex = pieceIndex;
290    e.offset = offset;
291    e.length = length;
292    publish( msgs, &e );
293}
294
295static void
296fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
297{
298    tr_peermsgs_event e = blankEvent;
299    e.eventType = TR_PEERMSG_CANCEL;
300    e.pieceIndex = req->index;
301    e.offset = req->offset;
302    e.length = req->length;
303    publish( msgs, &e );
304}
305
306/**
307***  INTEREST
308**/
309
310static int
311isPieceInteresting( const tr_peermsgs   * peer,
312                    int                   piece )
313{
314    const tr_torrent * torrent = peer->torrent;
315    if( torrent->info.pieces[piece].dnd ) /* we don't want it */
316        return FALSE;
317    if( tr_cpPieceIsComplete( torrent->completion, piece ) ) /* we have it */
318        return FALSE;
319    if( !tr_bitfieldHas( peer->info->have, piece ) ) /* peer doesn't have it */
320        return FALSE;
321    if( tr_bitfieldHas( peer->info->banned, piece ) ) /* peer is banned */
322        return FALSE;
323    return TRUE;
324}
325
326/* "interested" means we'll ask for piece data if they unchoke us */
327static int
328isPeerInteresting( const tr_peermsgs * msgs )
329{
330    int i;
331    const tr_torrent * torrent;
332    const tr_bitfield * bitfield;
333    const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
334
335    if( clientIsSeed )
336        return FALSE;
337
338    torrent = msgs->torrent;
339    bitfield = tr_cpPieceBitfield( torrent->completion );
340
341    if( !msgs->info->have )
342        return TRUE;
343
344    assert( bitfield->len == msgs->info->have->len );
345    for( i=0; i<torrent->info.pieceCount; ++i )
346        if( isPieceInteresting( msgs, i ) )
347            return TRUE;
348
349    return FALSE;
350}
351
352static void
353sendInterest( tr_peermsgs * msgs, int weAreInterested )
354{
355    assert( msgs != NULL );
356    assert( weAreInterested==0 || weAreInterested==1 );
357
358    msgs->info->clientIsInterested = weAreInterested;
359    dbgmsg( msgs, "Sending %s",
360            weAreInterested ? "Interested" : "Not Interested");
361
362    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
363    tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages,
364                   weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
365}
366
367static void
368updateInterest( tr_peermsgs * msgs )
369{
370    const int i = isPeerInteresting( msgs );
371    if( i != msgs->info->clientIsInterested )
372        sendInterest( msgs, i );
373    if( i )
374        fireNeedReq( msgs );
375}
376
377#define MIN_CHOKE_PERIOD_SEC 10
378
379static void
380cancelAllRequestsToClientExceptFast( tr_peermsgs * msgs )
381{
382    tr_list_free( &msgs->peerAskedFor, tr_free );
383}
384
385void
386tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
387{
388    const time_t fibrillationTime = time(NULL) - MIN_CHOKE_PERIOD_SEC;
389
390    assert( msgs != NULL );
391    assert( msgs->info != NULL );
392    assert( choke==0 || choke==1 );
393
394    if( msgs->info->chokeChangedAt > fibrillationTime )
395    {
396        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
397    }
398    else if( msgs->info->peerIsChoked != choke )
399    {
400        msgs->info->peerIsChoked = choke;
401        if( choke )
402            cancelAllRequestsToClientExceptFast( msgs );
403        protocolSendChoke( msgs, choke );
404        msgs->info->chokeChangedAt = time( NULL );
405    }
406}
407
408/**
409***
410**/
411
412void
413tr_peerMsgsHave( tr_peermsgs * msgs,
414                 uint32_t      index )
415{
416    protocolSendHave( msgs, index );
417
418    /* since we have more pieces now, we might not be interested in this peer */
419    updateInterest( msgs );
420}
421#if 0
422static void
423sendFastSuggest( tr_peermsgs * msgs,
424                 uint32_t      pieceIndex )
425{
426    dbgmsg( msgs, "w00t SUGGESTing them piece #%d", pieceIndex );
427    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
428    tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_SUGGEST );
429    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
430   
431    updateInterest( msgs );
432}
433#endif
434static void
435sendFastHave( tr_peermsgs * msgs,
436              int           all)
437{
438    dbgmsg( msgs, "w00t telling them we %s pieces", (all ? "HAVE_ALL" : "HAVE_NONE" ) );
439    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
440    tr_peerIoWriteUint8( msgs->io, msgs->outMessages, ( all ? BT_HAVE_ALL : BT_HAVE_NONE ) );
441   
442    updateInterest( msgs );
443}
444
445static void
446sendFastReject( tr_peermsgs * msgs,
447                uint32_t      pieceIndex,
448                uint32_t      offset,
449                uint32_t      length )
450{
451    assert( msgs != NULL );
452
453    if( tr_peerIoSupportsFEXT( msgs->io ) )
454    {
455        const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
456        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
457        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_REJECT );
458        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
459        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
460        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
461    }
462}
463
464static void
465sendFastAllowed( tr_peermsgs * msgs,
466                 uint32_t      pieceIndex)
467{
468    dbgmsg( msgs, "w00t telling them we ALLOW_FAST piece #%d", pieceIndex );
469    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
470    tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_ALLOWED_FAST );
471    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
472}
473
474
475
476static void
477sendFastAllowedSet( tr_peermsgs * msgs )
478{
479    int i = 0;
480    while (i <= msgs->torrent->info.pieceCount )
481    {
482        if ( tr_bitfieldHas( msgs->peerAllowedPieces, i) )
483            sendFastAllowed( msgs, i );
484        i++;
485    }
486}
487
488
489/**
490***
491**/
492
493static int
494reqIsValid( const tr_peermsgs * msgs, uint32_t index, uint32_t offset, uint32_t length )
495{
496    const tr_torrent * tor = msgs->torrent;
497
498    if( index >= (uint32_t) tor->info.pieceCount )
499        return FALSE;
500    if ( (int)offset >= tr_torPieceCountBytes( tor, (int)index ) )
501        return FALSE;
502    if( length > MAX_REQUEST_BYTE_COUNT )
503        return FALSE;
504    if( tr_pieceOffset( tor, index, offset, length ) > tor->info.totalSize )
505        return FALSE;
506
507    return TRUE;
508}
509
510static int
511requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
512{
513    return reqIsValid( msgs, req->index, req->offset, req->length );
514}
515
516static void
517pumpRequestQueue( tr_peermsgs * msgs )
518{
519    const int max = msgs->maxActiveRequests;
520    const int min = msgs->minActiveRequests;
521    int count = tr_list_size( msgs->clientAskedFor );
522    int sent = 0;
523
524    if( count > min )
525        return;
526    if( msgs->info->clientIsChoked )
527        return;
528
529    while( ( count < max ) && ( msgs->clientWillAskFor != NULL ) )
530    {
531        struct peer_request * req = tr_list_pop_front( &msgs->clientWillAskFor );
532        protocolSendRequest( msgs, req );
533        req->time_requested = msgs->lastReqAddedAt = time( NULL );
534        tr_list_append( &msgs->clientAskedFor, req );
535        ++count;
536        ++sent;
537    }
538
539    if( sent )
540        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
541                sent,
542                tr_list_size(msgs->clientAskedFor),
543                tr_list_size(msgs->clientWillAskFor) );
544
545    if( count < max )
546        fireNeedReq( msgs );
547}
548
549int
550tr_peerMsgsAddRequest( tr_peermsgs * msgs,
551                       uint32_t      index, 
552                       uint32_t      offset, 
553                       uint32_t      length )
554{
555    const int req_max = msgs->maxActiveRequests;
556    struct peer_request tmp, *req;
557
558    assert( msgs != NULL );
559    assert( msgs->torrent != NULL );
560    assert( reqIsValid( msgs, index, offset, length ) );
561
562    /**
563    ***  Reasons to decline the request
564    **/
565
566    /* don't send requests to choked clients */
567    if( msgs->info->clientIsChoked ) {
568        dbgmsg( msgs, "declining request because they're choking us" );
569        return TR_ADDREQ_CLIENT_CHOKED;
570    }
571
572    /* peer doesn't have this piece */
573    if( !tr_bitfieldHas( msgs->info->have, index ) )
574        return TR_ADDREQ_MISSING;
575
576    /* peer's queue is full */
577    if( tr_list_size( msgs->clientWillAskFor ) >= req_max ) {
578        dbgmsg( msgs, "declining request because we're full" );
579        return TR_ADDREQ_FULL;
580    }
581
582    /* have we already asked for this piece? */
583    tmp.index = index;
584    tmp.offset = offset;
585    tmp.length = length;
586    if( tr_list_find( msgs->clientAskedFor, &tmp, compareRequest ) ) {
587        dbgmsg( msgs, "declining because it's a duplicate" );
588        return TR_ADDREQ_DUPLICATE;
589    }
590    if( tr_list_find( msgs->clientWillAskFor, &tmp, compareRequest ) ) {
591        dbgmsg( msgs, "declining because it's a duplicate" );
592        return TR_ADDREQ_DUPLICATE;
593    }
594
595    /**
596    ***  Accept this request
597    **/
598
599    dbgmsg( msgs, "added req for piece %d, offset %d", (int)index, (int)offset );
600    req = tr_new0( struct peer_request, 1 );
601    *req = tmp;
602    tr_list_append( &msgs->clientWillAskFor, req );
603    return TR_ADDREQ_OK;
604}
605
606static void
607cancelAllRequestsToPeer( tr_peermsgs * msgs )
608{
609    struct peer_request * req;
610
611    while(( req = tr_list_pop_front( &msgs->clientWillAskFor ) ))
612    {
613        fireCancelledReq( msgs, req );
614        tr_free( req );
615    }
616
617    while(( req = tr_list_pop_front( &msgs->clientAskedFor ) ))
618    {
619        fireCancelledReq( msgs, req );
620        protocolSendCancel( msgs, req );
621        tr_free( req );
622    }
623}
624
625void
626tr_peerMsgsCancel( tr_peermsgs * msgs,
627                   uint32_t      pieceIndex,
628                   uint32_t      offset,
629                   uint32_t      length )
630{
631    struct peer_request *req, tmp;
632
633    assert( msgs != NULL );
634    assert( length > 0 );
635
636    /* have we asked the peer for this piece? */
637    tmp.index = pieceIndex;
638    tmp.offset = offset;
639    tmp.length = length;
640
641    /* if it's only in the queue and hasn't been sent yet, free it */
642    if(( req = tr_list_remove( &msgs->clientWillAskFor, &tmp, compareRequest ) ))
643    {
644        fireCancelledReq( msgs, req );
645        tr_free( req );
646    }
647
648    /* if it's already been sent, send a cancel message too */
649    if(( req = tr_list_remove( &msgs->clientAskedFor, &tmp, compareRequest ) ))
650    {
651        protocolSendCancel( msgs, req );
652        fireCancelledReq( msgs, req );
653        tr_free( req );
654    }
655}
656
657/**
658***
659**/
660
661static void
662sendLtepHandshake( tr_peermsgs * msgs )
663{
664    benc_val_t val, *m;
665    char * buf;
666    int len;
667    int pex;
668    const char * v = TR_NAME " " USERAGENT_PREFIX;
669    const int port = tr_getPublicPort( msgs->handle );
670    struct evbuffer * outbuf;
671
672    if( msgs->clientSentLtepHandshake )
673        return;
674
675    outbuf = evbuffer_new( );
676    dbgmsg( msgs, "sending an ltep handshake" );
677    msgs->clientSentLtepHandshake = 1;
678
679    /* decide if we want to advertise pex support */
680    if( !tr_torrentIsPexEnabled( msgs->torrent ) )
681        pex = 0;
682    else if( msgs->peerSentLtepHandshake )
683        pex = msgs->peerSupportsPex ? 1 : 0;
684    else
685        pex = 1;
686
687    tr_bencInit( &val, TYPE_DICT );
688    tr_bencDictReserve( &val, 4 );
689    tr_bencInitInt( tr_bencDictAdd( &val, "e" ), 1 );
690    m  = tr_bencDictAdd( &val, "m" );
691    tr_bencInit( m, TYPE_DICT );
692    if( pex ) {
693        tr_bencDictReserve( m, 1 );
694        tr_bencInitInt( tr_bencDictAdd( m, "ut_pex" ), OUR_LTEP_PEX );
695    }
696    if( port > 0 )
697        tr_bencInitInt( tr_bencDictAdd( &val, "p" ), port );
698    tr_bencInitStr( tr_bencDictAdd( &val, "v" ), v, 0, 1 );
699    buf = tr_bencSave( &val, &len );
700
701    tr_peerIoWriteUint32( msgs->io, outbuf, 2*sizeof(uint8_t) + len );
702    tr_peerIoWriteUint8 ( msgs->io, outbuf, BT_LTEP );
703    tr_peerIoWriteUint8 ( msgs->io, outbuf, LTEP_HANDSHAKE );
704    tr_peerIoWriteBytes ( msgs->io, outbuf, buf, len );
705
706    tr_peerIoWriteBuf( msgs->io, outbuf );
707
708#if 0
709    dbgmsg( msgs, "here is the ltep handshake we sent:" );
710    tr_bencPrint( &val );
711#endif
712
713    /* cleanup */
714    tr_bencFree( &val );
715    tr_free( buf );
716    evbuffer_free( outbuf );
717}
718
719static void
720parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
721{
722    benc_val_t val, * sub;
723    uint8_t * tmp = tr_new( uint8_t, len );
724
725    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
726    msgs->peerSentLtepHandshake = 1;
727
728    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type!=TYPE_DICT ) {
729        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
730        tr_free( tmp );
731        return;
732    }
733
734#if 0
735    dbgmsg( msgs, "here is the ltep handshake we read:" );
736    tr_bencPrint( &val );
737#endif
738
739    /* does the peer prefer encrypted connections? */
740    sub = tr_bencDictFind( &val, "e" );
741    if( tr_bencIsInt( sub ) )
742        msgs->info->encryption_preference = sub->val.i
743                                      ? ENCRYPTION_PREFERENCE_YES
744                                      : ENCRYPTION_PREFERENCE_NO;
745
746    /* check supported messages for utorrent pex */
747    sub = tr_bencDictFind( &val, "m" );
748    if( tr_bencIsDict( sub ) ) {
749        sub = tr_bencDictFind( sub, "ut_pex" );
750        if( tr_bencIsInt( sub ) ) {
751            msgs->peerSupportsPex = 1;
752            msgs->ut_pex_id = (uint8_t) sub->val.i;
753            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
754        }
755    }
756
757    /* get peer's listening port */
758    sub = tr_bencDictFind( &val, "p" );
759    if( tr_bencIsInt( sub ) ) {
760        msgs->info->port = htons( (uint16_t)sub->val.i );
761        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
762    }
763
764    tr_bencFree( &val );
765    tr_free( tmp );
766}
767
768static void
769parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
770{
771    benc_val_t val, * sub;
772    uint8_t * tmp;
773
774    if( !tr_torrentIsPexEnabled( msgs->torrent ) ) /* no sharing! */
775        return;
776
777    tmp = tr_new( uint8_t, msglen );
778    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
779
780    if( tr_bencLoad( tmp, msglen, &val, NULL ) || !tr_bencIsDict( &val ) ) {
781        dbgmsg( msgs, "GET can't read extended-pex dictionary" );
782        tr_free( tmp );
783        return;
784    }
785
786    sub = tr_bencDictFind( &val, "added" );
787    if( tr_bencIsStr(sub) && ((sub->val.s.i % 6) == 0)) {
788        const int n = sub->val.s.i / 6 ;
789        dbgmsg( msgs, "got %d peers from uT pex", n );
790        tr_peerMgrAddPeers( msgs->handle->peerMgr,
791                            msgs->torrent->info.hash,
792                            TR_PEER_FROM_PEX,
793                            (uint8_t*)sub->val.s.s, n );
794    }
795
796    tr_bencFree( &val );
797    tr_free( tmp );
798}
799
800static void
801sendPex( tr_peermsgs * msgs );
802
803static void
804parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
805{
806    uint8_t ltep_msgid;
807
808    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
809    msglen--;
810
811    if( ltep_msgid == LTEP_HANDSHAKE )
812    {
813        dbgmsg( msgs, "got ltep handshake" );
814        parseLtepHandshake( msgs, msglen, inbuf );
815        sendLtepHandshake( msgs );
816        sendPex( msgs );
817    }
818    else if( ltep_msgid == msgs->ut_pex_id )
819    {
820        dbgmsg( msgs, "got ut pex" );
821        msgs->peerSupportsPex = 1;
822        parseUtPex( msgs, msglen, inbuf );
823    }
824    else
825    {
826        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
827        evbuffer_drain( inbuf, msglen );
828    }
829}
830
831static int
832readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf )
833{
834    uint32_t len;
835    const size_t needlen = sizeof(uint32_t);
836
837    if( EVBUFFER_LENGTH(inbuf) < needlen )
838        return READ_MORE;
839
840    tr_peerIoReadUint32( msgs->io, inbuf, &len );
841
842    if( len == 0 ) /* peer sent us a keepalive message */
843        dbgmsg( msgs, "got KeepAlive" );
844    else {
845        msgs->incomingMessageLength = len;
846        msgs->state = AWAITING_BT_MESSAGE;
847    }
848
849    return READ_AGAIN;
850}
851
852static void
853updatePeerProgress( tr_peermsgs * msgs )
854{
855    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have ) / (float)msgs->torrent->info.pieceCount;
856    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
857    updateInterest( msgs );
858    firePeerProgress( msgs );
859}
860
861static int
862clientCanSendFastBlock( const tr_peermsgs * msgs UNUSED )
863{
864    /* FIXME(tiennou): base this on how many blocks we've already sent this
865     * peer, or maybe how many fast blocks per minute we've sent overall,
866     * or maybe how much bandwidth we're already using up w/o fast peers.
867     * I don't know what the Right Thing here is, but
868     * the previous measurement of how many pieces we have is not enough. */
869    return FALSE;
870}
871
872static void
873peerMadeRequest( tr_peermsgs * msgs, const struct peer_request * req )
874{
875    const int reqIsValid = requestIsValid( msgs, req );
876    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( msgs->torrent->completion, req->index );
877    const int peerIsChoked = msgs->info->peerIsChoked;
878    const int peerIsFast = tr_peerIoSupportsFEXT( msgs->io );
879    const int pieceIsFast = reqIsValid && tr_bitfieldHas( msgs->peerAllowedPieces, req->index );
880    const int canSendFast = clientCanSendFastBlock( msgs );
881
882    if( !reqIsValid ) /* bad request */
883    {
884        dbgmsg( msgs, "rejecting an invalid request." );
885        sendFastReject( msgs, req->index, req->offset, req->length );
886    }
887    else if( !clientHasPiece ) /* we don't have it */
888    {
889        dbgmsg( msgs, "rejecting request for a piece we don't have." );
890        sendFastReject( msgs, req->index, req->offset, req->length );
891    }
892    else if( peerIsChoked && !peerIsFast ) /* maybe he doesn't know he's choked? */
893    {
894        tr_peerMsgsSetChoke( msgs, 1 );
895        sendFastReject( msgs, req->index, req->offset, req->length );
896    }
897    else if( peerIsChoked && peerIsFast && ( !pieceIsFast || !canSendFast ) )
898    {
899        sendFastReject( msgs, req->index, req->offset, req->length );
900    }
901    else /* YAY */
902    {
903        struct peer_request * tmp = tr_new( struct peer_request, 1 );
904        *tmp = *req;
905        if( peerIsFast && pieceIsFast )
906            tr_list_append( &msgs->peerAskedForFast, tmp );
907        else
908            tr_list_append( &msgs->peerAskedFor, tmp );
909    }
910}
911
912static int
913messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
914{
915    switch( id )
916    {
917        case BT_CHOKE:
918        case BT_UNCHOKE:
919        case BT_INTERESTED:
920        case BT_NOT_INTERESTED:
921        case BT_HAVE_ALL:
922        case BT_HAVE_NONE:
923            return len==1;
924
925        case BT_HAVE:
926        case BT_SUGGEST:
927        case BT_ALLOWED_FAST:
928            return len==5;
929
930        case BT_BITFIELD:
931            return len == (msg->torrent->info.pieceCount+7u)/8u + 1u;
932       
933        case BT_REQUEST:
934        case BT_CANCEL:
935        case BT_REJECT:
936            return len==13;
937
938        case BT_PIECE:
939            return len>9 && len<=16393;
940
941        case BT_PORT:
942            return len==3;
943
944        case BT_LTEP:
945            return len >= 2;
946
947        default:
948            return FALSE;
949    }
950}
951
952
953static int
954readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf )
955{
956    uint8_t id;
957    uint32_t ui32;
958    uint32_t msglen = msgs->incomingMessageLength;
959
960    if( EVBUFFER_LENGTH(inbuf) < msglen )
961        return READ_MORE;
962
963    tr_peerIoReadUint8( msgs->io, inbuf, &id );
964    dbgmsg( msgs, "got BT id %d, len %d", (int)id, (int)msglen );
965
966    if( !messageLengthIsCorrect( msgs, id, msglen ) )
967    {
968        fireGotError( msgs );
969        return READ_DONE;
970    }
971
972    --msglen;
973
974    switch( id )
975    {
976        case BT_CHOKE:
977            dbgmsg( msgs, "got Choke" );
978            msgs->info->clientIsChoked = 1;
979            cancelAllRequestsToPeer( msgs );
980            cancelAllRequestsToClientExceptFast( msgs );
981            break;
982
983        case BT_UNCHOKE:
984            dbgmsg( msgs, "got Unchoke" );
985            msgs->info->clientIsChoked = 0;
986            fireNeedReq( msgs );
987            break;
988
989        case BT_INTERESTED:
990            dbgmsg( msgs, "got Interested" );
991            msgs->info->peerIsInterested = 1;
992            tr_peerMsgsSetChoke( msgs, 0 );
993            break;
994
995        case BT_NOT_INTERESTED:
996            dbgmsg( msgs, "got Not Interested" );
997            msgs->info->peerIsInterested = 0;
998            break;
999
1000        case BT_HAVE:
1001            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1002            dbgmsg( msgs, "got Have: %u", ui32 );
1003            tr_bitfieldAdd( msgs->info->have, ui32 );
1004            updatePeerProgress( msgs );
1005            tr_rcTransferred( msgs->torrent->swarmspeed, msgs->torrent->info.pieceSize );
1006            break;
1007
1008        case BT_BITFIELD: {
1009            const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
1010            dbgmsg( msgs, "got a bitfield" );
1011            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
1012            updatePeerProgress( msgs );
1013            tr_peerMsgsSetChoke( msgs, !clientIsSeed || (msgs->info->progress<1.0) );
1014            fireNeedReq( msgs );
1015            break;
1016        }
1017
1018        case BT_REQUEST: {
1019            struct peer_request req;
1020            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1021            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1022            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1023            dbgmsg( msgs, "got Request: %u:%u->%u", req.index, req.offset, req.length );
1024            peerMadeRequest( msgs, &req );
1025            break;
1026        }
1027
1028        case BT_CANCEL: {
1029            struct peer_request req;
1030            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1031            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1032            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1033            dbgmsg( msgs, "got a Cancel %u:%u->%u", req.index, req.offset, req.length );
1034            tr_free( tr_list_remove( &msgs->peerAskedForFast, &req, compareRequest ) );
1035            tr_free( tr_list_remove( &msgs->peerAskedFor, &req, compareRequest ) );
1036            break;
1037        }
1038
1039        case BT_PIECE: {
1040            dbgmsg( msgs, "got a Piece!" );
1041            assert( msgs->blockToUs.length == 0 );
1042            tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.index );
1043            tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.offset );
1044            msgs->blockToUs.length = msglen - 8;
1045            assert( EVBUFFER_LENGTH(msgs->inBlock) == 0 );
1046            msgs->state = msgs->blockToUs.length ? READING_BT_PIECE : AWAITING_BT_LENGTH;
1047            return READ_AGAIN;
1048            break;
1049        }
1050
1051        case BT_PORT: {
1052            dbgmsg( msgs, "Got a BT_PORT" );
1053            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1054            break;
1055        }
1056   
1057        case BT_SUGGEST: {
1058            /* FIXME(tiennou) */
1059            uint32_t index;
1060            tr_peerIoReadUint32( msgs->io, inbuf, &index );
1061            break;
1062        }
1063       
1064        case BT_HAVE_ALL:
1065            dbgmsg( msgs, "Got a BT_HAVE_ALL" );
1066            tr_bitfieldAddRange( msgs->info->have, 0, msgs->torrent->info.pieceCount );
1067            updatePeerProgress( msgs );
1068            break;
1069       
1070        case BT_HAVE_NONE:
1071            dbgmsg( msgs, "Got a BT_HAVE_NONE" );
1072            tr_bitfieldClear( msgs->info->have );
1073            updatePeerProgress( msgs );
1074            break;
1075       
1076        case BT_REJECT: {
1077            struct peer_request req;
1078            dbgmsg( msgs, "Got a BT_REJECT" );
1079            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1080            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1081            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1082            tr_free( tr_list_remove( &msgs->clientAskedFor, &req, compareRequest ) );
1083            break;
1084        }
1085       
1086        case BT_ALLOWED_FAST: {
1087            dbgmsg( msgs, "Got a BT_ALLOWED_FAST" );
1088            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1089            tr_bitfieldAdd( msgs->clientAllowedPieces, ui32 );
1090            break;
1091        }
1092       
1093        case BT_LTEP:
1094            dbgmsg( msgs, "Got a BT_LTEP" );
1095            parseLtep( msgs, msglen, inbuf );
1096            break;
1097
1098        default:
1099            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1100            tr_peerIoDrain( msgs->io, inbuf, msglen );
1101            break;
1102    }
1103
1104    msgs->incomingMessageLength = -1;
1105    msgs->state = AWAITING_BT_LENGTH;
1106    return READ_AGAIN;
1107}
1108
1109static void
1110clientGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1111{
1112    tr_torrent * tor = msgs->torrent;
1113    tor->activityDate = tr_date( );
1114    tor->downloadedCur += byteCount;
1115    msgs->info->pieceDataActivityDate = time( NULL );
1116    tr_rcTransferred( msgs->info->rcToClient, byteCount );
1117    tr_rcTransferred( tor->download, byteCount );
1118    tr_rcTransferred( tor->handle->download, byteCount );
1119}
1120
1121static void
1122peerGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1123{
1124    tr_torrent * tor = msgs->torrent;
1125    tor->activityDate = tr_date( );
1126    tor->uploadedCur += byteCount;
1127    msgs->info->pieceDataActivityDate = time( NULL );
1128    tr_rcTransferred( msgs->info->rcToPeer, byteCount );
1129    tr_rcTransferred( tor->upload, byteCount );
1130    tr_rcTransferred( tor->handle->upload, byteCount );
1131}
1132
1133static int
1134canDownload( const tr_peermsgs * msgs )
1135{
1136    tr_torrent * tor = msgs->torrent;
1137
1138    if( tor->downloadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1139        return !tor->handle->useDownloadLimit || tr_rcCanTransfer( tor->handle->download );
1140
1141    if( tor->downloadLimitMode == TR_SPEEDLIMIT_SINGLE )
1142        return tr_rcCanTransfer( tor->download );
1143
1144    return TRUE;
1145}
1146
1147static void
1148reassignBytesToCorrupt( tr_peermsgs * msgs, uint32_t byteCount )
1149{
1150    tr_torrent * tor = msgs->torrent;
1151
1152    /* increment the `corrupt' field */
1153    tor->corruptCur += byteCount;
1154
1155    /* decrement the `downloaded' field */
1156    if( tor->downloadedCur >= byteCount )
1157        tor->downloadedCur -= byteCount;
1158    else
1159        tor->downloadedCur = 0;
1160}
1161
1162
1163static void
1164gotBadPiece( tr_peermsgs * msgs, uint32_t pieceIndex )
1165{
1166    const uint32_t byteCount = tr_torPieceCountBytes( msgs->torrent, (int)pieceIndex );
1167    reassignBytesToCorrupt( msgs, byteCount );
1168}
1169
1170static void
1171gotUnwantedBlock( tr_peermsgs * msgs,
1172                  uint32_t      index UNUSED,
1173                  uint32_t      offset UNUSED,
1174                  uint32_t      length )
1175{
1176    reassignBytesToCorrupt( msgs, length );
1177}
1178
1179static void
1180addUsToBlamefield( tr_peermsgs * msgs, uint32_t index )
1181{
1182    if( !msgs->info->blame )
1183         msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1184    tr_bitfieldAdd( msgs->info->blame, index );
1185}
1186
1187static void
1188gotBlock( tr_peermsgs      * msgs,
1189          struct evbuffer  * inbuf,
1190          uint32_t           index,
1191          uint32_t           offset,
1192          uint32_t           length )
1193{
1194    tr_torrent * tor = msgs->torrent;
1195    const int block = _tr_block( tor, index, offset );
1196    struct peer_request key, *req;
1197
1198    /**
1199    *** Remove the block from our `we asked for this' list
1200    **/
1201
1202    key.index = index;
1203    key.offset = offset;
1204    key.length = length;
1205    req = (struct peer_request*) tr_list_remove( &msgs->clientAskedFor, &key,
1206                                                 compareRequest );
1207    if( req == NULL ) {
1208        gotUnwantedBlock( msgs, index, offset, length );
1209        dbgmsg( msgs, "we didn't ask for this message..." );
1210        return;
1211    }
1212    dbgmsg( msgs, "got block %u:%u->%u (turnaround time %d secs)", 
1213                     req->index, req->offset, req->length,
1214                     (int)(time(NULL) - req->time_requested) );
1215    tr_free( req );
1216    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1217                  tr_list_size(msgs->clientAskedFor));
1218
1219    /**
1220    *** Error checks
1221    **/
1222
1223    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
1224        dbgmsg( msgs, "have this block already..." );
1225        tr_dbg( "have this block already..." );
1226        gotUnwantedBlock( msgs, index, offset, length );
1227        return;
1228    }
1229
1230    if( (int)length != tr_torBlockCountBytes( tor, block ) ) {
1231        dbgmsg( msgs, "block is the wrong length..." );
1232        tr_dbg( "block is the wrong length..." );
1233        gotUnwantedBlock( msgs, index, offset, length );
1234        return;
1235    }
1236
1237    /**
1238    ***  Write the block
1239    **/
1240
1241    if( tr_ioWrite( tor, index, offset, length, EVBUFFER_DATA( inbuf )))
1242        return;
1243
1244#warning this sanity check is here to help track down the excess corrupt data bug, but is expensive and should be removed before the next release
1245{
1246    uint8_t * tmp = tr_new( uint8_t, length );
1247    const int val = tr_ioRead( tor, index, offset, length, tmp );
1248    assert( !val );
1249    assert( !memcmp( tmp, EVBUFFER_DATA(inbuf), length ) );
1250    tr_free( tmp );
1251}
1252
1253    tr_cpBlockAdd( tor->completion, block );
1254
1255    addUsToBlamefield( msgs, index );
1256
1257    fireGotBlock( msgs, index, offset, length );
1258
1259    /**
1260    ***  Handle if this was the last block in the piece
1261    **/
1262
1263    if( tr_cpPieceIsComplete( tor->completion, index ) )
1264    {
1265        if( tr_ioHash( tor, index ) )
1266        {
1267            gotBadPiece( msgs, index );
1268            return;
1269        }
1270
1271        fireClientHave( msgs, index );
1272    }
1273}
1274
1275
1276static ReadState
1277readBtPiece( tr_peermsgs * msgs, struct evbuffer * inbuf )
1278{
1279    uint32_t inlen;
1280    uint8_t * tmp;
1281
1282    assert( msgs != NULL );
1283    assert( msgs->blockToUs.length > 0 );
1284    assert( inbuf != NULL );
1285    assert( EVBUFFER_LENGTH( inbuf ) > 0 );
1286
1287    /* read from the inbuf into our block buffer */
1288    inlen = MIN( EVBUFFER_LENGTH(inbuf), msgs->blockToUs.length );
1289    tmp = tr_new( uint8_t, inlen );
1290    tr_peerIoReadBytes( msgs->io, inbuf, tmp, inlen );
1291    evbuffer_add( msgs->inBlock, tmp, inlen );
1292
1293    /* update our tables accordingly */
1294    assert( inlen >= msgs->blockToUs.length );
1295    msgs->blockToUs.length -= inlen;
1296    msgs->info->peerSentPieceDataAt = time( NULL );
1297    clientGotBytes( msgs, inlen );
1298
1299    /* if this was the entire block, save it */
1300    if( !msgs->blockToUs.length )
1301    {
1302        dbgmsg( msgs, "got block %u:%u", msgs->blockToUs.index, msgs->blockToUs.offset );
1303        assert( (int)EVBUFFER_LENGTH( msgs->inBlock ) == tr_torBlockCountBytes( msgs->torrent, _tr_block(msgs->torrent,msgs->blockToUs.index, msgs->blockToUs.offset) ) );
1304        gotBlock( msgs, msgs->inBlock,
1305                        msgs->blockToUs.index,
1306                        msgs->blockToUs.offset,
1307                        EVBUFFER_LENGTH( msgs->inBlock ) );
1308        evbuffer_drain( msgs->inBlock, ~0 );
1309        msgs->state = AWAITING_BT_LENGTH;
1310    }
1311
1312    /* cleanup */
1313    tr_free( tmp );
1314    return READ_AGAIN;
1315}
1316
1317static ReadState
1318canRead( struct bufferevent * evin, void * vmsgs )
1319{
1320    ReadState ret;
1321    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1322    struct evbuffer * inbuf = EVBUFFER_INPUT ( evin );
1323
1324    if( !canDownload( msgs ) )
1325    {
1326        msgs->notListening = 1;
1327        tr_peerIoSetIOMode ( msgs->io, 0, EV_READ );
1328        ret = READ_DONE;
1329    }
1330    else switch( msgs->state )
1331    {
1332        case AWAITING_BT_LENGTH:  ret = readBtLength  ( msgs, inbuf ); break;
1333        case AWAITING_BT_MESSAGE: ret = readBtMessage ( msgs, inbuf ); break;
1334        case READING_BT_PIECE:    ret = readBtPiece   ( msgs, inbuf ); break;
1335        default: assert( 0 );
1336    }
1337
1338    return ret;
1339}
1340
1341static void
1342sendKeepalive( tr_peermsgs * msgs )
1343{
1344    dbgmsg( msgs, "sending a keepalive message" );
1345    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1346}
1347
1348/**
1349***
1350**/
1351
1352static int
1353canWrite( const tr_peermsgs * msgs )
1354{
1355    /* don't let our outbuffer get too large */
1356    if( tr_peerIoWriteBytesWaiting( msgs->io ) > 4096 )
1357        return FALSE;
1358
1359    return TRUE;
1360}
1361
1362static size_t
1363getUploadMax( const tr_peermsgs * msgs )
1364{
1365    static const size_t maxval = ~0;
1366    const tr_torrent * tor = msgs->torrent;
1367
1368    if( !canWrite( msgs ) )
1369        return 0;
1370
1371    if( tor->uploadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1372        return tor->handle->useUploadLimit ? tr_rcBytesLeft( tor->handle->upload ) : maxval;
1373
1374    if( tor->uploadLimitMode == TR_SPEEDLIMIT_SINGLE )
1375        return tr_rcBytesLeft( tor->upload );
1376
1377    return maxval;
1378}
1379
1380static int
1381ratePulse( void * vmsgs )
1382{
1383    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1384    msgs->info->rateToClient = tr_rcRate( msgs->info->rcToClient );
1385    msgs->info->rateToPeer = tr_rcRate( msgs->info->rcToPeer );
1386    msgs->maxActiveRequests = MIN( 8 + (int)(msgs->info->rateToClient/10), 100 );
1387    msgs->minActiveRequests = msgs->maxActiveRequests / 2;
1388    return TRUE;
1389}
1390
1391static struct peer_request*
1392popNextRequest( tr_peermsgs * msgs )
1393{
1394    struct peer_request * ret;
1395    ret = tr_list_pop_front( &msgs->peerAskedForFast );
1396    if( !ret )
1397        ret = tr_list_pop_front( &msgs->peerAskedFor);
1398    return ret;
1399}
1400
1401static int
1402pulse( void * vmsgs )
1403{
1404    const time_t now = time( NULL );
1405    tr_peermsgs * msgs = vmsgs;
1406    struct peer_request * r;
1407    size_t len;
1408
1409    /* if we froze out a downloaded block because of speed limits,
1410       start listening to the peer again */
1411    if( msgs->notListening && canDownload( msgs ) )
1412    {
1413        msgs->notListening = 0;
1414        tr_peerIoSetIOMode ( msgs->io, EV_READ, 0 );
1415    }
1416
1417    pumpRequestQueue( msgs );
1418
1419    if( !canWrite( msgs ) )
1420    {
1421    }
1422    else if(( len = EVBUFFER_LENGTH( msgs->outBlock ) ))
1423    {
1424        const size_t uploadMax = getUploadMax( msgs );
1425        const size_t outlen = MIN( len, uploadMax );
1426        tr_peerIoWrite( msgs->io, EVBUFFER_DATA( msgs->outBlock ), outlen );
1427        evbuffer_drain( msgs->outBlock, outlen );
1428        msgs->clientSentAnythingAt = now;
1429        peerGotBytes( msgs, outlen );
1430        len -= outlen;
1431        dbgmsg( msgs, "wrote %d bytes; %d left in block", (int)outlen, (int)len );
1432        fflush( stdout );
1433    }
1434    else if(( len = EVBUFFER_LENGTH( msgs->outMessages ) ))
1435    {
1436        tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
1437        msgs->clientSentAnythingAt = now;
1438    }
1439    else if( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS )
1440    {
1441        sendKeepalive( msgs );
1442    }
1443
1444    if( !EVBUFFER_LENGTH( msgs->outBlock )
1445        && (( r = popNextRequest( msgs )))
1446        && requestIsValid( msgs, r )
1447        && tr_cpPieceIsComplete( msgs->torrent->completion, r->index ) )
1448    {
1449        uint8_t * buf = tr_new( uint8_t, r->length );
1450
1451        if( !tr_ioRead( msgs->torrent, r->index, r->offset, r->length, buf ) )
1452        {
1453            tr_peerIo * io = msgs->io;
1454            struct evbuffer * out = msgs->outBlock;
1455
1456            dbgmsg( msgs, "sending block %u:%u->%u", r->index, r->offset, r->length );
1457            tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 2*sizeof(uint32_t) + r->length );
1458            tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1459            tr_peerIoWriteUint32( io, out, r->index );
1460            tr_peerIoWriteUint32( io, out, r->offset );
1461            tr_peerIoWriteBytes ( io, out, buf, r->length );
1462        }
1463
1464        tr_free( buf );
1465        tr_free( r );
1466
1467        pulse( msgs ); /* start sending it right away */
1468    }
1469
1470    return TRUE; /* loop forever */
1471}
1472
1473static void
1474didWrite( struct bufferevent * evin UNUSED, void * vmsgs )
1475{
1476    pulse( vmsgs );
1477}
1478
1479static void
1480gotError( struct bufferevent * evbuf UNUSED, short what, void * vmsgs )
1481{
1482    dbgmsg( vmsgs, "libevent got an error! what=%d, errno=%d (%s)",
1483            (int)what, errno, strerror(errno) );
1484    fireGotError( vmsgs );
1485}
1486
1487static void
1488sendBitfield( tr_peermsgs * msgs )
1489{
1490    const tr_bitfield * bitfield = tr_cpPieceBitfield( msgs->torrent->completion );
1491    struct evbuffer * out = msgs->outMessages;
1492
1493    dbgmsg( msgs, "sending peer a bitfield message" );
1494    tr_peerIoWriteUint32( msgs->io, out, sizeof(uint8_t) + bitfield->len );
1495    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1496    tr_peerIoWriteBytes ( msgs->io, out, bitfield->bits, bitfield->len );
1497}
1498
1499/**
1500***
1501**/
1502
1503/* some peers give us error messages if we send
1504   more than this many peers in a single pex message */
1505#define MAX_PEX_DIFFS 200
1506
1507typedef struct
1508{
1509    tr_pex * added;
1510    tr_pex * dropped;
1511    tr_pex * elements;
1512    int addedCount;
1513    int droppedCount;
1514    int elementCount;
1515    int diffCount;
1516}
1517PexDiffs;
1518
1519static void
1520pexAddedCb( void * vpex, void * userData )
1521{
1522    PexDiffs * diffs = (PexDiffs *) userData;
1523    tr_pex * pex = (tr_pex *) vpex;
1524    if( diffs->diffCount < MAX_PEX_DIFFS )
1525    {
1526        diffs->diffCount++;
1527        diffs->added[diffs->addedCount++] = *pex;
1528        diffs->elements[diffs->elementCount++] = *pex;
1529    }
1530}
1531
1532static void
1533pexRemovedCb( void * vpex, void * userData )
1534{
1535    PexDiffs * diffs = (PexDiffs *) userData;
1536    tr_pex * pex = (tr_pex *) vpex;
1537    if( diffs->diffCount < MAX_PEX_DIFFS )
1538    {
1539        diffs->diffCount++;
1540        diffs->dropped[diffs->droppedCount++] = *pex;
1541    }
1542}
1543
1544static void
1545pexElementCb( void * vpex, void * userData )
1546{
1547    PexDiffs * diffs = (PexDiffs *) userData;
1548    tr_pex * pex = (tr_pex *) vpex;
1549    if( diffs->diffCount < MAX_PEX_DIFFS )
1550    {
1551        diffs->diffCount++;
1552        diffs->elements[diffs->elementCount++] = *pex;
1553    }
1554}
1555
1556static void
1557sendPex( tr_peermsgs * msgs )
1558{
1559    if( msgs->peerSupportsPex && tr_torrentIsPexEnabled( msgs->torrent ) )
1560    {
1561        int i;
1562        tr_pex * newPex = NULL;
1563        const int newCount = tr_peerMgrGetPeers( msgs->handle->peerMgr, msgs->torrent->info.hash, &newPex );
1564        PexDiffs diffs;
1565        benc_val_t val, *added, *dropped, *flags;
1566        uint8_t *tmp, *walk;
1567        char * benc;
1568        int bencLen;
1569
1570        /* build the diffs */
1571        diffs.added = tr_new( tr_pex, newCount );
1572        diffs.addedCount = 0;
1573        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1574        diffs.droppedCount = 0;
1575        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1576        diffs.elementCount = 0;
1577        diffs.diffCount = 0;
1578        tr_set_compare( msgs->pex, msgs->pexCount,
1579                        newPex, newCount,
1580                        tr_pexCompare, sizeof(tr_pex),
1581                        pexRemovedCb, pexAddedCb, pexElementCb, &diffs );
1582        dbgmsg( msgs, "pex: old peer count %d, new peer count %d, added %d, removed %d", msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1583
1584        /* update peer */
1585        tr_free( msgs->pex );
1586        msgs->pex = diffs.elements;
1587        msgs->pexCount = diffs.elementCount;
1588
1589        /* build the pex payload */
1590        tr_bencInit( &val, TYPE_DICT );
1591        tr_bencDictReserve( &val, 3 );
1592
1593        /* "added" */
1594        added = tr_bencDictAdd( &val, "added" );
1595        tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1596        for( i=0; i<diffs.addedCount; ++i ) {
1597            memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
1598            memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1599        }
1600        assert( ( walk - tmp ) == diffs.addedCount * 6 );
1601        tr_bencInitStr( added, tmp, walk-tmp, FALSE );
1602
1603        /* "added.f" */
1604        flags = tr_bencDictAdd( &val, "added.f" );
1605        tmp = walk = tr_new( uint8_t, diffs.addedCount );
1606        for( i=0; i<diffs.addedCount; ++i )
1607            *walk++ = diffs.added[i].flags;
1608        assert( ( walk - tmp ) == diffs.addedCount );
1609        tr_bencInitStr( flags, tmp, walk-tmp, FALSE );
1610
1611        /* "dropped" */
1612        dropped = tr_bencDictAdd( &val, "dropped" );
1613        tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
1614        for( i=0; i<diffs.droppedCount; ++i ) {
1615            memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
1616            memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
1617        }
1618        assert( ( walk - tmp ) == diffs.droppedCount * 6 );
1619        tr_bencInitStr( dropped, tmp, walk-tmp, FALSE );
1620
1621        /* write the pex message */
1622        benc = tr_bencSave( &val, &bencLen );
1623        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 2*sizeof(uint8_t) + bencLen );
1624        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_LTEP );
1625        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, OUR_LTEP_PEX );
1626        tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, benc, bencLen );
1627
1628        /* cleanup */
1629        tr_free( benc );
1630        tr_bencFree( &val );
1631        tr_free( diffs.added );
1632        tr_free( diffs.dropped );
1633        tr_free( newPex );
1634
1635        msgs->clientSentPexAt = time( NULL );
1636    }
1637}
1638
1639static int
1640pexPulse( void * vpeer )
1641{
1642    sendPex( vpeer );
1643    return TRUE;
1644}
1645
1646/**
1647***
1648**/
1649
1650tr_peermsgs*
1651tr_peerMsgsNew( struct tr_torrent * torrent,
1652                struct tr_peer    * info,
1653                tr_delivery_func    func,
1654                void              * userData,
1655                tr_publisher_tag  * setme )
1656{
1657    tr_peermsgs * m;
1658
1659    assert( info != NULL );
1660    assert( info->io != NULL );
1661
1662    m = tr_new0( tr_peermsgs, 1 );
1663    m->publisher = tr_publisherNew( );
1664    m->info = info;
1665    m->handle = torrent->handle;
1666    m->torrent = torrent;
1667    m->io = info->io;
1668    m->info->clientIsChoked = 1;
1669    m->info->peerIsChoked = 1;
1670    m->info->clientIsInterested = 0;
1671    m->info->peerIsInterested = 0;
1672    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
1673    m->pulseTimer = tr_timerNew( m->handle, pulse, m, PEER_PULSE_INTERVAL );
1674    m->rateTimer = tr_timerNew( m->handle, ratePulse, m, RATE_PULSE_INTERVAL );
1675    m->pexTimer = tr_timerNew( m->handle, pexPulse, m, PEX_INTERVAL );
1676    m->outMessages = evbuffer_new( );
1677    m->outBlock = evbuffer_new( );
1678    m->inBlock = evbuffer_new( );
1679    m->peerAllowedPieces = NULL;
1680    m->clientAllowedPieces = NULL;
1681    *setme = tr_publisherSubscribe( m->publisher, func, userData );
1682   
1683    if ( tr_peerIoSupportsFEXT( m->io ) )
1684    {
1685        /* This peer is fastpeer-enabled, generate its allowed set
1686         * (before registering our callbacks) */
1687        if ( !m->peerAllowedPieces ) {
1688            const struct in_addr *peerAddr = tr_peerIoGetAddress( m->io, NULL );
1689           
1690            m->peerAllowedPieces = tr_peerMgrGenerateAllowedSet( MAX_ALLOWED_SET_COUNT,
1691                                                                 m->torrent->info.pieceCount,
1692                                                                 m->torrent->info.hash,
1693                                                                 peerAddr );
1694        }
1695        m->clientAllowedPieces = tr_bitfieldNew( m->torrent->info.pieceCount );
1696    }
1697   
1698    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* error if we don't read or write for 2.5 minutes */
1699    tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
1700    tr_peerIoSetIOMode( m->io, EV_READ|EV_WRITE, 0 );
1701    ratePulse( m );
1702
1703    if ( tr_peerIoSupportsLTEP( m->io ) )
1704        sendLtepHandshake( m );
1705
1706    if ( !tr_peerIoSupportsFEXT( m->io ) )
1707        sendBitfield( m );
1708    else {
1709        /* This peer is fastpeer-enabled, send it have-all or have-none if appropriate */
1710        uint32_t peerProgress;
1711        float completion = tr_cpPercentComplete( m->torrent->completion );
1712        if ( completion == 0.0f ) {
1713            sendFastHave( m, 0 );
1714        } else if ( completion == 1.0f ) {
1715            sendFastHave( m, 1 );
1716        } else {
1717            sendBitfield( m );
1718        }
1719        peerProgress = m->torrent->info.pieceCount * m->info->progress;
1720       
1721        if ( peerProgress < MAX_ALLOWED_SET_COUNT )
1722            sendFastAllowedSet( m );
1723    }
1724
1725    return m;
1726}
1727
1728void
1729tr_peerMsgsFree( tr_peermsgs* msgs )
1730{
1731    if( msgs != NULL )
1732    {
1733        tr_timerFree( &msgs->pulseTimer );
1734        tr_timerFree( &msgs->rateTimer );
1735        tr_timerFree( &msgs->pexTimer );
1736        tr_publisherFree( &msgs->publisher );
1737        tr_list_free( &msgs->clientWillAskFor, tr_free );
1738        tr_list_free( &msgs->clientAskedFor, tr_free );
1739        tr_list_free( &msgs->peerAskedForFast, tr_free );
1740        tr_list_free( &msgs->peerAskedFor, tr_free );
1741        evbuffer_free( msgs->outMessages );
1742        evbuffer_free( msgs->outBlock );
1743        evbuffer_free( msgs->inBlock );
1744        tr_free( msgs->pex );
1745        msgs->pexCount = 0;
1746        tr_free( msgs );
1747    }
1748}
1749
1750tr_publisher_tag
1751tr_peerMsgsSubscribe( tr_peermsgs       * peer,
1752                      tr_delivery_func    func,
1753                      void              * userData )
1754{
1755    return tr_publisherSubscribe( peer->publisher, func, userData );
1756}
1757
1758void
1759tr_peerMsgsUnsubscribe( tr_peermsgs       * peer,
1760                        tr_publisher_tag    tag )
1761{
1762    tr_publisherUnsubscribe( peer->publisher, tag );
1763}
1764
1765int
1766tr_peerMsgIsPieceFastAllowed( const tr_peermsgs * peer,
1767                              uint32_t            index )
1768{
1769    return tr_bitfieldHas( peer->clientAllowedPieces, index );
1770}
Note: See TracBrowser for help on using the repository browser.