source: trunk/libtransmission/peer-msgs.c @ 3349

Last change on this file since 3349 was 3349, checked in by charles, 15 years ago

commit more of tiennou's fastpeers patch

  • Property svn:keywords set to Date Rev Author Id
File size: 50.0 KB
Line 
1/*
2 * This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 3349 2007-10-10 16:39:12Z charles $
11 */
12
13#include <assert.h>
14#include <ctype.h>
15#include <stdio.h>
16#include <stdlib.h>
17#include <string.h>
18
19#include <arpa/inet.h>
20
21#include <sys/types.h> /* event.h needs this */
22#include <event.h>
23
24#include "transmission.h"
25#include "bencode.h"
26#include "completion.h"
27#include "inout.h"
28#include "list.h"
29#include "peer-io.h"
30#include "peer-mgr.h"
31#include "peer-mgr-private.h"
32#include "peer-msgs.h"
33#include "ratecontrol.h"
34#include "trevent.h"
35#include "utils.h"
36
37/**
38***
39**/
40
41#define MAX_ALLOWED_SET_COUNT   10 /* number of pieces generated for allow-fast,
42                                    threshold for fast-allowing others */
43
44enum
45{
46    BT_CHOKE                = 0,
47    BT_UNCHOKE              = 1,
48    BT_INTERESTED           = 2,
49    BT_NOT_INTERESTED       = 3,
50    BT_HAVE                 = 4,
51    BT_BITFIELD             = 5,
52    BT_REQUEST              = 6,
53    BT_PIECE                = 7,
54    BT_CANCEL               = 8,
55    BT_PORT                 = 9,
56    BT_SUGGEST              = 13,
57    BT_HAVE_ALL             = 14,
58    BT_HAVE_NONE            = 15,
59    BT_REJECT               = 16,
60    BT_ALLOWED_FAST         = 17,
61    BT_LTEP                 = 20,
62
63    LTEP_HANDSHAKE          = 0,
64
65    OUR_LTEP_PEX            = 1,
66
67    MAX_REQUEST_BYTE_COUNT  = (16 * 1024), /* drop requests who want too much */
68
69    KEEPALIVE_INTERVAL_SECS = 90,          /* idle seconds before we send a keepalive */
70    PEX_INTERVAL            = (60 * 1000), /* msec between calls to sendPex() */
71    PEER_PULSE_INTERVAL     = (66),        /* msec between calls to pulse() */
72    RATE_PULSE_INTERVAL     = (333),       /* msec between calls to ratePulse() */
73};
74
75enum
76{
77    AWAITING_BT_LENGTH,
78    AWAITING_BT_MESSAGE,
79    READING_BT_PIECE
80};
81
82struct peer_request
83{
84    uint32_t index;
85    uint32_t offset;
86    uint32_t length;
87    time_t time_requested;
88};
89
90static int
91compareRequest( const void * va, const void * vb )
92{
93    struct peer_request * a = (struct peer_request*) va;
94    struct peer_request * b = (struct peer_request*) vb;
95    if( a->index != b->index ) return a->index - b->index;
96    if( a->offset != b->offset ) return a->offset - b->offset;
97    if( a->length != b->length ) return a->length - b->length;
98    return 0;
99}
100
101struct tr_peermsgs
102{
103    tr_peer * info;
104
105    tr_handle * handle;
106    tr_torrent * torrent;
107    tr_peerIo * io;
108
109    tr_publisher_t * publisher;
110
111    struct evbuffer * outMessages; /* buffer of all the non-piece messages */
112    struct evbuffer * inBlock;     /* the block we're currently receiving */
113    tr_list * peerAskedFor;
114    tr_list * clientAskedFor;
115    tr_list * clientWillAskFor;
116
117    tr_timer * rateTimer;
118    tr_timer * pulseTimer;
119    tr_timer * pexTimer;
120
121    struct peer_request blockToUs; /* the block currntly being sent to us */
122
123    time_t lastReqAddedAt;
124    time_t clientSentPexAt;
125    time_t clientSentAnythingAt;
126
127    unsigned int notListening             : 1;
128    unsigned int peerSupportsPex          : 1;
129    unsigned int clientSentLtepHandshake  : 1;
130    unsigned int peerSentLtepHandshake    : 1;
131   
132    tr_bitfield * clientAllowedPieces;
133    tr_bitfield * peerAllowedPieces;
134   
135    uint8_t state;
136    uint8_t ut_pex_id;
137    uint16_t pexCount;
138    uint32_t incomingMessageLength;
139    uint32_t maxActiveRequests;
140    uint32_t minActiveRequests;
141
142    tr_pex * pex;
143};
144
145/**
146***
147**/
148
149static void
150myDebug( const char * file, int line,
151         const struct tr_peermsgs * msgs,
152         const char * fmt, ... )
153{
154    FILE * fp = tr_getLog( );
155    if( fp != NULL )
156    {
157        va_list args;
158        struct evbuffer * buf = evbuffer_new( );
159        char timestr[64];
160        evbuffer_add_printf( buf, "[%s] %s [%s]: ",
161                             tr_getLogTimeStr( timestr, sizeof(timestr) ),
162                             tr_peerIoGetAddrStr( msgs->io ),
163                             msgs->info->client );
164        va_start( args, fmt );
165        evbuffer_add_vprintf( buf, fmt, args );
166        va_end( args );
167        evbuffer_add_printf( buf, " (%s:%d)\n", file, line );
168
169        fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
170        evbuffer_free( buf );
171    }
172}
173
174#define dbgmsg(msgs, fmt...) myDebug(__FILE__, __LINE__, msgs, ##fmt )
175
176/**
177***
178**/
179
180static void
181protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
182{
183    tr_peerIo * io = msgs->io;
184    struct evbuffer * out = msgs->outMessages;
185
186    dbgmsg( msgs, "requesting %u:%u->%u", req->index, req->offset, req->length );
187    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
188    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
189    tr_peerIoWriteUint32( io, out, req->index );
190    tr_peerIoWriteUint32( io, out, req->offset );
191    tr_peerIoWriteUint32( io, out, req->length );
192}
193
194static void
195protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
196{
197    tr_peerIo * io = msgs->io;
198    struct evbuffer * out = msgs->outMessages;
199
200    dbgmsg( msgs, "cancelling %u:%u->%u", req->index, req->offset, req->length );
201    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 3*sizeof(uint32_t) );
202    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
203    tr_peerIoWriteUint32( io, out, req->index );
204    tr_peerIoWriteUint32( io, out, req->offset );
205    tr_peerIoWriteUint32( io, out, req->length );
206}
207
208static void
209protocolSendHave( tr_peermsgs * msgs, uint32_t index )
210{
211    tr_peerIo * io = msgs->io;
212    struct evbuffer * out = msgs->outMessages;
213
214    dbgmsg( msgs, "sending Have %u", index );
215    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
216    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
217    tr_peerIoWriteUint32( io, out, index );
218}
219
220static void
221protocolSendChoke( tr_peermsgs * msgs, int choke )
222{
223    tr_peerIo * io = msgs->io;
224    struct evbuffer * out = msgs->outMessages;
225
226    dbgmsg( msgs, "sending %s", (choke ? "Choke" : "Unchoke") );
227    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) );
228    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
229}
230
231static void
232protocolSendPiece( tr_peermsgs                * msgs,
233                   const struct peer_request  * r,
234                   const uint8_t              * pieceData )
235{
236    tr_peerIo * io = msgs->io;
237    struct evbuffer * out = evbuffer_new( );
238
239    dbgmsg( msgs, "sending block %u:%u->%u", r->index, r->offset, r->length );
240    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 2*sizeof(uint32_t) + r->length );
241    tr_peerIoWriteUint8 ( io, out, BT_PIECE );
242    tr_peerIoWriteUint32( io, out, r->index );
243    tr_peerIoWriteUint32( io, out, r->offset );
244    tr_peerIoWriteBytes ( io, out, pieceData, r->length );
245    tr_peerIoWriteBuf   ( io, out );
246
247    evbuffer_free( out );
248}
249
250/**
251***  EVENTS
252**/
253
254static const tr_peermsgs_event blankEvent = { 0, 0, 0, 0, 0.0f };
255
256static void
257publish( tr_peermsgs * msgs, tr_peermsgs_event * e )
258{
259    tr_publisherPublish( msgs->publisher, msgs->info, e );
260}
261
262static void
263fireGotError( tr_peermsgs * msgs )
264{
265    tr_peermsgs_event e = blankEvent;
266    e.eventType = TR_PEERMSG_GOT_ERROR;
267    publish( msgs, &e );
268}
269
270static void
271fireNeedReq( tr_peermsgs * msgs )
272{
273    tr_peermsgs_event e = blankEvent;
274    e.eventType = TR_PEERMSG_NEED_REQ;
275    publish( msgs, &e );
276}
277
278static void
279firePeerProgress( tr_peermsgs * msgs )
280{
281    tr_peermsgs_event e = blankEvent;
282    e.eventType = TR_PEERMSG_PEER_PROGRESS;
283    e.progress = msgs->info->progress;
284    publish( msgs, &e );
285}
286
287static void
288fireClientHave( tr_peermsgs * msgs, uint32_t pieceIndex )
289{
290    tr_peermsgs_event e = blankEvent;
291    e.eventType = TR_PEERMSG_CLIENT_HAVE;
292    e.pieceIndex = pieceIndex;
293    publish( msgs, &e );
294}
295
296static void
297fireGotBlock( tr_peermsgs * msgs, uint32_t pieceIndex, uint32_t offset, uint32_t length )
298{
299    tr_peermsgs_event e = blankEvent;
300    e.eventType = TR_PEERMSG_CLIENT_BLOCK;
301    e.pieceIndex = pieceIndex;
302    e.offset = offset;
303    e.length = length;
304    publish( msgs, &e );
305}
306
307static void
308fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
309{
310    tr_peermsgs_event e = blankEvent;
311    e.eventType = TR_PEERMSG_CANCEL;
312    e.pieceIndex = req->index;
313    e.offset = req->offset;
314    e.length = req->length;
315    publish( msgs, &e );
316}
317
318/**
319***  INTEREST
320**/
321
322static int
323isPieceInteresting( const tr_peermsgs   * peer,
324                    int                   piece )
325{
326    const tr_torrent * torrent = peer->torrent;
327    if( torrent->info.pieces[piece].dnd ) /* we don't want it */
328        return FALSE;
329    if( tr_cpPieceIsComplete( torrent->completion, piece ) ) /* we have it */
330        return FALSE;
331    if( !tr_bitfieldHas( peer->info->have, piece ) ) /* peer doesn't have it */
332        return FALSE;
333    if( tr_bitfieldHas( peer->info->banned, piece ) ) /* peer is banned */
334        return FALSE;
335    return TRUE;
336}
337
338/* "interested" means we'll ask for piece data if they unchoke us */
339static int
340isPeerInteresting( const tr_peermsgs * msgs )
341{
342    int i;
343    const tr_torrent * torrent;
344    const tr_bitfield * bitfield;
345    const int clientIsSeed =
346        tr_cpGetStatus( msgs->torrent->completion ) != TR_CP_INCOMPLETE;
347
348    if( clientIsSeed )
349        return FALSE;
350
351    torrent = msgs->torrent;
352    bitfield = tr_cpPieceBitfield( torrent->completion );
353
354    if( !msgs->info->have )
355        return TRUE;
356
357    assert( bitfield->len == msgs->info->have->len );
358    for( i=0; i<torrent->info.pieceCount; ++i )
359        if( isPieceInteresting( msgs, i ) )
360            return TRUE;
361
362    return FALSE;
363}
364
365static void
366sendInterest( tr_peermsgs * msgs, int weAreInterested )
367{
368    assert( msgs != NULL );
369    assert( weAreInterested==0 || weAreInterested==1 );
370
371    msgs->info->clientIsInterested = weAreInterested;
372    dbgmsg( msgs, "Sending %s",
373            weAreInterested ? "Interested" : "Not Interested");
374
375    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
376    tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages,
377                   weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
378}
379
380static void
381updateInterest( tr_peermsgs * msgs )
382{
383    const int i = isPeerInteresting( msgs );
384    if( i != msgs->info->clientIsInterested )
385        sendInterest( msgs, i );
386    if( i )
387        fireNeedReq( msgs );
388}
389
390void
391tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
392{
393    assert( msgs != NULL );
394    assert( msgs->info != NULL );
395    assert( choke==0 || choke==1 );
396
397    if( msgs->info->peerIsChoked != choke )
398    {
399        msgs->info->peerIsChoked = choke;
400        tr_list * walk;
401       
402        if( choke )
403            for( walk = msgs->peerAskedFor; walk != NULL; )
404            {
405                tr_list * next = walk->next;
406                /* don't reject a peer's fast allowed requests at choke */
407                struct peer_request *req = walk->data;
408                if ( !tr_bitfieldHas( msgs->peerAllowedPieces, req->index ) )
409                {
410                    tr_list_remove_data( &msgs->peerAskedFor, req );
411                    tr_free( req );
412                }
413                walk = next;
414            }
415
416        protocolSendChoke( msgs, choke );
417        msgs->info->chokeChangedAt = time( NULL );
418    }
419}
420
421/**
422***
423**/
424
425void
426tr_peerMsgsHave( tr_peermsgs * msgs,
427                 uint32_t      index )
428{
429    protocolSendHave( msgs, index );
430
431    /* since we have more pieces now, we might not be interested in this peer */
432    updateInterest( msgs );
433}
434#if 0
435static void
436sendFastSuggest( tr_peermsgs * msgs,
437                 uint32_t      pieceIndex )
438{
439    dbgmsg( msgs, "w00t SUGGESTing them piece #%d", pieceIndex );
440    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
441    tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_SUGGEST );
442    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
443   
444    updateInterest( msgs );
445}
446#endif
447static void
448sendFastHave( tr_peermsgs * msgs,
449              int           all)
450{
451    dbgmsg( msgs, "w00t telling them we %s pieces", (all ? "HAVE_ALL" : "HAVE_NONE" ) );
452    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
453    tr_peerIoWriteUint8( msgs->io, msgs->outMessages, ( all ? BT_HAVE_ALL : BT_HAVE_NONE ) );
454   
455    updateInterest( msgs );
456}
457
458static void
459sendFastReject( tr_peermsgs * msgs,
460                uint32_t      pieceIndex,
461                uint32_t      offset,
462                uint32_t      length )
463{
464    assert( msgs != NULL );
465    assert( length > 0 );
466   
467    /* reject the request */
468    const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
469    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
470    tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_REJECT );
471    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
472    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
473    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
474}
475
476static void
477sendFastAllowed( tr_peermsgs * msgs,
478                 uint32_t      pieceIndex)
479{
480    dbgmsg( msgs, "w00t telling them we ALLOW_FAST piece #%d", pieceIndex );
481    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
482    tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_ALLOWED_FAST );
483    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
484}
485
486
487
488static void
489sendFastAllowedSet( tr_peermsgs * msgs )
490{
491    int i = 0;
492    while (i <= msgs->torrent->info.pieceCount )
493    {
494        if ( tr_bitfieldHas( msgs->peerAllowedPieces, i) )
495            sendFastAllowed( msgs, i );
496        i++;
497    }
498}
499
500
501/**
502***
503**/
504
505static int
506reqIsValid( const tr_peermsgs * msgs, uint32_t index, uint32_t offset, uint32_t length )
507{
508    const tr_torrent * tor = msgs->torrent;
509
510    if( index >= (uint32_t) tor->info.pieceCount )
511        return FALSE;
512    if ( (int)offset >= tr_torPieceCountBytes( tor, (int)index ) )
513        return FALSE;
514    if( length > MAX_REQUEST_BYTE_COUNT )
515        return FALSE;
516    if( tr_pieceOffset( tor, index, offset, length ) > tor->info.totalSize )
517        return FALSE;
518
519    return TRUE;
520}
521
522static int
523requestIsValid( const tr_peermsgs * msgs, struct peer_request * req )
524{
525    return reqIsValid( msgs, req->index, req->offset, req->length );
526}
527
528static void
529pumpRequestQueue( tr_peermsgs * msgs )
530{
531    const int max = msgs->maxActiveRequests;
532    const int min = msgs->minActiveRequests;
533    int count = tr_list_size( msgs->clientAskedFor );
534    int sent = 0;
535
536    if( count > min )
537        return;
538    if( msgs->info->clientIsChoked )
539        return;
540
541    while( ( count < max ) && ( msgs->clientWillAskFor != NULL ) )
542    {
543        struct peer_request * req = tr_list_pop_front( &msgs->clientWillAskFor );
544        protocolSendRequest( msgs, req );
545        req->time_requested = msgs->lastReqAddedAt = time( NULL );
546        tr_list_append( &msgs->clientAskedFor, req );
547        ++count;
548        ++sent;
549    }
550
551    if( sent )
552        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
553                sent,
554                tr_list_size(msgs->clientAskedFor),
555                tr_list_size(msgs->clientWillAskFor) );
556
557    if( count < max )
558        fireNeedReq( msgs );
559}
560
561int
562tr_peerMsgsAddRequest( tr_peermsgs * msgs,
563                       uint32_t      index, 
564                       uint32_t      offset, 
565                       uint32_t      length )
566{
567    const int req_max = msgs->maxActiveRequests;
568    struct peer_request tmp, *req;
569
570    assert( msgs != NULL );
571    assert( msgs->torrent != NULL );
572    assert( reqIsValid( msgs, index, offset, length ) );
573
574    /**
575    ***  Reasons to decline the request
576    **/
577
578    /* don't send requests to choked clients */
579    if( msgs->info->clientIsChoked ) {
580        dbgmsg( msgs, "declining request because they're choking us" );
581        return TR_ADDREQ_CLIENT_CHOKED;
582    }
583
584    /* peer doesn't have this piece */
585    if( !tr_bitfieldHas( msgs->info->have, index ) )
586        return TR_ADDREQ_MISSING;
587
588    /* peer's queue is full */
589    if( tr_list_size( msgs->clientWillAskFor ) >= req_max ) {
590        dbgmsg( msgs, "declining request because we're full" );
591        return TR_ADDREQ_FULL;
592    }
593
594    /* have we already asked for this piece? */
595    tmp.index = index;
596    tmp.offset = offset;
597    tmp.length = length;
598    if( tr_list_find( msgs->clientAskedFor, &tmp, compareRequest ) ) {
599        dbgmsg( msgs, "declining because it's a duplicate" );
600        return TR_ADDREQ_DUPLICATE;
601    }
602    if( tr_list_find( msgs->clientWillAskFor, &tmp, compareRequest ) ) {
603        dbgmsg( msgs, "declining because it's a duplicate" );
604        return TR_ADDREQ_DUPLICATE;
605    }
606
607    /**
608    ***  Accept this request
609    **/
610
611    dbgmsg( msgs, "added req for piece %d, offset %d", (int)index, (int)offset );
612    req = tr_new0( struct peer_request, 1 );
613    *req = tmp;
614    tr_list_append( &msgs->clientWillAskFor, req );
615    return TR_ADDREQ_OK;
616}
617
618static void
619tr_peerMsgsCancelAllRequests( tr_peermsgs * msgs )
620{
621    struct peer_request * req;
622
623    while(( req = tr_list_pop_front( &msgs->clientWillAskFor ) ))
624    {
625        fireCancelledReq( msgs, req );
626        tr_free( req );
627    }
628
629    while(( req = tr_list_pop_front( &msgs->clientAskedFor ) ))
630    {
631        fireCancelledReq( msgs, req );
632        protocolSendCancel( msgs, req );
633        tr_free( req );
634    }
635}
636
637void
638tr_peerMsgsCancel( tr_peermsgs * msgs,
639                   uint32_t      pieceIndex,
640                   uint32_t      offset,
641                   uint32_t      length )
642{
643    struct peer_request *req, tmp;
644
645    assert( msgs != NULL );
646    assert( length > 0 );
647
648    /* have we asked the peer for this piece? */
649    tmp.index = pieceIndex;
650    tmp.offset = offset;
651    tmp.length = length;
652
653    /* if it's only in the queue and hasn't been sent yet, free it */
654    if(( req = tr_list_remove( &msgs->clientWillAskFor, &tmp, compareRequest ) ))
655    {
656        fireCancelledReq( msgs, req );
657        tr_free( req );
658    }
659
660    /* if it's already been sent, send a cancel message too */
661    if(( req = tr_list_remove( &msgs->clientAskedFor, &tmp, compareRequest ) ))
662    {
663        protocolSendCancel( msgs, req );
664        fireCancelledReq( msgs, req );
665        tr_free( req );
666    }
667}
668
669/**
670***
671**/
672
673static void
674sendLtepHandshake( tr_peermsgs * msgs )
675{
676    benc_val_t val, *m;
677    char * buf;
678    int len;
679    int pex;
680    const char * v = TR_NAME " " USERAGENT_PREFIX;
681    const int port = tr_getPublicPort( msgs->handle );
682    struct evbuffer * outbuf;
683
684    if( msgs->clientSentLtepHandshake )
685        return;
686
687    outbuf = evbuffer_new( );
688    dbgmsg( msgs, "sending an ltep handshake" );
689    msgs->clientSentLtepHandshake = 1;
690
691    /* decide if we want to advertise pex support */
692    if( msgs->torrent->pexDisabled )
693        pex = 0;
694    else if( msgs->peerSentLtepHandshake )
695        pex = msgs->peerSupportsPex ? 1 : 0;
696    else
697        pex = 1;
698
699    tr_bencInit( &val, TYPE_DICT );
700    tr_bencDictReserve( &val, 4 );
701    tr_bencInitInt( tr_bencDictAdd( &val, "e" ), 1 );
702    m  = tr_bencDictAdd( &val, "m" );
703    tr_bencInit( m, TYPE_DICT );
704    if( pex ) {
705        tr_bencDictReserve( m, 1 );
706        tr_bencInitInt( tr_bencDictAdd( m, "ut_pex" ), OUR_LTEP_PEX );
707    }
708    if( port > 0 )
709        tr_bencInitInt( tr_bencDictAdd( &val, "p" ), port );
710    tr_bencInitStr( tr_bencDictAdd( &val, "v" ), v, 0, 1 );
711    buf = tr_bencSaveMalloc( &val,  &len );
712
713    tr_peerIoWriteUint32( msgs->io, outbuf, 2*sizeof(uint8_t) + len );
714    tr_peerIoWriteUint8 ( msgs->io, outbuf, BT_LTEP );
715    tr_peerIoWriteUint8 ( msgs->io, outbuf, LTEP_HANDSHAKE );
716    tr_peerIoWriteBytes ( msgs->io, outbuf, buf, len );
717
718    tr_peerIoWriteBuf( msgs->io, outbuf );
719
720#if 0
721    dbgmsg( msgs, "here is the ltep handshake we sent:" );
722    tr_bencPrint( &val );
723#endif
724
725    /* cleanup */
726    tr_bencFree( &val );
727    tr_free( buf );
728    evbuffer_free( outbuf );
729}
730
731static void
732parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
733{
734    benc_val_t val, * sub;
735    uint8_t * tmp = tr_new( uint8_t, len );
736
737    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
738    msgs->peerSentLtepHandshake = 1;
739
740    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type!=TYPE_DICT ) {
741        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
742        tr_free( tmp );
743        return;
744    }
745
746#if 0
747    dbgmsg( msgs, "here is the ltep handshake we read:" );
748    tr_bencPrint( &val );
749#endif
750
751    /* does the peer prefer encrypted connections? */
752    sub = tr_bencDictFind( &val, "e" );
753    if( tr_bencIsInt( sub ) )
754        msgs->info->encryption_preference = sub->val.i
755                                      ? ENCRYPTION_PREFERENCE_YES
756                                      : ENCRYPTION_PREFERENCE_NO;
757
758    /* check supported messages for utorrent pex */
759    sub = tr_bencDictFind( &val, "m" );
760    if( tr_bencIsDict( sub ) ) {
761        sub = tr_bencDictFind( sub, "ut_pex" );
762        if( tr_bencIsInt( sub ) ) {
763            msgs->peerSupportsPex = 1;
764            msgs->ut_pex_id = (uint8_t) sub->val.i;
765            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
766        }
767    }
768
769    /* get peer's listening port */
770    sub = tr_bencDictFind( &val, "p" );
771    if( tr_bencIsInt( sub ) ) {
772        msgs->info->port = htons( (uint16_t)sub->val.i );
773        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
774    }
775
776    tr_bencFree( &val );
777    tr_free( tmp );
778}
779
780static void
781parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
782{
783    benc_val_t val, * sub;
784    uint8_t * tmp;
785
786    if( msgs->torrent->pexDisabled ) /* no sharing! */
787        return;
788
789    tmp = tr_new( uint8_t, msglen );
790    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
791
792    if( tr_bencLoad( tmp, msglen, &val, NULL ) || !tr_bencIsDict( &val ) ) {
793        dbgmsg( msgs, "GET can't read extended-pex dictionary" );
794        tr_free( tmp );
795        return;
796    }
797
798    sub = tr_bencDictFind( &val, "added" );
799    if( tr_bencIsStr(sub) && ((sub->val.s.i % 6) == 0)) {
800        const int n = sub->val.s.i / 6 ;
801        dbgmsg( msgs, "got %d peers from uT pex", n );
802        tr_peerMgrAddPeers( msgs->handle->peerMgr,
803                            msgs->torrent->info.hash,
804                            TR_PEER_FROM_PEX,
805                            (uint8_t*)sub->val.s.s, n );
806    }
807
808    tr_bencFree( &val );
809    tr_free( tmp );
810}
811
812static void
813sendPex( tr_peermsgs * msgs );
814
815static void
816parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
817{
818    uint8_t ltep_msgid;
819
820    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
821    msglen--;
822
823    if( ltep_msgid == LTEP_HANDSHAKE )
824    {
825        dbgmsg( msgs, "got ltep handshake" );
826        parseLtepHandshake( msgs, msglen, inbuf );
827        sendLtepHandshake( msgs );
828        sendPex( msgs );
829    }
830    else if( ltep_msgid == msgs->ut_pex_id )
831    {
832        dbgmsg( msgs, "got ut pex" );
833        msgs->peerSupportsPex = 1;
834        parseUtPex( msgs, msglen, inbuf );
835    }
836    else
837    {
838        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
839        evbuffer_drain( inbuf, msglen );
840    }
841}
842
843static int
844readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf )
845{
846    uint32_t len;
847    const size_t needlen = sizeof(uint32_t);
848
849    if( EVBUFFER_LENGTH(inbuf) < needlen )
850        return READ_MORE;
851
852    tr_peerIoReadUint32( msgs->io, inbuf, &len );
853
854    if( len == 0 ) /* peer sent us a keepalive message */
855        dbgmsg( msgs, "got KeepAlive" );
856    else {
857        msgs->incomingMessageLength = len;
858        msgs->state = AWAITING_BT_MESSAGE;
859    }
860
861    return READ_AGAIN;
862}
863
864static void
865updatePeerProgress( tr_peermsgs * msgs )
866{
867    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have ) / (float)msgs->torrent->info.pieceCount;
868    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
869    updateInterest( msgs );
870    firePeerProgress( msgs );
871}
872
873static int
874readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf )
875{
876    uint8_t id;
877    uint32_t ui32;
878    uint32_t msglen = msgs->incomingMessageLength;
879
880    if( EVBUFFER_LENGTH(inbuf) < msglen )
881        return READ_MORE;
882
883    tr_peerIoReadUint8( msgs->io, inbuf, &id );
884    msglen--;
885    dbgmsg( msgs, "got BT id %d, len %d", (int)id, (int)msglen );
886
887    switch( id )
888    {
889        case BT_CHOKE:
890            dbgmsg( msgs, "got Choke" );
891            assert( msglen == 0 );
892            msgs->info->clientIsChoked = 1;
893#if 0
894            tr_list * walk;
895            for( walk = msgs->peerAskedFor; walk != NULL; )
896            {
897                tr_list * next = walk->next;
898                /* We shouldn't reject a peer's fast allowed requests at choke */
899                struct peer_request *req = walk->data;
900                if ( !tr_bitfieldHas( msgs->peerAllowedPieces, req->index ) )
901                {
902                    tr_list_remove_data( &msgs->peerAskedFor, req );
903                    tr_free( req );
904                }
905                walk = next;
906            }
907#endif
908            tr_peerMsgsCancelAllRequests( msgs );
909            break;
910
911        case BT_UNCHOKE:
912            dbgmsg( msgs, "got Unchoke" );
913            assert( msglen == 0 );
914            msgs->info->clientIsChoked = 0;
915            fireNeedReq( msgs );
916            break;
917
918        case BT_INTERESTED:
919            dbgmsg( msgs, "got Interested" );
920            assert( msglen == 0 );
921            msgs->info->peerIsInterested = 1;
922            tr_peerMsgsSetChoke( msgs, 0 );
923            break;
924
925        case BT_NOT_INTERESTED:
926            dbgmsg( msgs, "got Not Interested" );
927            assert( msglen == 0 );
928            msgs->info->peerIsInterested = 0;
929            break;
930
931        case BT_HAVE:
932            assert( msglen == 4 );
933            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
934            tr_bitfieldAdd( msgs->info->have, ui32 );
935            updatePeerProgress( msgs );
936            dbgmsg( msgs, "got Have: %u", ui32 );
937            break;
938
939        case BT_BITFIELD: {
940            const int clientIsSeed = tr_cpGetStatus( msgs->torrent->completion ) != TR_CP_INCOMPLETE;
941            assert( msglen == msgs->info->have->len );
942            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
943            updatePeerProgress( msgs );
944            tr_peerMsgsSetChoke( msgs, !clientIsSeed || (msgs->info->progress<1.0) );
945            fireNeedReq( msgs );
946            break;
947        }
948
949        case BT_REQUEST: {
950            struct peer_request * req;
951            assert( msglen == 12 );
952            req = tr_new( struct peer_request, 1 );
953            tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
954            tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
955            tr_peerIoReadUint32( msgs->io, inbuf, &req->length );
956            dbgmsg( msgs, "got Request: %u:%u->%u", req->index, req->offset, req->length );
957           
958            if ( !requestIsValid( msgs, req ) )
959            {
960                dbgmsg( msgs, "BT_REQUEST: invalid request, ignoring" );
961                tr_free( req );
962                break;
963            }
964            /*
965                If we're not choking him -> continue
966                If we're choking him
967                    it doesn't support FPE -> He's deaf, reCHOKE and bail...
968                    it support FPE
969                        If the asked piece is not allowed
970                            OR he's above our threshold
971                            OR we don't have the requested piece -> Reject
972                        Else
973                        Asked piece allowed AND he's below our threshold -> continue...
974             */
975   
976
977            if ( msgs->info->peerIsChoked )
978            {
979                if ( !tr_peerIoSupportsFEXT( msgs->io ) )
980                {
981                    dbgmsg( msgs, "BT_REQUEST: peer is choked, ignoring" );
982                    /* Didn't he get it? */
983                    tr_peerMsgsSetChoke( msgs, 1 );
984                    tr_free( req );
985                    break;
986                }
987                else
988                {
989                    if ( !tr_bitfieldHas( msgs->peerAllowedPieces, req->index )
990                         || ( msgs->info->progress * (float)msgs->torrent->info.pieceCount) >= MAX_ALLOWED_SET_COUNT
991                         || !tr_cpPieceIsComplete( msgs->torrent->completion, req->index ) )
992                    {
993                        dbgmsg( msgs, "BT_REQUEST: peer requests an un-fastallowed piece" );
994                        sendFastReject( msgs, req->index, req->offset, req->length );
995                        tr_free( req );
996                        break;
997                    }
998                    dbgmsg( msgs, "BT_REQUEST: fast allowed piece, accepting request" );
999                }   
1000            }
1001           
1002            tr_list_append( &msgs->peerAskedFor, req );
1003            break;
1004        }
1005
1006        case BT_CANCEL: {
1007            struct peer_request req;
1008            void * data;
1009            assert( msglen == 12 );
1010            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1011            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1012            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1013            dbgmsg( msgs, "got a Cancel %u:%u->%u", req.index, req.offset, req.length );
1014            data = tr_list_remove( &msgs->peerAskedFor, &req, compareRequest );
1015            tr_free( data );
1016            break;
1017        }
1018
1019        case BT_PIECE: {
1020            dbgmsg( msgs, "got a Piece!" );
1021            assert( msgs->blockToUs.length == 0 );
1022            tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.index );
1023            tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.offset );
1024            msgs->blockToUs.length = msglen - 8;
1025            assert( EVBUFFER_LENGTH(msgs->inBlock) == 0 );
1026            msgs->state = msgs->blockToUs.length ? READING_BT_PIECE : AWAITING_BT_LENGTH;
1027            return READ_AGAIN;
1028            break;
1029        }
1030
1031        case BT_PORT: {
1032            dbgmsg( msgs, "Got a BT_PORT" );
1033            assert( msglen == 2 );
1034            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1035            break;
1036        }
1037       
1038        case BT_SUGGEST: {
1039            /* tiennou TODO */
1040            break;
1041        }
1042           
1043        case BT_HAVE_ALL: {
1044            assert( msglen == 0 );
1045            dbgmsg( msgs, "Got a BT_HAVE_ALL" );
1046            memset( msgs->info->have->bits, 1, msgs->info->have->len );
1047            updatePeerProgress( msgs );
1048            break;
1049        }
1050           
1051        case BT_HAVE_NONE: {
1052            assert( msglen == 0 );
1053            dbgmsg( msgs, "Got a BT_HAVE_NONE" );
1054            memset( msgs->info->have->bits, 1, msgs->info->have->len );
1055            updatePeerProgress( msgs );
1056            break;
1057        }
1058           
1059        case BT_REJECT: {
1060            struct peer_request req;
1061            tr_list * node;
1062            assert( msglen == 12 );
1063            dbgmsg( msgs, "Got a BT_REJECT" );
1064            tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
1065            tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
1066            tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
1067            node = tr_list_find( msgs->peerAskedFor, &req, compareRequest );
1068            if( node != NULL ) {
1069                void * data = node->data;
1070                tr_list_remove_data( &msgs->peerAskedFor, data );
1071                tr_free( data );
1072                dbgmsg( msgs, "found the req that peer has rejected... cancelled." );
1073            }
1074            break;
1075        }
1076           
1077        case BT_ALLOWED_FAST: {
1078            assert( msglen == 4 );
1079            dbgmsg( msgs, "Got a BT_ALLOWED_FAST" );
1080            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1081            tr_bitfieldAdd( msgs->clientAllowedPieces, ui32 );
1082            break;
1083        }
1084           
1085        case BT_LTEP:
1086            dbgmsg( msgs, "Got a BT_LTEP" );
1087            parseLtep( msgs, msglen, inbuf );
1088            break;
1089
1090        default:
1091            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1092            tr_peerIoDrain( msgs->io, inbuf, msglen );
1093            assert( 0 );
1094    }
1095
1096    msgs->incomingMessageLength = -1;
1097    msgs->state = AWAITING_BT_LENGTH;
1098    return READ_AGAIN;
1099}
1100
1101static void
1102clientGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1103{
1104    tr_torrent * tor = msgs->torrent;
1105    tor->activityDate = tr_date( );
1106    tor->downloadedCur += byteCount;
1107    msgs->info->pieceDataActivityDate = time( NULL );
1108    tr_rcTransferred( msgs->info->rcToClient, byteCount );
1109    tr_rcTransferred( tor->download, byteCount );
1110    tr_rcTransferred( tor->handle->download, byteCount );
1111}
1112
1113static void
1114peerGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
1115{
1116    tr_torrent * tor = msgs->torrent;
1117    tor->activityDate = tr_date( );
1118    tor->uploadedCur += byteCount;
1119    msgs->info->pieceDataActivityDate = time( NULL );
1120    tr_rcTransferred( msgs->info->rcToPeer, byteCount );
1121    tr_rcTransferred( tor->upload, byteCount );
1122    tr_rcTransferred( tor->handle->upload, byteCount );
1123}
1124
1125static int
1126canDownload( const tr_peermsgs * msgs )
1127{
1128    tr_torrent * tor = msgs->torrent;
1129
1130    if( tor->downloadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1131        return !tor->handle->useDownloadLimit || tr_rcCanTransfer( tor->handle->download );
1132
1133    if( tor->downloadLimitMode == TR_SPEEDLIMIT_SINGLE )
1134        return tr_rcCanTransfer( tor->download );
1135
1136    return TRUE;
1137}
1138
1139static void
1140reassignBytesToCorrupt( tr_peermsgs * msgs, uint32_t byteCount )
1141{
1142    tr_torrent * tor = msgs->torrent;
1143
1144    /* increment the `corrupt' field */
1145    tor->corruptCur += byteCount;
1146
1147    /* decrement the `downloaded' field */
1148    if( tor->downloadedCur >= byteCount )
1149        tor->downloadedCur -= byteCount;
1150    else
1151        tor->downloadedCur = 0;
1152}
1153
1154
1155static void
1156gotBadPiece( tr_peermsgs * msgs, uint32_t pieceIndex )
1157{
1158    const uint32_t byteCount = tr_torPieceCountBytes( msgs->torrent, (int)pieceIndex );
1159    reassignBytesToCorrupt( msgs, byteCount );
1160}
1161
1162static void
1163gotUnwantedBlock( tr_peermsgs * msgs,
1164                  uint32_t      index UNUSED,
1165                  uint32_t      offset UNUSED,
1166                  uint32_t      length )
1167{
1168    reassignBytesToCorrupt( msgs, length );
1169}
1170
1171static void
1172addUsToBlamefield( tr_peermsgs * msgs, uint32_t index )
1173{
1174    if( !msgs->info->blame )
1175         msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1176    tr_bitfieldAdd( msgs->info->blame, index );
1177}
1178
1179static void
1180gotBlock( tr_peermsgs      * msgs,
1181          struct evbuffer  * inbuf,
1182          uint32_t           index,
1183          uint32_t           offset,
1184          uint32_t           length )
1185{
1186    tr_torrent * tor = msgs->torrent;
1187    const int block = _tr_block( tor, index, offset );
1188    struct peer_request key, *req;
1189
1190    /**
1191    *** Remove the block from our `we asked for this' list
1192    **/
1193
1194    key.index = index;
1195    key.offset = offset;
1196    key.length = length;
1197    req = (struct peer_request*) tr_list_remove( &msgs->clientAskedFor, &key,
1198                                                 compareRequest );
1199    if( req == NULL ) {
1200        gotUnwantedBlock( msgs, index, offset, length );
1201        dbgmsg( msgs, "we didn't ask for this message..." );
1202        return;
1203    }
1204    dbgmsg( msgs, "Got block %u:%u->%u (turnaround time %d secs)", 
1205                     req->index, req->offset, req->length,
1206                     (int)(time(NULL) - req->time_requested) );
1207    tr_free( req );
1208    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1209                  tr_list_size(msgs->clientAskedFor));
1210
1211    /**
1212    *** Error checks
1213    **/
1214
1215    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
1216        dbgmsg( msgs, "have this block already..." );
1217        tr_dbg( "have this block already..." );
1218        gotUnwantedBlock( msgs, index, offset, length );
1219        return;
1220    }
1221
1222    if( (int)length != tr_torBlockCountBytes( tor, block ) ) {
1223        dbgmsg( msgs, "block is the wrong length..." );
1224        tr_dbg( "block is the wrong length..." );
1225        gotUnwantedBlock( msgs, index, offset, length );
1226        return;
1227    }
1228
1229    /**
1230    ***  Write the block
1231    **/
1232
1233    if( tr_ioWrite( tor, index, offset, length, EVBUFFER_DATA( inbuf )))
1234        return;
1235
1236    tr_cpBlockAdd( tor->completion, block );
1237
1238    addUsToBlamefield( msgs, index );
1239
1240    fireGotBlock( msgs, index, offset, length );
1241
1242    /**
1243    ***  Handle if this was the last block in the piece
1244    **/
1245
1246    if( tr_cpPieceIsComplete( tor->completion, index ) )
1247    {
1248        if( tr_ioHash( tor, index ) )
1249        {
1250            gotBadPiece( msgs, index );
1251            return;
1252        }
1253
1254        fireClientHave( msgs, index );
1255    }
1256}
1257
1258
1259static ReadState
1260readBtPiece( tr_peermsgs * msgs, struct evbuffer * inbuf )
1261{
1262    uint32_t inlen;
1263    uint8_t * tmp;
1264
1265    assert( msgs != NULL );
1266    assert( msgs->blockToUs.length > 0 );
1267    assert( inbuf != NULL );
1268    assert( EVBUFFER_LENGTH( inbuf ) > 0 );
1269
1270    /* read from the inbuf into our block buffer */
1271    inlen = MIN( EVBUFFER_LENGTH(inbuf), msgs->blockToUs.length );
1272    tmp = tr_new( uint8_t, inlen );
1273    tr_peerIoReadBytes( msgs->io, inbuf, tmp, inlen );
1274    evbuffer_add( msgs->inBlock, tmp, inlen );
1275
1276    /* update our tables accordingly */
1277    assert( inlen >= msgs->blockToUs.length );
1278    msgs->blockToUs.length -= inlen;
1279    msgs->info->peerSentPieceDataAt = time( NULL );
1280    clientGotBytes( msgs, inlen );
1281
1282    /* if this was the entire block, save it */
1283    if( !msgs->blockToUs.length )
1284    {
1285        dbgmsg( msgs, "got block %u:%u", msgs->blockToUs.index, msgs->blockToUs.offset );
1286        assert( (int)EVBUFFER_LENGTH( msgs->inBlock ) == tr_torBlockCountBytes( msgs->torrent, _tr_block(msgs->torrent,msgs->blockToUs.index, msgs->blockToUs.offset) ) );
1287        gotBlock( msgs, msgs->inBlock,
1288                        msgs->blockToUs.index,
1289                        msgs->blockToUs.offset,
1290                        EVBUFFER_LENGTH( msgs->inBlock ) );
1291        evbuffer_drain( msgs->inBlock, ~0 );
1292        msgs->state = AWAITING_BT_LENGTH;
1293    }
1294
1295    /* cleanup */
1296    tr_free( tmp );
1297    return READ_AGAIN;
1298}
1299
1300static ReadState
1301canRead( struct bufferevent * evin, void * vmsgs )
1302{
1303    ReadState ret;
1304    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1305    struct evbuffer * inbuf = EVBUFFER_INPUT ( evin );
1306
1307    if( !canDownload( msgs ) )
1308    {
1309        msgs->notListening = 1;
1310        tr_peerIoSetIOMode ( msgs->io, 0, EV_READ );
1311        ret = READ_DONE;
1312    }
1313    else switch( msgs->state )
1314    {
1315        case AWAITING_BT_LENGTH:  ret = readBtLength  ( msgs, inbuf ); break;
1316        case AWAITING_BT_MESSAGE: ret = readBtMessage ( msgs, inbuf ); break;
1317        case READING_BT_PIECE:    ret = readBtPiece   ( msgs, inbuf ); break;
1318        default: assert( 0 );
1319    }
1320
1321    return ret;
1322}
1323
1324static void
1325sendKeepalive( tr_peermsgs * msgs )
1326{
1327    dbgmsg( msgs, "sending a keepalive message" );
1328    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1329}
1330
1331/**
1332***
1333**/
1334
1335static int
1336canWrite( const tr_peermsgs * msgs )
1337{
1338    /* don't let our outbuffer get too large */
1339    if( tr_peerIoWriteBytesWaiting( msgs->io ) > 8192 )
1340        return FALSE;
1341
1342    return TRUE;
1343}
1344
1345static int
1346canUpload( const tr_peermsgs * msgs )
1347{
1348    const tr_torrent * tor = msgs->torrent;
1349
1350    if( !canWrite( msgs ) )
1351        return FALSE;
1352
1353    if( tor->uploadLimitMode == TR_SPEEDLIMIT_GLOBAL )
1354        return !tor->handle->useUploadLimit || tr_rcCanTransfer( tor->handle->upload );
1355
1356    if( tor->uploadLimitMode == TR_SPEEDLIMIT_SINGLE )
1357        return tr_rcCanTransfer( tor->upload );
1358
1359    return TRUE;
1360}
1361
1362static int
1363ratePulse( void * vmsgs )
1364{
1365    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1366    msgs->info->rateToClient = tr_rcRate( msgs->info->rcToClient );
1367    msgs->info->rateToPeer = tr_rcRate( msgs->info->rcToPeer );
1368    msgs->maxActiveRequests = MIN( 8 + (int)(msgs->info->rateToClient/10), 100 );
1369    msgs->minActiveRequests = msgs->maxActiveRequests / 2;
1370    return TRUE;
1371}
1372
1373static int
1374pulse( void * vmsgs )
1375{
1376    const time_t now = time( NULL );
1377    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
1378    size_t len;
1379
1380    /* if we froze out a downloaded block because of speed limits,
1381       start listening to the peer again */
1382    if( msgs->notListening && canDownload( msgs ) )
1383    {
1384        msgs->notListening = 0;
1385        tr_peerIoSetIOMode ( msgs->io, EV_READ, 0 );
1386    }
1387
1388    pumpRequestQueue( msgs );
1389
1390    if( !canWrite( msgs ) )
1391    {
1392    }
1393    else if(( len = EVBUFFER_LENGTH( msgs->outMessages ) ))
1394    {
1395        tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
1396        msgs->clientSentAnythingAt = now;
1397    }
1398    else if(( msgs->peerAskedFor ))
1399    {
1400        if( canUpload( msgs ) )
1401        {
1402            struct peer_request * r = tr_list_pop_front( &msgs->peerAskedFor );
1403            uint8_t * buf = tr_new( uint8_t, r->length );
1404
1405            if( requestIsValid( msgs, r )
1406                && tr_cpPieceIsComplete( msgs->torrent->completion, r->index )
1407                && !tr_ioRead( msgs->torrent, r->index, r->offset, r->length, buf ) )
1408            {
1409                protocolSendPiece( msgs, r, buf );
1410                peerGotBytes( msgs, r->length );
1411                msgs->clientSentAnythingAt = now;
1412            }
1413
1414            tr_free( buf );
1415            tr_free( r );
1416        }
1417    }
1418    else if( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS )
1419    {
1420        sendKeepalive( msgs );
1421    }
1422
1423    return TRUE; /* loop forever */
1424}
1425
1426static void
1427didWrite( struct bufferevent * evin UNUSED, void * vmsgs )
1428{
1429    pulse( vmsgs );
1430}
1431
1432static void
1433gotError( struct bufferevent * evbuf UNUSED, short what UNUSED, void * vpeer )
1434{
1435    fireGotError( vpeer );
1436}
1437
1438static void
1439sendBitfield( tr_peermsgs * msgs )
1440{
1441    const tr_bitfield * bitfield = tr_cpPieceBitfield( msgs->torrent->completion );
1442    struct evbuffer * out = msgs->outMessages;
1443
1444    dbgmsg( msgs, "sending peer a bitfield message" );
1445    tr_peerIoWriteUint32( msgs->io, out, sizeof(uint8_t) + bitfield->len );
1446    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1447    tr_peerIoWriteBytes ( msgs->io, out, bitfield->bits, bitfield->len );
1448}
1449
1450/**
1451***
1452**/
1453
1454/* some peers give us error messages if we send
1455   more than this many peers in a single pex message */
1456#define MAX_PEX_DIFFS 200
1457
1458typedef struct
1459{
1460    tr_pex * added;
1461    tr_pex * dropped;
1462    tr_pex * elements;
1463    int addedCount;
1464    int droppedCount;
1465    int elementCount;
1466    int diffCount;
1467}
1468PexDiffs;
1469
1470static void
1471pexAddedCb( void * vpex, void * userData )
1472{
1473    PexDiffs * diffs = (PexDiffs *) userData;
1474    tr_pex * pex = (tr_pex *) vpex;
1475    if( diffs->diffCount < MAX_PEX_DIFFS )
1476    {
1477        diffs->diffCount++;
1478        diffs->added[diffs->addedCount++] = *pex;
1479        diffs->elements[diffs->elementCount++] = *pex;
1480    }
1481}
1482
1483static void
1484pexRemovedCb( void * vpex, void * userData )
1485{
1486    PexDiffs * diffs = (PexDiffs *) userData;
1487    tr_pex * pex = (tr_pex *) vpex;
1488    if( diffs->diffCount < MAX_PEX_DIFFS )
1489    {
1490        diffs->diffCount++;
1491        diffs->dropped[diffs->droppedCount++] = *pex;
1492    }
1493}
1494
1495static void
1496pexElementCb( void * vpex, void * userData )
1497{
1498    PexDiffs * diffs = (PexDiffs *) userData;
1499    tr_pex * pex = (tr_pex *) vpex;
1500    if( diffs->diffCount < MAX_PEX_DIFFS )
1501    {
1502        diffs->diffCount++;
1503        diffs->elements[diffs->elementCount++] = *pex;
1504    }
1505}
1506
1507static void
1508sendPex( tr_peermsgs * msgs )
1509{
1510    if( msgs->peerSupportsPex && !msgs->torrent->pexDisabled )
1511    {
1512        int i;
1513        tr_pex * newPex = NULL;
1514        const int newCount = tr_peerMgrGetPeers( msgs->handle->peerMgr, msgs->torrent->info.hash, &newPex );
1515        PexDiffs diffs;
1516        benc_val_t val, *added, *dropped, *flags;
1517        uint8_t *tmp, *walk;
1518        char * benc;
1519        int bencLen;
1520
1521        /* build the diffs */
1522        diffs.added = tr_new( tr_pex, newCount );
1523        diffs.addedCount = 0;
1524        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1525        diffs.droppedCount = 0;
1526        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1527        diffs.elementCount = 0;
1528        diffs.diffCount = 0;
1529        tr_set_compare( msgs->pex, msgs->pexCount,
1530                        newPex, newCount,
1531                        tr_pexCompare, sizeof(tr_pex),
1532                        pexRemovedCb, pexAddedCb, pexElementCb, &diffs );
1533        dbgmsg( msgs, "pex: old peer count %d, new peer count %d, added %d, removed %d", msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1534
1535        /* update peer */
1536        tr_free( msgs->pex );
1537        msgs->pex = diffs.elements;
1538        msgs->pexCount = diffs.elementCount;
1539
1540        /* build the pex payload */
1541        tr_bencInit( &val, TYPE_DICT );
1542        tr_bencDictReserve( &val, 3 );
1543
1544        /* "added" */
1545        added = tr_bencDictAdd( &val, "added" );
1546        tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1547        for( i=0; i<diffs.addedCount; ++i ) {
1548            memcpy( walk, &diffs.added[i].in_addr, 4 ); walk += 4;
1549            memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1550        }
1551        assert( ( walk - tmp ) == diffs.addedCount * 6 );
1552        tr_bencInitStr( added, tmp, walk-tmp, FALSE );
1553
1554        /* "added.f" */
1555        flags = tr_bencDictAdd( &val, "added.f" );
1556        tmp = walk = tr_new( uint8_t, diffs.addedCount );
1557        for( i=0; i<diffs.addedCount; ++i )
1558            *walk++ = diffs.added[i].flags;
1559        assert( ( walk - tmp ) == diffs.addedCount );
1560        tr_bencInitStr( flags, tmp, walk-tmp, FALSE );
1561
1562        /* "dropped" */
1563        dropped = tr_bencDictAdd( &val, "dropped" );
1564        tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
1565        for( i=0; i<diffs.droppedCount; ++i ) {
1566            memcpy( walk, &diffs.dropped[i].in_addr, 4 ); walk += 4;
1567            memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
1568        }
1569        assert( ( walk - tmp ) == diffs.droppedCount * 6 );
1570        tr_bencInitStr( dropped, tmp, walk-tmp, FALSE );
1571
1572        /* write the pex message */
1573        benc = tr_bencSaveMalloc( &val, &bencLen );
1574        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 2*sizeof(uint8_t) + bencLen );
1575        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_LTEP );
1576        tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, OUR_LTEP_PEX );
1577        tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, benc, bencLen );
1578
1579        /* cleanup */
1580        tr_free( benc );
1581        tr_bencFree( &val );
1582        tr_free( diffs.added );
1583        tr_free( diffs.dropped );
1584        tr_free( newPex );
1585
1586        msgs->clientSentPexAt = time( NULL );
1587    }
1588}
1589
1590static int
1591pexPulse( void * vpeer )
1592{
1593    sendPex( vpeer );
1594    return TRUE;
1595}
1596
1597/**
1598***
1599**/
1600
1601tr_peermsgs*
1602tr_peerMsgsNew( struct tr_torrent * torrent,
1603                struct tr_peer    * info,
1604                tr_delivery_func    func,
1605                void              * userData,
1606                tr_publisher_tag  * setme )
1607{
1608    tr_peermsgs * m;
1609
1610    assert( info != NULL );
1611    assert( info->io != NULL );
1612
1613    m = tr_new0( tr_peermsgs, 1 );
1614    m->publisher = tr_publisherNew( );
1615    m->info = info;
1616    m->handle = torrent->handle;
1617    m->torrent = torrent;
1618    m->io = info->io;
1619    m->info->clientIsChoked = 1;
1620    m->info->peerIsChoked = 1;
1621    m->info->clientIsInterested = 0;
1622    m->info->peerIsInterested = 0;
1623    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
1624    m->pulseTimer = tr_timerNew( m->handle, pulse, m, PEER_PULSE_INTERVAL );
1625    m->rateTimer = tr_timerNew( m->handle, ratePulse, m, RATE_PULSE_INTERVAL );
1626    m->pexTimer = tr_timerNew( m->handle, pexPulse, m, PEX_INTERVAL );
1627    m->outMessages = evbuffer_new( );
1628    m->inBlock = evbuffer_new( );
1629    m->peerAllowedPieces = NULL;
1630    m->clientAllowedPieces = NULL;
1631    setme = tr_publisherSubscribe( m->publisher, func, userData );
1632   
1633    if ( tr_peerIoSupportsFEXT( m->io ) )
1634    {
1635        /* This peer is fastpeer-enabled, generate its allowed set
1636         * (before registering our callbacks) */
1637        if ( !m->peerAllowedPieces ) {
1638            const struct in_addr *peerAddr = tr_peerIoGetAddress( m->io, NULL );
1639           
1640            m->peerAllowedPieces = tr_peerMgrGenerateAllowedSet( MAX_ALLOWED_SET_COUNT,
1641                                                                 m->torrent->info.pieceCount,
1642                                                                 m->torrent->info.hash,
1643                                                                 peerAddr );
1644        }
1645        m->clientAllowedPieces = tr_bitfieldNew( m->torrent->info.pieceCount );
1646    }
1647   
1648    tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
1649    tr_peerIoSetIOMode( m->io, EV_READ|EV_WRITE, 0 );
1650    ratePulse( m );
1651
1652    /**
1653    ***  If we initiated this connection,
1654    ***  we may need to send LTEP/AZMP handshakes.
1655    ***  Otherwise we'll wait for the peer to send theirs first.
1656    **/
1657    if( !tr_peerIoIsIncoming( m->io ) )
1658    {
1659        if ( tr_peerIoSupportsLTEP( m->io ) ) {
1660            sendLtepHandshake( m );
1661           
1662        } else if ( tr_peerIoSupportsAZMP( m->io ) ) {
1663            dbgmsg( m, "FIXME: need to send AZMP handshake" );
1664           
1665        } else {
1666            /* no-op */
1667        }
1668    }
1669   
1670    if ( tr_peerIoSupportsFEXT( m->io ) )
1671    {
1672        /* This peer is fastpeer-enabled, send it have-all or have-none if appropriate */
1673        float completion = tr_cpPercentComplete( m->torrent->completion );
1674        if ( completion == 0.0f ) {
1675            sendFastHave( m, 0 );
1676        } else if ( completion == 1.0f ) {
1677            sendFastHave( m, 1 );
1678        } else {
1679            sendBitfield( m );
1680        }
1681        uint32_t peerProgress = m->torrent->info.pieceCount * m->info->progress;
1682       
1683        if ( peerProgress < MAX_ALLOWED_SET_COUNT )
1684            sendFastAllowedSet( m );
1685    } else {
1686        sendBitfield( m );
1687    }
1688    return m;
1689}
1690
1691void
1692tr_peerMsgsFree( tr_peermsgs* msgs )
1693{
1694    if( msgs != NULL )
1695    {
1696        tr_timerFree( &msgs->pulseTimer );
1697        tr_timerFree( &msgs->rateTimer );
1698        tr_timerFree( &msgs->pexTimer );
1699        tr_publisherFree( &msgs->publisher );
1700        tr_list_free( &msgs->clientWillAskFor, tr_free );
1701        tr_list_free( &msgs->clientAskedFor, tr_free );
1702        tr_list_free( &msgs->peerAskedFor, tr_free );
1703        evbuffer_free( msgs->outMessages );
1704        evbuffer_free( msgs->inBlock );
1705        tr_free( msgs->pex );
1706        msgs->pexCount = 0;
1707        tr_free( msgs );
1708    }
1709}
1710
1711tr_publisher_tag
1712tr_peerMsgsSubscribe( tr_peermsgs       * peer,
1713                      tr_delivery_func    func,
1714                      void              * userData )
1715{
1716    return tr_publisherSubscribe( peer->publisher, func, userData );
1717}
1718
1719void
1720tr_peerMsgsUnsubscribe( tr_peermsgs       * peer,
1721                        tr_publisher_tag    tag )
1722{
1723    tr_publisherUnsubscribe( peer->publisher, tag );
1724}
1725
1726int
1727tr_peerMsgIsPieceFastAllowed( const tr_peermsgs * peer,
1728                              uint32_t            index )
1729{
1730    return tr_bitfieldHas( peer->clientAllowedPieces, index );
1731}
1732
Note: See TracBrowser for help on using the repository browser.