source: trunk/libtransmission/peer-msgs.c @ 7195

Last change on this file since 7195 was 7195, checked in by charles, 12 years ago

(libT) patch from jhujhiti to add ipv6 support.

  • Property svn:keywords set to Date Rev Author Id
File size: 53.6 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 7195 2008-11-30 00:47:18Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19
20#include <event.h>
21
22#include "transmission.h"
23#include "bencode.h"
24#include "completion.h"
25#include "crypto.h"
26#include "inout.h"
27#include "iobuf.h"
28#ifdef WIN32
29#include "net.h" /* for ECONN */
30#endif
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-mgr-private.h"
34#include "peer-msgs.h"
35#include "ratecontrol.h"
36#include "stats.h"
37#include "torrent.h"
38#include "trevent.h"
39#include "utils.h"
40
41/**
42***
43**/
44
45enum
46{
47    BT_CHOKE                = 0,
48    BT_UNCHOKE              = 1,
49    BT_INTERESTED           = 2,
50    BT_NOT_INTERESTED       = 3,
51    BT_HAVE                 = 4,
52    BT_BITFIELD             = 5,
53    BT_REQUEST              = 6,
54    BT_PIECE                = 7,
55    BT_CANCEL               = 8,
56    BT_PORT                 = 9,
57    BT_SUGGEST              = 13,
58    BT_HAVE_ALL             = 14,
59    BT_HAVE_NONE            = 15,
60    BT_REJECT               = 16,
61    BT_LTEP                 = 20,
62
63    LTEP_HANDSHAKE          = 0,
64
65    TR_LTEP_PEX             = 1,
66
67    MIN_CHOKE_PERIOD_SEC    = ( 10 ),
68
69    /* idle seconds before we send a keepalive */
70    KEEPALIVE_INTERVAL_SECS = 100,
71
72    PEX_INTERVAL            = ( 90 * 1000 ), /* msec between sendPex() calls */
73    PEER_PULSE_INTERVAL     = ( 250 ),       /* msec between peerPulse() calls
74                                               */
75
76    MAX_QUEUE_SIZE          = ( 100 ),
77
78    /* how long an unsent request can stay queued before it's returned
79       back to the peer-mgr's pool of requests */
80    QUEUED_REQUEST_TTL_SECS = 20,
81
82    /* how long a sent request can stay queued before it's returned
83       back to the peer-mgr's pool of requests */
84    SENT_REQUEST_TTL_SECS = 240,
85
86    /* used in lowering the outMessages queue period */
87    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
88    HIGH_PRIORITY_INTERVAL_SECS = 2,
89    LOW_PRIORITY_INTERVAL_SECS = 20,
90
91    /* number of pieces to remove from the bitfield when
92     * lazy bitfields are turned on */
93    LAZY_PIECE_COUNT = 26
94};
95
96/**
97***  REQUEST MANAGEMENT
98**/
99
100enum
101{
102    AWAITING_BT_LENGTH,
103    AWAITING_BT_ID,
104    AWAITING_BT_MESSAGE,
105    AWAITING_BT_PIECE
106};
107
108struct peer_request
109{
110    uint32_t    index;
111    uint32_t    offset;
112    uint32_t    length;
113    time_t      time_requested;
114};
115
116static int
117compareRequest( const void * va,
118                const void * vb )
119{
120    const struct peer_request * a = va;
121    const struct peer_request * b = vb;
122
123    if( a->index != b->index )
124        return a->index < b->index ? -1 : 1;
125
126    if( a->offset != b->offset )
127        return a->offset < b->offset ? -1 : 1;
128
129    if( a->length != b->length )
130        return a->length < b->length ? -1 : 1;
131
132    return 0;
133}
134
135struct request_list
136{
137    uint16_t               count;
138    uint16_t               max;
139    struct peer_request *  requests;
140};
141
142static const struct request_list REQUEST_LIST_INIT = { 0, 0, NULL };
143
144static void
145reqListReserve( struct request_list * list,
146                uint16_t              max )
147{
148    if( list->max < max )
149    {
150        list->max = max;
151        list->requests = tr_renew( struct peer_request,
152                                   list->requests,
153                                   list->max );
154    }
155}
156
157static void
158reqListClear( struct request_list * list )
159{
160    tr_free( list->requests );
161    *list = REQUEST_LIST_INIT;
162}
163
164static void
165reqListCopy( struct request_list *       dest,
166             const struct request_list * src )
167{
168    dest->count = dest->max = src->count;
169    dest->requests =
170        tr_memdup( src->requests, dest->count * sizeof( struct peer_request ) );
171}
172
173static void
174reqListRemoveOne( struct request_list * list,
175                  int                   i )
176{
177    assert( 0 <= i && i < list->count );
178
179    memmove( &list->requests[i],
180            &list->requests[i + 1],
181            sizeof( struct peer_request ) * ( --list->count - i ) );
182}
183
184static void
185reqListAppend( struct request_list *       list,
186               const struct peer_request * req )
187{
188    if( ++list->count >= list->max )
189        reqListReserve( list, list->max + 8 );
190
191    list->requests[list->count - 1] = *req;
192}
193
194static int
195reqListPop( struct request_list * list,
196            struct peer_request * setme )
197{
198    int success;
199
200    if( !list->count )
201        success = FALSE;
202    else {
203        *setme = list->requests[0];
204        reqListRemoveOne( list, 0 );
205        success = TRUE;
206    }
207
208    return success;
209}
210
211static int
212reqListFind( struct request_list *       list,
213             const struct peer_request * key )
214{
215    uint16_t i;
216
217    for( i = 0; i < list->count; ++i )
218        if( !compareRequest( key, list->requests + i ) )
219            return i;
220
221    return -1;
222}
223
224static int
225reqListRemove( struct request_list *       list,
226               const struct peer_request * key )
227{
228    int success;
229    const int i = reqListFind( list, key );
230
231    if( i < 0 )
232        success = FALSE;
233    else {
234        reqListRemoveOne( list, i );
235        success = TRUE;
236    }
237
238    return success;
239}
240
241/**
242***
243**/
244
245/* this is raw, unchanged data from the peer regarding
246 * the current message that it's sending us. */
247struct tr_incoming
248{
249    uint8_t                id;
250    uint32_t               length; /* includes the +1 for id length */
251    struct peer_request    blockReq; /* metadata for incoming blocks */
252    struct evbuffer *      block; /* piece data for incoming blocks */
253};
254
255struct tr_peermsgs
256{
257    tr_bool         peerSentBitfield;
258    tr_bool         peerSupportsPex;
259    tr_bool         clientSentLtepHandshake;
260    tr_bool         peerSentLtepHandshake;
261
262    uint8_t         state;
263    uint8_t         ut_pex_id;
264    uint16_t        pexCount;
265    uint16_t        minActiveRequests;
266    uint16_t        maxActiveRequests;
267
268    /* how long the outMessages batch should be allowed to grow before
269     * it's flushed -- some messages (like requests >:) should be sent
270     * very quickly; others aren't as urgent. */
271    int                    outMessagesBatchPeriod;
272
273    tr_peer *              info;
274
275    tr_session *           session;
276    tr_torrent *           torrent;
277    tr_peerIo *            io;
278
279    tr_publisher_t *       publisher;
280
281    struct evbuffer *      outMessages; /* all the non-piece messages */
282
283    struct request_list    peerAskedFor;
284    struct request_list    clientAskedFor;
285    struct request_list    clientWillAskFor;
286
287    tr_timer *             pexTimer;
288
289    time_t                 clientSentPexAt;
290    time_t                 clientSentAnythingAt;
291
292    /* when we started batching the outMessages */
293    time_t                outMessagesBatchedAt;
294
295    tr_bitfield *         peerAllowedPieces;
296
297    struct tr_incoming    incoming;
298
299    tr_pex *              pex;
300};
301
302/**
303***
304**/
305
306static void
307myDebug( const char *               file,
308         int                        line,
309         const struct tr_peermsgs * msgs,
310         const char *               fmt,
311         ... )
312{
313    FILE * fp = tr_getLog( );
314
315    if( fp )
316    {
317        va_list           args;
318        char              timestr[64];
319        struct evbuffer * buf = evbuffer_new( );
320        char *            base = tr_basename( file );
321
322        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
323                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
324                             msgs->torrent->info.name,
325                             tr_peerIoGetAddrStr( msgs->io ),
326                             msgs->info->client );
327        va_start( args, fmt );
328        evbuffer_add_vprintf( buf, fmt, args );
329        va_end( args );
330        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
331        fwrite( EVBUFFER_DATA( buf ), 1, EVBUFFER_LENGTH( buf ), fp );
332
333        tr_free( base );
334        evbuffer_free( buf );
335    }
336}
337
338#define dbgmsg( msgs, ... ) \
339    do { \
340        if( tr_deepLoggingIsActive( ) ) \
341            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
342    } while( 0 )
343
344/**
345***
346**/
347
348static void
349pokeBatchPeriod( tr_peermsgs * msgs,
350                 int           interval )
351{
352    if( msgs->outMessagesBatchPeriod > interval )
353    {
354        msgs->outMessagesBatchPeriod = interval;
355        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
356    }
357}
358
359static void
360protocolSendRequest( tr_peermsgs *               msgs,
361                     const struct peer_request * req )
362{
363    tr_peerIo *       io = msgs->io;
364    struct evbuffer * out = msgs->outMessages;
365
366    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
367    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
368    tr_peerIoWriteUint32( io, out, req->index );
369    tr_peerIoWriteUint32( io, out, req->offset );
370    tr_peerIoWriteUint32( io, out, req->length );
371    dbgmsg( msgs, "requesting %u:%u->%u... outMessage size is now %d",
372           req->index, req->offset, req->length, (int)EVBUFFER_LENGTH( out ) );
373    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
374}
375
376static void
377protocolSendCancel( tr_peermsgs *               msgs,
378                    const struct peer_request * req )
379{
380    tr_peerIo *       io = msgs->io;
381    struct evbuffer * out = msgs->outMessages;
382
383    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
384    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
385    tr_peerIoWriteUint32( io, out, req->index );
386    tr_peerIoWriteUint32( io, out, req->offset );
387    tr_peerIoWriteUint32( io, out, req->length );
388    dbgmsg( msgs, "cancelling %u:%u->%u... outMessage size is now %d",
389           req->index, req->offset, req->length, (int)EVBUFFER_LENGTH( out ) );
390    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
391}
392
393static void
394protocolSendHave( tr_peermsgs * msgs,
395                  uint32_t      index )
396{
397    tr_peerIo *       io = msgs->io;
398    struct evbuffer * out = msgs->outMessages;
399
400    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + sizeof( uint32_t ) );
401    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
402    tr_peerIoWriteUint32( io, out, index );
403    dbgmsg( msgs, "sending Have %u.. outMessage size is now %d",
404           index, (int)EVBUFFER_LENGTH( out ) );
405    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
406}
407
408static void
409protocolSendChoke( tr_peermsgs * msgs,
410                   int           choke )
411{
412    tr_peerIo *       io = msgs->io;
413    struct evbuffer * out = msgs->outMessages;
414
415    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
416    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
417    dbgmsg( msgs, "sending %s... outMessage size is now %d",
418           ( choke ? "Choke" : "Unchoke" ),
419           (int)EVBUFFER_LENGTH( out ) );
420    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
421}
422
423/**
424***  EVENTS
425**/
426
427static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0 };
428
429static void
430publish( tr_peermsgs *   msgs,
431         tr_peer_event * e )
432{
433    tr_publisherPublish( msgs->publisher, msgs->info, e );
434}
435
436static void
437fireError( tr_peermsgs * msgs,
438           int           err )
439{
440    tr_peer_event e = blankEvent;
441
442    e.eventType = TR_PEER_ERROR;
443    e.err = err;
444    publish( msgs, &e );
445}
446
447static void
448fireNeedReq( tr_peermsgs * msgs )
449{
450    tr_peer_event e = blankEvent;
451
452    e.eventType = TR_PEER_NEED_REQ;
453    publish( msgs, &e );
454}
455
456static void
457firePeerProgress( tr_peermsgs * msgs )
458{
459    tr_peer_event e = blankEvent;
460
461    e.eventType = TR_PEER_PEER_PROGRESS;
462    e.progress = msgs->info->progress;
463    publish( msgs, &e );
464}
465
466static void
467fireGotBlock( tr_peermsgs *               msgs,
468              const struct peer_request * req )
469{
470    tr_peer_event e = blankEvent;
471
472    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
473    e.pieceIndex = req->index;
474    e.offset = req->offset;
475    e.length = req->length;
476    publish( msgs, &e );
477}
478
479static void
480fireClientGotData( tr_peermsgs * msgs,
481                   uint32_t      length,
482                   int           wasPieceData )
483{
484    tr_peer_event e = blankEvent;
485
486    e.length = length;
487    e.eventType = TR_PEER_CLIENT_GOT_DATA;
488    e.wasPieceData = wasPieceData;
489    publish( msgs, &e );
490}
491
492static void
493firePeerGotData( tr_peermsgs  * msgs,
494                 uint32_t       length,
495                 int            wasPieceData )
496{
497    tr_peer_event e = blankEvent;
498
499    e.length = length;
500    e.eventType = TR_PEER_PEER_GOT_DATA;
501    e.wasPieceData = wasPieceData;
502
503    publish( msgs, &e );
504}
505
506static void
507fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
508{
509    tr_peer_event e = blankEvent;
510    e.eventType = TR_PEER_CANCEL;
511    e.pieceIndex = req->index;
512    e.offset = req->offset;
513    e.length = req->length;
514    publish( msgs, &e );
515}
516
517/**
518***  INTEREST
519**/
520
521static int
522isPieceInteresting( const tr_peermsgs * peer,
523                    tr_piece_index_t    piece )
524{
525    const tr_torrent * torrent = peer->torrent;
526
527    return ( !torrent->info.pieces[piece].dnd )                 /* we want it */
528           && ( !tr_cpPieceIsComplete( torrent->completion, piece ) ) /* !have
529                                                                        */
530           && ( tr_bitfieldHas( peer->info->have, piece ) );   /* peer has it */
531}
532
533/* "interested" means we'll ask for piece data if they unchoke us */
534static int
535isPeerInteresting( const tr_peermsgs * msgs )
536{
537    tr_piece_index_t    i;
538    const tr_torrent *  torrent;
539    const tr_bitfield * bitfield;
540    const int           clientIsSeed = tr_torrentIsSeed( msgs->torrent );
541
542    if( clientIsSeed )
543        return FALSE;
544
545    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
546        return FALSE;
547
548    torrent = msgs->torrent;
549    bitfield = tr_cpPieceBitfield( torrent->completion );
550
551    if( !msgs->info->have )
552        return TRUE;
553
554    assert( bitfield->byteCount == msgs->info->have->byteCount );
555
556    for( i = 0; i < torrent->info.pieceCount; ++i )
557        if( isPieceInteresting( msgs, i ) )
558            return TRUE;
559
560    return FALSE;
561}
562
563static void
564sendInterest( tr_peermsgs * msgs,
565              int           weAreInterested )
566{
567    struct evbuffer * out = msgs->outMessages;
568
569    assert( msgs );
570    assert( weAreInterested == 0 || weAreInterested == 1 );
571
572    msgs->info->clientIsInterested = weAreInterested;
573    dbgmsg( msgs, "Sending %s",
574            weAreInterested ? "Interested" : "Not Interested" );
575    tr_peerIoWriteUint32( msgs->io, out, sizeof( uint8_t ) );
576    tr_peerIoWriteUint8 (
577        msgs->io, out, weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
578    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
579    dbgmsg( msgs, "outMessage size is now %d", (int)EVBUFFER_LENGTH( out ) );
580}
581
582static void
583updateInterest( tr_peermsgs * msgs )
584{
585    const int i = isPeerInteresting( msgs );
586
587    if( i != msgs->info->clientIsInterested )
588        sendInterest( msgs, i );
589    if( i )
590        fireNeedReq( msgs );
591}
592
593static void
594cancelAllRequestsToClient( tr_peermsgs * msgs )
595{
596    reqListClear( &msgs->peerAskedFor );
597}
598
599void
600tr_peerMsgsSetChoke( tr_peermsgs * msgs,
601                     int           choke )
602{
603    const time_t now = time( NULL );
604    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
605
606    assert( msgs );
607    assert( msgs->info );
608    assert( choke == 0 || choke == 1 );
609
610    if( msgs->info->chokeChangedAt > fibrillationTime )
611    {
612        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation",
613                choke );
614    }
615    else if( msgs->info->peerIsChoked != choke )
616    {
617        msgs->info->peerIsChoked = choke;
618        if( choke )
619            cancelAllRequestsToClient( msgs );
620        protocolSendChoke( msgs, choke );
621        msgs->info->chokeChangedAt = now;
622    }
623}
624
625/**
626***
627**/
628
629void
630tr_peerMsgsHave( tr_peermsgs * msgs,
631                 uint32_t      index )
632{
633    protocolSendHave( msgs, index );
634
635    /* since we have more pieces now, we might not be interested in this peer */
636    updateInterest( msgs );
637}
638
639/**
640***
641**/
642
643static int
644reqIsValid( const tr_peermsgs * peer,
645            uint32_t            index,
646            uint32_t            offset,
647            uint32_t            length )
648{
649    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
650}
651
652static int
653requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
654{
655    return reqIsValid( msgs, req->index, req->offset, req->length );
656}
657
658static void
659expireOldRequests( tr_peermsgs * msgs, const time_t now  )
660{
661    int                 i;
662    time_t              oldestAllowed;
663    struct request_list tmp = REQUEST_LIST_INIT;
664
665    /* cancel requests that have been queued for too long */
666    oldestAllowed = now - QUEUED_REQUEST_TTL_SECS;
667    reqListCopy( &tmp, &msgs->clientWillAskFor );
668    for( i = 0; i < tmp.count; ++i )
669    {
670        const struct peer_request * req = &tmp.requests[i];
671        if( req->time_requested < oldestAllowed )
672            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
673    }
674    reqListClear( &tmp );
675
676    /* cancel requests that were sent too long ago */
677    oldestAllowed = now - SENT_REQUEST_TTL_SECS;
678    reqListCopy( &tmp, &msgs->clientAskedFor );
679    for( i = 0; i < tmp.count; ++i )
680    {
681        const struct peer_request * req = &tmp.requests[i];
682        if( req->time_requested < oldestAllowed )
683            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
684    }
685    reqListClear( &tmp );
686}
687
688static void
689pumpRequestQueue( tr_peermsgs * msgs, const time_t now )
690{
691    const int           max = msgs->maxActiveRequests;
692    const int           min = msgs->minActiveRequests;
693    int                 sent = 0;
694    int                 count = msgs->clientAskedFor.count;
695    struct peer_request req;
696
697    if( count > min )
698        return;
699    if( msgs->info->clientIsChoked )
700        return;
701    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
702        return;
703
704    while( ( count < max ) && reqListPop( &msgs->clientWillAskFor, &req ) )
705    {
706        const tr_block_index_t block =
707            _tr_block( msgs->torrent, req.index, req.offset );
708
709        assert( requestIsValid( msgs, &req ) );
710        assert( tr_bitfieldHas( msgs->info->have, req.index ) );
711
712        /* don't ask for it if we've already got it... this block may have
713         * come in from a different peer after we cancelled a request for it */
714        if( !tr_cpBlockIsComplete( msgs->torrent->completion, block ) )
715        {
716            protocolSendRequest( msgs, &req );
717            req.time_requested = now;
718            reqListAppend( &msgs->clientAskedFor, &req );
719
720            ++count;
721            ++sent;
722        }
723    }
724
725    if( sent )
726        dbgmsg( msgs,
727                "pump sent %d requests, now have %d active and %d queued",
728                sent,
729                msgs->clientAskedFor.count,
730                msgs->clientWillAskFor.count );
731
732    if( count < max )
733        fireNeedReq( msgs );
734}
735
736static int
737requestQueueIsFull( const tr_peermsgs * msgs )
738{
739    const int req_max = msgs->maxActiveRequests;
740    return msgs->clientWillAskFor.count >= req_max;
741}
742
743tr_addreq_t
744tr_peerMsgsAddRequest( tr_peermsgs *    msgs,
745                       uint32_t         index,
746                       uint32_t         offset,
747                       uint32_t         length )
748{
749    struct peer_request req;
750
751    assert( msgs );
752    assert( msgs->torrent );
753    assert( reqIsValid( msgs, index, offset, length ) );
754
755    /**
756    ***  Reasons to decline the request
757    **/
758
759    /* don't send requests to choked clients */
760    if( msgs->info->clientIsChoked )
761    {
762        dbgmsg( msgs, "declining request because they're choking us" );
763        return TR_ADDREQ_CLIENT_CHOKED;
764    }
765
766    /* peer doesn't have this piece */
767    if( !tr_bitfieldHas( msgs->info->have, index ) )
768        return TR_ADDREQ_MISSING;
769
770    /* peer's queue is full */
771    if( requestQueueIsFull( msgs ) ) {
772        dbgmsg( msgs, "declining request because we're full" );
773        return TR_ADDREQ_FULL;
774    }
775
776    /* have we already asked for this piece? */
777    req.index = index;
778    req.offset = offset;
779    req.length = length;
780    if( reqListFind( &msgs->clientAskedFor, &req ) != -1 ) {
781        dbgmsg( msgs, "declining because it's a duplicate" );
782        return TR_ADDREQ_DUPLICATE;
783    }
784    if( reqListFind( &msgs->clientWillAskFor, &req ) != -1 ) {
785        dbgmsg( msgs, "declining because it's a duplicate" );
786        return TR_ADDREQ_DUPLICATE;
787    }
788
789    /**
790    ***  Accept this request
791    **/
792
793    dbgmsg( msgs, "adding req for %"PRIu32":%"PRIu32"->%"PRIu32" to our `will request' list",
794            index, offset, length );
795    req.time_requested = time( NULL );
796    reqListAppend( &msgs->clientWillAskFor, &req );
797    return TR_ADDREQ_OK;
798}
799
800static void
801cancelAllRequestsToPeer( tr_peermsgs * msgs )
802{
803    int                 i;
804    struct request_list a = msgs->clientWillAskFor;
805    struct request_list b = msgs->clientAskedFor;
806    dbgmsg( msgs, "cancelling all requests to peer" );
807
808    msgs->clientAskedFor = REQUEST_LIST_INIT;
809    msgs->clientWillAskFor = REQUEST_LIST_INIT;
810
811    for( i=0; i<a.count; ++i )
812        fireCancelledReq( msgs, &a.requests[i] );
813
814    for( i = 0; i < b.count; ++i ) {
815        fireCancelledReq( msgs, &b.requests[i] );
816        protocolSendCancel( msgs, &b.requests[i] );
817    }
818
819    reqListClear( &a );
820    reqListClear( &b );
821}
822
823void
824tr_peerMsgsCancel( tr_peermsgs * msgs,
825                   uint32_t      pieceIndex,
826                   uint32_t      offset,
827                   uint32_t      length )
828{
829    struct peer_request req;
830
831    assert( msgs != NULL );
832    assert( length > 0 );
833
834
835    /* have we asked the peer for this piece? */
836    req.index = pieceIndex;
837    req.offset = offset;
838    req.length = length;
839
840    /* if it's only in the queue and hasn't been sent yet, free it */
841    if( reqListRemove( &msgs->clientWillAskFor, &req ) ) {
842        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32"\n", pieceIndex, offset, length );
843        fireCancelledReq( msgs, &req );
844    }
845
846    /* if it's already been sent, send a cancel message too */
847    if( reqListRemove( &msgs->clientAskedFor, &req ) ) {
848        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32"\n", pieceIndex, offset, length );
849        protocolSendCancel( msgs, &req );
850        fireCancelledReq( msgs, &req );
851    }
852}
853
854/**
855***
856**/
857
858static void
859sendLtepHandshake( tr_peermsgs * msgs )
860{
861    tr_benc           val, *m;
862    char *            buf;
863    int               len;
864    int               pex;
865    struct evbuffer * out = msgs->outMessages;
866
867    if( msgs->clientSentLtepHandshake )
868        return;
869
870    dbgmsg( msgs, "sending an ltep handshake" );
871    msgs->clientSentLtepHandshake = 1;
872
873    /* decide if we want to advertise pex support */
874    if( !tr_torrentAllowsPex( msgs->torrent ) )
875        pex = 0;
876    else if( msgs->peerSentLtepHandshake )
877        pex = msgs->peerSupportsPex ? 1 : 0;
878    else
879        pex = 1;
880
881    tr_bencInitDict( &val, 4 );
882    tr_bencDictAddInt( &val, "e",
883                       msgs->session->encryptionMode != TR_CLEAR_PREFERRED );
884    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( msgs->session ) );
885    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
886    m  = tr_bencDictAddDict( &val, "m", 1 );
887    if( pex )
888        tr_bencDictAddInt( m, "ut_pex", TR_LTEP_PEX );
889    buf = tr_bencSave( &val, &len );
890
891    tr_peerIoWriteUint32( msgs->io, out, 2 * sizeof( uint8_t ) + len );
892    tr_peerIoWriteUint8 ( msgs->io, out, BT_LTEP );
893    tr_peerIoWriteUint8 ( msgs->io, out, LTEP_HANDSHAKE );
894    tr_peerIoWriteBytes ( msgs->io, out, buf, len );
895    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
896    dbgmsg( msgs, "outMessage size is now %d", (int)EVBUFFER_LENGTH( out ) );
897
898    /* cleanup */
899    tr_bencFree( &val );
900    tr_free( buf );
901}
902
903static void
904parseLtepHandshake( tr_peermsgs *     msgs,
905                    int               len,
906                    struct evbuffer * inbuf )
907{
908    int64_t   i;
909    tr_benc   val, * sub;
910    uint8_t * tmp = tr_new( uint8_t, len );
911
912    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
913    msgs->peerSentLtepHandshake = 1;
914
915    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type != TYPE_DICT )
916    {
917        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
918        tr_free( tmp );
919        return;
920    }
921
922    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
923
924    /* does the peer prefer encrypted connections? */
925    if( tr_bencDictFindInt( &val, "e", &i ) )
926        msgs->info->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
927                                            : ENCRYPTION_PREFERENCE_NO;
928
929    /* check supported messages for utorrent pex */
930    msgs->peerSupportsPex = 0;
931    if( tr_bencDictFindDict( &val, "m", &sub ) )
932    {
933        if( tr_bencDictFindInt( sub, "ut_pex", &i ) )
934        {
935            msgs->ut_pex_id = (uint8_t) i;
936            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
937            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
938        }
939    }
940
941    /* get peer's listening port */
942    if( tr_bencDictFindInt( &val, "p", &i ) )
943    {
944        msgs->info->port = htons( (uint16_t)i );
945        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
946    }
947
948    tr_bencFree( &val );
949    tr_free( tmp );
950}
951
952static void
953parseUtPex( tr_peermsgs *     msgs,
954            int               msglen,
955            struct evbuffer * inbuf )
956{
957    int                loaded = 0;
958    uint8_t *          tmp = tr_new( uint8_t, msglen );
959    tr_benc            val;
960    const tr_torrent * tor = msgs->torrent;
961    const uint8_t *    added;
962    size_t             added_len;
963
964    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
965
966    if( tr_torrentAllowsPex( tor )
967      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) )
968      && tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
969    {
970        const uint8_t * added_f = NULL;
971        tr_pex *        pex;
972        size_t          i, n;
973        size_t          added_f_len = 0;
974        tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
975        pex =
976            tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len,
977                                    &n );
978        for( i = 0; i < n; ++i )
979            tr_peerMgrAddPex( msgs->session->peerMgr, tor->info.hash,
980                              TR_PEER_FROM_PEX, pex + i );
981        tr_free( pex );
982    }
983
984    if( loaded )
985        tr_bencFree( &val );
986    tr_free( tmp );
987}
988
989static void sendPex( tr_peermsgs * msgs );
990
991static void
992parseLtep( tr_peermsgs *     msgs,
993           int               msglen,
994           struct evbuffer * inbuf )
995{
996    uint8_t ltep_msgid;
997
998    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
999    msglen--;
1000
1001    if( ltep_msgid == LTEP_HANDSHAKE )
1002    {
1003        dbgmsg( msgs, "got ltep handshake" );
1004        parseLtepHandshake( msgs, msglen, inbuf );
1005        if( tr_peerIoSupportsLTEP( msgs->io ) )
1006        {
1007            sendLtepHandshake( msgs );
1008            sendPex( msgs );
1009        }
1010    }
1011    else if( ltep_msgid == TR_LTEP_PEX )
1012    {
1013        dbgmsg( msgs, "got ut pex" );
1014        msgs->peerSupportsPex = 1;
1015        parseUtPex( msgs, msglen, inbuf );
1016    }
1017    else
1018    {
1019        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1020        evbuffer_drain( inbuf, msglen );
1021    }
1022}
1023
1024static int
1025readBtLength( tr_peermsgs *     msgs,
1026              struct evbuffer * inbuf,
1027              size_t            inlen )
1028{
1029    uint32_t len;
1030
1031    if( inlen < sizeof( len ) )
1032        return READ_LATER;
1033
1034    tr_peerIoReadUint32( msgs->io, inbuf, &len );
1035
1036    if( len == 0 ) /* peer sent us a keepalive message */
1037        dbgmsg( msgs, "got KeepAlive" );
1038    else
1039    {
1040        msgs->incoming.length = len;
1041        msgs->state = AWAITING_BT_ID;
1042    }
1043
1044    return READ_NOW;
1045}
1046
1047static int readBtMessage( tr_peermsgs *     msgs,
1048                          struct evbuffer * inbuf,
1049                          size_t            inlen );
1050
1051static int
1052readBtId( tr_peermsgs *     msgs,
1053          struct evbuffer * inbuf,
1054          size_t            inlen )
1055{
1056    uint8_t id;
1057
1058    if( inlen < sizeof( uint8_t ) )
1059        return READ_LATER;
1060
1061    tr_peerIoReadUint8( msgs->io, inbuf, &id );
1062    msgs->incoming.id = id;
1063
1064    if( id == BT_PIECE )
1065    {
1066        msgs->state = AWAITING_BT_PIECE;
1067        return READ_NOW;
1068    }
1069    else if( msgs->incoming.length != 1 )
1070    {
1071        msgs->state = AWAITING_BT_MESSAGE;
1072        return READ_NOW;
1073    }
1074    else return readBtMessage( msgs, inbuf, inlen - 1 );
1075}
1076
1077static void
1078updatePeerProgress( tr_peermsgs * msgs )
1079{
1080    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have )
1081                           / (float)msgs->torrent->info.pieceCount;
1082    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
1083    updateInterest( msgs );
1084    firePeerProgress( msgs );
1085}
1086
1087static void
1088peerMadeRequest( tr_peermsgs *               msgs,
1089                 const struct peer_request * req )
1090{
1091    const int reqIsValid = requestIsValid( msgs, req );
1092    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete(
1093        msgs->torrent->completion, req->index );
1094    const int peerIsChoked = msgs->info->peerIsChoked;
1095
1096    if( !reqIsValid ) /* bad request */
1097    {
1098        dbgmsg( msgs, "rejecting an invalid request." );
1099    }
1100    else if( !clientHasPiece ) /* we don't have it */
1101    {
1102        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1103    }
1104    else if( peerIsChoked ) /* doesn't he know he's choked? */
1105    {
1106        tr_peerMsgsSetChoke( msgs, 1 );
1107    }
1108    else /* YAY */
1109    {
1110        reqListAppend( &msgs->peerAskedFor, req );
1111    }
1112}
1113
1114static int
1115messageLengthIsCorrect( const tr_peermsgs * msg,
1116                        uint8_t             id,
1117                        uint32_t            len )
1118{
1119    switch( id )
1120    {
1121        case BT_CHOKE:
1122        case BT_UNCHOKE:
1123        case BT_INTERESTED:
1124        case BT_NOT_INTERESTED:
1125        case BT_HAVE_ALL:
1126        case BT_HAVE_NONE:
1127            return len == 1;
1128
1129        case BT_HAVE:
1130        case BT_SUGGEST:
1131
1132        case BT_BITFIELD:
1133            return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1134
1135        case BT_REQUEST:
1136        case BT_CANCEL:
1137        case BT_REJECT:
1138            return len == 13;
1139
1140        case BT_PIECE:
1141            return len > 9 && len <= 16393;
1142
1143        case BT_PORT:
1144            return len == 3;
1145
1146        case BT_LTEP:
1147            return len >= 2;
1148
1149        default:
1150            return FALSE;
1151    }
1152}
1153
1154static int clientGotBlock( tr_peermsgs *               msgs,
1155                           const uint8_t *             block,
1156                           const struct peer_request * req );
1157
1158static int
1159readBtPiece( tr_peermsgs      * msgs,
1160             struct evbuffer  * inbuf,
1161             size_t             inlen,
1162             size_t           * setme_piece_bytes_read )
1163{
1164    struct peer_request * req = &msgs->incoming.blockReq;
1165
1166    assert( EVBUFFER_LENGTH( inbuf ) >= inlen );
1167    dbgmsg( msgs, "In readBtPiece" );
1168
1169    if( !req->length )
1170    {
1171        if( inlen < 8 )
1172            return READ_LATER;
1173
1174        tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
1175        tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
1176        req->length = msgs->incoming.length - 9;
1177        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index,
1178                req->offset,
1179                req->length );
1180        return READ_NOW;
1181    }
1182    else
1183    {
1184        int          err;
1185
1186        /* read in another chunk of data */
1187        const size_t nLeft = req->length - EVBUFFER_LENGTH(
1188            msgs->incoming.block );
1189        size_t       n = MIN( nLeft, inlen );
1190        uint8_t *    buf = tr_new( uint8_t, n );
1191        assert( EVBUFFER_LENGTH( inbuf ) >= n );
1192        tr_peerIoReadBytes( msgs->io, inbuf, buf, n );
1193        evbuffer_add( msgs->incoming.block, buf, n );
1194        fireClientGotData( msgs, n, TRUE );
1195        *setme_piece_bytes_read += n;
1196        tr_free( buf );
1197        dbgmsg( msgs, "got %d bytes for block %u:%u->%u ... %d remain",
1198               (int)n, req->index, req->offset, req->length,
1199               (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) );
1200        if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length )
1201            return READ_LATER;
1202
1203        /* we've got the whole block ... process it */
1204        err = clientGotBlock( msgs, EVBUFFER_DATA(
1205                                  msgs->incoming.block ), req );
1206
1207        /* cleanup */
1208        evbuffer_drain( msgs->incoming.block,
1209                       EVBUFFER_LENGTH( msgs->incoming.block ) );
1210        req->length = 0;
1211        msgs->state = AWAITING_BT_LENGTH;
1212        if( !err )
1213            return READ_NOW;
1214        else
1215        {
1216            fireError( msgs, err );
1217            return READ_ERR;
1218        }
1219    }
1220}
1221
1222static int
1223readBtMessage( tr_peermsgs *     msgs,
1224               struct evbuffer * inbuf,
1225               size_t            inlen )
1226{
1227    uint32_t      ui32;
1228    uint32_t      msglen = msgs->incoming.length;
1229    const uint8_t id = msgs->incoming.id;
1230    const size_t  startBufLen = EVBUFFER_LENGTH( inbuf );
1231
1232    --msglen; /* id length */
1233
1234    if( inlen < msglen )
1235        return READ_LATER;
1236
1237    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %d", (int)id,
1238            (int)msglen,
1239            (int)inlen );
1240
1241    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1242    {
1243        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d",
1244                (int)id, (int)msglen );
1245        fireError( msgs, EMSGSIZE );
1246        return READ_ERR;
1247    }
1248
1249    switch( id )
1250    {
1251        case BT_CHOKE:
1252            dbgmsg( msgs, "got Choke" );
1253            msgs->info->clientIsChoked = 1;
1254            cancelAllRequestsToPeer( msgs );
1255            cancelAllRequestsToClient( msgs );
1256            break;
1257
1258        case BT_UNCHOKE:
1259            dbgmsg( msgs, "got Unchoke" );
1260            msgs->info->clientIsChoked = 0;
1261            fireNeedReq( msgs );
1262            break;
1263
1264        case BT_INTERESTED:
1265            dbgmsg( msgs, "got Interested" );
1266            msgs->info->peerIsInterested = 1;
1267            break;
1268
1269        case BT_NOT_INTERESTED:
1270            dbgmsg( msgs, "got Not Interested" );
1271            msgs->info->peerIsInterested = 0;
1272            break;
1273
1274        case BT_HAVE:
1275            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1276            dbgmsg( msgs, "got Have: %u", ui32 );
1277            if( tr_bitfieldAdd( msgs->info->have, ui32 ) )
1278                fireError( msgs, ERANGE );
1279            updatePeerProgress( msgs );
1280            tr_rcTransferred( msgs->torrent->swarmSpeed,
1281                              msgs->torrent->info.pieceSize );
1282            break;
1283
1284        case BT_BITFIELD:
1285        {
1286            dbgmsg( msgs, "got a bitfield" );
1287            msgs->peerSentBitfield = 1;
1288            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits,
1289                                msglen );
1290            updatePeerProgress( msgs );
1291            fireNeedReq( msgs );
1292            break;
1293        }
1294
1295        case BT_REQUEST:
1296        {
1297            struct peer_request r;
1298            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1299            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1300            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1301            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset,
1302                    r.length );
1303            peerMadeRequest( msgs, &r );
1304            break;
1305        }
1306
1307        case BT_CANCEL:
1308        {
1309            struct peer_request r;
1310            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1311            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1312            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1313            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset,
1314                    r.length );
1315            reqListRemove( &msgs->peerAskedFor, &r );
1316            break;
1317        }
1318
1319        case BT_PIECE:
1320            assert( 0 ); /* handled elsewhere! */
1321            break;
1322
1323        case BT_PORT:
1324            dbgmsg( msgs, "Got a BT_PORT" );
1325            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1326            break;
1327
1328        case BT_SUGGEST:
1329        {
1330            dbgmsg( msgs, "Got a BT_SUGGEST" );
1331            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1332            /* we don't do anything with this yet */
1333            break;
1334        }
1335
1336        case BT_HAVE_ALL:
1337            dbgmsg( msgs, "Got a BT_HAVE_ALL" );
1338            tr_bitfieldAddRange( msgs->info->have, 0,
1339                                 msgs->torrent->info.pieceCount );
1340            updatePeerProgress( msgs );
1341            break;
1342
1343
1344        case BT_HAVE_NONE:
1345            dbgmsg( msgs, "Got a BT_HAVE_NONE" );
1346            tr_bitfieldClear( msgs->info->have );
1347            updatePeerProgress( msgs );
1348            break;
1349
1350        case BT_REJECT:
1351        {
1352            struct peer_request r;
1353            dbgmsg( msgs, "Got a BT_REJECT" );
1354            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1355            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1356            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1357            reqListRemove( &msgs->clientAskedFor, &r );
1358            break;
1359        }
1360
1361        case BT_LTEP:
1362            dbgmsg( msgs, "Got a BT_LTEP" );
1363            parseLtep( msgs, msglen, inbuf );
1364            break;
1365
1366        default:
1367            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1368            tr_peerIoDrain( msgs->io, inbuf, msglen );
1369            break;
1370    }
1371
1372    assert( msglen + 1 == msgs->incoming.length );
1373    assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen );
1374
1375    msgs->state = AWAITING_BT_LENGTH;
1376    return READ_NOW;
1377}
1378
1379static void
1380decrementDownloadedCount( tr_peermsgs * msgs,
1381                          uint32_t      byteCount )
1382{
1383    tr_torrent * tor = msgs->torrent;
1384
1385    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1386}
1387
1388static void
1389clientGotUnwantedBlock( tr_peermsgs *               msgs,
1390                        const struct peer_request * req )
1391{
1392    decrementDownloadedCount( msgs, req->length );
1393}
1394
1395static void
1396addPeerToBlamefield( tr_peermsgs * msgs,
1397                     uint32_t      index )
1398{
1399    if( !msgs->info->blame )
1400        msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1401    tr_bitfieldAdd( msgs->info->blame, index );
1402}
1403
1404/* returns 0 on success, or an errno on failure */
1405static int
1406clientGotBlock( tr_peermsgs *               msgs,
1407                const uint8_t *             data,
1408                const struct peer_request * req )
1409{
1410    int                    err;
1411    tr_torrent *           tor = msgs->torrent;
1412    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1413
1414    assert( msgs );
1415    assert( req );
1416
1417    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) )
1418    {
1419        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1420                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1421        return EMSGSIZE;
1422    }
1423
1424    /* save the block */
1425    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset,
1426            req->length );
1427
1428    /**
1429    *** Remove the block from our `we asked for this' list
1430    **/
1431
1432    if( !reqListRemove( &msgs->clientAskedFor, req ) )
1433    {
1434        clientGotUnwantedBlock( msgs, req );
1435        dbgmsg( msgs, "we didn't ask for this message..." );
1436        return 0;
1437    }
1438
1439    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1440            msgs->clientAskedFor.count );
1441
1442    /**
1443    *** Error checks
1444    **/
1445
1446    if( tr_cpBlockIsComplete( tor->completion, block ) )
1447    {
1448        dbgmsg( msgs, "we have this block already..." );
1449        clientGotUnwantedBlock( msgs, req );
1450        return 0;
1451    }
1452
1453    /**
1454    ***  Save the block
1455    **/
1456
1457    msgs->info->peerSentPieceDataAt = time( NULL );
1458    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1459        return err;
1460
1461    addPeerToBlamefield( msgs, req->index );
1462    fireGotBlock( msgs, req );
1463    return 0;
1464}
1465
1466static int peerPulse( void * vmsgs );
1467
1468static void
1469didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1470{
1471    tr_peermsgs * msgs = vmsgs;
1472    firePeerGotData( msgs, bytesWritten, wasPieceData );
1473    peerPulse( msgs );
1474}
1475
1476static ReadState
1477canRead( struct tr_iobuf * iobuf, void * vmsgs, size_t * piece )
1478{
1479    ReadState         ret;
1480    tr_peermsgs *     msgs = vmsgs;
1481    struct evbuffer * in = tr_iobuf_input( iobuf );
1482    const size_t      inlen = EVBUFFER_LENGTH( in );
1483
1484    if( !inlen )
1485    {
1486        ret = READ_LATER;
1487    }
1488    else if( msgs->state == AWAITING_BT_PIECE )
1489    {
1490        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1491    }
1492    else switch( msgs->state )
1493    {
1494        case AWAITING_BT_LENGTH:
1495            ret = readBtLength ( msgs, in, inlen ); break;
1496
1497        case AWAITING_BT_ID:
1498            ret = readBtId     ( msgs, in, inlen ); break;
1499
1500        case AWAITING_BT_MESSAGE:
1501            ret = readBtMessage( msgs, in, inlen ); break;
1502
1503        default:
1504            assert( 0 );
1505    }
1506
1507    /* log the raw data that was read */
1508    if( EVBUFFER_LENGTH( in ) != inlen )
1509        fireClientGotData( msgs, inlen - EVBUFFER_LENGTH( in ), FALSE );
1510
1511    return ret;
1512}
1513
1514/**
1515***
1516**/
1517
1518static int
1519ratePulse( void * vpeer )
1520{
1521    tr_peermsgs * peer = vpeer;
1522    const double rateToClient = tr_peerGetPieceSpeed( peer->info,
1523                                                      TR_PEER_TO_CLIENT );
1524    const int estimatedBlocksInNext30Seconds =
1525                  ( rateToClient * 30 * 1024 ) / peer->torrent->blockSize;
1526
1527    peer->minActiveRequests = 8;
1528    peer->maxActiveRequests = peer->minActiveRequests + estimatedBlocksInNext30Seconds;
1529    return TRUE;
1530}
1531
1532static int
1533popNextRequest( tr_peermsgs *         msgs,
1534                struct peer_request * setme )
1535{
1536    return reqListPop( &msgs->peerAskedFor, setme );
1537}
1538
1539static size_t
1540fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1541{
1542    size_t bytesWritten = 0;
1543    struct peer_request req;
1544    const int haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1545
1546    /**
1547    ***  Protocol messages
1548    **/
1549
1550    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1551    {
1552        dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
1553        msgs->outMessagesBatchedAt = now;
1554    }
1555    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1556    {
1557        const size_t len = EVBUFFER_LENGTH( msgs->outMessages );
1558        /* flush the protocol messages */
1559        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->io, len );
1560        tr_peerIoWriteBuf( msgs->io, msgs->outMessages, FALSE );
1561        msgs->clientSentAnythingAt = now;
1562        msgs->outMessagesBatchedAt = 0;
1563        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1564        bytesWritten +=  len;
1565    }
1566
1567    /**
1568    ***  Blocks
1569    **/
1570
1571    if( ( tr_peerIoGetWriteBufferSpace( msgs->io ) >= msgs->torrent->blockSize )
1572        && popNextRequest( msgs, &req )
1573        && requestIsValid( msgs, &req )
1574        && tr_cpPieceIsComplete( msgs->torrent->completion, req.index ) )
1575    {
1576        /* send a block */
1577        uint8_t * buf = tr_new( uint8_t, req.length );
1578        const int err = tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf );
1579        if( err ) {
1580            fireError( msgs, err );
1581        } else {
1582            tr_peerIo * io = msgs->io;
1583            struct evbuffer * out = evbuffer_new( );
1584            dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1585            tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1586            tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1587            tr_peerIoWriteUint32( io, out, req.index );
1588            tr_peerIoWriteUint32( io, out, req.offset );
1589            tr_peerIoWriteBytes ( io, out, buf, req.length );
1590            tr_peerIoWriteBuf( io, out, TRUE );
1591            bytesWritten += EVBUFFER_LENGTH( out );
1592            evbuffer_free( out );
1593            msgs->clientSentAnythingAt = now;
1594        }
1595        tr_free( buf );
1596    }
1597
1598    /**
1599    ***  Keepalive
1600    **/
1601
1602    if( msgs->clientSentAnythingAt
1603        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1604    {
1605        dbgmsg( msgs, "sending a keepalive message" );
1606        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1607        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1608    }
1609
1610    return bytesWritten;
1611}
1612
1613static int
1614peerPulse( void * vmsgs )
1615{
1616    tr_peermsgs * msgs = vmsgs;
1617    const time_t  now = time( NULL );
1618
1619    ratePulse( msgs );
1620
1621    pumpRequestQueue( msgs, now );
1622    expireOldRequests( msgs, now );
1623
1624    for( ;; )
1625        if( fillOutputBuffer( msgs, now ) < 1 )
1626            break;
1627
1628    return TRUE; /* loop forever */
1629}
1630
1631void
1632tr_peerMsgsPulse( tr_peermsgs * msgs )
1633{
1634    if( msgs != NULL )
1635        peerPulse( msgs );
1636}
1637
1638static void
1639gotError( struct tr_iobuf  * iobuf UNUSED,
1640          short              what,
1641          void             * vmsgs )
1642{
1643    if( what & EVBUFFER_TIMEOUT )
1644        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1645    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1646        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1647               what, errno, tr_strerror( errno ) );
1648    fireError( vmsgs, ENOTCONN );
1649}
1650
1651static void
1652sendBitfield( tr_peermsgs * msgs )
1653{
1654    struct evbuffer * out = msgs->outMessages;
1655    tr_bitfield *     field;
1656    tr_piece_index_t  lazyPieces[LAZY_PIECE_COUNT];
1657    size_t            i;
1658    size_t            lazyCount = 0;
1659
1660    field = tr_bitfieldDup( tr_cpPieceBitfield( msgs->torrent->completion ) );
1661
1662    if( tr_sessionIsLazyBitfieldEnabled( msgs->session ) )
1663    {
1664        /** Lazy bitfields aren't a high priority or secure, so I'm opting for
1665            speed over a truly random sample -- let's limit the pool size to
1666            the first 1000 pieces so large torrents don't bog things down */
1667        size_t poolSize;
1668        const size_t maxPoolSize = MIN( msgs->torrent->info.pieceCount, 1000 );
1669        tr_piece_index_t * pool = tr_new( tr_piece_index_t, maxPoolSize );
1670
1671        /* build the pool */
1672        for( i=poolSize=0; i<maxPoolSize; ++i )
1673            if( tr_bitfieldHas( field, i ) )
1674                pool[poolSize++] = i;
1675
1676        /* pull random piece indices from the pool */
1677        while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) )
1678        {
1679            const int pos = tr_cryptoWeakRandInt( poolSize );
1680            const tr_piece_index_t piece = pool[pos];
1681            tr_bitfieldRem( field, piece );
1682            lazyPieces[lazyCount++] = piece;
1683            pool[pos] = pool[--poolSize];
1684        }
1685
1686        /* cleanup */
1687        tr_free( pool );
1688    }
1689
1690    tr_peerIoWriteUint32( msgs->io, out,
1691                          sizeof( uint8_t ) + field->byteCount );
1692    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1693    tr_peerIoWriteBytes ( msgs->io, out, field->bits, field->byteCount );
1694    dbgmsg( msgs, "sending bitfield... outMessage size is now %d",
1695           (int)EVBUFFER_LENGTH( out ) );
1696    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1697
1698    for( i = 0; i < lazyCount; ++i )
1699        protocolSendHave( msgs, lazyPieces[i] );
1700
1701    tr_bitfieldFree( field );
1702}
1703
1704/**
1705***
1706**/
1707
1708/* some peers give us error messages if we send
1709   more than this many peers in a single pex message
1710   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
1711#define MAX_PEX_ADDED 50
1712#define MAX_PEX_DROPPED 50
1713
1714typedef struct
1715{
1716    tr_pex *  added;
1717    tr_pex *  dropped;
1718    tr_pex *  elements;
1719    int       addedCount;
1720    int       droppedCount;
1721    int       elementCount;
1722}
1723PexDiffs;
1724
1725static void
1726pexAddedCb( void * vpex,
1727            void * userData )
1728{
1729    PexDiffs * diffs = userData;
1730    tr_pex *   pex = vpex;
1731
1732    if( diffs->addedCount < MAX_PEX_ADDED )
1733    {
1734        diffs->added[diffs->addedCount++] = *pex;
1735        diffs->elements[diffs->elementCount++] = *pex;
1736    }
1737}
1738
1739static void
1740pexDroppedCb( void * vpex,
1741              void * userData )
1742{
1743    PexDiffs * diffs = userData;
1744    tr_pex *   pex = vpex;
1745
1746    if( diffs->droppedCount < MAX_PEX_DROPPED )
1747    {
1748        diffs->dropped[diffs->droppedCount++] = *pex;
1749    }
1750}
1751
1752static void
1753pexElementCb( void * vpex,
1754              void * userData )
1755{
1756    PexDiffs * diffs = userData;
1757    tr_pex *   pex = vpex;
1758
1759    diffs->elements[diffs->elementCount++] = *pex;
1760}
1761
1762/* TODO: ipv6 pex */
1763static void
1764sendPex( tr_peermsgs * msgs )
1765{
1766    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
1767    {
1768        PexDiffs diffs;
1769        tr_pex * newPex = NULL;
1770        const int newCount = tr_peerMgrGetPeers( msgs->session->peerMgr,
1771                                                 msgs->torrent->info.hash,
1772                                                 &newPex );
1773
1774        /* build the diffs */
1775        diffs.added = tr_new( tr_pex, newCount );
1776        diffs.addedCount = 0;
1777        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1778        diffs.droppedCount = 0;
1779        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1780        diffs.elementCount = 0;
1781        tr_set_compare( msgs->pex, msgs->pexCount,
1782                        newPex, newCount,
1783                        tr_pexCompare, sizeof( tr_pex ),
1784                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
1785        dbgmsg(
1786            msgs,
1787            "pex: old peer count %d, new peer count %d, added %d, removed %d",
1788            msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1789
1790        if( diffs.addedCount || diffs.droppedCount )
1791        {
1792            int  i;
1793            tr_benc val;
1794            char * benc;
1795            int bencLen;
1796            uint8_t * tmp, *walk;
1797            struct evbuffer * out = msgs->outMessages;
1798
1799            /* update peer */
1800            tr_free( msgs->pex );
1801            msgs->pex = diffs.elements;
1802            msgs->pexCount = diffs.elementCount;
1803
1804            /* build the pex payload */
1805            tr_bencInitDict( &val, 3 );
1806
1807            /* "added" */
1808            tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1809            for( i = 0; i < diffs.addedCount; ++i ) {
1810                memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
1811                memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1812            }
1813            assert( ( walk - tmp ) == diffs.addedCount * 6 );
1814            tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
1815            tr_free( tmp );
1816
1817            /* "added.f" */
1818            tmp = walk = tr_new( uint8_t, diffs.addedCount );
1819            for( i = 0; i < diffs.addedCount; ++i )
1820                *walk++ = diffs.added[i].flags;
1821            assert( ( walk - tmp ) == diffs.addedCount );
1822            tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
1823            tr_free( tmp );
1824
1825            /* "dropped" */
1826            tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
1827            for( i = 0; i < diffs.droppedCount; ++i ) {
1828                memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
1829                memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
1830            }
1831            assert( ( walk - tmp ) == diffs.droppedCount * 6 );
1832            tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
1833            tr_free( tmp );
1834
1835            /* write the pex message */
1836            benc = tr_bencSave( &val, &bencLen );
1837            tr_peerIoWriteUint32( msgs->io, out, 2 * sizeof( uint8_t ) + bencLen );
1838            tr_peerIoWriteUint8 ( msgs->io, out, BT_LTEP );
1839            tr_peerIoWriteUint8 ( msgs->io, out, msgs->ut_pex_id );
1840            tr_peerIoWriteBytes ( msgs->io, out, benc, bencLen );
1841            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1842            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", EVBUFFER_LENGTH( out ) );
1843
1844            tr_free( benc );
1845            tr_bencFree( &val );
1846        }
1847
1848        /* cleanup */
1849        tr_free( diffs.added );
1850        tr_free( diffs.dropped );
1851        tr_free( newPex );
1852
1853        msgs->clientSentPexAt = time( NULL );
1854    }
1855}
1856
1857static int
1858pexPulse( void * vpeer )
1859{
1860    sendPex( vpeer );
1861    return TRUE;
1862}
1863
1864/**
1865***
1866**/
1867
1868tr_peermsgs*
1869tr_peerMsgsNew( struct tr_torrent * torrent,
1870                struct tr_peer *    info,
1871                tr_delivery_func    func,
1872                void *              userData,
1873                tr_publisher_tag *  setme )
1874{
1875    tr_peermsgs * m;
1876
1877    assert( info );
1878    assert( info->io );
1879
1880    m = tr_new0( tr_peermsgs, 1 );
1881    m->publisher = tr_publisherNew( );
1882    m->info = info;
1883    m->session = torrent->session;
1884    m->torrent = torrent;
1885    m->io = info->io;
1886    m->info->clientIsChoked = 1;
1887    m->info->peerIsChoked = 1;
1888    m->info->clientIsInterested = 0;
1889    m->info->peerIsInterested = 0;
1890    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
1891    m->state = AWAITING_BT_LENGTH;
1892    m->pexTimer = tr_timerNew( m->session, pexPulse, m, PEX_INTERVAL );
1893    m->outMessages = evbuffer_new( );
1894    m->outMessagesBatchedAt = 0;
1895    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1896    m->incoming.block = evbuffer_new( );
1897    m->peerAllowedPieces = NULL;
1898    m->peerAskedFor = REQUEST_LIST_INIT;
1899    m->clientAskedFor = REQUEST_LIST_INIT;
1900    m->clientWillAskFor = REQUEST_LIST_INIT;
1901    *setme = tr_publisherSubscribe( m->publisher, func, userData );
1902
1903    if( tr_peerIoSupportsLTEP( m->io ) )
1904        sendLtepHandshake( m );
1905
1906    sendBitfield( m );
1907
1908    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of
1909                                             inactivity */
1910    tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
1911    ratePulse( m );
1912
1913    return m;
1914}
1915
1916void
1917tr_peerMsgsFree( tr_peermsgs* msgs )
1918{
1919    if( msgs )
1920    {
1921        tr_timerFree( &msgs->pexTimer );
1922        tr_publisherFree( &msgs->publisher );
1923        reqListClear( &msgs->clientWillAskFor );
1924        reqListClear( &msgs->clientAskedFor );
1925
1926        tr_bitfieldFree( msgs->peerAllowedPieces );
1927        evbuffer_free( msgs->incoming.block );
1928        evbuffer_free( msgs->outMessages );
1929        tr_free( msgs->pex );
1930
1931        memset( msgs, ~0, sizeof( tr_peermsgs ) );
1932        tr_free( msgs );
1933    }
1934}
1935
1936void
1937tr_peerMsgsUnsubscribe( tr_peermsgs *    peer,
1938                        tr_publisher_tag tag )
1939{
1940    tr_publisherUnsubscribe( peer->publisher, tag );
1941}
1942
Note: See TracBrowser for help on using the repository browser.