source: trunk/libtransmission/peer-msgs.c @ 3849

Last change on this file since 3849 was 3849, checked in by charles, 15 years ago

try to tickle some more information out of John_Clay's bug report

  • Property svn:keywords set to Date Rev Author Id
File size: 53.2 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 3849 2007-11-17 07:48:51Z charles $
11 */
12
13#include <assert.h>
14#include <ctype.h>
15#include <errno.h>
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19#include <libgen.h> /* basename */
20
21#include <netinet/in.h> /* struct in_addr */
22
23#include <event.h>
24
25#include "transmission.h"
26#include "bencode.h"
27#include "completion.h"
28#include "inout.h"
29#include "list.h"
30#include "peer-io.h"
31#include "peer-mgr.h"
32#include "peer-mgr-private.h"
33#include "peer-msgs.h"
34#include "ratecontrol.h"
35#include "trevent.h"
36#include "utils.h"
37
38/**
39***
40**/
41
42enum
43{
44    BT_CHOKE                = 0,
45    BT_UNCHOKE              = 1,
46    BT_INTERESTED           = 2,
47    BT_NOT_INTERESTED       = 3,
48    BT_HAVE                 = 4,
49    BT_BITFIELD             = 5,
50    BT_REQUEST              = 6,
51    BT_PIECE                = 7,
52    BT_CANCEL               = 8,
53    BT_PORT                 = 9,
54    BT_SUGGEST              = 13,
55    BT_HAVE_ALL             = 14,
56    BT_HAVE_NONE            = 15,
57    BT_REJECT               = 16,
58    BT_ALLOWED_FAST         = 17,
59    BT_LTEP                 = 20,
60
61    LTEP_HANDSHAKE          = 0,
62
63    OUR_LTEP_PEX            = 1,
64
65    MAX_REQUEST_BYTE_COUNT  = (16 * 1024), /* drop requests who want too much */
66
67    /* idle seconds before we send a keepalive */
68    KEEPALIVE_INTERVAL_SECS = 90,
69
70    PEX_INTERVAL            = (60 * 1000), /* msec between calls to sendPex() */
71    PEER_PULSE_INTERVAL     = (100),       /* msec between calls to pulse() */
72    RATE_PULSE_INTERVAL     = (333),       /* msec between calls to ratePulse() */
73     
74    /* Fast Peers Extension constants */
75    MAX_FAST_ALLOWED_COUNT   = 10,          /* max. number of pieces we fast-allow to another peer */
76    MAX_FAST_ALLOWED_THRESHOLD = 10,        /* max threshold for allowing fast-pieces requests */
77
78};
79
80enum
81{
82    AWAITING_BT_LENGTH,
83    AWAITING_BT_ID,
84    AWAITING_BT_MESSAGE,
85    AWAITING_BT_PIECE
86};
87
88struct peer_request
89{
90    uint32_t index;
91    uint32_t offset;
92    uint32_t length;
93    time_t time_requested;
94};
95
96static int
97compareRequest( const void * va, const void * vb )
98{
99    int i;
100    const struct peer_request * a = va;
101    const struct peer_request * b = vb;
102    if(( i = tr_compareUint32( a->index, b->index ))) return i;
103    if(( i = tr_compareUint32( a->offset, b->offset ))) return i;
104    if(( i = tr_compareUint32( a->length, b->length ))) return i;
105    return 0;
106}
107
108/* this is raw, unchanged data from the peer regarding
109 * the current message that it's sending us. */
110struct tr_incoming
111{
112    uint32_t length; /* includes the +1 for id length */
113    uint8_t id;
114    struct peer_request blockReq; /* metadata for incoming blocks */
115    struct evbuffer * block; /* piece data for incoming blocks */
116};
117
118struct tr_peermsgs
119{
120    tr_peer * info;
121
122    tr_handle * handle;
123    tr_torrent * torrent;
124    tr_peerIo * io;
125
126    tr_publisher_t * publisher;
127
128    struct evbuffer * outBlock;    /* buffer of all the current piece message */
129    struct evbuffer * outMessages; /* buffer of all the non-piece messages */
130    tr_list * peerAskedFor;
131    tr_list * peerAskedForFast;
132    tr_list * clientAskedFor;
133    tr_list * clientWillAskFor;
134
135    tr_timer * rateTimer;
136    tr_timer * pulseTimer;
137    tr_timer * pexTimer;
138
139    time_t lastReqAddedAt;
140    time_t clientSentPexAt;
141    time_t clientSentAnythingAt;
142
143    unsigned int peerSupportsPex          : 1;
144    unsigned int clientSentLtepHandshake  : 1;
145    unsigned int peerSentLtepHandshake    : 1;
146   
147    tr_bitfield * clientAllowedPieces;
148    tr_bitfield * peerAllowedPieces;
149   
150    tr_bitfield * clientSuggestedPieces;
151   
152    uint8_t state;
153    uint8_t ut_pex_id;
154    uint16_t pexCount;
155    uint32_t maxActiveRequests;
156    uint32_t minActiveRequests;
157
158    struct tr_incoming incoming; 
159
160    tr_pex * pex;
161};
162
163/**
164***
165**/
166
167static void
168myDebug( const char * file, int line,
169         const struct tr_peermsgs * msgs,
170         const char * fmt, ... )
171{
172    FILE * fp = tr_getLog( );
173    if( fp != NULL )
174    {
175        va_list args;
176        char timestr[64];
177        struct evbuffer * buf = evbuffer_new( );
178        char * myfile = tr_strdup( file );
179
180        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
181                             tr_getLogTimeStr( timestr, sizeof(timestr) ),
182                             msgs->torrent->info.name,
183                             tr_peerIoGetAddrStr( msgs->io ),
184                             msgs->info->client );
185        va_start( args, fmt );
186        evbuffer_add_vprintf( buf, fmt, args );
187        va_end( args );
188        evbuffer_add_printf( buf, " (%s:%d)\n", basename(myfile), line );
189        fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
190
191        tr_free( myfile );
192        evbuffer_free( buf );
193    }
194}
195
196#define dbgmsg(msgs, fmt...) myDebug(__FILE__, __LINE__, msgs, ##fmt )
197
198/**
199***
200**/
201
202static void
203protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
204{
205    tr_peerIo * io = msgs->io;
206    struct evbuffer * out = msgs->outMessages;
207
208    dbgmsg( msgs, "requesting %u:%u->%u", req->index, req->offset, req->length );
209    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
210    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
211    tr_peerIoWriteUint32( io, out, req->index );
212    tr_peerIoWriteUint32( io, out, req->offset );
213    tr_peerIoWriteUint32( io, out, req->length );
214}
215
216static void
217protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
218{
219    tr_peerIo * io = msgs->io;
220    struct evbuffer * out = msgs->outMessages;
221
222    dbgmsg( msgs, "cancelling %u:%u->%u", req->index, req->offset, req->length );
223    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
224    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
225    tr_peerIoWriteUint32( io, out, req->index );
226    tr_peerIoWriteUint32( io, out, req->offset );
227    tr_peerIoWriteUint32( io, out, req->length );
228}
229
230static void
231protocolSendHave( tr_peermsgs * msgs, uint32_t index )
232{
233    tr_peerIo * io = msgs->io;
234    struct evbuffer * out = msgs->outMessages;
235
236    dbgmsg( msgs, "sending Have %u", index );
237    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
238    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
239    tr_peerIoWriteUint32( io, out, index );
240}
241
242static void
243protocolSendChoke( tr_peermsgs * msgs, int choke )
244{
245    tr_peerIo * io = msgs->io;
246    struct evbuffer * out = msgs->outMessages;
247
248    dbgmsg( msgs, "sending %s", (choke ? "Choke" : "Unchoke") );
249    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) );
250    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
251}
252
253/**
254***  EVENTS
255**/
256
257static const tr_peermsgs_event blankEvent = { 0, 0, 0, 0, 0.0f };
258
259static void
260publish( tr_peermsgs * msgs, tr_peermsgs_event * e )
261{
262    tr_publisherPublish( msgs->publisher, msgs->info, e );
263}
264
265static void
266fireGotAssertError( tr_peermsgs * msgs )
267{
268    tr_peermsgs_event e = blankEvent;
269    e.eventType = TR_PEERMSG_GOT_ASSERT_ERROR;
270    publish( msgs, &e );
271}
272
273static void
274fireGotError( tr_peermsgs * msgs )
275{
276    tr_peermsgs_event e = blankEvent;
277    e.eventType = TR_PEERMSG_GOT_ERROR;
278    publish( msgs, &e );
279}
280
281static void
282fireNeedReq( tr_peermsgs * msgs )
283{
284    tr_peermsgs_event e = blankEvent;
285    e.eventType = TR_PEERMSG_NEED_REQ;
286    publish( msgs, &e );
287}
288
289static void
290firePeerProgress( tr_peermsgs * msgs )
291{
292    tr_peermsgs_event e = blankEvent;
293    e.eventType = TR_PEERMSG_PEER_PROGRESS;
294    e.progress = msgs->info->progress;
295    publish( msgs, &e );
296}
297
298static void
299fireClientHave( tr_peermsgs * msgs, uint32_t pieceIndex )
300{
301    tr_peermsgs_event e = blankEvent;
302    e.eventType = TR_PEERMSG_CLIENT_HAVE;
303    e.pieceIndex = pieceIndex;
304    publish( msgs, &e );
305}
306
307static void
308fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
309{
310    tr_peermsgs_event e = blankEvent;
311    e.eventType = TR_PEERMSG_CLIENT_BLOCK;
312    e.pieceIndex = req->index;
313    e.offset = req->offset;
314    e.length = req->length;
315    publish( msgs, &e );
316}
317
318static void
319fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
320{
321    tr_peermsgs_event e = blankEvent;
322    e.eventType = TR_PEERMSG_CANCEL;
323    e.pieceIndex = req->index;
324    e.offset = req->offset;
325    e.length = req->length;
326    publish( msgs, &e );
327}
328
329/**
330***  INTEREST
331**/
332
333static int
334isPieceInteresting( const tr_peermsgs   * peer,
335                    int                   piece )
336{
337    const tr_torrent * torrent = peer->torrent;
338    if( torrent->info.pieces[piece].dnd ) /* we don't want it */
339        return FALSE;
340    if( tr_cpPieceIsComplete( torrent->completion, piece ) ) /* we have it */
341        return FALSE;
342    if( !tr_bitfieldHas( peer->info->have, piece ) ) /* peer doesn't have it */
343        return FALSE;
344    if( tr_bitfieldHas( peer->info->banned, piece ) ) /* peer is banned */
345        return FALSE;
346    return TRUE;
347}
348
349/* "interested" means we'll ask for piece data if they unchoke us */
350static int
351isPeerInteresting( const tr_peermsgs * msgs )
352{
353    int i;
354    const tr_torrent * torrent;
355    const tr_bitfield * bitfield;
356    const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
357
358    if( clientIsSeed )
359        return FALSE;
360
361    torrent = msgs->torrent;
362    bitfield = tr_cpPieceBitfield( torrent->completion );
363
364    if( !msgs->info->have )
365        return TRUE;
366
367    assert( bitfield->len == msgs->info->have->len );
368    for( i=0; i<torrent->info.pieceCount; ++i )
369        if( isPieceInteresting( msgs, i ) )
370            return TRUE;
371
372    return FALSE;
373}
374
375static void
376sendInterest( tr_peermsgs * msgs, int weAreInterested )
377{
378    assert( msgs != NULL );
379    assert( weAreInterested==0 || weAreInterested==1 );
380
381    msgs->info->clientIsInterested = weAreInterested;
382    dbgmsg( msgs, "Sending %s",
383            weAreInterested ? "Interested" : "Not Interested");
384
385    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
386    tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages,
387                   weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
388}
389
390static void
391updateInterest( tr_peermsgs * msgs )
392{
393    const int i = isPeerInteresting( msgs );
394    if( i != msgs->info->clientIsInterested )
395        sendInterest( msgs, i );
396    if( i )
397        fireNeedReq( msgs );
398}
399
400#define MIN_CHOKE_PERIOD_SEC 10
401
402static void
403cancelAllRequestsToClientExceptFast( tr_peermsgs * msgs )
404{
405    tr_list_free( &msgs->peerAskedFor, tr_free );
406}
407
408void
409tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
410{
411    const time_t fibrillationTime = time(NULL) - MIN_CHOKE_PERIOD_SEC;
412
413    assert( msgs != NULL );
414    assert( msgs->info != NULL );
415    assert( choke==0 || choke==1 );
416
417    if( msgs->info->chokeChangedAt > fibrillationTime )
418    {
419        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
420    }
421    else if( msgs->info->peerIsChoked != choke )
422    {
423        msgs->info->peerIsChoked = choke;
424        if( choke )
425            cancelAllRequestsToClientExceptFast( msgs );
426        protocolSendChoke( msgs, choke );
427        msgs->info->chokeChangedAt = time( NULL );
428    }
429}
430
431/**
432***
433**/
434
435void
436tr_peerMsgsHave( tr_peermsgs * msgs,
437                 uint32_t      index )
438{
439    protocolSendHave( msgs, index );
440
441    /* since we have more pieces now, we might not be interested in this peer */
442    updateInterest( msgs );
443}
444#if 0
445static void
446sendFastSuggest( tr_peermsgs * msgs,
447                 uint32_t      pieceIndex )
448{
449    assert( msgs != NULL );
450   
451    if( tr_peerIoSupportsFEXT( msgs->io ) )
452    {
453        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
454        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_SUGGEST );
455        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
456    }
457}
458#endif
459static void
460sendFastHave( tr_peermsgs * msgs, int all )
461{
462    assert( msgs != NULL );
463   
464    if( tr_peerIoSupportsFEXT( msgs->io ) )
465    {
466        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
467        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, ( all ? BT_HAVE_ALL
468                                                                : BT_HAVE_NONE ) );
469        updateInterest( msgs );
470    }
471}
472
473static void
474sendFastReject( tr_peermsgs * msgs,
475                uint32_t      pieceIndex,
476                uint32_t      offset,
477                uint32_t      length )
478{
479    assert( msgs != NULL );
480
481    if( tr_peerIoSupportsFEXT( msgs->io ) )
482    {
483        const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
484        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
485        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_REJECT );
486        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
487        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
488        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
489    }
490}
491
492static void
493sendFastAllowed( tr_peermsgs * msgs,
494                 uint32_t      pieceIndex)
495{
496    assert( msgs != NULL );
497   
498    if( tr_peerIoSupportsFEXT( msgs->io ) )
499    {
500        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
501        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_ALLOWED_FAST );
502        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
503    }
504}
505
506
507
508static void
509sendFastAllowedSet( tr_peermsgs * msgs )
510{
511    int i = 0;
512    while (i <= msgs->torrent->info.pieceCount )
513    {
514        if ( tr_bitfieldHas( msgs->peerAllowedPieces, i) )
515            sendFastAllowed( msgs, i );
516        i++;
517    }
518}
519
520
521/**
522***
523**/
524
525static int
526reqIsValid( const tr_peermsgs   * msgs,
527            uint32_t              index,
528            uint32_t              offset,
529            uint32_t              length )
530{
531    const tr_torrent * tor = msgs->torrent;
532
533    if( index >= (uint32_t) tor->info.pieceCount )
534        return FALSE;
535    if ( (int)offset >= tr_torPieceCountBytes( tor, (int)index ) )
536        return FALSE;
537    if( length > MAX_REQUEST_BYTE_COUNT )
538        return FALSE;
539    if( tr_pieceOffset( tor, index, offset, length ) > tor->info.totalSize )
540        return FALSE;
541
542    return TRUE;
543}
544
545static int
546requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
547{
548    return reqIsValid( msgs, req->index, req->offset, req->length );
549}
550
551static void
552pumpRequestQueue( tr_peermsgs * msgs )
553{
554    const int max = msgs->maxActiveRequests;
555    const int min = msgs->minActiveRequests;
556    int count = tr_list_size( msgs->clientAskedFor );
557    int sent = 0;
558
559    if( count > min )
560        return;
561    if( msgs->info->clientIsChoked )
562        return;
563
564    while( ( count < max ) && ( msgs->clientWillAskFor != NULL ) )
565    {
566        struct peer_request * r = tr_list_pop_front( &msgs->clientWillAskFor );
567        protocolSendRequest( msgs, r );
568        r->time_requested = msgs->lastReqAddedAt = time( NULL );
569        tr_list_append( &msgs->clientAskedFor, r );
570        ++count;
571        ++sent;
572    }
573
574    if( sent )
575        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
576                sent,
577                tr_list_size(msgs->clientAskedFor),
578                tr_list_size(msgs->clientWillAskFor) );
579
580    if( count < max )
581        fireNeedReq( msgs );
582}
583
584static int
585pulse( void * vmsgs );
586
587int
588tr_peerMsgsAddRequest( tr_peermsgs * msgs,
589                       uint32_t      index, 
590                       uint32_t      offset, 
591                       uint32_t      length )
592{
593    const int req_max = msgs->maxActiveRequests;
594    struct peer_request tmp, *req;
595
596    assert( msgs != NULL );
597    assert( msgs->torrent != NULL );
598    assert( reqIsValid( msgs, index, offset, length ) );
599
600    /**
601    ***  Reasons to decline the request
602    **/
603
604    /* don't send requests to choked clients */
605    if( msgs->info->clientIsChoked ) {
606        dbgmsg( msgs, "declining request because they're choking us" );
607        return TR_ADDREQ_CLIENT_CHOKED;
608    }
609
610    /* peer doesn't have this piece */
611    if( !tr_bitfieldHas( msgs->info->have, index ) )
612        return TR_ADDREQ_MISSING;
613
614    /* peer's queue is full */
615    if( tr_list_size( msgs->clientWillAskFor ) >= req_max ) {
616        dbgmsg( msgs, "declining request because we're full" );
617        return TR_ADDREQ_FULL;
618    }
619
620    /* have we already asked for this piece? */
621    tmp.index = index;
622    tmp.offset = offset;
623    tmp.length = length;
624    if( tr_list_find( msgs->clientAskedFor, &tmp, compareRequest ) ) {
625        dbgmsg( msgs, "declining because it's a duplicate" );
626        return TR_ADDREQ_DUPLICATE;
627    }
628    if( tr_list_find( msgs->clientWillAskFor, &tmp, compareRequest ) ) {
629        dbgmsg( msgs, "declining because it's a duplicate" );
630        return TR_ADDREQ_DUPLICATE;
631    }
632
633    /**
634    ***  Accept this request
635    **/
636
637    dbgmsg( msgs, "added req for piece %d, offset %d", (int)index, (int)offset );
638    req = tr_new0( struct peer_request, 1 );
639    *req = tmp;
640    tr_list_append( &msgs->clientWillAskFor, req );
641    pulse( msgs );
642    return TR_ADDREQ_OK;
643}
644
645static void
646cancelAllRequestsToPeer( tr_peermsgs * msgs )
647{
648    struct peer_request * req;
649
650    while(( req = tr_list_pop_front( &msgs->clientWillAskFor ) ))
651    {
652        fireCancelledReq( msgs, req );
653        tr_free( req );
654    }
655
656    while(( req = tr_list_pop_front( &msgs->clientAskedFor ) ))
657    {
658        fireCancelledReq( msgs, req );
659        protocolSendCancel( msgs, req );
660        tr_free( req );
661    }
662}
663
664void
665tr_peerMsgsCancel( tr_peermsgs * msgs,
666                   uint32_t      pieceIndex,
667                   uint32_t      offset,
668                   uint32_t      length )
669{
670    struct peer_request *req, tmp;
671
672    assert( msgs != NULL );
673    assert( length > 0 );
674
675    /* have we asked the peer for this piece? */
676    tmp.index = pieceIndex;
677    tmp.offset = offset;
678    tmp.length = length;
679
680    /* if it's only in the queue and hasn't been sent yet, free it */
681    if(( req = tr_list_remove( &msgs->clientWillAskFor, &tmp, compareRequest ) ))
682    {
683        fireCancelledReq( msgs, req );
684        tr_free( req );
685    }
686
687    /* if it's already been sent, send a cancel message too */
688    if(( req = tr_list_remove( &msgs->clientAskedFor, &tmp, compareRequest ) ))
689    {
690        protocolSendCancel( msgs, req );
691        fireCancelledReq( msgs, req );
692        tr_free( req );
693    }
694}
695
696/**
697***
698**/
699
700static void
701sendLtepHandshake( tr_peermsgs * msgs )
702{
703    benc_val_t val, *m;
704    char * buf;
705    int len;
706    int pex;
707    const char * v = TR_NAME " " USERAGENT_PREFIX;
708    const int port = tr_getPublicPort( msgs->handle );
709    struct evbuffer * outbuf;
710
711    if( msgs->clientSentLtepHandshake )
712        return;
713
714    outbuf = evbuffer_new( );
715    dbgmsg( msgs, "sending an ltep handshake" );
716    msgs->clientSentLtepHandshake = 1;
717
718    /* decide if we want to advertise pex support */
719    if( !tr_torrentIsPexEnabled( msgs->torrent ) )
720        pex = 0;
721    else if( msgs->peerSentLtepHandshake )
722        pex = msgs->peerSupportsPex ? 1 : 0;
723    else
724        pex = 1;
725
726    tr_bencInit( &val, TYPE_DICT );
727    tr_bencDictReserve( &val, 4 );
728    tr_bencInitInt( tr_bencDictAdd( &val, "e" ), 1 );
729    m  = tr_bencDictAdd( &val, "m" );
730    tr_bencInit( m, TYPE_DICT );
731    if( pex ) {
732        tr_bencDictReserve( m, 1 );
733        tr_bencInitInt( tr_bencDictAdd( m, "ut_pex" ), OUR_LTEP_PEX );
734    }
735    if( port > 0 )
736        tr_bencInitInt( tr_bencDictAdd( &val, "p" ), port );
737    tr_bencInitStr( tr_bencDictAdd( &val, "v" ), v, 0, 1 );
738    buf = tr_bencSave( &val, &len );
739
740    tr_peerIoWriteUint32( msgs->io, outbuf, 2*sizeof(uint8_t) + len );
741    tr_peerIoWriteUint8 ( msgs->io, outbuf, BT_LTEP );
742    tr_peerIoWriteUint8 ( msgs->io, outbuf, LTEP_HANDSHAKE );
743    tr_peerIoWriteBytes ( msgs->io, outbuf, buf, len );
744
745    tr_peerIoWriteBuf( msgs->io, outbuf );
746
747#if 0
748    dbgmsg( msgs, "here is the ltep handshake we sent:" );
749    tr_bencPrint( &val );
750    dbgmsg( msgs, "here is the ltep handshake we read [%s]:", tr_bencSave( &val, NULL ) );
751#endif
752
753    /* cleanup */
754    tr_bencFree( &val );
755    tr_free( buf );
756    evbuffer_free( outbuf );
757}
758
759static void
760parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
761{
762    benc_val_t val, * sub;
763    uint8_t * tmp = tr_new( uint8_t, len );
764
765    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
766    msgs->peerSentLtepHandshake = 1;
767
768    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type!=TYPE_DICT ) {
769        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
770        tr_free( tmp );
771        return;
772    }
773
774#if 0
775    dbgmsg( msgs, "here is the ltep handshake we read:" );
776    tr_bencPrint( &val );
777    dbgmsg( msgs, "here is the ltep handshake we read [%s]:", tr_bencSave( &val, NULL ) );
778#endif
779
780    /* does the peer prefer encrypted connections? */
781    sub = tr_bencDictFind( &val, "e" );
782    if( tr_bencIsInt( sub ) )
783        msgs->info->encryption_preference = sub->val.i
784                                      ? ENCRYPTION_PREFERENCE_YES
785                                      : ENCRYPTION_PREFERENCE_NO;
786
787    /* check supported messages for utorrent pex */
788    sub = tr_bencDictFind( &val, "m" );
789    if( tr_bencIsDict( sub ) ) {
790        sub = tr_bencDictFind( sub, "ut_pex" );
791        if( tr_bencIsInt( sub ) ) {
792            msgs->peerSupportsPex = 1;
793            msgs->ut_pex_id = (uint8_t) sub->val.i;
794            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
795        }
796    }
797
798    /* get peer's listening port */
799    sub = tr_bencDictFind( &val, "p" );
800    if( tr_bencIsInt( sub ) ) {
801        msgs->info->port = htons( (uint16_t)sub->val.i );
802        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
803    }
804
805    tr_bencFree( &val );
806    tr_free( tmp );
807}
808
809static void
810parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
811{
812    benc_val_t val, * sub;
813    uint8_t * tmp;
814
815    if( !tr_torrentIsPexEnabled( msgs->torrent ) ) /* no sharing! */
816        return;
817
818    tmp = tr_new( uint8_t, msglen );
819    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
820
821    if( tr_bencLoad( tmp, msglen, &val, NULL ) || !tr_bencIsDict( &val ) ) {
822        dbgmsg( msgs, "GET can't read extended-pex dictionary" );
823        tr_free( tmp );
824        return;
825    }
826
827    sub = tr_bencDictFind( &val, "added" );
828    if( tr_bencIsStr(sub) && ((sub->val.s.i % 6) == 0)) {
829        const int n = sub->val.s.i / 6 ;
830        dbgmsg( msgs, "got %d peers from uT pex", n );
831        tr_peerMgrAddPeers( msgs->handle->peerMgr,
832                            msgs->torrent->info.hash,
833                            TR_PEER_FROM_PEX,
834                            (uint8_t*)sub->val.s.s, n );
835    }
836
837    tr_bencFree( &val );
838    tr_free( tmp );
839}
840
841static void
842sendPex( tr_peermsgs * msgs );
843
844static void
845parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
846{
847    uint8_t ltep_msgid;
848
849    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
850    msglen--;
851
852    if( ltep_msgid == LTEP_HANDSHAKE )
853    {
854        dbgmsg( msgs, "got ltep handshake" );
855        parseLtepHandshake( msgs, msglen, inbuf );
856        if( tr_peerIoSupportsLTEP( msgs->io ) )
857        {
858            sendLtepHandshake( msgs );
859            sendPex( msgs );
860        }
861    }
862    else if( ltep_msgid == msgs->ut_pex_id )
863    {
864        dbgmsg( msgs, "got ut pex" );
865        msgs->peerSupportsPex = 1;
866        parseUtPex( msgs, msglen, inbuf );
867    }
868    else
869    {
870        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
871        evbuffer_drain( inbuf, msglen );
872    }
873}
874
875static int
876readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
877{
878    uint32_t len;
879
880    if( inlen < sizeof(len) )
881        return READ_MORE;
882
883    tr_peerIoReadUint32( msgs->io, inbuf, &len );
884
885    if( len == 0 ) /* peer sent us a keepalive message */
886        dbgmsg( msgs, "got KeepAlive" );
887    else {
888        msgs->incoming.length = len;
889        msgs->state = AWAITING_BT_ID;
890    }
891dbgmsg( msgs, "readBtLength: got a length of %d, msgs->state is now %d", (int)len, (int)msgs->state );
892
893    return READ_AGAIN;
894}
895
896static int
897readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen );
898
899static int
900readBtId( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
901{
902    uint8_t id;
903
904    if( inlen < sizeof(uint8_t) )
905        return READ_MORE;
906
907    tr_peerIoReadUint8( msgs->io, inbuf, &id );
908    msgs->incoming.id = id;
909
910    if( id==BT_PIECE )
911    {
912        msgs->state = AWAITING_BT_PIECE;
913        return READ_AGAIN;
914    }
915    else if( msgs->incoming.length != 1 )
916    {
917        msgs->state = AWAITING_BT_MESSAGE;
918        return READ_AGAIN;
919    }
920    else return readBtMessage( msgs, inbuf, inlen-1 );
921}
922
923static void
924updatePeerProgress( tr_peermsgs * msgs )
925{
926    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have ) / (float)msgs->torrent->info.pieceCount;
927    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
928    updateInterest( msgs );
929    firePeerProgress( msgs );
930}
931
932static int
933clientCanSendFastBlock( const tr_peermsgs * msgs UNUSED )
934{
935    /* We can't send a fast piece if a peer has more than MAX_FAST_ALLOWED_THRESHOLD pieces */
936    if( tr_bitfieldCountTrueBits( msgs->info->have ) > MAX_FAST_ALLOWED_THRESHOLD )
937        return FALSE;
938   
939    /* ...or if we don't have ourself enough pieces */
940    if( tr_bitfieldCountTrueBits( tr_cpPieceBitfield( msgs->torrent->completion ) ) < MAX_FAST_ALLOWED_THRESHOLD )
941        return FALSE;
942
943    /* Maybe a bandwidth limit ? */
944    return TRUE;
945}
946
947static void
948peerMadeRequest( tr_peermsgs * msgs, const struct peer_request * req )
949{
950    const int reqIsValid = requestIsValid( msgs, req );
951    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( msgs->torrent->completion, req->index );
952    const int peerIsChoked = msgs->info->peerIsChoked;
953    const int peerIsFast = tr_peerIoSupportsFEXT( msgs->io );
954    const int pieceIsFast = reqIsValid && tr_bitfieldHas( msgs->peerAllowedPieces, req->index );
955    const int canSendFast = clientCanSendFastBlock( msgs );
956
957    if( !reqIsValid ) /* bad request */
958    {
959        dbgmsg( msgs, "rejecting an invalid request." );
960        sendFastReject( msgs, req->index, req->offset, req->length );
961    }
962    else if( !clientHasPiece ) /* we don't have it */
963    {
964        dbgmsg( msgs, "rejecting request for a piece we don't have." );
965        sendFastReject( msgs, req->index, req->offset, req->length );
966    }
967    else if( peerIsChoked && !peerIsFast ) /* doesn't he know he's choked? */
968    {
969        tr_peerMsgsSetChoke( msgs, 1 );
970        sendFastReject( msgs, req->index, req->offset, req->length );
971    }
972    else if( peerIsChoked && peerIsFast && ( !pieceIsFast || !canSendFast ) )
973    {
974        sendFastReject( msgs, req->index, req->offset, req->length );
975    }
976    else /* YAY */
977    {
978        struct peer_request * tmp = tr_new( struct peer_request, 1 );
979        *tmp = *req;
980        if( peerIsFast && pieceIsFast )
981            tr_list_append( &msgs->peerAskedForFast, tmp );
982        else
983            tr_list_append( &msgs->peerAskedFor, tmp );
984    }
985}
986
987static int
988messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
989{
990    switch( id )
991    {
992        case BT_CHOKE:
993        case BT_UNCHOKE:
994        case BT_INTERESTED:
995        case BT_NOT_INTERESTED:
996        case BT_HAVE_ALL:
997        case BT_HAVE_NONE:
998            return len==1;
999
1000        case BT_HAVE:
1001        case BT_SUGGEST:
1002        case BT_ALLOWED_FAST:
1003            return len==5;
1004
1005        case BT_BITFIELD:
1006            return len == (msg->torrent->info.pieceCount+7u)/8u + 1u;
1007       
1008        case BT_REQUEST:
1009        case BT_CANCEL:
1010        case BT_REJECT:
1011            return len==13;
1012
1013        case BT_PIECE:
1014            return len>9 && len<=16393;
1015
1016        case BT_PORT:
1017            return len==3;
1018
1019        case BT_LTEP:
1020            return len >= 2;
1021
1022        default:
1023            return FALSE;
1024    }
1025}
1026
1027static int
1028clientGotBlock( tr_peermsgs * msgs, const uint8_t * block, const struct peer_request * req );
1029
1030static int
1031readBtPiece( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1032{
1033    struct peer_request * req = &msgs->incoming.blockReq;
1034    dbgmsg( msgs, "In readBtPiece" );
1035
1036    if( !req->length )
1037    {
1038        if( inlen < 8 )
1039            return READ_MORE;
1040
1041        tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
1042        tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
1043        req->length = msgs->incoming.length - 9;
1044        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1045        return READ_AGAIN;
1046    }
1047    else
1048    {
1049        int err;
1050
1051        /* read in another chunk of data */
1052        const size_t nLeft = req->length - EVBUFFER_LENGTH(msgs->incoming.block);
1053        size_t n = MIN( nLeft, inlen );
1054        uint8_t * buf = tr_new( uint8_t, n );
1055        tr_peerIoReadBytes( msgs->io, inbuf, buf, n );
1056        evbuffer_add( msgs->incoming.block, buf, n );
1057        tr_free( buf );
1058        dbgmsg( msgs, "got %d bytes for block %u:%u->%u ... %d remain",
1059               (int)n, req->index, req->offset, req->length,
1060               (int)( req->length - EVBUFFER_LENGTH(msgs->incoming.block) ) );
1061        if( EVBUFFER_LENGTH(msgs->incoming.block) < req->length )
1062            return READ_MORE;
1063
1064        /* we've got the whole block ... process it */
1065        err = clientGotBlock( msgs, EVBUFFER_DATA(msgs->incoming.block), req );
1066
1067        /* cleanup */
1068        evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH(msgs->incoming.block) );
1069        req->length = 0;
1070        msgs->state = AWAITING_BT_LENGTH;
1071        if( !err )
1072            return READ_AGAIN;
1073        else {
1074            fireGotAssertError( msgs );
1075            return READ_DONE;
1076        }
1077    }
1078}
1079
1080static int
1081readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1082{
1083    uint32_t ui32;
1084    uint32_t msglen = msgs->incoming.length;
1085    const uint8_t id = msgs->incoming.id;
1086    const size_t startBufLen = EVBUFFER_LENGTH( inbuf );
1087
1088dbgmsg( msgs, "in readBtMessage" );
1089
1090    --msglen; // id length
1091
1092    if( inlen < msglen ) {
1093        dbgmsg( msgs, " too short!!! " );
1094        return READ_MORE;
1095    }
1096
1097    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %d", (int)id, (int)msglen, (int)inlen );
1098
1099    if( !messageLengthIsCorrect( msgs, id, msglen+1 ) )
1100    {
1101        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1102        fireGotError( msgs );
1103        return READ_DONE;
1104    }
1105
1106    switch( id )
1107    {
1108        case BT_CHOKE:
1109            dbgmsg( msgs, "got Choke" );
1110            msgs->info->clientIsChoked = 1;
1111            cancelAllRequestsToPeer( msgs );
1112            cancelAllRequestsToClientExceptFast( msgs );
1113            break;
1114
1115        case BT_UNCHOKE:
1116            dbgmsg( msgs, "got Unchoke" );
1117            msgs->info->clientIsChoked = 0;
1118            fireNeedReq( msgs );
1119            break;
1120
1121        case BT_INTERESTED:
1122            dbgmsg( msgs, "got Interested" );
1123            msgs->info->peerIsInterested = 1;
1124            tr_peerMsgsSetChoke( msgs, 0 );
1125            break;
1126
1127        case BT_NOT_INTERESTED:
1128            dbgmsg( msgs, "got Not Interested" );
1129            msgs->info->peerIsInterested = 0;
1130            break;
1131           
1132        case BT_HAVE:
1133            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1134            dbgmsg( msgs, "got Have: %u", ui32 );
1135            tr_bitfieldAdd( msgs->info->have, ui32 );
1136            /* If this is a fast-allowed piece for this peer, mark it as normal now */
1137            if( msgs->clientAllowedPieces != NULL && tr_bitfieldHas( msgs->clientAllowedPieces, ui32 ) )
1138                tr_bitfieldRem( msgs->clientAllowedPieces, ui32 );
1139            updatePeerProgress( msgs );
1140            tr_rcTransferred( msgs->torrent->swarmspeed, msgs->torrent->info.pieceSize );
1141            break;
1142
1143        case BT_BITFIELD: {
1144            const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
1145            dbgmsg( msgs, "got a bitfield" );
1146            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
1147            updatePeerProgress( msgs );
1148            tr_peerMsgsSetChoke( msgs, !clientIsSeed || (msgs->info->progress<1.0) );
1149            fireNeedReq( msgs );
1150            break;
1151        }
1152
1153        case BT_REQUEST: {
1154            struct peer_request req;
1155            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1156            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1157            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1158            dbgmsg( msgs, "got Request: %u:%u->%u", req.index, req.offset, req.length );
1159            peerMadeRequest( msgs, &req );
1160            break;
1161        }
1162
1163        case BT_CANCEL: {
1164            struct peer_request req;
1165            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1166            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1167            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1168            dbgmsg( msgs, "got a Cancel %u:%u->%u", req.index, req.offset, req.length );
1169            tr_free( tr_list_remove( &msgs->peerAskedForFast, &req, compareRequest ) );
1170            tr_free( tr_list_remove( &msgs->peerAskedFor, &req, compareRequest ) );
1171            break;
1172        }
1173
1174        case BT_PIECE:
1175            assert( 0 ); /* should be handled elsewhere! */
1176            break;
1177       
1178        case BT_PORT:
1179            dbgmsg( msgs, "Got a BT_PORT" );
1180            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1181            break;
1182           
1183        case BT_SUGGEST: {
1184            dbgmsg( msgs, "Got a BT_SUGGEST" );
1185            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1186            const tr_bitfield * bt = tr_cpPieceBitfield( msgs->torrent->completion );
1187            if( tr_bitfieldHas( bt, ui32 ) )
1188                tr_bitfieldAdd( msgs->clientSuggestedPieces, ui32 );
1189            break;
1190        }
1191           
1192        case BT_HAVE_ALL:
1193            dbgmsg( msgs, "Got a BT_HAVE_ALL" );
1194            tr_bitfieldAddRange( msgs->info->have, 0, msgs->torrent->info.pieceCount );
1195            updatePeerProgress( msgs );
1196            break;
1197       
1198       
1199        case BT_HAVE_NONE:
1200            dbgmsg( msgs, "Got a BT_HAVE_NONE" );
1201            tr_bitfieldClear( msgs->info->have );
1202            updatePeerProgress( msgs );
1203            break;
1204       
1205        case BT_REJECT: {
1206            struct peer_request req;
1207            dbgmsg( msgs, "Got a BT_REJECT" );
1208            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1209            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1210            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1211            tr_free( tr_list_remove( &msgs->clientAskedFor, &req, compareRequest ) );
1212            break;
1213        }
1214
1215        case BT_ALLOWED_FAST: {
1216            dbgmsg( msgs, "Got a BT_ALLOWED_FAST" );
1217            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1218            tr_bitfieldAdd( msgs->clientAllowedPieces, ui32 );
1219            break;
1220        }
1221
1222        case BT_LTEP:
1223            dbgmsg( msgs, "Got a BT_LTEP" );
1224            parseLtep( msgs, msglen, inbuf );
1225            break;
1226
1227        default:
1228            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1229            tr_peerIoDrain( msgs->io, inbuf, msglen );
1230            break;
1231    }
1232
1233    dbgmsg( msgs, "startBufLen was %d, msglen was %d, current inbuf len is %d", (int)startBufLen, (int)(msglen+1), (int)EVBUFFER_LENGTH(inbuf) );
1234
1235    assert( msglen + 1 == msgs->incoming.length );
1236    assert( EVBUFFER_LENGTH(inbuf) == startBufLen - msglen );
1237
1238    msgs->state = AWAITING_BT_LENGTH;
1239    return READ_AGAIN;
1240}
1241
1242static void
1243clientGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1244{
1245    tr_torrent * tor = msgs->torrent;
1246    tor->activityDate = tr_date( );
1247    tor->downloadedCur += byteCount;
1248    msgs->info->pieceDataActivityDate = time( NULL );
1249    tr_rcTransferred( msgs->info->rcToClient, byteCount );
1250    tr_rcTransferred( tor->download, byteCount );
1251    tr_rcTransferred( tor->handle->download, byteCount );
1252}
1253
1254static void
1255peerGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1256{
1257    tr_torrent * tor = msgs->torrent;
1258    tor->activityDate = tr_date( );
1259    tor->uploadedCur += byteCount;
1260    msgs->info->pieceDataActivityDate = time( NULL );
1261    tr_rcTransferred( msgs->info->rcToPeer, byteCount );
1262    tr_rcTransferred( tor->upload, byteCount );
1263    tr_rcTransferred( tor->handle->upload, byteCount );
1264}
1265
1266static size_t
1267getDownloadMax( const tr_peermsgs * msgs )
1268{
1269    static const size_t maxval = ~0;
1270    const tr_torrent * tor = msgs->torrent;
1271
1272    if( tor->downloadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1273        return tor->handle->useDownloadLimit
1274            ? tr_rcBytesLeft( tor->handle->download ) : maxval;
1275
1276    if( tor->downloadLimitMode == TR_SPEEDLIMIT_SINGLE )
1277        return tr_rcBytesLeft( tor->download );
1278
1279    return maxval;
1280}
1281
1282static void
1283reassignBytesToCorrupt( tr_peermsgs * msgs, uint32_t byteCount )
1284{
1285    tr_torrent * tor = msgs->torrent;
1286
1287    /* increment the `corrupt' field */
1288    tor->corruptCur += byteCount;
1289
1290    /* decrement the `downloaded' field */
1291    if( tor->downloadedCur >= byteCount )
1292        tor->downloadedCur -= byteCount;
1293    else
1294        tor->downloadedCur = 0;
1295}
1296
1297
1298static void
1299gotBadPiece( tr_peermsgs * msgs, uint32_t pieceIndex )
1300{
1301    const uint32_t byteCount = tr_torPieceCountBytes( msgs->torrent, (int)pieceIndex );
1302    reassignBytesToCorrupt( msgs, byteCount );
1303}
1304
1305static void
1306clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1307{
1308    reassignBytesToCorrupt( msgs, req->length );
1309}
1310
1311static void
1312addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1313{
1314    if( !msgs->info->blame )
1315         msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1316    tr_bitfieldAdd( msgs->info->blame, index );
1317}
1318
1319static int
1320clientGotBlock( tr_peermsgs * msgs, const uint8_t * data, const struct peer_request * req )
1321{
1322    int i;
1323    tr_torrent * tor = msgs->torrent;
1324    const int block = _tr_block( tor, req->index, req->offset );
1325    struct peer_request *myreq;
1326
1327    assert( msgs != NULL );
1328    assert( req != NULL );
1329
1330    if( req->length != (uint32_t)tr_torBlockCountBytes( msgs->torrent, block ) )
1331    {
1332        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1333                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1334        return TR_ERROR_ASSERT;
1335    }
1336
1337    /* save the block */
1338    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1339
1340    /**
1341    *** Remove the block from our `we asked for this' list
1342    **/
1343
1344    myreq = tr_list_remove( &msgs->clientAskedFor, req, compareRequest );
1345    if( myreq == NULL ) {
1346        clientGotUnwantedBlock( msgs, req );
1347        dbgmsg( msgs, "we didn't ask for this message..." );
1348        return 0;
1349    }
1350
1351    dbgmsg( msgs, "got block %u:%u->%u (turnaround time %d secs)", 
1352                  myreq->index, myreq->offset, myreq->length,
1353                  (int)(time(NULL) - myreq->time_requested) );
1354    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1355                  tr_list_size(msgs->clientAskedFor));
1356
1357    tr_free( myreq );
1358    myreq = NULL;
1359
1360    /**
1361    *** Error checks
1362    **/
1363
1364    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
1365        dbgmsg( msgs, "we have this block already..." );
1366        clientGotUnwantedBlock( msgs, req );
1367        return 0;
1368    }
1369
1370    /**
1371    ***  Save the block
1372    **/
1373
1374    msgs->info->peerSentPieceDataAt = time( NULL );
1375    clientGotBytes( msgs, req->length );
1376    i = tr_ioWrite( tor, req->index, req->offset, req->length, data );
1377    if( i )
1378        return 0;
1379
1380    tr_cpBlockAdd( tor->completion, block );
1381
1382    addPeerToBlamefield( msgs, req->index );
1383
1384    fireGotBlock( msgs, req );
1385
1386    /**
1387    ***  Handle if this was the last block in the piece
1388    **/
1389
1390    if( tr_cpPieceIsComplete( tor->completion, req->index ) )
1391    {
1392        if( tr_ioHash( tor, req->index ) )
1393        {
1394            gotBadPiece( msgs, req->index );
1395            return 0;
1396        }
1397
1398        fireClientHave( msgs, req->index );
1399    }
1400
1401    return 0;
1402}
1403
1404static ReadState
1405canRead( struct bufferevent * evin, void * vmsgs )
1406{
1407    ReadState ret;
1408    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1409    struct evbuffer * inbuf = EVBUFFER_INPUT ( evin );
1410    const size_t buflen = EVBUFFER_LENGTH( inbuf );
1411    const size_t inlen = MIN( buflen, getDownloadMax( msgs ) );
1412
1413    if( !inlen )
1414    {
1415        ret = READ_DONE;
1416    }
1417    else switch( msgs->state )
1418    {
1419        case AWAITING_BT_LENGTH:  ret = readBtLength ( msgs, inbuf, inlen ); break;
1420        case AWAITING_BT_ID:      ret = readBtId     ( msgs, inbuf, inlen ); break;
1421        case AWAITING_BT_MESSAGE: ret = readBtMessage( msgs, inbuf, inlen ); break;
1422        case AWAITING_BT_PIECE:   ret = readBtPiece  ( msgs, inbuf, inlen ); break;
1423        default: assert( 0 );
1424    }
1425
1426    return ret;
1427}
1428
1429static void
1430sendKeepalive( tr_peermsgs * msgs )
1431{
1432    dbgmsg( msgs, "sending a keepalive message" );
1433    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1434}
1435
1436/**
1437***
1438**/
1439
1440static int
1441canWrite( const tr_peermsgs * msgs )
1442{
1443    /* don't let our outbuffer get too large */
1444    if( tr_peerIoWriteBytesWaiting( msgs->io ) > 4096 )
1445        return FALSE;
1446
1447    return TRUE;
1448}
1449
1450static size_t
1451getUploadMax( const tr_peermsgs * msgs )
1452{
1453    static const size_t maxval = ~0;
1454    const tr_torrent * tor = msgs->torrent;
1455
1456    if( !canWrite( msgs ) )
1457        return 0;
1458
1459    if( tor->uploadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1460        return tor->handle->useUploadLimit
1461            ? tr_rcBytesLeft( tor->handle->upload ) : maxval;
1462
1463    if( tor->uploadLimitMode == TR_SPEEDLIMIT_SINGLE )
1464        return tr_rcBytesLeft( tor->upload );
1465
1466    return maxval;
1467}
1468
1469static int
1470ratePulse( void * vmsgs )
1471{
1472    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1473    msgs->info->rateToClient = tr_rcRate( msgs->info->rcToClient );
1474    msgs->info->rateToPeer = tr_rcRate( msgs->info->rcToPeer );
1475    msgs->maxActiveRequests = MIN( 8 + (int)(msgs->info->rateToClient/5), 100 );
1476    msgs->minActiveRequests = msgs->maxActiveRequests / 3;
1477    return TRUE;
1478}
1479
1480static struct peer_request*
1481popNextRequest( tr_peermsgs * msgs )
1482{
1483    struct peer_request * ret;
1484    ret = tr_list_pop_front( &msgs->peerAskedForFast );
1485    if( !ret )
1486        ret = tr_list_pop_front( &msgs->peerAskedFor);
1487    return ret;
1488}
1489
1490static int
1491pulse( void * vmsgs )
1492{
1493    const time_t now = time( NULL );
1494    tr_peermsgs * msgs = vmsgs;
1495    struct peer_request * r;
1496    size_t len;
1497
1498    tr_peerIoTryRead( msgs->io );
1499
1500    pumpRequestQueue( msgs );
1501
1502    if( !canWrite( msgs ) )
1503    {
1504    }
1505    else if(( len = EVBUFFER_LENGTH( msgs->outBlock ) ))
1506    {
1507        const size_t uploadMax = getUploadMax( msgs );
1508        const size_t outlen = MIN( len, uploadMax );
1509        tr_peerIoWrite( msgs->io, EVBUFFER_DATA( msgs->outBlock ), outlen );
1510        evbuffer_drain( msgs->outBlock, outlen );
1511        msgs->clientSentAnythingAt = now;
1512        peerGotBytes( msgs, outlen );
1513        len -= outlen;
1514        dbgmsg( msgs, "wrote %d bytes; %d left in block", (int)outlen, (int)len );
1515        fflush( stdout );
1516    }
1517    else if(( len = EVBUFFER_LENGTH( msgs->outMessages ) ))
1518    {
1519        tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
1520        msgs->clientSentAnythingAt = now;
1521    }
1522    else if( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS )
1523    {
1524        sendKeepalive( msgs );
1525    }
1526
1527    if( !EVBUFFER_LENGTH( msgs->outBlock )
1528        && (( r = popNextRequest( msgs )))
1529        && requestIsValid( msgs, r )
1530        && tr_cpPieceIsComplete( msgs->torrent->completion, r->index ) )
1531    {
1532        uint8_t * buf = tr_new( uint8_t, r->length );
1533
1534        if( !tr_ioRead( msgs->torrent, r->index, r->offset, r->length, buf ) )
1535        {
1536            tr_peerIo * io = msgs->io;
1537            struct evbuffer * out = msgs->outBlock;
1538
1539            dbgmsg( msgs, "sending block %u:%u->%u", r->index, r->offset, r->length );
1540            tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 2*sizeof(uint32_t) + r->length );
1541            tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1542            tr_peerIoWriteUint32( io, out, r->index );
1543            tr_peerIoWriteUint32( io, out, r->offset );
1544            tr_peerIoWriteBytes ( io, out, buf, r->length );
1545        }
1546
1547        tr_free( buf );
1548        tr_free( r );
1549
1550        pulse( msgs ); /* start sending it right away */
1551    }
1552
1553    return TRUE; /* loop forever */
1554}
1555
1556static void
1557gotError( struct bufferevent * evbuf UNUSED, short what, void * vmsgs )
1558{
1559    dbgmsg( vmsgs, "libevent got an error! what=%d, errno=%d (%s)",
1560            (int)what, errno, strerror(errno) );
1561    fireGotError( vmsgs );
1562}
1563
1564static void
1565sendBitfield( tr_peermsgs * msgs )
1566{
1567    const tr_bitfield * bitfield = tr_cpPieceBitfield( msgs->torrent->completion );
1568    struct evbuffer * out = msgs->outMessages;
1569
1570    dbgmsg( msgs, "sending peer a bitfield message" );
1571    tr_peerIoWriteUint32( msgs->io, out, sizeof(uint8_t) + bitfield->len );
1572    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1573    tr_peerIoWriteBytes ( msgs->io, out, bitfield->bits, bitfield->len );
1574}
1575
1576/**
1577***
1578**/
1579
1580/* some peers give us error messages if we send
1581   more than this many peers in a single pex message */
1582#define MAX_PEX_DIFFS 200
1583
1584typedef struct
1585{
1586    tr_pex * added;
1587    tr_pex * dropped;
1588    tr_pex * elements;
1589    int addedCount;
1590    int droppedCount;
1591    int elementCount;
1592    int diffCount;
1593}
1594PexDiffs;
1595
1596static void
1597pexAddedCb( void * vpex, void * userData )
1598{
1599    PexDiffs * diffs = (PexDiffs *) userData;
1600    tr_pex * pex = (tr_pex *) vpex;
1601    if( diffs->diffCount < MAX_PEX_DIFFS )
1602    {
1603        diffs->diffCount++;
1604        diffs->added[diffs->addedCount++] = *pex;
1605        diffs->elements[diffs->elementCount++] = *pex;
1606    }
1607}
1608
1609static void
1610pexRemovedCb( void * vpex, void * userData )
1611{
1612    PexDiffs * diffs = (PexDiffs *) userData;
1613    tr_pex * pex = (tr_pex *) vpex;
1614    if( diffs->diffCount < MAX_PEX_DIFFS )
1615    {
1616        diffs->diffCount++;
1617        diffs->dropped[diffs->droppedCount++] = *pex;
1618    }
1619}
1620
1621static void
1622pexElementCb( void * vpex, void * userData )
1623{
1624    PexDiffs * diffs = (PexDiffs *) userData;
1625    tr_pex * pex = (tr_pex *) vpex;
1626    if( diffs->diffCount < MAX_PEX_DIFFS )
1627    {
1628        diffs->diffCount++;
1629        diffs->elements[diffs->elementCount++] = *pex;
1630    }
1631}
1632
1633static void
1634sendPex( tr_peermsgs * msgs )
1635{
1636    if( msgs->peerSupportsPex && tr_torrentIsPexEnabled( msgs->torrent ) )
1637    {
1638        int i;
1639        tr_pex * newPex = NULL;
1640        const int newCount = tr_peerMgrGetPeers( msgs->handle->peerMgr, msgs->torrent->info.hash, &newPex );
1641        PexDiffs diffs;
1642        benc_val_t val, *added, *dropped, *flags;
1643        uint8_t *tmp, *walk;
1644        char * benc;
1645        int bencLen;
1646
1647        /* build the diffs */
1648        diffs.added = tr_new( tr_pex, newCount );
1649        diffs.addedCount = 0;
1650        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1651        diffs.droppedCount = 0;
1652        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1653        diffs.elementCount = 0;
1654        diffs.diffCount = 0;
1655        tr_set_compare( msgs->pex, msgs->pexCount,
1656                        newPex, newCount,
1657                        tr_pexCompare, sizeof(tr_pex),
1658                        pexRemovedCb, pexAddedCb, pexElementCb, &diffs );
1659        dbgmsg( msgs, "pex: old peer count %d, new peer count %d, added %d, removed %d", msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1660
1661        /* update peer */
1662        tr_free( msgs->pex );
1663        msgs->pex = diffs.elements;
1664        msgs->pexCount = diffs.elementCount;
1665
1666        /* build the pex payload */
1667        tr_bencInit( &val, TYPE_DICT );
1668        tr_bencDictReserve( &val, 3 );
1669
1670        /* "added" */
1671        added = tr_bencDictAdd( &val, "added" );
1672        tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1673        for( i=0; i<diffs.addedCount; ++i ) {
1674            memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
1675            memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1676        }
1677        assert( ( walk - tmp ) == diffs.addedCount * 6 );
1678        tr_bencInitStr( added, tmp, walk-tmp, FALSE );
1679
1680        /* "added.f" */
1681        flags = tr_bencDictAdd( &val, "added.f" );
1682        tmp = walk = tr_new( uint8_t, diffs.addedCount );
1683        for( i=0; i<diffs.addedCount; ++i )
1684            *walk++ = diffs.added[i].flags;
1685        assert( ( walk - tmp ) == diffs.addedCount );
1686        tr_bencInitStr( flags, tmp, walk-tmp, FALSE );
1687
1688        /* "dropped" */
1689        dropped = tr_bencDictAdd( &val, "dropped" );
1690        tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
1691        for( i=0; i<diffs.droppedCount; ++i ) {
1692            memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
1693            memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
1694        }
1695        assert( ( walk - tmp ) == diffs.droppedCount * 6 );
1696        tr_bencInitStr( dropped, tmp, walk-tmp, FALSE );
1697
1698        /* write the pex message */
1699        benc = tr_bencSave( &val, &bencLen );
1700        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 2*sizeof(uint8_t) + bencLen );
1701        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_LTEP );
1702        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, OUR_LTEP_PEX );
1703        tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, benc, bencLen );
1704
1705        /* cleanup */
1706        tr_free( benc );
1707        tr_bencFree( &val );
1708        tr_free( diffs.added );
1709        tr_free( diffs.dropped );
1710        tr_free( newPex );
1711
1712        msgs->clientSentPexAt = time( NULL );
1713    }
1714}
1715
1716static int
1717pexPulse( void * vpeer )
1718{
1719    sendPex( vpeer );
1720    return TRUE;
1721}
1722
1723/**
1724***
1725**/
1726
1727tr_peermsgs*
1728tr_peerMsgsNew( struct tr_torrent * torrent,
1729                struct tr_peer    * info,
1730                tr_delivery_func    func,
1731                void              * userData,
1732                tr_publisher_tag  * setme )
1733{
1734    tr_peermsgs * m;
1735
1736    assert( info != NULL );
1737    assert( info->io != NULL );
1738
1739    m = tr_new0( tr_peermsgs, 1 );
1740    m->publisher = tr_publisherNew( );
1741    m->info = info;
1742    m->handle = torrent->handle;
1743    m->torrent = torrent;
1744    m->io = info->io;
1745    m->info->clientIsChoked = 1;
1746    m->info->peerIsChoked = 1;
1747    m->info->clientIsInterested = 0;
1748    m->info->peerIsInterested = 0;
1749    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
1750    m->state = AWAITING_BT_LENGTH;
1751    m->pulseTimer = tr_timerNew( m->handle, pulse, m, PEER_PULSE_INTERVAL );
1752    m->rateTimer = tr_timerNew( m->handle, ratePulse, m, RATE_PULSE_INTERVAL );
1753    m->pexTimer = tr_timerNew( m->handle, pexPulse, m, PEX_INTERVAL );
1754    m->outMessages = evbuffer_new( );
1755    m->incoming.block = evbuffer_new( );
1756    m->outBlock = evbuffer_new( );
1757    m->peerAllowedPieces = NULL;
1758    m->clientAllowedPieces = NULL;
1759    m->clientSuggestedPieces = NULL;
1760    *setme = tr_publisherSubscribe( m->publisher, func, userData );
1761   
1762    if ( tr_peerIoSupportsFEXT( m->io ) )
1763    {
1764        /* This peer is fastpeer-enabled, generate its allowed set
1765         * (before registering our callbacks) */
1766        if ( !m->peerAllowedPieces ) {
1767            const struct in_addr *peerAddr = tr_peerIoGetAddress( m->io, NULL );
1768           
1769            m->peerAllowedPieces = tr_peerMgrGenerateAllowedSet( MAX_FAST_ALLOWED_COUNT,
1770                                                                 m->torrent->info.pieceCount,
1771                                                                 m->torrent->info.hash,
1772                                                                 peerAddr );
1773        }
1774        m->clientAllowedPieces = tr_bitfieldNew( m->torrent->info.pieceCount );
1775       
1776        m->clientSuggestedPieces = tr_bitfieldNew( m->torrent->info.pieceCount );
1777    }
1778   
1779    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of inactivity */
1780    tr_peerIoSetIOFuncs( m->io, canRead, gotError, m );
1781    ratePulse( m );
1782
1783    if ( tr_peerIoSupportsLTEP( m->io ) )
1784        sendLtepHandshake( m );
1785
1786    if ( !tr_peerIoSupportsFEXT( m->io ) )
1787        sendBitfield( m );
1788    else {
1789        /* This peer is fastpeer-enabled, send it have-all or have-none if appropriate */
1790        uint32_t peerProgress;
1791        float completion = tr_cpPercentComplete( m->torrent->completion );
1792        if ( completion == 0.0f ) {
1793            sendFastHave( m, 0 );
1794        } else if ( completion == 1.0f ) {
1795            sendFastHave( m, 1 );
1796        } else {
1797            sendBitfield( m );
1798        }
1799        peerProgress = m->torrent->info.pieceCount * m->info->progress;
1800       
1801        if ( peerProgress < MAX_FAST_ALLOWED_COUNT )
1802            sendFastAllowedSet( m );
1803    }
1804
1805    return m;
1806}
1807
1808void
1809tr_peerMsgsFree( tr_peermsgs* msgs )
1810{
1811    if( msgs != NULL )
1812    {
1813        tr_timerFree( &msgs->pulseTimer );
1814        tr_timerFree( &msgs->rateTimer );
1815        tr_timerFree( &msgs->pexTimer );
1816        tr_publisherFree( &msgs->publisher );
1817        tr_list_free( &msgs->clientWillAskFor, tr_free );
1818        tr_list_free( &msgs->clientAskedFor, tr_free );
1819        tr_list_free( &msgs->peerAskedForFast, tr_free );
1820        tr_list_free( &msgs->peerAskedFor, tr_free );
1821        tr_bitfieldFree( msgs->peerAllowedPieces );
1822        tr_bitfieldFree( msgs->clientAllowedPieces );
1823        tr_bitfieldFree( msgs->clientSuggestedPieces );
1824        evbuffer_free( msgs->incoming.block );
1825        evbuffer_free( msgs->outMessages );
1826        evbuffer_free( msgs->outBlock );
1827        tr_free( msgs->pex );
1828
1829        memset( msgs, ~0, sizeof( tr_peermsgs ) );
1830        tr_free( msgs );
1831    }
1832}
1833
1834tr_publisher_tag
1835tr_peerMsgsSubscribe( tr_peermsgs       * peer,
1836                      tr_delivery_func    func,
1837                      void              * userData )
1838{
1839    return tr_publisherSubscribe( peer->publisher, func, userData );
1840}
1841
1842void
1843tr_peerMsgsUnsubscribe( tr_peermsgs       * peer,
1844                        tr_publisher_tag    tag )
1845{
1846    tr_publisherUnsubscribe( peer->publisher, tag );
1847}
1848
1849int
1850tr_peerMsgsIsPieceFastAllowed( const tr_peermsgs * peer,
1851                               uint32_t            index )
1852{
1853    return tr_bitfieldHas( peer->clientAllowedPieces, index );
1854}
1855
1856int
1857tr_peerMsgsIsPieceSuggested( const tr_peermsgs * peer,
1858                             uint32_t            index )
1859{
1860    return tr_bitfieldHas( peer->clientSuggestedPieces, index );
1861}
1862
Note: See TracBrowser for help on using the repository browser.