source: trunk/libtransmission/peer-msgs.c @ 7231

Last change on this file since 7231 was 7231, checked in by charles, 12 years ago

(libT) re-apply jhujhiti's IPv6 patch. This merges in my tr_port cleanup, so any new bugs are mine :/

  • Property svn:keywords set to Date Rev Author Id
File size: 53.9 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 7231 2008-12-02 03:41:58Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19
20#include <event.h>
21
22#include "transmission.h"
23#include "bencode.h"
24#include "completion.h"
25#include "crypto.h"
26#include "inout.h"
27#include "iobuf.h"
28#ifdef WIN32
29#include "net.h" /* for ECONN */
30#endif
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-mgr-private.h"
34#include "peer-msgs.h"
35#include "ratecontrol.h"
36#include "stats.h"
37#include "torrent.h"
38#include "trevent.h"
39#include "utils.h"
40
41/**
42***
43**/
44
45enum
46{
47    BT_CHOKE                = 0,
48    BT_UNCHOKE              = 1,
49    BT_INTERESTED           = 2,
50    BT_NOT_INTERESTED       = 3,
51    BT_HAVE                 = 4,
52    BT_BITFIELD             = 5,
53    BT_REQUEST              = 6,
54    BT_PIECE                = 7,
55    BT_CANCEL               = 8,
56    BT_PORT                 = 9,
57    BT_SUGGEST              = 13,
58    BT_HAVE_ALL             = 14,
59    BT_HAVE_NONE            = 15,
60    BT_REJECT               = 16,
61    BT_LTEP                 = 20,
62
63    LTEP_HANDSHAKE          = 0,
64
65    TR_LTEP_PEX             = 1,
66
67    MIN_CHOKE_PERIOD_SEC    = ( 10 ),
68
69    /* idle seconds before we send a keepalive */
70    KEEPALIVE_INTERVAL_SECS = 100,
71
72    PEX_INTERVAL            = ( 90 * 1000 ), /* msec between sendPex() calls */
73    PEER_PULSE_INTERVAL     = ( 250 ),       /* msec between peerPulse() calls
74                                               */
75
76    MAX_QUEUE_SIZE          = ( 100 ),
77
78    /* how long an unsent request can stay queued before it's returned
79       back to the peer-mgr's pool of requests */
80    QUEUED_REQUEST_TTL_SECS = 20,
81
82    /* how long a sent request can stay queued before it's returned
83       back to the peer-mgr's pool of requests */
84    SENT_REQUEST_TTL_SECS = 240,
85
86    /* used in lowering the outMessages queue period */
87    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
88    HIGH_PRIORITY_INTERVAL_SECS = 2,
89    LOW_PRIORITY_INTERVAL_SECS = 20,
90
91    /* number of pieces to remove from the bitfield when
92     * lazy bitfields are turned on */
93    LAZY_PIECE_COUNT = 26
94};
95
96/**
97***  REQUEST MANAGEMENT
98**/
99
100enum
101{
102    AWAITING_BT_LENGTH,
103    AWAITING_BT_ID,
104    AWAITING_BT_MESSAGE,
105    AWAITING_BT_PIECE
106};
107
108struct peer_request
109{
110    uint32_t    index;
111    uint32_t    offset;
112    uint32_t    length;
113    time_t      time_requested;
114};
115
116static int
117compareRequest( const void * va,
118                const void * vb )
119{
120    const struct peer_request * a = va;
121    const struct peer_request * b = vb;
122
123    if( a->index != b->index )
124        return a->index < b->index ? -1 : 1;
125
126    if( a->offset != b->offset )
127        return a->offset < b->offset ? -1 : 1;
128
129    if( a->length != b->length )
130        return a->length < b->length ? -1 : 1;
131
132    return 0;
133}
134
135struct request_list
136{
137    uint16_t               count;
138    uint16_t               max;
139    struct peer_request *  requests;
140};
141
142static const struct request_list REQUEST_LIST_INIT = { 0, 0, NULL };
143
144static void
145reqListReserve( struct request_list * list,
146                uint16_t              max )
147{
148    if( list->max < max )
149    {
150        list->max = max;
151        list->requests = tr_renew( struct peer_request,
152                                   list->requests,
153                                   list->max );
154    }
155}
156
157static void
158reqListClear( struct request_list * list )
159{
160    tr_free( list->requests );
161    *list = REQUEST_LIST_INIT;
162}
163
164static void
165reqListCopy( struct request_list *       dest,
166             const struct request_list * src )
167{
168    dest->count = dest->max = src->count;
169    dest->requests =
170        tr_memdup( src->requests, dest->count * sizeof( struct peer_request ) );
171}
172
173static void
174reqListRemoveOne( struct request_list * list,
175                  int                   i )
176{
177    assert( 0 <= i && i < list->count );
178
179    memmove( &list->requests[i],
180            &list->requests[i + 1],
181            sizeof( struct peer_request ) * ( --list->count - i ) );
182}
183
184static void
185reqListAppend( struct request_list *       list,
186               const struct peer_request * req )
187{
188    if( ++list->count >= list->max )
189        reqListReserve( list, list->max + 8 );
190
191    list->requests[list->count - 1] = *req;
192}
193
194static int
195reqListPop( struct request_list * list,
196            struct peer_request * setme )
197{
198    int success;
199
200    if( !list->count )
201        success = FALSE;
202    else {
203        *setme = list->requests[0];
204        reqListRemoveOne( list, 0 );
205        success = TRUE;
206    }
207
208    return success;
209}
210
211static int
212reqListFind( struct request_list *       list,
213             const struct peer_request * key )
214{
215    uint16_t i;
216
217    for( i = 0; i < list->count; ++i )
218        if( !compareRequest( key, list->requests + i ) )
219            return i;
220
221    return -1;
222}
223
224static int
225reqListRemove( struct request_list *       list,
226               const struct peer_request * key )
227{
228    int success;
229    const int i = reqListFind( list, key );
230
231    if( i < 0 )
232        success = FALSE;
233    else {
234        reqListRemoveOne( list, i );
235        success = TRUE;
236    }
237
238    return success;
239}
240
241/**
242***
243**/
244
245/* this is raw, unchanged data from the peer regarding
246 * the current message that it's sending us. */
247struct tr_incoming
248{
249    uint8_t                id;
250    uint32_t               length; /* includes the +1 for id length */
251    struct peer_request    blockReq; /* metadata for incoming blocks */
252    struct evbuffer *      block; /* piece data for incoming blocks */
253};
254
255struct tr_peermsgs
256{
257    tr_bool         peerSentBitfield;
258    tr_bool         peerSupportsPex;
259    tr_bool         clientSentLtepHandshake;
260    tr_bool         peerSentLtepHandshake;
261
262    uint8_t         state;
263    uint8_t         ut_pex_id;
264    uint16_t        pexCount;
265    uint16_t        minActiveRequests;
266    uint16_t        maxActiveRequests;
267
268    /* how long the outMessages batch should be allowed to grow before
269     * it's flushed -- some messages (like requests >:) should be sent
270     * very quickly; others aren't as urgent. */
271    int                    outMessagesBatchPeriod;
272
273    tr_peer *              info;
274
275    tr_session *           session;
276    tr_torrent *           torrent;
277    tr_peerIo *            io;
278
279    tr_publisher_t *       publisher;
280
281    struct evbuffer *      outMessages; /* all the non-piece messages */
282
283    struct request_list    peerAskedFor;
284    struct request_list    clientAskedFor;
285    struct request_list    clientWillAskFor;
286
287    tr_timer *             pexTimer;
288
289    time_t                 clientSentPexAt;
290    time_t                 clientSentAnythingAt;
291
292    /* when we started batching the outMessages */
293    time_t                outMessagesBatchedAt;
294
295    tr_bitfield *         peerAllowedPieces;
296
297    struct tr_incoming    incoming;
298
299    tr_pex *              pex;
300};
301
302/**
303***
304**/
305
306static void
307myDebug( const char *               file,
308         int                        line,
309         const struct tr_peermsgs * msgs,
310         const char *               fmt,
311         ... )
312{
313    FILE * fp = tr_getLog( );
314
315    if( fp )
316    {
317        va_list           args;
318        char              timestr[64];
319        struct evbuffer * buf = evbuffer_new( );
320        char *            base = tr_basename( file );
321
322        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
323                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
324                             msgs->torrent->info.name,
325                             tr_peerIoGetAddrStr( msgs->io ),
326                             msgs->info->client );
327        va_start( args, fmt );
328        evbuffer_add_vprintf( buf, fmt, args );
329        va_end( args );
330        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
331        fwrite( EVBUFFER_DATA( buf ), 1, EVBUFFER_LENGTH( buf ), fp );
332
333        tr_free( base );
334        evbuffer_free( buf );
335    }
336}
337
338#define dbgmsg( msgs, ... ) \
339    do { \
340        if( tr_deepLoggingIsActive( ) ) \
341            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
342    } while( 0 )
343
344/**
345***
346**/
347
348static void
349pokeBatchPeriod( tr_peermsgs * msgs,
350                 int           interval )
351{
352    if( msgs->outMessagesBatchPeriod > interval )
353    {
354        msgs->outMessagesBatchPeriod = interval;
355        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
356    }
357}
358
359static void
360protocolSendRequest( tr_peermsgs *               msgs,
361                     const struct peer_request * req )
362{
363    tr_peerIo *       io = msgs->io;
364    struct evbuffer * out = msgs->outMessages;
365
366    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
367    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
368    tr_peerIoWriteUint32( io, out, req->index );
369    tr_peerIoWriteUint32( io, out, req->offset );
370    tr_peerIoWriteUint32( io, out, req->length );
371    dbgmsg( msgs, "requesting %u:%u->%u... outMessage size is now %d",
372           req->index, req->offset, req->length, (int)EVBUFFER_LENGTH( out ) );
373    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
374}
375
376static void
377protocolSendCancel( tr_peermsgs *               msgs,
378                    const struct peer_request * req )
379{
380    tr_peerIo *       io = msgs->io;
381    struct evbuffer * out = msgs->outMessages;
382
383    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
384    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
385    tr_peerIoWriteUint32( io, out, req->index );
386    tr_peerIoWriteUint32( io, out, req->offset );
387    tr_peerIoWriteUint32( io, out, req->length );
388    dbgmsg( msgs, "cancelling %u:%u->%u... outMessage size is now %d",
389           req->index, req->offset, req->length, (int)EVBUFFER_LENGTH( out ) );
390    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
391}
392
393static void
394protocolSendHave( tr_peermsgs * msgs,
395                  uint32_t      index )
396{
397    tr_peerIo *       io = msgs->io;
398    struct evbuffer * out = msgs->outMessages;
399
400    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + sizeof( uint32_t ) );
401    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
402    tr_peerIoWriteUint32( io, out, index );
403    dbgmsg( msgs, "sending Have %u.. outMessage size is now %d",
404           index, (int)EVBUFFER_LENGTH( out ) );
405    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
406}
407
408static void
409protocolSendChoke( tr_peermsgs * msgs,
410                   int           choke )
411{
412    tr_peerIo *       io = msgs->io;
413    struct evbuffer * out = msgs->outMessages;
414
415    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
416    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
417    dbgmsg( msgs, "sending %s... outMessage size is now %d",
418           ( choke ? "Choke" : "Unchoke" ),
419           (int)EVBUFFER_LENGTH( out ) );
420    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
421}
422
423/**
424***  EVENTS
425**/
426
427static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0 };
428
429static void
430publish( tr_peermsgs *   msgs,
431         tr_peer_event * e )
432{
433    tr_publisherPublish( msgs->publisher, msgs->info, e );
434}
435
436static void
437fireError( tr_peermsgs * msgs,
438           int           err )
439{
440    tr_peer_event e = blankEvent;
441
442    e.eventType = TR_PEER_ERROR;
443    e.err = err;
444    publish( msgs, &e );
445}
446
447static void
448fireNeedReq( tr_peermsgs * msgs )
449{
450    tr_peer_event e = blankEvent;
451
452    e.eventType = TR_PEER_NEED_REQ;
453    publish( msgs, &e );
454}
455
456static void
457firePeerProgress( tr_peermsgs * msgs )
458{
459    tr_peer_event e = blankEvent;
460
461    e.eventType = TR_PEER_PEER_PROGRESS;
462    e.progress = msgs->info->progress;
463    publish( msgs, &e );
464}
465
466static void
467fireGotBlock( tr_peermsgs *               msgs,
468              const struct peer_request * req )
469{
470    tr_peer_event e = blankEvent;
471
472    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
473    e.pieceIndex = req->index;
474    e.offset = req->offset;
475    e.length = req->length;
476    publish( msgs, &e );
477}
478
479static void
480fireClientGotData( tr_peermsgs * msgs,
481                   uint32_t      length,
482                   int           wasPieceData )
483{
484    tr_peer_event e = blankEvent;
485
486    e.length = length;
487    e.eventType = TR_PEER_CLIENT_GOT_DATA;
488    e.wasPieceData = wasPieceData;
489    publish( msgs, &e );
490}
491
492static void
493firePeerGotData( tr_peermsgs  * msgs,
494                 uint32_t       length,
495                 int            wasPieceData )
496{
497    tr_peer_event e = blankEvent;
498
499    e.length = length;
500    e.eventType = TR_PEER_PEER_GOT_DATA;
501    e.wasPieceData = wasPieceData;
502
503    publish( msgs, &e );
504}
505
506static void
507fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
508{
509    tr_peer_event e = blankEvent;
510    e.eventType = TR_PEER_CANCEL;
511    e.pieceIndex = req->index;
512    e.offset = req->offset;
513    e.length = req->length;
514    publish( msgs, &e );
515}
516
517/**
518***  INTEREST
519**/
520
521static int
522isPieceInteresting( const tr_peermsgs * peer,
523                    tr_piece_index_t    piece )
524{
525    const tr_torrent * torrent = peer->torrent;
526
527    return ( !torrent->info.pieces[piece].dnd )                 /* we want it */
528           && ( !tr_cpPieceIsComplete( torrent->completion, piece ) ) /* !have
529                                                                        */
530           && ( tr_bitfieldHas( peer->info->have, piece ) );   /* peer has it */
531}
532
533/* "interested" means we'll ask for piece data if they unchoke us */
534static int
535isPeerInteresting( const tr_peermsgs * msgs )
536{
537    tr_piece_index_t    i;
538    const tr_torrent *  torrent;
539    const tr_bitfield * bitfield;
540    const int           clientIsSeed = tr_torrentIsSeed( msgs->torrent );
541
542    if( clientIsSeed )
543        return FALSE;
544
545    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
546        return FALSE;
547
548    torrent = msgs->torrent;
549    bitfield = tr_cpPieceBitfield( torrent->completion );
550
551    if( !msgs->info->have )
552        return TRUE;
553
554    assert( bitfield->byteCount == msgs->info->have->byteCount );
555
556    for( i = 0; i < torrent->info.pieceCount; ++i )
557        if( isPieceInteresting( msgs, i ) )
558            return TRUE;
559
560    return FALSE;
561}
562
563static void
564sendInterest( tr_peermsgs * msgs,
565              int           weAreInterested )
566{
567    struct evbuffer * out = msgs->outMessages;
568
569    assert( msgs );
570    assert( weAreInterested == 0 || weAreInterested == 1 );
571
572    msgs->info->clientIsInterested = weAreInterested;
573    dbgmsg( msgs, "Sending %s",
574            weAreInterested ? "Interested" : "Not Interested" );
575    tr_peerIoWriteUint32( msgs->io, out, sizeof( uint8_t ) );
576    tr_peerIoWriteUint8 (
577        msgs->io, out, weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
578    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
579    dbgmsg( msgs, "outMessage size is now %d", (int)EVBUFFER_LENGTH( out ) );
580}
581
582static void
583updateInterest( tr_peermsgs * msgs )
584{
585    const int i = isPeerInteresting( msgs );
586
587    if( i != msgs->info->clientIsInterested )
588        sendInterest( msgs, i );
589    if( i )
590        fireNeedReq( msgs );
591}
592
593static void
594cancelAllRequestsToClient( tr_peermsgs * msgs )
595{
596    reqListClear( &msgs->peerAskedFor );
597}
598
599void
600tr_peerMsgsSetChoke( tr_peermsgs * msgs,
601                     int           choke )
602{
603    const time_t now = time( NULL );
604    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
605
606    assert( msgs );
607    assert( msgs->info );
608    assert( choke == 0 || choke == 1 );
609
610    if( msgs->info->chokeChangedAt > fibrillationTime )
611    {
612        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation",
613                choke );
614    }
615    else if( msgs->info->peerIsChoked != choke )
616    {
617        msgs->info->peerIsChoked = choke;
618        if( choke )
619            cancelAllRequestsToClient( msgs );
620        protocolSendChoke( msgs, choke );
621        msgs->info->chokeChangedAt = now;
622    }
623}
624
625/**
626***
627**/
628
629void
630tr_peerMsgsHave( tr_peermsgs * msgs,
631                 uint32_t      index )
632{
633    protocolSendHave( msgs, index );
634
635    /* since we have more pieces now, we might not be interested in this peer */
636    updateInterest( msgs );
637}
638
639/**
640***
641**/
642
643static int
644reqIsValid( const tr_peermsgs * peer,
645            uint32_t            index,
646            uint32_t            offset,
647            uint32_t            length )
648{
649    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
650}
651
652static int
653requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
654{
655    return reqIsValid( msgs, req->index, req->offset, req->length );
656}
657
658static void
659expireOldRequests( tr_peermsgs * msgs, const time_t now  )
660{
661    int                 i;
662    time_t              oldestAllowed;
663    struct request_list tmp = REQUEST_LIST_INIT;
664
665    /* cancel requests that have been queued for too long */
666    oldestAllowed = now - QUEUED_REQUEST_TTL_SECS;
667    reqListCopy( &tmp, &msgs->clientWillAskFor );
668    for( i = 0; i < tmp.count; ++i )
669    {
670        const struct peer_request * req = &tmp.requests[i];
671        if( req->time_requested < oldestAllowed )
672            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
673    }
674    reqListClear( &tmp );
675
676    /* cancel requests that were sent too long ago */
677    oldestAllowed = now - SENT_REQUEST_TTL_SECS;
678    reqListCopy( &tmp, &msgs->clientAskedFor );
679    for( i = 0; i < tmp.count; ++i )
680    {
681        const struct peer_request * req = &tmp.requests[i];
682        if( req->time_requested < oldestAllowed )
683            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
684    }
685    reqListClear( &tmp );
686}
687
688static void
689pumpRequestQueue( tr_peermsgs * msgs, const time_t now )
690{
691    const int           max = msgs->maxActiveRequests;
692    const int           min = msgs->minActiveRequests;
693    int                 sent = 0;
694    int                 count = msgs->clientAskedFor.count;
695    struct peer_request req;
696
697    if( count > min )
698        return;
699    if( msgs->info->clientIsChoked )
700        return;
701    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
702        return;
703
704    while( ( count < max ) && reqListPop( &msgs->clientWillAskFor, &req ) )
705    {
706        const tr_block_index_t block =
707            _tr_block( msgs->torrent, req.index, req.offset );
708
709        assert( requestIsValid( msgs, &req ) );
710        assert( tr_bitfieldHas( msgs->info->have, req.index ) );
711
712        /* don't ask for it if we've already got it... this block may have
713         * come in from a different peer after we cancelled a request for it */
714        if( !tr_cpBlockIsComplete( msgs->torrent->completion, block ) )
715        {
716            protocolSendRequest( msgs, &req );
717            req.time_requested = now;
718            reqListAppend( &msgs->clientAskedFor, &req );
719
720            ++count;
721            ++sent;
722        }
723    }
724
725    if( sent )
726        dbgmsg( msgs,
727                "pump sent %d requests, now have %d active and %d queued",
728                sent,
729                msgs->clientAskedFor.count,
730                msgs->clientWillAskFor.count );
731
732    if( count < max )
733        fireNeedReq( msgs );
734}
735
736static int
737requestQueueIsFull( const tr_peermsgs * msgs )
738{
739    const int req_max = msgs->maxActiveRequests;
740    return msgs->clientWillAskFor.count >= req_max;
741}
742
743tr_addreq_t
744tr_peerMsgsAddRequest( tr_peermsgs *    msgs,
745                       uint32_t         index,
746                       uint32_t         offset,
747                       uint32_t         length )
748{
749    struct peer_request req;
750
751    assert( msgs );
752    assert( msgs->torrent );
753    assert( reqIsValid( msgs, index, offset, length ) );
754
755    /**
756    ***  Reasons to decline the request
757    **/
758
759    /* don't send requests to choked clients */
760    if( msgs->info->clientIsChoked )
761    {
762        dbgmsg( msgs, "declining request because they're choking us" );
763        return TR_ADDREQ_CLIENT_CHOKED;
764    }
765
766    /* peer doesn't have this piece */
767    if( !tr_bitfieldHas( msgs->info->have, index ) )
768        return TR_ADDREQ_MISSING;
769
770    /* peer's queue is full */
771    if( requestQueueIsFull( msgs ) ) {
772        dbgmsg( msgs, "declining request because we're full" );
773        return TR_ADDREQ_FULL;
774    }
775
776    /* have we already asked for this piece? */
777    req.index = index;
778    req.offset = offset;
779    req.length = length;
780    if( reqListFind( &msgs->clientAskedFor, &req ) != -1 ) {
781        dbgmsg( msgs, "declining because it's a duplicate" );
782        return TR_ADDREQ_DUPLICATE;
783    }
784    if( reqListFind( &msgs->clientWillAskFor, &req ) != -1 ) {
785        dbgmsg( msgs, "declining because it's a duplicate" );
786        return TR_ADDREQ_DUPLICATE;
787    }
788
789    /**
790    ***  Accept this request
791    **/
792
793    dbgmsg( msgs, "adding req for %"PRIu32":%"PRIu32"->%"PRIu32" to our `will request' list",
794            index, offset, length );
795    req.time_requested = time( NULL );
796    reqListAppend( &msgs->clientWillAskFor, &req );
797    return TR_ADDREQ_OK;
798}
799
800static void
801cancelAllRequestsToPeer( tr_peermsgs * msgs )
802{
803    int                 i;
804    struct request_list a = msgs->clientWillAskFor;
805    struct request_list b = msgs->clientAskedFor;
806    dbgmsg( msgs, "cancelling all requests to peer" );
807
808    msgs->clientAskedFor = REQUEST_LIST_INIT;
809    msgs->clientWillAskFor = REQUEST_LIST_INIT;
810
811    for( i=0; i<a.count; ++i )
812        fireCancelledReq( msgs, &a.requests[i] );
813
814    for( i = 0; i < b.count; ++i ) {
815        fireCancelledReq( msgs, &b.requests[i] );
816        protocolSendCancel( msgs, &b.requests[i] );
817    }
818
819    reqListClear( &a );
820    reqListClear( &b );
821}
822
823void
824tr_peerMsgsCancel( tr_peermsgs * msgs,
825                   uint32_t      pieceIndex,
826                   uint32_t      offset,
827                   uint32_t      length )
828{
829    struct peer_request req;
830
831    assert( msgs != NULL );
832    assert( length > 0 );
833
834
835    /* have we asked the peer for this piece? */
836    req.index = pieceIndex;
837    req.offset = offset;
838    req.length = length;
839
840    /* if it's only in the queue and hasn't been sent yet, free it */
841    if( reqListRemove( &msgs->clientWillAskFor, &req ) ) {
842        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32"\n", pieceIndex, offset, length );
843        fireCancelledReq( msgs, &req );
844    }
845
846    /* if it's already been sent, send a cancel message too */
847    if( reqListRemove( &msgs->clientAskedFor, &req ) ) {
848        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32"\n", pieceIndex, offset, length );
849        protocolSendCancel( msgs, &req );
850        fireCancelledReq( msgs, &req );
851    }
852}
853
854/**
855***
856**/
857
858static void
859sendLtepHandshake( tr_peermsgs * msgs )
860{
861    tr_benc           val, *m;
862    char *            buf;
863    int               len;
864    int               pex;
865    struct evbuffer * out = msgs->outMessages;
866
867    if( msgs->clientSentLtepHandshake )
868        return;
869
870    dbgmsg( msgs, "sending an ltep handshake" );
871    msgs->clientSentLtepHandshake = 1;
872
873    /* decide if we want to advertise pex support */
874    if( !tr_torrentAllowsPex( msgs->torrent ) )
875        pex = 0;
876    else if( msgs->peerSentLtepHandshake )
877        pex = msgs->peerSupportsPex ? 1 : 0;
878    else
879        pex = 1;
880
881    tr_bencInitDict( &val, 4 );
882    tr_bencDictAddInt( &val, "e",
883                       msgs->session->encryptionMode != TR_CLEAR_PREFERRED );
884    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( msgs->session ) );
885    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
886    m  = tr_bencDictAddDict( &val, "m", 1 );
887    if( pex )
888        tr_bencDictAddInt( m, "ut_pex", TR_LTEP_PEX );
889    buf = tr_bencSave( &val, &len );
890
891    tr_peerIoWriteUint32( msgs->io, out, 2 * sizeof( uint8_t ) + len );
892    tr_peerIoWriteUint8 ( msgs->io, out, BT_LTEP );
893    tr_peerIoWriteUint8 ( msgs->io, out, LTEP_HANDSHAKE );
894    tr_peerIoWriteBytes ( msgs->io, out, buf, len );
895    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
896    dbgmsg( msgs, "outMessage size is now %d", (int)EVBUFFER_LENGTH( out ) );
897
898    /* cleanup */
899    tr_bencFree( &val );
900    tr_free( buf );
901}
902
903static void
904parseLtepHandshake( tr_peermsgs *     msgs,
905                    int               len,
906                    struct evbuffer * inbuf )
907{
908    int64_t   i;
909    tr_benc   val, * sub;
910    uint8_t * tmp = tr_new( uint8_t, len );
911
912    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
913    msgs->peerSentLtepHandshake = 1;
914
915    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type != TYPE_DICT )
916    {
917        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
918        tr_free( tmp );
919        return;
920    }
921
922    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
923
924    /* does the peer prefer encrypted connections? */
925    if( tr_bencDictFindInt( &val, "e", &i ) )
926        msgs->info->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
927                                            : ENCRYPTION_PREFERENCE_NO;
928
929    /* check supported messages for utorrent pex */
930    msgs->peerSupportsPex = 0;
931    if( tr_bencDictFindDict( &val, "m", &sub ) )
932    {
933        if( tr_bencDictFindInt( sub, "ut_pex", &i ) )
934        {
935            msgs->ut_pex_id = (uint8_t) i;
936            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
937            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
938        }
939    }
940
941    /* get peer's listening port */
942    if( tr_bencDictFindInt( &val, "p", &i ) )
943    {
944        msgs->info->port = htons( (uint16_t)i );
945        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
946    }
947
948    tr_bencFree( &val );
949    tr_free( tmp );
950}
951
952static void
953parseUtPex( tr_peermsgs *     msgs,
954            int               msglen,
955            struct evbuffer * inbuf )
956{
957    int                loaded = 0;
958    uint8_t *          tmp = tr_new( uint8_t, msglen );
959    tr_benc            val;
960    const tr_torrent * tor = msgs->torrent;
961    const uint8_t *    added;
962    size_t             added_len;
963
964    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
965
966    if( tr_torrentAllowsPex( tor )
967      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) )
968      && tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
969    {
970        const uint8_t * added_f = NULL;
971        tr_pex *        pex;
972        size_t          i, n;
973        size_t          added_f_len = 0;
974        tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
975        pex =
976            tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len,
977                                    &n );
978        for( i = 0; i < n; ++i )
979            tr_peerMgrAddPex( msgs->session->peerMgr, tor->info.hash,
980                              TR_PEER_FROM_PEX, pex + i );
981        tr_free( pex );
982    }
983
984    if( loaded )
985        tr_bencFree( &val );
986    tr_free( tmp );
987}
988
989static void sendPex( tr_peermsgs * msgs );
990
991static void
992parseLtep( tr_peermsgs *     msgs,
993           int               msglen,
994           struct evbuffer * inbuf )
995{
996    uint8_t ltep_msgid;
997
998    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
999    msglen--;
1000
1001    if( ltep_msgid == LTEP_HANDSHAKE )
1002    {
1003        dbgmsg( msgs, "got ltep handshake" );
1004        parseLtepHandshake( msgs, msglen, inbuf );
1005        if( tr_peerIoSupportsLTEP( msgs->io ) )
1006        {
1007            sendLtepHandshake( msgs );
1008            sendPex( msgs );
1009        }
1010    }
1011    else if( ltep_msgid == TR_LTEP_PEX )
1012    {
1013        dbgmsg( msgs, "got ut pex" );
1014        msgs->peerSupportsPex = 1;
1015        parseUtPex( msgs, msglen, inbuf );
1016    }
1017    else
1018    {
1019        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1020        evbuffer_drain( inbuf, msglen );
1021    }
1022}
1023
1024static int
1025readBtLength( tr_peermsgs *     msgs,
1026              struct evbuffer * inbuf,
1027              size_t            inlen )
1028{
1029    uint32_t len;
1030
1031    if( inlen < sizeof( len ) )
1032        return READ_LATER;
1033
1034    tr_peerIoReadUint32( msgs->io, inbuf, &len );
1035
1036    if( len == 0 ) /* peer sent us a keepalive message */
1037        dbgmsg( msgs, "got KeepAlive" );
1038    else
1039    {
1040        msgs->incoming.length = len;
1041        msgs->state = AWAITING_BT_ID;
1042    }
1043
1044    return READ_NOW;
1045}
1046
1047static int readBtMessage( tr_peermsgs *     msgs,
1048                          struct evbuffer * inbuf,
1049                          size_t            inlen );
1050
1051static int
1052readBtId( tr_peermsgs *     msgs,
1053          struct evbuffer * inbuf,
1054          size_t            inlen )
1055{
1056    uint8_t id;
1057
1058    if( inlen < sizeof( uint8_t ) )
1059        return READ_LATER;
1060
1061    tr_peerIoReadUint8( msgs->io, inbuf, &id );
1062    msgs->incoming.id = id;
1063
1064    if( id == BT_PIECE )
1065    {
1066        msgs->state = AWAITING_BT_PIECE;
1067        return READ_NOW;
1068    }
1069    else if( msgs->incoming.length != 1 )
1070    {
1071        msgs->state = AWAITING_BT_MESSAGE;
1072        return READ_NOW;
1073    }
1074    else return readBtMessage( msgs, inbuf, inlen - 1 );
1075}
1076
1077static void
1078updatePeerProgress( tr_peermsgs * msgs )
1079{
1080    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have )
1081                           / (float)msgs->torrent->info.pieceCount;
1082    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
1083    updateInterest( msgs );
1084    firePeerProgress( msgs );
1085}
1086
1087static void
1088peerMadeRequest( tr_peermsgs *               msgs,
1089                 const struct peer_request * req )
1090{
1091    const int reqIsValid = requestIsValid( msgs, req );
1092    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete(
1093        msgs->torrent->completion, req->index );
1094    const int peerIsChoked = msgs->info->peerIsChoked;
1095
1096    if( !reqIsValid ) /* bad request */
1097    {
1098        dbgmsg( msgs, "rejecting an invalid request." );
1099    }
1100    else if( !clientHasPiece ) /* we don't have it */
1101    {
1102        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1103    }
1104    else if( peerIsChoked ) /* doesn't he know he's choked? */
1105    {
1106        tr_peerMsgsSetChoke( msgs, 1 );
1107    }
1108    else /* YAY */
1109    {
1110        reqListAppend( &msgs->peerAskedFor, req );
1111    }
1112}
1113
1114static int
1115messageLengthIsCorrect( const tr_peermsgs * msg,
1116                        uint8_t             id,
1117                        uint32_t            len )
1118{
1119    switch( id )
1120    {
1121        case BT_CHOKE:
1122        case BT_UNCHOKE:
1123        case BT_INTERESTED:
1124        case BT_NOT_INTERESTED:
1125        case BT_HAVE_ALL:
1126        case BT_HAVE_NONE:
1127            return len == 1;
1128
1129        case BT_HAVE:
1130        case BT_SUGGEST:
1131            return len == 5;
1132
1133        case BT_BITFIELD:
1134            return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1135
1136        case BT_REQUEST:
1137        case BT_CANCEL:
1138        case BT_REJECT:
1139            return len == 13;
1140
1141        case BT_PIECE:
1142            return len > 9 && len <= 16393;
1143
1144        case BT_PORT:
1145            return len == 3;
1146
1147        case BT_LTEP:
1148            return len >= 2;
1149
1150        default:
1151            return FALSE;
1152    }
1153}
1154
1155static int clientGotBlock( tr_peermsgs *               msgs,
1156                           const uint8_t *             block,
1157                           const struct peer_request * req );
1158
1159static int
1160readBtPiece( tr_peermsgs      * msgs,
1161             struct evbuffer  * inbuf,
1162             size_t             inlen,
1163             size_t           * setme_piece_bytes_read )
1164{
1165    struct peer_request * req = &msgs->incoming.blockReq;
1166
1167    assert( EVBUFFER_LENGTH( inbuf ) >= inlen );
1168    dbgmsg( msgs, "In readBtPiece" );
1169
1170    if( !req->length )
1171    {
1172        if( inlen < 8 )
1173            return READ_LATER;
1174
1175        tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
1176        tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
1177        req->length = msgs->incoming.length - 9;
1178        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index,
1179                req->offset,
1180                req->length );
1181        return READ_NOW;
1182    }
1183    else
1184    {
1185        int          err;
1186
1187        /* read in another chunk of data */
1188        const size_t nLeft = req->length - EVBUFFER_LENGTH(
1189            msgs->incoming.block );
1190        size_t       n = MIN( nLeft, inlen );
1191        uint8_t *    buf = tr_new( uint8_t, n );
1192        assert( EVBUFFER_LENGTH( inbuf ) >= n );
1193        tr_peerIoReadBytes( msgs->io, inbuf, buf, n );
1194        evbuffer_add( msgs->incoming.block, buf, n );
1195        fireClientGotData( msgs, n, TRUE );
1196        *setme_piece_bytes_read += n;
1197        tr_free( buf );
1198        dbgmsg( msgs, "got %d bytes for block %u:%u->%u ... %d remain",
1199               (int)n, req->index, req->offset, req->length,
1200               (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) );
1201        if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length )
1202            return READ_LATER;
1203
1204        /* we've got the whole block ... process it */
1205        err = clientGotBlock( msgs, EVBUFFER_DATA(
1206                                  msgs->incoming.block ), req );
1207
1208        /* cleanup */
1209        evbuffer_drain( msgs->incoming.block,
1210                       EVBUFFER_LENGTH( msgs->incoming.block ) );
1211        req->length = 0;
1212        msgs->state = AWAITING_BT_LENGTH;
1213        if( !err )
1214            return READ_NOW;
1215        else
1216        {
1217            fireError( msgs, err );
1218            return READ_ERR;
1219        }
1220    }
1221}
1222
1223static int
1224readBtMessage( tr_peermsgs *     msgs,
1225               struct evbuffer * inbuf,
1226               size_t            inlen )
1227{
1228    uint32_t      ui32;
1229    uint32_t      msglen = msgs->incoming.length;
1230    const uint8_t id = msgs->incoming.id;
1231    const size_t  startBufLen = EVBUFFER_LENGTH( inbuf );
1232
1233    --msglen; /* id length */
1234
1235    if( inlen < msglen )
1236        return READ_LATER;
1237
1238    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %d", (int)id,
1239            (int)msglen,
1240            (int)inlen );
1241
1242    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1243    {
1244        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d",
1245                (int)id, (int)msglen );
1246        fireError( msgs, EMSGSIZE );
1247        return READ_ERR;
1248    }
1249
1250    switch( id )
1251    {
1252        case BT_CHOKE:
1253            dbgmsg( msgs, "got Choke" );
1254            msgs->info->clientIsChoked = 1;
1255            cancelAllRequestsToPeer( msgs );
1256            cancelAllRequestsToClient( msgs );
1257            break;
1258
1259        case BT_UNCHOKE:
1260            dbgmsg( msgs, "got Unchoke" );
1261            msgs->info->clientIsChoked = 0;
1262            fireNeedReq( msgs );
1263            break;
1264
1265        case BT_INTERESTED:
1266            dbgmsg( msgs, "got Interested" );
1267            msgs->info->peerIsInterested = 1;
1268            break;
1269
1270        case BT_NOT_INTERESTED:
1271            dbgmsg( msgs, "got Not Interested" );
1272            msgs->info->peerIsInterested = 0;
1273            break;
1274
1275        case BT_HAVE:
1276            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1277            dbgmsg( msgs, "got Have: %u", ui32 );
1278            if( tr_bitfieldAdd( msgs->info->have, ui32 ) )
1279            {
1280                fireError( msgs, ERANGE );
1281                return READ_ERR;
1282            }
1283            updatePeerProgress( msgs );
1284            tr_rcTransferred( msgs->torrent->swarmSpeed,
1285                              msgs->torrent->info.pieceSize );
1286            break;
1287
1288        case BT_BITFIELD:
1289        {
1290            dbgmsg( msgs, "got a bitfield" );
1291            msgs->peerSentBitfield = 1;
1292            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits,
1293                                msglen );
1294            updatePeerProgress( msgs );
1295            fireNeedReq( msgs );
1296            break;
1297        }
1298
1299        case BT_REQUEST:
1300        {
1301            struct peer_request r;
1302            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1303            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1304            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1305            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1306            peerMadeRequest( msgs, &r );
1307            break;
1308        }
1309
1310        case BT_CANCEL:
1311        {
1312            struct peer_request r;
1313            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1314            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1315            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1316            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1317            reqListRemove( &msgs->peerAskedFor, &r );
1318            break;
1319        }
1320
1321        case BT_PIECE:
1322            assert( 0 ); /* handled elsewhere! */
1323            break;
1324
1325        case BT_PORT:
1326            dbgmsg( msgs, "Got a BT_PORT" );
1327            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1328            break;
1329
1330        case BT_SUGGEST:
1331        {
1332            dbgmsg( msgs, "Got a BT_SUGGEST" );
1333            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1334            /* we don't do anything with this yet */
1335            break;
1336        }
1337
1338        case BT_HAVE_ALL:
1339            dbgmsg( msgs, "Got a BT_HAVE_ALL" );
1340            tr_bitfieldAddRange( msgs->info->have, 0,
1341                                 msgs->torrent->info.pieceCount );
1342            updatePeerProgress( msgs );
1343            break;
1344
1345
1346        case BT_HAVE_NONE:
1347            dbgmsg( msgs, "Got a BT_HAVE_NONE" );
1348            tr_bitfieldClear( msgs->info->have );
1349            updatePeerProgress( msgs );
1350            break;
1351
1352        case BT_REJECT:
1353        {
1354            struct peer_request r;
1355            dbgmsg( msgs, "Got a BT_REJECT" );
1356            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1357            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1358            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1359            reqListRemove( &msgs->clientAskedFor, &r );
1360            break;
1361        }
1362
1363        case BT_LTEP:
1364            dbgmsg( msgs, "Got a BT_LTEP" );
1365            parseLtep( msgs, msglen, inbuf );
1366            break;
1367
1368        default:
1369            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1370            tr_peerIoDrain( msgs->io, inbuf, msglen );
1371            break;
1372    }
1373
1374    assert( msglen + 1 == msgs->incoming.length );
1375    assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen );
1376
1377    msgs->state = AWAITING_BT_LENGTH;
1378    return READ_NOW;
1379}
1380
1381static void
1382decrementDownloadedCount( tr_peermsgs * msgs,
1383                          uint32_t      byteCount )
1384{
1385    tr_torrent * tor = msgs->torrent;
1386
1387    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1388}
1389
1390static void
1391clientGotUnwantedBlock( tr_peermsgs *               msgs,
1392                        const struct peer_request * req )
1393{
1394    decrementDownloadedCount( msgs, req->length );
1395}
1396
1397static void
1398addPeerToBlamefield( tr_peermsgs * msgs,
1399                     uint32_t      index )
1400{
1401    if( !msgs->info->blame )
1402        msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1403    tr_bitfieldAdd( msgs->info->blame, index );
1404}
1405
1406/* returns 0 on success, or an errno on failure */
1407static int
1408clientGotBlock( tr_peermsgs *               msgs,
1409                const uint8_t *             data,
1410                const struct peer_request * req )
1411{
1412    int                    err;
1413    tr_torrent *           tor = msgs->torrent;
1414    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1415
1416    assert( msgs );
1417    assert( req );
1418
1419    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) )
1420    {
1421        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1422                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1423        return EMSGSIZE;
1424    }
1425
1426    /* save the block */
1427    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset,
1428            req->length );
1429
1430    /**
1431    *** Remove the block from our `we asked for this' list
1432    **/
1433
1434    if( !reqListRemove( &msgs->clientAskedFor, req ) )
1435    {
1436        clientGotUnwantedBlock( msgs, req );
1437        dbgmsg( msgs, "we didn't ask for this message..." );
1438        return 0;
1439    }
1440
1441    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1442            msgs->clientAskedFor.count );
1443
1444    /**
1445    *** Error checks
1446    **/
1447
1448    if( tr_cpBlockIsComplete( tor->completion, block ) )
1449    {
1450        dbgmsg( msgs, "we have this block already..." );
1451        clientGotUnwantedBlock( msgs, req );
1452        return 0;
1453    }
1454
1455    /**
1456    ***  Save the block
1457    **/
1458
1459    msgs->info->peerSentPieceDataAt = time( NULL );
1460    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1461        return err;
1462
1463    addPeerToBlamefield( msgs, req->index );
1464    fireGotBlock( msgs, req );
1465    return 0;
1466}
1467
1468static int peerPulse( void * vmsgs );
1469
1470static void
1471didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1472{
1473    tr_peermsgs * msgs = vmsgs;
1474    firePeerGotData( msgs, bytesWritten, wasPieceData );
1475    peerPulse( msgs );
1476}
1477
1478static ReadState
1479canRead( struct tr_iobuf * iobuf, void * vmsgs, size_t * piece )
1480{
1481    ReadState         ret;
1482    tr_peermsgs *     msgs = vmsgs;
1483    struct evbuffer * in = tr_iobuf_input( iobuf );
1484    const size_t      inlen = EVBUFFER_LENGTH( in );
1485
1486    if( !inlen )
1487    {
1488        ret = READ_LATER;
1489    }
1490    else if( msgs->state == AWAITING_BT_PIECE )
1491    {
1492        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1493    }
1494    else switch( msgs->state )
1495    {
1496        case AWAITING_BT_LENGTH:
1497            ret = readBtLength ( msgs, in, inlen ); break;
1498
1499        case AWAITING_BT_ID:
1500            ret = readBtId     ( msgs, in, inlen ); break;
1501
1502        case AWAITING_BT_MESSAGE:
1503            ret = readBtMessage( msgs, in, inlen ); break;
1504
1505        default:
1506            assert( 0 );
1507    }
1508
1509    /* log the raw data that was read */
1510    if( ( ret != READ_ERR ) && ( EVBUFFER_LENGTH( in ) != inlen ) )
1511        fireClientGotData( msgs, inlen - EVBUFFER_LENGTH( in ), FALSE );
1512
1513    return ret;
1514}
1515
1516/**
1517***
1518**/
1519
1520static int
1521ratePulse( void * vpeer )
1522{
1523    tr_peermsgs * peer = vpeer;
1524    const double rateToClient = tr_peerGetPieceSpeed( peer->info,
1525                                                      TR_PEER_TO_CLIENT );
1526    const int estimatedBlocksInNext30Seconds =
1527                  ( rateToClient * 30 * 1024 ) / peer->torrent->blockSize;
1528
1529    peer->minActiveRequests = 8;
1530    peer->maxActiveRequests = peer->minActiveRequests + estimatedBlocksInNext30Seconds;
1531    return TRUE;
1532}
1533
1534static int
1535popNextRequest( tr_peermsgs *         msgs,
1536                struct peer_request * setme )
1537{
1538    return reqListPop( &msgs->peerAskedFor, setme );
1539}
1540
1541static size_t
1542fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1543{
1544    size_t bytesWritten = 0;
1545    struct peer_request req;
1546    const int haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1547
1548    /**
1549    ***  Protocol messages
1550    **/
1551
1552    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1553    {
1554        dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
1555        msgs->outMessagesBatchedAt = now;
1556    }
1557    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1558    {
1559        const size_t len = EVBUFFER_LENGTH( msgs->outMessages );
1560        /* flush the protocol messages */
1561        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->io, len );
1562        tr_peerIoWriteBuf( msgs->io, msgs->outMessages, FALSE );
1563        msgs->clientSentAnythingAt = now;
1564        msgs->outMessagesBatchedAt = 0;
1565        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1566        bytesWritten +=  len;
1567    }
1568
1569    /**
1570    ***  Blocks
1571    **/
1572
1573    if( ( tr_peerIoGetWriteBufferSpace( msgs->io ) >= msgs->torrent->blockSize )
1574        && popNextRequest( msgs, &req )
1575        && requestIsValid( msgs, &req )
1576        && tr_cpPieceIsComplete( msgs->torrent->completion, req.index ) )
1577    {
1578        /* send a block */
1579        uint8_t * buf = tr_new( uint8_t, req.length );
1580        const int err = tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf );
1581        if( err ) {
1582            fireError( msgs, err );
1583            bytesWritten = 0;
1584            msgs = NULL;
1585        } else {
1586            tr_peerIo * io = msgs->io;
1587            struct evbuffer * out = evbuffer_new( );
1588            dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1589            tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1590            tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1591            tr_peerIoWriteUint32( io, out, req.index );
1592            tr_peerIoWriteUint32( io, out, req.offset );
1593            tr_peerIoWriteBytes ( io, out, buf, req.length );
1594            tr_peerIoWriteBuf( io, out, TRUE );
1595            bytesWritten += EVBUFFER_LENGTH( out );
1596            evbuffer_free( out );
1597            msgs->clientSentAnythingAt = now;
1598        }
1599        tr_free( buf );
1600    }
1601
1602    /**
1603    ***  Keepalive
1604    **/
1605
1606    if( ( msgs != NULL )
1607        && ( msgs->clientSentAnythingAt != 0 )
1608        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1609    {
1610        dbgmsg( msgs, "sending a keepalive message" );
1611        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1612        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1613    }
1614
1615    return bytesWritten;
1616}
1617
1618static int
1619peerPulse( void * vmsgs )
1620{
1621    tr_peermsgs * msgs = vmsgs;
1622    const time_t  now = time( NULL );
1623
1624    ratePulse( msgs );
1625
1626    pumpRequestQueue( msgs, now );
1627    expireOldRequests( msgs, now );
1628
1629    for( ;; )
1630        if( fillOutputBuffer( msgs, now ) < 1 )
1631            break;
1632
1633    return TRUE; /* loop forever */
1634}
1635
1636void
1637tr_peerMsgsPulse( tr_peermsgs * msgs )
1638{
1639    if( msgs != NULL )
1640        peerPulse( msgs );
1641}
1642
1643static void
1644gotError( struct tr_iobuf  * iobuf UNUSED,
1645          short              what,
1646          void             * vmsgs )
1647{
1648    if( what & EVBUFFER_TIMEOUT )
1649        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1650    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1651        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1652               what, errno, tr_strerror( errno ) );
1653    fireError( vmsgs, ENOTCONN );
1654}
1655
1656static void
1657sendBitfield( tr_peermsgs * msgs )
1658{
1659    struct evbuffer * out = msgs->outMessages;
1660    tr_bitfield *     field;
1661    tr_piece_index_t  lazyPieces[LAZY_PIECE_COUNT];
1662    size_t            i;
1663    size_t            lazyCount = 0;
1664
1665    field = tr_bitfieldDup( tr_cpPieceBitfield( msgs->torrent->completion ) );
1666
1667    if( tr_sessionIsLazyBitfieldEnabled( msgs->session ) )
1668    {
1669        /** Lazy bitfields aren't a high priority or secure, so I'm opting for
1670            speed over a truly random sample -- let's limit the pool size to
1671            the first 1000 pieces so large torrents don't bog things down */
1672        size_t poolSize;
1673        const size_t maxPoolSize = MIN( msgs->torrent->info.pieceCount, 1000 );
1674        tr_piece_index_t * pool = tr_new( tr_piece_index_t, maxPoolSize );
1675
1676        /* build the pool */
1677        for( i=poolSize=0; i<maxPoolSize; ++i )
1678            if( tr_bitfieldHas( field, i ) )
1679                pool[poolSize++] = i;
1680
1681        /* pull random piece indices from the pool */
1682        while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) )
1683        {
1684            const int pos = tr_cryptoWeakRandInt( poolSize );
1685            const tr_piece_index_t piece = pool[pos];
1686            tr_bitfieldRem( field, piece );
1687            lazyPieces[lazyCount++] = piece;
1688            pool[pos] = pool[--poolSize];
1689        }
1690
1691        /* cleanup */
1692        tr_free( pool );
1693    }
1694
1695    tr_peerIoWriteUint32( msgs->io, out,
1696                          sizeof( uint8_t ) + field->byteCount );
1697    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1698    tr_peerIoWriteBytes ( msgs->io, out, field->bits, field->byteCount );
1699    dbgmsg( msgs, "sending bitfield... outMessage size is now %d",
1700           (int)EVBUFFER_LENGTH( out ) );
1701    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1702
1703    for( i = 0; i < lazyCount; ++i )
1704        protocolSendHave( msgs, lazyPieces[i] );
1705
1706    tr_bitfieldFree( field );
1707}
1708
1709/**
1710***
1711**/
1712
1713/* some peers give us error messages if we send
1714   more than this many peers in a single pex message
1715   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
1716#define MAX_PEX_ADDED 50
1717#define MAX_PEX_DROPPED 50
1718
1719typedef struct
1720{
1721    tr_pex *  added;
1722    tr_pex *  dropped;
1723    tr_pex *  elements;
1724    int       addedCount;
1725    int       droppedCount;
1726    int       elementCount;
1727}
1728PexDiffs;
1729
1730static void
1731pexAddedCb( void * vpex,
1732            void * userData )
1733{
1734    PexDiffs * diffs = userData;
1735    tr_pex *   pex = vpex;
1736
1737    if( diffs->addedCount < MAX_PEX_ADDED )
1738    {
1739        diffs->added[diffs->addedCount++] = *pex;
1740        diffs->elements[diffs->elementCount++] = *pex;
1741    }
1742}
1743
1744static void
1745pexDroppedCb( void * vpex,
1746              void * userData )
1747{
1748    PexDiffs * diffs = userData;
1749    tr_pex *   pex = vpex;
1750
1751    if( diffs->droppedCount < MAX_PEX_DROPPED )
1752    {
1753        diffs->dropped[diffs->droppedCount++] = *pex;
1754    }
1755}
1756
1757static void
1758pexElementCb( void * vpex,
1759              void * userData )
1760{
1761    PexDiffs * diffs = userData;
1762    tr_pex *   pex = vpex;
1763
1764    diffs->elements[diffs->elementCount++] = *pex;
1765}
1766
1767/* TODO: ipv6 pex */
1768static void
1769sendPex( tr_peermsgs * msgs )
1770{
1771    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
1772    {
1773        PexDiffs diffs;
1774        tr_pex * newPex = NULL;
1775        const int newCount = tr_peerMgrGetPeers( msgs->session->peerMgr,
1776                                                 msgs->torrent->info.hash,
1777                                                 &newPex );
1778
1779        /* build the diffs */
1780        diffs.added = tr_new( tr_pex, newCount );
1781        diffs.addedCount = 0;
1782        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1783        diffs.droppedCount = 0;
1784        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1785        diffs.elementCount = 0;
1786        tr_set_compare( msgs->pex, msgs->pexCount,
1787                        newPex, newCount,
1788                        tr_pexCompare, sizeof( tr_pex ),
1789                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
1790        dbgmsg(
1791            msgs,
1792            "pex: old peer count %d, new peer count %d, added %d, removed %d",
1793            msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1794
1795        if( !diffs.addedCount && !diffs.droppedCount )
1796        {
1797            tr_free( diffs.elements );
1798        }
1799        else
1800        {
1801            int  i;
1802            tr_benc val;
1803            char * benc;
1804            int bencLen;
1805            uint8_t * tmp, *walk;
1806            struct evbuffer * out = msgs->outMessages;
1807
1808            /* update peer */
1809            tr_free( msgs->pex );
1810            msgs->pex = diffs.elements;
1811            msgs->pexCount = diffs.elementCount;
1812
1813            /* build the pex payload */
1814            tr_bencInitDict( &val, 3 );
1815
1816            /* "added" */
1817            tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1818            for( i = 0; i < diffs.addedCount; ++i ) {
1819                memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
1820                memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1821            }
1822            assert( ( walk - tmp ) == diffs.addedCount * 6 );
1823            tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
1824            tr_free( tmp );
1825
1826            /* "added.f" */
1827            tmp = walk = tr_new( uint8_t, diffs.addedCount );
1828            for( i = 0; i < diffs.addedCount; ++i )
1829                *walk++ = diffs.added[i].flags;
1830            assert( ( walk - tmp ) == diffs.addedCount );
1831            tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
1832            tr_free( tmp );
1833
1834            /* "dropped" */
1835            tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
1836            for( i = 0; i < diffs.droppedCount; ++i ) {
1837                memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
1838                memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
1839            }
1840            assert( ( walk - tmp ) == diffs.droppedCount * 6 );
1841            tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
1842            tr_free( tmp );
1843
1844            /* write the pex message */
1845            benc = tr_bencSave( &val, &bencLen );
1846            tr_peerIoWriteUint32( msgs->io, out, 2 * sizeof( uint8_t ) + bencLen );
1847            tr_peerIoWriteUint8 ( msgs->io, out, BT_LTEP );
1848            tr_peerIoWriteUint8 ( msgs->io, out, msgs->ut_pex_id );
1849            tr_peerIoWriteBytes ( msgs->io, out, benc, bencLen );
1850            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1851            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", EVBUFFER_LENGTH( out ) );
1852
1853            tr_free( benc );
1854            tr_bencFree( &val );
1855        }
1856
1857        /* cleanup */
1858        tr_free( diffs.added );
1859        tr_free( diffs.dropped );
1860        tr_free( newPex );
1861
1862        msgs->clientSentPexAt = time( NULL );
1863    }
1864}
1865
1866static int
1867pexPulse( void * vpeer )
1868{
1869    sendPex( vpeer );
1870    return TRUE;
1871}
1872
1873/**
1874***
1875**/
1876
1877tr_peermsgs*
1878tr_peerMsgsNew( struct tr_torrent * torrent,
1879                struct tr_peer *    info,
1880                tr_delivery_func    func,
1881                void *              userData,
1882                tr_publisher_tag *  setme )
1883{
1884    tr_peermsgs * m;
1885
1886    assert( info );
1887    assert( info->io );
1888
1889    m = tr_new0( tr_peermsgs, 1 );
1890    m->publisher = tr_publisherNew( );
1891    m->info = info;
1892    m->session = torrent->session;
1893    m->torrent = torrent;
1894    m->io = info->io;
1895    m->info->clientIsChoked = 1;
1896    m->info->peerIsChoked = 1;
1897    m->info->clientIsInterested = 0;
1898    m->info->peerIsInterested = 0;
1899    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
1900    m->state = AWAITING_BT_LENGTH;
1901    m->pexTimer = tr_timerNew( m->session, pexPulse, m, PEX_INTERVAL );
1902    m->outMessages = evbuffer_new( );
1903    m->outMessagesBatchedAt = 0;
1904    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1905    m->incoming.block = evbuffer_new( );
1906    m->peerAllowedPieces = NULL;
1907    m->peerAskedFor = REQUEST_LIST_INIT;
1908    m->clientAskedFor = REQUEST_LIST_INIT;
1909    m->clientWillAskFor = REQUEST_LIST_INIT;
1910    *setme = tr_publisherSubscribe( m->publisher, func, userData );
1911
1912    if( tr_peerIoSupportsLTEP( m->io ) )
1913        sendLtepHandshake( m );
1914
1915    sendBitfield( m );
1916
1917    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of
1918                                             inactivity */
1919    tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
1920    ratePulse( m );
1921
1922    return m;
1923}
1924
1925void
1926tr_peerMsgsFree( tr_peermsgs* msgs )
1927{
1928    if( msgs )
1929    {
1930        tr_timerFree( &msgs->pexTimer );
1931        tr_publisherFree( &msgs->publisher );
1932        reqListClear( &msgs->clientWillAskFor );
1933        reqListClear( &msgs->clientAskedFor );
1934        reqListClear( &msgs->peerAskedFor );
1935
1936        tr_bitfieldFree( msgs->peerAllowedPieces );
1937        evbuffer_free( msgs->incoming.block );
1938        evbuffer_free( msgs->outMessages );
1939        tr_free( msgs->pex );
1940
1941        memset( msgs, ~0, sizeof( tr_peermsgs ) );
1942        tr_free( msgs );
1943    }
1944}
1945
1946void
1947tr_peerMsgsUnsubscribe( tr_peermsgs *    peer,
1948                        tr_publisher_tag tag )
1949{
1950    tr_publisherUnsubscribe( peer->publisher, tag );
1951}
1952
Note: See TracBrowser for help on using the repository browser.