source: branches/0.9x/libtransmission/peer-msgs.c @ 3598

Last change on this file since 3598 was 3598, checked in by charles, 15 years ago

Ticket #398 (peer-msgs.c:905: failed assertion `msglen == 0' in 0.90)

  • Property svn:keywords set to Date Rev Author Id
File size: 49.2 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 3598 2007-10-27 15:45:00Z charles $
11 */
12
13#include <assert.h>
14#include <ctype.h>
15#include <errno.h>
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19#include <libgen.h> /* basename */
20
21#include <netinet/in.h> /* struct in_addr */
22
23#include <sys/types.h> /* event.h needs this */
24#include <event.h>
25
26#include "transmission.h"
27#include "bencode.h"
28#include "completion.h"
29#include "inout.h"
30#include "list.h"
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-mgr-private.h"
34#include "peer-msgs.h"
35#include "ratecontrol.h"
36#include "trevent.h"
37#include "utils.h"
38
39/**
40***
41**/
42
43#define MAX_ALLOWED_SET_COUNT   10 /* number of pieces generated for allow-fast,
44                                      threshold for fast-allowing others */
45
46enum
47{
48    BT_CHOKE                = 0,
49    BT_UNCHOKE              = 1,
50    BT_INTERESTED           = 2,
51    BT_NOT_INTERESTED       = 3,
52    BT_HAVE                 = 4,
53    BT_BITFIELD             = 5,
54    BT_REQUEST              = 6,
55    BT_PIECE                = 7,
56    BT_CANCEL               = 8,
57    BT_PORT                 = 9,
58    BT_SUGGEST              = 13,
59    BT_HAVE_ALL             = 14,
60    BT_HAVE_NONE            = 15,
61    BT_REJECT               = 16,
62    BT_ALLOWED_FAST         = 17,
63    BT_LTEP                 = 20,
64
65    LTEP_HANDSHAKE          = 0,
66
67    OUR_LTEP_PEX            = 1,
68
69    MAX_REQUEST_BYTE_COUNT  = (16 * 1024), /* drop requests who want too much */
70
71    KEEPALIVE_INTERVAL_SECS = 90,          /* idle seconds before we send a keepalive */
72    PEX_INTERVAL            = (60 * 1000), /* msec between calls to sendPex() */
73    PEER_PULSE_INTERVAL     = (133),        /* msec between calls to pulse() */
74    RATE_PULSE_INTERVAL     = (333),       /* msec between calls to ratePulse() */
75};
76
77enum
78{
79    AWAITING_BT_LENGTH,
80    AWAITING_BT_MESSAGE,
81    READING_BT_PIECE
82};
83
84struct peer_request
85{
86    uint32_t index;
87    uint32_t offset;
88    uint32_t length;
89    time_t time_requested;
90};
91
92static int
93compareRequest( const void * va, const void * vb )
94{
95    struct peer_request * a = (struct peer_request*) va;
96    struct peer_request * b = (struct peer_request*) vb;
97    if( a->index != b->index ) return a->index - b->index;
98    if( a->offset != b->offset ) return a->offset - b->offset;
99    if( a->length != b->length ) return a->length - b->length;
100    return 0;
101}
102
103struct tr_peermsgs
104{
105    tr_peer * info;
106
107    tr_handle * handle;
108    tr_torrent * torrent;
109    tr_peerIo * io;
110
111    tr_publisher_t * publisher;
112
113    struct evbuffer * outMessages; /* buffer of all the non-piece messages */
114    struct evbuffer * inBlock;     /* the block we're currently receiving */
115    tr_list * peerAskedFor;
116    tr_list * peerAskedForFast;
117    tr_list * clientAskedFor;
118    tr_list * clientWillAskFor;
119
120    tr_timer * rateTimer;
121    tr_timer * pulseTimer;
122    tr_timer * pexTimer;
123
124    struct peer_request blockToUs; /* the block currntly being sent to us */
125
126    time_t lastReqAddedAt;
127    time_t clientSentPexAt;
128    time_t clientSentAnythingAt;
129
130    unsigned int notListening             : 1;
131    unsigned int peerSupportsPex          : 1;
132    unsigned int clientSentLtepHandshake  : 1;
133    unsigned int peerSentLtepHandshake    : 1;
134   
135    tr_bitfield * clientAllowedPieces;
136    tr_bitfield * peerAllowedPieces;
137   
138    uint8_t state;
139    uint8_t ut_pex_id;
140    uint16_t pexCount;
141    uint32_t incomingMessageLength;
142    uint32_t maxActiveRequests;
143    uint32_t minActiveRequests;
144
145    tr_pex * pex;
146};
147
148/**
149***
150**/
151
152static void
153myDebug( const char * file, int line,
154         const struct tr_peermsgs * msgs,
155         const char * fmt, ... )
156{
157    FILE * fp = tr_getLog( );
158    if( fp != NULL )
159    {
160        va_list args;
161        char timestr[64];
162        struct evbuffer * buf = evbuffer_new( );
163        char * myfile = tr_strdup( file );
164
165        evbuffer_add_printf( buf, "[%s] %s [%s]: ",
166                             tr_getLogTimeStr( timestr, sizeof(timestr) ),
167                             tr_peerIoGetAddrStr( msgs->io ),
168                             msgs->info->client );
169        va_start( args, fmt );
170        evbuffer_add_vprintf( buf, fmt, args );
171        va_end( args );
172        evbuffer_add_printf( buf, " (%s:%d)\n", basename(myfile), line );
173        fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
174
175        tr_free( myfile );
176        evbuffer_free( buf );
177    }
178}
179
180#define dbgmsg(msgs, fmt...) myDebug(__FILE__, __LINE__, msgs, ##fmt )
181
182/**
183***
184**/
185
186static void
187protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
188{
189    tr_peerIo * io = msgs->io;
190    struct evbuffer * out = msgs->outMessages;
191
192    dbgmsg( msgs, "requesting %u:%u->%u", req->index, req->offset, req->length );
193    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
194    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
195    tr_peerIoWriteUint32( io, out, req->index );
196    tr_peerIoWriteUint32( io, out, req->offset );
197    tr_peerIoWriteUint32( io, out, req->length );
198}
199
200static void
201protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
202{
203    tr_peerIo * io = msgs->io;
204    struct evbuffer * out = msgs->outMessages;
205
206    dbgmsg( msgs, "cancelling %u:%u->%u", req->index, req->offset, req->length );
207    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
208    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
209    tr_peerIoWriteUint32( io, out, req->index );
210    tr_peerIoWriteUint32( io, out, req->offset );
211    tr_peerIoWriteUint32( io, out, req->length );
212}
213
214static void
215protocolSendHave( tr_peermsgs * msgs, uint32_t index )
216{
217    tr_peerIo * io = msgs->io;
218    struct evbuffer * out = msgs->outMessages;
219
220    dbgmsg( msgs, "sending Have %u", index );
221    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
222    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
223    tr_peerIoWriteUint32( io, out, index );
224}
225
226static void
227protocolSendChoke( tr_peermsgs * msgs, int choke )
228{
229    tr_peerIo * io = msgs->io;
230    struct evbuffer * out = msgs->outMessages;
231
232    dbgmsg( msgs, "sending %s", (choke ? "Choke" : "Unchoke") );
233    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) );
234    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
235}
236
237static void
238protocolSendPiece( tr_peermsgs                * msgs,
239                   const struct peer_request  * r,
240                   const uint8_t              * pieceData )
241{
242    tr_peerIo * io = msgs->io;
243    struct evbuffer * out = evbuffer_new( );
244
245    dbgmsg( msgs, "sending block %u:%u->%u", r->index, r->offset, r->length );
246    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 2*sizeof(uint32_t) + r->length );
247    tr_peerIoWriteUint8 ( io, out, BT_PIECE );
248    tr_peerIoWriteUint32( io, out, r->index );
249    tr_peerIoWriteUint32( io, out, r->offset );
250    tr_peerIoWriteBytes ( io, out, pieceData, r->length );
251    tr_peerIoWriteBuf   ( io, out );
252
253    evbuffer_free( out );
254}
255
256/**
257***  EVENTS
258**/
259
260static const tr_peermsgs_event blankEvent = { 0, 0, 0, 0, 0.0f };
261
262static void
263publish( tr_peermsgs * msgs, tr_peermsgs_event * e )
264{
265    tr_publisherPublish( msgs->publisher, msgs->info, e );
266}
267
268static void
269fireGotError( tr_peermsgs * msgs )
270{
271    tr_peermsgs_event e = blankEvent;
272    e.eventType = TR_PEERMSG_GOT_ERROR;
273    publish( msgs, &e );
274}
275
276static void
277fireNeedReq( tr_peermsgs * msgs )
278{
279    tr_peermsgs_event e = blankEvent;
280    e.eventType = TR_PEERMSG_NEED_REQ;
281    publish( msgs, &e );
282}
283
284static void
285firePeerProgress( tr_peermsgs * msgs )
286{
287    tr_peermsgs_event e = blankEvent;
288    e.eventType = TR_PEERMSG_PEER_PROGRESS;
289    e.progress = msgs->info->progress;
290    publish( msgs, &e );
291}
292
293static void
294fireClientHave( tr_peermsgs * msgs, uint32_t pieceIndex )
295{
296    tr_peermsgs_event e = blankEvent;
297    e.eventType = TR_PEERMSG_CLIENT_HAVE;
298    e.pieceIndex = pieceIndex;
299    publish( msgs, &e );
300}
301
302static void
303fireGotBlock( tr_peermsgs * msgs, uint32_t pieceIndex, uint32_t offset, uint32_t length )
304{
305    tr_peermsgs_event e = blankEvent;
306    e.eventType = TR_PEERMSG_CLIENT_BLOCK;
307    e.pieceIndex = pieceIndex;
308    e.offset = offset;
309    e.length = length;
310    publish( msgs, &e );
311}
312
313static void
314fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
315{
316    tr_peermsgs_event e = blankEvent;
317    e.eventType = TR_PEERMSG_CANCEL;
318    e.pieceIndex = req->index;
319    e.offset = req->offset;
320    e.length = req->length;
321    publish( msgs, &e );
322}
323
324/**
325***  INTEREST
326**/
327
328static int
329isPieceInteresting( const tr_peermsgs   * peer,
330                    int                   piece )
331{
332    const tr_torrent * torrent = peer->torrent;
333    if( torrent->info.pieces[piece].dnd ) /* we don't want it */
334        return FALSE;
335    if( tr_cpPieceIsComplete( torrent->completion, piece ) ) /* we have it */
336        return FALSE;
337    if( !tr_bitfieldHas( peer->info->have, piece ) ) /* peer doesn't have it */
338        return FALSE;
339    if( tr_bitfieldHas( peer->info->banned, piece ) ) /* peer is banned */
340        return FALSE;
341    return TRUE;
342}
343
344/* "interested" means we'll ask for piece data if they unchoke us */
345static int
346isPeerInteresting( const tr_peermsgs * msgs )
347{
348    int i;
349    const tr_torrent * torrent;
350    const tr_bitfield * bitfield;
351    const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
352
353    if( clientIsSeed )
354        return FALSE;
355
356    torrent = msgs->torrent;
357    bitfield = tr_cpPieceBitfield( torrent->completion );
358
359    if( !msgs->info->have )
360        return TRUE;
361
362    assert( bitfield->len == msgs->info->have->len );
363    for( i=0; i<torrent->info.pieceCount; ++i )
364        if( isPieceInteresting( msgs, i ) )
365            return TRUE;
366
367    return FALSE;
368}
369
370static void
371sendInterest( tr_peermsgs * msgs, int weAreInterested )
372{
373    assert( msgs != NULL );
374    assert( weAreInterested==0 || weAreInterested==1 );
375
376    msgs->info->clientIsInterested = weAreInterested;
377    dbgmsg( msgs, "Sending %s",
378            weAreInterested ? "Interested" : "Not Interested");
379
380    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
381    tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages,
382                   weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
383}
384
385static void
386updateInterest( tr_peermsgs * msgs )
387{
388    const int i = isPeerInteresting( msgs );
389    if( i != msgs->info->clientIsInterested )
390        sendInterest( msgs, i );
391    if( i )
392        fireNeedReq( msgs );
393}
394
395#define MIN_CHOKE_PERIOD_SEC 10
396
397static void
398cancelAllRequestsToClientExceptFast( tr_peermsgs * msgs )
399{
400    tr_list_free( &msgs->peerAskedFor, tr_free );
401}
402
403void
404tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
405{
406    const time_t fibrillationTime = time(NULL) - MIN_CHOKE_PERIOD_SEC;
407
408    assert( msgs != NULL );
409    assert( msgs->info != NULL );
410    assert( choke==0 || choke==1 );
411
412    if( msgs->info->chokeChangedAt > fibrillationTime )
413    {
414        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
415    }
416    else if( msgs->info->peerIsChoked != choke )
417    {
418        msgs->info->peerIsChoked = choke;
419        if( choke )
420            cancelAllRequestsToClientExceptFast( msgs );
421        protocolSendChoke( msgs, choke );
422        msgs->info->chokeChangedAt = time( NULL );
423    }
424}
425
426/**
427***
428**/
429
430void
431tr_peerMsgsHave( tr_peermsgs * msgs,
432                 uint32_t      index )
433{
434    protocolSendHave( msgs, index );
435
436    /* since we have more pieces now, we might not be interested in this peer */
437    updateInterest( msgs );
438}
439#if 0
440static void
441sendFastSuggest( tr_peermsgs * msgs,
442                 uint32_t      pieceIndex )
443{
444    dbgmsg( msgs, "w00t SUGGESTing them piece #%d", pieceIndex );
445    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
446    tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_SUGGEST );
447    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
448   
449    updateInterest( msgs );
450}
451#endif
452static void
453sendFastHave( tr_peermsgs * msgs,
454              int           all)
455{
456    dbgmsg( msgs, "w00t telling them we %s pieces", (all ? "HAVE_ALL" : "HAVE_NONE" ) );
457    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
458    tr_peerIoWriteUint8( msgs->io, msgs->outMessages, ( all ? BT_HAVE_ALL : BT_HAVE_NONE ) );
459   
460    updateInterest( msgs );
461}
462
463static void
464sendFastReject( tr_peermsgs * msgs,
465                uint32_t      pieceIndex,
466                uint32_t      offset,
467                uint32_t      length )
468{
469    assert( msgs != NULL );
470    assert( length > 0 );
471
472    if( tr_peerIoSupportsFEXT( msgs->io ) )
473    {
474        const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
475        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
476        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_REJECT );
477        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
478        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
479        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
480    }
481}
482
483static void
484sendFastAllowed( tr_peermsgs * msgs,
485                 uint32_t      pieceIndex)
486{
487    dbgmsg( msgs, "w00t telling them we ALLOW_FAST piece #%d", pieceIndex );
488    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
489    tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_ALLOWED_FAST );
490    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
491}
492
493
494
495static void
496sendFastAllowedSet( tr_peermsgs * msgs )
497{
498    int i = 0;
499    while (i <= msgs->torrent->info.pieceCount )
500    {
501        if ( tr_bitfieldHas( msgs->peerAllowedPieces, i) )
502            sendFastAllowed( msgs, i );
503        i++;
504    }
505}
506
507
508/**
509***
510**/
511
512static int
513reqIsValid( const tr_peermsgs * msgs, uint32_t index, uint32_t offset, uint32_t length )
514{
515    const tr_torrent * tor = msgs->torrent;
516
517    if( index >= (uint32_t) tor->info.pieceCount )
518        return FALSE;
519    if ( (int)offset >= tr_torPieceCountBytes( tor, (int)index ) )
520        return FALSE;
521    if( length > MAX_REQUEST_BYTE_COUNT )
522        return FALSE;
523    if( tr_pieceOffset( tor, index, offset, length ) > tor->info.totalSize )
524        return FALSE;
525
526    return TRUE;
527}
528
529static int
530requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
531{
532    return reqIsValid( msgs, req->index, req->offset, req->length );
533}
534
535static void
536pumpRequestQueue( tr_peermsgs * msgs )
537{
538    const int max = msgs->maxActiveRequests;
539    const int min = msgs->minActiveRequests;
540    int count = tr_list_size( msgs->clientAskedFor );
541    int sent = 0;
542
543    if( count > min )
544        return;
545    if( msgs->info->clientIsChoked )
546        return;
547
548    while( ( count < max ) && ( msgs->clientWillAskFor != NULL ) )
549    {
550        struct peer_request * req = tr_list_pop_front( &msgs->clientWillAskFor );
551        protocolSendRequest( msgs, req );
552        req->time_requested = msgs->lastReqAddedAt = time( NULL );
553        tr_list_append( &msgs->clientAskedFor, req );
554        ++count;
555        ++sent;
556    }
557
558    if( sent )
559        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
560                sent,
561                tr_list_size(msgs->clientAskedFor),
562                tr_list_size(msgs->clientWillAskFor) );
563
564    if( count < max )
565        fireNeedReq( msgs );
566}
567
568int
569tr_peerMsgsAddRequest( tr_peermsgs * msgs,
570                       uint32_t      index, 
571                       uint32_t      offset, 
572                       uint32_t      length )
573{
574    const int req_max = msgs->maxActiveRequests;
575    struct peer_request tmp, *req;
576
577    assert( msgs != NULL );
578    assert( msgs->torrent != NULL );
579    assert( reqIsValid( msgs, index, offset, length ) );
580
581    /**
582    ***  Reasons to decline the request
583    **/
584
585    /* don't send requests to choked clients */
586    if( msgs->info->clientIsChoked ) {
587        dbgmsg( msgs, "declining request because they're choking us" );
588        return TR_ADDREQ_CLIENT_CHOKED;
589    }
590
591    /* peer doesn't have this piece */
592    if( !tr_bitfieldHas( msgs->info->have, index ) )
593        return TR_ADDREQ_MISSING;
594
595    /* peer's queue is full */
596    if( tr_list_size( msgs->clientWillAskFor ) >= req_max ) {
597        dbgmsg( msgs, "declining request because we're full" );
598        return TR_ADDREQ_FULL;
599    }
600
601    /* have we already asked for this piece? */
602    tmp.index = index;
603    tmp.offset = offset;
604    tmp.length = length;
605    if( tr_list_find( msgs->clientAskedFor, &tmp, compareRequest ) ) {
606        dbgmsg( msgs, "declining because it's a duplicate" );
607        return TR_ADDREQ_DUPLICATE;
608    }
609    if( tr_list_find( msgs->clientWillAskFor, &tmp, compareRequest ) ) {
610        dbgmsg( msgs, "declining because it's a duplicate" );
611        return TR_ADDREQ_DUPLICATE;
612    }
613
614    /**
615    ***  Accept this request
616    **/
617
618    dbgmsg( msgs, "added req for piece %d, offset %d", (int)index, (int)offset );
619    req = tr_new0( struct peer_request, 1 );
620    *req = tmp;
621    tr_list_append( &msgs->clientWillAskFor, req );
622    return TR_ADDREQ_OK;
623}
624
625static void
626cancelAllRequestsToPeer( tr_peermsgs * msgs )
627{
628    struct peer_request * req;
629
630    while(( req = tr_list_pop_front( &msgs->clientWillAskFor ) ))
631    {
632        fireCancelledReq( msgs, req );
633        tr_free( req );
634    }
635
636    while(( req = tr_list_pop_front( &msgs->clientAskedFor ) ))
637    {
638        fireCancelledReq( msgs, req );
639        protocolSendCancel( msgs, req );
640        tr_free( req );
641    }
642}
643
644void
645tr_peerMsgsCancel( tr_peermsgs * msgs,
646                   uint32_t      pieceIndex,
647                   uint32_t      offset,
648                   uint32_t      length )
649{
650    struct peer_request *req, tmp;
651
652    assert( msgs != NULL );
653    assert( length > 0 );
654
655    /* have we asked the peer for this piece? */
656    tmp.index = pieceIndex;
657    tmp.offset = offset;
658    tmp.length = length;
659
660    /* if it's only in the queue and hasn't been sent yet, free it */
661    if(( req = tr_list_remove( &msgs->clientWillAskFor, &tmp, compareRequest ) ))
662    {
663        fireCancelledReq( msgs, req );
664        tr_free( req );
665    }
666
667    /* if it's already been sent, send a cancel message too */
668    if(( req = tr_list_remove( &msgs->clientAskedFor, &tmp, compareRequest ) ))
669    {
670        protocolSendCancel( msgs, req );
671        fireCancelledReq( msgs, req );
672        tr_free( req );
673    }
674}
675
676/**
677***
678**/
679
680static void
681sendLtepHandshake( tr_peermsgs * msgs )
682{
683    benc_val_t val, *m;
684    char * buf;
685    int len;
686    int pex;
687    const char * v = TR_NAME " " USERAGENT_PREFIX;
688    const int port = tr_getPublicPort( msgs->handle );
689    struct evbuffer * outbuf;
690
691    if( msgs->clientSentLtepHandshake )
692        return;
693
694    outbuf = evbuffer_new( );
695    dbgmsg( msgs, "sending an ltep handshake" );
696    msgs->clientSentLtepHandshake = 1;
697
698    /* decide if we want to advertise pex support */
699    if( !tr_torrentIsPexEnabled( msgs->torrent ) )
700        pex = 0;
701    else if( msgs->peerSentLtepHandshake )
702        pex = msgs->peerSupportsPex ? 1 : 0;
703    else
704        pex = 1;
705
706    tr_bencInit( &val, TYPE_DICT );
707    tr_bencDictReserve( &val, 4 );
708    tr_bencInitInt( tr_bencDictAdd( &val, "e" ), 1 );
709    m  = tr_bencDictAdd( &val, "m" );
710    tr_bencInit( m, TYPE_DICT );
711    if( pex ) {
712        tr_bencDictReserve( m, 1 );
713        tr_bencInitInt( tr_bencDictAdd( m, "ut_pex" ), OUR_LTEP_PEX );
714    }
715    if( port > 0 )
716        tr_bencInitInt( tr_bencDictAdd( &val, "p" ), port );
717    tr_bencInitStr( tr_bencDictAdd( &val, "v" ), v, 0, 1 );
718    buf = tr_bencSaveMalloc( &val,  &len );
719
720    tr_peerIoWriteUint32( msgs->io, outbuf, 2*sizeof(uint8_t) + len );
721    tr_peerIoWriteUint8 ( msgs->io, outbuf, BT_LTEP );
722    tr_peerIoWriteUint8 ( msgs->io, outbuf, LTEP_HANDSHAKE );
723    tr_peerIoWriteBytes ( msgs->io, outbuf, buf, len );
724
725    tr_peerIoWriteBuf( msgs->io, outbuf );
726
727#if 0
728    dbgmsg( msgs, "here is the ltep handshake we sent:" );
729    tr_bencPrint( &val );
730#endif
731
732    /* cleanup */
733    tr_bencFree( &val );
734    tr_free( buf );
735    evbuffer_free( outbuf );
736}
737
738static void
739parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
740{
741    benc_val_t val, * sub;
742    uint8_t * tmp = tr_new( uint8_t, len );
743
744    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
745    msgs->peerSentLtepHandshake = 1;
746
747    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type!=TYPE_DICT ) {
748        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
749        tr_free( tmp );
750        return;
751    }
752
753#if 0
754    dbgmsg( msgs, "here is the ltep handshake we read:" );
755    tr_bencPrint( &val );
756#endif
757
758    /* does the peer prefer encrypted connections? */
759    sub = tr_bencDictFind( &val, "e" );
760    if( tr_bencIsInt( sub ) )
761        msgs->info->encryption_preference = sub->val.i
762                                      ? ENCRYPTION_PREFERENCE_YES
763                                      : ENCRYPTION_PREFERENCE_NO;
764
765    /* check supported messages for utorrent pex */
766    sub = tr_bencDictFind( &val, "m" );
767    if( tr_bencIsDict( sub ) ) {
768        sub = tr_bencDictFind( sub, "ut_pex" );
769        if( tr_bencIsInt( sub ) ) {
770            msgs->peerSupportsPex = 1;
771            msgs->ut_pex_id = (uint8_t) sub->val.i;
772            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
773        }
774    }
775
776    /* get peer's listening port */
777    sub = tr_bencDictFind( &val, "p" );
778    if( tr_bencIsInt( sub ) ) {
779        msgs->info->port = htons( (uint16_t)sub->val.i );
780        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
781    }
782
783    tr_bencFree( &val );
784    tr_free( tmp );
785}
786
787static void
788parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
789{
790    benc_val_t val, * sub;
791    uint8_t * tmp;
792
793    if( !tr_torrentIsPexEnabled( msgs->torrent ) ) /* no sharing! */
794        return;
795
796    tmp = tr_new( uint8_t, msglen );
797    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
798
799    if( tr_bencLoad( tmp, msglen, &val, NULL ) || !tr_bencIsDict( &val ) ) {
800        dbgmsg( msgs, "GET can't read extended-pex dictionary" );
801        tr_free( tmp );
802        return;
803    }
804
805    sub = tr_bencDictFind( &val, "added" );
806    if( tr_bencIsStr(sub) && ((sub->val.s.i % 6) == 0)) {
807        const int n = sub->val.s.i / 6 ;
808        dbgmsg( msgs, "got %d peers from uT pex", n );
809        tr_peerMgrAddPeers( msgs->handle->peerMgr,
810                            msgs->torrent->info.hash,
811                            TR_PEER_FROM_PEX,
812                            (uint8_t*)sub->val.s.s, n );
813    }
814
815    tr_bencFree( &val );
816    tr_free( tmp );
817}
818
819static void
820sendPex( tr_peermsgs * msgs );
821
822static void
823parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
824{
825    uint8_t ltep_msgid;
826
827    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
828    msglen--;
829
830    if( ltep_msgid == LTEP_HANDSHAKE )
831    {
832        dbgmsg( msgs, "got ltep handshake" );
833        parseLtepHandshake( msgs, msglen, inbuf );
834        sendLtepHandshake( msgs );
835        sendPex( msgs );
836    }
837    else if( ltep_msgid == msgs->ut_pex_id )
838    {
839        dbgmsg( msgs, "got ut pex" );
840        msgs->peerSupportsPex = 1;
841        parseUtPex( msgs, msglen, inbuf );
842    }
843    else
844    {
845        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
846        evbuffer_drain( inbuf, msglen );
847    }
848}
849
850static int
851readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf )
852{
853    uint32_t len;
854    const size_t needlen = sizeof(uint32_t);
855
856    if( EVBUFFER_LENGTH(inbuf) < needlen )
857        return READ_MORE;
858
859    tr_peerIoReadUint32( msgs->io, inbuf, &len );
860
861    if( len == 0 ) /* peer sent us a keepalive message */
862        dbgmsg( msgs, "got KeepAlive" );
863    else {
864        msgs->incomingMessageLength = len;
865        msgs->state = AWAITING_BT_MESSAGE;
866    }
867
868    return READ_AGAIN;
869}
870
871static void
872updatePeerProgress( tr_peermsgs * msgs )
873{
874    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have ) / (float)msgs->torrent->info.pieceCount;
875    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
876    updateInterest( msgs );
877    firePeerProgress( msgs );
878}
879
880static int
881clientCanSendFastBlock( const tr_peermsgs * msgs UNUSED )
882{
883    /* FIXME(tiennou): base this on how many blocks we've already sent this
884     * peer, or maybe how many fast blocks per minute we've sent overall,
885     * or maybe how much bandwidth we're already using up w/o fast peers.
886     * I don't know what the Right Thing here is, but
887     * the previous measurement of how many pieces we have is not enough. */
888    return FALSE;
889}
890
891static void
892peerMadeRequest( tr_peermsgs * msgs, const struct peer_request * req )
893{
894    const int reqIsValid = requestIsValid( msgs, req );
895    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( msgs->torrent->completion, req->index );
896    const int peerIsChoked = msgs->info->peerIsChoked;
897    const int peerIsFast = tr_peerIoSupportsFEXT( msgs->io );
898    const int pieceIsFast = reqIsValid && tr_bitfieldHas( msgs->peerAllowedPieces, req->index );
899    const int canSendFast = clientCanSendFastBlock( msgs );
900
901    if( !reqIsValid ) /* bad request */
902    {
903        dbgmsg( msgs, "rejecting an invalid request." );
904        sendFastReject( msgs, req->index, req->offset, req->length );
905    }
906    else if( !clientHasPiece ) /* we don't have it */
907    {
908        dbgmsg( msgs, "rejecting request for a piece we don't have." );
909        sendFastReject( msgs, req->index, req->offset, req->length );
910    }
911    else if( peerIsChoked && !peerIsFast ) /* maybe he doesn't know he's choked? */
912    {
913        tr_peerMsgsSetChoke( msgs, 1 );
914        sendFastReject( msgs, req->index, req->offset, req->length );
915    }
916    else if( peerIsChoked && peerIsFast && ( !pieceIsFast || !canSendFast ) )
917    {
918        sendFastReject( msgs, req->index, req->offset, req->length );
919    }
920    else /* YAY */
921    {
922        struct peer_request * tmp = tr_new( struct peer_request, 1 );
923        *tmp = *req;
924        if( peerIsFast && pieceIsFast )
925            tr_list_append( &msgs->peerAskedForFast, tmp );
926        else
927            tr_list_append( &msgs->peerAskedFor, tmp );
928    }
929}
930
931static int
932messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
933{
934    switch( id )
935    {
936        case BT_CHOKE:
937        case BT_UNCHOKE:
938        case BT_INTERESTED:
939        case BT_NOT_INTERESTED:
940        case BT_HAVE_ALL:
941        case BT_HAVE_NONE:
942            return len==1;
943
944        case BT_HAVE:
945        case BT_SUGGEST:
946        case BT_ALLOWED_FAST:
947            return len==5;
948
949        case BT_BITFIELD:
950            return len == (msg->torrent->info.pieceCount+7u)/8u + 1u;
951       
952        case BT_REQUEST:
953        case BT_CANCEL:
954        case BT_REJECT:
955            return len==13;
956
957        case BT_PIECE:
958            return len > 9;
959
960        case BT_PORT:
961            return len==3;
962
963        case BT_LTEP:
964            return len >= 2;
965
966        default:
967            return FALSE;
968    }
969}
970
971
972static int
973readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf )
974{
975    uint8_t id;
976    uint32_t ui32;
977    uint32_t msglen = msgs->incomingMessageLength;
978
979    if( EVBUFFER_LENGTH(inbuf) < msglen )
980        return READ_MORE;
981
982    tr_peerIoReadUint8( msgs->io, inbuf, &id );
983    dbgmsg( msgs, "got BT id %d, len %d", (int)id, (int)msglen );
984
985    if( !messageLengthIsCorrect( msgs, id, msglen ) )
986    {
987        fireGotError( msgs );
988        return READ_DONE;
989    }
990
991    --msglen;
992
993    switch( id )
994    {
995        case BT_CHOKE:
996            dbgmsg( msgs, "got Choke" );
997            msgs->info->clientIsChoked = 1;
998            cancelAllRequestsToPeer( msgs );
999            cancelAllRequestsToClientExceptFast( msgs );
1000            break;
1001
1002        case BT_UNCHOKE:
1003            dbgmsg( msgs, "got Unchoke" );
1004            msgs->info->clientIsChoked = 0;
1005            fireNeedReq( msgs );
1006            break;
1007
1008        case BT_INTERESTED:
1009            dbgmsg( msgs, "got Interested" );
1010            msgs->info->peerIsInterested = 1;
1011            tr_peerMsgsSetChoke( msgs, 0 );
1012            break;
1013
1014        case BT_NOT_INTERESTED:
1015            dbgmsg( msgs, "got Not Interested" );
1016            msgs->info->peerIsInterested = 0;
1017            break;
1018
1019        case BT_HAVE:
1020            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1021            dbgmsg( msgs, "got Have: %u", ui32 );
1022            tr_bitfieldAdd( msgs->info->have, ui32 );
1023            updatePeerProgress( msgs );
1024            tr_rcTransferred( msgs->torrent->swarmspeed, msgs->torrent->info.pieceSize );
1025            break;
1026
1027        case BT_BITFIELD: {
1028            const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
1029            dbgmsg( msgs, "got a bitfield" );
1030            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
1031            updatePeerProgress( msgs );
1032            tr_peerMsgsSetChoke( msgs, !clientIsSeed || (msgs->info->progress<1.0) );
1033            fireNeedReq( msgs );
1034            break;
1035        }
1036
1037        case BT_REQUEST: {
1038            struct peer_request req;
1039            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1040            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1041            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1042            dbgmsg( msgs, "got Request: %u:%u->%u", req.index, req.offset, req.length );
1043            peerMadeRequest( msgs, &req );
1044            break;
1045        }
1046
1047        case BT_CANCEL: {
1048            struct peer_request req;
1049            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1050            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1051            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1052            dbgmsg( msgs, "got a Cancel %u:%u->%u", req.index, req.offset, req.length );
1053            tr_free( tr_list_remove( &msgs->peerAskedForFast, &req, compareRequest ) );
1054            tr_free( tr_list_remove( &msgs->peerAskedFor, &req, compareRequest ) );
1055            break;
1056        }
1057
1058        case BT_PIECE: {
1059            dbgmsg( msgs, "got a Piece!" );
1060            assert( msgs->blockToUs.length == 0 );
1061            tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.index );
1062            tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.offset );
1063            msgs->blockToUs.length = msglen - 8;
1064            assert( EVBUFFER_LENGTH(msgs->inBlock) == 0 );
1065            msgs->state = msgs->blockToUs.length ? READING_BT_PIECE : AWAITING_BT_LENGTH;
1066            return READ_AGAIN;
1067            break;
1068        }
1069
1070        case BT_PORT: {
1071            dbgmsg( msgs, "Got a BT_PORT" );
1072            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1073            break;
1074        }
1075   
1076        case BT_SUGGEST: {
1077            /* FIXME(tiennou) */
1078            uint32_t index;
1079            tr_peerIoReadUint32( msgs->io, inbuf, &index );
1080            break;
1081        }
1082       
1083        case BT_HAVE_ALL:
1084            dbgmsg( msgs, "Got a BT_HAVE_ALL" );
1085            tr_bitfieldAddRange( msgs->info->have, 0, msgs->torrent->info.pieceCount );
1086            updatePeerProgress( msgs );
1087            break;
1088       
1089        case BT_HAVE_NONE:
1090            dbgmsg( msgs, "Got a BT_HAVE_NONE" );
1091            tr_bitfieldClear( msgs->info->have );
1092            updatePeerProgress( msgs );
1093            break;
1094       
1095        case BT_REJECT: {
1096            struct peer_request req;
1097            dbgmsg( msgs, "Got a BT_REJECT" );
1098            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1099            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1100            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1101            tr_free( tr_list_remove( &msgs->clientAskedFor, &req, compareRequest ) );
1102            break;
1103        }
1104       
1105        case BT_ALLOWED_FAST: {
1106            dbgmsg( msgs, "Got a BT_ALLOWED_FAST" );
1107            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1108            tr_bitfieldAdd( msgs->clientAllowedPieces, ui32 );
1109            break;
1110        }
1111       
1112        case BT_LTEP:
1113            dbgmsg( msgs, "Got a BT_LTEP" );
1114            parseLtep( msgs, msglen, inbuf );
1115            break;
1116
1117        default:
1118            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1119            tr_peerIoDrain( msgs->io, inbuf, msglen );
1120            break;
1121    }
1122
1123    msgs->incomingMessageLength = -1;
1124    msgs->state = AWAITING_BT_LENGTH;
1125    return READ_AGAIN;
1126}
1127
1128static void
1129clientGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1130{
1131    tr_torrent * tor = msgs->torrent;
1132    tor->activityDate = tr_date( );
1133    tor->downloadedCur += byteCount;
1134    msgs->info->pieceDataActivityDate = time( NULL );
1135    tr_rcTransferred( msgs->info->rcToClient, byteCount );
1136    tr_rcTransferred( tor->download, byteCount );
1137    tr_rcTransferred( tor->handle->download, byteCount );
1138}
1139
1140static void
1141peerGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1142{
1143    tr_torrent * tor = msgs->torrent;
1144    tor->activityDate = tr_date( );
1145    tor->uploadedCur += byteCount;
1146    msgs->info->pieceDataActivityDate = time( NULL );
1147    tr_rcTransferred( msgs->info->rcToPeer, byteCount );
1148    tr_rcTransferred( tor->upload, byteCount );
1149    tr_rcTransferred( tor->handle->upload, byteCount );
1150}
1151
1152static int
1153canDownload( const tr_peermsgs * msgs )
1154{
1155    tr_torrent * tor = msgs->torrent;
1156
1157    if( tor->downloadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1158        return !tor->handle->useDownloadLimit || tr_rcCanTransfer( tor->handle->download );
1159
1160    if( tor->downloadLimitMode == TR_SPEEDLIMIT_SINGLE )
1161        return tr_rcCanTransfer( tor->download );
1162
1163    return TRUE;
1164}
1165
1166static void
1167reassignBytesToCorrupt( tr_peermsgs * msgs, uint32_t byteCount )
1168{
1169    tr_torrent * tor = msgs->torrent;
1170
1171    /* increment the `corrupt' field */
1172    tor->corruptCur += byteCount;
1173
1174    /* decrement the `downloaded' field */
1175    if( tor->downloadedCur >= byteCount )
1176        tor->downloadedCur -= byteCount;
1177    else
1178        tor->downloadedCur = 0;
1179}
1180
1181
1182static void
1183gotBadPiece( tr_peermsgs * msgs, uint32_t pieceIndex )
1184{
1185    const uint32_t byteCount = tr_torPieceCountBytes( msgs->torrent, (int)pieceIndex );
1186    reassignBytesToCorrupt( msgs, byteCount );
1187}
1188
1189static void
1190gotUnwantedBlock( tr_peermsgs * msgs,
1191                  uint32_t      index UNUSED,
1192                  uint32_t      offset UNUSED,
1193                  uint32_t      length )
1194{
1195    reassignBytesToCorrupt( msgs, length );
1196}
1197
1198static void
1199addUsToBlamefield( tr_peermsgs * msgs, uint32_t index )
1200{
1201    if( !msgs->info->blame )
1202         msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1203    tr_bitfieldAdd( msgs->info->blame, index );
1204}
1205
1206static void
1207gotBlock( tr_peermsgs      * msgs,
1208          struct evbuffer  * inbuf,
1209          uint32_t           index,
1210          uint32_t           offset,
1211          uint32_t           length )
1212{
1213    tr_torrent * tor = msgs->torrent;
1214    const int block = _tr_block( tor, index, offset );
1215    struct peer_request key, *req;
1216
1217    /**
1218    *** Remove the block from our `we asked for this' list
1219    **/
1220
1221    key.index = index;
1222    key.offset = offset;
1223    key.length = length;
1224    req = (struct peer_request*) tr_list_remove( &msgs->clientAskedFor, &key,
1225                                                 compareRequest );
1226    if( req == NULL ) {
1227        gotUnwantedBlock( msgs, index, offset, length );
1228        dbgmsg( msgs, "we didn't ask for this message..." );
1229        return;
1230    }
1231    dbgmsg( msgs, "Got block %u:%u->%u (turnaround time %d secs)", 
1232                     req->index, req->offset, req->length,
1233                     (int)(time(NULL) - req->time_requested) );
1234    tr_free( req );
1235    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1236                  tr_list_size(msgs->clientAskedFor));
1237
1238    /**
1239    *** Error checks
1240    **/
1241
1242    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
1243        dbgmsg( msgs, "have this block already..." );
1244        tr_dbg( "have this block already..." );
1245        gotUnwantedBlock( msgs, index, offset, length );
1246        return;
1247    }
1248
1249    if( (int)length != tr_torBlockCountBytes( tor, block ) ) {
1250        dbgmsg( msgs, "block is the wrong length..." );
1251        tr_dbg( "block is the wrong length..." );
1252        gotUnwantedBlock( msgs, index, offset, length );
1253        return;
1254    }
1255
1256    /**
1257    ***  Write the block
1258    **/
1259
1260    if( tr_ioWrite( tor, index, offset, length, EVBUFFER_DATA( inbuf )))
1261        return;
1262
1263    tr_cpBlockAdd( tor->completion, block );
1264
1265    addUsToBlamefield( msgs, index );
1266
1267    fireGotBlock( msgs, index, offset, length );
1268
1269    /**
1270    ***  Handle if this was the last block in the piece
1271    **/
1272
1273    if( tr_cpPieceIsComplete( tor->completion, index ) )
1274    {
1275        if( tr_ioHash( tor, index ) )
1276        {
1277            gotBadPiece( msgs, index );
1278            return;
1279        }
1280
1281        fireClientHave( msgs, index );
1282    }
1283}
1284
1285
1286static ReadState
1287readBtPiece( tr_peermsgs * msgs, struct evbuffer * inbuf )
1288{
1289    uint32_t inlen;
1290    uint8_t * tmp;
1291
1292    assert( msgs != NULL );
1293    assert( msgs->blockToUs.length > 0 );
1294    assert( inbuf != NULL );
1295    assert( EVBUFFER_LENGTH( inbuf ) > 0 );
1296
1297    /* read from the inbuf into our block buffer */
1298    inlen = MIN( EVBUFFER_LENGTH(inbuf), msgs->blockToUs.length );
1299    tmp = tr_new( uint8_t, inlen );
1300    tr_peerIoReadBytes( msgs->io, inbuf, tmp, inlen );
1301    evbuffer_add( msgs->inBlock, tmp, inlen );
1302
1303    /* update our tables accordingly */
1304    assert( inlen >= msgs->blockToUs.length );
1305    msgs->blockToUs.length -= inlen;
1306    msgs->info->peerSentPieceDataAt = time( NULL );
1307    clientGotBytes( msgs, inlen );
1308
1309    /* if this was the entire block, save it */
1310    if( !msgs->blockToUs.length )
1311    {
1312        dbgmsg( msgs, "got block %u:%u", msgs->blockToUs.index, msgs->blockToUs.offset );
1313        assert( (int)EVBUFFER_LENGTH( msgs->inBlock ) == tr_torBlockCountBytes( msgs->torrent, _tr_block(msgs->torrent,msgs->blockToUs.index, msgs->blockToUs.offset) ) );
1314        gotBlock( msgs, msgs->inBlock,
1315                        msgs->blockToUs.index,
1316                        msgs->blockToUs.offset,
1317                        EVBUFFER_LENGTH( msgs->inBlock ) );
1318        evbuffer_drain( msgs->inBlock, ~0 );
1319        msgs->state = AWAITING_BT_LENGTH;
1320    }
1321
1322    /* cleanup */
1323    tr_free( tmp );
1324    return READ_AGAIN;
1325}
1326
1327static ReadState
1328canRead( struct bufferevent * evin, void * vmsgs )
1329{
1330    ReadState ret;
1331    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1332    struct evbuffer * inbuf = EVBUFFER_INPUT ( evin );
1333
1334    if( !canDownload( msgs ) )
1335    {
1336        msgs->notListening = 1;
1337        tr_peerIoSetIOMode ( msgs->io, 0, EV_READ );
1338        ret = READ_DONE;
1339    }
1340    else switch( msgs->state )
1341    {
1342        case AWAITING_BT_LENGTH:  ret = readBtLength  ( msgs, inbuf ); break;
1343        case AWAITING_BT_MESSAGE: ret = readBtMessage ( msgs, inbuf ); break;
1344        case READING_BT_PIECE:    ret = readBtPiece   ( msgs, inbuf ); break;
1345        default: assert( 0 );
1346    }
1347
1348    return ret;
1349}
1350
1351static void
1352sendKeepalive( tr_peermsgs * msgs )
1353{
1354    dbgmsg( msgs, "sending a keepalive message" );
1355    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1356}
1357
1358/**
1359***
1360**/
1361
1362static int
1363canWrite( const tr_peermsgs * msgs )
1364{
1365    /* don't let our outbuffer get too large */
1366    if( tr_peerIoWriteBytesWaiting( msgs->io ) > 8192 )
1367        return FALSE;
1368
1369    return TRUE;
1370}
1371
1372static int
1373canUpload( const tr_peermsgs * msgs )
1374{
1375    const tr_torrent * tor = msgs->torrent;
1376
1377    if( !canWrite( msgs ) )
1378        return FALSE;
1379
1380    if( tor->uploadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1381        return !tor->handle->useUploadLimit || tr_rcCanTransfer( tor->handle->upload );
1382
1383    if( tor->uploadLimitMode == TR_SPEEDLIMIT_SINGLE )
1384        return tr_rcCanTransfer( tor->upload );
1385
1386    return TRUE;
1387}
1388
1389static int
1390ratePulse( void * vmsgs )
1391{
1392    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1393    msgs->info->rateToClient = tr_rcRate( msgs->info->rcToClient );
1394    msgs->info->rateToPeer = tr_rcRate( msgs->info->rcToPeer );
1395    msgs->maxActiveRequests = MIN( 8 + (int)(msgs->info->rateToClient/10), 100 );
1396    msgs->minActiveRequests = msgs->maxActiveRequests / 2;
1397    return TRUE;
1398}
1399
1400static int
1401pulse( void * vmsgs )
1402{
1403    const time_t now = time( NULL );
1404    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1405    size_t len;
1406
1407    /* if we froze out a downloaded block because of speed limits,
1408       start listening to the peer again */
1409    if( msgs->notListening && canDownload( msgs ) )
1410    {
1411        msgs->notListening = 0;
1412        tr_peerIoSetIOMode ( msgs->io, EV_READ, 0 );
1413    }
1414
1415    pumpRequestQueue( msgs );
1416
1417    if( !canWrite( msgs ) )
1418    {
1419    }
1420    else if(( len = EVBUFFER_LENGTH( msgs->outMessages ) ))
1421    {
1422        tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
1423        msgs->clientSentAnythingAt = now;
1424    }
1425    else if( msgs->peerAskedForFast || msgs->peerAskedFor )
1426    {
1427        if( canUpload( msgs ) )
1428        {
1429            struct peer_request * r;
1430            uint8_t * buf;
1431
1432            r = tr_list_pop_front( &msgs->peerAskedForFast );
1433            if( r == NULL )
1434                r = tr_list_pop_front( &msgs->peerAskedFor);
1435
1436            buf = tr_new( uint8_t, r->length );
1437
1438            if( requestIsValid( msgs, r )
1439                && tr_cpPieceIsComplete( msgs->torrent->completion, r->index )
1440                && !tr_ioRead( msgs->torrent, r->index, r->offset, r->length, buf ) )
1441            {
1442                protocolSendPiece( msgs, r, buf );
1443                peerGotBytes( msgs, r->length );
1444                msgs->clientSentAnythingAt = now;
1445            }
1446
1447            tr_free( buf );
1448            tr_free( r );
1449        }
1450    }
1451    else if( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS )
1452    {
1453        sendKeepalive( msgs );
1454    }
1455
1456    return TRUE; /* loop forever */
1457}
1458
1459static void
1460didWrite( struct bufferevent * evin UNUSED, void * vmsgs )
1461{
1462    pulse( vmsgs );
1463}
1464
1465static void
1466gotError( struct bufferevent * evbuf UNUSED, short what, void * vmsgs )
1467{
1468    dbgmsg( vmsgs, "libevent got an error! what=%d, errno=%d (%s)",
1469            (int)what, errno, strerror(errno) );
1470    fireGotError( vmsgs );
1471}
1472
1473static void
1474sendBitfield( tr_peermsgs * msgs )
1475{
1476    const tr_bitfield * bitfield = tr_cpPieceBitfield( msgs->torrent->completion );
1477    struct evbuffer * out = msgs->outMessages;
1478
1479    dbgmsg( msgs, "sending peer a bitfield message" );
1480    tr_peerIoWriteUint32( msgs->io, out, sizeof(uint8_t) + bitfield->len );
1481    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1482    tr_peerIoWriteBytes ( msgs->io, out, bitfield->bits, bitfield->len );
1483}
1484
1485/**
1486***
1487**/
1488
1489/* some peers give us error messages if we send
1490   more than this many peers in a single pex message */
1491#define MAX_PEX_DIFFS 200
1492
1493typedef struct
1494{
1495    tr_pex * added;
1496    tr_pex * dropped;
1497    tr_pex * elements;
1498    int addedCount;
1499    int droppedCount;
1500    int elementCount;
1501    int diffCount;
1502}
1503PexDiffs;
1504
1505static void
1506pexAddedCb( void * vpex, void * userData )
1507{
1508    PexDiffs * diffs = (PexDiffs *) userData;
1509    tr_pex * pex = (tr_pex *) vpex;
1510    if( diffs->diffCount < MAX_PEX_DIFFS )
1511    {
1512        diffs->diffCount++;
1513        diffs->added[diffs->addedCount++] = *pex;
1514        diffs->elements[diffs->elementCount++] = *pex;
1515    }
1516}
1517
1518static void
1519pexRemovedCb( void * vpex, void * userData )
1520{
1521    PexDiffs * diffs = (PexDiffs *) userData;
1522    tr_pex * pex = (tr_pex *) vpex;
1523    if( diffs->diffCount < MAX_PEX_DIFFS )
1524    {
1525        diffs->diffCount++;
1526        diffs->dropped[diffs->droppedCount++] = *pex;
1527    }
1528}
1529
1530static void
1531pexElementCb( void * vpex, void * userData )
1532{
1533    PexDiffs * diffs = (PexDiffs *) userData;
1534    tr_pex * pex = (tr_pex *) vpex;
1535    if( diffs->diffCount < MAX_PEX_DIFFS )
1536    {
1537        diffs->diffCount++;
1538        diffs->elements[diffs->elementCount++] = *pex;
1539    }
1540}
1541
1542static void
1543sendPex( tr_peermsgs * msgs )
1544{
1545    if( msgs->peerSupportsPex && tr_torrentIsPexEnabled( msgs->torrent ) )
1546    {
1547        int i;
1548        tr_pex * newPex = NULL;
1549        const int newCount = tr_peerMgrGetPeers( msgs->handle->peerMgr, msgs->torrent->info.hash, &newPex );
1550        PexDiffs diffs;
1551        benc_val_t val, *added, *dropped, *flags;
1552        uint8_t *tmp, *walk;
1553        char * benc;
1554        int bencLen;
1555
1556        /* build the diffs */
1557        diffs.added = tr_new( tr_pex, newCount );
1558        diffs.addedCount = 0;
1559        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1560        diffs.droppedCount = 0;
1561        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1562        diffs.elementCount = 0;
1563        diffs.diffCount = 0;
1564        tr_set_compare( msgs->pex, msgs->pexCount,
1565                        newPex, newCount,
1566                        tr_pexCompare, sizeof(tr_pex),
1567                        pexRemovedCb, pexAddedCb, pexElementCb, &diffs );
1568        dbgmsg( msgs, "pex: old peer count %d, new peer count %d, added %d, removed %d", msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1569
1570        /* update peer */
1571        tr_free( msgs->pex );
1572        msgs->pex = diffs.elements;
1573        msgs->pexCount = diffs.elementCount;
1574
1575        /* build the pex payload */
1576        tr_bencInit( &val, TYPE_DICT );
1577        tr_bencDictReserve( &val, 3 );
1578
1579        /* "added" */
1580        added = tr_bencDictAdd( &val, "added" );
1581        tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1582        for( i=0; i<diffs.addedCount; ++i ) {
1583            memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
1584            memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1585        }
1586        assert( ( walk - tmp ) == diffs.addedCount * 6 );
1587        tr_bencInitStr( added, tmp, walk-tmp, FALSE );
1588
1589        /* "added.f" */
1590        flags = tr_bencDictAdd( &val, "added.f" );
1591        tmp = walk = tr_new( uint8_t, diffs.addedCount );
1592        for( i=0; i<diffs.addedCount; ++i )
1593            *walk++ = diffs.added[i].flags;
1594        assert( ( walk - tmp ) == diffs.addedCount );
1595        tr_bencInitStr( flags, tmp, walk-tmp, FALSE );
1596
1597        /* "dropped" */
1598        dropped = tr_bencDictAdd( &val, "dropped" );
1599        tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
1600        for( i=0; i<diffs.droppedCount; ++i ) {
1601            memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
1602            memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
1603        }
1604        assert( ( walk - tmp ) == diffs.droppedCount * 6 );
1605        tr_bencInitStr( dropped, tmp, walk-tmp, FALSE );
1606
1607        /* write the pex message */
1608        benc = tr_bencSaveMalloc( &val, &bencLen );
1609        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 2*sizeof(uint8_t) + bencLen );
1610        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_LTEP );
1611        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, OUR_LTEP_PEX );
1612        tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, benc, bencLen );
1613
1614        /* cleanup */
1615        tr_free( benc );
1616        tr_bencFree( &val );
1617        tr_free( diffs.added );
1618        tr_free( diffs.dropped );
1619        tr_free( newPex );
1620
1621        msgs->clientSentPexAt = time( NULL );
1622    }
1623}
1624
1625static int
1626pexPulse( void * vpeer )
1627{
1628    sendPex( vpeer );
1629    return TRUE;
1630}
1631
1632/**
1633***
1634**/
1635
1636tr_peermsgs*
1637tr_peerMsgsNew( struct tr_torrent * torrent,
1638                struct tr_peer    * info,
1639                tr_delivery_func    func,
1640                void              * userData,
1641                tr_publisher_tag  * setme )
1642{
1643    tr_peermsgs * m;
1644
1645    assert( info != NULL );
1646    assert( info->io != NULL );
1647
1648    m = tr_new0( tr_peermsgs, 1 );
1649    m->publisher = tr_publisherNew( );
1650    m->info = info;
1651    m->handle = torrent->handle;
1652    m->torrent = torrent;
1653    m->io = info->io;
1654    m->info->clientIsChoked = 1;
1655    m->info->peerIsChoked = 1;
1656    m->info->clientIsInterested = 0;
1657    m->info->peerIsInterested = 0;
1658    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
1659    m->pulseTimer = tr_timerNew( m->handle, pulse, m, PEER_PULSE_INTERVAL );
1660    m->rateTimer = tr_timerNew( m->handle, ratePulse, m, RATE_PULSE_INTERVAL );
1661    m->pexTimer = tr_timerNew( m->handle, pexPulse, m, PEX_INTERVAL );
1662    m->outMessages = evbuffer_new( );
1663    m->inBlock = evbuffer_new( );
1664    m->peerAllowedPieces = NULL;
1665    m->clientAllowedPieces = NULL;
1666    *setme = tr_publisherSubscribe( m->publisher, func, userData );
1667   
1668    if ( tr_peerIoSupportsFEXT( m->io ) )
1669    {
1670        /* This peer is fastpeer-enabled, generate its allowed set
1671         * (before registering our callbacks) */
1672        if ( !m->peerAllowedPieces ) {
1673            const struct in_addr *peerAddr = tr_peerIoGetAddress( m->io, NULL );
1674           
1675            m->peerAllowedPieces = tr_peerMgrGenerateAllowedSet( MAX_ALLOWED_SET_COUNT,
1676                                                                 m->torrent->info.pieceCount,
1677                                                                 m->torrent->info.hash,
1678                                                                 peerAddr );
1679        }
1680        m->clientAllowedPieces = tr_bitfieldNew( m->torrent->info.pieceCount );
1681    }
1682   
1683    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* error if we don't read or write for 2.5 minutes */
1684    tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
1685    tr_peerIoSetIOMode( m->io, EV_READ|EV_WRITE, 0 );
1686    ratePulse( m );
1687
1688    if ( tr_peerIoSupportsLTEP( m->io ) )
1689        sendLtepHandshake( m );
1690
1691    if ( !tr_peerIoSupportsFEXT( m->io ) )
1692        sendBitfield( m );
1693    else {
1694        /* This peer is fastpeer-enabled, send it have-all or have-none if appropriate */
1695        float completion = tr_cpPercentComplete( m->torrent->completion );
1696        if ( completion == 0.0f ) {
1697            sendFastHave( m, 0 );
1698        } else if ( completion == 1.0f ) {
1699            sendFastHave( m, 1 );
1700        } else {
1701            sendBitfield( m );
1702        }
1703        uint32_t peerProgress = m->torrent->info.pieceCount * m->info->progress;
1704       
1705        if ( peerProgress < MAX_ALLOWED_SET_COUNT )
1706            sendFastAllowedSet( m );
1707    }
1708
1709    return m;
1710}
1711
1712void
1713tr_peerMsgsFree( tr_peermsgs* msgs )
1714{
1715    if( msgs != NULL )
1716    {
1717        tr_timerFree( &msgs->pulseTimer );
1718        tr_timerFree( &msgs->rateTimer );
1719        tr_timerFree( &msgs->pexTimer );
1720        tr_publisherFree( &msgs->publisher );
1721        tr_list_free( &msgs->clientWillAskFor, tr_free );
1722        tr_list_free( &msgs->clientAskedFor, tr_free );
1723        tr_list_free( &msgs->peerAskedForFast, tr_free );
1724        tr_list_free( &msgs->peerAskedFor, tr_free );
1725        evbuffer_free( msgs->outMessages );
1726        evbuffer_free( msgs->inBlock );
1727        tr_free( msgs->pex );
1728        msgs->pexCount = 0;
1729        tr_free( msgs );
1730    }
1731}
1732
1733tr_publisher_tag
1734tr_peerMsgsSubscribe( tr_peermsgs       * peer,
1735                      tr_delivery_func    func,
1736                      void              * userData )
1737{
1738    return tr_publisherSubscribe( peer->publisher, func, userData );
1739}
1740
1741void
1742tr_peerMsgsUnsubscribe( tr_peermsgs       * peer,
1743                        tr_publisher_tag    tag )
1744{
1745    tr_publisherUnsubscribe( peer->publisher, tag );
1746}
1747
1748int
1749tr_peerMsgIsPieceFastAllowed( const tr_peermsgs * peer,
1750                              uint32_t            index )
1751{
1752    return tr_bitfieldHas( peer->clientAllowedPieces, index );
1753}
Note: See TracBrowser for help on using the repository browser.