source: trunk/libtransmission/peer-msgs.c @ 7213

Last change on this file since 7213 was 7213, checked in by charles, 12 years ago

(libT) #1542: SIGSEGV in tr_publisherPublish

  • Property svn:keywords set to Date Rev Author Id
File size: 53.8 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 7213 2008-11-30 21:36:49Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19
20#include <event.h>
21
22#include "transmission.h"
23#include "bencode.h"
24#include "completion.h"
25#include "crypto.h"
26#include "inout.h"
27#include "iobuf.h"
28#ifdef WIN32
29#include "net.h" /* for ECONN */
30#endif
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-mgr-private.h"
34#include "peer-msgs.h"
35#include "ratecontrol.h"
36#include "stats.h"
37#include "torrent.h"
38#include "trevent.h"
39#include "utils.h"
40
41/**
42***
43**/
44
45enum
46{
47    BT_CHOKE                = 0,
48    BT_UNCHOKE              = 1,
49    BT_INTERESTED           = 2,
50    BT_NOT_INTERESTED       = 3,
51    BT_HAVE                 = 4,
52    BT_BITFIELD             = 5,
53    BT_REQUEST              = 6,
54    BT_PIECE                = 7,
55    BT_CANCEL               = 8,
56    BT_PORT                 = 9,
57    BT_SUGGEST              = 13,
58    BT_HAVE_ALL             = 14,
59    BT_HAVE_NONE            = 15,
60    BT_REJECT               = 16,
61    BT_LTEP                 = 20,
62
63    LTEP_HANDSHAKE          = 0,
64
65    TR_LTEP_PEX             = 1,
66
67    MIN_CHOKE_PERIOD_SEC    = ( 10 ),
68
69    /* idle seconds before we send a keepalive */
70    KEEPALIVE_INTERVAL_SECS = 100,
71
72    PEX_INTERVAL            = ( 90 * 1000 ), /* msec between sendPex() calls */
73    PEER_PULSE_INTERVAL     = ( 250 ),       /* msec between peerPulse() calls
74                                               */
75
76    MAX_QUEUE_SIZE          = ( 100 ),
77
78    /* how long an unsent request can stay queued before it's returned
79       back to the peer-mgr's pool of requests */
80    QUEUED_REQUEST_TTL_SECS = 20,
81
82    /* how long a sent request can stay queued before it's returned
83       back to the peer-mgr's pool of requests */
84    SENT_REQUEST_TTL_SECS = 240,
85
86    /* used in lowering the outMessages queue period */
87    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
88    HIGH_PRIORITY_INTERVAL_SECS = 2,
89    LOW_PRIORITY_INTERVAL_SECS = 20,
90
91    /* number of pieces to remove from the bitfield when
92     * lazy bitfields are turned on */
93    LAZY_PIECE_COUNT = 26
94};
95
96/**
97***  REQUEST MANAGEMENT
98**/
99
100enum
101{
102    AWAITING_BT_LENGTH,
103    AWAITING_BT_ID,
104    AWAITING_BT_MESSAGE,
105    AWAITING_BT_PIECE
106};
107
108struct peer_request
109{
110    uint32_t    index;
111    uint32_t    offset;
112    uint32_t    length;
113    time_t      time_requested;
114};
115
116static int
117compareRequest( const void * va,
118                const void * vb )
119{
120    const struct peer_request * a = va;
121    const struct peer_request * b = vb;
122
123    if( a->index != b->index )
124        return a->index < b->index ? -1 : 1;
125
126    if( a->offset != b->offset )
127        return a->offset < b->offset ? -1 : 1;
128
129    if( a->length != b->length )
130        return a->length < b->length ? -1 : 1;
131
132    return 0;
133}
134
135struct request_list
136{
137    uint16_t               count;
138    uint16_t               max;
139    struct peer_request *  requests;
140};
141
142static const struct request_list REQUEST_LIST_INIT = { 0, 0, NULL };
143
144static void
145reqListReserve( struct request_list * list,
146                uint16_t              max )
147{
148    if( list->max < max )
149    {
150        list->max = max;
151        list->requests = tr_renew( struct peer_request,
152                                   list->requests,
153                                   list->max );
154    }
155}
156
157static void
158reqListClear( struct request_list * list )
159{
160    tr_free( list->requests );
161    *list = REQUEST_LIST_INIT;
162}
163
164static void
165reqListCopy( struct request_list *       dest,
166             const struct request_list * src )
167{
168    dest->count = dest->max = src->count;
169    dest->requests =
170        tr_memdup( src->requests, dest->count * sizeof( struct peer_request ) );
171}
172
173static void
174reqListRemoveOne( struct request_list * list,
175                  int                   i )
176{
177    assert( 0 <= i && i < list->count );
178
179    memmove( &list->requests[i],
180            &list->requests[i + 1],
181            sizeof( struct peer_request ) * ( --list->count - i ) );
182}
183
184static void
185reqListAppend( struct request_list *       list,
186               const struct peer_request * req )
187{
188    if( ++list->count >= list->max )
189        reqListReserve( list, list->max + 8 );
190
191    list->requests[list->count - 1] = *req;
192}
193
194static int
195reqListPop( struct request_list * list,
196            struct peer_request * setme )
197{
198    int success;
199
200    if( !list->count )
201        success = FALSE;
202    else {
203        *setme = list->requests[0];
204        reqListRemoveOne( list, 0 );
205        success = TRUE;
206    }
207
208    return success;
209}
210
211static int
212reqListFind( struct request_list *       list,
213             const struct peer_request * key )
214{
215    uint16_t i;
216
217    for( i = 0; i < list->count; ++i )
218        if( !compareRequest( key, list->requests + i ) )
219            return i;
220
221    return -1;
222}
223
224static int
225reqListRemove( struct request_list *       list,
226               const struct peer_request * key )
227{
228    int success;
229    const int i = reqListFind( list, key );
230
231    if( i < 0 )
232        success = FALSE;
233    else {
234        reqListRemoveOne( list, i );
235        success = TRUE;
236    }
237
238    return success;
239}
240
241/**
242***
243**/
244
245/* this is raw, unchanged data from the peer regarding
246 * the current message that it's sending us. */
247struct tr_incoming
248{
249    uint8_t                id;
250    uint32_t               length; /* includes the +1 for id length */
251    struct peer_request    blockReq; /* metadata for incoming blocks */
252    struct evbuffer *      block; /* piece data for incoming blocks */
253};
254
255struct tr_peermsgs
256{
257    tr_bool         peerSentBitfield;
258    tr_bool         peerSupportsPex;
259    tr_bool         clientSentLtepHandshake;
260    tr_bool         peerSentLtepHandshake;
261
262    uint8_t         state;
263    uint8_t         ut_pex_id;
264    uint16_t        pexCount;
265    uint16_t        minActiveRequests;
266    uint16_t        maxActiveRequests;
267
268    /* how long the outMessages batch should be allowed to grow before
269     * it's flushed -- some messages (like requests >:) should be sent
270     * very quickly; others aren't as urgent. */
271    int                    outMessagesBatchPeriod;
272
273    tr_peer *              info;
274
275    tr_session *           session;
276    tr_torrent *           torrent;
277    tr_peerIo *            io;
278
279    tr_publisher_t *       publisher;
280
281    struct evbuffer *      outMessages; /* all the non-piece messages */
282
283    struct request_list    peerAskedFor;
284    struct request_list    clientAskedFor;
285    struct request_list    clientWillAskFor;
286
287    tr_timer *             pexTimer;
288
289    time_t                 clientSentPexAt;
290    time_t                 clientSentAnythingAt;
291
292    /* when we started batching the outMessages */
293    time_t                outMessagesBatchedAt;
294
295    tr_bitfield *         peerAllowedPieces;
296
297    struct tr_incoming    incoming;
298
299    tr_pex *              pex;
300};
301
302/**
303***
304**/
305
306static void
307myDebug( const char *               file,
308         int                        line,
309         const struct tr_peermsgs * msgs,
310         const char *               fmt,
311         ... )
312{
313    FILE * fp = tr_getLog( );
314
315    if( fp )
316    {
317        va_list           args;
318        char              timestr[64];
319        struct evbuffer * buf = evbuffer_new( );
320        char *            base = tr_basename( file );
321
322        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
323                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
324                             msgs->torrent->info.name,
325                             tr_peerIoGetAddrStr( msgs->io ),
326                             msgs->info->client );
327        va_start( args, fmt );
328        evbuffer_add_vprintf( buf, fmt, args );
329        va_end( args );
330        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
331        fwrite( EVBUFFER_DATA( buf ), 1, EVBUFFER_LENGTH( buf ), fp );
332
333        tr_free( base );
334        evbuffer_free( buf );
335    }
336}
337
338#define dbgmsg( msgs, ... ) \
339    do { \
340        if( tr_deepLoggingIsActive( ) ) \
341            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
342    } while( 0 )
343
344/**
345***
346**/
347
348static void
349pokeBatchPeriod( tr_peermsgs * msgs,
350                 int           interval )
351{
352    if( msgs->outMessagesBatchPeriod > interval )
353    {
354        msgs->outMessagesBatchPeriod = interval;
355        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
356    }
357}
358
359static void
360protocolSendRequest( tr_peermsgs *               msgs,
361                     const struct peer_request * req )
362{
363    tr_peerIo *       io = msgs->io;
364    struct evbuffer * out = msgs->outMessages;
365
366    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
367    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
368    tr_peerIoWriteUint32( io, out, req->index );
369    tr_peerIoWriteUint32( io, out, req->offset );
370    tr_peerIoWriteUint32( io, out, req->length );
371    dbgmsg( msgs, "requesting %u:%u->%u... outMessage size is now %d",
372           req->index, req->offset, req->length, (int)EVBUFFER_LENGTH( out ) );
373    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
374}
375
376static void
377protocolSendCancel( tr_peermsgs *               msgs,
378                    const struct peer_request * req )
379{
380    tr_peerIo *       io = msgs->io;
381    struct evbuffer * out = msgs->outMessages;
382
383    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
384    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
385    tr_peerIoWriteUint32( io, out, req->index );
386    tr_peerIoWriteUint32( io, out, req->offset );
387    tr_peerIoWriteUint32( io, out, req->length );
388    dbgmsg( msgs, "cancelling %u:%u->%u... outMessage size is now %d",
389           req->index, req->offset, req->length, (int)EVBUFFER_LENGTH( out ) );
390    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
391}
392
393static void
394protocolSendHave( tr_peermsgs * msgs,
395                  uint32_t      index )
396{
397    tr_peerIo *       io = msgs->io;
398    struct evbuffer * out = msgs->outMessages;
399
400    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + sizeof( uint32_t ) );
401    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
402    tr_peerIoWriteUint32( io, out, index );
403    dbgmsg( msgs, "sending Have %u.. outMessage size is now %d",
404           index, (int)EVBUFFER_LENGTH( out ) );
405    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
406}
407
408static void
409protocolSendChoke( tr_peermsgs * msgs,
410                   int           choke )
411{
412    tr_peerIo *       io = msgs->io;
413    struct evbuffer * out = msgs->outMessages;
414
415    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
416    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
417    dbgmsg( msgs, "sending %s... outMessage size is now %d",
418           ( choke ? "Choke" : "Unchoke" ),
419           (int)EVBUFFER_LENGTH( out ) );
420    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
421}
422
423/**
424***  EVENTS
425**/
426
427static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0 };
428
429static void
430publish( tr_peermsgs *   msgs,
431         tr_peer_event * e )
432{
433    tr_publisherPublish( msgs->publisher, msgs->info, e );
434}
435
436static void
437fireError( tr_peermsgs * msgs,
438           int           err )
439{
440    tr_peer_event e = blankEvent;
441
442    e.eventType = TR_PEER_ERROR;
443    e.err = err;
444    publish( msgs, &e );
445}
446
447static void
448fireNeedReq( tr_peermsgs * msgs )
449{
450    tr_peer_event e = blankEvent;
451
452    e.eventType = TR_PEER_NEED_REQ;
453    publish( msgs, &e );
454}
455
456static void
457firePeerProgress( tr_peermsgs * msgs )
458{
459    tr_peer_event e = blankEvent;
460
461    e.eventType = TR_PEER_PEER_PROGRESS;
462    e.progress = msgs->info->progress;
463    publish( msgs, &e );
464}
465
466static void
467fireGotBlock( tr_peermsgs *               msgs,
468              const struct peer_request * req )
469{
470    tr_peer_event e = blankEvent;
471
472    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
473    e.pieceIndex = req->index;
474    e.offset = req->offset;
475    e.length = req->length;
476    publish( msgs, &e );
477}
478
479static void
480fireClientGotData( tr_peermsgs * msgs,
481                   uint32_t      length,
482                   int           wasPieceData )
483{
484    tr_peer_event e = blankEvent;
485
486    e.length = length;
487    e.eventType = TR_PEER_CLIENT_GOT_DATA;
488    e.wasPieceData = wasPieceData;
489    publish( msgs, &e );
490}
491
492static void
493firePeerGotData( tr_peermsgs  * msgs,
494                 uint32_t       length,
495                 int            wasPieceData )
496{
497    tr_peer_event e = blankEvent;
498
499    e.length = length;
500    e.eventType = TR_PEER_PEER_GOT_DATA;
501    e.wasPieceData = wasPieceData;
502
503    publish( msgs, &e );
504}
505
506static void
507fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
508{
509    tr_peer_event e = blankEvent;
510    e.eventType = TR_PEER_CANCEL;
511    e.pieceIndex = req->index;
512    e.offset = req->offset;
513    e.length = req->length;
514    publish( msgs, &e );
515}
516
517/**
518***  INTEREST
519**/
520
521static int
522isPieceInteresting( const tr_peermsgs * peer,
523                    tr_piece_index_t    piece )
524{
525    const tr_torrent * torrent = peer->torrent;
526
527    return ( !torrent->info.pieces[piece].dnd )                 /* we want it */
528           && ( !tr_cpPieceIsComplete( torrent->completion, piece ) ) /* !have
529                                                                        */
530           && ( tr_bitfieldHas( peer->info->have, piece ) );   /* peer has it */
531}
532
533/* "interested" means we'll ask for piece data if they unchoke us */
534static int
535isPeerInteresting( const tr_peermsgs * msgs )
536{
537    tr_piece_index_t    i;
538    const tr_torrent *  torrent;
539    const tr_bitfield * bitfield;
540    const int           clientIsSeed = tr_torrentIsSeed( msgs->torrent );
541
542    if( clientIsSeed )
543        return FALSE;
544
545    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
546        return FALSE;
547
548    torrent = msgs->torrent;
549    bitfield = tr_cpPieceBitfield( torrent->completion );
550
551    if( !msgs->info->have )
552        return TRUE;
553
554    assert( bitfield->byteCount == msgs->info->have->byteCount );
555
556    for( i = 0; i < torrent->info.pieceCount; ++i )
557        if( isPieceInteresting( msgs, i ) )
558            return TRUE;
559
560    return FALSE;
561}
562
563static void
564sendInterest( tr_peermsgs * msgs,
565              int           weAreInterested )
566{
567    struct evbuffer * out = msgs->outMessages;
568
569    assert( msgs );
570    assert( weAreInterested == 0 || weAreInterested == 1 );
571
572    msgs->info->clientIsInterested = weAreInterested;
573    dbgmsg( msgs, "Sending %s",
574            weAreInterested ? "Interested" : "Not Interested" );
575    tr_peerIoWriteUint32( msgs->io, out, sizeof( uint8_t ) );
576    tr_peerIoWriteUint8 (
577        msgs->io, out, weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
578    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
579    dbgmsg( msgs, "outMessage size is now %d", (int)EVBUFFER_LENGTH( out ) );
580}
581
582static void
583updateInterest( tr_peermsgs * msgs )
584{
585    const int i = isPeerInteresting( msgs );
586
587    if( i != msgs->info->clientIsInterested )
588        sendInterest( msgs, i );
589    if( i )
590        fireNeedReq( msgs );
591}
592
593static void
594cancelAllRequestsToClient( tr_peermsgs * msgs )
595{
596    reqListClear( &msgs->peerAskedFor );
597}
598
599void
600tr_peerMsgsSetChoke( tr_peermsgs * msgs,
601                     int           choke )
602{
603    const time_t now = time( NULL );
604    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
605
606    assert( msgs );
607    assert( msgs->info );
608    assert( choke == 0 || choke == 1 );
609
610    if( msgs->info->chokeChangedAt > fibrillationTime )
611    {
612        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation",
613                choke );
614    }
615    else if( msgs->info->peerIsChoked != choke )
616    {
617        msgs->info->peerIsChoked = choke;
618        if( choke )
619            cancelAllRequestsToClient( msgs );
620        protocolSendChoke( msgs, choke );
621        msgs->info->chokeChangedAt = now;
622    }
623}
624
625/**
626***
627**/
628
629void
630tr_peerMsgsHave( tr_peermsgs * msgs,
631                 uint32_t      index )
632{
633    protocolSendHave( msgs, index );
634
635    /* since we have more pieces now, we might not be interested in this peer */
636    updateInterest( msgs );
637}
638
639/**
640***
641**/
642
643static int
644reqIsValid( const tr_peermsgs * peer,
645            uint32_t            index,
646            uint32_t            offset,
647            uint32_t            length )
648{
649    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
650}
651
652static int
653requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
654{
655    return reqIsValid( msgs, req->index, req->offset, req->length );
656}
657
658static void
659expireOldRequests( tr_peermsgs * msgs, const time_t now  )
660{
661    int                 i;
662    time_t              oldestAllowed;
663    struct request_list tmp = REQUEST_LIST_INIT;
664
665    /* cancel requests that have been queued for too long */
666    oldestAllowed = now - QUEUED_REQUEST_TTL_SECS;
667    reqListCopy( &tmp, &msgs->clientWillAskFor );
668    for( i = 0; i < tmp.count; ++i )
669    {
670        const struct peer_request * req = &tmp.requests[i];
671        if( req->time_requested < oldestAllowed )
672            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
673    }
674    reqListClear( &tmp );
675
676    /* cancel requests that were sent too long ago */
677    oldestAllowed = now - SENT_REQUEST_TTL_SECS;
678    reqListCopy( &tmp, &msgs->clientAskedFor );
679    for( i = 0; i < tmp.count; ++i )
680    {
681        const struct peer_request * req = &tmp.requests[i];
682        if( req->time_requested < oldestAllowed )
683            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
684    }
685    reqListClear( &tmp );
686}
687
688static void
689pumpRequestQueue( tr_peermsgs * msgs, const time_t now )
690{
691    const int           max = msgs->maxActiveRequests;
692    const int           min = msgs->minActiveRequests;
693    int                 sent = 0;
694    int                 count = msgs->clientAskedFor.count;
695    struct peer_request req;
696
697    if( count > min )
698        return;
699    if( msgs->info->clientIsChoked )
700        return;
701    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
702        return;
703
704    while( ( count < max ) && reqListPop( &msgs->clientWillAskFor, &req ) )
705    {
706        const tr_block_index_t block =
707            _tr_block( msgs->torrent, req.index, req.offset );
708
709        assert( requestIsValid( msgs, &req ) );
710        assert( tr_bitfieldHas( msgs->info->have, req.index ) );
711
712        /* don't ask for it if we've already got it... this block may have
713         * come in from a different peer after we cancelled a request for it */
714        if( !tr_cpBlockIsComplete( msgs->torrent->completion, block ) )
715        {
716            protocolSendRequest( msgs, &req );
717            req.time_requested = now;
718            reqListAppend( &msgs->clientAskedFor, &req );
719
720            ++count;
721            ++sent;
722        }
723    }
724
725    if( sent )
726        dbgmsg( msgs,
727                "pump sent %d requests, now have %d active and %d queued",
728                sent,
729                msgs->clientAskedFor.count,
730                msgs->clientWillAskFor.count );
731
732    if( count < max )
733        fireNeedReq( msgs );
734}
735
736static int
737requestQueueIsFull( const tr_peermsgs * msgs )
738{
739    const int req_max = msgs->maxActiveRequests;
740    return msgs->clientWillAskFor.count >= req_max;
741}
742
743tr_addreq_t
744tr_peerMsgsAddRequest( tr_peermsgs *    msgs,
745                       uint32_t         index,
746                       uint32_t         offset,
747                       uint32_t         length )
748{
749    struct peer_request req;
750
751    assert( msgs );
752    assert( msgs->torrent );
753    assert( reqIsValid( msgs, index, offset, length ) );
754
755    /**
756    ***  Reasons to decline the request
757    **/
758
759    /* don't send requests to choked clients */
760    if( msgs->info->clientIsChoked )
761    {
762        dbgmsg( msgs, "declining request because they're choking us" );
763        return TR_ADDREQ_CLIENT_CHOKED;
764    }
765
766    /* peer doesn't have this piece */
767    if( !tr_bitfieldHas( msgs->info->have, index ) )
768        return TR_ADDREQ_MISSING;
769
770    /* peer's queue is full */
771    if( requestQueueIsFull( msgs ) ) {
772        dbgmsg( msgs, "declining request because we're full" );
773        return TR_ADDREQ_FULL;
774    }
775
776    /* have we already asked for this piece? */
777    req.index = index;
778    req.offset = offset;
779    req.length = length;
780    if( reqListFind( &msgs->clientAskedFor, &req ) != -1 ) {
781        dbgmsg( msgs, "declining because it's a duplicate" );
782        return TR_ADDREQ_DUPLICATE;
783    }
784    if( reqListFind( &msgs->clientWillAskFor, &req ) != -1 ) {
785        dbgmsg( msgs, "declining because it's a duplicate" );
786        return TR_ADDREQ_DUPLICATE;
787    }
788
789    /**
790    ***  Accept this request
791    **/
792
793    dbgmsg( msgs, "adding req for %"PRIu32":%"PRIu32"->%"PRIu32" to our `will request' list",
794            index, offset, length );
795    req.time_requested = time( NULL );
796    reqListAppend( &msgs->clientWillAskFor, &req );
797    return TR_ADDREQ_OK;
798}
799
800static void
801cancelAllRequestsToPeer( tr_peermsgs * msgs )
802{
803    int                 i;
804    struct request_list a = msgs->clientWillAskFor;
805    struct request_list b = msgs->clientAskedFor;
806    dbgmsg( msgs, "cancelling all requests to peer" );
807
808    msgs->clientAskedFor = REQUEST_LIST_INIT;
809    msgs->clientWillAskFor = REQUEST_LIST_INIT;
810
811    for( i=0; i<a.count; ++i )
812        fireCancelledReq( msgs, &a.requests[i] );
813
814    for( i = 0; i < b.count; ++i ) {
815        fireCancelledReq( msgs, &b.requests[i] );
816        protocolSendCancel( msgs, &b.requests[i] );
817    }
818
819    reqListClear( &a );
820    reqListClear( &b );
821}
822
823void
824tr_peerMsgsCancel( tr_peermsgs * msgs,
825                   uint32_t      pieceIndex,
826                   uint32_t      offset,
827                   uint32_t      length )
828{
829    struct peer_request req;
830
831    assert( msgs != NULL );
832    assert( length > 0 );
833
834
835    /* have we asked the peer for this piece? */
836    req.index = pieceIndex;
837    req.offset = offset;
838    req.length = length;
839
840    /* if it's only in the queue and hasn't been sent yet, free it */
841    if( reqListRemove( &msgs->clientWillAskFor, &req ) ) {
842        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32"\n", pieceIndex, offset, length );
843        fireCancelledReq( msgs, &req );
844    }
845
846    /* if it's already been sent, send a cancel message too */
847    if( reqListRemove( &msgs->clientAskedFor, &req ) ) {
848        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32"\n", pieceIndex, offset, length );
849        protocolSendCancel( msgs, &req );
850        fireCancelledReq( msgs, &req );
851    }
852}
853
854/**
855***
856**/
857
858static void
859sendLtepHandshake( tr_peermsgs * msgs )
860{
861    tr_benc           val, *m;
862    char *            buf;
863    int               len;
864    int               pex;
865    struct evbuffer * out = msgs->outMessages;
866
867    if( msgs->clientSentLtepHandshake )
868        return;
869
870    dbgmsg( msgs, "sending an ltep handshake" );
871    msgs->clientSentLtepHandshake = 1;
872
873    /* decide if we want to advertise pex support */
874    if( !tr_torrentAllowsPex( msgs->torrent ) )
875        pex = 0;
876    else if( msgs->peerSentLtepHandshake )
877        pex = msgs->peerSupportsPex ? 1 : 0;
878    else
879        pex = 1;
880
881    tr_bencInitDict( &val, 4 );
882    tr_bencDictAddInt( &val, "e",
883                       msgs->session->encryptionMode != TR_CLEAR_PREFERRED );
884    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( msgs->session ) );
885    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
886    m  = tr_bencDictAddDict( &val, "m", 1 );
887    if( pex )
888        tr_bencDictAddInt( m, "ut_pex", TR_LTEP_PEX );
889    buf = tr_bencSave( &val, &len );
890
891    tr_peerIoWriteUint32( msgs->io, out, 2 * sizeof( uint8_t ) + len );
892    tr_peerIoWriteUint8 ( msgs->io, out, BT_LTEP );
893    tr_peerIoWriteUint8 ( msgs->io, out, LTEP_HANDSHAKE );
894    tr_peerIoWriteBytes ( msgs->io, out, buf, len );
895    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
896    dbgmsg( msgs, "outMessage size is now %d", (int)EVBUFFER_LENGTH( out ) );
897
898    /* cleanup */
899    tr_bencFree( &val );
900    tr_free( buf );
901}
902
903static void
904parseLtepHandshake( tr_peermsgs *     msgs,
905                    int               len,
906                    struct evbuffer * inbuf )
907{
908    int64_t   i;
909    tr_benc   val, * sub;
910    uint8_t * tmp = tr_new( uint8_t, len );
911
912    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
913    msgs->peerSentLtepHandshake = 1;
914
915    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type != TYPE_DICT )
916    {
917        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
918        tr_free( tmp );
919        return;
920    }
921
922    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
923
924    /* does the peer prefer encrypted connections? */
925    if( tr_bencDictFindInt( &val, "e", &i ) )
926        msgs->info->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
927                                            : ENCRYPTION_PREFERENCE_NO;
928
929    /* check supported messages for utorrent pex */
930    msgs->peerSupportsPex = 0;
931    if( tr_bencDictFindDict( &val, "m", &sub ) )
932    {
933        if( tr_bencDictFindInt( sub, "ut_pex", &i ) )
934        {
935            msgs->ut_pex_id = (uint8_t) i;
936            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
937            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
938        }
939    }
940
941    /* get peer's listening port */
942    if( tr_bencDictFindInt( &val, "p", &i ) )
943    {
944        msgs->info->port = htons( (uint16_t)i );
945        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
946    }
947
948    tr_bencFree( &val );
949    tr_free( tmp );
950}
951
952static void
953parseUtPex( tr_peermsgs *     msgs,
954            int               msglen,
955            struct evbuffer * inbuf )
956{
957    int                loaded = 0;
958    uint8_t *          tmp = tr_new( uint8_t, msglen );
959    tr_benc            val;
960    const tr_torrent * tor = msgs->torrent;
961    const uint8_t *    added;
962    size_t             added_len;
963
964    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
965
966    if( tr_torrentAllowsPex( tor )
967      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) )
968      && tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
969    {
970        const uint8_t * added_f = NULL;
971        tr_pex *        pex;
972        size_t          i, n;
973        size_t          added_f_len = 0;
974        tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
975        pex =
976            tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len,
977                                    &n );
978        for( i = 0; i < n; ++i )
979            tr_peerMgrAddPex( msgs->session->peerMgr, tor->info.hash,
980                              TR_PEER_FROM_PEX, pex + i );
981        tr_free( pex );
982    }
983
984    if( loaded )
985        tr_bencFree( &val );
986    tr_free( tmp );
987}
988
989static void sendPex( tr_peermsgs * msgs );
990
991static void
992parseLtep( tr_peermsgs *     msgs,
993           int               msglen,
994           struct evbuffer * inbuf )
995{
996    uint8_t ltep_msgid;
997
998    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
999    msglen--;
1000
1001    if( ltep_msgid == LTEP_HANDSHAKE )
1002    {
1003        dbgmsg( msgs, "got ltep handshake" );
1004        parseLtepHandshake( msgs, msglen, inbuf );
1005        if( tr_peerIoSupportsLTEP( msgs->io ) )
1006        {
1007            sendLtepHandshake( msgs );
1008            sendPex( msgs );
1009        }
1010    }
1011    else if( ltep_msgid == TR_LTEP_PEX )
1012    {
1013        dbgmsg( msgs, "got ut pex" );
1014        msgs->peerSupportsPex = 1;
1015        parseUtPex( msgs, msglen, inbuf );
1016    }
1017    else
1018    {
1019        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1020        evbuffer_drain( inbuf, msglen );
1021    }
1022}
1023
1024static int
1025readBtLength( tr_peermsgs *     msgs,
1026              struct evbuffer * inbuf,
1027              size_t            inlen )
1028{
1029    uint32_t len;
1030
1031    if( inlen < sizeof( len ) )
1032        return READ_LATER;
1033
1034    tr_peerIoReadUint32( msgs->io, inbuf, &len );
1035
1036    if( len == 0 ) /* peer sent us a keepalive message */
1037        dbgmsg( msgs, "got KeepAlive" );
1038    else
1039    {
1040        msgs->incoming.length = len;
1041        msgs->state = AWAITING_BT_ID;
1042    }
1043
1044    return READ_NOW;
1045}
1046
1047static int readBtMessage( tr_peermsgs *     msgs,
1048                          struct evbuffer * inbuf,
1049                          size_t            inlen );
1050
1051static int
1052readBtId( tr_peermsgs *     msgs,
1053          struct evbuffer * inbuf,
1054          size_t            inlen )
1055{
1056    uint8_t id;
1057
1058    if( inlen < sizeof( uint8_t ) )
1059        return READ_LATER;
1060
1061    tr_peerIoReadUint8( msgs->io, inbuf, &id );
1062    msgs->incoming.id = id;
1063
1064    if( id == BT_PIECE )
1065    {
1066        msgs->state = AWAITING_BT_PIECE;
1067        return READ_NOW;
1068    }
1069    else if( msgs->incoming.length != 1 )
1070    {
1071        msgs->state = AWAITING_BT_MESSAGE;
1072        return READ_NOW;
1073    }
1074    else return readBtMessage( msgs, inbuf, inlen - 1 );
1075}
1076
1077static void
1078updatePeerProgress( tr_peermsgs * msgs )
1079{
1080    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have )
1081                           / (float)msgs->torrent->info.pieceCount;
1082    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
1083    updateInterest( msgs );
1084    firePeerProgress( msgs );
1085}
1086
1087static void
1088peerMadeRequest( tr_peermsgs *               msgs,
1089                 const struct peer_request * req )
1090{
1091    const int reqIsValid = requestIsValid( msgs, req );
1092    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete(
1093        msgs->torrent->completion, req->index );
1094    const int peerIsChoked = msgs->info->peerIsChoked;
1095
1096    if( !reqIsValid ) /* bad request */
1097    {
1098        dbgmsg( msgs, "rejecting an invalid request." );
1099    }
1100    else if( !clientHasPiece ) /* we don't have it */
1101    {
1102        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1103    }
1104    else if( peerIsChoked ) /* doesn't he know he's choked? */
1105    {
1106        tr_peerMsgsSetChoke( msgs, 1 );
1107    }
1108    else /* YAY */
1109    {
1110        reqListAppend( &msgs->peerAskedFor, req );
1111    }
1112}
1113
1114static int
1115messageLengthIsCorrect( const tr_peermsgs * msg,
1116                        uint8_t             id,
1117                        uint32_t            len )
1118{
1119    switch( id )
1120    {
1121        case BT_CHOKE:
1122        case BT_UNCHOKE:
1123        case BT_INTERESTED:
1124        case BT_NOT_INTERESTED:
1125        case BT_HAVE_ALL:
1126        case BT_HAVE_NONE:
1127            return len == 1;
1128
1129        case BT_HAVE:
1130        case BT_SUGGEST:
1131
1132        case BT_BITFIELD:
1133            return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1134
1135        case BT_REQUEST:
1136        case BT_CANCEL:
1137        case BT_REJECT:
1138            return len == 13;
1139
1140        case BT_PIECE:
1141            return len > 9 && len <= 16393;
1142
1143        case BT_PORT:
1144            return len == 3;
1145
1146        case BT_LTEP:
1147            return len >= 2;
1148
1149        default:
1150            return FALSE;
1151    }
1152}
1153
1154static int clientGotBlock( tr_peermsgs *               msgs,
1155                           const uint8_t *             block,
1156                           const struct peer_request * req );
1157
1158static int
1159readBtPiece( tr_peermsgs      * msgs,
1160             struct evbuffer  * inbuf,
1161             size_t             inlen,
1162             size_t           * setme_piece_bytes_read )
1163{
1164    struct peer_request * req = &msgs->incoming.blockReq;
1165
1166    assert( EVBUFFER_LENGTH( inbuf ) >= inlen );
1167    dbgmsg( msgs, "In readBtPiece" );
1168
1169    if( !req->length )
1170    {
1171        if( inlen < 8 )
1172            return READ_LATER;
1173
1174        tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
1175        tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
1176        req->length = msgs->incoming.length - 9;
1177        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index,
1178                req->offset,
1179                req->length );
1180        return READ_NOW;
1181    }
1182    else
1183    {
1184        int          err;
1185
1186        /* read in another chunk of data */
1187        const size_t nLeft = req->length - EVBUFFER_LENGTH(
1188            msgs->incoming.block );
1189        size_t       n = MIN( nLeft, inlen );
1190        uint8_t *    buf = tr_new( uint8_t, n );
1191        assert( EVBUFFER_LENGTH( inbuf ) >= n );
1192        tr_peerIoReadBytes( msgs->io, inbuf, buf, n );
1193        evbuffer_add( msgs->incoming.block, buf, n );
1194        fireClientGotData( msgs, n, TRUE );
1195        *setme_piece_bytes_read += n;
1196        tr_free( buf );
1197        dbgmsg( msgs, "got %d bytes for block %u:%u->%u ... %d remain",
1198               (int)n, req->index, req->offset, req->length,
1199               (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) );
1200        if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length )
1201            return READ_LATER;
1202
1203        /* we've got the whole block ... process it */
1204        err = clientGotBlock( msgs, EVBUFFER_DATA(
1205                                  msgs->incoming.block ), req );
1206
1207        /* cleanup */
1208        evbuffer_drain( msgs->incoming.block,
1209                       EVBUFFER_LENGTH( msgs->incoming.block ) );
1210        req->length = 0;
1211        msgs->state = AWAITING_BT_LENGTH;
1212        if( !err )
1213            return READ_NOW;
1214        else
1215        {
1216            fireError( msgs, err );
1217            return READ_ERR;
1218        }
1219    }
1220}
1221
1222static int
1223readBtMessage( tr_peermsgs *     msgs,
1224               struct evbuffer * inbuf,
1225               size_t            inlen )
1226{
1227    uint32_t      ui32;
1228    uint32_t      msglen = msgs->incoming.length;
1229    const uint8_t id = msgs->incoming.id;
1230    const size_t  startBufLen = EVBUFFER_LENGTH( inbuf );
1231
1232    --msglen; /* id length */
1233
1234    if( inlen < msglen )
1235        return READ_LATER;
1236
1237    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %d", (int)id,
1238            (int)msglen,
1239            (int)inlen );
1240
1241    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1242    {
1243        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d",
1244                (int)id, (int)msglen );
1245        fireError( msgs, EMSGSIZE );
1246        return READ_ERR;
1247    }
1248
1249    switch( id )
1250    {
1251        case BT_CHOKE:
1252            dbgmsg( msgs, "got Choke" );
1253            msgs->info->clientIsChoked = 1;
1254            cancelAllRequestsToPeer( msgs );
1255            cancelAllRequestsToClient( msgs );
1256            break;
1257
1258        case BT_UNCHOKE:
1259            dbgmsg( msgs, "got Unchoke" );
1260            msgs->info->clientIsChoked = 0;
1261            fireNeedReq( msgs );
1262            break;
1263
1264        case BT_INTERESTED:
1265            dbgmsg( msgs, "got Interested" );
1266            msgs->info->peerIsInterested = 1;
1267            break;
1268
1269        case BT_NOT_INTERESTED:
1270            dbgmsg( msgs, "got Not Interested" );
1271            msgs->info->peerIsInterested = 0;
1272            break;
1273
1274        case BT_HAVE:
1275            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1276            dbgmsg( msgs, "got Have: %u", ui32 );
1277            if( tr_bitfieldAdd( msgs->info->have, ui32 ) )
1278            {
1279                fireError( msgs, ERANGE );
1280                return READ_ERR;
1281            }
1282            updatePeerProgress( msgs );
1283            tr_rcTransferred( msgs->torrent->swarmSpeed,
1284                              msgs->torrent->info.pieceSize );
1285            break;
1286
1287        case BT_BITFIELD:
1288        {
1289            dbgmsg( msgs, "got a bitfield" );
1290            msgs->peerSentBitfield = 1;
1291            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits,
1292                                msglen );
1293            updatePeerProgress( msgs );
1294            fireNeedReq( msgs );
1295            break;
1296        }
1297
1298        case BT_REQUEST:
1299        {
1300            struct peer_request r;
1301            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1302            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1303            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1304            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset,
1305                    r.length );
1306            peerMadeRequest( msgs, &r );
1307            break;
1308        }
1309
1310        case BT_CANCEL:
1311        {
1312            struct peer_request r;
1313            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1314            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1315            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1316            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset,
1317                    r.length );
1318            reqListRemove( &msgs->peerAskedFor, &r );
1319            break;
1320        }
1321
1322        case BT_PIECE:
1323            assert( 0 ); /* handled elsewhere! */
1324            break;
1325
1326        case BT_PORT:
1327            dbgmsg( msgs, "Got a BT_PORT" );
1328            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1329            break;
1330
1331        case BT_SUGGEST:
1332        {
1333            dbgmsg( msgs, "Got a BT_SUGGEST" );
1334            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1335            /* we don't do anything with this yet */
1336            break;
1337        }
1338
1339        case BT_HAVE_ALL:
1340            dbgmsg( msgs, "Got a BT_HAVE_ALL" );
1341            tr_bitfieldAddRange( msgs->info->have, 0,
1342                                 msgs->torrent->info.pieceCount );
1343            updatePeerProgress( msgs );
1344            break;
1345
1346
1347        case BT_HAVE_NONE:
1348            dbgmsg( msgs, "Got a BT_HAVE_NONE" );
1349            tr_bitfieldClear( msgs->info->have );
1350            updatePeerProgress( msgs );
1351            break;
1352
1353        case BT_REJECT:
1354        {
1355            struct peer_request r;
1356            dbgmsg( msgs, "Got a BT_REJECT" );
1357            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1358            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1359            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1360            reqListRemove( &msgs->clientAskedFor, &r );
1361            break;
1362        }
1363
1364        case BT_LTEP:
1365            dbgmsg( msgs, "Got a BT_LTEP" );
1366            parseLtep( msgs, msglen, inbuf );
1367            break;
1368
1369        default:
1370            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1371            tr_peerIoDrain( msgs->io, inbuf, msglen );
1372            break;
1373    }
1374
1375    assert( msglen + 1 == msgs->incoming.length );
1376    assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen );
1377
1378    msgs->state = AWAITING_BT_LENGTH;
1379    return READ_NOW;
1380}
1381
1382static void
1383decrementDownloadedCount( tr_peermsgs * msgs,
1384                          uint32_t      byteCount )
1385{
1386    tr_torrent * tor = msgs->torrent;
1387
1388    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1389}
1390
1391static void
1392clientGotUnwantedBlock( tr_peermsgs *               msgs,
1393                        const struct peer_request * req )
1394{
1395    decrementDownloadedCount( msgs, req->length );
1396}
1397
1398static void
1399addPeerToBlamefield( tr_peermsgs * msgs,
1400                     uint32_t      index )
1401{
1402    if( !msgs->info->blame )
1403        msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1404    tr_bitfieldAdd( msgs->info->blame, index );
1405}
1406
1407/* returns 0 on success, or an errno on failure */
1408static int
1409clientGotBlock( tr_peermsgs *               msgs,
1410                const uint8_t *             data,
1411                const struct peer_request * req )
1412{
1413    int                    err;
1414    tr_torrent *           tor = msgs->torrent;
1415    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1416
1417    assert( msgs );
1418    assert( req );
1419
1420    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) )
1421    {
1422        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1423                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1424        return EMSGSIZE;
1425    }
1426
1427    /* save the block */
1428    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset,
1429            req->length );
1430
1431    /**
1432    *** Remove the block from our `we asked for this' list
1433    **/
1434
1435    if( !reqListRemove( &msgs->clientAskedFor, req ) )
1436    {
1437        clientGotUnwantedBlock( msgs, req );
1438        dbgmsg( msgs, "we didn't ask for this message..." );
1439        return 0;
1440    }
1441
1442    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1443            msgs->clientAskedFor.count );
1444
1445    /**
1446    *** Error checks
1447    **/
1448
1449    if( tr_cpBlockIsComplete( tor->completion, block ) )
1450    {
1451        dbgmsg( msgs, "we have this block already..." );
1452        clientGotUnwantedBlock( msgs, req );
1453        return 0;
1454    }
1455
1456    /**
1457    ***  Save the block
1458    **/
1459
1460    msgs->info->peerSentPieceDataAt = time( NULL );
1461    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1462        return err;
1463
1464    addPeerToBlamefield( msgs, req->index );
1465    fireGotBlock( msgs, req );
1466    return 0;
1467}
1468
1469static int peerPulse( void * vmsgs );
1470
1471static void
1472didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1473{
1474    tr_peermsgs * msgs = vmsgs;
1475    firePeerGotData( msgs, bytesWritten, wasPieceData );
1476    peerPulse( msgs );
1477}
1478
1479static ReadState
1480canRead( struct tr_iobuf * iobuf, void * vmsgs, size_t * piece )
1481{
1482    ReadState         ret;
1483    tr_peermsgs *     msgs = vmsgs;
1484    struct evbuffer * in = tr_iobuf_input( iobuf );
1485    const size_t      inlen = EVBUFFER_LENGTH( in );
1486
1487    if( !inlen )
1488    {
1489        ret = READ_LATER;
1490    }
1491    else if( msgs->state == AWAITING_BT_PIECE )
1492    {
1493        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1494    }
1495    else switch( msgs->state )
1496    {
1497        case AWAITING_BT_LENGTH:
1498            ret = readBtLength ( msgs, in, inlen ); break;
1499
1500        case AWAITING_BT_ID:
1501            ret = readBtId     ( msgs, in, inlen ); break;
1502
1503        case AWAITING_BT_MESSAGE:
1504            ret = readBtMessage( msgs, in, inlen ); break;
1505
1506        default:
1507            assert( 0 );
1508    }
1509
1510    /* log the raw data that was read */
1511    if( ( ret != READ_ERR ) && ( EVBUFFER_LENGTH( in ) != inlen ) )
1512        fireClientGotData( msgs, inlen - EVBUFFER_LENGTH( in ), FALSE );
1513
1514    return ret;
1515}
1516
1517/**
1518***
1519**/
1520
1521static int
1522ratePulse( void * vpeer )
1523{
1524    tr_peermsgs * peer = vpeer;
1525    const double rateToClient = tr_peerGetPieceSpeed( peer->info,
1526                                                      TR_PEER_TO_CLIENT );
1527    const int estimatedBlocksInNext30Seconds =
1528                  ( rateToClient * 30 * 1024 ) / peer->torrent->blockSize;
1529
1530    peer->minActiveRequests = 8;
1531    peer->maxActiveRequests = peer->minActiveRequests + estimatedBlocksInNext30Seconds;
1532    return TRUE;
1533}
1534
1535static int
1536popNextRequest( tr_peermsgs *         msgs,
1537                struct peer_request * setme )
1538{
1539    return reqListPop( &msgs->peerAskedFor, setme );
1540}
1541
1542static size_t
1543fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1544{
1545    size_t bytesWritten = 0;
1546    struct peer_request req;
1547    const int haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1548
1549    /**
1550    ***  Protocol messages
1551    **/
1552
1553    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1554    {
1555        dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
1556        msgs->outMessagesBatchedAt = now;
1557    }
1558    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1559    {
1560        const size_t len = EVBUFFER_LENGTH( msgs->outMessages );
1561        /* flush the protocol messages */
1562        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->io, len );
1563        tr_peerIoWriteBuf( msgs->io, msgs->outMessages, FALSE );
1564        msgs->clientSentAnythingAt = now;
1565        msgs->outMessagesBatchedAt = 0;
1566        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1567        bytesWritten +=  len;
1568    }
1569
1570    /**
1571    ***  Blocks
1572    **/
1573
1574    if( ( tr_peerIoGetWriteBufferSpace( msgs->io ) >= msgs->torrent->blockSize )
1575        && popNextRequest( msgs, &req )
1576        && requestIsValid( msgs, &req )
1577        && tr_cpPieceIsComplete( msgs->torrent->completion, req.index ) )
1578    {
1579        /* send a block */
1580        uint8_t * buf = tr_new( uint8_t, req.length );
1581        const int err = tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf );
1582        if( err ) {
1583            fireError( msgs, err );
1584            bytesWritten = 0;
1585            msgs = NULL;
1586        } else {
1587            tr_peerIo * io = msgs->io;
1588            struct evbuffer * out = evbuffer_new( );
1589            dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1590            tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1591            tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1592            tr_peerIoWriteUint32( io, out, req.index );
1593            tr_peerIoWriteUint32( io, out, req.offset );
1594            tr_peerIoWriteBytes ( io, out, buf, req.length );
1595            tr_peerIoWriteBuf( io, out, TRUE );
1596            bytesWritten += EVBUFFER_LENGTH( out );
1597            evbuffer_free( out );
1598            msgs->clientSentAnythingAt = now;
1599        }
1600        tr_free( buf );
1601    }
1602
1603    /**
1604    ***  Keepalive
1605    **/
1606
1607    if( ( msgs != NULL )
1608        && ( msgs->clientSentAnythingAt != 0 )
1609        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1610    {
1611        dbgmsg( msgs, "sending a keepalive message" );
1612        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1613        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1614    }
1615
1616    return bytesWritten;
1617}
1618
1619static int
1620peerPulse( void * vmsgs )
1621{
1622    tr_peermsgs * msgs = vmsgs;
1623    const time_t  now = time( NULL );
1624
1625    ratePulse( msgs );
1626
1627    pumpRequestQueue( msgs, now );
1628    expireOldRequests( msgs, now );
1629
1630    for( ;; )
1631        if( fillOutputBuffer( msgs, now ) < 1 )
1632            break;
1633
1634    return TRUE; /* loop forever */
1635}
1636
1637void
1638tr_peerMsgsPulse( tr_peermsgs * msgs )
1639{
1640    if( msgs != NULL )
1641        peerPulse( msgs );
1642}
1643
1644static void
1645gotError( struct tr_iobuf  * iobuf UNUSED,
1646          short              what,
1647          void             * vmsgs )
1648{
1649    if( what & EVBUFFER_TIMEOUT )
1650        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1651    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1652        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1653               what, errno, tr_strerror( errno ) );
1654    fireError( vmsgs, ENOTCONN );
1655}
1656
1657static void
1658sendBitfield( tr_peermsgs * msgs )
1659{
1660    struct evbuffer * out = msgs->outMessages;
1661    tr_bitfield *     field;
1662    tr_piece_index_t  lazyPieces[LAZY_PIECE_COUNT];
1663    size_t            i;
1664    size_t            lazyCount = 0;
1665
1666    field = tr_bitfieldDup( tr_cpPieceBitfield( msgs->torrent->completion ) );
1667
1668    if( tr_sessionIsLazyBitfieldEnabled( msgs->session ) )
1669    {
1670        /** Lazy bitfields aren't a high priority or secure, so I'm opting for
1671            speed over a truly random sample -- let's limit the pool size to
1672            the first 1000 pieces so large torrents don't bog things down */
1673        size_t poolSize;
1674        const size_t maxPoolSize = MIN( msgs->torrent->info.pieceCount, 1000 );
1675        tr_piece_index_t * pool = tr_new( tr_piece_index_t, maxPoolSize );
1676
1677        /* build the pool */
1678        for( i=poolSize=0; i<maxPoolSize; ++i )
1679            if( tr_bitfieldHas( field, i ) )
1680                pool[poolSize++] = i;
1681
1682        /* pull random piece indices from the pool */
1683        while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) )
1684        {
1685            const int pos = tr_cryptoWeakRandInt( poolSize );
1686            const tr_piece_index_t piece = pool[pos];
1687            tr_bitfieldRem( field, piece );
1688            lazyPieces[lazyCount++] = piece;
1689            pool[pos] = pool[--poolSize];
1690        }
1691
1692        /* cleanup */
1693        tr_free( pool );
1694    }
1695
1696    tr_peerIoWriteUint32( msgs->io, out,
1697                          sizeof( uint8_t ) + field->byteCount );
1698    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1699    tr_peerIoWriteBytes ( msgs->io, out, field->bits, field->byteCount );
1700    dbgmsg( msgs, "sending bitfield... outMessage size is now %d",
1701           (int)EVBUFFER_LENGTH( out ) );
1702    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1703
1704    for( i = 0; i < lazyCount; ++i )
1705        protocolSendHave( msgs, lazyPieces[i] );
1706
1707    tr_bitfieldFree( field );
1708}
1709
1710/**
1711***
1712**/
1713
1714/* some peers give us error messages if we send
1715   more than this many peers in a single pex message
1716   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
1717#define MAX_PEX_ADDED 50
1718#define MAX_PEX_DROPPED 50
1719
1720typedef struct
1721{
1722    tr_pex *  added;
1723    tr_pex *  dropped;
1724    tr_pex *  elements;
1725    int       addedCount;
1726    int       droppedCount;
1727    int       elementCount;
1728}
1729PexDiffs;
1730
1731static void
1732pexAddedCb( void * vpex,
1733            void * userData )
1734{
1735    PexDiffs * diffs = userData;
1736    tr_pex *   pex = vpex;
1737
1738    if( diffs->addedCount < MAX_PEX_ADDED )
1739    {
1740        diffs->added[diffs->addedCount++] = *pex;
1741        diffs->elements[diffs->elementCount++] = *pex;
1742    }
1743}
1744
1745static void
1746pexDroppedCb( void * vpex,
1747              void * userData )
1748{
1749    PexDiffs * diffs = userData;
1750    tr_pex *   pex = vpex;
1751
1752    if( diffs->droppedCount < MAX_PEX_DROPPED )
1753    {
1754        diffs->dropped[diffs->droppedCount++] = *pex;
1755    }
1756}
1757
1758static void
1759pexElementCb( void * vpex,
1760              void * userData )
1761{
1762    PexDiffs * diffs = userData;
1763    tr_pex *   pex = vpex;
1764
1765    diffs->elements[diffs->elementCount++] = *pex;
1766}
1767
1768/* TODO: ipv6 pex */
1769static void
1770sendPex( tr_peermsgs * msgs )
1771{
1772    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
1773    {
1774        PexDiffs diffs;
1775        tr_pex * newPex = NULL;
1776        const int newCount = tr_peerMgrGetPeers( msgs->session->peerMgr,
1777                                                 msgs->torrent->info.hash,
1778                                                 &newPex );
1779
1780        /* build the diffs */
1781        diffs.added = tr_new( tr_pex, newCount );
1782        diffs.addedCount = 0;
1783        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1784        diffs.droppedCount = 0;
1785        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1786        diffs.elementCount = 0;
1787        tr_set_compare( msgs->pex, msgs->pexCount,
1788                        newPex, newCount,
1789                        tr_pexCompare, sizeof( tr_pex ),
1790                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
1791        dbgmsg(
1792            msgs,
1793            "pex: old peer count %d, new peer count %d, added %d, removed %d",
1794            msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
1795
1796        if( diffs.addedCount || diffs.droppedCount )
1797        {
1798            int  i;
1799            tr_benc val;
1800            char * benc;
1801            int bencLen;
1802            uint8_t * tmp, *walk;
1803            struct evbuffer * out = msgs->outMessages;
1804
1805            /* update peer */
1806            tr_free( msgs->pex );
1807            msgs->pex = diffs.elements;
1808            msgs->pexCount = diffs.elementCount;
1809
1810            /* build the pex payload */
1811            tr_bencInitDict( &val, 3 );
1812
1813            /* "added" */
1814            tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1815            for( i = 0; i < diffs.addedCount; ++i ) {
1816                memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
1817                memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1818            }
1819            assert( ( walk - tmp ) == diffs.addedCount * 6 );
1820            tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
1821            tr_free( tmp );
1822
1823            /* "added.f" */
1824            tmp = walk = tr_new( uint8_t, diffs.addedCount );
1825            for( i = 0; i < diffs.addedCount; ++i )
1826                *walk++ = diffs.added[i].flags;
1827            assert( ( walk - tmp ) == diffs.addedCount );
1828            tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
1829            tr_free( tmp );
1830
1831            /* "dropped" */
1832            tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
1833            for( i = 0; i < diffs.droppedCount; ++i ) {
1834                memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
1835                memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
1836            }
1837            assert( ( walk - tmp ) == diffs.droppedCount * 6 );
1838            tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
1839            tr_free( tmp );
1840
1841            /* write the pex message */
1842            benc = tr_bencSave( &val, &bencLen );
1843            tr_peerIoWriteUint32( msgs->io, out, 2 * sizeof( uint8_t ) + bencLen );
1844            tr_peerIoWriteUint8 ( msgs->io, out, BT_LTEP );
1845            tr_peerIoWriteUint8 ( msgs->io, out, msgs->ut_pex_id );
1846            tr_peerIoWriteBytes ( msgs->io, out, benc, bencLen );
1847            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1848            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", EVBUFFER_LENGTH( out ) );
1849
1850            tr_free( benc );
1851            tr_bencFree( &val );
1852        }
1853
1854        /* cleanup */
1855        tr_free( diffs.added );
1856        tr_free( diffs.dropped );
1857        tr_free( newPex );
1858
1859        msgs->clientSentPexAt = time( NULL );
1860    }
1861}
1862
1863static int
1864pexPulse( void * vpeer )
1865{
1866    sendPex( vpeer );
1867    return TRUE;
1868}
1869
1870/**
1871***
1872**/
1873
1874tr_peermsgs*
1875tr_peerMsgsNew( struct tr_torrent * torrent,
1876                struct tr_peer *    info,
1877                tr_delivery_func    func,
1878                void *              userData,
1879                tr_publisher_tag *  setme )
1880{
1881    tr_peermsgs * m;
1882
1883    assert( info );
1884    assert( info->io );
1885
1886    m = tr_new0( tr_peermsgs, 1 );
1887    m->publisher = tr_publisherNew( );
1888    m->info = info;
1889    m->session = torrent->session;
1890    m->torrent = torrent;
1891    m->io = info->io;
1892    m->info->clientIsChoked = 1;
1893    m->info->peerIsChoked = 1;
1894    m->info->clientIsInterested = 0;
1895    m->info->peerIsInterested = 0;
1896    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
1897    m->state = AWAITING_BT_LENGTH;
1898    m->pexTimer = tr_timerNew( m->session, pexPulse, m, PEX_INTERVAL );
1899    m->outMessages = evbuffer_new( );
1900    m->outMessagesBatchedAt = 0;
1901    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1902    m->incoming.block = evbuffer_new( );
1903    m->peerAllowedPieces = NULL;
1904    m->peerAskedFor = REQUEST_LIST_INIT;
1905    m->clientAskedFor = REQUEST_LIST_INIT;
1906    m->clientWillAskFor = REQUEST_LIST_INIT;
1907    *setme = tr_publisherSubscribe( m->publisher, func, userData );
1908
1909    if( tr_peerIoSupportsLTEP( m->io ) )
1910        sendLtepHandshake( m );
1911
1912    sendBitfield( m );
1913
1914    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of
1915                                             inactivity */
1916    tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
1917    ratePulse( m );
1918
1919    return m;
1920}
1921
1922void
1923tr_peerMsgsFree( tr_peermsgs* msgs )
1924{
1925    if( msgs )
1926    {
1927        tr_timerFree( &msgs->pexTimer );
1928        tr_publisherFree( &msgs->publisher );
1929        reqListClear( &msgs->clientWillAskFor );
1930        reqListClear( &msgs->clientAskedFor );
1931
1932        tr_bitfieldFree( msgs->peerAllowedPieces );
1933        evbuffer_free( msgs->incoming.block );
1934        evbuffer_free( msgs->outMessages );
1935        tr_free( msgs->pex );
1936
1937        memset( msgs, ~0, sizeof( tr_peermsgs ) );
1938        tr_free( msgs );
1939    }
1940}
1941
1942void
1943tr_peerMsgsUnsubscribe( tr_peermsgs *    peer,
1944                        tr_publisher_tag tag )
1945{
1946    tr_publisherUnsubscribe( peer->publisher, tag );
1947}
1948
Note: See TracBrowser for help on using the repository browser.