source: branches/0.9x/libtransmission/peer-msgs.c @ 3553

Last change on this file since 3553 was 3553, checked in by charles, 15 years ago

fix OpenBSD build error.

  • Property svn:keywords set to Date Rev Author Id
File size: 50.3 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 3553 2007-10-25 14:00:05Z charles $
11 */
12
13#include <assert.h>
14#include <ctype.h>
15#include <errno.h>
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19#include <libgen.h> /* basename */
20
21#include <netinet/in.h> /* struct in_addr */
22
23#include <sys/types.h> /* event.h needs this */
24#include <event.h>
25
26#include "transmission.h"
27#include "bencode.h"
28#include "completion.h"
29#include "inout.h"
30#include "list.h"
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-mgr-private.h"
34#include "peer-msgs.h"
35#include "ratecontrol.h"
36#include "trevent.h"
37#include "utils.h"
38
39/**
40***
41**/
42
43#define MAX_ALLOWED_SET_COUNT   10 /* number of pieces generated for allow-fast,
44                                      threshold for fast-allowing others */
45
46enum
47{
48    BT_CHOKE                = 0,
49    BT_UNCHOKE              = 1,
50    BT_INTERESTED           = 2,
51    BT_NOT_INTERESTED       = 3,
52    BT_HAVE                 = 4,
53    BT_BITFIELD             = 5,
54    BT_REQUEST              = 6,
55    BT_PIECE                = 7,
56    BT_CANCEL               = 8,
57    BT_PORT                 = 9,
58    BT_SUGGEST              = 13,
59    BT_HAVE_ALL             = 14,
60    BT_HAVE_NONE            = 15,
61    BT_REJECT               = 16,
62    BT_ALLOWED_FAST         = 17,
63    BT_LTEP                 = 20,
64
65    LTEP_HANDSHAKE          = 0,
66
67    OUR_LTEP_PEX            = 1,
68
69    MAX_REQUEST_BYTE_COUNT  = (16 * 1024), /* drop requests who want too much */
70
71    KEEPALIVE_INTERVAL_SECS = 90,          /* idle seconds before we send a keepalive */
72    PEX_INTERVAL            = (60 * 1000), /* msec between calls to sendPex() */
73    PEER_PULSE_INTERVAL     = (133),        /* msec between calls to pulse() */
74    RATE_PULSE_INTERVAL     = (333),       /* msec between calls to ratePulse() */
75};
76
77enum
78{
79    AWAITING_BT_LENGTH,
80    AWAITING_BT_MESSAGE,
81    READING_BT_PIECE
82};
83
84struct peer_request
85{
86    uint32_t index;
87    uint32_t offset;
88    uint32_t length;
89    time_t time_requested;
90};
91
92static int
93compareRequest( const void * va, const void * vb )
94{
95    struct peer_request * a = (struct peer_request*) va;
96    struct peer_request * b = (struct peer_request*) vb;
97    if( a->index != b->index ) return a->index - b->index;
98    if( a->offset != b->offset ) return a->offset - b->offset;
99    if( a->length != b->length ) return a->length - b->length;
100    return 0;
101}
102
103struct tr_peermsgs
104{
105    tr_peer * info;
106
107    tr_handle * handle;
108    tr_torrent * torrent;
109    tr_peerIo * io;
110
111    tr_publisher_t * publisher;
112
113    struct evbuffer * outMessages; /* buffer of all the non-piece messages */
114    struct evbuffer * inBlock;     /* the block we're currently receiving */
115    tr_list * peerAskedFor;
116    tr_list * clientAskedFor;
117    tr_list * clientWillAskFor;
118
119    tr_timer * rateTimer;
120    tr_timer * pulseTimer;
121    tr_timer * pexTimer;
122
123    struct peer_request blockToUs; /* the block currntly being sent to us */
124
125    time_t lastReqAddedAt;
126    time_t clientSentPexAt;
127    time_t clientSentAnythingAt;
128
129    unsigned int notListening             : 1;
130    unsigned int peerSupportsPex          : 1;
131    unsigned int clientSentLtepHandshake  : 1;
132    unsigned int peerSentLtepHandshake    : 1;
133   
134    tr_bitfield * clientAllowedPieces;
135    tr_bitfield * peerAllowedPieces;
136   
137    uint8_t state;
138    uint8_t ut_pex_id;
139    uint16_t pexCount;
140    uint32_t incomingMessageLength;
141    uint32_t maxActiveRequests;
142    uint32_t minActiveRequests;
143
144    tr_pex * pex;
145};
146
147/**
148***
149**/
150
151static void
152myDebug( const char * file, int line,
153         const struct tr_peermsgs * msgs,
154         const char * fmt, ... )
155{
156    FILE * fp = tr_getLog( );
157    if( fp != NULL )
158    {
159        va_list args;
160        char timestr[64];
161        struct evbuffer * buf = evbuffer_new( );
162        char * myfile = tr_strdup( file );
163
164        evbuffer_add_printf( buf, "[%s] %s [%s]: ",
165                             tr_getLogTimeStr( timestr, sizeof(timestr) ),
166                             tr_peerIoGetAddrStr( msgs->io ),
167                             msgs->info->client );
168        va_start( args, fmt );
169        evbuffer_add_vprintf( buf, fmt, args );
170        va_end( args );
171        evbuffer_add_printf( buf, " (%s:%d)\n", basename(myfile), line );
172        fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
173
174        tr_free( myfile );
175        evbuffer_free( buf );
176    }
177}
178
179#define dbgmsg(msgs, fmt...) myDebug(__FILE__, __LINE__, msgs, ##fmt )
180
181/**
182***
183**/
184
185static void
186protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
187{
188    tr_peerIo * io = msgs->io;
189    struct evbuffer * out = msgs->outMessages;
190
191    dbgmsg( msgs, "requesting %u:%u->%u", req->index, req->offset, req->length );
192    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
193    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
194    tr_peerIoWriteUint32( io, out, req->index );
195    tr_peerIoWriteUint32( io, out, req->offset );
196    tr_peerIoWriteUint32( io, out, req->length );
197}
198
199static void
200protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
201{
202    tr_peerIo * io = msgs->io;
203    struct evbuffer * out = msgs->outMessages;
204
205    dbgmsg( msgs, "cancelling %u:%u->%u", req->index, req->offset, req->length );
206    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
207    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
208    tr_peerIoWriteUint32( io, out, req->index );
209    tr_peerIoWriteUint32( io, out, req->offset );
210    tr_peerIoWriteUint32( io, out, req->length );
211}
212
213static void
214protocolSendHave( tr_peermsgs * msgs, uint32_t index )
215{
216    tr_peerIo * io = msgs->io;
217    struct evbuffer * out = msgs->outMessages;
218
219    dbgmsg( msgs, "sending Have %u", index );
220    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
221    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
222    tr_peerIoWriteUint32( io, out, index );
223}
224
225static void
226protocolSendChoke( tr_peermsgs * msgs, int choke )
227{
228    tr_peerIo * io = msgs->io;
229    struct evbuffer * out = msgs->outMessages;
230
231    dbgmsg( msgs, "sending %s", (choke ? "Choke" : "Unchoke") );
232    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) );
233    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
234}
235
236static void
237protocolSendPiece( tr_peermsgs                * msgs,
238                   const struct peer_request  * r,
239                   const uint8_t              * pieceData )
240{
241    tr_peerIo * io = msgs->io;
242    struct evbuffer * out = evbuffer_new( );
243
244    dbgmsg( msgs, "sending block %u:%u->%u", r->index, r->offset, r->length );
245    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 2*sizeof(uint32_t) + r->length );
246    tr_peerIoWriteUint8 ( io, out, BT_PIECE );
247    tr_peerIoWriteUint32( io, out, r->index );
248    tr_peerIoWriteUint32( io, out, r->offset );
249    tr_peerIoWriteBytes ( io, out, pieceData, r->length );
250    tr_peerIoWriteBuf   ( io, out );
251
252    evbuffer_free( out );
253}
254
255/**
256***  EVENTS
257**/
258
259static const tr_peermsgs_event blankEvent = { 0, 0, 0, 0, 0.0f };
260
261static void
262publish( tr_peermsgs * msgs, tr_peermsgs_event * e )
263{
264    tr_publisherPublish( msgs->publisher, msgs->info, e );
265}
266
267static void
268fireGotError( tr_peermsgs * msgs )
269{
270    tr_peermsgs_event e = blankEvent;
271    e.eventType = TR_PEERMSG_GOT_ERROR;
272    publish( msgs, &e );
273}
274
275static void
276fireNeedReq( tr_peermsgs * msgs )
277{
278    tr_peermsgs_event e = blankEvent;
279    e.eventType = TR_PEERMSG_NEED_REQ;
280    publish( msgs, &e );
281}
282
283static void
284firePeerProgress( tr_peermsgs * msgs )
285{
286    tr_peermsgs_event e = blankEvent;
287    e.eventType = TR_PEERMSG_PEER_PROGRESS;
288    e.progress = msgs->info->progress;
289    publish( msgs, &e );
290}
291
292static void
293fireClientHave( tr_peermsgs * msgs, uint32_t pieceIndex )
294{
295    tr_peermsgs_event e = blankEvent;
296    e.eventType = TR_PEERMSG_CLIENT_HAVE;
297    e.pieceIndex = pieceIndex;
298    publish( msgs, &e );
299}
300
301static void
302fireGotBlock( tr_peermsgs * msgs, uint32_t pieceIndex, uint32_t offset, uint32_t length )
303{
304    tr_peermsgs_event e = blankEvent;
305    e.eventType = TR_PEERMSG_CLIENT_BLOCK;
306    e.pieceIndex = pieceIndex;
307    e.offset = offset;
308    e.length = length;
309    publish( msgs, &e );
310}
311
312static void
313fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
314{
315    tr_peermsgs_event e = blankEvent;
316    e.eventType = TR_PEERMSG_CANCEL;
317    e.pieceIndex = req->index;
318    e.offset = req->offset;
319    e.length = req->length;
320    publish( msgs, &e );
321}
322
323/**
324***  INTEREST
325**/
326
327static int
328isPieceInteresting( const tr_peermsgs   * peer,
329                    int                   piece )
330{
331    const tr_torrent * torrent = peer->torrent;
332    if( torrent->info.pieces[piece].dnd ) /* we don't want it */
333        return FALSE;
334    if( tr_cpPieceIsComplete( torrent->completion, piece ) ) /* we have it */
335        return FALSE;
336    if( !tr_bitfieldHas( peer->info->have, piece ) ) /* peer doesn't have it */
337        return FALSE;
338    if( tr_bitfieldHas( peer->info->banned, piece ) ) /* peer is banned */
339        return FALSE;
340    return TRUE;
341}
342
343/* "interested" means we'll ask for piece data if they unchoke us */
344static int
345isPeerInteresting( const tr_peermsgs * msgs )
346{
347    int i;
348    const tr_torrent * torrent;
349    const tr_bitfield * bitfield;
350    const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
351
352    if( clientIsSeed )
353        return FALSE;
354
355    torrent = msgs->torrent;
356    bitfield = tr_cpPieceBitfield( torrent->completion );
357
358    if( !msgs->info->have )
359        return TRUE;
360
361    assert( bitfield->len == msgs->info->have->len );
362    for( i=0; i<torrent->info.pieceCount; ++i )
363        if( isPieceInteresting( msgs, i ) )
364            return TRUE;
365
366    return FALSE;
367}
368
369static void
370sendInterest( tr_peermsgs * msgs, int weAreInterested )
371{
372    assert( msgs != NULL );
373    assert( weAreInterested==0 || weAreInterested==1 );
374
375    msgs->info->clientIsInterested = weAreInterested;
376    dbgmsg( msgs, "Sending %s",
377            weAreInterested ? "Interested" : "Not Interested");
378
379    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
380    tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages,
381                   weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
382}
383
384static void
385updateInterest( tr_peermsgs * msgs )
386{
387    const int i = isPeerInteresting( msgs );
388    if( i != msgs->info->clientIsInterested )
389        sendInterest( msgs, i );
390    if( i )
391        fireNeedReq( msgs );
392}
393
394#define MIN_CHOKE_PERIOD_SEC 10
395
396void
397tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
398{
399    const time_t fibrillationTime = time(NULL) - MIN_CHOKE_PERIOD_SEC;
400
401    assert( msgs != NULL );
402    assert( msgs->info != NULL );
403    assert( choke==0 || choke==1 );
404
405    if( msgs->info->chokeChangedAt > fibrillationTime )
406    {
407        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
408    }
409    else if( msgs->info->peerIsChoked != choke )
410    {
411        msgs->info->peerIsChoked = choke;
412       
413        if( choke )
414        {
415            tr_list * walk;
416            for( walk = msgs->peerAskedFor; walk != NULL; )
417            {
418                tr_list * next = walk->next;
419                /* don't reject a peer's fast allowed requests at choke */
420                struct peer_request *req = walk->data;
421                if ( !tr_bitfieldHas( msgs->peerAllowedPieces, req->index ) )
422                {
423                    tr_list_remove_data( &msgs->peerAskedFor, req );
424                    tr_free( req );
425                }
426                walk = next;
427            }
428        }
429
430        protocolSendChoke( msgs, choke );
431        msgs->info->chokeChangedAt = time( NULL );
432    }
433}
434
435/**
436***
437**/
438
439void
440tr_peerMsgsHave( tr_peermsgs * msgs,
441                 uint32_t      index )
442{
443    protocolSendHave( msgs, index );
444
445    /* since we have more pieces now, we might not be interested in this peer */
446    updateInterest( msgs );
447}
448#if 0
449static void
450sendFastSuggest( tr_peermsgs * msgs,
451                 uint32_t      pieceIndex )
452{
453    dbgmsg( msgs, "w00t SUGGESTing them piece #%d", pieceIndex );
454    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
455    tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_SUGGEST );
456    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
457   
458    updateInterest( msgs );
459}
460#endif
461static void
462sendFastHave( tr_peermsgs * msgs,
463              int           all)
464{
465    dbgmsg( msgs, "w00t telling them we %s pieces", (all ? "HAVE_ALL" : "HAVE_NONE" ) );
466    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
467    tr_peerIoWriteUint8( msgs->io, msgs->outMessages, ( all ? BT_HAVE_ALL : BT_HAVE_NONE ) );
468   
469    updateInterest( msgs );
470}
471
472static void
473sendFastReject( tr_peermsgs * msgs,
474                uint32_t      pieceIndex,
475                uint32_t      offset,
476                uint32_t      length )
477{
478    assert( msgs != NULL );
479    assert( length > 0 );
480   
481    /* reject the request */
482    const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
483    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
484    tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_REJECT );
485    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
486    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
487    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
488}
489
490static void
491sendFastAllowed( tr_peermsgs * msgs,
492                 uint32_t      pieceIndex)
493{
494    dbgmsg( msgs, "w00t telling them we ALLOW_FAST piece #%d", pieceIndex );
495    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
496    tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_ALLOWED_FAST );
497    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
498}
499
500
501
502static void
503sendFastAllowedSet( tr_peermsgs * msgs )
504{
505    int i = 0;
506    while (i <= msgs->torrent->info.pieceCount )
507    {
508        if ( tr_bitfieldHas( msgs->peerAllowedPieces, i) )
509            sendFastAllowed( msgs, i );
510        i++;
511    }
512}
513
514
515/**
516***
517**/
518
519static int
520reqIsValid( const tr_peermsgs * msgs, uint32_t index, uint32_t offset, uint32_t length )
521{
522    const tr_torrent * tor = msgs->torrent;
523
524    if( index >= (uint32_t) tor->info.pieceCount )
525        return FALSE;
526    if ( (int)offset >= tr_torPieceCountBytes( tor, (int)index ) )
527        return FALSE;
528    if( length > MAX_REQUEST_BYTE_COUNT )
529        return FALSE;
530    if( tr_pieceOffset( tor, index, offset, length ) > tor->info.totalSize )
531        return FALSE;
532
533    return TRUE;
534}
535
536static int
537requestIsValid( const tr_peermsgs * msgs, struct peer_request * req )
538{
539    return reqIsValid( msgs, req->index, req->offset, req->length );
540}
541
542static void
543pumpRequestQueue( tr_peermsgs * msgs )
544{
545    const int max = msgs->maxActiveRequests;
546    const int min = msgs->minActiveRequests;
547    int count = tr_list_size( msgs->clientAskedFor );
548    int sent = 0;
549
550    if( count > min )
551        return;
552    if( msgs->info->clientIsChoked )
553        return;
554
555    while( ( count < max ) && ( msgs->clientWillAskFor != NULL ) )
556    {
557        struct peer_request * req = tr_list_pop_front( &msgs->clientWillAskFor );
558        protocolSendRequest( msgs, req );
559        req->time_requested = msgs->lastReqAddedAt = time( NULL );
560        tr_list_append( &msgs->clientAskedFor, req );
561        ++count;
562        ++sent;
563    }
564
565    if( sent )
566        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
567                sent,
568                tr_list_size(msgs->clientAskedFor),
569                tr_list_size(msgs->clientWillAskFor) );
570
571    if( count < max )
572        fireNeedReq( msgs );
573}
574
575int
576tr_peerMsgsAddRequest( tr_peermsgs * msgs,
577                       uint32_t      index, 
578                       uint32_t      offset, 
579                       uint32_t      length )
580{
581    const int req_max = msgs->maxActiveRequests;
582    struct peer_request tmp, *req;
583
584    assert( msgs != NULL );
585    assert( msgs->torrent != NULL );
586    assert( reqIsValid( msgs, index, offset, length ) );
587
588    /**
589    ***  Reasons to decline the request
590    **/
591
592    /* don't send requests to choked clients */
593    if( msgs->info->clientIsChoked ) {
594        dbgmsg( msgs, "declining request because they're choking us" );
595        return TR_ADDREQ_CLIENT_CHOKED;
596    }
597
598    /* peer doesn't have this piece */
599    if( !tr_bitfieldHas( msgs->info->have, index ) )
600        return TR_ADDREQ_MISSING;
601
602    /* peer's queue is full */
603    if( tr_list_size( msgs->clientWillAskFor ) >= req_max ) {
604        dbgmsg( msgs, "declining request because we're full" );
605        return TR_ADDREQ_FULL;
606    }
607
608    /* have we already asked for this piece? */
609    tmp.index = index;
610    tmp.offset = offset;
611    tmp.length = length;
612    if( tr_list_find( msgs->clientAskedFor, &tmp, compareRequest ) ) {
613        dbgmsg( msgs, "declining because it's a duplicate" );
614        return TR_ADDREQ_DUPLICATE;
615    }
616    if( tr_list_find( msgs->clientWillAskFor, &tmp, compareRequest ) ) {
617        dbgmsg( msgs, "declining because it's a duplicate" );
618        return TR_ADDREQ_DUPLICATE;
619    }
620
621    /**
622    ***  Accept this request
623    **/
624
625    dbgmsg( msgs, "added req for piece %d, offset %d", (int)index, (int)offset );
626    req = tr_new0( struct peer_request, 1 );
627    *req = tmp;
628    tr_list_append( &msgs->clientWillAskFor, req );
629    return TR_ADDREQ_OK;
630}
631
632static void
633tr_peerMsgsCancelAllRequests( tr_peermsgs * msgs )
634{
635    struct peer_request * req;
636
637    while(( req = tr_list_pop_front( &msgs->clientWillAskFor ) ))
638    {
639        fireCancelledReq( msgs, req );
640        tr_free( req );
641    }
642
643    while(( req = tr_list_pop_front( &msgs->clientAskedFor ) ))
644    {
645        fireCancelledReq( msgs, req );
646        protocolSendCancel( msgs, req );
647        tr_free( req );
648    }
649}
650
651void
652tr_peerMsgsCancel( tr_peermsgs * msgs,
653                   uint32_t      pieceIndex,
654                   uint32_t      offset,
655                   uint32_t      length )
656{
657    struct peer_request *req, tmp;
658
659    assert( msgs != NULL );
660    assert( length > 0 );
661
662    /* have we asked the peer for this piece? */
663    tmp.index = pieceIndex;
664    tmp.offset = offset;
665    tmp.length = length;
666
667    /* if it's only in the queue and hasn't been sent yet, free it */
668    if(( req = tr_list_remove( &msgs->clientWillAskFor, &tmp, compareRequest ) ))
669    {
670        fireCancelledReq( msgs, req );
671        tr_free( req );
672    }
673
674    /* if it's already been sent, send a cancel message too */
675    if(( req = tr_list_remove( &msgs->clientAskedFor, &tmp, compareRequest ) ))
676    {
677        protocolSendCancel( msgs, req );
678        fireCancelledReq( msgs, req );
679        tr_free( req );
680    }
681}
682
683/**
684***
685**/
686
687static void
688sendLtepHandshake( tr_peermsgs * msgs )
689{
690    benc_val_t val, *m;
691    char * buf;
692    int len;
693    int pex;
694    const char * v = TR_NAME " " USERAGENT_PREFIX;
695    const int port = tr_getPublicPort( msgs->handle );
696    struct evbuffer * outbuf;
697
698    if( msgs->clientSentLtepHandshake )
699        return;
700
701    outbuf = evbuffer_new( );
702    dbgmsg( msgs, "sending an ltep handshake" );
703    msgs->clientSentLtepHandshake = 1;
704
705    /* decide if we want to advertise pex support */
706    if( !tr_torrentIsPexEnabled( msgs->torrent ) )
707        pex = 0;
708    else if( msgs->peerSentLtepHandshake )
709        pex = msgs->peerSupportsPex ? 1 : 0;
710    else
711        pex = 1;
712
713    tr_bencInit( &val, TYPE_DICT );
714    tr_bencDictReserve( &val, 4 );
715    tr_bencInitInt( tr_bencDictAdd( &val, "e" ), 1 );
716    m  = tr_bencDictAdd( &val, "m" );
717    tr_bencInit( m, TYPE_DICT );
718    if( pex ) {
719        tr_bencDictReserve( m, 1 );
720        tr_bencInitInt( tr_bencDictAdd( m, "ut_pex" ), OUR_LTEP_PEX );
721    }
722    if( port > 0 )
723        tr_bencInitInt( tr_bencDictAdd( &val, "p" ), port );
724    tr_bencInitStr( tr_bencDictAdd( &val, "v" ), v, 0, 1 );
725    buf = tr_bencSaveMalloc( &val,  &len );
726
727    tr_peerIoWriteUint32( msgs->io, outbuf, 2*sizeof(uint8_t) + len );
728    tr_peerIoWriteUint8 ( msgs->io, outbuf, BT_LTEP );
729    tr_peerIoWriteUint8 ( msgs->io, outbuf, LTEP_HANDSHAKE );
730    tr_peerIoWriteBytes ( msgs->io, outbuf, buf, len );
731
732    tr_peerIoWriteBuf( msgs->io, outbuf );
733
734#if 0
735    dbgmsg( msgs, "here is the ltep handshake we sent:" );
736    tr_bencPrint( &val );
737#endif
738
739    /* cleanup */
740    tr_bencFree( &val );
741    tr_free( buf );
742    evbuffer_free( outbuf );
743}
744
745static void
746parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
747{
748    benc_val_t val, * sub;
749    uint8_t * tmp = tr_new( uint8_t, len );
750
751    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
752    msgs->peerSentLtepHandshake = 1;
753
754    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type!=TYPE_DICT ) {
755        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
756        tr_free( tmp );
757        return;
758    }
759
760#if 0
761    dbgmsg( msgs, "here is the ltep handshake we read:" );
762    tr_bencPrint( &val );
763#endif
764
765    /* does the peer prefer encrypted connections? */
766    sub = tr_bencDictFind( &val, "e" );
767    if( tr_bencIsInt( sub ) )
768        msgs->info->encryption_preference = sub->val.i
769                                      ? ENCRYPTION_PREFERENCE_YES
770                                      : ENCRYPTION_PREFERENCE_NO;
771
772    /* check supported messages for utorrent pex */
773    sub = tr_bencDictFind( &val, "m" );
774    if( tr_bencIsDict( sub ) ) {
775        sub = tr_bencDictFind( sub, "ut_pex" );
776        if( tr_bencIsInt( sub ) ) {
777            msgs->peerSupportsPex = 1;
778            msgs->ut_pex_id = (uint8_t) sub->val.i;
779            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
780        }
781    }
782
783    /* get peer's listening port */
784    sub = tr_bencDictFind( &val, "p" );
785    if( tr_bencIsInt( sub ) ) {
786        msgs->info->port = htons( (uint16_t)sub->val.i );
787        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
788    }
789
790    tr_bencFree( &val );
791    tr_free( tmp );
792}
793
794static void
795parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
796{
797    benc_val_t val, * sub;
798    uint8_t * tmp;
799
800    if( !tr_torrentIsPexEnabled( msgs->torrent ) ) /* no sharing! */
801        return;
802
803    tmp = tr_new( uint8_t, msglen );
804    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
805
806    if( tr_bencLoad( tmp, msglen, &val, NULL ) || !tr_bencIsDict( &val ) ) {
807        dbgmsg( msgs, "GET can't read extended-pex dictionary" );
808        tr_free( tmp );
809        return;
810    }
811
812    sub = tr_bencDictFind( &val, "added" );
813    if( tr_bencIsStr(sub) && ((sub->val.s.i % 6) == 0)) {
814        const int n = sub->val.s.i / 6 ;
815        dbgmsg( msgs, "got %d peers from uT pex", n );
816        tr_peerMgrAddPeers( msgs->handle->peerMgr,
817                            msgs->torrent->info.hash,
818                            TR_PEER_FROM_PEX,
819                            (uint8_t*)sub->val.s.s, n );
820    }
821
822    tr_bencFree( &val );
823    tr_free( tmp );
824}
825
826static void
827sendPex( tr_peermsgs * msgs );
828
829static void
830parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
831{
832    uint8_t ltep_msgid;
833
834    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
835    msglen--;
836
837    if( ltep_msgid == LTEP_HANDSHAKE )
838    {
839        dbgmsg( msgs, "got ltep handshake" );
840        parseLtepHandshake( msgs, msglen, inbuf );
841        sendLtepHandshake( msgs );
842        sendPex( msgs );
843    }
844    else if( ltep_msgid == msgs->ut_pex_id )
845    {
846        dbgmsg( msgs, "got ut pex" );
847        msgs->peerSupportsPex = 1;
848        parseUtPex( msgs, msglen, inbuf );
849    }
850    else
851    {
852        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
853        evbuffer_drain( inbuf, msglen );
854    }
855}
856
857static int
858readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf )
859{
860    uint32_t len;
861    const size_t needlen = sizeof(uint32_t);
862
863    if( EVBUFFER_LENGTH(inbuf) < needlen )
864        return READ_MORE;
865
866    tr_peerIoReadUint32( msgs->io, inbuf, &len );
867
868    if( len == 0 ) /* peer sent us a keepalive message */
869        dbgmsg( msgs, "got KeepAlive" );
870    else {
871        msgs->incomingMessageLength = len;
872        msgs->state = AWAITING_BT_MESSAGE;
873    }
874
875    return READ_AGAIN;
876}
877
878static void
879updatePeerProgress( tr_peermsgs * msgs )
880{
881    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have ) / (float)msgs->torrent->info.pieceCount;
882    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
883    updateInterest( msgs );
884    firePeerProgress( msgs );
885}
886
887static int
888readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf )
889{
890    uint8_t id;
891    uint32_t ui32;
892    uint32_t msglen = msgs->incomingMessageLength;
893
894    if( EVBUFFER_LENGTH(inbuf) < msglen )
895        return READ_MORE;
896
897    tr_peerIoReadUint8( msgs->io, inbuf, &id );
898    msglen--;
899    dbgmsg( msgs, "got BT id %d, len %d", (int)id, (int)msglen );
900
901    switch( id )
902    {
903        case BT_CHOKE:
904            dbgmsg( msgs, "got Choke" );
905            assert( msglen == 0 );
906            msgs->info->clientIsChoked = 1;
907#if 0
908            tr_list * walk;
909            for( walk = msgs->peerAskedFor; walk != NULL; )
910            {
911                tr_list * next = walk->next;
912                /* We shouldn't reject a peer's fast allowed requests at choke */
913                struct peer_request *req = walk->data;
914                if ( !tr_bitfieldHas( msgs->peerAllowedPieces, req->index ) )
915                {
916                    tr_list_remove_data( &msgs->peerAskedFor, req );
917                    tr_free( req );
918                }
919                walk = next;
920            }
921#endif
922            tr_peerMsgsCancelAllRequests( msgs );
923            break;
924
925        case BT_UNCHOKE:
926            dbgmsg( msgs, "got Unchoke" );
927            assert( msglen == 0 );
928            msgs->info->clientIsChoked = 0;
929            fireNeedReq( msgs );
930            break;
931
932        case BT_INTERESTED:
933            dbgmsg( msgs, "got Interested" );
934            assert( msglen == 0 );
935            msgs->info->peerIsInterested = 1;
936            tr_peerMsgsSetChoke( msgs, 0 );
937            break;
938
939        case BT_NOT_INTERESTED:
940            dbgmsg( msgs, "got Not Interested" );
941            assert( msglen == 0 );
942            msgs->info->peerIsInterested = 0;
943            break;
944
945        case BT_HAVE:
946            assert( msglen == 4 );
947            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
948            tr_bitfieldAdd( msgs->info->have, ui32 );
949            updatePeerProgress( msgs );
950            tr_rcTransferred( msgs->torrent->swarmspeed, msgs->torrent->info.pieceSize );
951            dbgmsg( msgs, "got Have: %u", ui32 );
952            break;
953
954        case BT_BITFIELD: {
955            const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
956            dbgmsg( msgs, "got a bitfield" );
957            assert( msglen == msgs->info->have->len );
958            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
959            updatePeerProgress( msgs );
960            tr_peerMsgsSetChoke( msgs, !clientIsSeed || (msgs->info->progress<1.0) );
961            fireNeedReq( msgs );
962            break;
963        }
964
965        case BT_REQUEST: {
966            struct peer_request * req;
967            assert( msglen == 12 );
968            req = tr_new( struct peer_request, 1 );
969            tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
970            tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
971            tr_peerIoReadUint32( msgs->io, inbuf, &req->length );
972            dbgmsg( msgs, "got Request: %u:%u->%u", req->index, req->offset, req->length );
973           
974            if ( !requestIsValid( msgs, req ) )
975            {
976                dbgmsg( msgs, "BT_REQUEST: invalid request, ignoring" );
977                tr_free( req );
978                break;
979            }
980            /*
981                If we're not choking him -> continue
982                If we're choking him
983                    it doesn't support FPE -> He's deaf, reCHOKE and bail...
984                    it support FPE
985                        If the asked piece is not allowed
986                            OR he's above our threshold
987                            OR we don't have the requested piece -> Reject
988                        Else
989                        Asked piece allowed AND he's below our threshold -> continue...
990             */
991   
992
993            if ( msgs->info->peerIsChoked )
994            {
995                if ( !tr_peerIoSupportsFEXT( msgs->io ) )
996                {
997                    dbgmsg( msgs, "BT_REQUEST: peer is choked, ignoring" );
998                    /* Didn't he get it? */
999                    tr_peerMsgsSetChoke( msgs, 1 );
1000                    tr_free( req );
1001                    break;
1002                }
1003                else
1004                {
1005                    if ( !tr_bitfieldHas( msgs->peerAllowedPieces, req->index )
1006                         || ( msgs->info->progress * (float)msgs->torrent->info.pieceCount) >= MAX_ALLOWED_SET_COUNT
1007                         || !tr_cpPieceIsComplete( msgs->torrent->completion, req->index ) )
1008                    {
1009                        dbgmsg( msgs, "BT_REQUEST: peer requests an un-fastallowed piece" );
1010                        sendFastReject( msgs, req->index, req->offset, req->length );
1011                        tr_free( req );
1012                        break;
1013                    }
1014                    dbgmsg( msgs, "BT_REQUEST: fast allowed piece, accepting request" );
1015                }   
1016            }
1017           
1018            tr_list_append( &msgs->peerAskedFor, req );
1019            break;
1020        }
1021
1022        case BT_CANCEL: {
1023            struct peer_request req;
1024            void * data;
1025            assert( msglen == 12 );
1026            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1027            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1028            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1029            dbgmsg( msgs, "got a Cancel %u:%u->%u", req.index, req.offset, req.length );
1030            data = tr_list_remove( &msgs->peerAskedFor, &req, compareRequest );
1031            tr_free( data );
1032            break;
1033        }
1034
1035        case BT_PIECE: {
1036            dbgmsg( msgs, "got a Piece!" );
1037            assert( msgs->blockToUs.length == 0 );
1038            tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.index );
1039            tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.offset );
1040            msgs->blockToUs.length = msglen - 8;
1041            assert( EVBUFFER_LENGTH(msgs->inBlock) == 0 );
1042            msgs->state = msgs->blockToUs.length ? READING_BT_PIECE : AWAITING_BT_LENGTH;
1043            return READ_AGAIN;
1044            break;
1045        }
1046
1047        case BT_PORT: {
1048            dbgmsg( msgs, "Got a BT_PORT" );
1049            assert( msglen == 2 );
1050            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1051            break;
1052        }
1053       
1054        case BT_SUGGEST: {
1055            /* tiennou TODO */
1056            break;
1057        }
1058           
1059        case BT_HAVE_ALL: {
1060            assert( msglen == 0 );
1061            dbgmsg( msgs, "Got a BT_HAVE_ALL" );
1062            memset( msgs->info->have->bits, 1, msgs->info->have->len );
1063            updatePeerProgress( msgs );
1064            break;
1065        }
1066           
1067        case BT_HAVE_NONE: {
1068            assert( msglen == 0 );
1069            dbgmsg( msgs, "Got a BT_HAVE_NONE" );
1070            tr_bitfieldClear( msgs->info->have );
1071            updatePeerProgress( msgs );
1072            break;
1073        }
1074           
1075        case BT_REJECT: {
1076            struct peer_request req;
1077            tr_list * node;
1078            assert( msglen == 12 );
1079            dbgmsg( msgs, "Got a BT_REJECT" );
1080            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1081            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1082            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1083            node = tr_list_find( msgs->peerAskedFor, &req, compareRequest );
1084            if( node != NULL ) {
1085                void * data = node->data;
1086                tr_list_remove_data( &msgs->peerAskedFor, data );
1087                tr_free( data );
1088                dbgmsg( msgs, "found the req that peer has rejected... cancelled." );
1089            }
1090            break;
1091        }
1092           
1093        case BT_ALLOWED_FAST: {
1094            assert( msglen == 4 );
1095            dbgmsg( msgs, "Got a BT_ALLOWED_FAST" );
1096            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1097            tr_bitfieldAdd( msgs->clientAllowedPieces, ui32 );
1098            break;
1099        }
1100           
1101        case BT_LTEP:
1102            dbgmsg( msgs, "Got a BT_LTEP" );
1103            parseLtep( msgs, msglen, inbuf );
1104            break;
1105
1106        default:
1107            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1108            tr_peerIoDrain( msgs->io, inbuf, msglen );
1109            assert( 0 );
1110    }
1111
1112    msgs->incomingMessageLength = -1;
1113    msgs->state = AWAITING_BT_LENGTH;
1114    return READ_AGAIN;
1115}
1116
1117static void
1118clientGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1119{
1120    tr_torrent * tor = msgs->torrent;
1121    tor->activityDate = tr_date( );
1122    tor->downloadedCur += byteCount;
1123    msgs->info->pieceDataActivityDate = time( NULL );
1124    tr_rcTransferred( msgs->info->rcToClient, byteCount );
1125    tr_rcTransferred( tor->download, byteCount );
1126    tr_rcTransferred( tor->handle->download, byteCount );
1127}
1128
1129static void
1130peerGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1131{
1132    tr_torrent * tor = msgs->torrent;
1133    tor->activityDate = tr_date( );
1134    tor->uploadedCur += byteCount;
1135    msgs->info->pieceDataActivityDate = time( NULL );
1136    tr_rcTransferred( msgs->info->rcToPeer, byteCount );
1137    tr_rcTransferred( tor->upload, byteCount );
1138    tr_rcTransferred( tor->handle->upload, byteCount );
1139}
1140
1141static int
1142canDownload( const tr_peermsgs * msgs )
1143{
1144    tr_torrent * tor = msgs->torrent;
1145
1146    if( tor->downloadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1147        return !tor->handle->useDownloadLimit || tr_rcCanTransfer( tor->handle->download );
1148
1149    if( tor->downloadLimitMode == TR_SPEEDLIMIT_SINGLE )
1150        return tr_rcCanTransfer( tor->download );
1151
1152    return TRUE;
1153}
1154
1155static void
1156reassignBytesToCorrupt( tr_peermsgs * msgs, uint32_t byteCount )
1157{
1158    tr_torrent * tor = msgs->torrent;
1159
1160    /* increment the `corrupt' field */
1161    tor->corruptCur += byteCount;
1162
1163    /* decrement the `downloaded' field */
1164    if( tor->downloadedCur >= byteCount )
1165        tor->downloadedCur -= byteCount;
1166    else
1167        tor->downloadedCur = 0;
1168}
1169
1170
1171static void
1172gotBadPiece( tr_peermsgs * msgs, uint32_t pieceIndex )
1173{
1174    const uint32_t byteCount = tr_torPieceCountBytes( msgs->torrent, (int)pieceIndex );
1175    reassignBytesToCorrupt( msgs, byteCount );
1176}
1177
1178static void
1179gotUnwantedBlock( tr_peermsgs * msgs,
1180                  uint32_t      index UNUSED,
1181                  uint32_t      offset UNUSED,
1182                  uint32_t      length )
1183{
1184    reassignBytesToCorrupt( msgs, length );
1185}
1186
1187static void
1188addUsToBlamefield( tr_peermsgs * msgs, uint32_t index )
1189{
1190    if( !msgs->info->blame )
1191         msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1192    tr_bitfieldAdd( msgs->info->blame, index );
1193}
1194
1195static void
1196gotBlock( tr_peermsgs      * msgs,
1197          struct evbuffer  * inbuf,
1198          uint32_t           index,
1199          uint32_t           offset,
1200          uint32_t           length )
1201{
1202    tr_torrent * tor = msgs->torrent;
1203    const int block = _tr_block( tor, index, offset );
1204    struct peer_request key, *req;
1205
1206    /**
1207    *** Remove the block from our `we asked for this' list
1208    **/
1209
1210    key.index = index;
1211    key.offset = offset;
1212    key.length = length;
1213    req = (struct peer_request*) tr_list_remove( &msgs->clientAskedFor, &key,
1214                                                 compareRequest );
1215    if( req == NULL ) {
1216        gotUnwantedBlock( msgs, index, offset, length );
1217        dbgmsg( msgs, "we didn't ask for this message..." );
1218        return;
1219    }
1220    dbgmsg( msgs, "Got block %u:%u->%u (turnaround time %d secs)", 
1221                     req->index, req->offset, req->length,
1222                     (int)(time(NULL) - req->time_requested) );
1223    tr_free( req );
1224    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1225                  tr_list_size(msgs->clientAskedFor));
1226
1227    /**
1228    *** Error checks
1229    **/
1230
1231    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
1232        dbgmsg( msgs, "have this block already..." );
1233        tr_dbg( "have this block already..." );
1234        gotUnwantedBlock( msgs, index, offset, length );
1235        return;
1236    }
1237
1238    if( (int)length != tr_torBlockCountBytes( tor, block ) ) {
1239        dbgmsg( msgs, "block is the wrong length..." );
1240        tr_dbg( "block is the wrong length..." );
1241        gotUnwantedBlock( msgs, index, offset, length );
1242        return;
1243    }
1244
1245    /**
1246    ***  Write the block
1247    **/
1248
1249    if( tr_ioWrite( tor, index, offset, length, EVBUFFER_DATA( inbuf )))
1250        return;
1251
1252    tr_cpBlockAdd( tor->completion, block );
1253
1254    addUsToBlamefield( msgs, index );
1255
1256    fireGotBlock( msgs, index, offset, length );
1257
1258    /**
1259    ***  Handle if this was the last block in the piece
1260    **/
1261
1262    if( tr_cpPieceIsComplete( tor->completion, index ) )
1263    {
1264        if( tr_ioHash( tor, index ) )
1265        {
1266            gotBadPiece( msgs, index );
1267            return;
1268        }
1269
1270        fireClientHave( msgs, index );
1271    }
1272}
1273
1274
1275static ReadState
1276readBtPiece( tr_peermsgs * msgs, struct evbuffer * inbuf )
1277{
1278    uint32_t inlen;
1279    uint8_t * tmp;
1280
1281    assert( msgs != NULL );
1282    assert( msgs->blockToUs.length > 0 );
1283    assert( inbuf != NULL );
1284    assert( EVBUFFER_LENGTH( inbuf ) > 0 );
1285
1286    /* read from the inbuf into our block buffer */
1287    inlen = MIN( EVBUFFER_LENGTH(inbuf), msgs->blockToUs.length );
1288    tmp = tr_new( uint8_t, inlen );
1289    tr_peerIoReadBytes( msgs->io, inbuf, tmp, inlen );
1290    evbuffer_add( msgs->inBlock, tmp, inlen );
1291
1292    /* update our tables accordingly */
1293    assert( inlen >= msgs->blockToUs.length );
1294    msgs->blockToUs.length -= inlen;
1295    msgs->info->peerSentPieceDataAt = time( NULL );
1296    clientGotBytes( msgs, inlen );
1297
1298    /* if this was the entire block, save it */
1299    if( !msgs->blockToUs.length )
1300    {
1301        dbgmsg( msgs, "got block %u:%u", msgs->blockToUs.index, msgs->blockToUs.offset );
1302        assert( (int)EVBUFFER_LENGTH( msgs->inBlock ) == tr_torBlockCountBytes( msgs->torrent, _tr_block(msgs->torrent,msgs->blockToUs.index, msgs->blockToUs.offset) ) );
1303        gotBlock( msgs, msgs->inBlock,
1304                        msgs->blockToUs.index,
1305                        msgs->blockToUs.offset,
1306                        EVBUFFER_LENGTH( msgs->inBlock ) );
1307        evbuffer_drain( msgs->inBlock, ~0 );
1308        msgs->state = AWAITING_BT_LENGTH;
1309    }
1310
1311    /* cleanup */
1312    tr_free( tmp );
1313    return READ_AGAIN;
1314}
1315
1316static ReadState
1317canRead( struct bufferevent * evin, void * vmsgs )
1318{
1319    ReadState ret;
1320    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1321    struct evbuffer * inbuf = EVBUFFER_INPUT ( evin );
1322
1323    if( !canDownload( msgs ) )
1324    {
1325        msgs->notListening = 1;
1326        tr_peerIoSetIOMode ( msgs->io, 0, EV_READ );
1327        ret = READ_DONE;
1328    }
1329    else switch( msgs->state )
1330    {
1331        case AWAITING_BT_LENGTH:  ret = readBtLength  ( msgs, inbuf ); break;
1332        case AWAITING_BT_MESSAGE: ret = readBtMessage ( msgs, inbuf ); break;
1333        case READING_BT_PIECE:    ret = readBtPiece   ( msgs, inbuf ); break;
1334        default: assert( 0 );
1335    }
1336
1337    return ret;
1338}
1339
1340static void
1341sendKeepalive( tr_peermsgs * msgs )
1342{
1343    dbgmsg( msgs, "sending a keepalive message" );
1344    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1345}
1346
1347/**
1348***
1349**/
1350
1351static int
1352canWrite( const tr_peermsgs * msgs )
1353{
1354    /* don't let our outbuffer get too large */
1355    if( tr_peerIoWriteBytesWaiting( msgs->io ) > 8192 )
1356        return FALSE;
1357
1358    return TRUE;
1359}
1360
1361static int
1362canUpload( const tr_peermsgs * msgs )
1363{
1364    const tr_torrent * tor = msgs->torrent;
1365
1366    if( !canWrite( msgs ) )
1367        return FALSE;
1368
1369    if( tor->uploadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1370        return !tor->handle->useUploadLimit || tr_rcCanTransfer( tor->handle->upload );
1371
1372    if( tor->uploadLimitMode == TR_SPEEDLIMIT_SINGLE )
1373        return tr_rcCanTransfer( tor->upload );
1374
1375    return TRUE;
1376}
1377
1378static int
1379ratePulse( void * vmsgs )
1380{
1381    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1382    msgs->info->rateToClient = tr_rcRate( msgs->info->rcToClient );
1383    msgs->info->rateToPeer = tr_rcRate( msgs->info->rcToPeer );
1384    msgs->maxActiveRequests = MIN( 8 + (int)(msgs->info->rateToClient/10), 100 );
1385    msgs->minActiveRequests = msgs->maxActiveRequests / 2;
1386    return TRUE;
1387}
1388
1389static int
1390pulse( void * vmsgs )
1391{
1392    const time_t now = time( NULL );
1393    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1394    size_t len;
1395
1396    /* if we froze out a downloaded block because of speed limits,
1397       start listening to the peer again */
1398    if( msgs->notListening && canDownload( msgs ) )
1399    {
1400        msgs->notListening = 0;
1401        tr_peerIoSetIOMode ( msgs->io, EV_READ, 0 );
1402    }
1403
1404    pumpRequestQueue( msgs );
1405
1406    if( !canWrite( msgs ) )
1407    {
1408    }
1409    else if(( len = EVBUFFER_LENGTH( msgs->outMessages ) ))
1410    {
1411        tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
1412        msgs->clientSentAnythingAt = now;
1413    }
1414    else if(( msgs->peerAskedFor ))
1415    {
1416        if( canUpload( msgs ) )
1417        {
1418            struct peer_request * r = tr_list_pop_front( &msgs->peerAskedFor );
1419            uint8_t * buf = tr_new( uint8_t, r->length );
1420
1421            if( requestIsValid( msgs, r )
1422                && tr_cpPieceIsComplete( msgs->torrent->completion, r->index )
1423                && !tr_ioRead( msgs->torrent, r->index, r->offset, r->length, buf ) )
1424            {
1425                protocolSendPiece( msgs, r, buf );
1426                peerGotBytes( msgs, r->length );
1427                msgs->clientSentAnythingAt = now;
1428            }
1429
1430            tr_free( buf );
1431            tr_free( r );
1432        }
1433    }
1434    else if( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS )
1435    {
1436        sendKeepalive( msgs );
1437    }
1438
1439    return TRUE; /* loop forever */
1440}
1441
1442static void
1443didWrite( struct bufferevent * evin UNUSED, void * vmsgs )
1444{
1445    pulse( vmsgs );
1446}
1447
1448static void
1449gotError( struct bufferevent * evbuf UNUSED, short what, void * vmsgs )
1450{
1451    dbgmsg( vmsgs, "libevent got an error! what=%d, errno=%d (%s)",
1452            (int)what, errno, strerror(errno) );
1453    fireGotError( vmsgs );
1454}
1455
1456static void
1457sendBitfield( tr_peermsgs * msgs )
1458{
1459    const tr_bitfield * bitfield = tr_cpPieceBitfield( msgs->torrent->completion );
1460    struct evbuffer * out = msgs->outMessages;
1461
1462    dbgmsg( msgs, "sending peer a bitfield message" );
1463    tr_peerIoWriteUint32( msgs->io, out, sizeof(uint8_t) + bitfield->len );
1464    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1465    tr_peerIoWriteBytes ( msgs->io, out, bitfield->bits, bitfield->len );
1466}
1467
1468/**
1469***
1470**/
1471
1472/* some peers give us error messages if we send
1473   more than this many peers in a single pex message */
1474#define MAX_PEX_DIFFS 200
1475
1476typedef struct
1477{
1478    tr_pex * added;
1479    tr_pex * dropped;
1480    tr_pex * elements;
1481    int addedCount;
1482    int droppedCount;
1483    int elementCount;
1484    int diffCount;
1485}
1486PexDiffs;
1487
1488static void
1489pexAddedCb( void * vpex, void * userData )
1490{
1491    PexDiffs * diffs = (PexDiffs *) userData;
1492    tr_pex * pex = (tr_pex *) vpex;
1493    if( diffs->diffCount < MAX_PEX_DIFFS )
1494    {
1495        diffs->diffCount++;
1496        diffs->added[diffs->addedCount++] = *pex;
1497        diffs->elements[diffs->elementCount++] = *pex;
1498    }
1499}
1500
1501static void
1502pexRemovedCb( void * vpex, void * userData )
1503{
1504    PexDiffs * diffs = (PexDiffs *) userData;
1505    tr_pex * pex = (tr_pex *) vpex;
1506    if( diffs->diffCount < MAX_PEX_DIFFS )
1507    {
1508        diffs->diffCount++;
1509        diffs->dropped[diffs->droppedCount++] = *pex;
1510    }
1511}
1512
1513static void
1514pexElementCb( void * vpex, void * userData )
1515{
1516    PexDiffs * diffs = (PexDiffs *) userData;
1517    tr_pex * pex = (tr_pex *) vpex;
1518    if( diffs->diffCount < MAX_PEX_DIFFS )
1519    {
1520        diffs->diffCount++;
1521        diffs->elements[diffs->elementCount++] = *pex;
1522    }
1523}
1524
1525static void
1526sendPex( tr_peermsgs * msgs )
1527{
1528    if( msgs->peerSupportsPex && tr_torrentIsPexEnabled( msgs->torrent ) )
1529    {
1530        int i;
1531        tr_pex * newPex = NULL;
1532        const int newCount = tr_peerMgrGetPeers( msgs->handle->peerMgr, msgs->torrent->info.hash, &newPex );
1533        PexDiffs diffs;
1534        benc_val_t val, *added, *dropped, *flags;
1535        uint8_t *tmp, *walk;
1536        char * benc;
1537        int bencLen;
1538
1539        /* build the diffs */
1540        diffs.added = tr_new( tr_pex, newCount );
1541        diffs.addedCount = 0;
1542        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1543        diffs.droppedCount = 0;
1544        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1545        diffs.elementCount = 0;
1546        diffs.diffCount = 0;
1547        tr_set_compare( msgs->pex, msgs->pexCount,
1548                        newPex, newCount,
1549                        tr_pexCompare, sizeof(tr_pex),
1550                        pexRemovedCb, pexAddedCb, pexElementCb, &diffs );
1551        dbgmsg( msgs, "pex: old peer count %d, new peer count %d, added %d, removed %d", msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1552
1553        /* update peer */
1554        tr_free( msgs->pex );
1555        msgs->pex = diffs.elements;
1556        msgs->pexCount = diffs.elementCount;
1557
1558        /* build the pex payload */
1559        tr_bencInit( &val, TYPE_DICT );
1560        tr_bencDictReserve( &val, 3 );
1561
1562        /* "added" */
1563        added = tr_bencDictAdd( &val, "added" );
1564        tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1565        for( i=0; i<diffs.addedCount; ++i ) {
1566            memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
1567            memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1568        }
1569        assert( ( walk - tmp ) == diffs.addedCount * 6 );
1570        tr_bencInitStr( added, tmp, walk-tmp, FALSE );
1571
1572        /* "added.f" */
1573        flags = tr_bencDictAdd( &val, "added.f" );
1574        tmp = walk = tr_new( uint8_t, diffs.addedCount );
1575        for( i=0; i<diffs.addedCount; ++i )
1576            *walk++ = diffs.added[i].flags;
1577        assert( ( walk - tmp ) == diffs.addedCount );
1578        tr_bencInitStr( flags, tmp, walk-tmp, FALSE );
1579
1580        /* "dropped" */
1581        dropped = tr_bencDictAdd( &val, "dropped" );
1582        tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
1583        for( i=0; i<diffs.droppedCount; ++i ) {
1584            memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
1585            memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
1586        }
1587        assert( ( walk - tmp ) == diffs.droppedCount * 6 );
1588        tr_bencInitStr( dropped, tmp, walk-tmp, FALSE );
1589
1590        /* write the pex message */
1591        benc = tr_bencSaveMalloc( &val, &bencLen );
1592        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 2*sizeof(uint8_t) + bencLen );
1593        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_LTEP );
1594        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, OUR_LTEP_PEX );
1595        tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, benc, bencLen );
1596
1597        /* cleanup */
1598        tr_free( benc );
1599        tr_bencFree( &val );
1600        tr_free( diffs.added );
1601        tr_free( diffs.dropped );
1602        tr_free( newPex );
1603
1604        msgs->clientSentPexAt = time( NULL );
1605    }
1606}
1607
1608static int
1609pexPulse( void * vpeer )
1610{
1611    sendPex( vpeer );
1612    return TRUE;
1613}
1614
1615/**
1616***
1617**/
1618
1619tr_peermsgs*
1620tr_peerMsgsNew( struct tr_torrent * torrent,
1621                struct tr_peer    * info,
1622                tr_delivery_func    func,
1623                void              * userData,
1624                tr_publisher_tag  * setme )
1625{
1626    tr_peermsgs * m;
1627
1628    assert( info != NULL );
1629    assert( info->io != NULL );
1630
1631    m = tr_new0( tr_peermsgs, 1 );
1632    m->publisher = tr_publisherNew( );
1633    m->info = info;
1634    m->handle = torrent->handle;
1635    m->torrent = torrent;
1636    m->io = info->io;
1637    m->info->clientIsChoked = 1;
1638    m->info->peerIsChoked = 1;
1639    m->info->clientIsInterested = 0;
1640    m->info->peerIsInterested = 0;
1641    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
1642    m->pulseTimer = tr_timerNew( m->handle, pulse, m, PEER_PULSE_INTERVAL );
1643    m->rateTimer = tr_timerNew( m->handle, ratePulse, m, RATE_PULSE_INTERVAL );
1644    m->pexTimer = tr_timerNew( m->handle, pexPulse, m, PEX_INTERVAL );
1645    m->outMessages = evbuffer_new( );
1646    m->inBlock = evbuffer_new( );
1647    m->peerAllowedPieces = NULL;
1648    m->clientAllowedPieces = NULL;
1649    *setme = tr_publisherSubscribe( m->publisher, func, userData );
1650   
1651    if ( tr_peerIoSupportsFEXT( m->io ) )
1652    {
1653        /* This peer is fastpeer-enabled, generate its allowed set
1654         * (before registering our callbacks) */
1655        if ( !m->peerAllowedPieces ) {
1656            const struct in_addr *peerAddr = tr_peerIoGetAddress( m->io, NULL );
1657           
1658            m->peerAllowedPieces = tr_peerMgrGenerateAllowedSet( MAX_ALLOWED_SET_COUNT,
1659                                                                 m->torrent->info.pieceCount,
1660                                                                 m->torrent->info.hash,
1661                                                                 peerAddr );
1662        }
1663        m->clientAllowedPieces = tr_bitfieldNew( m->torrent->info.pieceCount );
1664    }
1665   
1666    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* error if we don't read or write for 2.5 minutes */
1667    tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
1668    tr_peerIoSetIOMode( m->io, EV_READ|EV_WRITE, 0 );
1669    ratePulse( m );
1670
1671    if ( tr_peerIoSupportsLTEP( m->io ) )
1672        sendLtepHandshake( m );
1673
1674    if ( !tr_peerIoSupportsFEXT( m->io ) )
1675        sendBitfield( m );
1676    else {
1677        /* This peer is fastpeer-enabled, send it have-all or have-none if appropriate */
1678        float completion = tr_cpPercentComplete( m->torrent->completion );
1679        if ( completion == 0.0f ) {
1680            sendFastHave( m, 0 );
1681        } else if ( completion == 1.0f ) {
1682            sendFastHave( m, 1 );
1683        } else {
1684            sendBitfield( m );
1685        }
1686        uint32_t peerProgress = m->torrent->info.pieceCount * m->info->progress;
1687       
1688        if ( peerProgress < MAX_ALLOWED_SET_COUNT )
1689            sendFastAllowedSet( m );
1690    }
1691
1692    return m;
1693}
1694
1695void
1696tr_peerMsgsFree( tr_peermsgs* msgs )
1697{
1698    if( msgs != NULL )
1699    {
1700        tr_timerFree( &msgs->pulseTimer );
1701        tr_timerFree( &msgs->rateTimer );
1702        tr_timerFree( &msgs->pexTimer );
1703        tr_publisherFree( &msgs->publisher );
1704        tr_list_free( &msgs->clientWillAskFor, tr_free );
1705        tr_list_free( &msgs->clientAskedFor, tr_free );
1706        tr_list_free( &msgs->peerAskedFor, tr_free );
1707        evbuffer_free( msgs->outMessages );
1708        evbuffer_free( msgs->inBlock );
1709        tr_free( msgs->pex );
1710        msgs->pexCount = 0;
1711        tr_free( msgs );
1712    }
1713}
1714
1715tr_publisher_tag
1716tr_peerMsgsSubscribe( tr_peermsgs       * peer,
1717                      tr_delivery_func    func,
1718                      void              * userData )
1719{
1720    return tr_publisherSubscribe( peer->publisher, func, userData );
1721}
1722
1723void
1724tr_peerMsgsUnsubscribe( tr_peermsgs       * peer,
1725                        tr_publisher_tag    tag )
1726{
1727    tr_publisherUnsubscribe( peer->publisher, tag );
1728}
1729
1730int
1731tr_peerMsgIsPieceFastAllowed( const tr_peermsgs * peer,
1732                              uint32_t            index )
1733{
1734    return tr_bitfieldHas( peer->clientAllowedPieces, index );
1735}
Note: See TracBrowser for help on using the repository browser.