source: trunk/libtransmission/peer-msgs.c @ 7257

Last change on this file since 7257 was 7257, checked in by charles, 12 years ago

(libt) #1554: EPROTO (in peer-msgs.c) not defined on Windows

  • Property svn:keywords set to Date Rev Author Id
File size: 61.8 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@rebelbase.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 7257 2008-12-03 13:21:41Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19
20#include <event.h>
21
22#include "transmission.h"
23#include "bencode.h"
24#include "completion.h"
25#include "crypto.h"
26#include "inout.h"
27#include "iobuf.h"
28#ifdef WIN32
29#include "net.h" /* for ECONN */
30#endif
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-mgr-private.h"
34#include "peer-msgs.h"
35#include "ratecontrol.h"
36#include "stats.h"
37#include "torrent.h"
38#include "trevent.h"
39#include "utils.h"
40
41/**
42***
43**/
44
45enum
46{
47    BT_CHOKE                = 0,
48    BT_UNCHOKE              = 1,
49    BT_INTERESTED           = 2,
50    BT_NOT_INTERESTED       = 3,
51    BT_HAVE                 = 4,
52    BT_BITFIELD             = 5,
53    BT_REQUEST              = 6,
54    BT_PIECE                = 7,
55    BT_CANCEL               = 8,
56    BT_PORT                 = 9,
57
58    BT_FEXT_SUGGEST         = 13,
59    BT_FEXT_HAVE_ALL        = 14,
60    BT_FEXT_HAVE_NONE       = 15,
61    BT_FEXT_REJECT          = 16,
62    BT_FEXT_ALLOWED_FAST    = 17,
63
64    BT_LTEP                 = 20,
65
66    LTEP_HANDSHAKE          = 0,
67
68    TR_LTEP_PEX             = 1,
69
70
71
72    MIN_CHOKE_PERIOD_SEC    = ( 10 ),
73
74    /* idle seconds before we send a keepalive */
75    KEEPALIVE_INTERVAL_SECS = 100,
76
77    PEX_INTERVAL            = ( 90 * 1000 ), /* msec between sendPex() calls */
78    PEER_PULSE_INTERVAL     = ( 250 ),       /* msec between peerPulse() calls
79                                               */
80
81    MAX_QUEUE_SIZE          = ( 100 ),
82
83    /* how long an unsent request can stay queued before it's returned
84       back to the peer-mgr's pool of requests */
85    QUEUED_REQUEST_TTL_SECS = 20,
86
87    /* how long a sent request can stay queued before it's returned
88       back to the peer-mgr's pool of requests */
89    SENT_REQUEST_TTL_SECS = 240,
90
91    /* used in lowering the outMessages queue period */
92    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
93    HIGH_PRIORITY_INTERVAL_SECS = 2,
94    LOW_PRIORITY_INTERVAL_SECS = 20,
95
96    /* number of pieces to remove from the bitfield when
97     * lazy bitfields are turned on */
98    LAZY_PIECE_COUNT = 26,
99
100    /* number of pieces we'll allow in our fast set */
101    MAX_FAST_SET_SIZE = 3
102};
103
104/**
105***  REQUEST MANAGEMENT
106**/
107
108enum
109{
110    AWAITING_BT_LENGTH,
111    AWAITING_BT_ID,
112    AWAITING_BT_MESSAGE,
113    AWAITING_BT_PIECE
114};
115
116struct peer_request
117{
118    uint32_t    index;
119    uint32_t    offset;
120    uint32_t    length;
121    time_t      time_requested;
122};
123
124static int
125compareRequest( const void * va,
126                const void * vb )
127{
128    const struct peer_request * a = va;
129    const struct peer_request * b = vb;
130
131    if( a->index != b->index )
132        return a->index < b->index ? -1 : 1;
133
134    if( a->offset != b->offset )
135        return a->offset < b->offset ? -1 : 1;
136
137    if( a->length != b->length )
138        return a->length < b->length ? -1 : 1;
139
140    return 0;
141}
142
143struct request_list
144{
145    uint16_t               count;
146    uint16_t               max;
147    struct peer_request *  requests;
148};
149
150static const struct request_list REQUEST_LIST_INIT = { 0, 0, NULL };
151
152static void
153reqListReserve( struct request_list * list,
154                uint16_t              max )
155{
156    if( list->max < max )
157    {
158        list->max = max;
159        list->requests = tr_renew( struct peer_request,
160                                   list->requests,
161                                   list->max );
162    }
163}
164
165static void
166reqListClear( struct request_list * list )
167{
168    tr_free( list->requests );
169    *list = REQUEST_LIST_INIT;
170}
171
172static void
173reqListCopy( struct request_list *       dest,
174             const struct request_list * src )
175{
176    dest->count = dest->max = src->count;
177    dest->requests =
178        tr_memdup( src->requests, dest->count * sizeof( struct peer_request ) );
179}
180
181static void
182reqListRemoveOne( struct request_list * list,
183                  int                   i )
184{
185    assert( 0 <= i && i < list->count );
186
187    memmove( &list->requests[i],
188            &list->requests[i + 1],
189            sizeof( struct peer_request ) * ( --list->count - i ) );
190}
191
192static void
193reqListAppend( struct request_list *       list,
194               const struct peer_request * req )
195{
196    if( ++list->count >= list->max )
197        reqListReserve( list, list->max + 8 );
198
199    list->requests[list->count - 1] = *req;
200}
201
202static int
203reqListPop( struct request_list * list,
204            struct peer_request * setme )
205{
206    int success;
207
208    if( !list->count )
209        success = FALSE;
210    else {
211        *setme = list->requests[0];
212        reqListRemoveOne( list, 0 );
213        success = TRUE;
214    }
215
216    return success;
217}
218
219static int
220reqListFind( struct request_list *       list,
221             const struct peer_request * key )
222{
223    uint16_t i;
224
225    for( i = 0; i < list->count; ++i )
226        if( !compareRequest( key, list->requests + i ) )
227            return i;
228
229    return -1;
230}
231
232static int
233reqListRemove( struct request_list *       list,
234               const struct peer_request * key )
235{
236    int success;
237    const int i = reqListFind( list, key );
238
239    if( i < 0 )
240        success = FALSE;
241    else {
242        reqListRemoveOne( list, i );
243        success = TRUE;
244    }
245
246    return success;
247}
248
249/**
250***
251**/
252
253/* this is raw, unchanged data from the peer regarding
254 * the current message that it's sending us. */
255struct tr_incoming
256{
257    uint8_t                id;
258    uint32_t               length; /* includes the +1 for id length */
259    struct peer_request    blockReq; /* metadata for incoming blocks */
260    struct evbuffer *      block; /* piece data for incoming blocks */
261};
262
263struct tr_peermsgs
264{
265    tr_bool         peerSentBitfield;
266    tr_bool         peerSupportsPex;
267    tr_bool         clientSentLtepHandshake;
268    tr_bool         peerSentLtepHandshake;
269    tr_bool         haveFastSet;
270
271    uint8_t         state;
272    uint8_t         ut_pex_id;
273    uint16_t        pexCount;
274    uint16_t        minActiveRequests;
275    uint16_t        maxActiveRequests;
276
277    size_t                 fastsetSize;
278    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
279
280    /* how long the outMessages batch should be allowed to grow before
281     * it's flushed -- some messages (like requests >:) should be sent
282     * very quickly; others aren't as urgent. */
283    int                    outMessagesBatchPeriod;
284
285    tr_peer *              info;
286
287    tr_session *           session;
288    tr_torrent *           torrent;
289    tr_peerIo *            io;
290
291    tr_publisher_t *       publisher;
292
293    struct evbuffer *      outMessages; /* all the non-piece messages */
294
295    struct request_list    peerAskedFor;
296    struct request_list    clientAskedFor;
297    struct request_list    clientWillAskFor;
298
299    tr_timer *             pexTimer;
300
301    time_t                 clientSentPexAt;
302    time_t                 clientSentAnythingAt;
303
304    /* when we started batching the outMessages */
305    time_t                outMessagesBatchedAt;
306
307    tr_bitfield *         peerAllowedPieces;
308
309    struct tr_incoming    incoming;
310
311    tr_pex *              pex;
312};
313
314/**
315***
316**/
317
318static void
319myDebug( const char *               file,
320         int                        line,
321         const struct tr_peermsgs * msgs,
322         const char *               fmt,
323         ... )
324{
325    FILE * fp = tr_getLog( );
326
327    if( fp )
328    {
329        va_list           args;
330        char              timestr[64];
331        struct evbuffer * buf = evbuffer_new( );
332        char *            base = tr_basename( file );
333
334        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
335                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
336                             msgs->torrent->info.name,
337                             tr_peerIoGetAddrStr( msgs->io ),
338                             msgs->info->client );
339        va_start( args, fmt );
340        evbuffer_add_vprintf( buf, fmt, args );
341        va_end( args );
342        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
343        fwrite( EVBUFFER_DATA( buf ), 1, EVBUFFER_LENGTH( buf ), fp );
344
345        tr_free( base );
346        evbuffer_free( buf );
347    }
348}
349
350#define dbgmsg( msgs, ... ) \
351    do { \
352        if( tr_deepLoggingIsActive( ) ) \
353            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
354    } while( 0 )
355
356/**
357***
358**/
359
360static void
361pokeBatchPeriod( tr_peermsgs * msgs,
362                 int           interval )
363{
364    if( msgs->outMessagesBatchPeriod > interval )
365    {
366        msgs->outMessagesBatchPeriod = interval;
367        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
368    }
369}
370
371static void
372dbgOutMessageLen( tr_peermsgs * msgs )
373{
374    dbgmsg( msgs, "outMessage size is now %zu", EVBUFFER_LENGTH( msgs->outMessages ) );
375}
376
377static void
378protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
379{
380    tr_peerIo       * io  = msgs->io;
381    struct evbuffer * out = msgs->outMessages;
382
383    assert( tr_peerIoSupportsFEXT( msgs->io ) );
384
385    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
386    tr_peerIoWriteUint8 ( io, out, BT_FEXT_REJECT );
387    tr_peerIoWriteUint32( io, out, req->index );
388    tr_peerIoWriteUint32( io, out, req->offset );
389    tr_peerIoWriteUint32( io, out, req->length );
390
391    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
392    dbgOutMessageLen( msgs );
393}
394
395static void
396protocolSendRequest( tr_peermsgs *               msgs,
397                     const struct peer_request * req )
398{
399    tr_peerIo       * io  = msgs->io;
400    struct evbuffer * out = msgs->outMessages;
401
402    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
403    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
404    tr_peerIoWriteUint32( io, out, req->index );
405    tr_peerIoWriteUint32( io, out, req->offset );
406    tr_peerIoWriteUint32( io, out, req->length );
407
408    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
409    dbgOutMessageLen( msgs );
410    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
411}
412
413static void
414protocolSendCancel( tr_peermsgs *               msgs,
415                    const struct peer_request * req )
416{
417    tr_peerIo       * io  = msgs->io;
418    struct evbuffer * out = msgs->outMessages;
419
420    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
421    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
422    tr_peerIoWriteUint32( io, out, req->index );
423    tr_peerIoWriteUint32( io, out, req->offset );
424    tr_peerIoWriteUint32( io, out, req->length );
425
426    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
427    dbgOutMessageLen( msgs );
428    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
429}
430
431static void
432protocolSendHave( tr_peermsgs * msgs,
433                  uint32_t      index )
434{
435    tr_peerIo       * io  = msgs->io;
436    struct evbuffer * out = msgs->outMessages;
437
438    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
439    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
440    tr_peerIoWriteUint32( io, out, index );
441
442    dbgmsg( msgs, "sending Have %u...", index );
443    dbgOutMessageLen( msgs );
444    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
445}
446
447#if 0
448static void
449protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
450{
451    tr_peerIo       * io  = msgs->io;
452    struct evbuffer * out = msgs->outMessages;
453
454    assert( tr_peerIoSupportsFEXT( msgs->io ) );
455
456    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
457    tr_peerIoWriteUint8 ( io, out, BT_FEXT_ALLOWED_FAST );
458    tr_peerIoWriteUint32( io, out, pieceIndex );
459
460    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
461    dbgOutMessageLen( msgs );
462}
463#endif
464
465static void
466protocolSendChoke( tr_peermsgs * msgs,
467                   int           choke )
468{
469    tr_peerIo       * io  = msgs->io;
470    struct evbuffer * out = msgs->outMessages;
471
472    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
473    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
474
475    dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
476    dbgOutMessageLen( msgs );
477    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
478}
479
480static void
481protocolSendHaveAll( tr_peermsgs * msgs )
482{
483    tr_peerIo       * io  = msgs->io;
484    struct evbuffer * out = msgs->outMessages;
485
486    assert( tr_peerIoSupportsFEXT( msgs->io ) );
487
488    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
489    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_ALL );
490
491    dbgmsg( msgs, "sending HAVE_ALL..." );
492    dbgOutMessageLen( msgs );
493    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
494}
495
496static void
497protocolSendHaveNone( tr_peermsgs * msgs )
498{
499    tr_peerIo       * io  = msgs->io;
500    struct evbuffer * out = msgs->outMessages;
501
502    assert( tr_peerIoSupportsFEXT( msgs->io ) );
503
504    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
505    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_NONE );
506
507    dbgmsg( msgs, "sending HAVE_NONE..." );
508    dbgOutMessageLen( msgs );
509    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
510}
511
512/**
513***  EVENTS
514**/
515
516static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0, 0 };
517
518static void
519publish( tr_peermsgs *   msgs,
520         tr_peer_event * e )
521{
522    assert( msgs->info );
523    assert( msgs->info->msgs == msgs );
524
525    tr_publisherPublish( msgs->publisher, msgs->info, e );
526}
527
528static void
529fireError( tr_peermsgs * msgs,
530           int           err )
531{
532    tr_peer_event e = blankEvent;
533
534    e.eventType = TR_PEER_ERROR;
535    e.err = err;
536    publish( msgs, &e );
537}
538
539static void
540fireUploadOnly( tr_peermsgs * msgs, tr_bool uploadOnly )
541{
542    tr_peer_event e = blankEvent;
543    e.eventType = TR_PEER_UPLOAD_ONLY;
544    e.uploadOnly = uploadOnly;
545    publish( msgs, &e );
546}
547
548static void
549fireNeedReq( tr_peermsgs * msgs )
550{
551    tr_peer_event e = blankEvent;
552    e.eventType = TR_PEER_NEED_REQ;
553    publish( msgs, &e );
554}
555
556static void
557firePeerProgress( tr_peermsgs * msgs )
558{
559    tr_peer_event e = blankEvent;
560
561    e.eventType = TR_PEER_PEER_PROGRESS;
562    e.progress = msgs->info->progress;
563    publish( msgs, &e );
564}
565
566static void
567fireGotBlock( tr_peermsgs *               msgs,
568              const struct peer_request * req )
569{
570    tr_peer_event e = blankEvent;
571
572    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
573    e.pieceIndex = req->index;
574    e.offset = req->offset;
575    e.length = req->length;
576    publish( msgs, &e );
577}
578
579static void
580fireClientGotData( tr_peermsgs * msgs,
581                   uint32_t      length,
582                   int           wasPieceData )
583{
584    tr_peer_event e = blankEvent;
585
586    e.length = length;
587    e.eventType = TR_PEER_CLIENT_GOT_DATA;
588    e.wasPieceData = wasPieceData;
589    publish( msgs, &e );
590}
591
592static void
593fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
594{
595    tr_peer_event e = blankEvent;
596    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
597    e.pieceIndex = pieceIndex;
598    publish( msgs, &e );
599}
600
601static void
602fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
603{
604    tr_peer_event e = blankEvent;
605    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
606    e.pieceIndex = pieceIndex;
607    publish( msgs, &e );
608}
609
610static void
611firePeerGotData( tr_peermsgs  * msgs,
612                 uint32_t       length,
613                 int            wasPieceData )
614{
615    tr_peer_event e = blankEvent;
616
617    e.length = length;
618    e.eventType = TR_PEER_PEER_GOT_DATA;
619    e.wasPieceData = wasPieceData;
620
621    publish( msgs, &e );
622}
623
624static void
625fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
626{
627    tr_peer_event e = blankEvent;
628    e.eventType = TR_PEER_CANCEL;
629    e.pieceIndex = req->index;
630    e.offset = req->offset;
631    e.length = req->length;
632    publish( msgs, &e );
633}
634
635/**
636***  ALLOWED FAST SET
637***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
638**/
639
640size_t
641tr_generateAllowedSet( tr_piece_index_t * setmePieces,
642                       size_t             desiredSetSize,
643                       size_t             pieceCount,
644                       const uint8_t    * infohash,
645                       const tr_address * addr )
646{
647    size_t setSize = 0;
648
649    assert( setmePieces );
650    assert( desiredSetSize <= pieceCount );
651    assert( desiredSetSize );
652    assert( pieceCount );
653    assert( infohash );
654    assert( addr );
655
656    if( addr->type == TR_AF_INET )
657    {
658        uint8_t w[SHA_DIGEST_LENGTH + 4];
659        uint8_t x[SHA_DIGEST_LENGTH];
660
661        *(uint32_t*)w = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 );   /* (1) */
662        memcpy( w + 4, infohash, SHA_DIGEST_LENGTH );                /* (2) */
663        tr_sha1( x, w, sizeof( w ), NULL );                          /* (3) */
664
665        while( setSize<desiredSetSize )
666        {
667            int i;
668            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
669            {
670                size_t k;
671                uint32_t j = i * 4;                                  /* (5) */
672                uint32_t y = ntohl( *( uint32_t* )( x + j ) );       /* (6) */
673                uint32_t index = y % pieceCount;                     /* (7) */
674
675                for( k=0; k<setSize; ++k )                           /* (8) */
676                    if( setmePieces[k] == index )
677                        break;
678
679                if( k == setSize )
680                    setmePieces[setSize++] = index;                  /* (9) */
681            }
682
683            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
684        }
685    }
686
687    return setSize;
688}
689
690static void
691updateFastSet( tr_peermsgs * msgs UNUSED )
692{
693#if 0
694    const int fext = tr_peerIoSupportsFEXT( msgs->io );
695    const int peerIsNeedy = msgs->info->progress < 0.10;
696
697    if( fext && peerIsNeedy && !msgs->haveFastSet )
698    {
699        size_t i;
700        const struct tr_address * addr = tr_peerIoGetAddress( msgs->io, NULL );
701        const tr_info * inf = &msgs->torrent->info;
702        const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
703
704        /* build the fast set */
705        msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
706        msgs->haveFastSet = 1;
707
708        /* send it to the peer */
709        for( i=0; i<msgs->fastsetSize; ++i )
710            protocolSendAllowedFast( msgs, msgs->fastset[i] );
711    }
712#endif
713}
714
715/**
716***  INTEREST
717**/
718
719static int
720isPieceInteresting( const tr_peermsgs * peer,
721                    tr_piece_index_t    piece )
722{
723    const tr_torrent * torrent = peer->torrent;
724
725    return ( !torrent->info.pieces[piece].dnd )                 /* we want it */
726           && ( !tr_cpPieceIsComplete( torrent->completion, piece ) ) /* !have
727                                                                        */
728           && ( tr_bitfieldHas( peer->info->have, piece ) );   /* peer has it */
729}
730
731/* "interested" means we'll ask for piece data if they unchoke us */
732static int
733isPeerInteresting( const tr_peermsgs * msgs )
734{
735    tr_piece_index_t    i;
736    const tr_torrent *  torrent;
737    const tr_bitfield * bitfield;
738    const int           clientIsSeed = tr_torrentIsSeed( msgs->torrent );
739
740    if( clientIsSeed )
741        return FALSE;
742
743    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
744        return FALSE;
745
746    torrent = msgs->torrent;
747    bitfield = tr_cpPieceBitfield( torrent->completion );
748
749    if( !msgs->info->have )
750        return TRUE;
751
752    assert( bitfield->byteCount == msgs->info->have->byteCount );
753
754    for( i = 0; i < torrent->info.pieceCount; ++i )
755        if( isPieceInteresting( msgs, i ) )
756            return TRUE;
757
758    return FALSE;
759}
760
761static void
762sendInterest( tr_peermsgs * msgs,
763              int           weAreInterested )
764{
765    struct evbuffer * out = msgs->outMessages;
766
767    assert( msgs );
768    assert( weAreInterested == 0 || weAreInterested == 1 );
769
770    msgs->info->clientIsInterested = weAreInterested;
771    dbgmsg( msgs, "Sending %s",
772            weAreInterested ? "Interested" : "Not Interested" );
773    tr_peerIoWriteUint32( msgs->io, out, sizeof( uint8_t ) );
774    tr_peerIoWriteUint8 (
775        msgs->io, out, weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
776    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
777    dbgmsg( msgs, "outMessage size is now %d", (int)EVBUFFER_LENGTH( out ) );
778}
779
780static void
781updateInterest( tr_peermsgs * msgs )
782{
783    const int i = isPeerInteresting( msgs );
784
785    if( i != msgs->info->clientIsInterested )
786        sendInterest( msgs, i );
787    if( i )
788        fireNeedReq( msgs );
789}
790
791static int
792popNextRequest( tr_peermsgs *         msgs,
793                struct peer_request * setme )
794{
795    return reqListPop( &msgs->peerAskedFor, setme );
796}
797
798static void
799cancelAllRequestsToClient( tr_peermsgs * msgs )
800{
801    struct peer_request req;
802    const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->io );
803
804    while( popNextRequest( msgs, &req ) )
805        if( mustSendCancel )
806            protocolSendReject( msgs, &req );
807}
808
809void
810tr_peerMsgsSetChoke( tr_peermsgs * msgs,
811                     int           choke )
812{
813    const time_t now = time( NULL );
814    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
815
816    assert( msgs );
817    assert( msgs->info );
818    assert( choke == 0 || choke == 1 );
819
820    if( msgs->info->chokeChangedAt > fibrillationTime )
821    {
822        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation",
823                choke );
824    }
825    else if( msgs->info->peerIsChoked != choke )
826    {
827        msgs->info->peerIsChoked = choke;
828        if( choke )
829            cancelAllRequestsToClient( msgs );
830        protocolSendChoke( msgs, choke );
831        msgs->info->chokeChangedAt = now;
832    }
833}
834
835/**
836***
837**/
838
839void
840tr_peerMsgsHave( tr_peermsgs * msgs,
841                 uint32_t      index )
842{
843    protocolSendHave( msgs, index );
844
845    /* since we have more pieces now, we might not be interested in this peer */
846    updateInterest( msgs );
847}
848
849/**
850***
851**/
852
853static int
854reqIsValid( const tr_peermsgs * peer,
855            uint32_t            index,
856            uint32_t            offset,
857            uint32_t            length )
858{
859    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
860}
861
862static int
863requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
864{
865    return reqIsValid( msgs, req->index, req->offset, req->length );
866}
867
868static void
869expireOldRequests( tr_peermsgs * msgs, const time_t now  )
870{
871    int                 i;
872    time_t              oldestAllowed;
873    struct request_list tmp = REQUEST_LIST_INIT;
874    const int fext = tr_peerIoSupportsFEXT( msgs->io );
875
876    /* cancel requests that have been queued for too long */
877    oldestAllowed = now - QUEUED_REQUEST_TTL_SECS;
878    reqListCopy( &tmp, &msgs->clientWillAskFor );
879    for( i = 0; i < tmp.count; ++i ) {
880        const struct peer_request * req = &tmp.requests[i];
881        if( req->time_requested < oldestAllowed )
882            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
883    }
884    reqListClear( &tmp );
885
886    /* if the peer doesn't support "Reject Request",
887     * cancel requests that were sent too long ago. */
888    if( !fext ) {
889        oldestAllowed = now - SENT_REQUEST_TTL_SECS;
890        reqListCopy( &tmp, &msgs->clientAskedFor );
891        for( i=0; i<tmp.count; ++i ) {
892            const struct peer_request * req = &tmp.requests[i];
893            if( req->time_requested < oldestAllowed )
894                tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
895        }
896        reqListClear( &tmp );
897    }
898}
899
900static void
901pumpRequestQueue( tr_peermsgs * msgs, const time_t now )
902{
903    const int           max = msgs->maxActiveRequests;
904    const int           min = msgs->minActiveRequests;
905    int                 sent = 0;
906    int                 count = msgs->clientAskedFor.count;
907    struct peer_request req;
908
909    if( count > min )
910        return;
911    if( msgs->info->clientIsChoked )
912        return;
913    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
914        return;
915
916    while( ( count < max ) && reqListPop( &msgs->clientWillAskFor, &req ) )
917    {
918        const tr_block_index_t block =
919            _tr_block( msgs->torrent, req.index, req.offset );
920
921        assert( requestIsValid( msgs, &req ) );
922        assert( tr_bitfieldHas( msgs->info->have, req.index ) );
923
924        /* don't ask for it if we've already got it... this block may have
925         * come in from a different peer after we cancelled a request for it */
926        if( !tr_cpBlockIsComplete( msgs->torrent->completion, block ) )
927        {
928            protocolSendRequest( msgs, &req );
929            req.time_requested = now;
930            reqListAppend( &msgs->clientAskedFor, &req );
931
932            ++count;
933            ++sent;
934        }
935    }
936
937    if( sent )
938        dbgmsg( msgs,
939                "pump sent %d requests, now have %d active and %d queued",
940                sent,
941                msgs->clientAskedFor.count,
942                msgs->clientWillAskFor.count );
943
944    if( count < max )
945        fireNeedReq( msgs );
946}
947
948static int
949requestQueueIsFull( const tr_peermsgs * msgs )
950{
951    const int req_max = msgs->maxActiveRequests;
952    return msgs->clientWillAskFor.count >= req_max;
953}
954
955tr_addreq_t
956tr_peerMsgsAddRequest( tr_peermsgs *    msgs,
957                       uint32_t         index,
958                       uint32_t         offset,
959                       uint32_t         length,
960                       int              doForce )
961{
962    struct peer_request req;
963
964    assert( msgs );
965    assert( msgs->torrent );
966    assert( reqIsValid( msgs, index, offset, length ) );
967
968    /**
969    ***  Reasons to decline the request
970    **/
971
972    /* don't send requests to choked clients */
973    if( !doForce && msgs->info->clientIsChoked ) {
974        dbgmsg( msgs, "declining request because they're choking us" );
975        return TR_ADDREQ_CLIENT_CHOKED;
976    }
977
978    /* peer doesn't have this piece */
979    if( !doForce && !tr_bitfieldHas( msgs->info->have, index ) )
980        return TR_ADDREQ_MISSING;
981
982    /* peer's queue is full */
983    if( !doForce && requestQueueIsFull( msgs ) ) {
984        dbgmsg( msgs, "declining request because we're full" );
985        return TR_ADDREQ_FULL;
986    }
987
988    /* have we already asked for this piece? */
989    req.index = index;
990    req.offset = offset;
991    req.length = length;
992    if( !doForce ) {
993        if( reqListFind( &msgs->clientAskedFor, &req ) != -1 ) {
994            dbgmsg( msgs, "declining because it's a duplicate" );
995            return TR_ADDREQ_DUPLICATE;
996        }
997        if( reqListFind( &msgs->clientWillAskFor, &req ) != -1 ) {
998            dbgmsg( msgs, "declining because it's a duplicate" );
999            return TR_ADDREQ_DUPLICATE;
1000        }
1001    }
1002
1003    /**
1004    ***  Accept this request
1005    **/
1006
1007    dbgmsg( msgs, "adding req for %"PRIu32":%"PRIu32"->%"PRIu32" to our `will request' list",
1008            index, offset, length );
1009    req.time_requested = time( NULL );
1010    reqListAppend( &msgs->clientWillAskFor, &req );
1011    return TR_ADDREQ_OK;
1012}
1013
1014static void
1015cancelAllRequestsToPeer( tr_peermsgs * msgs )
1016{
1017    int                 i;
1018    struct request_list a = msgs->clientWillAskFor;
1019    struct request_list b = msgs->clientAskedFor;
1020    dbgmsg( msgs, "cancelling all requests to peer" );
1021
1022    msgs->clientAskedFor = REQUEST_LIST_INIT;
1023    msgs->clientWillAskFor = REQUEST_LIST_INIT;
1024
1025    for( i=0; i<a.count; ++i )
1026        fireCancelledReq( msgs, &a.requests[i] );
1027
1028    for( i = 0; i < b.count; ++i ) {
1029        fireCancelledReq( msgs, &b.requests[i] );
1030        protocolSendCancel( msgs, &b.requests[i] );
1031    }
1032
1033    reqListClear( &a );
1034    reqListClear( &b );
1035}
1036
1037void
1038tr_peerMsgsCancel( tr_peermsgs * msgs,
1039                   uint32_t      pieceIndex,
1040                   uint32_t      offset,
1041                   uint32_t      length )
1042{
1043    struct peer_request req;
1044
1045    assert( msgs != NULL );
1046    assert( length > 0 );
1047
1048
1049    /* have we asked the peer for this piece? */
1050    req.index = pieceIndex;
1051    req.offset = offset;
1052    req.length = length;
1053
1054    /* if it's only in the queue and hasn't been sent yet, free it */
1055    if( reqListRemove( &msgs->clientWillAskFor, &req ) ) {
1056        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32"\n", pieceIndex, offset, length );
1057        fireCancelledReq( msgs, &req );
1058    }
1059
1060    /* if it's already been sent, send a cancel message too */
1061    if( reqListRemove( &msgs->clientAskedFor, &req ) ) {
1062        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32"\n", pieceIndex, offset, length );
1063        protocolSendCancel( msgs, &req );
1064        fireCancelledReq( msgs, &req );
1065    }
1066}
1067
1068
1069/**
1070***
1071**/
1072
1073static void
1074sendLtepHandshake( tr_peermsgs * msgs )
1075{
1076    tr_benc           val, *m;
1077    char *            buf;
1078    int               len;
1079    int               pex;
1080    struct evbuffer * out = msgs->outMessages;
1081
1082    if( msgs->clientSentLtepHandshake )
1083        return;
1084
1085    dbgmsg( msgs, "sending an ltep handshake" );
1086    msgs->clientSentLtepHandshake = 1;
1087
1088    /* decide if we want to advertise pex support */
1089    if( !tr_torrentAllowsPex( msgs->torrent ) )
1090        pex = 0;
1091    else if( msgs->peerSentLtepHandshake )
1092        pex = msgs->peerSupportsPex ? 1 : 0;
1093    else
1094        pex = 1;
1095
1096    tr_bencInitDict( &val, 4 );
1097    tr_bencDictAddInt( &val, "e", msgs->session->encryptionMode != TR_CLEAR_PREFERRED );
1098    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( msgs->session ) );
1099    tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
1100    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
1101    m  = tr_bencDictAddDict( &val, "m", 1 );
1102    if( pex )
1103        tr_bencDictAddInt( m, "ut_pex", TR_LTEP_PEX );
1104    buf = tr_bencSave( &val, &len );
1105
1106    tr_peerIoWriteUint32( msgs->io, out, 2 * sizeof( uint8_t ) + len );
1107    tr_peerIoWriteUint8 ( msgs->io, out, BT_LTEP );
1108    tr_peerIoWriteUint8 ( msgs->io, out, LTEP_HANDSHAKE );
1109    tr_peerIoWriteBytes ( msgs->io, out, buf, len );
1110    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1111    dbgmsg( msgs, "outMessage size is now %d", (int)EVBUFFER_LENGTH( out ) );
1112
1113    /* cleanup */
1114    tr_bencFree( &val );
1115    tr_free( buf );
1116}
1117
1118static void
1119parseLtepHandshake( tr_peermsgs *     msgs,
1120                    int               len,
1121                    struct evbuffer * inbuf )
1122{
1123    int64_t   i;
1124    tr_benc   val, * sub;
1125    uint8_t * tmp = tr_new( uint8_t, len );
1126
1127    tr_peerIoReadBytes( msgs->io, inbuf, tmp, len );
1128    msgs->peerSentLtepHandshake = 1;
1129
1130    if( tr_bencLoad( tmp, len, &val, NULL ) || val.type != TYPE_DICT )
1131    {
1132        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
1133        tr_free( tmp );
1134        return;
1135    }
1136
1137    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
1138
1139    /* does the peer prefer encrypted connections? */
1140    if( tr_bencDictFindInt( &val, "e", &i ) )
1141        msgs->info->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
1142                                            : ENCRYPTION_PREFERENCE_NO;
1143
1144    /* check supported messages for utorrent pex */
1145    msgs->peerSupportsPex = 0;
1146    if( tr_bencDictFindDict( &val, "m", &sub ) )
1147    {
1148        if( tr_bencDictFindInt( sub, "ut_pex", &i ) )
1149        {
1150            msgs->ut_pex_id = (uint8_t) i;
1151            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
1152            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
1153        }
1154    }
1155
1156    /* look for upload_only (BEP 21) */
1157    if( tr_bencDictFindInt( &val, "upload_only", &i ) )
1158        fireUploadOnly( msgs, i!=0 );
1159
1160    /* get peer's listening port */
1161    if( tr_bencDictFindInt( &val, "p", &i ) )
1162    {
1163        msgs->info->port = htons( (uint16_t)i );
1164        dbgmsg( msgs, "msgs->port is now %hu", msgs->info->port );
1165    }
1166
1167    tr_bencFree( &val );
1168    tr_free( tmp );
1169}
1170
1171static void
1172parseUtPex( tr_peermsgs *     msgs,
1173            int               msglen,
1174            struct evbuffer * inbuf )
1175{
1176    int                loaded = 0;
1177    uint8_t *          tmp = tr_new( uint8_t, msglen );
1178    tr_benc            val;
1179    const tr_torrent * tor = msgs->torrent;
1180    const uint8_t *    added;
1181    size_t             added_len;
1182
1183    tr_peerIoReadBytes( msgs->io, inbuf, tmp, msglen );
1184
1185    if( tr_torrentAllowsPex( tor )
1186      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) )
1187      && tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1188    {
1189        const uint8_t * added_f = NULL;
1190        tr_pex *        pex;
1191        size_t          i, n;
1192        size_t          added_f_len = 0;
1193        tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1194        pex =
1195            tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len,
1196                                    &n );
1197        for( i = 0; i < n; ++i )
1198            tr_peerMgrAddPex( msgs->session->peerMgr, tor->info.hash,
1199                              TR_PEER_FROM_PEX, pex + i );
1200        tr_free( pex );
1201    }
1202
1203    if( loaded )
1204        tr_bencFree( &val );
1205    tr_free( tmp );
1206}
1207
1208static void sendPex( tr_peermsgs * msgs );
1209
1210static void
1211parseLtep( tr_peermsgs *     msgs,
1212           int               msglen,
1213           struct evbuffer * inbuf )
1214{
1215    uint8_t ltep_msgid;
1216
1217    tr_peerIoReadUint8( msgs->io, inbuf, &ltep_msgid );
1218    msglen--;
1219
1220    if( ltep_msgid == LTEP_HANDSHAKE )
1221    {
1222        dbgmsg( msgs, "got ltep handshake" );
1223        parseLtepHandshake( msgs, msglen, inbuf );
1224        if( tr_peerIoSupportsLTEP( msgs->io ) )
1225        {
1226            sendLtepHandshake( msgs );
1227            sendPex( msgs );
1228        }
1229    }
1230    else if( ltep_msgid == TR_LTEP_PEX )
1231    {
1232        dbgmsg( msgs, "got ut pex" );
1233        msgs->peerSupportsPex = 1;
1234        parseUtPex( msgs, msglen, inbuf );
1235    }
1236    else
1237    {
1238        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1239        evbuffer_drain( inbuf, msglen );
1240    }
1241}
1242
1243static int
1244readBtLength( tr_peermsgs *     msgs,
1245              struct evbuffer * inbuf,
1246              size_t            inlen )
1247{
1248    uint32_t len;
1249
1250    if( inlen < sizeof( len ) )
1251        return READ_LATER;
1252
1253    tr_peerIoReadUint32( msgs->io, inbuf, &len );
1254
1255    if( len == 0 ) /* peer sent us a keepalive message */
1256        dbgmsg( msgs, "got KeepAlive" );
1257    else
1258    {
1259        msgs->incoming.length = len;
1260        msgs->state = AWAITING_BT_ID;
1261    }
1262
1263    return READ_NOW;
1264}
1265
1266static int readBtMessage( tr_peermsgs *     msgs,
1267                          struct evbuffer * inbuf,
1268                          size_t            inlen );
1269
1270static int
1271readBtId( tr_peermsgs *     msgs,
1272          struct evbuffer * inbuf,
1273          size_t            inlen )
1274{
1275    uint8_t id;
1276
1277    if( inlen < sizeof( uint8_t ) )
1278        return READ_LATER;
1279
1280    tr_peerIoReadUint8( msgs->io, inbuf, &id );
1281    msgs->incoming.id = id;
1282
1283    if( id == BT_PIECE )
1284    {
1285        msgs->state = AWAITING_BT_PIECE;
1286        return READ_NOW;
1287    }
1288    else if( msgs->incoming.length != 1 )
1289    {
1290        msgs->state = AWAITING_BT_MESSAGE;
1291        return READ_NOW;
1292    }
1293    else return readBtMessage( msgs, inbuf, inlen - 1 );
1294}
1295
1296static void
1297updatePeerProgress( tr_peermsgs * msgs )
1298{
1299    msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have ) / (float)msgs->torrent->info.pieceCount;
1300    dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
1301    updateFastSet( msgs );
1302    updateInterest( msgs );
1303    firePeerProgress( msgs );
1304}
1305
1306static void
1307peerMadeRequest( tr_peermsgs *               msgs,
1308                 const struct peer_request * req )
1309{
1310    const int fext = tr_peerIoSupportsFEXT( msgs->io );
1311    const int reqIsValid = requestIsValid( msgs, req );
1312    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( msgs->torrent->completion, req->index );
1313    const int peerIsChoked = msgs->info->peerIsChoked;
1314
1315    int allow = FALSE;
1316
1317    if( !reqIsValid )
1318        dbgmsg( msgs, "rejecting an invalid request." );
1319    else if( !clientHasPiece )
1320        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1321    else if( peerIsChoked )
1322        dbgmsg( msgs, "rejecting request from choked peer" );
1323    else
1324        allow = TRUE;
1325
1326    if( allow )
1327        reqListAppend( &msgs->peerAskedFor, req );
1328    else if( fext )
1329        protocolSendReject( msgs, req );
1330}
1331
1332static int
1333messageLengthIsCorrect( const tr_peermsgs * msg,
1334                        uint8_t             id,
1335                        uint32_t            len )
1336{
1337    switch( id )
1338    {
1339        case BT_CHOKE:
1340        case BT_UNCHOKE:
1341        case BT_INTERESTED:
1342        case BT_NOT_INTERESTED:
1343        case BT_FEXT_HAVE_ALL:
1344        case BT_FEXT_HAVE_NONE:
1345            return len == 1;
1346
1347        case BT_HAVE:
1348        case BT_FEXT_SUGGEST:
1349        case BT_FEXT_ALLOWED_FAST:
1350            return len == 5;
1351
1352        case BT_BITFIELD:
1353            return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1354
1355        case BT_REQUEST:
1356        case BT_CANCEL:
1357        case BT_FEXT_REJECT:
1358            return len == 13;
1359
1360        case BT_PIECE:
1361            return len > 9 && len <= 16393;
1362
1363        case BT_PORT:
1364            return len == 3;
1365
1366        case BT_LTEP:
1367            return len >= 2;
1368
1369        default:
1370            return FALSE;
1371    }
1372}
1373
1374static int clientGotBlock( tr_peermsgs *               msgs,
1375                           const uint8_t *             block,
1376                           const struct peer_request * req );
1377
1378static int
1379readBtPiece( tr_peermsgs      * msgs,
1380             struct evbuffer  * inbuf,
1381             size_t             inlen,
1382             size_t           * setme_piece_bytes_read )
1383{
1384    struct peer_request * req = &msgs->incoming.blockReq;
1385
1386    assert( EVBUFFER_LENGTH( inbuf ) >= inlen );
1387    dbgmsg( msgs, "In readBtPiece" );
1388
1389    if( !req->length )
1390    {
1391        if( inlen < 8 )
1392            return READ_LATER;
1393
1394        tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
1395        tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
1396        req->length = msgs->incoming.length - 9;
1397        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index,
1398                req->offset,
1399                req->length );
1400        return READ_NOW;
1401    }
1402    else
1403    {
1404        int          err;
1405
1406        /* read in another chunk of data */
1407        const size_t nLeft = req->length - EVBUFFER_LENGTH(
1408            msgs->incoming.block );
1409        size_t       n = MIN( nLeft, inlen );
1410        uint8_t *    buf = tr_new( uint8_t, n );
1411        assert( EVBUFFER_LENGTH( inbuf ) >= n );
1412        tr_peerIoReadBytes( msgs->io, inbuf, buf, n );
1413        evbuffer_add( msgs->incoming.block, buf, n );
1414        fireClientGotData( msgs, n, TRUE );
1415        *setme_piece_bytes_read += n;
1416        tr_free( buf );
1417        dbgmsg( msgs, "got %d bytes for block %u:%u->%u ... %d remain",
1418               (int)n, req->index, req->offset, req->length,
1419               (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) );
1420        if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length )
1421            return READ_LATER;
1422
1423        /* we've got the whole block ... process it */
1424        err = clientGotBlock( msgs, EVBUFFER_DATA(
1425                                  msgs->incoming.block ), req );
1426
1427        /* cleanup */
1428        evbuffer_drain( msgs->incoming.block,
1429                       EVBUFFER_LENGTH( msgs->incoming.block ) );
1430        req->length = 0;
1431        msgs->state = AWAITING_BT_LENGTH;
1432        if( !err )
1433            return READ_NOW;
1434        else
1435        {
1436            fireError( msgs, err );
1437            return READ_ERR;
1438        }
1439    }
1440}
1441
1442static int
1443readBtMessage( tr_peermsgs *     msgs,
1444               struct evbuffer * inbuf,
1445               size_t            inlen )
1446{
1447    uint32_t      ui32;
1448    uint32_t      msglen = msgs->incoming.length;
1449    const uint8_t id = msgs->incoming.id;
1450    const size_t  startBufLen = EVBUFFER_LENGTH( inbuf );
1451    const int     fext = tr_peerIoSupportsFEXT( msgs->io );
1452
1453    --msglen; /* id length */
1454
1455    if( inlen < msglen )
1456        return READ_LATER;
1457
1458    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %d", (int)id,
1459            (int)msglen,
1460            (int)inlen );
1461
1462    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1463    {
1464        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d",
1465                (int)id, (int)msglen );
1466        fireError( msgs, EMSGSIZE );
1467        return READ_ERR;
1468    }
1469
1470    switch( id )
1471    {
1472        case BT_CHOKE:
1473            dbgmsg( msgs, "got Choke" );
1474            msgs->info->clientIsChoked = 1;
1475            if( !fext )
1476                cancelAllRequestsToPeer( msgs );
1477            cancelAllRequestsToClient( msgs );
1478            break;
1479
1480        case BT_UNCHOKE:
1481            dbgmsg( msgs, "got Unchoke" );
1482            msgs->info->clientIsChoked = 0;
1483            fireNeedReq( msgs );
1484            break;
1485
1486        case BT_INTERESTED:
1487            dbgmsg( msgs, "got Interested" );
1488            msgs->info->peerIsInterested = 1;
1489            break;
1490
1491        case BT_NOT_INTERESTED:
1492            dbgmsg( msgs, "got Not Interested" );
1493            msgs->info->peerIsInterested = 0;
1494            break;
1495
1496        case BT_HAVE:
1497            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1498            dbgmsg( msgs, "got Have: %u", ui32 );
1499            if( tr_bitfieldAdd( msgs->info->have, ui32 ) )
1500            {
1501                fireError( msgs, ERANGE );
1502                return READ_ERR;
1503            }
1504            updatePeerProgress( msgs );
1505            tr_rcTransferred( msgs->torrent->swarmSpeed,
1506                              msgs->torrent->info.pieceSize );
1507            break;
1508
1509        case BT_BITFIELD:
1510        {
1511            dbgmsg( msgs, "got a bitfield" );
1512            msgs->peerSentBitfield = 1;
1513            tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits,
1514                                msglen );
1515            updatePeerProgress( msgs );
1516            fireNeedReq( msgs );
1517            break;
1518        }
1519
1520        case BT_REQUEST:
1521        {
1522            struct peer_request r;
1523            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1524            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1525            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1526            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1527            peerMadeRequest( msgs, &r );
1528            break;
1529        }
1530
1531        case BT_CANCEL:
1532        {
1533            struct peer_request r;
1534            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1535            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1536            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1537            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1538            if( reqListRemove( &msgs->peerAskedFor, &r ) && fext )
1539                protocolSendReject( msgs, &r );
1540            break;
1541        }
1542
1543        case BT_PIECE:
1544            assert( 0 ); /* handled elsewhere! */
1545            break;
1546
1547        case BT_PORT:
1548            dbgmsg( msgs, "Got a BT_PORT" );
1549            tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
1550            break;
1551
1552        case BT_FEXT_SUGGEST:
1553            dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1554            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1555            if( fext )
1556                fireClientGotSuggest( msgs, ui32 );
1557            else {
1558                fireError( msgs, EMSGSIZE );
1559                return READ_ERR;
1560            }
1561            break;
1562
1563        case BT_FEXT_ALLOWED_FAST:
1564            dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1565            tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
1566            if( fext )
1567                fireClientGotAllowedFast( msgs, ui32 );
1568            else {
1569                fireError( msgs, EMSGSIZE );
1570                return READ_ERR;
1571            }
1572            break;
1573
1574        case BT_FEXT_HAVE_ALL:
1575            dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1576            if( fext ) {
1577                tr_bitfieldAddRange( msgs->info->have, 0, msgs->torrent->info.pieceCount );
1578                updatePeerProgress( msgs );
1579            } else {
1580                fireError( msgs, EMSGSIZE );
1581                return READ_ERR;
1582            }
1583            break;
1584
1585
1586        case BT_FEXT_HAVE_NONE:
1587            dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1588            if( fext ) {
1589                tr_bitfieldClear( msgs->info->have );
1590                updatePeerProgress( msgs );
1591            } else {
1592                fireError( msgs, EMSGSIZE );
1593                return READ_ERR;
1594            }
1595            break;
1596
1597        case BT_FEXT_REJECT:
1598        {
1599            struct peer_request r;
1600            dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1601            tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
1602            tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
1603            tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
1604            if( fext )
1605                reqListRemove( &msgs->clientAskedFor, &r );
1606            else {
1607                fireError( msgs, EMSGSIZE );
1608                return READ_ERR;
1609            }
1610            break;
1611        }
1612
1613        case BT_LTEP:
1614            dbgmsg( msgs, "Got a BT_LTEP" );
1615            parseLtep( msgs, msglen, inbuf );
1616            break;
1617
1618        default:
1619            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1620            tr_peerIoDrain( msgs->io, inbuf, msglen );
1621            break;
1622    }
1623
1624    assert( msglen + 1 == msgs->incoming.length );
1625    assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen );
1626
1627    msgs->state = AWAITING_BT_LENGTH;
1628    return READ_NOW;
1629}
1630
1631static void
1632decrementDownloadedCount( tr_peermsgs * msgs,
1633                          uint32_t      byteCount )
1634{
1635    tr_torrent * tor = msgs->torrent;
1636
1637    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1638}
1639
1640static void
1641clientGotUnwantedBlock( tr_peermsgs *               msgs,
1642                        const struct peer_request * req )
1643{
1644    decrementDownloadedCount( msgs, req->length );
1645}
1646
1647static void
1648addPeerToBlamefield( tr_peermsgs * msgs,
1649                     uint32_t      index )
1650{
1651    if( !msgs->info->blame )
1652        msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1653    tr_bitfieldAdd( msgs->info->blame, index );
1654}
1655
1656/* returns 0 on success, or an errno on failure */
1657static int
1658clientGotBlock( tr_peermsgs *               msgs,
1659                const uint8_t *             data,
1660                const struct peer_request * req )
1661{
1662    int                    err;
1663    tr_torrent *           tor = msgs->torrent;
1664    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1665
1666    assert( msgs );
1667    assert( req );
1668
1669    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) )
1670    {
1671        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1672                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1673        return EMSGSIZE;
1674    }
1675
1676    /* save the block */
1677    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset,
1678            req->length );
1679
1680    /**
1681    *** Remove the block from our `we asked for this' list
1682    **/
1683
1684    if( !reqListRemove( &msgs->clientAskedFor, req ) )
1685    {
1686        clientGotUnwantedBlock( msgs, req );
1687        dbgmsg( msgs, "we didn't ask for this message..." );
1688        return 0;
1689    }
1690
1691    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1692            msgs->clientAskedFor.count );
1693
1694    /**
1695    *** Error checks
1696    **/
1697
1698    if( tr_cpBlockIsComplete( tor->completion, block ) )
1699    {
1700        dbgmsg( msgs, "we have this block already..." );
1701        clientGotUnwantedBlock( msgs, req );
1702        return 0;
1703    }
1704
1705    /**
1706    ***  Save the block
1707    **/
1708
1709    msgs->info->peerSentPieceDataAt = time( NULL );
1710    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1711        return err;
1712
1713    addPeerToBlamefield( msgs, req->index );
1714    fireGotBlock( msgs, req );
1715    return 0;
1716}
1717
1718static int peerPulse( void * vmsgs );
1719
1720static void
1721didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1722{
1723    tr_peermsgs * msgs = vmsgs;
1724    firePeerGotData( msgs, bytesWritten, wasPieceData );
1725    peerPulse( msgs );
1726}
1727
1728static ReadState
1729canRead( struct tr_iobuf * iobuf, void * vmsgs, size_t * piece )
1730{
1731    ReadState         ret;
1732    tr_peermsgs *     msgs = vmsgs;
1733    struct evbuffer * in = tr_iobuf_input( iobuf );
1734    const size_t      inlen = EVBUFFER_LENGTH( in );
1735
1736    if( !inlen )
1737    {
1738        ret = READ_LATER;
1739    }
1740    else if( msgs->state == AWAITING_BT_PIECE )
1741    {
1742        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1743    }
1744    else switch( msgs->state )
1745    {
1746        case AWAITING_BT_LENGTH:
1747            ret = readBtLength ( msgs, in, inlen ); break;
1748
1749        case AWAITING_BT_ID:
1750            ret = readBtId     ( msgs, in, inlen ); break;
1751
1752        case AWAITING_BT_MESSAGE:
1753            ret = readBtMessage( msgs, in, inlen ); break;
1754
1755        default:
1756            assert( 0 );
1757    }
1758
1759    /* log the raw data that was read */
1760    if( ( ret != READ_ERR ) && ( EVBUFFER_LENGTH( in ) != inlen ) )
1761        fireClientGotData( msgs, inlen - EVBUFFER_LENGTH( in ), FALSE );
1762
1763    return ret;
1764}
1765
1766/**
1767***
1768**/
1769
1770static int
1771ratePulse( void * vpeer )
1772{
1773    tr_peermsgs * peer = vpeer;
1774    const double rateToClient = tr_peerGetPieceSpeed( peer->info,
1775                                                      TR_PEER_TO_CLIENT );
1776    const int estimatedBlocksInNext30Seconds =
1777                  ( rateToClient * 30 * 1024 ) / peer->torrent->blockSize;
1778
1779    peer->minActiveRequests = 8;
1780    peer->maxActiveRequests = peer->minActiveRequests + estimatedBlocksInNext30Seconds;
1781    return TRUE;
1782}
1783
1784static size_t
1785fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1786{
1787    size_t bytesWritten = 0;
1788    struct peer_request req;
1789    const int haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1790    const int fext = tr_peerIoSupportsFEXT( msgs->io );
1791
1792    /**
1793    ***  Protocol messages
1794    **/
1795
1796    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1797    {
1798        dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
1799        msgs->outMessagesBatchedAt = now;
1800    }
1801    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1802    {
1803        const size_t len = EVBUFFER_LENGTH( msgs->outMessages );
1804        /* flush the protocol messages */
1805        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->io, len );
1806        tr_peerIoWriteBuf( msgs->io, msgs->outMessages, FALSE );
1807        msgs->clientSentAnythingAt = now;
1808        msgs->outMessagesBatchedAt = 0;
1809        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1810        bytesWritten +=  len;
1811    }
1812
1813    /**
1814    ***  Blocks
1815    **/
1816
1817    if( ( tr_peerIoGetWriteBufferSpace( msgs->io ) >= msgs->torrent->blockSize )
1818        && popNextRequest( msgs, &req ) )
1819    {
1820        if( requestIsValid( msgs, &req )
1821            && tr_cpPieceIsComplete( msgs->torrent->completion, req.index ) )
1822        {
1823            /* send a block */
1824            uint8_t * buf = tr_new( uint8_t, req.length );
1825            const int err = tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf );
1826            if( err ) {
1827                fireError( msgs, err );
1828                bytesWritten = 0;
1829                msgs = NULL;
1830            } else {
1831                tr_peerIo * io = msgs->io;
1832                struct evbuffer * out = evbuffer_new( );
1833                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1834                tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1835                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1836                tr_peerIoWriteUint32( io, out, req.index );
1837                tr_peerIoWriteUint32( io, out, req.offset );
1838                tr_peerIoWriteBytes ( io, out, buf, req.length );
1839                tr_peerIoWriteBuf( io, out, TRUE );
1840                bytesWritten += EVBUFFER_LENGTH( out );
1841                evbuffer_free( out );
1842                msgs->clientSentAnythingAt = now;
1843            }
1844            tr_free( buf );
1845        }
1846        else if( fext ) /* peer needs a reject message */
1847        {
1848            protocolSendReject( msgs, &req );
1849        }
1850    }
1851
1852    /**
1853    ***  Keepalive
1854    **/
1855
1856    if( ( msgs != NULL )
1857        && ( msgs->clientSentAnythingAt != 0 )
1858        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1859    {
1860        dbgmsg( msgs, "sending a keepalive message" );
1861        tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
1862        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1863    }
1864
1865    return bytesWritten;
1866}
1867
1868static int
1869peerPulse( void * vmsgs )
1870{
1871    tr_peermsgs * msgs = vmsgs;
1872    const time_t  now = time( NULL );
1873
1874    ratePulse( msgs );
1875
1876    pumpRequestQueue( msgs, now );
1877    expireOldRequests( msgs, now );
1878
1879    for( ;; )
1880        if( fillOutputBuffer( msgs, now ) < 1 )
1881            break;
1882
1883    return TRUE; /* loop forever */
1884}
1885
1886void
1887tr_peerMsgsPulse( tr_peermsgs * msgs )
1888{
1889    if( msgs != NULL )
1890        peerPulse( msgs );
1891}
1892
1893static void
1894gotError( struct tr_iobuf  * iobuf UNUSED,
1895          short              what,
1896          void             * vmsgs )
1897{
1898    if( what & EVBUFFER_TIMEOUT )
1899        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1900    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1901        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1902               what, errno, tr_strerror( errno ) );
1903    fireError( vmsgs, ENOTCONN );
1904}
1905
1906static void
1907sendBitfield( tr_peermsgs * msgs )
1908{
1909    struct evbuffer * out = msgs->outMessages;
1910    tr_bitfield *     field;
1911    tr_piece_index_t  lazyPieces[LAZY_PIECE_COUNT];
1912    size_t            i;
1913    size_t            lazyCount = 0;
1914
1915    field = tr_bitfieldDup( tr_cpPieceBitfield( msgs->torrent->completion ) );
1916
1917    if( tr_sessionIsLazyBitfieldEnabled( msgs->session ) )
1918    {
1919        /** Lazy bitfields aren't a high priority or secure, so I'm opting for
1920            speed over a truly random sample -- let's limit the pool size to
1921            the first 1000 pieces so large torrents don't bog things down */
1922        size_t poolSize;
1923        const size_t maxPoolSize = MIN( msgs->torrent->info.pieceCount, 1000 );
1924        tr_piece_index_t * pool = tr_new( tr_piece_index_t, maxPoolSize );
1925
1926        /* build the pool */
1927        for( i=poolSize=0; i<maxPoolSize; ++i )
1928            if( tr_bitfieldHas( field, i ) )
1929                pool[poolSize++] = i;
1930
1931        /* pull random piece indices from the pool */
1932        while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) )
1933        {
1934            const int pos = tr_cryptoWeakRandInt( poolSize );
1935            const tr_piece_index_t piece = pool[pos];
1936            tr_bitfieldRem( field, piece );
1937            lazyPieces[lazyCount++] = piece;
1938            pool[pos] = pool[--poolSize];
1939        }
1940
1941        /* cleanup */
1942        tr_free( pool );
1943    }
1944
1945    tr_peerIoWriteUint32( msgs->io, out,
1946                          sizeof( uint8_t ) + field->byteCount );
1947    tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
1948    tr_peerIoWriteBytes ( msgs->io, out, field->bits, field->byteCount );
1949    dbgmsg( msgs, "sending bitfield... outMessage size is now %d",
1950           (int)EVBUFFER_LENGTH( out ) );
1951    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1952
1953    for( i = 0; i < lazyCount; ++i )
1954        protocolSendHave( msgs, lazyPieces[i] );
1955
1956    tr_bitfieldFree( field );
1957}
1958
1959static void
1960tellPeerWhatWeHave( tr_peermsgs * msgs )
1961{
1962    const int fext = tr_peerIoSupportsFEXT( msgs->io );
1963
1964    if( fext && ( tr_cpGetStatus( msgs->torrent->completion ) == TR_SEED ) )
1965    {
1966        protocolSendHaveAll( msgs );
1967    }
1968    else if( fext && ( tr_cpHaveValid( msgs->torrent->completion ) == 0 ) )
1969    {
1970        protocolSendHaveNone( msgs );
1971    }
1972    else
1973    {
1974        sendBitfield( msgs );
1975    }
1976}
1977
1978/**
1979***
1980**/
1981
1982/* some peers give us error messages if we send
1983   more than this many peers in a single pex message
1984   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
1985#define MAX_PEX_ADDED 50
1986#define MAX_PEX_DROPPED 50
1987
1988typedef struct
1989{
1990    tr_pex *  added;
1991    tr_pex *  dropped;
1992    tr_pex *  elements;
1993    int       addedCount;
1994    int       droppedCount;
1995    int       elementCount;
1996}
1997PexDiffs;
1998
1999static void
2000pexAddedCb( void * vpex,
2001            void * userData )
2002{
2003    PexDiffs * diffs = userData;
2004    tr_pex *   pex = vpex;
2005
2006    if( diffs->addedCount < MAX_PEX_ADDED )
2007    {
2008        diffs->added[diffs->addedCount++] = *pex;
2009        diffs->elements[diffs->elementCount++] = *pex;
2010    }
2011}
2012
2013static void
2014pexDroppedCb( void * vpex,
2015              void * userData )
2016{
2017    PexDiffs * diffs = userData;
2018    tr_pex *   pex = vpex;
2019
2020    if( diffs->droppedCount < MAX_PEX_DROPPED )
2021    {
2022        diffs->dropped[diffs->droppedCount++] = *pex;
2023    }
2024}
2025
2026static void
2027pexElementCb( void * vpex,
2028              void * userData )
2029{
2030    PexDiffs * diffs = userData;
2031    tr_pex *   pex = vpex;
2032
2033    diffs->elements[diffs->elementCount++] = *pex;
2034}
2035
2036/* TODO: ipv6 pex */
2037static void
2038sendPex( tr_peermsgs * msgs )
2039{
2040    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
2041    {
2042        PexDiffs diffs;
2043        tr_pex * newPex = NULL;
2044        const int newCount = tr_peerMgrGetPeers( msgs->session->peerMgr,
2045                                                 msgs->torrent->info.hash,
2046                                                 &newPex );
2047
2048        /* build the diffs */
2049        diffs.added = tr_new( tr_pex, newCount );
2050        diffs.addedCount = 0;
2051        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
2052        diffs.droppedCount = 0;
2053        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
2054        diffs.elementCount = 0;
2055        tr_set_compare( msgs->pex, msgs->pexCount,
2056                        newPex, newCount,
2057                        tr_pexCompare, sizeof( tr_pex ),
2058                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
2059        dbgmsg(
2060            msgs,
2061            "pex: old peer count %d, new peer count %d, added %d, removed %d",
2062            msgs->pexCount, newCount, diffs.addedCount, diffs.droppedCount );
2063
2064        if( !diffs.addedCount && !diffs.droppedCount )
2065        {
2066            tr_free( diffs.elements );
2067        }
2068        else
2069        {
2070            int  i;
2071            tr_benc val;
2072            char * benc;
2073            int bencLen;
2074            uint8_t * tmp, *walk;
2075            struct evbuffer * out = msgs->outMessages;
2076
2077            /* update peer */
2078            tr_free( msgs->pex );
2079            msgs->pex = diffs.elements;
2080            msgs->pexCount = diffs.elementCount;
2081
2082            /* build the pex payload */
2083            tr_bencInitDict( &val, 3 );
2084
2085            /* "added" */
2086            tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
2087            for( i = 0; i < diffs.addedCount; ++i ) {
2088                memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
2089                memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
2090            }
2091            assert( ( walk - tmp ) == diffs.addedCount * 6 );
2092            tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
2093            tr_free( tmp );
2094
2095            /* "added.f" */
2096            tmp = walk = tr_new( uint8_t, diffs.addedCount );
2097            for( i = 0; i < diffs.addedCount; ++i )
2098                *walk++ = diffs.added[i].flags;
2099            assert( ( walk - tmp ) == diffs.addedCount );
2100            tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2101            tr_free( tmp );
2102
2103            /* "dropped" */
2104            tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2105            for( i = 0; i < diffs.droppedCount; ++i ) {
2106                memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
2107                memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2108            }
2109            assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2110            tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2111            tr_free( tmp );
2112
2113            /* write the pex message */
2114            benc = tr_bencSave( &val, &bencLen );
2115            tr_peerIoWriteUint32( msgs->io, out, 2 * sizeof( uint8_t ) + bencLen );
2116            tr_peerIoWriteUint8 ( msgs->io, out, BT_LTEP );
2117            tr_peerIoWriteUint8 ( msgs->io, out, msgs->ut_pex_id );
2118            tr_peerIoWriteBytes ( msgs->io, out, benc, bencLen );
2119            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2120            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", EVBUFFER_LENGTH( out ) );
2121
2122            tr_free( benc );
2123            tr_bencFree( &val );
2124        }
2125
2126        /* cleanup */
2127        tr_free( diffs.added );
2128        tr_free( diffs.dropped );
2129        tr_free( newPex );
2130
2131        msgs->clientSentPexAt = time( NULL );
2132    }
2133}
2134
2135static int
2136pexPulse( void * vpeer )
2137{
2138    sendPex( vpeer );
2139    return TRUE;
2140}
2141
2142/**
2143***
2144**/
2145
2146tr_peermsgs*
2147tr_peerMsgsNew( struct tr_torrent * torrent,
2148                struct tr_peer *    peer,
2149                tr_delivery_func    func,
2150                void *              userData,
2151                tr_publisher_tag *  setme )
2152{
2153    tr_peermsgs * m;
2154
2155    assert( peer );
2156    assert( peer->io );
2157
2158    m = tr_new0( tr_peermsgs, 1 );
2159    m->publisher = tr_publisherNew( );
2160    m->info = peer;
2161    m->session = torrent->session;
2162    m->torrent = torrent;
2163    m->io = peer->io;
2164    m->info->clientIsChoked = 1;
2165    m->info->peerIsChoked = 1;
2166    m->info->clientIsInterested = 0;
2167    m->info->peerIsInterested = 0;
2168    m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
2169    m->state = AWAITING_BT_LENGTH;
2170    m->pexTimer = tr_timerNew( m->session, pexPulse, m, PEX_INTERVAL );
2171    m->outMessages = evbuffer_new( );
2172    m->outMessagesBatchedAt = 0;
2173    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2174    m->incoming.block = evbuffer_new( );
2175    m->peerAllowedPieces = NULL;
2176    m->peerAskedFor = REQUEST_LIST_INIT;
2177    m->clientAskedFor = REQUEST_LIST_INIT;
2178    m->clientWillAskFor = REQUEST_LIST_INIT;
2179    peer->msgs = m;
2180
2181    *setme = tr_publisherSubscribe( m->publisher, func, userData );
2182
2183    if( tr_peerIoSupportsLTEP( m->io ) )
2184        sendLtepHandshake( m );
2185
2186    tellPeerWhatWeHave( m );
2187
2188    tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of inactivity */
2189    tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
2190    ratePulse( m );
2191
2192    return m;
2193}
2194
2195void
2196tr_peerMsgsFree( tr_peermsgs* msgs )
2197{
2198    if( msgs )
2199    {
2200        tr_timerFree( &msgs->pexTimer );
2201        tr_publisherFree( &msgs->publisher );
2202        reqListClear( &msgs->clientWillAskFor );
2203        reqListClear( &msgs->clientAskedFor );
2204        reqListClear( &msgs->peerAskedFor );
2205
2206        tr_bitfieldFree( msgs->peerAllowedPieces );
2207        evbuffer_free( msgs->incoming.block );
2208        evbuffer_free( msgs->outMessages );
2209        tr_free( msgs->pex );
2210
2211        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2212        tr_free( msgs );
2213    }
2214}
2215
2216void
2217tr_peerMsgsUnsubscribe( tr_peermsgs *    peer,
2218                        tr_publisher_tag tag )
2219{
2220    tr_publisherUnsubscribe( peer->publisher, tag );
2221}
2222
Note: See TracBrowser for help on using the repository browser.