source: trunk/libtransmission/peer-msgs.c @ 3921

Last change on this file since 3921 was 3921, checked in by charles, 15 years ago

some progress on the overall statistics, though probably not visible to end users yet

  • Property svn:keywords set to Date Rev Author Id
File size: 55.0 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 3921 2007-11-21 20:03:53Z charles $
11 */
12
13#include <assert.h>
14#include <ctype.h>
15#include <errno.h>
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19#include <libgen.h> /* basename */
20
21#include <netinet/in.h> /* struct in_addr */
22
23#include <event.h>
24
25#include "transmission.h"
26#include "bencode.h"
27#include "completion.h"
28#include "inout.h"
29#include "list.h"
30#include "peer-io.h"
31#include "peer-mgr.h"
32#include "peer-mgr-private.h"
33#include "peer-msgs.h"
34#include "ratecontrol.h"
35#include "stats.h"
36#include "trevent.h"
37#include "utils.h"
38
39/**
40***
41**/
42
43enum
44{
45    BT_CHOKE                = 0,
46    BT_UNCHOKE              = 1,
47    BT_INTERESTED           = 2,
48    BT_NOT_INTERESTED       = 3,
49    BT_HAVE                 = 4,
50    BT_BITFIELD             = 5,
51    BT_REQUEST              = 6,
52    BT_PIECE                = 7,
53    BT_CANCEL               = 8,
54    BT_PORT                 = 9,
55    BT_SUGGEST              = 13,
56    BT_HAVE_ALL             = 14,
57    BT_HAVE_NONE            = 15,
58    BT_REJECT               = 16,
59    BT_ALLOWED_FAST         = 17,
60    BT_LTEP                 = 20,
61
62    LTEP_HANDSHAKE          = 0,
63
64    OUR_LTEP_PEX            = 1,
65
66    MAX_REQUEST_BYTE_COUNT  = (16 * 1024), /* drop requests who want too much */
67
68    /* idle seconds before we send a keepalive */
69    KEEPALIVE_INTERVAL_SECS = 90,
70
71    PEX_INTERVAL            = (60 * 1000), /* msec between calls to sendPex() */
72    PEER_PULSE_INTERVAL     = (100),       /* msec between calls to pulse() */
73    RATE_PULSE_INTERVAL     = (250),       /* msec between calls to ratePulse() */
74
75    MAX_OUTBUF_SIZE         = 4096,
76     
77    /* Fast Peers Extension constants */
78    MAX_FAST_ALLOWED_COUNT   = 10,          /* max. number of pieces we fast-allow to another peer */
79    MAX_FAST_ALLOWED_THRESHOLD = 10,        /* max threshold for allowing fast-pieces requests */
80
81};
82
83enum
84{
85    AWAITING_BT_LENGTH,
86    AWAITING_BT_ID,
87    AWAITING_BT_MESSAGE,
88    AWAITING_BT_PIECE
89};
90
91struct peer_request
92{
93    uint32_t index;
94    uint32_t offset;
95    uint32_t length;
96    time_t time_requested;
97};
98
99static int
100compareRequest( const void * va, const void * vb )
101{
102    int i;
103    const struct peer_request * a = va;
104    const struct peer_request * b = vb;
105    if(( i = tr_compareUint32( a->index, b->index ))) return i;
106    if(( i = tr_compareUint32( a->offset, b->offset ))) return i;
107    if(( i = tr_compareUint32( a->length, b->length ))) return i;
108    return 0;
109}
110
111/* this is raw, unchanged data from the peer regarding
112 * the current message that it's sending us. */
113struct tr_incoming
114{
115    uint32_t length; /* includes the +1 for id length */
116    uint8_t id;
117    struct peer_request blockReq; /* metadata for incoming blocks */
118    struct evbuffer * block; /* piece data for incoming blocks */
119};
120
121struct tr_peermsgs
122{
123    tr_peer * info;
124
125    tr_handle * handle;
126    tr_torrent * torrent;
127    tr_peerIo * io;
128
129    tr_publisher_t * publisher;
130
131    struct evbuffer * outBlock;    /* buffer of all the current piece message */
132    struct evbuffer * outMessages; /* buffer of all the non-piece messages */
133    tr_list * peerAskedFor;
134    tr_list * peerAskedForFast;
135    tr_list * clientAskedFor;
136    tr_list * clientWillAskFor;
137
138    tr_timer * rateTimer;
139    tr_timer * pulseTimer;
140    tr_timer * pexTimer;
141
142    time_t lastReqAddedAt;
143    time_t clientSentPexAt;
144    time_t clientSentAnythingAt;
145
146    unsigned int peerSentBitfield         : 1;
147    unsigned int peerSupportsPex          : 1;
148    unsigned int clientSentLtepHandshake  : 1;
149    unsigned int peerSentLtepHandshake    : 1;
150    unsigned int sendingBlock             : 1;
151   
152    tr_bitfield * clientAllowedPieces;
153    tr_bitfield * peerAllowedPieces;
154   
155    tr_bitfield * clientSuggestedPieces;
156   
157    uint8_t state;
158    uint8_t ut_pex_id;
159    uint16_t pexCount;
160    uint32_t maxActiveRequests;
161    uint32_t minActiveRequests;
162
163    struct tr_incoming incoming; 
164
165    tr_pex * pex;
166};
167
168/**
169***
170**/
171
172static void
173myDebug( const char * file, int line,
174         const struct tr_peermsgs * msgs,
175         const char * fmt, ... )
176{
177    FILE * fp = tr_getLog( );
178    if( fp != NULL )
179    {
180        va_list args;
181        char timestr[64];
182        struct evbuffer * buf = evbuffer_new( );
183        char * myfile = tr_strdup( file );
184
185        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
186                             tr_getLogTimeStr( timestr, sizeof(timestr) ),
187                             msgs->torrent->info.name,
188                             tr_peerIoGetAddrStr( msgs->io ),
189                             msgs->info->client );
190        va_start( args, fmt );
191        evbuffer_add_vprintf( buf, fmt, args );
192        va_end( args );
193        evbuffer_add_printf( buf, " (%s:%d)\n", basename(myfile), line );
194        fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
195
196        tr_free( myfile );
197        evbuffer_free( buf );
198    }
199}
200
201#define dbgmsg(msgs, fmt...) myDebug(__FILE__, __LINE__, msgs, ##fmt )
202
203/**
204***
205**/
206
207static void
208protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
209{
210    tr_peerIo * io = msgs->io;
211    struct evbuffer * out = msgs->outMessages;
212
213    dbgmsg( msgs, "requesting %u:%u->%u", req->index, req->offset, req->length );
214    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
215    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
216    tr_peerIoWriteUint32( io, out, req->index );
217    tr_peerIoWriteUint32( io, out, req->offset );
218    tr_peerIoWriteUint32( io, out, req->length );
219}
220
221static void
222protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
223{
224    tr_peerIo * io = msgs->io;
225    struct evbuffer * out = msgs->outMessages;
226
227    dbgmsg( msgs, "cancelling %u:%u->%u", req->index, req->offset, req->length );
228    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
229    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
230    tr_peerIoWriteUint32( io, out, req->index );
231    tr_peerIoWriteUint32( io, out, req->offset );
232    tr_peerIoWriteUint32( io, out, req->length );
233}
234
235static void
236protocolSendHave( tr_peermsgs * msgs, uint32_t index )
237{
238    tr_peerIo * io = msgs->io;
239    struct evbuffer * out = msgs->outMessages;
240
241    dbgmsg( msgs, "sending Have %u", index );
242    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
243    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
244    tr_peerIoWriteUint32( io, out, index );
245}
246
247static void
248protocolSendChoke( tr_peermsgs * msgs, int choke )
249{
250    tr_peerIo * io = msgs->io;
251    struct evbuffer * out = msgs->outMessages;
252
253    dbgmsg( msgs, "sending %s", (choke ? "Choke" : "Unchoke") );
254    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) );
255    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
256}
257
258/**
259***  EVENTS
260**/
261
262static const tr_peermsgs_event blankEvent = { 0, 0, 0, 0, 0.0f };
263
264static void
265publish( tr_peermsgs * msgs, tr_peermsgs_event * e )
266{
267    tr_publisherPublish( msgs->publisher, msgs->info, e );
268}
269
270static void
271fireGotAssertError( tr_peermsgs * msgs )
272{
273    tr_peermsgs_event e = blankEvent;
274    e.eventType = TR_PEERMSG_GOT_ASSERT_ERROR;
275    publish( msgs, &e );
276}
277
278static void
279fireGotError( tr_peermsgs * msgs )
280{
281    tr_peermsgs_event e = blankEvent;
282    e.eventType = TR_PEERMSG_GOT_ERROR;
283    publish( msgs, &e );
284}
285
286static void
287fireNeedReq( tr_peermsgs * msgs )
288{
289    tr_peermsgs_event e = blankEvent;
290    e.eventType = TR_PEERMSG_NEED_REQ;
291    publish( msgs, &e );
292}
293
294static void
295firePeerProgress( tr_peermsgs * msgs )
296{
297    tr_peermsgs_event e = blankEvent;
298    e.eventType = TR_PEERMSG_PEER_PROGRESS;
299    e.progress = msgs->info->progress;
300    publish( msgs, &e );
301}
302
303static void
304fireClientHave( tr_peermsgs * msgs, uint32_t pieceIndex )
305{
306    tr_peermsgs_event e = blankEvent;
307    e.eventType = TR_PEERMSG_CLIENT_HAVE;
308    e.pieceIndex = pieceIndex;
309    publish( msgs, &e );
310}
311
312static void
313fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
314{
315    tr_peermsgs_event e = blankEvent;
316    e.eventType = TR_PEERMSG_CLIENT_BLOCK;
317    e.pieceIndex = req->index;
318    e.offset = req->offset;
319    e.length = req->length;
320    publish( msgs, &e );
321}
322
323static void
324fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
325{
326    tr_peermsgs_event e = blankEvent;
327    e.eventType = TR_PEERMSG_CANCEL;
328    e.pieceIndex = req->index;
329    e.offset = req->offset;
330    e.length = req->length;
331    publish( msgs, &e );
332}
333
334/**
335***  INTEREST
336**/
337
338static int
339isPieceInteresting( const tr_peermsgs   * peer,
340                    int                   piece )
341{
342    const tr_torrent * torrent = peer->torrent;
343    if( torrent->info.pieces[piece].dnd ) /* we don't want it */
344        return FALSE;
345    if( tr_cpPieceIsComplete( torrent->completion, piece ) ) /* we have it */
346        return FALSE;
347    if( !tr_bitfieldHas( peer->info->have, piece ) ) /* peer doesn't have it */
348        return FALSE;
349    if( tr_bitfieldHas( peer->info->banned, piece ) ) /* peer is banned */
350        return FALSE;
351    return TRUE;
352}
353
354/* "interested" means we'll ask for piece data if they unchoke us */
355static int
356isPeerInteresting( const tr_peermsgs * msgs )
357{
358    int i;
359    const tr_torrent * torrent;
360    const tr_bitfield * bitfield;
361    const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
362
363    if( clientIsSeed )
364        return FALSE;
365
366    torrent = msgs->torrent;
367    bitfield = tr_cpPieceBitfield( torrent->completion );
368
369    if( !msgs->info->have )
370        return TRUE;
371
372    assert( bitfield->len == msgs->info->have->len );
373    for( i=0; i<torrent->info.pieceCount; ++i )
374        if( isPieceInteresting( msgs, i ) )
375            return TRUE;
376
377    return FALSE;
378}
379
380static void
381sendInterest( tr_peermsgs * msgs, int weAreInterested )
382{
383    assert( msgs != NULL );
384    assert( weAreInterested==0 || weAreInterested==1 );
385
386    msgs->info->clientIsInterested = weAreInterested;
387    dbgmsg( msgs, "Sending %s",
388            weAreInterested ? "Interested" : "Not Interested");
389
390    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
391    tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages,
392                   weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
393}
394
395static void
396updateInterest( tr_peermsgs * msgs )
397{
398    const int i = isPeerInteresting( msgs );
399    if( i != msgs->info->clientIsInterested )
400        sendInterest( msgs, i );
401    if( i )
402        fireNeedReq( msgs );
403}
404
405#define MIN_CHOKE_PERIOD_SEC 10
406
407static void
408cancelAllRequestsToClientExceptFast( tr_peermsgs * msgs )
409{
410    tr_list_free( &msgs->peerAskedFor, tr_free );
411}
412
413void
414tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
415{
416    const time_t fibrillationTime = time(NULL) - MIN_CHOKE_PERIOD_SEC;
417
418    assert( msgs != NULL );
419    assert( msgs->info != NULL );
420    assert( choke==0 || choke==1 );
421
422    if( msgs->info->chokeChangedAt > fibrillationTime )
423    {
424        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
425    }
426    else if( msgs->info->peerIsChoked != choke )
427    {
428        msgs->info->peerIsChoked = choke;
429        if( choke )
430            cancelAllRequestsToClientExceptFast( msgs );
431        protocolSendChoke( msgs, choke );
432        msgs->info->chokeChangedAt = time( NULL );
433    }
434}
435
436/**
437***
438**/
439
440void
441tr_peerMsgsHave( tr_peermsgs * msgs,
442                 uint32_t      index )
443{
444    protocolSendHave( msgs, index );
445
446    /* since we have more pieces now, we might not be interested in this peer */
447    updateInterest( msgs );
448}
449#if 0
450static void
451sendFastSuggest( tr_peermsgs * msgs,
452                 uint32_t      pieceIndex )
453{
454    assert( msgs != NULL );
455   
456    if( tr_peerIoSupportsFEXT( msgs->io ) )
457    {
458        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
459        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_SUGGEST );
460        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
461    }
462}
463#endif
464static void
465sendFastHave( tr_peermsgs * msgs, int all )
466{
467    assert( msgs != NULL );
468   
469    if( tr_peerIoSupportsFEXT( msgs->io ) )
470    {
471        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
472        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, ( all ? BT_HAVE_ALL
473                                                                : BT_HAVE_NONE ) );
474        updateInterest( msgs );
475    }
476}
477
478static void
479sendFastReject( tr_peermsgs * msgs,
480                uint32_t      pieceIndex,
481                uint32_t      offset,
482                uint32_t      length )
483{
484    assert( msgs != NULL );
485
486    if( tr_peerIoSupportsFEXT( msgs->io ) )
487    {
488        const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
489        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
490        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_REJECT );
491        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
492        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
493        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
494    }
495}
496
497static void
498sendFastAllowed( tr_peermsgs * msgs,
499                 uint32_t      pieceIndex)
500{
501    assert( msgs != NULL );
502   
503    if( tr_peerIoSupportsFEXT( msgs->io ) )
504    {
505        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
506        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_ALLOWED_FAST );
507        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
508    }
509}
510
511
512
513static void
514sendFastAllowedSet( tr_peermsgs * msgs )
515{
516    int i = 0;
517    while (i <= msgs->torrent->info.pieceCount )
518    {
519        if ( tr_bitfieldHas( msgs->peerAllowedPieces, i) )
520            sendFastAllowed( msgs, i );
521        i++;
522    }
523}
524
525
526/**
527***
528**/
529
530static int
531reqIsValid( const tr_peermsgs   * msgs,
532            uint32_t              index,
533            uint32_t              offset,
534            uint32_t              length )
535{
536    const tr_torrent * tor = msgs->torrent;
537
538    if( index >= (uint32_t) tor->info.pieceCount )
539        return FALSE;
540    if ( (int)offset >= tr_torPieceCountBytes( tor, (int)index ) )
541        return FALSE;
542    if( length > MAX_REQUEST_BYTE_COUNT )
543        return FALSE;
544    if( tr_pieceOffset( tor, index, offset, length ) > tor->info.totalSize )
545        return FALSE;
546
547    return TRUE;
548}
549
550static int
551requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
552{
553    return reqIsValid( msgs, req->index, req->offset, req->length );
554}
555
556static void
557pumpRequestQueue( tr_peermsgs * msgs )
558{
559    const int max = msgs->maxActiveRequests;
560    const int min = msgs->minActiveRequests;
561    int count = tr_list_size( msgs->clientAskedFor );
562    int sent = 0;
563
564    if( count > min )
565        return;
566    if( msgs->info->clientIsChoked )
567        return;
568
569    while( ( count < max ) && ( msgs->clientWillAskFor != NULL ) )
570    {
571        struct peer_request * r = tr_list_pop_front( &msgs->clientWillAskFor );
572        assert( requestIsValid( msgs, r ) );
573        assert( tr_bitfieldHas( msgs->info->have, r->index ) );
574        protocolSendRequest( msgs, r );
575        r->time_requested = msgs->lastReqAddedAt = time( NULL );
576        tr_list_append( &msgs->clientAskedFor, r );
577        ++count;
578        ++sent;
579    }
580
581    if( sent )
582        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
583                sent,
584                tr_list_size(msgs->clientAskedFor),
585                tr_list_size(msgs->clientWillAskFor) );
586
587    if( count < max )
588        fireNeedReq( msgs );
589}
590
591static int
592pulse( void * vmsgs );
593
594int
595tr_peerMsgsAddRequest( tr_peermsgs * msgs,
596                       uint32_t      index, 
597                       uint32_t      offset, 
598                       uint32_t      length )
599{
600    const int req_max = msgs->maxActiveRequests;
601    struct peer_request tmp, *req;
602
603    assert( msgs != NULL );
604    assert( msgs->torrent != NULL );
605    assert( reqIsValid( msgs, index, offset, length ) );
606
607    /**
608    ***  Reasons to decline the request
609    **/
610
611    /* don't send requests to choked clients */
612    if( msgs->info->clientIsChoked ) {
613        dbgmsg( msgs, "declining request because they're choking us" );
614        return TR_ADDREQ_CLIENT_CHOKED;
615    }
616
617    /* peer doesn't have this piece */
618    if( !tr_bitfieldHas( msgs->info->have, index ) )
619        return TR_ADDREQ_MISSING;
620
621    /* peer's queue is full */
622    if( tr_list_size( msgs->clientWillAskFor ) >= req_max ) {
623        dbgmsg( msgs, "declining request because we're full" );
624        return TR_ADDREQ_FULL;
625    }
626
627    /* have we already asked for this piece? */
628    tmp.index = index;
629    tmp.offset = offset;
630    tmp.length = length;
631    if( tr_list_find( msgs->clientAskedFor, &tmp, compareRequest ) ) {
632        dbgmsg( msgs, "declining because it's a duplicate" );
633        return TR_ADDREQ_DUPLICATE;
634    }
635    if( tr_list_find( msgs->clientWillAskFor, &tmp, compareRequest ) ) {
636        dbgmsg( msgs, "declining because it's a duplicate" );
637        return TR_ADDREQ_DUPLICATE;
638    }
639
640    /**
641    ***  Accept this request
642    **/
643
644    dbgmsg( msgs, "added req for piece %d, offset %d", (int)index, (int)offset );
645    req = tr_new0( struct peer_request, 1 );
646    *req = tmp;
647    tr_list_append( &msgs->clientWillAskFor, req );
648    return TR_ADDREQ_OK;
649}
650
651static void
652cancelAllRequestsToPeer( tr_peermsgs * msgs )
653{
654    struct peer_request * req;
655
656    while(( req = tr_list_pop_front( &msgs->clientWillAskFor ) ))
657    {
658        fireCancelledReq( msgs, req );
659        tr_free( req );
660    }
661
662    while(( req = tr_list_pop_front( &msgs->clientAskedFor ) ))
663    {
664        fireCancelledReq( msgs, req );
665        protocolSendCancel( msgs, req );
666        tr_free( req );
667    }
668}
669
670void
671tr_peerMsgsCancel( tr_peermsgs * msgs,
672                   uint32_t      pieceIndex,
673                   uint32_t      offset,
674                   uint32_t      length )
675{
676    struct peer_request *req, tmp;
677
678    assert( msgs != NULL );
679    assert( length > 0 );
680
681    /* have we asked the peer for this piece? */
682    tmp.index = pieceIndex;
683    tmp.offset = offset;
684    tmp.length = length;
685
686    /* if it's only in the queue and hasn't been sent yet, free it */
687    if(( req = tr_list_remove( &msgs->clientWillAskFor, &tmp, compareRequest ) ))
688    {
689        fireCancelledReq( msgs, req );
690        tr_free( req );
691    }
692
693    /* if it's already been sent, send a cancel message too */
694    if(( req = tr_list_remove( &msgs->clientAskedFor, &tmp, compareRequest ) ))
695    {
696        protocolSendCancel( msgs, req );
697        fireCancelledReq( msgs, req );
698        tr_free( req );
699    }
700}
701
702/**
703***
704**/
705
706static void
707sendLtepHandshake( tr_peermsgs * msgs )
708{
709    benc_val_t val, *m;
710    char * buf;
711    int len;
712    int pex;
713    const char * v = TR_NAME " " USERAGENT_PREFIX;
714    const int port = tr_getPublicPort( msgs->handle );
715    struct evbuffer * outbuf;
716
717    if( msgs->clientSentLtepHandshake )
718        return;
719
720    outbuf = evbuffer_new( );
721    dbgmsg( msgs, "sending an ltep handshake" );
722    msgs->clientSentLtepHandshake = 1;
723
724    /* decide if we want to advertise pex support */
725    if( !tr_torrentIsPexEnabled( msgs->torrent ) )
726        pex = 0;
727    else if( msgs->peerSentLtepHandshake )
728        pex = msgs->peerSupportsPex ? 1 : 0;
729    else
730        pex = 1;
731
732    tr_bencInit( &val, TYPE_DICT );
733    tr_bencDictReserve( &val, 4 );
734    tr_bencInitInt( tr_bencDictAdd( &val, "e" ), 1 );
735    m  = tr_bencDictAdd( &val, "m" );
736    tr_bencInit( m, TYPE_DICT );
737    if( pex ) {
738        tr_bencDictReserve( m, 1 );
739        tr_bencInitInt( tr_bencDictAdd( m, "ut_pex" ), OUR_LTEP_PEX );
740    }
741    if( port > 0 )
742        tr_bencInitInt( tr_bencDictAdd( &val, "p" ), port );
743    tr_bencInitStr( tr_bencDictAdd( &val, "v" ), v, 0, 1 );
744    buf = tr_bencSave( &val, &len );
745
746    tr_peerIoWriteUint32( msgs->io, outbuf, 2*sizeof(uint8_t) + len );
747    tr_peerIoWriteUint8 ( msgs->io, outbuf, BT_LTEP );
748    tr_peerIoWriteUint8 ( msgs->io, outbuf, LTEP_HANDSHAKE );
749    tr_peerIoWriteBytes ( msgs->io, outbuf, buf, len );
750
751    tr_peerIoWriteBuf( msgs->io, outbuf );
752
753#if 0
754    dbgmsg( msgs, "here is the ltep handshake we sent:" );
755    tr_bencPrint( &val );
756    dbgmsg( msgs, "here is the ltep handshake we read [%s]:", tr_bencSave( &val, NULL ) );
757#endif
758
759    /* cleanup */
760    tr_bencFree( &val );
761    tr_free( buf );
762    evbuffer_free( outbuf );
763}
764
765static void
766parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
767{
768    benc_val_t val, * sub;
769    uint8_t * tmp = tr_new( uint8_t, len );
770
771    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
772    msgs->peerSentLtepHandshake = 1;
773
774    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type!=TYPE_DICT ) {
775        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
776        tr_free( tmp );
777        return;
778    }
779
780#if 0
781    dbgmsg( msgs, "here is the ltep handshake we read:" );
782    tr_bencPrint( &val );
783    dbgmsg( msgs, "here is the ltep handshake we read [%s]:", tr_bencSave( &val, NULL ) );
784#endif
785
786    /* does the peer prefer encrypted connections? */
787    if(( sub = tr_bencDictFindType( &val, "e", TYPE_INT )))
788        msgs->info->encryption_preference = sub->val.i
789                                      ? ENCRYPTION_PREFERENCE_YES
790                                      : ENCRYPTION_PREFERENCE_NO;
791
792    /* check supported messages for utorrent pex */
793    if(( sub = tr_bencDictFindType( &val, "m", TYPE_DICT ))) {
794        if(( sub = tr_bencDictFindType( sub, "ut_pex", TYPE_INT ))) {
795            msgs->peerSupportsPex = 1;
796            msgs->ut_pex_id = (uint8_t) sub->val.i;
797            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
798        }
799    }
800
801    /* get peer's listening port */
802    if(( sub = tr_bencDictFindType( &val, "p", TYPE_INT ))) {
803        msgs->info->port = htons( (uint16_t)sub->val.i );
804        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
805    }
806
807    tr_bencFree( &val );
808    tr_free( tmp );
809}
810
811static void
812parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
813{
814    benc_val_t val, * sub;
815    uint8_t * tmp;
816
817    if( !tr_torrentIsPexEnabled( msgs->torrent ) ) /* no sharing! */
818        return;
819
820    tmp = tr_new( uint8_t, msglen );
821    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
822
823    if( tr_bencLoad( tmp, msglen, &val, NULL ) || ( val.type != TYPE_DICT ) ) {
824        dbgmsg( msgs, "GET can't read extended-pex dictionary" );
825        tr_free( tmp );
826        return;
827    }
828
829    if(( sub = tr_bencDictFindType( &val, "added", TYPE_STR ))) {
830        const int n = sub->val.s.i / 6 ;
831        dbgmsg( msgs, "got %d peers from uT pex", n );
832        tr_peerMgrAddPeers( msgs->handle->peerMgr,
833                            msgs->torrent->info.hash,
834                            TR_PEER_FROM_PEX,
835                            (uint8_t*)sub->val.s.s, n );
836    }
837
838    tr_bencFree( &val );
839    tr_free( tmp );
840}
841
842static void
843sendPex( tr_peermsgs * msgs );
844
845static void
846parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
847{
848    uint8_t ltep_msgid;
849
850    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
851    msglen--;
852
853    if( ltep_msgid == LTEP_HANDSHAKE )
854    {
855        dbgmsg( msgs, "got ltep handshake" );
856        parseLtepHandshake( msgs, msglen, inbuf );
857        if( tr_peerIoSupportsLTEP( msgs->io ) )
858        {
859            sendLtepHandshake( msgs );
860            sendPex( msgs );
861        }
862    }
863    else if( ltep_msgid == msgs->ut_pex_id )
864    {
865        dbgmsg( msgs, "got ut pex" );
866        msgs->peerSupportsPex = 1;
867        parseUtPex( msgs, msglen, inbuf );
868    }
869    else
870    {
871        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
872        evbuffer_drain( inbuf, msglen );
873    }
874}
875
876static int
877readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
878{
879    uint32_t len;
880
881    if( inlen < sizeof(len) )
882        return READ_MORE;
883
884    tr_peerIoReadUint32( msgs->io, inbuf, &len );
885
886    if( len == 0 ) /* peer sent us a keepalive message */
887        dbgmsg( msgs, "got KeepAlive" );
888    else {
889        msgs->incoming.length = len;
890        msgs->state = AWAITING_BT_ID;
891    }
892
893    return READ_AGAIN;
894}
895
896static int
897readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen );
898
899static int
900readBtId( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
901{
902    uint8_t id;
903
904    if( inlen < sizeof(uint8_t) )
905        return READ_MORE;
906
907    tr_peerIoReadUint8( msgs->io, inbuf, &id );
908    msgs->incoming.id = id;
909
910    if( id==BT_PIECE )
911    {
912        msgs->state = AWAITING_BT_PIECE;
913        return READ_AGAIN;
914    }
915    else if( msgs->incoming.length != 1 )
916    {
917        msgs->state = AWAITING_BT_MESSAGE;
918        return READ_AGAIN;
919    }
920    else return readBtMessage( msgs, inbuf, inlen-1 );
921}
922
923static void
924updatePeerProgress( tr_peermsgs * msgs )
925{
926    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have ) / (float)msgs->torrent->info.pieceCount;
927    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
928    updateInterest( msgs );
929    firePeerProgress( msgs );
930}
931
932static int
933clientCanSendFastBlock( const tr_peermsgs * msgs UNUSED )
934{
935    /* We can't send a fast piece if a peer has more than MAX_FAST_ALLOWED_THRESHOLD pieces */
936    if( tr_bitfieldCountTrueBits( msgs->info->have ) > MAX_FAST_ALLOWED_THRESHOLD )
937        return FALSE;
938   
939    /* ...or if we don't have ourself enough pieces */
940    if( tr_bitfieldCountTrueBits( tr_cpPieceBitfield( msgs->torrent->completion ) ) < MAX_FAST_ALLOWED_THRESHOLD )
941        return FALSE;
942
943    /* Maybe a bandwidth limit ? */
944    return TRUE;
945}
946
947static void
948peerMadeRequest( tr_peermsgs * msgs, const struct peer_request * req )
949{
950    const int reqIsValid = requestIsValid( msgs, req );
951    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( msgs->torrent->completion, req->index );
952    const int peerIsChoked = msgs->info->peerIsChoked;
953    const int peerIsFast = tr_peerIoSupportsFEXT( msgs->io );
954    const int pieceIsFast = reqIsValid && tr_bitfieldHas( msgs->peerAllowedPieces, req->index );
955    const int canSendFast = clientCanSendFastBlock( msgs );
956
957    if( !reqIsValid ) /* bad request */
958    {
959        dbgmsg( msgs, "rejecting an invalid request." );
960        sendFastReject( msgs, req->index, req->offset, req->length );
961    }
962    else if( !clientHasPiece ) /* we don't have it */
963    {
964        dbgmsg( msgs, "rejecting request for a piece we don't have." );
965        sendFastReject( msgs, req->index, req->offset, req->length );
966    }
967    else if( peerIsChoked && !peerIsFast ) /* doesn't he know he's choked? */
968    {
969        tr_peerMsgsSetChoke( msgs, 1 );
970        sendFastReject( msgs, req->index, req->offset, req->length );
971    }
972    else if( peerIsChoked && peerIsFast && ( !pieceIsFast || !canSendFast ) )
973    {
974        sendFastReject( msgs, req->index, req->offset, req->length );
975    }
976    else /* YAY */
977    {
978        struct peer_request * tmp = tr_new( struct peer_request, 1 );
979        *tmp = *req;
980        if( peerIsFast && pieceIsFast )
981            tr_list_append( &msgs->peerAskedForFast, tmp );
982        else
983            tr_list_append( &msgs->peerAskedFor, tmp );
984    }
985}
986
987static int
988messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
989{
990    switch( id )
991    {
992        case BT_CHOKE:
993        case BT_UNCHOKE:
994        case BT_INTERESTED:
995        case BT_NOT_INTERESTED:
996        case BT_HAVE_ALL:
997        case BT_HAVE_NONE:
998            return len==1;
999
1000        case BT_HAVE:
1001        case BT_SUGGEST:
1002        case BT_ALLOWED_FAST:
1003            return len==5;
1004
1005        case BT_BITFIELD:
1006            return len == (msg->torrent->info.pieceCount+7u)/8u + 1u;
1007       
1008        case BT_REQUEST:
1009        case BT_CANCEL:
1010        case BT_REJECT:
1011            return len==13;
1012
1013        case BT_PIECE:
1014            return len>9 && len<=16393;
1015
1016        case BT_PORT:
1017            return len==3;
1018
1019        case BT_LTEP:
1020            return len >= 2;
1021
1022        default:
1023            return FALSE;
1024    }
1025}
1026
1027static int
1028clientGotBlock( tr_peermsgs * msgs, const uint8_t * block, const struct peer_request * req );
1029
1030static void
1031clientGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1032{
1033    tr_torrent * tor = msgs->torrent;
1034    tor->activityDate = tr_date( );
1035    tor->downloadedCur += byteCount;
1036    msgs->info->pieceDataActivityDate = time( NULL );
1037    msgs->info->credit += (int)(byteCount * SWIFT_REPAYMENT_RATIO);
1038    tr_rcTransferred( msgs->info->rcToClient, byteCount );
1039    tr_rcTransferred( tor->download, byteCount );
1040    tr_rcTransferred( tor->handle->download, byteCount );
1041    tr_statsAddDownloaded( msgs->handle, byteCount );
1042}
1043
1044static int
1045readBtPiece( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1046{
1047    struct peer_request * req = &msgs->incoming.blockReq;
1048    assert( EVBUFFER_LENGTH(inbuf) >= inlen );
1049    dbgmsg( msgs, "In readBtPiece" );
1050
1051    if( !req->length )
1052    {
1053        if( inlen < 8 )
1054            return READ_MORE;
1055
1056        tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
1057        tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
1058        req->length = msgs->incoming.length - 9;
1059        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1060        return READ_AGAIN;
1061    }
1062    else
1063    {
1064        int err;
1065
1066        /* read in another chunk of data */
1067        const size_t nLeft = req->length - EVBUFFER_LENGTH(msgs->incoming.block);
1068        size_t n = MIN( nLeft, inlen );
1069        uint8_t * buf = tr_new( uint8_t, n );
1070        assert( EVBUFFER_LENGTH(inbuf) >= n );
1071        tr_peerIoReadBytes( msgs->io, inbuf, buf, n );
1072        evbuffer_add( msgs->incoming.block, buf, n );
1073        clientGotBytes( msgs, n );
1074        tr_free( buf );
1075        dbgmsg( msgs, "got %d bytes for block %u:%u->%u ... %d remain",
1076               (int)n, req->index, req->offset, req->length,
1077               (int)( req->length - EVBUFFER_LENGTH(msgs->incoming.block) ) );
1078        if( EVBUFFER_LENGTH(msgs->incoming.block) < req->length )
1079            return READ_MORE;
1080
1081        /* we've got the whole block ... process it */
1082        err = clientGotBlock( msgs, EVBUFFER_DATA(msgs->incoming.block), req );
1083
1084        /* cleanup */
1085        evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH(msgs->incoming.block) );
1086        req->length = 0;
1087        msgs->state = AWAITING_BT_LENGTH;
1088        if( !err )
1089            return READ_AGAIN;
1090        else {
1091            fireGotAssertError( msgs );
1092            return READ_DONE;
1093        }
1094    }
1095}
1096
1097static int
1098readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1099{
1100    uint32_t ui32;
1101    uint32_t msglen = msgs->incoming.length;
1102    const uint8_t id = msgs->incoming.id;
1103    const size_t startBufLen = EVBUFFER_LENGTH( inbuf );
1104
1105    --msglen; // id length
1106
1107    if( inlen < msglen )
1108        return READ_MORE;
1109
1110    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %d", (int)id, (int)msglen, (int)inlen );
1111
1112    if( !messageLengthIsCorrect( msgs, id, msglen+1 ) )
1113    {
1114        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1115        fireGotError( msgs );
1116        return READ_DONE;
1117    }
1118
1119    switch( id )
1120    {
1121        case BT_CHOKE:
1122            dbgmsg( msgs, "got Choke" );
1123            msgs->info->clientIsChoked = 1;
1124            cancelAllRequestsToPeer( msgs );
1125            cancelAllRequestsToClientExceptFast( msgs );
1126            break;
1127
1128        case BT_UNCHOKE:
1129            dbgmsg( msgs, "got Unchoke" );
1130            msgs->info->clientIsChoked = 0;
1131            fireNeedReq( msgs );
1132            break;
1133
1134        case BT_INTERESTED:
1135            dbgmsg( msgs, "got Interested" );
1136            msgs->info->peerIsInterested = 1;
1137            tr_peerMsgsSetChoke( msgs, 0 );
1138            break;
1139
1140        case BT_NOT_INTERESTED:
1141            dbgmsg( msgs, "got Not Interested" );
1142            msgs->info->peerIsInterested = 0;
1143            break;
1144           
1145        case BT_HAVE:
1146            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1147            dbgmsg( msgs, "got Have: %u", ui32 );
1148            tr_bitfieldAdd( msgs->info->have, ui32 );
1149            /* If this is a fast-allowed piece for this peer, mark it as normal now */
1150            if( msgs->clientAllowedPieces != NULL && tr_bitfieldHas( msgs->clientAllowedPieces, ui32 ) )
1151                tr_bitfieldRem( msgs->clientAllowedPieces, ui32 );
1152            updatePeerProgress( msgs );
1153            tr_rcTransferred( msgs->torrent->swarmspeed, msgs->torrent->info.pieceSize );
1154            break;
1155
1156        case BT_BITFIELD: {
1157            const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
1158            dbgmsg( msgs, "got a bitfield" );
1159            msgs->peerSentBitfield = 1;
1160            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
1161            updatePeerProgress( msgs );
1162            tr_peerMsgsSetChoke( msgs, !clientIsSeed || (msgs->info->progress<1.0) );
1163            fireNeedReq( msgs );
1164            break;
1165        }
1166
1167        case BT_REQUEST: {
1168            struct peer_request req;
1169            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1170            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1171            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1172            dbgmsg( msgs, "got Request: %u:%u->%u", req.index, req.offset, req.length );
1173            peerMadeRequest( msgs, &req );
1174            break;
1175        }
1176
1177        case BT_CANCEL: {
1178            struct peer_request req;
1179            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1180            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1181            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1182            dbgmsg( msgs, "got a Cancel %u:%u->%u", req.index, req.offset, req.length );
1183            tr_free( tr_list_remove( &msgs->peerAskedForFast, &req, compareRequest ) );
1184            tr_free( tr_list_remove( &msgs->peerAskedFor, &req, compareRequest ) );
1185            break;
1186        }
1187
1188        case BT_PIECE:
1189            assert( 0 ); /* should be handled elsewhere! */
1190            break;
1191       
1192        case BT_PORT:
1193            dbgmsg( msgs, "Got a BT_PORT" );
1194            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1195            break;
1196           
1197        case BT_SUGGEST: {
1198            dbgmsg( msgs, "Got a BT_SUGGEST" );
1199            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1200            const tr_bitfield * bt = tr_cpPieceBitfield( msgs->torrent->completion );
1201            if( tr_bitfieldHas( bt, ui32 ) )
1202                tr_bitfieldAdd( msgs->clientSuggestedPieces, ui32 );
1203            break;
1204        }
1205           
1206        case BT_HAVE_ALL:
1207            dbgmsg( msgs, "Got a BT_HAVE_ALL" );
1208            tr_bitfieldAddRange( msgs->info->have, 0, msgs->torrent->info.pieceCount );
1209            updatePeerProgress( msgs );
1210            break;
1211       
1212       
1213        case BT_HAVE_NONE:
1214            dbgmsg( msgs, "Got a BT_HAVE_NONE" );
1215            tr_bitfieldClear( msgs->info->have );
1216            updatePeerProgress( msgs );
1217            break;
1218       
1219        case BT_REJECT: {
1220            struct peer_request req;
1221            dbgmsg( msgs, "Got a BT_REJECT" );
1222            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1223            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1224            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1225            tr_free( tr_list_remove( &msgs->clientAskedFor, &req, compareRequest ) );
1226            break;
1227        }
1228
1229        case BT_ALLOWED_FAST: {
1230            dbgmsg( msgs, "Got a BT_ALLOWED_FAST" );
1231            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1232            tr_bitfieldAdd( msgs->clientAllowedPieces, ui32 );
1233            break;
1234        }
1235
1236        case BT_LTEP:
1237            dbgmsg( msgs, "Got a BT_LTEP" );
1238            parseLtep( msgs, msglen, inbuf );
1239            break;
1240
1241        default:
1242            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1243            tr_peerIoDrain( msgs->io, inbuf, msglen );
1244            break;
1245    }
1246
1247    assert( msglen + 1 == msgs->incoming.length );
1248    assert( EVBUFFER_LENGTH(inbuf) == startBufLen - msglen );
1249
1250    msgs->state = AWAITING_BT_LENGTH;
1251    return READ_AGAIN;
1252}
1253
1254static void
1255peerGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1256{
1257    tr_torrent * tor = msgs->torrent;
1258    tor->activityDate = tr_date( );
1259    tor->uploadedCur += byteCount;
1260    msgs->info->pieceDataActivityDate = time( NULL );
1261    msgs->info->credit -= byteCount;
1262    tr_rcTransferred( msgs->info->rcToPeer, byteCount );
1263    tr_rcTransferred( tor->upload, byteCount );
1264    tr_rcTransferred( tor->handle->upload, byteCount );
1265    tr_statsAddUploaded( msgs->handle, byteCount );
1266}
1267
1268static size_t
1269getDownloadMax( const tr_peermsgs * msgs )
1270{
1271    static const size_t maxval = ~0;
1272    const tr_torrent * tor = msgs->torrent;
1273
1274    if( tor->downloadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1275        return tor->handle->useDownloadLimit
1276            ? tr_rcBytesLeft( tor->handle->download ) : maxval;
1277
1278    if( tor->downloadLimitMode == TR_SPEEDLIMIT_SINGLE )
1279        return tr_rcBytesLeft( tor->download );
1280
1281    return maxval;
1282}
1283
1284static void
1285reassignBytesToCorrupt( tr_peermsgs * msgs, uint32_t byteCount )
1286{
1287    tr_torrent * tor = msgs->torrent;
1288
1289    /* increment the `corrupt' field */
1290    tor->corruptCur += byteCount;
1291
1292    /* decrement the `downloaded' field */
1293    if( tor->downloadedCur >= byteCount )
1294        tor->downloadedCur -= byteCount;
1295    else
1296        tor->downloadedCur = 0;
1297}
1298
1299
1300static void
1301gotBadPiece( tr_peermsgs * msgs, uint32_t pieceIndex )
1302{
1303    const uint32_t byteCount =
1304        tr_torPieceCountBytes( msgs->torrent, (int)pieceIndex );
1305    reassignBytesToCorrupt( msgs, byteCount );
1306}
1307
1308static void
1309clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1310{
1311    reassignBytesToCorrupt( msgs, req->length );
1312}
1313
1314static void
1315addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1316{
1317    if( !msgs->info->blame )
1318         msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1319    tr_bitfieldAdd( msgs->info->blame, index );
1320}
1321
1322static int
1323clientGotBlock( tr_peermsgs                * msgs,
1324                const uint8_t              * data,
1325                const struct peer_request  * req )
1326{
1327    int i;
1328    tr_torrent * tor = msgs->torrent;
1329    const int block = _tr_block( tor, req->index, req->offset );
1330    struct peer_request *myreq;
1331
1332    assert( msgs != NULL );
1333    assert( req != NULL );
1334
1335    if( req->length != (uint32_t)tr_torBlockCountBytes( msgs->torrent, block ) )
1336    {
1337        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1338                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1339        return TR_ERROR_ASSERT;
1340    }
1341
1342    /* save the block */
1343    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1344
1345    /**
1346    *** Remove the block from our `we asked for this' list
1347    **/
1348
1349    myreq = tr_list_remove( &msgs->clientAskedFor, req, compareRequest );
1350    if( myreq == NULL ) {
1351        clientGotUnwantedBlock( msgs, req );
1352        dbgmsg( msgs, "we didn't ask for this message..." );
1353        return 0;
1354    }
1355
1356    dbgmsg( msgs, "got block %u:%u->%u (turnaround time %d secs)", 
1357                  myreq->index, myreq->offset, myreq->length,
1358                  (int)(time(NULL) - myreq->time_requested) );
1359    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1360                  tr_list_size(msgs->clientAskedFor));
1361
1362    tr_free( myreq );
1363    myreq = NULL;
1364
1365    /**
1366    *** Error checks
1367    **/
1368
1369    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
1370        dbgmsg( msgs, "we have this block already..." );
1371        clientGotUnwantedBlock( msgs, req );
1372        return 0;
1373    }
1374
1375    /**
1376    ***  Save the block
1377    **/
1378
1379    msgs->info->peerSentPieceDataAt = time( NULL );
1380    i = tr_ioWrite( tor, req->index, req->offset, req->length, data );
1381    if( i )
1382        return 0;
1383
1384    tr_cpBlockAdd( tor->completion, block );
1385
1386    addPeerToBlamefield( msgs, req->index );
1387
1388    fireGotBlock( msgs, req );
1389
1390    /**
1391    ***  Handle if this was the last block in the piece
1392    **/
1393
1394    if( tr_cpPieceIsComplete( tor->completion, req->index ) )
1395    {
1396        if( tr_ioHash( tor, req->index ) )
1397        {
1398            gotBadPiece( msgs, req->index );
1399            return 0;
1400        }
1401
1402        fireClientHave( msgs, req->index );
1403    }
1404
1405    return 0;
1406}
1407
1408static ReadState
1409canRead( struct bufferevent * evin, void * vmsgs )
1410{
1411    ReadState ret;
1412    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1413    struct evbuffer * inbuf = EVBUFFER_INPUT ( evin );
1414    const size_t buflen = EVBUFFER_LENGTH( inbuf );
1415    const size_t downloadMax = getDownloadMax( msgs );
1416    const size_t n = MIN( buflen, downloadMax );
1417
1418    if( !n )
1419    {
1420        ret = READ_DONE;
1421    }
1422    else switch( msgs->state )
1423    {
1424        case AWAITING_BT_LENGTH:  ret = readBtLength ( msgs, inbuf, n ); break;
1425        case AWAITING_BT_ID:      ret = readBtId     ( msgs, inbuf, n ); break;
1426        case AWAITING_BT_MESSAGE: ret = readBtMessage( msgs, inbuf, n ); break;
1427        case AWAITING_BT_PIECE:   ret = readBtPiece  ( msgs, inbuf, n ); break;
1428        default: assert( 0 );
1429    }
1430
1431    return ret;
1432}
1433
1434static void
1435sendKeepalive( tr_peermsgs * msgs )
1436{
1437    dbgmsg( msgs, "sending a keepalive message" );
1438    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1439}
1440
1441/**
1442***
1443**/
1444
1445static size_t
1446getUploadMax( const tr_peermsgs * msgs )
1447{
1448    static const size_t maxval = ~0;
1449    const tr_torrent * tor = msgs->torrent;
1450    const int useSwift = SWIFT_ENABLED && !tr_torrentIsSeed( msgs->torrent );
1451    const size_t swiftLeft = msgs->info->credit;
1452    size_t speedLeft;
1453    size_t bufLeft;
1454    size_t ret;
1455
1456    if( tor->uploadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1457        speedLeft = tor->handle->useUploadLimit ? tr_rcBytesLeft( tor->handle->upload ) : maxval;
1458    else if( tor->uploadLimitMode == TR_SPEEDLIMIT_SINGLE )
1459        speedLeft = tr_rcBytesLeft( tor->upload );
1460    else
1461        speedLeft = ~0;
1462
1463    bufLeft = MAX_OUTBUF_SIZE - tr_peerIoWriteBytesWaiting( msgs->io );
1464    ret = MIN( speedLeft, bufLeft );
1465    if( useSwift)
1466        ret = MIN( ret, swiftLeft );
1467    return ret;
1468}
1469
1470static int
1471ratePulse( void * vmsgs )
1472{
1473    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1474    msgs->info->rateToClient = tr_rcRate( msgs->info->rcToClient );
1475    msgs->info->rateToPeer = tr_rcRate( msgs->info->rcToPeer );
1476    msgs->maxActiveRequests = MIN( 8 + (int)(msgs->info->rateToClient/5), 100 );
1477    msgs->minActiveRequests = msgs->maxActiveRequests / 3;
1478    return TRUE;
1479}
1480
1481static struct peer_request*
1482popNextRequest( tr_peermsgs * msgs )
1483{
1484    struct peer_request * ret;
1485    ret = tr_list_pop_front( &msgs->peerAskedForFast );
1486    if( !ret )
1487        ret = tr_list_pop_front( &msgs->peerAskedFor);
1488    return ret;
1489}
1490
1491static void
1492updatePeerStatus( tr_peermsgs * msgs )
1493{
1494    tr_peer * peer = msgs->info;
1495
1496    if( !msgs->peerSentBitfield )
1497        peer->status = TR_PEER_STATUS_HANDSHAKE;
1498
1499    else if( ( time(NULL) - peer->pieceDataActivityDate ) < 3 )
1500        peer->status = peer->clientIsChoked
1501                       ? TR_PEER_STATUS_ACTIVE_AND_CHOKED
1502                       : TR_PEER_STATUS_ACTIVE;
1503
1504    else if( peer->peerIsChoked )
1505        peer->status = TR_PEER_STATUS_PEER_IS_CHOKED;
1506
1507    else if( peer->clientIsChoked )
1508        peer->status = peer->clientIsInterested
1509                       ? TR_PEER_STATUS_CLIENT_IS_INTERESTED
1510                       : TR_PEER_STATUS_CLIENT_IS_CHOKED;
1511
1512    else if( msgs->clientAskedFor != NULL )
1513        peer->status = TR_PEER_STATUS_REQUEST_SENT;
1514
1515    else
1516        peer->status = TR_PEER_STATUS_READY;
1517}
1518
1519static int
1520pulse( void * vmsgs )
1521{
1522    const time_t now = time( NULL );
1523    tr_peermsgs * msgs = vmsgs;
1524    struct peer_request * r;
1525
1526    tr_peerIoTryRead( msgs->io );
1527    pumpRequestQueue( msgs );
1528    updatePeerStatus( msgs );
1529
1530    if( msgs->sendingBlock )
1531    {
1532        const size_t uploadMax = getUploadMax( msgs );
1533        size_t len = EVBUFFER_LENGTH( msgs->outBlock );
1534        const size_t outlen = MIN( len, uploadMax );
1535
1536        assert( len );
1537
1538        if( outlen )
1539        {
1540            tr_peerIoWrite( msgs->io, EVBUFFER_DATA( msgs->outBlock ), outlen );
1541            evbuffer_drain( msgs->outBlock, outlen );
1542            peerGotBytes( msgs, outlen );
1543
1544            len -= outlen;
1545            msgs->clientSentAnythingAt = now;
1546            msgs->sendingBlock = len!=0;
1547
1548            dbgmsg( msgs, "wrote %d bytes; %d left in block", (int)outlen, (int)len );
1549        }
1550        else dbgmsg( msgs, "stalled writing block... uploadMax %lu, outlen %lu", uploadMax, outlen );
1551    }
1552
1553    if( !msgs->sendingBlock )
1554    {
1555        if(( EVBUFFER_LENGTH( msgs->outMessages ) ))
1556        {
1557            dbgmsg( msgs, "flushing outMessages..." );
1558            tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
1559            msgs->clientSentAnythingAt = now;
1560        }
1561        else if( !EVBUFFER_LENGTH( msgs->outBlock )
1562            && (( r = popNextRequest( msgs )))
1563            && requestIsValid( msgs, r )
1564            && tr_cpPieceIsComplete( msgs->torrent->completion, r->index ) )
1565        {
1566            uint8_t * buf = tr_new( uint8_t, r->length );
1567
1568            if( !tr_ioRead( msgs->torrent, r->index, r->offset, r->length, buf ) )
1569            {
1570                tr_peerIo * io = msgs->io;
1571                struct evbuffer * out = msgs->outBlock;
1572
1573                dbgmsg( msgs, "sending block %u:%u->%u", r->index, r->offset, r->length );
1574                tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 2*sizeof(uint32_t) + r->length );
1575                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1576                tr_peerIoWriteUint32( io, out, r->index );
1577                tr_peerIoWriteUint32( io, out, r->offset );
1578                tr_peerIoWriteBytes ( io, out, buf, r->length );
1579                msgs->sendingBlock = 1;
1580            }
1581
1582            tr_free( buf );
1583            tr_free( r );
1584        }
1585        else if( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS )
1586        {
1587            sendKeepalive( msgs );
1588        }
1589    }
1590
1591    return TRUE; /* loop forever */
1592}
1593
1594static void
1595gotError( struct bufferevent * evbuf UNUSED, short what, void * vmsgs )
1596{
1597    if( what & EVBUFFER_TIMEOUT )
1598        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1599
1600    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) ) {
1601        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1602                what, errno, strerror(errno) );
1603        fireGotError( vmsgs );
1604    }
1605}
1606
1607static void
1608sendBitfield( tr_peermsgs * msgs )
1609{
1610    const tr_bitfield * bitfield = tr_cpPieceBitfield( msgs->torrent->completion );
1611    struct evbuffer * out = msgs->outMessages;
1612
1613    dbgmsg( msgs, "sending peer a bitfield message" );
1614    tr_peerIoWriteUint32( msgs->io, out, sizeof(uint8_t) + bitfield->len );
1615    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1616    tr_peerIoWriteBytes ( msgs->io, out, bitfield->bits, bitfield->len );
1617}
1618
1619/**
1620***
1621**/
1622
1623/* some peers give us error messages if we send
1624   more than this many peers in a single pex message */
1625#define MAX_PEX_DIFFS 200
1626
1627typedef struct
1628{
1629    tr_pex * added;
1630    tr_pex * dropped;
1631    tr_pex * elements;
1632    int addedCount;
1633    int droppedCount;
1634    int elementCount;
1635    int diffCount;
1636}
1637PexDiffs;
1638
1639static void
1640pexAddedCb( void * vpex, void * userData )
1641{
1642    PexDiffs * diffs = (PexDiffs *) userData;
1643    tr_pex * pex = (tr_pex *) vpex;
1644    if( diffs->diffCount < MAX_PEX_DIFFS )
1645    {
1646        diffs->diffCount++;
1647        diffs->added[diffs->addedCount++] = *pex;
1648        diffs->elements[diffs->elementCount++] = *pex;
1649    }
1650}
1651
1652static void
1653pexRemovedCb( void * vpex, void * userData )
1654{
1655    PexDiffs * diffs = (PexDiffs *) userData;
1656    tr_pex * pex = (tr_pex *) vpex;
1657    if( diffs->diffCount < MAX_PEX_DIFFS )
1658    {
1659        diffs->diffCount++;
1660        diffs->dropped[diffs->droppedCount++] = *pex;
1661    }
1662}
1663
1664static void
1665pexElementCb( void * vpex, void * userData )
1666{
1667    PexDiffs * diffs = (PexDiffs *) userData;
1668    tr_pex * pex = (tr_pex *) vpex;
1669    if( diffs->diffCount < MAX_PEX_DIFFS )
1670    {
1671        diffs->diffCount++;
1672        diffs->elements[diffs->elementCount++] = *pex;
1673    }
1674}
1675
1676static void
1677sendPex( tr_peermsgs * msgs )
1678{
1679    if( msgs->peerSupportsPex && tr_torrentIsPexEnabled( msgs->torrent ) )
1680    {
1681        int i;
1682        tr_pex * newPex = NULL;
1683        const int newCount = tr_peerMgrGetPeers( msgs->handle->peerMgr, msgs->torrent->info.hash, &newPex );
1684        PexDiffs diffs;
1685        benc_val_t val, *added, *dropped, *flags;
1686        uint8_t *tmp, *walk;
1687        char * benc;
1688        int bencLen;
1689
1690        /* build the diffs */
1691        diffs.added = tr_new( tr_pex, newCount );
1692        diffs.addedCount = 0;
1693        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1694        diffs.droppedCount = 0;
1695        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1696        diffs.elementCount = 0;
1697        diffs.diffCount = 0;
1698        tr_set_compare( msgs->pex, msgs->pexCount,
1699                        newPex, newCount,
1700                        tr_pexCompare, sizeof(tr_pex),
1701                        pexRemovedCb, pexAddedCb, pexElementCb, &diffs );
1702        dbgmsg( msgs, "pex: old peer count %d, new peer count %d, added %d, removed %d", msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1703
1704        /* update peer */
1705        tr_free( msgs->pex );
1706        msgs->pex = diffs.elements;
1707        msgs->pexCount = diffs.elementCount;
1708
1709        /* build the pex payload */
1710        tr_bencInit( &val, TYPE_DICT );
1711        tr_bencDictReserve( &val, 3 );
1712
1713        /* "added" */
1714        added = tr_bencDictAdd( &val, "added" );
1715        tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1716        for( i=0; i<diffs.addedCount; ++i ) {
1717            memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
1718            memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1719        }
1720        assert( ( walk - tmp ) == diffs.addedCount * 6 );
1721        tr_bencInitStr( added, tmp, walk-tmp, FALSE );
1722
1723        /* "added.f" */
1724        flags = tr_bencDictAdd( &val, "added.f" );
1725        tmp = walk = tr_new( uint8_t, diffs.addedCount );
1726        for( i=0; i<diffs.addedCount; ++i )
1727            *walk++ = diffs.added[i].flags;
1728        assert( ( walk - tmp ) == diffs.addedCount );
1729        tr_bencInitStr( flags, tmp, walk-tmp, FALSE );
1730
1731        /* "dropped" */
1732        dropped = tr_bencDictAdd( &val, "dropped" );
1733        tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
1734        for( i=0; i<diffs.droppedCount; ++i ) {
1735            memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
1736            memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
1737        }
1738        assert( ( walk - tmp ) == diffs.droppedCount * 6 );
1739        tr_bencInitStr( dropped, tmp, walk-tmp, FALSE );
1740
1741        /* write the pex message */
1742        benc = tr_bencSave( &val, &bencLen );
1743        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 2*sizeof(uint8_t) + bencLen );
1744        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_LTEP );
1745        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, OUR_LTEP_PEX );
1746        tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, benc, bencLen );
1747
1748        /* cleanup */
1749        tr_free( benc );
1750        tr_bencFree( &val );
1751        tr_free( diffs.added );
1752        tr_free( diffs.dropped );
1753        tr_free( newPex );
1754
1755        msgs->clientSentPexAt = time( NULL );
1756    }
1757}
1758
1759static int
1760pexPulse( void * vpeer )
1761{
1762    sendPex( vpeer );
1763    return TRUE;
1764}
1765
1766/**
1767***
1768**/
1769
1770tr_peermsgs*
1771tr_peerMsgsNew( struct tr_torrent * torrent,
1772                struct tr_peer    * info,
1773                tr_delivery_func    func,
1774                void              * userData,
1775                tr_publisher_tag  * setme )
1776{
1777    tr_peermsgs * m;
1778
1779    assert( info != NULL );
1780    assert( info->io != NULL );
1781
1782    m = tr_new0( tr_peermsgs, 1 );
1783    m->publisher = tr_publisherNew( );
1784    m->info = info;
1785    m->handle = torrent->handle;
1786    m->torrent = torrent;
1787    m->io = info->io;
1788    m->info->status = TR_PEER_STATUS_HANDSHAKE;
1789    m->info->clientIsChoked = 1;
1790    m->info->peerIsChoked = 1;
1791    m->info->clientIsInterested = 0;
1792    m->info->peerIsInterested = 0;
1793    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
1794    m->state = AWAITING_BT_LENGTH;
1795    m->pulseTimer = tr_timerNew( m->handle, pulse, m, PEER_PULSE_INTERVAL );
1796    m->rateTimer = tr_timerNew( m->handle, ratePulse, m, RATE_PULSE_INTERVAL );
1797    m->pexTimer = tr_timerNew( m->handle, pexPulse, m, PEX_INTERVAL );
1798    m->outMessages = evbuffer_new( );
1799    m->incoming.block = evbuffer_new( );
1800    m->outBlock = evbuffer_new( );
1801    m->peerAllowedPieces = NULL;
1802    m->clientAllowedPieces = NULL;
1803    m->clientSuggestedPieces = NULL;
1804    *setme = tr_publisherSubscribe( m->publisher, func, userData );
1805   
1806    if ( tr_peerIoSupportsFEXT( m->io ) )
1807    {
1808        /* This peer is fastpeer-enabled, generate its allowed set
1809         * (before registering our callbacks) */
1810        if ( !m->peerAllowedPieces ) {
1811            const struct in_addr *peerAddr = tr_peerIoGetAddress( m->io, NULL );
1812           
1813            m->peerAllowedPieces = tr_peerMgrGenerateAllowedSet( MAX_FAST_ALLOWED_COUNT,
1814                                                                 m->torrent->info.pieceCount,
1815                                                                 m->torrent->info.hash,
1816                                                                 peerAddr );
1817        }
1818        m->clientAllowedPieces = tr_bitfieldNew( m->torrent->info.pieceCount );
1819       
1820        m->clientSuggestedPieces = tr_bitfieldNew( m->torrent->info.pieceCount );
1821    }
1822   
1823    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of inactivity */
1824    tr_peerIoSetIOFuncs( m->io, canRead, gotError, m );
1825    ratePulse( m );
1826
1827    if ( tr_peerIoSupportsLTEP( m->io ) )
1828        sendLtepHandshake( m );
1829
1830    if ( !tr_peerIoSupportsFEXT( m->io ) )
1831        sendBitfield( m );
1832    else {
1833        /* This peer is fastpeer-enabled, send it have-all or have-none if appropriate */
1834        uint32_t peerProgress;
1835        float completion = tr_cpPercentComplete( m->torrent->completion );
1836        if ( completion == 0.0f ) {
1837            sendFastHave( m, 0 );
1838        } else if ( completion == 1.0f ) {
1839            sendFastHave( m, 1 );
1840        } else {
1841            sendBitfield( m );
1842        }
1843        peerProgress = m->torrent->info.pieceCount * m->info->progress;
1844       
1845        if ( peerProgress < MAX_FAST_ALLOWED_COUNT )
1846            sendFastAllowedSet( m );
1847    }
1848
1849    return m;
1850}
1851
1852void
1853tr_peerMsgsFree( tr_peermsgs* msgs )
1854{
1855    if( msgs != NULL )
1856    {
1857        tr_timerFree( &msgs->pulseTimer );
1858        tr_timerFree( &msgs->rateTimer );
1859        tr_timerFree( &msgs->pexTimer );
1860        tr_publisherFree( &msgs->publisher );
1861        tr_list_free( &msgs->clientWillAskFor, tr_free );
1862        tr_list_free( &msgs->clientAskedFor, tr_free );
1863        tr_list_free( &msgs->peerAskedForFast, tr_free );
1864        tr_list_free( &msgs->peerAskedFor, tr_free );
1865        tr_bitfieldFree( msgs->peerAllowedPieces );
1866        tr_bitfieldFree( msgs->clientAllowedPieces );
1867        tr_bitfieldFree( msgs->clientSuggestedPieces );
1868        evbuffer_free( msgs->incoming.block );
1869        evbuffer_free( msgs->outMessages );
1870        evbuffer_free( msgs->outBlock );
1871        tr_free( msgs->pex );
1872
1873        memset( msgs, ~0, sizeof( tr_peermsgs ) );
1874        tr_free( msgs );
1875    }
1876}
1877
1878tr_publisher_tag
1879tr_peerMsgsSubscribe( tr_peermsgs       * peer,
1880                      tr_delivery_func    func,
1881                      void              * userData )
1882{
1883    return tr_publisherSubscribe( peer->publisher, func, userData );
1884}
1885
1886void
1887tr_peerMsgsUnsubscribe( tr_peermsgs       * peer,
1888                        tr_publisher_tag    tag )
1889{
1890    tr_publisherUnsubscribe( peer->publisher, tag );
1891}
1892
1893int
1894tr_peerMsgsIsPieceFastAllowed( const tr_peermsgs * peer,
1895                               uint32_t            index )
1896{
1897    return tr_bitfieldHas( peer->clientAllowedPieces, index );
1898}
1899
1900int
1901tr_peerMsgsIsPieceSuggested( const tr_peermsgs * peer,
1902                             uint32_t            index )
1903{
1904    return tr_bitfieldHas( peer->clientSuggestedPieces, index );
1905}
1906
Note: See TracBrowser for help on using the repository browser.