source: branches/0.9x/libtransmission/peer-msgs.c @ 3705

Last change on this file since 3705 was 3705, checked in by charles, 15 years ago

fix bug reported in the forums by "grabman"

  • Property svn:keywords set to Date Rev Author Id
File size: 50.0 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 3705 2007-11-05 03:09:29Z charles $
11 */
12
13#include <assert.h>
14#include <ctype.h>
15#include <errno.h>
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19#include <libgen.h> /* basename */
20
21#include <netinet/in.h> /* struct in_addr */
22
23#include <sys/types.h> /* event.h needs this */
24#include <event.h>
25
26#include "transmission.h"
27#include "bencode.h"
28#include "completion.h"
29#include "inout.h"
30#include "list.h"
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-mgr-private.h"
34#include "peer-msgs.h"
35#include "ratecontrol.h"
36#include "trevent.h"
37#include "utils.h"
38
39/**
40***
41**/
42
43#define MAX_ALLOWED_SET_COUNT   10 /* number of pieces generated for allow-fast,
44                                      threshold for fast-allowing others */
45
46enum
47{
48    BT_CHOKE                = 0,
49    BT_UNCHOKE              = 1,
50    BT_INTERESTED           = 2,
51    BT_NOT_INTERESTED       = 3,
52    BT_HAVE                 = 4,
53    BT_BITFIELD             = 5,
54    BT_REQUEST              = 6,
55    BT_PIECE                = 7,
56    BT_CANCEL               = 8,
57    BT_PORT                 = 9,
58    BT_SUGGEST              = 13,
59    BT_HAVE_ALL             = 14,
60    BT_HAVE_NONE            = 15,
61    BT_REJECT               = 16,
62    BT_ALLOWED_FAST         = 17,
63    BT_LTEP                 = 20,
64
65    LTEP_HANDSHAKE          = 0,
66
67    OUR_LTEP_PEX            = 1,
68
69    MAX_REQUEST_BYTE_COUNT  = (16 * 1024), /* drop requests who want too much */
70
71    KEEPALIVE_INTERVAL_SECS = 90,          /* idle seconds before we send a keepalive */
72    PEX_INTERVAL            = (60 * 1000), /* msec between calls to sendPex() */
73    PEER_PULSE_INTERVAL     = (100),       /* msec between calls to pulse() */
74    RATE_PULSE_INTERVAL     = (333),       /* msec between calls to ratePulse() */
75};
76
77enum
78{
79    AWAITING_BT_LENGTH,
80    AWAITING_BT_MESSAGE,
81    READING_BT_PIECE
82};
83
84struct peer_request
85{
86    uint32_t index;
87    uint32_t offset;
88    uint32_t length;
89    time_t time_requested;
90};
91
92static int
93compareRequest( const void * va, const void * vb )
94{
95    struct peer_request * a = (struct peer_request*) va;
96    struct peer_request * b = (struct peer_request*) vb;
97    if( a->index != b->index ) return a->index - b->index;
98    if( a->offset != b->offset ) return a->offset - b->offset;
99    if( a->length != b->length ) return a->length - b->length;
100    return 0;
101}
102
103struct tr_peermsgs
104{
105    tr_peer * info;
106
107    tr_handle * handle;
108    tr_torrent * torrent;
109    tr_peerIo * io;
110
111    tr_publisher_t * publisher;
112
113    struct evbuffer * outBlock;    /* buffer of all the current piece message */
114    struct evbuffer * outMessages; /* buffer of all the non-piece messages */
115    struct evbuffer * inBlock;     /* the block we're currently receiving */
116    tr_list * peerAskedFor;
117    tr_list * peerAskedForFast;
118    tr_list * clientAskedFor;
119    tr_list * clientWillAskFor;
120
121    tr_timer * rateTimer;
122    tr_timer * pulseTimer;
123    tr_timer * pexTimer;
124
125    struct peer_request blockToUs; /* the block currntly being sent to us */
126
127    time_t lastReqAddedAt;
128    time_t clientSentPexAt;
129    time_t clientSentAnythingAt;
130
131    unsigned int notListening             : 1;
132    unsigned int peerSupportsPex          : 1;
133    unsigned int clientSentLtepHandshake  : 1;
134    unsigned int peerSentLtepHandshake    : 1;
135   
136    tr_bitfield * clientAllowedPieces;
137    tr_bitfield * peerAllowedPieces;
138   
139    uint8_t state;
140    uint8_t ut_pex_id;
141    uint16_t pexCount;
142    uint32_t incomingMessageLength;
143    uint32_t maxActiveRequests;
144    uint32_t minActiveRequests;
145
146    tr_pex * pex;
147};
148
149/**
150***
151**/
152
153static void
154myDebug( const char * file, int line,
155         const struct tr_peermsgs * msgs,
156         const char * fmt, ... )
157{
158    FILE * fp = tr_getLog( );
159    if( fp != NULL )
160    {
161        va_list args;
162        char timestr[64];
163        struct evbuffer * buf = evbuffer_new( );
164        char * myfile = tr_strdup( file );
165
166        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
167                             tr_getLogTimeStr( timestr, sizeof(timestr) ),
168                             msgs->torrent->info.name,
169                             tr_peerIoGetAddrStr( msgs->io ),
170                             msgs->info->client );
171        va_start( args, fmt );
172        evbuffer_add_vprintf( buf, fmt, args );
173        va_end( args );
174        evbuffer_add_printf( buf, " (%s:%d)\n", basename(myfile), line );
175        fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
176
177        tr_free( myfile );
178        evbuffer_free( buf );
179    }
180}
181
182#define dbgmsg(msgs, fmt...) myDebug(__FILE__, __LINE__, msgs, ##fmt )
183
184/**
185***
186**/
187
188static void
189protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
190{
191    tr_peerIo * io = msgs->io;
192    struct evbuffer * out = msgs->outMessages;
193
194    dbgmsg( msgs, "requesting %u:%u->%u", req->index, req->offset, req->length );
195    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
196    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
197    tr_peerIoWriteUint32( io, out, req->index );
198    tr_peerIoWriteUint32( io, out, req->offset );
199    tr_peerIoWriteUint32( io, out, req->length );
200}
201
202static void
203protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
204{
205    tr_peerIo * io = msgs->io;
206    struct evbuffer * out = msgs->outMessages;
207
208    dbgmsg( msgs, "cancelling %u:%u->%u", req->index, req->offset, req->length );
209    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
210    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
211    tr_peerIoWriteUint32( io, out, req->index );
212    tr_peerIoWriteUint32( io, out, req->offset );
213    tr_peerIoWriteUint32( io, out, req->length );
214}
215
216static void
217protocolSendHave( tr_peermsgs * msgs, uint32_t index )
218{
219    tr_peerIo * io = msgs->io;
220    struct evbuffer * out = msgs->outMessages;
221
222    dbgmsg( msgs, "sending Have %u", index );
223    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
224    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
225    tr_peerIoWriteUint32( io, out, index );
226}
227
228static void
229protocolSendChoke( tr_peermsgs * msgs, int choke )
230{
231    tr_peerIo * io = msgs->io;
232    struct evbuffer * out = msgs->outMessages;
233
234    dbgmsg( msgs, "sending %s", (choke ? "Choke" : "Unchoke") );
235    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) );
236    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
237}
238
239/**
240***  EVENTS
241**/
242
243static const tr_peermsgs_event blankEvent = { 0, 0, 0, 0, 0.0f };
244
245static void
246publish( tr_peermsgs * msgs, tr_peermsgs_event * e )
247{
248    tr_publisherPublish( msgs->publisher, msgs->info, e );
249}
250
251static void
252fireGotError( tr_peermsgs * msgs )
253{
254    tr_peermsgs_event e = blankEvent;
255    e.eventType = TR_PEERMSG_GOT_ERROR;
256    publish( msgs, &e );
257}
258
259static void
260fireNeedReq( tr_peermsgs * msgs )
261{
262    tr_peermsgs_event e = blankEvent;
263    e.eventType = TR_PEERMSG_NEED_REQ;
264    publish( msgs, &e );
265}
266
267static void
268firePeerProgress( tr_peermsgs * msgs )
269{
270    tr_peermsgs_event e = blankEvent;
271    e.eventType = TR_PEERMSG_PEER_PROGRESS;
272    e.progress = msgs->info->progress;
273    publish( msgs, &e );
274}
275
276static void
277fireClientHave( tr_peermsgs * msgs, uint32_t pieceIndex )
278{
279    tr_peermsgs_event e = blankEvent;
280    e.eventType = TR_PEERMSG_CLIENT_HAVE;
281    e.pieceIndex = pieceIndex;
282    publish( msgs, &e );
283}
284
285static void
286fireGotBlock( tr_peermsgs * msgs, uint32_t pieceIndex, uint32_t offset, uint32_t length )
287{
288    tr_peermsgs_event e = blankEvent;
289    e.eventType = TR_PEERMSG_CLIENT_BLOCK;
290    e.pieceIndex = pieceIndex;
291    e.offset = offset;
292    e.length = length;
293    publish( msgs, &e );
294}
295
296static void
297fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
298{
299    tr_peermsgs_event e = blankEvent;
300    e.eventType = TR_PEERMSG_CANCEL;
301    e.pieceIndex = req->index;
302    e.offset = req->offset;
303    e.length = req->length;
304    publish( msgs, &e );
305}
306
307/**
308***  INTEREST
309**/
310
311static int
312isPieceInteresting( const tr_peermsgs   * peer,
313                    int                   piece )
314{
315    const tr_torrent * torrent = peer->torrent;
316    if( torrent->info.pieces[piece].dnd ) /* we don't want it */
317        return FALSE;
318    if( tr_cpPieceIsComplete( torrent->completion, piece ) ) /* we have it */
319        return FALSE;
320    if( !tr_bitfieldHas( peer->info->have, piece ) ) /* peer doesn't have it */
321        return FALSE;
322    if( tr_bitfieldHas( peer->info->banned, piece ) ) /* peer is banned */
323        return FALSE;
324    return TRUE;
325}
326
327/* "interested" means we'll ask for piece data if they unchoke us */
328static int
329isPeerInteresting( const tr_peermsgs * msgs )
330{
331    int i;
332    const tr_torrent * torrent;
333    const tr_bitfield * bitfield;
334    const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
335
336    if( clientIsSeed )
337        return FALSE;
338
339    torrent = msgs->torrent;
340    bitfield = tr_cpPieceBitfield( torrent->completion );
341
342    if( !msgs->info->have )
343        return TRUE;
344
345    assert( bitfield->len == msgs->info->have->len );
346    for( i=0; i<torrent->info.pieceCount; ++i )
347        if( isPieceInteresting( msgs, i ) )
348            return TRUE;
349
350    return FALSE;
351}
352
353static void
354sendInterest( tr_peermsgs * msgs, int weAreInterested )
355{
356    assert( msgs != NULL );
357    assert( weAreInterested==0 || weAreInterested==1 );
358
359    msgs->info->clientIsInterested = weAreInterested;
360    dbgmsg( msgs, "Sending %s",
361            weAreInterested ? "Interested" : "Not Interested");
362
363    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
364    tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages,
365                   weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
366}
367
368static void
369updateInterest( tr_peermsgs * msgs )
370{
371    const int i = isPeerInteresting( msgs );
372    if( i != msgs->info->clientIsInterested )
373        sendInterest( msgs, i );
374    if( i )
375        fireNeedReq( msgs );
376}
377
378#define MIN_CHOKE_PERIOD_SEC 10
379
380static void
381cancelAllRequestsToClientExceptFast( tr_peermsgs * msgs )
382{
383    tr_list_free( &msgs->peerAskedFor, tr_free );
384}
385
386void
387tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
388{
389    const time_t fibrillationTime = time(NULL) - MIN_CHOKE_PERIOD_SEC;
390
391    assert( msgs != NULL );
392    assert( msgs->info != NULL );
393    assert( choke==0 || choke==1 );
394
395    if( msgs->info->chokeChangedAt > fibrillationTime )
396    {
397        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
398    }
399    else if( msgs->info->peerIsChoked != choke )
400    {
401        msgs->info->peerIsChoked = choke;
402        if( choke )
403            cancelAllRequestsToClientExceptFast( msgs );
404        protocolSendChoke( msgs, choke );
405        msgs->info->chokeChangedAt = time( NULL );
406    }
407}
408
409/**
410***
411**/
412
413void
414tr_peerMsgsHave( tr_peermsgs * msgs,
415                 uint32_t      index )
416{
417    protocolSendHave( msgs, index );
418
419    /* since we have more pieces now, we might not be interested in this peer */
420    updateInterest( msgs );
421}
422#if 0
423static void
424sendFastSuggest( tr_peermsgs * msgs,
425                 uint32_t      pieceIndex )
426{
427    dbgmsg( msgs, "w00t SUGGESTing them piece #%d", pieceIndex );
428    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
429    tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_SUGGEST );
430    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
431   
432    updateInterest( msgs );
433}
434#endif
435static void
436sendFastHave( tr_peermsgs * msgs,
437              int           all)
438{
439    dbgmsg( msgs, "w00t telling them we %s pieces", (all ? "HAVE_ALL" : "HAVE_NONE" ) );
440    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
441    tr_peerIoWriteUint8( msgs->io, msgs->outMessages, ( all ? BT_HAVE_ALL : BT_HAVE_NONE ) );
442   
443    updateInterest( msgs );
444}
445
446static void
447sendFastReject( tr_peermsgs * msgs,
448                uint32_t      pieceIndex,
449                uint32_t      offset,
450                uint32_t      length )
451{
452    assert( msgs != NULL );
453
454    if( tr_peerIoSupportsFEXT( msgs->io ) )
455    {
456        const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
457        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
458        tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_REJECT );
459        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
460        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
461        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
462    }
463}
464
465static void
466sendFastAllowed( tr_peermsgs * msgs,
467                 uint32_t      pieceIndex)
468{
469    dbgmsg( msgs, "w00t telling them we ALLOW_FAST piece #%d", pieceIndex );
470    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
471    tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_ALLOWED_FAST );
472    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
473}
474
475
476
477static void
478sendFastAllowedSet( tr_peermsgs * msgs )
479{
480    int i = 0;
481    while (i <= msgs->torrent->info.pieceCount )
482    {
483        if ( tr_bitfieldHas( msgs->peerAllowedPieces, i) )
484            sendFastAllowed( msgs, i );
485        i++;
486    }
487}
488
489
490/**
491***
492**/
493
494static int
495reqIsValid( const tr_peermsgs * msgs, uint32_t index, uint32_t offset, uint32_t length )
496{
497    const tr_torrent * tor = msgs->torrent;
498
499    if( index >= (uint32_t) tor->info.pieceCount )
500        return FALSE;
501    if ( (int)offset >= tr_torPieceCountBytes( tor, (int)index ) )
502        return FALSE;
503    if( length > MAX_REQUEST_BYTE_COUNT )
504        return FALSE;
505    if( tr_pieceOffset( tor, index, offset, length ) > tor->info.totalSize )
506        return FALSE;
507
508    return TRUE;
509}
510
511static int
512requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
513{
514    return reqIsValid( msgs, req->index, req->offset, req->length );
515}
516
517static void
518pumpRequestQueue( tr_peermsgs * msgs )
519{
520    const int max = msgs->maxActiveRequests;
521    const int min = msgs->minActiveRequests;
522    int count = tr_list_size( msgs->clientAskedFor );
523    int sent = 0;
524
525    if( count > min )
526        return;
527    if( msgs->info->clientIsChoked )
528        return;
529
530    while( ( count < max ) && ( msgs->clientWillAskFor != NULL ) )
531    {
532        struct peer_request * req = tr_list_pop_front( &msgs->clientWillAskFor );
533        protocolSendRequest( msgs, req );
534        req->time_requested = msgs->lastReqAddedAt = time( NULL );
535        tr_list_append( &msgs->clientAskedFor, req );
536        ++count;
537        ++sent;
538    }
539
540    if( sent )
541        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
542                sent,
543                tr_list_size(msgs->clientAskedFor),
544                tr_list_size(msgs->clientWillAskFor) );
545
546    if( count < max )
547        fireNeedReq( msgs );
548}
549
550int
551tr_peerMsgsAddRequest( tr_peermsgs * msgs,
552                       uint32_t      index, 
553                       uint32_t      offset, 
554                       uint32_t      length )
555{
556    const int req_max = msgs->maxActiveRequests;
557    struct peer_request tmp, *req;
558
559    assert( msgs != NULL );
560    assert( msgs->torrent != NULL );
561    assert( reqIsValid( msgs, index, offset, length ) );
562
563    /**
564    ***  Reasons to decline the request
565    **/
566
567    /* don't send requests to choked clients */
568    if( msgs->info->clientIsChoked ) {
569        dbgmsg( msgs, "declining request because they're choking us" );
570        return TR_ADDREQ_CLIENT_CHOKED;
571    }
572
573    /* peer doesn't have this piece */
574    if( !tr_bitfieldHas( msgs->info->have, index ) )
575        return TR_ADDREQ_MISSING;
576
577    /* peer's queue is full */
578    if( tr_list_size( msgs->clientWillAskFor ) >= req_max ) {
579        dbgmsg( msgs, "declining request because we're full" );
580        return TR_ADDREQ_FULL;
581    }
582
583    /* have we already asked for this piece? */
584    tmp.index = index;
585    tmp.offset = offset;
586    tmp.length = length;
587    if( tr_list_find( msgs->clientAskedFor, &tmp, compareRequest ) ) {
588        dbgmsg( msgs, "declining because it's a duplicate" );
589        return TR_ADDREQ_DUPLICATE;
590    }
591    if( tr_list_find( msgs->clientWillAskFor, &tmp, compareRequest ) ) {
592        dbgmsg( msgs, "declining because it's a duplicate" );
593        return TR_ADDREQ_DUPLICATE;
594    }
595
596    /**
597    ***  Accept this request
598    **/
599
600    dbgmsg( msgs, "added req for piece %d, offset %d", (int)index, (int)offset );
601    req = tr_new0( struct peer_request, 1 );
602    *req = tmp;
603    tr_list_append( &msgs->clientWillAskFor, req );
604    return TR_ADDREQ_OK;
605}
606
607static void
608cancelAllRequestsToPeer( tr_peermsgs * msgs )
609{
610    struct peer_request * req;
611
612    while(( req = tr_list_pop_front( &msgs->clientWillAskFor ) ))
613    {
614        fireCancelledReq( msgs, req );
615        tr_free( req );
616    }
617
618    while(( req = tr_list_pop_front( &msgs->clientAskedFor ) ))
619    {
620        fireCancelledReq( msgs, req );
621        protocolSendCancel( msgs, req );
622        tr_free( req );
623    }
624}
625
626void
627tr_peerMsgsCancel( tr_peermsgs * msgs,
628                   uint32_t      pieceIndex,
629                   uint32_t      offset,
630                   uint32_t      length )
631{
632    struct peer_request *req, tmp;
633
634    assert( msgs != NULL );
635    assert( length > 0 );
636
637    /* have we asked the peer for this piece? */
638    tmp.index = pieceIndex;
639    tmp.offset = offset;
640    tmp.length = length;
641
642    /* if it's only in the queue and hasn't been sent yet, free it */
643    if(( req = tr_list_remove( &msgs->clientWillAskFor, &tmp, compareRequest ) ))
644    {
645        fireCancelledReq( msgs, req );
646        tr_free( req );
647    }
648
649    /* if it's already been sent, send a cancel message too */
650    if(( req = tr_list_remove( &msgs->clientAskedFor, &tmp, compareRequest ) ))
651    {
652        protocolSendCancel( msgs, req );
653        fireCancelledReq( msgs, req );
654        tr_free( req );
655    }
656}
657
658/**
659***
660**/
661
662static void
663sendLtepHandshake( tr_peermsgs * msgs )
664{
665    benc_val_t val, *m;
666    char * buf;
667    int len;
668    int pex;
669    const char * v = TR_NAME " " USERAGENT_PREFIX;
670    const int port = tr_getPublicPort( msgs->handle );
671    struct evbuffer * outbuf;
672
673    if( msgs->clientSentLtepHandshake )
674        return;
675
676    outbuf = evbuffer_new( );
677    dbgmsg( msgs, "sending an ltep handshake" );
678    msgs->clientSentLtepHandshake = 1;
679
680    /* decide if we want to advertise pex support */
681    if( !tr_torrentIsPexEnabled( msgs->torrent ) )
682        pex = 0;
683    else if( msgs->peerSentLtepHandshake )
684        pex = msgs->peerSupportsPex ? 1 : 0;
685    else
686        pex = 1;
687
688    tr_bencInit( &val, TYPE_DICT );
689    tr_bencDictReserve( &val, 4 );
690    tr_bencInitInt( tr_bencDictAdd( &val, "e" ), 1 );
691    m  = tr_bencDictAdd( &val, "m" );
692    tr_bencInit( m, TYPE_DICT );
693    if( pex ) {
694        tr_bencDictReserve( m, 1 );
695        tr_bencInitInt( tr_bencDictAdd( m, "ut_pex" ), OUR_LTEP_PEX );
696    }
697    if( port > 0 )
698        tr_bencInitInt( tr_bencDictAdd( &val, "p" ), port );
699    tr_bencInitStr( tr_bencDictAdd( &val, "v" ), v, 0, 1 );
700    buf = tr_bencSaveMalloc( &val,  &len );
701
702    tr_peerIoWriteUint32( msgs->io, outbuf, 2*sizeof(uint8_t) + len );
703    tr_peerIoWriteUint8 ( msgs->io, outbuf, BT_LTEP );
704    tr_peerIoWriteUint8 ( msgs->io, outbuf, LTEP_HANDSHAKE );
705    tr_peerIoWriteBytes ( msgs->io, outbuf, buf, len );
706
707    tr_peerIoWriteBuf( msgs->io, outbuf );
708
709#if 0
710    dbgmsg( msgs, "here is the ltep handshake we sent:" );
711    tr_bencPrint( &val );
712#endif
713
714    /* cleanup */
715    tr_bencFree( &val );
716    tr_free( buf );
717    evbuffer_free( outbuf );
718}
719
720static void
721parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
722{
723    benc_val_t val, * sub;
724    uint8_t * tmp = tr_new( uint8_t, len );
725
726    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
727    msgs->peerSentLtepHandshake = 1;
728
729    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type!=TYPE_DICT ) {
730        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
731        tr_free( tmp );
732        return;
733    }
734
735#if 0
736    dbgmsg( msgs, "here is the ltep handshake we read:" );
737    tr_bencPrint( &val );
738#endif
739
740    /* does the peer prefer encrypted connections? */
741    sub = tr_bencDictFind( &val, "e" );
742    if( tr_bencIsInt( sub ) )
743        msgs->info->encryption_preference = sub->val.i
744                                      ? ENCRYPTION_PREFERENCE_YES
745                                      : ENCRYPTION_PREFERENCE_NO;
746
747    /* check supported messages for utorrent pex */
748    sub = tr_bencDictFind( &val, "m" );
749    if( tr_bencIsDict( sub ) ) {
750        sub = tr_bencDictFind( sub, "ut_pex" );
751        if( tr_bencIsInt( sub ) ) {
752            msgs->peerSupportsPex = 1;
753            msgs->ut_pex_id = (uint8_t) sub->val.i;
754            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
755        }
756    }
757
758    /* get peer's listening port */
759    sub = tr_bencDictFind( &val, "p" );
760    if( tr_bencIsInt( sub ) ) {
761        msgs->info->port = htons( (uint16_t)sub->val.i );
762        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
763    }
764
765    tr_bencFree( &val );
766    tr_free( tmp );
767}
768
769static void
770parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
771{
772    benc_val_t val, * sub;
773    uint8_t * tmp;
774
775    if( !tr_torrentIsPexEnabled( msgs->torrent ) ) /* no sharing! */
776        return;
777
778    tmp = tr_new( uint8_t, msglen );
779    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
780
781    if( tr_bencLoad( tmp, msglen, &val, NULL ) || !tr_bencIsDict( &val ) ) {
782        dbgmsg( msgs, "GET can't read extended-pex dictionary" );
783        tr_free( tmp );
784        return;
785    }
786
787    sub = tr_bencDictFind( &val, "added" );
788    if( tr_bencIsStr(sub) && ((sub->val.s.i % 6) == 0)) {
789        const int n = sub->val.s.i / 6 ;
790        dbgmsg( msgs, "got %d peers from uT pex", n );
791        tr_peerMgrAddPeers( msgs->handle->peerMgr,
792                            msgs->torrent->info.hash,
793                            TR_PEER_FROM_PEX,
794                            (uint8_t*)sub->val.s.s, n );
795    }
796
797    tr_bencFree( &val );
798    tr_free( tmp );
799}
800
801static void
802sendPex( tr_peermsgs * msgs );
803
804static void
805parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
806{
807    uint8_t ltep_msgid;
808
809    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
810    msglen--;
811
812    if( ltep_msgid == LTEP_HANDSHAKE )
813    {
814        dbgmsg( msgs, "got ltep handshake" );
815        parseLtepHandshake( msgs, msglen, inbuf );
816        sendLtepHandshake( msgs );
817        sendPex( msgs );
818    }
819    else if( ltep_msgid == msgs->ut_pex_id )
820    {
821        dbgmsg( msgs, "got ut pex" );
822        msgs->peerSupportsPex = 1;
823        parseUtPex( msgs, msglen, inbuf );
824    }
825    else
826    {
827        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
828        evbuffer_drain( inbuf, msglen );
829    }
830}
831
832static int
833readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf )
834{
835    uint32_t len;
836    const size_t needlen = sizeof(uint32_t);
837
838    if( EVBUFFER_LENGTH(inbuf) < needlen )
839        return READ_MORE;
840
841    tr_peerIoReadUint32( msgs->io, inbuf, &len );
842
843    if( len == 0 ) /* peer sent us a keepalive message */
844        dbgmsg( msgs, "got KeepAlive" );
845    else {
846        msgs->incomingMessageLength = len;
847        msgs->state = AWAITING_BT_MESSAGE;
848    }
849
850    return READ_AGAIN;
851}
852
853static void
854updatePeerProgress( tr_peermsgs * msgs )
855{
856    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have ) / (float)msgs->torrent->info.pieceCount;
857    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
858    updateInterest( msgs );
859    firePeerProgress( msgs );
860}
861
862static int
863clientCanSendFastBlock( const tr_peermsgs * msgs UNUSED )
864{
865    /* FIXME(tiennou): base this on how many blocks we've already sent this
866     * peer, or maybe how many fast blocks per minute we've sent overall,
867     * or maybe how much bandwidth we're already using up w/o fast peers.
868     * I don't know what the Right Thing here is, but
869     * the previous measurement of how many pieces we have is not enough. */
870    return FALSE;
871}
872
873static void
874peerMadeRequest( tr_peermsgs * msgs, const struct peer_request * req )
875{
876    const int reqIsValid = requestIsValid( msgs, req );
877    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( msgs->torrent->completion, req->index );
878    const int peerIsChoked = msgs->info->peerIsChoked;
879    const int peerIsFast = tr_peerIoSupportsFEXT( msgs->io );
880    const int pieceIsFast = reqIsValid && tr_bitfieldHas( msgs->peerAllowedPieces, req->index );
881    const int canSendFast = clientCanSendFastBlock( msgs );
882
883    if( !reqIsValid ) /* bad request */
884    {
885        dbgmsg( msgs, "rejecting an invalid request." );
886        sendFastReject( msgs, req->index, req->offset, req->length );
887    }
888    else if( !clientHasPiece ) /* we don't have it */
889    {
890        dbgmsg( msgs, "rejecting request for a piece we don't have." );
891        sendFastReject( msgs, req->index, req->offset, req->length );
892    }
893    else if( peerIsChoked && !peerIsFast ) /* maybe he doesn't know he's choked? */
894    {
895        tr_peerMsgsSetChoke( msgs, 1 );
896        sendFastReject( msgs, req->index, req->offset, req->length );
897    }
898    else if( peerIsChoked && peerIsFast && ( !pieceIsFast || !canSendFast ) )
899    {
900        sendFastReject( msgs, req->index, req->offset, req->length );
901    }
902    else /* YAY */
903    {
904        struct peer_request * tmp = tr_new( struct peer_request, 1 );
905        *tmp = *req;
906        if( peerIsFast && pieceIsFast )
907            tr_list_append( &msgs->peerAskedForFast, tmp );
908        else
909            tr_list_append( &msgs->peerAskedFor, tmp );
910    }
911}
912
913static int
914messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
915{
916    switch( id )
917    {
918        case BT_CHOKE:
919        case BT_UNCHOKE:
920        case BT_INTERESTED:
921        case BT_NOT_INTERESTED:
922        case BT_HAVE_ALL:
923        case BT_HAVE_NONE:
924            return len==1;
925
926        case BT_HAVE:
927        case BT_SUGGEST:
928        case BT_ALLOWED_FAST:
929            return len==5;
930
931        case BT_BITFIELD:
932            return len == (msg->torrent->info.pieceCount+7u)/8u + 1u;
933       
934        case BT_REQUEST:
935        case BT_CANCEL:
936        case BT_REJECT:
937            return len==13;
938
939        case BT_PIECE:
940            return len>9 && len<=16393;
941
942        case BT_PORT:
943            return len==3;
944
945        case BT_LTEP:
946            return len >= 2;
947
948        default:
949            return FALSE;
950    }
951}
952
953
954static int
955readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf )
956{
957    uint8_t id;
958    uint32_t ui32;
959    uint32_t msglen = msgs->incomingMessageLength;
960
961    if( EVBUFFER_LENGTH(inbuf) < msglen )
962        return READ_MORE;
963
964    tr_peerIoReadUint8( msgs->io, inbuf, &id );
965    dbgmsg( msgs, "got BT id %d, len %d", (int)id, (int)msglen );
966
967    if( !messageLengthIsCorrect( msgs, id, msglen ) )
968    {
969        fireGotError( msgs );
970        return READ_DONE;
971    }
972
973    --msglen;
974
975    switch( id )
976    {
977        case BT_CHOKE:
978            dbgmsg( msgs, "got Choke" );
979            msgs->info->clientIsChoked = 1;
980            cancelAllRequestsToPeer( msgs );
981            cancelAllRequestsToClientExceptFast( msgs );
982            break;
983
984        case BT_UNCHOKE:
985            dbgmsg( msgs, "got Unchoke" );
986            msgs->info->clientIsChoked = 0;
987            fireNeedReq( msgs );
988            break;
989
990        case BT_INTERESTED:
991            dbgmsg( msgs, "got Interested" );
992            msgs->info->peerIsInterested = 1;
993            tr_peerMsgsSetChoke( msgs, 0 );
994            break;
995
996        case BT_NOT_INTERESTED:
997            dbgmsg( msgs, "got Not Interested" );
998            msgs->info->peerIsInterested = 0;
999            break;
1000
1001        case BT_HAVE:
1002            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1003            dbgmsg( msgs, "got Have: %u", ui32 );
1004            tr_bitfieldAdd( msgs->info->have, ui32 );
1005            updatePeerProgress( msgs );
1006            tr_rcTransferred( msgs->torrent->swarmspeed, msgs->torrent->info.pieceSize );
1007            break;
1008
1009        case BT_BITFIELD: {
1010            const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
1011            dbgmsg( msgs, "got a bitfield" );
1012            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
1013            updatePeerProgress( msgs );
1014            tr_peerMsgsSetChoke( msgs, !clientIsSeed || (msgs->info->progress<1.0) );
1015            fireNeedReq( msgs );
1016            break;
1017        }
1018
1019        case BT_REQUEST: {
1020            struct peer_request req;
1021            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1022            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1023            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1024            dbgmsg( msgs, "got Request: %u:%u->%u", req.index, req.offset, req.length );
1025            peerMadeRequest( msgs, &req );
1026            break;
1027        }
1028
1029        case BT_CANCEL: {
1030            struct peer_request req;
1031            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1032            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1033            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1034            dbgmsg( msgs, "got a Cancel %u:%u->%u", req.index, req.offset, req.length );
1035            tr_free( tr_list_remove( &msgs->peerAskedForFast, &req, compareRequest ) );
1036            tr_free( tr_list_remove( &msgs->peerAskedFor, &req, compareRequest ) );
1037            break;
1038        }
1039
1040        case BT_PIECE: {
1041            dbgmsg( msgs, "got a Piece!" );
1042            assert( msgs->blockToUs.length == 0 );
1043            tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.index );
1044            tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.offset );
1045            msgs->blockToUs.length = msglen - 8;
1046            assert( EVBUFFER_LENGTH(msgs->inBlock) == 0 );
1047            msgs->state = msgs->blockToUs.length ? READING_BT_PIECE : AWAITING_BT_LENGTH;
1048            return READ_AGAIN;
1049            break;
1050        }
1051
1052        case BT_PORT: {
1053            dbgmsg( msgs, "Got a BT_PORT" );
1054            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1055            break;
1056        }
1057   
1058        case BT_SUGGEST: {
1059            /* FIXME(tiennou) */
1060            uint32_t index;
1061            tr_peerIoReadUint32( msgs->io, inbuf, &index );
1062            break;
1063        }
1064       
1065        case BT_HAVE_ALL:
1066            dbgmsg( msgs, "Got a BT_HAVE_ALL" );
1067            tr_bitfieldAddRange( msgs->info->have, 0, msgs->torrent->info.pieceCount );
1068            updatePeerProgress( msgs );
1069            break;
1070       
1071        case BT_HAVE_NONE:
1072            dbgmsg( msgs, "Got a BT_HAVE_NONE" );
1073            tr_bitfieldClear( msgs->info->have );
1074            updatePeerProgress( msgs );
1075            break;
1076       
1077        case BT_REJECT: {
1078            struct peer_request req;
1079            dbgmsg( msgs, "Got a BT_REJECT" );
1080            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1081            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1082            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1083            tr_free( tr_list_remove( &msgs->clientAskedFor, &req, compareRequest ) );
1084            break;
1085        }
1086       
1087        case BT_ALLOWED_FAST: {
1088            dbgmsg( msgs, "Got a BT_ALLOWED_FAST" );
1089            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1090            tr_bitfieldAdd( msgs->clientAllowedPieces, ui32 );
1091            break;
1092        }
1093       
1094        case BT_LTEP:
1095            dbgmsg( msgs, "Got a BT_LTEP" );
1096            parseLtep( msgs, msglen, inbuf );
1097            break;
1098
1099        default:
1100            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1101            tr_peerIoDrain( msgs->io, inbuf, msglen );
1102            break;
1103    }
1104
1105    msgs->incomingMessageLength = -1;
1106    msgs->state = AWAITING_BT_LENGTH;
1107    return READ_AGAIN;
1108}
1109
1110static void
1111clientGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1112{
1113    tr_torrent * tor = msgs->torrent;
1114    tor->activityDate = tr_date( );
1115    tor->downloadedCur += byteCount;
1116    msgs->info->pieceDataActivityDate = time( NULL );
1117    tr_rcTransferred( msgs->info->rcToClient, byteCount );
1118    tr_rcTransferred( tor->download, byteCount );
1119    tr_rcTransferred( tor->handle->download, byteCount );
1120}
1121
1122static void
1123peerGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1124{
1125    tr_torrent * tor = msgs->torrent;
1126    tor->activityDate = tr_date( );
1127    tor->uploadedCur += byteCount;
1128    msgs->info->pieceDataActivityDate = time( NULL );
1129    tr_rcTransferred( msgs->info->rcToPeer, byteCount );
1130    tr_rcTransferred( tor->upload, byteCount );
1131    tr_rcTransferred( tor->handle->upload, byteCount );
1132}
1133
1134static int
1135canDownload( const tr_peermsgs * msgs )
1136{
1137    tr_torrent * tor = msgs->torrent;
1138
1139    if( tor->downloadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1140        return !tor->handle->useDownloadLimit || tr_rcCanTransfer( tor->handle->download );
1141
1142    if( tor->downloadLimitMode == TR_SPEEDLIMIT_SINGLE )
1143        return tr_rcCanTransfer( tor->download );
1144
1145    return TRUE;
1146}
1147
1148static void
1149reassignBytesToCorrupt( tr_peermsgs * msgs, uint32_t byteCount )
1150{
1151    tr_torrent * tor = msgs->torrent;
1152
1153    /* increment the `corrupt' field */
1154    tor->corruptCur += byteCount;
1155
1156    /* decrement the `downloaded' field */
1157    if( tor->downloadedCur >= byteCount )
1158        tor->downloadedCur -= byteCount;
1159    else
1160        tor->downloadedCur = 0;
1161}
1162
1163
1164static void
1165gotBadPiece( tr_peermsgs * msgs, uint32_t pieceIndex )
1166{
1167    const uint32_t byteCount = tr_torPieceCountBytes( msgs->torrent, (int)pieceIndex );
1168    reassignBytesToCorrupt( msgs, byteCount );
1169}
1170
1171static void
1172gotUnwantedBlock( tr_peermsgs * msgs,
1173                  uint32_t      index UNUSED,
1174                  uint32_t      offset UNUSED,
1175                  uint32_t      length )
1176{
1177    reassignBytesToCorrupt( msgs, length );
1178}
1179
1180static void
1181addUsToBlamefield( tr_peermsgs * msgs, uint32_t index )
1182{
1183    if( !msgs->info->blame )
1184         msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1185    tr_bitfieldAdd( msgs->info->blame, index );
1186}
1187
1188static void
1189gotBlock( tr_peermsgs      * msgs,
1190          struct evbuffer  * inbuf,
1191          uint32_t           index,
1192          uint32_t           offset,
1193          uint32_t           length )
1194{
1195    tr_torrent * tor = msgs->torrent;
1196    const int block = _tr_block( tor, index, offset );
1197    struct peer_request key, *req;
1198
1199    /**
1200    *** Remove the block from our `we asked for this' list
1201    **/
1202
1203    key.index = index;
1204    key.offset = offset;
1205    key.length = length;
1206    req = (struct peer_request*) tr_list_remove( &msgs->clientAskedFor, &key,
1207                                                 compareRequest );
1208    if( req == NULL ) {
1209        gotUnwantedBlock( msgs, index, offset, length );
1210        dbgmsg( msgs, "we didn't ask for this message..." );
1211        return;
1212    }
1213    dbgmsg( msgs, "got block %u:%u->%u (turnaround time %d secs)", 
1214                     req->index, req->offset, req->length,
1215                     (int)(time(NULL) - req->time_requested) );
1216    tr_free( req );
1217    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1218                  tr_list_size(msgs->clientAskedFor));
1219
1220    /**
1221    *** Error checks
1222    **/
1223
1224    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
1225        dbgmsg( msgs, "have this block already..." );
1226        tr_dbg( "have this block already..." );
1227        gotUnwantedBlock( msgs, index, offset, length );
1228        return;
1229    }
1230
1231    if( (int)length != tr_torBlockCountBytes( tor, block ) ) {
1232        dbgmsg( msgs, "block is the wrong length..." );
1233        tr_dbg( "block is the wrong length..." );
1234        gotUnwantedBlock( msgs, index, offset, length );
1235        return;
1236    }
1237
1238    /**
1239    ***  Write the block
1240    **/
1241
1242    if( tr_ioWrite( tor, index, offset, length, EVBUFFER_DATA( inbuf )))
1243        return;
1244
1245#warning this sanity check is here to help track down the excess corrupt data bug, but is expensive and should be removed before the next release
1246{
1247    uint8_t * tmp = tr_new( uint8_t, length );
1248    const int val = tr_ioRead( tor, index, offset, length, tmp );
1249    assert( !val );
1250    assert( !memcmp( tmp, EVBUFFER_DATA(inbuf), length ) );
1251    tr_free( tmp );
1252}
1253
1254    tr_cpBlockAdd( tor->completion, block );
1255
1256    addUsToBlamefield( msgs, index );
1257
1258    fireGotBlock( msgs, index, offset, length );
1259
1260    /**
1261    ***  Handle if this was the last block in the piece
1262    **/
1263
1264    if( tr_cpPieceIsComplete( tor->completion, index ) )
1265    {
1266        if( tr_ioHash( tor, index ) )
1267        {
1268            gotBadPiece( msgs, index );
1269            return;
1270        }
1271
1272        fireClientHave( msgs, index );
1273    }
1274}
1275
1276
1277static ReadState
1278readBtPiece( tr_peermsgs * msgs, struct evbuffer * inbuf )
1279{
1280    uint32_t inlen;
1281    uint8_t * tmp;
1282
1283    assert( msgs != NULL );
1284    assert( msgs->blockToUs.length > 0 );
1285    assert( inbuf != NULL );
1286    assert( EVBUFFER_LENGTH( inbuf ) > 0 );
1287
1288    /* read from the inbuf into our block buffer */
1289    inlen = MIN( EVBUFFER_LENGTH(inbuf), msgs->blockToUs.length );
1290    tmp = tr_new( uint8_t, inlen );
1291    tr_peerIoReadBytes( msgs->io, inbuf, tmp, inlen );
1292    evbuffer_add( msgs->inBlock, tmp, inlen );
1293
1294    /* update our tables accordingly */
1295    assert( inlen >= msgs->blockToUs.length );
1296    msgs->blockToUs.length -= inlen;
1297    msgs->info->peerSentPieceDataAt = time( NULL );
1298    clientGotBytes( msgs, inlen );
1299
1300    /* if this was the entire block, save it */
1301    if( !msgs->blockToUs.length )
1302    {
1303        dbgmsg( msgs, "got block %u:%u", msgs->blockToUs.index, msgs->blockToUs.offset );
1304        assert( (int)EVBUFFER_LENGTH( msgs->inBlock ) == tr_torBlockCountBytes( msgs->torrent, _tr_block(msgs->torrent,msgs->blockToUs.index, msgs->blockToUs.offset) ) );
1305        gotBlock( msgs, msgs->inBlock,
1306                        msgs->blockToUs.index,
1307                        msgs->blockToUs.offset,
1308                        EVBUFFER_LENGTH( msgs->inBlock ) );
1309        evbuffer_drain( msgs->inBlock, ~0 );
1310        msgs->state = AWAITING_BT_LENGTH;
1311    }
1312
1313    /* cleanup */
1314    tr_free( tmp );
1315    return READ_AGAIN;
1316}
1317
1318static ReadState
1319canRead( struct bufferevent * evin, void * vmsgs )
1320{
1321    ReadState ret;
1322    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1323    struct evbuffer * inbuf = EVBUFFER_INPUT ( evin );
1324
1325    if( !canDownload( msgs ) )
1326    {
1327        msgs->notListening = 1;
1328        tr_peerIoSetIOMode ( msgs->io, 0, EV_READ );
1329        ret = READ_DONE;
1330    }
1331    else switch( msgs->state )
1332    {
1333        case AWAITING_BT_LENGTH:  ret = readBtLength  ( msgs, inbuf ); break;
1334        case AWAITING_BT_MESSAGE: ret = readBtMessage ( msgs, inbuf ); break;
1335        case READING_BT_PIECE:    ret = readBtPiece   ( msgs, inbuf ); break;
1336        default: assert( 0 );
1337    }
1338
1339    return ret;
1340}
1341
1342static void
1343sendKeepalive( tr_peermsgs * msgs )
1344{
1345    dbgmsg( msgs, "sending a keepalive message" );
1346    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1347}
1348
1349/**
1350***
1351**/
1352
1353static int
1354canWrite( const tr_peermsgs * msgs )
1355{
1356    /* don't let our outbuffer get too large */
1357    if( tr_peerIoWriteBytesWaiting( msgs->io ) > 4096 )
1358        return FALSE;
1359
1360    return TRUE;
1361}
1362
1363static size_t
1364getUploadMax( const tr_peermsgs * msgs )
1365{
1366    static const size_t maxval = ~0;
1367    const tr_torrent * tor = msgs->torrent;
1368
1369    if( !canWrite( msgs ) )
1370        return 0;
1371
1372    if( tor->uploadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1373        return tor->handle->useUploadLimit ? tr_rcBytesLeft( tor->handle->upload ) : maxval;
1374
1375    if( tor->uploadLimitMode == TR_SPEEDLIMIT_SINGLE )
1376        return tr_rcBytesLeft( tor->upload );
1377
1378    return maxval;
1379}
1380
1381static int
1382ratePulse( void * vmsgs )
1383{
1384    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1385    msgs->info->rateToClient = tr_rcRate( msgs->info->rcToClient );
1386    msgs->info->rateToPeer = tr_rcRate( msgs->info->rcToPeer );
1387    msgs->maxActiveRequests = MIN( 8 + (int)(msgs->info->rateToClient/10), 100 );
1388    msgs->minActiveRequests = msgs->maxActiveRequests / 2;
1389    return TRUE;
1390}
1391
1392static struct peer_request*
1393popNextRequest( tr_peermsgs * msgs )
1394{
1395    struct peer_request * ret;
1396    ret = tr_list_pop_front( &msgs->peerAskedForFast );
1397    if( !ret )
1398        ret = tr_list_pop_front( &msgs->peerAskedFor);
1399    return ret;
1400}
1401
1402static int
1403pulse( void * vmsgs )
1404{
1405    const time_t now = time( NULL );
1406    tr_peermsgs * msgs = vmsgs;
1407    struct peer_request * r;
1408    size_t len;
1409
1410    /* if we froze out a downloaded block because of speed limits,
1411       start listening to the peer again */
1412    if( msgs->notListening && canDownload( msgs ) )
1413    {
1414        msgs->notListening = 0;
1415        tr_peerIoSetIOMode ( msgs->io, EV_READ, 0 );
1416    }
1417
1418    pumpRequestQueue( msgs );
1419
1420    if( !canWrite( msgs ) )
1421    {
1422    }
1423    else if(( len = EVBUFFER_LENGTH( msgs->outBlock ) ))
1424    {
1425        const size_t uploadMax = getUploadMax( msgs );
1426        const size_t outlen = MIN( len, uploadMax );
1427        tr_peerIoWrite( msgs->io, EVBUFFER_DATA( msgs->outBlock ), outlen );
1428        evbuffer_drain( msgs->outBlock, outlen );
1429        msgs->clientSentAnythingAt = now;
1430        peerGotBytes( msgs, outlen );
1431        len -= outlen;
1432        dbgmsg( msgs, "wrote %d bytes; %d left in block", (int)outlen, (int)len );
1433        fflush( stdout );
1434    }
1435    else if(( len = EVBUFFER_LENGTH( msgs->outMessages ) ))
1436    {
1437        tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
1438        msgs->clientSentAnythingAt = now;
1439    }
1440    else if( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS )
1441    {
1442        sendKeepalive( msgs );
1443    }
1444
1445    if( !EVBUFFER_LENGTH( msgs->outBlock )
1446        && (( r = popNextRequest( msgs )))
1447        && requestIsValid( msgs, r )
1448        && tr_cpPieceIsComplete( msgs->torrent->completion, r->index ) )
1449    {
1450        uint8_t * buf = tr_new( uint8_t, r->length );
1451
1452        if( !tr_ioRead( msgs->torrent, r->index, r->offset, r->length, buf ) )
1453        {
1454            tr_peerIo * io = msgs->io;
1455            struct evbuffer * out = msgs->outBlock;
1456
1457            dbgmsg( msgs, "sending block %u:%u->%u", r->index, r->offset, r->length );
1458            tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 2*sizeof(uint32_t) + r->length );
1459            tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1460            tr_peerIoWriteUint32( io, out, r->index );
1461            tr_peerIoWriteUint32( io, out, r->offset );
1462            tr_peerIoWriteBytes ( io, out, buf, r->length );
1463        }
1464
1465        tr_free( buf );
1466        tr_free( r );
1467
1468        pulse( msgs ); /* start sending it right away */
1469    }
1470
1471    return TRUE; /* loop forever */
1472}
1473
1474static void
1475didWrite( struct bufferevent * evin UNUSED, void * vmsgs )
1476{
1477    pulse( vmsgs );
1478}
1479
1480static void
1481gotError( struct bufferevent * evbuf UNUSED, short what, void * vmsgs )
1482{
1483    dbgmsg( vmsgs, "libevent got an error! what=%d, errno=%d (%s)",
1484            (int)what, errno, strerror(errno) );
1485    fireGotError( vmsgs );
1486}
1487
1488static void
1489sendBitfield( tr_peermsgs * msgs )
1490{
1491    const tr_bitfield * bitfield = tr_cpPieceBitfield( msgs->torrent->completion );
1492    struct evbuffer * out = msgs->outMessages;
1493
1494    dbgmsg( msgs, "sending peer a bitfield message" );
1495    tr_peerIoWriteUint32( msgs->io, out, sizeof(uint8_t) + bitfield->len );
1496    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1497    tr_peerIoWriteBytes ( msgs->io, out, bitfield->bits, bitfield->len );
1498}
1499
1500/**
1501***
1502**/
1503
1504/* some peers give us error messages if we send
1505   more than this many peers in a single pex message */
1506#define MAX_PEX_DIFFS 200
1507
1508typedef struct
1509{
1510    tr_pex * added;
1511    tr_pex * dropped;
1512    tr_pex * elements;
1513    int addedCount;
1514    int droppedCount;
1515    int elementCount;
1516    int diffCount;
1517}
1518PexDiffs;
1519
1520static void
1521pexAddedCb( void * vpex, void * userData )
1522{
1523    PexDiffs * diffs = (PexDiffs *) userData;
1524    tr_pex * pex = (tr_pex *) vpex;
1525    if( diffs->diffCount < MAX_PEX_DIFFS )
1526    {
1527        diffs->diffCount++;
1528        diffs->added[diffs->addedCount++] = *pex;
1529        diffs->elements[diffs->elementCount++] = *pex;
1530    }
1531}
1532
1533static void
1534pexRemovedCb( void * vpex, void * userData )
1535{
1536    PexDiffs * diffs = (PexDiffs *) userData;
1537    tr_pex * pex = (tr_pex *) vpex;
1538    if( diffs->diffCount < MAX_PEX_DIFFS )
1539    {
1540        diffs->diffCount++;
1541        diffs->dropped[diffs->droppedCount++] = *pex;
1542    }
1543}
1544
1545static void
1546pexElementCb( void * vpex, void * userData )
1547{
1548    PexDiffs * diffs = (PexDiffs *) userData;
1549    tr_pex * pex = (tr_pex *) vpex;
1550    if( diffs->diffCount < MAX_PEX_DIFFS )
1551    {
1552        diffs->diffCount++;
1553        diffs->elements[diffs->elementCount++] = *pex;
1554    }
1555}
1556
1557static void
1558sendPex( tr_peermsgs * msgs )
1559{
1560    if( msgs->peerSupportsPex && tr_torrentIsPexEnabled( msgs->torrent ) )
1561    {
1562        int i;
1563        tr_pex * newPex = NULL;
1564        const int newCount = tr_peerMgrGetPeers( msgs->handle->peerMgr, msgs->torrent->info.hash, &newPex );
1565        PexDiffs diffs;
1566        benc_val_t val, *added, *dropped, *flags;
1567        uint8_t *tmp, *walk;
1568        char * benc;
1569        int bencLen;
1570
1571        /* build the diffs */
1572        diffs.added = tr_new( tr_pex, newCount );
1573        diffs.addedCount = 0;
1574        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1575        diffs.droppedCount = 0;
1576        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1577        diffs.elementCount = 0;
1578        diffs.diffCount = 0;
1579        tr_set_compare( msgs->pex, msgs->pexCount,
1580                        newPex, newCount,
1581                        tr_pexCompare, sizeof(tr_pex),
1582                        pexRemovedCb, pexAddedCb, pexElementCb, &diffs );
1583        dbgmsg( msgs, "pex: old peer count %d, new peer count %d, added %d, removed %d", msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1584
1585        /* update peer */
1586        tr_free( msgs->pex );
1587        msgs->pex = diffs.elements;
1588        msgs->pexCount = diffs.elementCount;
1589
1590        /* build the pex payload */
1591        tr_bencInit( &val, TYPE_DICT );
1592        tr_bencDictReserve( &val, 3 );
1593
1594        /* "added" */
1595        added = tr_bencDictAdd( &val, "added" );
1596        tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1597        for( i=0; i<diffs.addedCount; ++i ) {
1598            memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
1599            memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1600        }
1601        assert( ( walk - tmp ) == diffs.addedCount * 6 );
1602        tr_bencInitStr( added, tmp, walk-tmp, FALSE );
1603
1604        /* "added.f" */
1605        flags = tr_bencDictAdd( &val, "added.f" );
1606        tmp = walk = tr_new( uint8_t, diffs.addedCount );
1607        for( i=0; i<diffs.addedCount; ++i )
1608            *walk++ = diffs.added[i].flags;
1609        assert( ( walk - tmp ) == diffs.addedCount );
1610        tr_bencInitStr( flags, tmp, walk-tmp, FALSE );
1611
1612        /* "dropped" */
1613        dropped = tr_bencDictAdd( &val, "dropped" );
1614        tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
1615        for( i=0; i<diffs.droppedCount; ++i ) {
1616            memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
1617            memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
1618        }
1619        assert( ( walk - tmp ) == diffs.droppedCount * 6 );
1620        tr_bencInitStr( dropped, tmp, walk-tmp, FALSE );
1621
1622        /* write the pex message */
1623        benc = tr_bencSaveMalloc( &val, &bencLen );
1624        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 2*sizeof(uint8_t) + bencLen );
1625        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_LTEP );
1626        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, OUR_LTEP_PEX );
1627        tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, benc, bencLen );
1628
1629        /* cleanup */
1630        tr_free( benc );
1631        tr_bencFree( &val );
1632        tr_free( diffs.added );
1633        tr_free( diffs.dropped );
1634        tr_free( newPex );
1635
1636        msgs->clientSentPexAt = time( NULL );
1637    }
1638}
1639
1640static int
1641pexPulse( void * vpeer )
1642{
1643    sendPex( vpeer );
1644    return TRUE;
1645}
1646
1647/**
1648***
1649**/
1650
1651tr_peermsgs*
1652tr_peerMsgsNew( struct tr_torrent * torrent,
1653                struct tr_peer    * info,
1654                tr_delivery_func    func,
1655                void              * userData,
1656                tr_publisher_tag  * setme )
1657{
1658    tr_peermsgs * m;
1659
1660    assert( info != NULL );
1661    assert( info->io != NULL );
1662
1663    m = tr_new0( tr_peermsgs, 1 );
1664    m->publisher = tr_publisherNew( );
1665    m->info = info;
1666    m->handle = torrent->handle;
1667    m->torrent = torrent;
1668    m->io = info->io;
1669    m->info->clientIsChoked = 1;
1670    m->info->peerIsChoked = 1;
1671    m->info->clientIsInterested = 0;
1672    m->info->peerIsInterested = 0;
1673    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
1674    m->pulseTimer = tr_timerNew( m->handle, pulse, m, PEER_PULSE_INTERVAL );
1675    m->rateTimer = tr_timerNew( m->handle, ratePulse, m, RATE_PULSE_INTERVAL );
1676    m->pexTimer = tr_timerNew( m->handle, pexPulse, m, PEX_INTERVAL );
1677    m->outMessages = evbuffer_new( );
1678    m->outBlock = evbuffer_new( );
1679    m->inBlock = evbuffer_new( );
1680    m->peerAllowedPieces = NULL;
1681    m->clientAllowedPieces = NULL;
1682    *setme = tr_publisherSubscribe( m->publisher, func, userData );
1683   
1684    if ( tr_peerIoSupportsFEXT( m->io ) )
1685    {
1686        /* This peer is fastpeer-enabled, generate its allowed set
1687         * (before registering our callbacks) */
1688        if ( !m->peerAllowedPieces ) {
1689            const struct in_addr *peerAddr = tr_peerIoGetAddress( m->io, NULL );
1690           
1691            m->peerAllowedPieces = tr_peerMgrGenerateAllowedSet( MAX_ALLOWED_SET_COUNT,
1692                                                                 m->torrent->info.pieceCount,
1693                                                                 m->torrent->info.hash,
1694                                                                 peerAddr );
1695        }
1696        m->clientAllowedPieces = tr_bitfieldNew( m->torrent->info.pieceCount );
1697    }
1698   
1699    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* error if we don't read or write for 2.5 minutes */
1700    tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
1701    tr_peerIoSetIOMode( m->io, EV_READ|EV_WRITE, 0 );
1702    ratePulse( m );
1703
1704    if ( tr_peerIoSupportsLTEP( m->io ) )
1705        sendLtepHandshake( m );
1706
1707    if ( !tr_peerIoSupportsFEXT( m->io ) )
1708        sendBitfield( m );
1709    else {
1710        /* This peer is fastpeer-enabled, send it have-all or have-none if appropriate */
1711        uint32_t peerProgress;
1712        float completion = tr_cpPercentComplete( m->torrent->completion );
1713        if ( completion == 0.0f ) {
1714            sendFastHave( m, 0 );
1715        } else if ( completion == 1.0f ) {
1716            sendFastHave( m, 1 );
1717        } else {
1718            sendBitfield( m );
1719        }
1720        peerProgress = m->torrent->info.pieceCount * m->info->progress;
1721       
1722        if ( peerProgress < MAX_ALLOWED_SET_COUNT )
1723            sendFastAllowedSet( m );
1724    }
1725
1726    return m;
1727}
1728
1729void
1730tr_peerMsgsFree( tr_peermsgs* msgs )
1731{
1732    if( msgs != NULL )
1733    {
1734        tr_timerFree( &msgs->pulseTimer );
1735        tr_timerFree( &msgs->rateTimer );
1736        tr_timerFree( &msgs->pexTimer );
1737        tr_publisherFree( &msgs->publisher );
1738        tr_list_free( &msgs->clientWillAskFor, tr_free );
1739        tr_list_free( &msgs->clientAskedFor, tr_free );
1740        tr_list_free( &msgs->peerAskedForFast, tr_free );
1741        tr_list_free( &msgs->peerAskedFor, tr_free );
1742        evbuffer_free( msgs->outMessages );
1743        evbuffer_free( msgs->outBlock );
1744        evbuffer_free( msgs->inBlock );
1745        tr_free( msgs->pex );
1746        msgs->pexCount = 0;
1747        tr_free( msgs );
1748    }
1749}
1750
1751tr_publisher_tag
1752tr_peerMsgsSubscribe( tr_peermsgs       * peer,
1753                      tr_delivery_func    func,
1754                      void              * userData )
1755{
1756    return tr_publisherSubscribe( peer->publisher, func, userData );
1757}
1758
1759void
1760tr_peerMsgsUnsubscribe( tr_peermsgs       * peer,
1761                        tr_publisher_tag    tag )
1762{
1763    tr_publisherUnsubscribe( peer->publisher, tag );
1764}
1765
1766int
1767tr_peerMsgIsPieceFastAllowed( const tr_peermsgs * peer,
1768                              uint32_t            index )
1769{
1770    return tr_bitfieldHas( peer->clientAllowedPieces, index );
1771}
Note: See TracBrowser for help on using the repository browser.