source: trunk/libtransmission/peer-msgs.c @ 7609

Last change on this file since 7609 was 7609, checked in by charles, 12 years ago

(trunk libT) new peer request fifo queue with log(N) search time. new unit tests for the queue. new utility tr_lowerBound()

  • Property svn:keywords set to Date Rev Author Id
File size: 62.5 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 7609 2009-01-04 16:29:44Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19
20#include <event.h>
21
22#include "transmission.h"
23#include "session.h"
24#include "bencode.h"
25#include "completion.h"
26#include "crypto.h"
27#include "inout.h"
28#ifdef WIN32
29#include "net.h" /* for ECONN */
30#endif
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-mgr-private.h"
34#include "peer-msgs.h"
35#include "platform.h" /* MAX_STACK_ARRAY_SIZE */
36#include "ratecontrol.h"
37#include "request-list.h"
38#include "stats.h"
39#include "torrent.h"
40#include "trevent.h"
41#include "utils.h"
42
43/**
44***
45**/
46
47enum
48{
49    BT_CHOKE                = 0,
50    BT_UNCHOKE              = 1,
51    BT_INTERESTED           = 2,
52    BT_NOT_INTERESTED       = 3,
53    BT_HAVE                 = 4,
54    BT_BITFIELD             = 5,
55    BT_REQUEST              = 6,
56    BT_PIECE                = 7,
57    BT_CANCEL               = 8,
58    BT_PORT                 = 9,
59
60    BT_FEXT_SUGGEST         = 13,
61    BT_FEXT_HAVE_ALL        = 14,
62    BT_FEXT_HAVE_NONE       = 15,
63    BT_FEXT_REJECT          = 16,
64    BT_FEXT_ALLOWED_FAST    = 17,
65
66    BT_LTEP                 = 20,
67
68    LTEP_HANDSHAKE          = 0,
69
70    TR_LTEP_PEX             = 1,
71
72
73
74    MIN_CHOKE_PERIOD_SEC    = ( 10 ),
75
76    /* idle seconds before we send a keepalive */
77    KEEPALIVE_INTERVAL_SECS = 100,
78
79    PEX_INTERVAL            = ( 90 * 1000 ), /* msec between sendPex() calls */
80
81
82    MAX_BLOCK_SIZE          = ( 1024 * 16 ),
83
84
85    /* how long an unsent request can stay queued before it's returned
86       back to the peer-mgr's pool of requests */
87    QUEUED_REQUEST_TTL_SECS = 20,
88
89    /* how long a sent request can stay queued before it's returned
90       back to the peer-mgr's pool of requests */
91    SENT_REQUEST_TTL_SECS = 240,
92
93    /* used in lowering the outMessages queue period */
94    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
95    HIGH_PRIORITY_INTERVAL_SECS = 2,
96    LOW_PRIORITY_INTERVAL_SECS = 20,
97
98    /* number of pieces to remove from the bitfield when
99     * lazy bitfields are turned on */
100    LAZY_PIECE_COUNT = 26,
101
102    /* number of pieces we'll allow in our fast set */
103    MAX_FAST_SET_SIZE = 3
104};
105
106enum
107{
108    AWAITING_BT_LENGTH,
109    AWAITING_BT_ID,
110    AWAITING_BT_MESSAGE,
111    AWAITING_BT_PIECE
112};
113
114/**
115***
116**/
117
118/* this is raw, unchanged data from the peer regarding
119 * the current message that it's sending us. */
120struct tr_incoming
121{
122    uint8_t                id;
123    uint32_t               length; /* includes the +1 for id length */
124    struct peer_request    blockReq; /* metadata for incoming blocks */
125    struct evbuffer *      block; /* piece data for incoming blocks */
126};
127
128/**
129 * Low-level communication state information about a connected peer.
130 *
131 * This structure remembers the low-level protocol states that we're
132 * in with this peer, such as active requests, pex messages, and so on.
133 * Its fields are all private to peer-msgs.c.
134 *
135 * Data not directly involved with sending & receiving messages is
136 * stored in tr_peer, where it can be accessed by both peermsgs and
137 * the peer manager.
138 *
139 * @see struct peer_atom
140 * @see tr_peer
141 */
142struct tr_peermsgs
143{
144    tr_bool         peerSupportsPex;
145    tr_bool         clientSentLtepHandshake;
146    tr_bool         peerSentLtepHandshake;
147    tr_bool         haveFastSet;
148
149    uint8_t         state;
150    uint8_t         ut_pex_id;
151    uint16_t        pexCount;
152    uint16_t        pexCount6;
153    uint16_t        maxActiveRequests;
154
155    size_t                 fastsetSize;
156    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
157
158    /* how long the outMessages batch should be allowed to grow before
159     * it's flushed -- some messages (like requests >:) should be sent
160     * very quickly; others aren't as urgent. */
161    int                    outMessagesBatchPeriod;
162
163    tr_peer *              peer;
164
165    tr_session *           session;
166    tr_torrent *           torrent;
167
168    tr_publisher           publisher;
169
170    struct evbuffer *      outMessages; /* all the non-piece messages */
171
172    struct request_list    peerAskedFor;
173    struct request_list    clientAskedFor;
174    struct request_list    clientWillAskFor;
175
176    tr_timer             * pexTimer;
177    tr_pex               * pex;
178    tr_pex               * pex6;
179
180    time_t                 clientSentPexAt;
181    time_t                 clientSentAnythingAt;
182
183    /* when we started batching the outMessages */
184    time_t                outMessagesBatchedAt;
185
186    struct tr_incoming    incoming;
187
188    /* if the peer supports the Extension Protocol in BEP 10 and
189       supplied a reqq argument, it's stored here.  otherwise the
190       value is zero and should be ignored. */
191    int64_t               reqq;
192};
193
194/**
195***
196**/
197
198static void
199myDebug( const char * file, int line,
200         const struct tr_peermsgs * msgs,
201         const char * fmt, ... )
202{
203    FILE * fp = tr_getLog( );
204
205    if( fp )
206    {
207        va_list           args;
208        char              timestr[64];
209        struct evbuffer * buf = tr_getBuffer( );
210        char *            base = tr_basename( file );
211
212        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
213                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
214                             msgs->torrent->info.name,
215                             tr_peerIoGetAddrStr( msgs->peer->io ),
216                             msgs->peer->client );
217        va_start( args, fmt );
218        evbuffer_add_vprintf( buf, fmt, args );
219        va_end( args );
220        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
221        fwrite( EVBUFFER_DATA( buf ), 1, EVBUFFER_LENGTH( buf ), fp );
222
223        tr_free( base );
224        tr_releaseBuffer( buf );
225    }
226}
227
228#define dbgmsg( msgs, ... ) \
229    do { \
230        if( tr_deepLoggingIsActive( ) ) \
231            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
232    } while( 0 )
233
234/**
235***
236**/
237
238static void
239pokeBatchPeriod( tr_peermsgs * msgs,
240                 int           interval )
241{
242    if( msgs->outMessagesBatchPeriod > interval )
243    {
244        msgs->outMessagesBatchPeriod = interval;
245        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
246    }
247}
248
249static inline void
250dbgOutMessageLen( tr_peermsgs * msgs )
251{
252    dbgmsg( msgs, "outMessage size is now %zu", EVBUFFER_LENGTH( msgs->outMessages ) );
253}
254
255static void
256protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
257{
258    tr_peerIo       * io  = msgs->peer->io;
259    struct evbuffer * out = msgs->outMessages;
260
261    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
262
263    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
264    tr_peerIoWriteUint8 ( io, out, BT_FEXT_REJECT );
265    tr_peerIoWriteUint32( io, out, req->index );
266    tr_peerIoWriteUint32( io, out, req->offset );
267    tr_peerIoWriteUint32( io, out, req->length );
268
269    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
270    dbgOutMessageLen( msgs );
271}
272
273static void
274protocolSendRequest( tr_peermsgs               * msgs,
275                     const struct peer_request * req )
276{
277    tr_peerIo       * io  = msgs->peer->io;
278    struct evbuffer * out = msgs->outMessages;
279
280    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
281    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
282    tr_peerIoWriteUint32( io, out, req->index );
283    tr_peerIoWriteUint32( io, out, req->offset );
284    tr_peerIoWriteUint32( io, out, req->length );
285
286    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
287    dbgOutMessageLen( msgs );
288    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
289}
290
291static void
292protocolSendCancel( tr_peermsgs               * msgs,
293                    const struct peer_request * req )
294{
295    tr_peerIo       * io  = msgs->peer->io;
296    struct evbuffer * out = msgs->outMessages;
297
298    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
299    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
300    tr_peerIoWriteUint32( io, out, req->index );
301    tr_peerIoWriteUint32( io, out, req->offset );
302    tr_peerIoWriteUint32( io, out, req->length );
303
304    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
305    dbgOutMessageLen( msgs );
306    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
307}
308
309static void
310protocolSendHave( tr_peermsgs * msgs,
311                  uint32_t      index )
312{
313    tr_peerIo       * io  = msgs->peer->io;
314    struct evbuffer * out = msgs->outMessages;
315
316    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
317    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
318    tr_peerIoWriteUint32( io, out, index );
319
320    dbgmsg( msgs, "sending Have %u", index );
321    dbgOutMessageLen( msgs );
322    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
323}
324
325#if 0
326static void
327protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
328{
329    tr_peerIo       * io  = msgs->peer->io;
330    struct evbuffer * out = msgs->outMessages;
331
332    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
333
334    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
335    tr_peerIoWriteUint8 ( io, out, BT_FEXT_ALLOWED_FAST );
336    tr_peerIoWriteUint32( io, out, pieceIndex );
337
338    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
339    dbgOutMessageLen( msgs );
340}
341#endif
342
343static void
344protocolSendChoke( tr_peermsgs * msgs,
345                   int           choke )
346{
347    tr_peerIo       * io  = msgs->peer->io;
348    struct evbuffer * out = msgs->outMessages;
349
350    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
351    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
352
353    dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
354    dbgOutMessageLen( msgs );
355    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
356}
357
358static void
359protocolSendHaveAll( tr_peermsgs * msgs )
360{
361    tr_peerIo       * io  = msgs->peer->io;
362    struct evbuffer * out = msgs->outMessages;
363
364    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
365
366    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
367    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_ALL );
368
369    dbgmsg( msgs, "sending HAVE_ALL..." );
370    dbgOutMessageLen( msgs );
371    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
372}
373
374static void
375protocolSendHaveNone( tr_peermsgs * msgs )
376{
377    tr_peerIo       * io  = msgs->peer->io;
378    struct evbuffer * out = msgs->outMessages;
379
380    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
381
382    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
383    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_NONE );
384
385    dbgmsg( msgs, "sending HAVE_NONE..." );
386    dbgOutMessageLen( msgs );
387    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
388}
389
390/**
391***  EVENTS
392**/
393
394static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0, 0 };
395
396static void
397publish( tr_peermsgs * msgs, tr_peer_event * e )
398{
399    assert( msgs->peer );
400    assert( msgs->peer->msgs == msgs );
401
402    tr_publisherPublish( &msgs->publisher, msgs->peer, e );
403}
404
405static void
406fireError( tr_peermsgs * msgs, int err )
407{
408    tr_peer_event e = blankEvent;
409    e.eventType = TR_PEER_ERROR;
410    e.err = err;
411    publish( msgs, &e );
412}
413
414static void
415fireUploadOnly( tr_peermsgs * msgs, tr_bool uploadOnly )
416{
417    tr_peer_event e = blankEvent;
418    e.eventType = TR_PEER_UPLOAD_ONLY;
419    e.uploadOnly = uploadOnly;
420    publish( msgs, &e );
421}
422
423static void
424fireNeedReq( tr_peermsgs * msgs )
425{
426    tr_peer_event e = blankEvent;
427    e.eventType = TR_PEER_NEED_REQ;
428    publish( msgs, &e );
429}
430
431static void
432firePeerProgress( tr_peermsgs * msgs )
433{
434    tr_peer_event e = blankEvent;
435    e.eventType = TR_PEER_PEER_PROGRESS;
436    e.progress = msgs->peer->progress;
437    publish( msgs, &e );
438}
439
440static void
441fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
442{
443    tr_peer_event e = blankEvent;
444    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
445    e.pieceIndex = req->index;
446    e.offset = req->offset;
447    e.length = req->length;
448    publish( msgs, &e );
449}
450
451static void
452fireClientGotData( tr_peermsgs * msgs,
453                   uint32_t      length,
454                   int           wasPieceData )
455{
456    tr_peer_event e = blankEvent;
457
458    e.length = length;
459    e.eventType = TR_PEER_CLIENT_GOT_DATA;
460    e.wasPieceData = wasPieceData;
461    publish( msgs, &e );
462}
463
464static void
465fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
466{
467    tr_peer_event e = blankEvent;
468    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
469    e.pieceIndex = pieceIndex;
470    publish( msgs, &e );
471}
472
473static void
474fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
475{
476    tr_peer_event e = blankEvent;
477    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
478    e.pieceIndex = pieceIndex;
479    publish( msgs, &e );
480}
481
482static void
483firePeerGotData( tr_peermsgs  * msgs,
484                 uint32_t       length,
485                 int            wasPieceData )
486{
487    tr_peer_event e = blankEvent;
488
489    e.length = length;
490    e.eventType = TR_PEER_PEER_GOT_DATA;
491    e.wasPieceData = wasPieceData;
492
493    publish( msgs, &e );
494}
495
496static void
497fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
498{
499    tr_peer_event e = blankEvent;
500    e.eventType = TR_PEER_CANCEL;
501    e.pieceIndex = req->index;
502    e.offset = req->offset;
503    e.length = req->length;
504    publish( msgs, &e );
505}
506
507/**
508***  ALLOWED FAST SET
509***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
510**/
511
512size_t
513tr_generateAllowedSet( tr_piece_index_t * setmePieces,
514                       size_t             desiredSetSize,
515                       size_t             pieceCount,
516                       const uint8_t    * infohash,
517                       const tr_address * addr )
518{
519    size_t setSize = 0;
520
521    assert( setmePieces );
522    assert( desiredSetSize <= pieceCount );
523    assert( desiredSetSize );
524    assert( pieceCount );
525    assert( infohash );
526    assert( addr );
527
528    if( addr->type == TR_AF_INET )
529    {
530        uint8_t w[SHA_DIGEST_LENGTH + 4];
531        uint8_t x[SHA_DIGEST_LENGTH];
532
533        *(uint32_t*)w = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 );   /* (1) */
534        memcpy( w + 4, infohash, SHA_DIGEST_LENGTH );                /* (2) */
535        tr_sha1( x, w, sizeof( w ), NULL );                          /* (3) */
536
537        while( setSize<desiredSetSize )
538        {
539            int i;
540            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
541            {
542                size_t k;
543                uint32_t j = i * 4;                                  /* (5) */
544                uint32_t y = ntohl( *( uint32_t* )( x + j ) );       /* (6) */
545                uint32_t index = y % pieceCount;                     /* (7) */
546
547                for( k=0; k<setSize; ++k )                           /* (8) */
548                    if( setmePieces[k] == index )
549                        break;
550
551                if( k == setSize )
552                    setmePieces[setSize++] = index;                  /* (9) */
553            }
554
555            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
556        }
557    }
558
559    return setSize;
560}
561
562static void
563updateFastSet( tr_peermsgs * msgs UNUSED )
564{
565#if 0
566    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
567    const int peerIsNeedy = msgs->peer->progress < 0.10;
568
569    if( fext && peerIsNeedy && !msgs->haveFastSet )
570    {
571        size_t i;
572        const struct tr_address * addr = tr_peerIoGetAddress( msgs->peer->io, NULL );
573        const tr_info * inf = &msgs->torrent->info;
574        const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
575
576        /* build the fast set */
577        msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
578        msgs->haveFastSet = 1;
579
580        /* send it to the peer */
581        for( i=0; i<msgs->fastsetSize; ++i )
582            protocolSendAllowedFast( msgs, msgs->fastset[i] );
583    }
584#endif
585}
586
587/**
588***  INTEREST
589**/
590
591static int
592isPieceInteresting( const tr_peermsgs * msgs,
593                    tr_piece_index_t    piece )
594{
595    const tr_torrent * torrent = msgs->torrent;
596
597    return ( !torrent->info.pieces[piece].dnd )                 /* we want it */
598          && ( !tr_cpPieceIsComplete( &torrent->completion, piece ) ) /* !have */
599          && ( tr_bitfieldHas( msgs->peer->have, piece ) );    /* peer has it */
600}
601
602/* "interested" means we'll ask for piece data if they unchoke us */
603static int
604isPeerInteresting( const tr_peermsgs * msgs )
605{
606    tr_piece_index_t    i;
607    const tr_torrent *  torrent;
608    const tr_bitfield * bitfield;
609    const int           clientIsSeed = tr_torrentIsSeed( msgs->torrent );
610
611    if( clientIsSeed )
612        return FALSE;
613
614    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
615        return FALSE;
616
617    torrent = msgs->torrent;
618    bitfield = tr_cpPieceBitfield( &torrent->completion );
619
620    if( !msgs->peer->have )
621        return TRUE;
622
623    assert( bitfield->byteCount == msgs->peer->have->byteCount );
624
625    for( i = 0; i < torrent->info.pieceCount; ++i )
626        if( isPieceInteresting( msgs, i ) )
627            return TRUE;
628
629    return FALSE;
630}
631
632static void
633sendInterest( tr_peermsgs * msgs,
634              int           weAreInterested )
635{
636    struct evbuffer * out = msgs->outMessages;
637
638    assert( msgs );
639    assert( weAreInterested == 0 || weAreInterested == 1 );
640
641    msgs->peer->clientIsInterested = weAreInterested;
642    dbgmsg( msgs, "Sending %s", weAreInterested ? "Interested" : "Not Interested" );
643    tr_peerIoWriteUint32( msgs->peer->io, out, sizeof( uint8_t ) );
644    tr_peerIoWriteUint8 ( msgs->peer->io, out, weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
645
646    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
647    dbgOutMessageLen( msgs );
648}
649
650static void
651updateInterest( tr_peermsgs * msgs )
652{
653    const int i = isPeerInteresting( msgs );
654
655    if( i != msgs->peer->clientIsInterested )
656        sendInterest( msgs, i );
657    if( i )
658        fireNeedReq( msgs );
659}
660
661static int
662popNextRequest( tr_peermsgs *         msgs,
663                struct peer_request * setme )
664{
665    return reqListPop( &msgs->peerAskedFor, setme );
666}
667
668static void
669cancelAllRequestsToClient( tr_peermsgs * msgs )
670{
671    struct peer_request req;
672    const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->peer->io );
673
674    while( popNextRequest( msgs, &req ) )
675        if( mustSendCancel )
676            protocolSendReject( msgs, &req );
677}
678
679void
680tr_peerMsgsSetChoke( tr_peermsgs * msgs,
681                     int           choke )
682{
683    const time_t now = time( NULL );
684    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
685
686    assert( msgs );
687    assert( msgs->peer );
688    assert( choke == 0 || choke == 1 );
689
690    if( msgs->peer->chokeChangedAt > fibrillationTime )
691    {
692        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
693    }
694    else if( msgs->peer->peerIsChoked != choke )
695    {
696        msgs->peer->peerIsChoked = choke;
697        if( choke )
698            cancelAllRequestsToClient( msgs );
699        protocolSendChoke( msgs, choke );
700        msgs->peer->chokeChangedAt = now;
701    }
702}
703
704/**
705***
706**/
707
708void
709tr_peerMsgsHave( tr_peermsgs * msgs,
710                 uint32_t      index )
711{
712    protocolSendHave( msgs, index );
713
714    /* since we have more pieces now, we might not be interested in this peer */
715    updateInterest( msgs );
716}
717
718/**
719***
720**/
721
722static int
723reqIsValid( const tr_peermsgs * peer,
724            uint32_t            index,
725            uint32_t            offset,
726            uint32_t            length )
727{
728    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
729}
730
731static int
732requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
733{
734    return reqIsValid( msgs, req->index, req->offset, req->length );
735}
736
737static void
738expireFromList( tr_peermsgs          * msgs,
739                struct request_list  * list,
740                const time_t           oldestAllowed )
741{
742    size_t i;
743    struct request_list tmp = REQUEST_LIST_INIT;
744
745    /* since the fifo list is sorted by time, the oldest will be first */
746    if( list->fifo[0].time_requested >= oldestAllowed )
747        return;
748
749    /* if we found one too old, start pruning them */
750    reqListCopy( &tmp, list );
751    for( i=0; i<tmp.len; ++i ) {
752        const struct peer_request * req = &tmp.fifo[i];
753        if( req->time_requested >= oldestAllowed )
754            break;
755        tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
756    }
757    reqListClear( &tmp );
758}
759
760static void
761expireOldRequests( tr_peermsgs * msgs, const time_t now  )
762{
763    time_t oldestAllowed;
764    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
765    dbgmsg( msgs, "entering `expire old requests' block" );
766
767    /* cancel requests that have been queued for too long */
768    oldestAllowed = now - QUEUED_REQUEST_TTL_SECS;
769    expireFromList( msgs, &msgs->clientWillAskFor, oldestAllowed );
770
771    /* if the peer doesn't support "Reject Request",
772     * cancel requests that were sent too long ago. */
773    if( !fext ) {
774        oldestAllowed = now - SENT_REQUEST_TTL_SECS;
775        expireFromList( msgs, &msgs->clientAskedFor, oldestAllowed );
776    }
777
778    dbgmsg( msgs, "leaving `expire old requests' block" );
779}
780
781static void
782pumpRequestQueue( tr_peermsgs * msgs, const time_t now )
783{
784    const int           max = msgs->maxActiveRequests;
785    int                 sent = 0;
786    int                 len = msgs->clientAskedFor.len;
787    struct peer_request req;
788
789    if( msgs->peer->clientIsChoked )
790        return;
791    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
792        return;
793
794    while( ( len < max ) && reqListPop( &msgs->clientWillAskFor, &req ) )
795    {
796        const tr_block_index_t block = _tr_block( msgs->torrent, req.index, req.offset );
797
798        assert( requestIsValid( msgs, &req ) );
799        assert( tr_bitfieldHas( msgs->peer->have, req.index ) );
800
801        /* don't ask for it if we've already got it... this block may have
802         * come in from a different peer after we cancelled a request for it */
803        if( !tr_cpBlockIsComplete( &msgs->torrent->completion, block ) )
804        {
805            protocolSendRequest( msgs, &req );
806            req.time_requested = now;
807            reqListAppend( &msgs->clientAskedFor, &req );
808
809            ++len;
810            ++sent;
811        }
812    }
813
814    if( sent )
815        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
816                sent, msgs->clientAskedFor.len, msgs->clientWillAskFor.len );
817
818    if( len < max )
819        fireNeedReq( msgs );
820}
821
822static inline int
823requestQueueIsFull( const tr_peermsgs * msgs )
824{
825    const int req_max = msgs->maxActiveRequests;
826    return msgs->clientWillAskFor.len >= (size_t)req_max;
827}
828
829tr_addreq_t
830tr_peerMsgsAddRequest( tr_peermsgs *    msgs,
831                       uint32_t         index,
832                       uint32_t         offset,
833                       uint32_t         length )
834{
835    struct peer_request req;
836
837    assert( msgs );
838    assert( msgs->torrent );
839    assert( reqIsValid( msgs, index, offset, length ) );
840
841    /**
842    ***  Reasons to decline the request
843    **/
844
845    /* don't send requests to choked clients */
846    if( msgs->peer->clientIsChoked ) {
847        dbgmsg( msgs, "declining request because they're choking us" );
848        return TR_ADDREQ_CLIENT_CHOKED;
849    }
850
851    /* peer doesn't have this piece */
852    if( !tr_bitfieldHas( msgs->peer->have, index ) )
853        return TR_ADDREQ_MISSING;
854
855    /* peer's queue is full */
856    if( requestQueueIsFull( msgs ) ) {
857        dbgmsg( msgs, "declining request because we're full" );
858        return TR_ADDREQ_FULL;
859    }
860
861    /* have we already asked for this piece? */
862    req.index = index;
863    req.offset = offset;
864    req.length = length;
865    if( reqListHas( &msgs->clientAskedFor, &req ) ) {
866        dbgmsg( msgs, "declining because it's a duplicate" );
867        return TR_ADDREQ_DUPLICATE;
868    }
869    if( reqListHas( &msgs->clientWillAskFor, &req ) ) {
870        dbgmsg( msgs, "declining because it's a duplicate" );
871        return TR_ADDREQ_DUPLICATE;
872    }
873
874    /**
875    ***  Accept this request
876    **/
877
878    dbgmsg( msgs, "adding req for %"PRIu32":%"PRIu32"->%"PRIu32" to our `will request' list",
879            index, offset, length );
880    req.time_requested = time( NULL );
881    reqListAppend( &msgs->clientWillAskFor, &req );
882    return TR_ADDREQ_OK;
883}
884
885static void
886cancelAllRequestsToPeer( tr_peermsgs * msgs, tr_bool sendCancel )
887{
888    size_t i;
889    struct request_list a = msgs->clientWillAskFor;
890    struct request_list b = msgs->clientAskedFor;
891    dbgmsg( msgs, "cancelling all requests to peer" );
892
893    msgs->clientAskedFor = REQUEST_LIST_INIT;
894    msgs->clientWillAskFor = REQUEST_LIST_INIT;
895
896    for( i=0; i<a.len; ++i )
897        fireCancelledReq( msgs, &a.fifo[i] );
898
899    for( i = 0; i < b.len; ++i ) {
900        fireCancelledReq( msgs, &b.fifo[i] );
901        if( sendCancel )
902            protocolSendCancel( msgs, &b.fifo[i] );
903    }
904
905    reqListClear( &a );
906    reqListClear( &b );
907}
908
909void
910tr_peerMsgsCancel( tr_peermsgs * msgs,
911                   uint32_t      pieceIndex,
912                   uint32_t      offset,
913                   uint32_t      length )
914{
915    struct peer_request req;
916
917    assert( msgs != NULL );
918    assert( length > 0 );
919
920
921    /* have we asked the peer for this piece? */
922    req.index = pieceIndex;
923    req.offset = offset;
924    req.length = length;
925
926    /* if it's only in the queue and hasn't been sent yet, free it */
927    if( reqListRemove( &msgs->clientWillAskFor, &req ) ) {
928        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32, pieceIndex, offset, length );
929        fireCancelledReq( msgs, &req );
930    }
931
932    /* if it's already been sent, send a cancel message too */
933    if( reqListRemove( &msgs->clientAskedFor, &req ) ) {
934        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32, pieceIndex, offset, length );
935        protocolSendCancel( msgs, &req );
936        fireCancelledReq( msgs, &req );
937    }
938}
939
940
941/**
942***
943**/
944
945static void
946sendLtepHandshake( tr_peermsgs * msgs )
947{
948    tr_benc val, *m;
949    char * buf;
950    int len;
951    int pex;
952    struct evbuffer * out = msgs->outMessages;
953
954    if( msgs->clientSentLtepHandshake )
955        return;
956
957    dbgmsg( msgs, "sending an ltep handshake" );
958    msgs->clientSentLtepHandshake = 1;
959
960    /* decide if we want to advertise pex support */
961    if( !tr_torrentAllowsPex( msgs->torrent ) )
962        pex = 0;
963    else if( msgs->peerSentLtepHandshake )
964        pex = msgs->peerSupportsPex ? 1 : 0;
965    else
966        pex = 1;
967
968    tr_bencInitDict( &val, 5 );
969    tr_bencDictAddInt( &val, "e", msgs->session->encryptionMode != TR_CLEAR_PREFERRED );
970    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( msgs->session ) );
971    tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
972    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
973    m  = tr_bencDictAddDict( &val, "m", 1 );
974    if( pex )
975        tr_bencDictAddInt( m, "ut_pex", TR_LTEP_PEX );
976    buf = tr_bencSave( &val, &len );
977
978    tr_peerIoWriteUint32( msgs->peer->io, out, 2 * sizeof( uint8_t ) + len );
979    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_LTEP );
980    tr_peerIoWriteUint8 ( msgs->peer->io, out, LTEP_HANDSHAKE );
981    tr_peerIoWriteBytes ( msgs->peer->io, out, buf, len );
982    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
983    dbgOutMessageLen( msgs );
984
985    /* cleanup */
986    tr_bencFree( &val );
987    tr_free( buf );
988}
989
990static void
991parseLtepHandshake( tr_peermsgs *     msgs,
992                    int               len,
993                    struct evbuffer * inbuf )
994{
995    int64_t   i;
996    tr_benc   val, * sub;
997    uint8_t * tmp = tr_new( uint8_t, len );
998
999    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, len );
1000    msgs->peerSentLtepHandshake = 1;
1001
1002    if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) )
1003    {
1004        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
1005        tr_free( tmp );
1006        return;
1007    }
1008
1009    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
1010
1011    /* does the peer prefer encrypted connections? */
1012    if( tr_bencDictFindInt( &val, "e", &i ) )
1013        msgs->peer->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
1014                                              : ENCRYPTION_PREFERENCE_NO;
1015
1016    /* check supported messages for utorrent pex */
1017    msgs->peerSupportsPex = 0;
1018    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
1019        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
1020            msgs->ut_pex_id = (uint8_t) i;
1021            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
1022            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
1023        }
1024    }
1025
1026    /* look for upload_only (BEP 21) */
1027    if( tr_bencDictFindInt( &val, "upload_only", &i ) )
1028        fireUploadOnly( msgs, i!=0 );
1029
1030    /* get peer's listening port */
1031    if( tr_bencDictFindInt( &val, "p", &i ) ) {
1032        msgs->peer->port = htons( (uint16_t)i );
1033        dbgmsg( msgs, "msgs->port is now %hu", msgs->peer->port );
1034    }
1035
1036    /* get peer's maximum request queue size */
1037    if( tr_bencDictFindInt( &val, "reqq", &i ) )
1038        msgs->reqq = i;
1039
1040    tr_bencFree( &val );
1041    tr_free( tmp );
1042}
1043
1044static void
1045parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1046{
1047    int loaded = 0;
1048    uint8_t * tmp = tr_new( uint8_t, msglen );
1049    tr_benc val;
1050    const tr_torrent * tor = msgs->torrent;
1051    const uint8_t * added;
1052    size_t added_len;
1053
1054    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1055
1056    if( tr_torrentAllowsPex( tor )
1057      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) ) )
1058    {
1059        if( tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1060        {
1061            const uint8_t * added_f = NULL;
1062            tr_pex *        pex;
1063            size_t          i, n;
1064            size_t          added_f_len = 0;
1065            tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1066            pex =
1067                tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len,
1068                                        &n );
1069            for( i = 0; i < n; ++i )
1070                tr_peerMgrAddPex( msgs->session->peerMgr, tor->info.hash,
1071                                  TR_PEER_FROM_PEX, pex + i );
1072            tr_free( pex );
1073        }
1074       
1075        if( tr_bencDictFindRaw( &val, "added6", &added, &added_len ) )
1076        {
1077            const uint8_t * added_f = NULL;
1078            tr_pex *        pex;
1079            size_t          i, n;
1080            size_t          added_f_len = 0;
1081            tr_bencDictFindRaw( &val, "added6.f", &added_f, &added_f_len );
1082            pex =
1083                tr_peerMgrCompact6ToPex( added, added_len, added_f, added_f_len,
1084                                         &n );
1085            for( i = 0; i < n; ++i )
1086                tr_peerMgrAddPex( msgs->session->peerMgr, tor->info.hash,
1087                                  TR_PEER_FROM_PEX, pex + i );
1088            tr_free( pex );
1089        }
1090       
1091    }
1092
1093    if( loaded )
1094        tr_bencFree( &val );
1095    tr_free( tmp );
1096}
1097
1098static void sendPex( tr_peermsgs * msgs );
1099
1100static void
1101parseLtep( tr_peermsgs *     msgs,
1102           int               msglen,
1103           struct evbuffer * inbuf )
1104{
1105    uint8_t ltep_msgid;
1106
1107    tr_peerIoReadUint8( msgs->peer->io, inbuf, &ltep_msgid );
1108    msglen--;
1109
1110    if( ltep_msgid == LTEP_HANDSHAKE )
1111    {
1112        dbgmsg( msgs, "got ltep handshake" );
1113        parseLtepHandshake( msgs, msglen, inbuf );
1114        if( tr_peerIoSupportsLTEP( msgs->peer->io ) )
1115        {
1116            sendLtepHandshake( msgs );
1117            sendPex( msgs );
1118        }
1119    }
1120    else if( ltep_msgid == TR_LTEP_PEX )
1121    {
1122        dbgmsg( msgs, "got ut pex" );
1123        msgs->peerSupportsPex = 1;
1124        parseUtPex( msgs, msglen, inbuf );
1125    }
1126    else
1127    {
1128        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1129        evbuffer_drain( inbuf, msglen );
1130    }
1131}
1132
1133static int
1134readBtLength( tr_peermsgs *     msgs,
1135              struct evbuffer * inbuf,
1136              size_t            inlen )
1137{
1138    uint32_t len;
1139
1140    if( inlen < sizeof( len ) )
1141        return READ_LATER;
1142
1143    tr_peerIoReadUint32( msgs->peer->io, inbuf, &len );
1144
1145    if( len == 0 ) /* peer sent us a keepalive message */
1146        dbgmsg( msgs, "got KeepAlive" );
1147    else
1148    {
1149        msgs->incoming.length = len;
1150        msgs->state = AWAITING_BT_ID;
1151    }
1152
1153    return READ_NOW;
1154}
1155
1156static int readBtMessage( tr_peermsgs *     msgs,
1157                          struct evbuffer * inbuf,
1158                          size_t            inlen );
1159
1160static int
1161readBtId( tr_peermsgs *     msgs,
1162          struct evbuffer * inbuf,
1163          size_t            inlen )
1164{
1165    uint8_t id;
1166
1167    if( inlen < sizeof( uint8_t ) )
1168        return READ_LATER;
1169
1170    tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
1171    msgs->incoming.id = id;
1172
1173    if( id == BT_PIECE )
1174    {
1175        msgs->state = AWAITING_BT_PIECE;
1176        return READ_NOW;
1177    }
1178    else if( msgs->incoming.length != 1 )
1179    {
1180        msgs->state = AWAITING_BT_MESSAGE;
1181        return READ_NOW;
1182    }
1183    else return readBtMessage( msgs, inbuf, inlen - 1 );
1184}
1185
1186static void
1187updatePeerProgress( tr_peermsgs * msgs )
1188{
1189    msgs->peer->progress = tr_bitfieldCountTrueBits( msgs->peer->have ) / (float)msgs->torrent->info.pieceCount;
1190    dbgmsg( msgs, "peer progress is %f", msgs->peer->progress );
1191    updateFastSet( msgs );
1192    updateInterest( msgs );
1193    firePeerProgress( msgs );
1194}
1195
1196static void
1197peerMadeRequest( tr_peermsgs *               msgs,
1198                 const struct peer_request * req )
1199{
1200    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1201    const int reqIsValid = requestIsValid( msgs, req );
1202    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( &msgs->torrent->completion, req->index );
1203    const int peerIsChoked = msgs->peer->peerIsChoked;
1204
1205    int allow = FALSE;
1206
1207    if( !reqIsValid )
1208        dbgmsg( msgs, "rejecting an invalid request." );
1209    else if( !clientHasPiece )
1210        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1211    else if( peerIsChoked )
1212        dbgmsg( msgs, "rejecting request from choked peer" );
1213    else
1214        allow = TRUE;
1215
1216    if( allow )
1217        reqListAppend( &msgs->peerAskedFor, req );
1218    else if( fext )
1219        protocolSendReject( msgs, req );
1220}
1221
1222static int
1223messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1224{
1225    switch( id )
1226    {
1227        case BT_CHOKE:
1228        case BT_UNCHOKE:
1229        case BT_INTERESTED:
1230        case BT_NOT_INTERESTED:
1231        case BT_FEXT_HAVE_ALL:
1232        case BT_FEXT_HAVE_NONE:
1233            return len == 1;
1234
1235        case BT_HAVE:
1236        case BT_FEXT_SUGGEST:
1237        case BT_FEXT_ALLOWED_FAST:
1238            return len == 5;
1239
1240        case BT_BITFIELD:
1241            return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1242
1243        case BT_REQUEST:
1244        case BT_CANCEL:
1245        case BT_FEXT_REJECT:
1246            return len == 13;
1247
1248        case BT_PIECE:
1249            return len > 9 && len <= 16393;
1250
1251        case BT_PORT:
1252            return len == 3;
1253
1254        case BT_LTEP:
1255            return len >= 2;
1256
1257        default:
1258            return FALSE;
1259    }
1260}
1261
1262static int clientGotBlock( tr_peermsgs *               msgs,
1263                           const uint8_t *             block,
1264                           const struct peer_request * req );
1265
1266static int
1267readBtPiece( tr_peermsgs      * msgs,
1268             struct evbuffer  * inbuf,
1269             size_t             inlen,
1270             size_t           * setme_piece_bytes_read )
1271{
1272    struct peer_request * req = &msgs->incoming.blockReq;
1273
1274    assert( EVBUFFER_LENGTH( inbuf ) >= inlen );
1275    dbgmsg( msgs, "In readBtPiece" );
1276
1277    if( !req->length )
1278    {
1279        if( inlen < 8 )
1280            return READ_LATER;
1281
1282        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index );
1283        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset );
1284        req->length = msgs->incoming.length - 9;
1285        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1286        return READ_NOW;
1287    }
1288    else
1289    {
1290        int err;
1291
1292        /* read in another chunk of data */
1293        const size_t nLeft = req->length - EVBUFFER_LENGTH( msgs->incoming.block );
1294        size_t n = MIN( nLeft, inlen );
1295        size_t i = n;
1296
1297        while( i > 0 )
1298        {
1299            uint8_t buf[MAX_STACK_ARRAY_SIZE];
1300            const size_t thisPass = MIN( i, sizeof( buf ) );
1301            tr_peerIoReadBytes( msgs->peer->io, inbuf, buf, thisPass );
1302            evbuffer_add( msgs->incoming.block, buf, thisPass );
1303            i -= thisPass;
1304        }
1305
1306        fireClientGotData( msgs, n, TRUE );
1307        *setme_piece_bytes_read += n;
1308        dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1309               n, req->index, req->offset, req->length,
1310               (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) );
1311        if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length )
1312            return READ_LATER;
1313
1314        /* we've got the whole block ... process it */
1315        err = clientGotBlock( msgs, EVBUFFER_DATA( msgs->incoming.block ), req );
1316
1317        /* cleanup */
1318        evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH( msgs->incoming.block ) );
1319        req->length = 0;
1320        msgs->state = AWAITING_BT_LENGTH;
1321        if( !err )
1322            return READ_NOW;
1323        else {
1324            fireError( msgs, err );
1325            return READ_ERR;
1326        }
1327    }
1328}
1329
1330static int
1331readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1332{
1333    uint32_t      ui32;
1334    uint32_t      msglen = msgs->incoming.length;
1335    const uint8_t id = msgs->incoming.id;
1336    const size_t  startBufLen = EVBUFFER_LENGTH( inbuf );
1337    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1338
1339    --msglen; /* id length */
1340
1341    if( inlen < msglen )
1342        return READ_LATER;
1343
1344    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
1345
1346    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1347    {
1348        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1349        fireError( msgs, EMSGSIZE );
1350        return READ_ERR;
1351    }
1352
1353    switch( id )
1354    {
1355        case BT_CHOKE:
1356            dbgmsg( msgs, "got Choke" );
1357            msgs->peer->clientIsChoked = 1;
1358            if( !fext )
1359                cancelAllRequestsToPeer( msgs, FALSE );
1360            break;
1361
1362        case BT_UNCHOKE:
1363            dbgmsg( msgs, "got Unchoke" );
1364            msgs->peer->clientIsChoked = 0;
1365            fireNeedReq( msgs );
1366            break;
1367
1368        case BT_INTERESTED:
1369            dbgmsg( msgs, "got Interested" );
1370            msgs->peer->peerIsInterested = 1;
1371            break;
1372
1373        case BT_NOT_INTERESTED:
1374            dbgmsg( msgs, "got Not Interested" );
1375            msgs->peer->peerIsInterested = 0;
1376            break;
1377
1378        case BT_HAVE:
1379            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1380            dbgmsg( msgs, "got Have: %u", ui32 );
1381            if( tr_bitfieldAdd( msgs->peer->have, ui32 ) ) {
1382                fireError( msgs, ERANGE );
1383                return READ_ERR;
1384            }
1385            updatePeerProgress( msgs );
1386            tr_rcTransferred( &msgs->torrent->swarmSpeed,
1387                              msgs->torrent->info.pieceSize );
1388            break;
1389
1390        case BT_BITFIELD:
1391        {
1392            dbgmsg( msgs, "got a bitfield" );
1393            tr_peerIoReadBytes( msgs->peer->io, inbuf, msgs->peer->have->bits, msglen );
1394            updatePeerProgress( msgs );
1395            fireNeedReq( msgs );
1396            break;
1397        }
1398
1399        case BT_REQUEST:
1400        {
1401            struct peer_request r;
1402            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1403            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1404            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1405            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1406            peerMadeRequest( msgs, &r );
1407            break;
1408        }
1409
1410        case BT_CANCEL:
1411        {
1412            struct peer_request r;
1413            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1414            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1415            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1416            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1417            if( reqListRemove( &msgs->peerAskedFor, &r ) && fext )
1418                protocolSendReject( msgs, &r );
1419            break;
1420        }
1421
1422        case BT_PIECE:
1423            assert( 0 ); /* handled elsewhere! */
1424            break;
1425
1426        case BT_PORT:
1427            dbgmsg( msgs, "Got a BT_PORT" );
1428            tr_peerIoReadUint16( msgs->peer->io, inbuf, &msgs->peer->port );
1429            break;
1430
1431        case BT_FEXT_SUGGEST:
1432            dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1433            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1434            if( fext )
1435                fireClientGotSuggest( msgs, ui32 );
1436            else {
1437                fireError( msgs, EMSGSIZE );
1438                return READ_ERR;
1439            }
1440            break;
1441
1442        case BT_FEXT_ALLOWED_FAST:
1443            dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1444            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1445            if( fext )
1446                fireClientGotAllowedFast( msgs, ui32 );
1447            else {
1448                fireError( msgs, EMSGSIZE );
1449                return READ_ERR;
1450            }
1451            break;
1452
1453        case BT_FEXT_HAVE_ALL:
1454            dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1455            if( fext ) {
1456                tr_bitfieldAddRange( msgs->peer->have, 0, msgs->torrent->info.pieceCount );
1457                updatePeerProgress( msgs );
1458            } else {
1459                fireError( msgs, EMSGSIZE );
1460                return READ_ERR;
1461            }
1462            break;
1463
1464        case BT_FEXT_HAVE_NONE:
1465            dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1466            if( fext ) {
1467                tr_bitfieldClear( msgs->peer->have );
1468                updatePeerProgress( msgs );
1469            } else {
1470                fireError( msgs, EMSGSIZE );
1471                return READ_ERR;
1472            }
1473            break;
1474
1475        case BT_FEXT_REJECT:
1476        {
1477            struct peer_request r;
1478            dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1479            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1480            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1481            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1482            if( fext )
1483                reqListRemove( &msgs->clientAskedFor, &r );
1484            else {
1485                fireError( msgs, EMSGSIZE );
1486                return READ_ERR;
1487            }
1488            break;
1489        }
1490
1491        case BT_LTEP:
1492            dbgmsg( msgs, "Got a BT_LTEP" );
1493            parseLtep( msgs, msglen, inbuf );
1494            break;
1495
1496        default:
1497            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1498            tr_peerIoDrain( msgs->peer->io, inbuf, msglen );
1499            break;
1500    }
1501
1502    assert( msglen + 1 == msgs->incoming.length );
1503    assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen );
1504
1505    msgs->state = AWAITING_BT_LENGTH;
1506    return READ_NOW;
1507}
1508
1509static inline void
1510decrementDownloadedCount( tr_peermsgs * msgs, uint32_t byteCount )
1511{
1512    tr_torrent * tor = msgs->torrent;
1513
1514    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1515}
1516
1517static inline void
1518clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1519{
1520    decrementDownloadedCount( msgs, req->length );
1521}
1522
1523static void
1524addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1525{
1526    if( !msgs->peer->blame )
1527         msgs->peer->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1528    tr_bitfieldAdd( msgs->peer->blame, index );
1529}
1530
1531/* returns 0 on success, or an errno on failure */
1532static int
1533clientGotBlock( tr_peermsgs *               msgs,
1534                const uint8_t *             data,
1535                const struct peer_request * req )
1536{
1537    int err;
1538    tr_torrent * tor = msgs->torrent;
1539    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1540
1541    assert( msgs );
1542    assert( req );
1543
1544    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) {
1545        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1546                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1547        return EMSGSIZE;
1548    }
1549
1550    /* save the block */
1551    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1552
1553    /**
1554    *** Remove the block from our `we asked for this' list
1555    **/
1556
1557    if( !reqListRemove( &msgs->clientAskedFor, req ) ) {
1558        clientGotUnwantedBlock( msgs, req );
1559        dbgmsg( msgs, "we didn't ask for this message..." );
1560        return 0;
1561    }
1562
1563    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1564            msgs->clientAskedFor.len );
1565
1566    /**
1567    *** Error checks
1568    **/
1569
1570    if( tr_cpBlockIsComplete( &tor->completion, block ) ) {
1571        dbgmsg( msgs, "we have this block already..." );
1572        clientGotUnwantedBlock( msgs, req );
1573        return 0;
1574    }
1575
1576    /**
1577    ***  Save the block
1578    **/
1579
1580    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1581        return err;
1582
1583    addPeerToBlamefield( msgs, req->index );
1584    fireGotBlock( msgs, req );
1585    return 0;
1586}
1587
1588static int peerPulse( void * vmsgs );
1589
1590static void
1591didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1592{
1593    tr_peermsgs * msgs = vmsgs;
1594    firePeerGotData( msgs, bytesWritten, wasPieceData );
1595    peerPulse( msgs );
1596}
1597
1598static ReadState
1599canRead( tr_peerIo * io, void * vmsgs, size_t * piece )
1600{
1601    ReadState         ret;
1602    tr_peermsgs *     msgs = vmsgs;
1603    struct evbuffer * in = tr_peerIoGetReadBuffer( io );
1604    const size_t      inlen = EVBUFFER_LENGTH( in );
1605
1606    if( !inlen )
1607    {
1608        ret = READ_LATER;
1609    }
1610    else if( msgs->state == AWAITING_BT_PIECE )
1611    {
1612        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1613    }
1614    else switch( msgs->state )
1615    {
1616        case AWAITING_BT_LENGTH:
1617            ret = readBtLength ( msgs, in, inlen ); break;
1618
1619        case AWAITING_BT_ID:
1620            ret = readBtId     ( msgs, in, inlen ); break;
1621
1622        case AWAITING_BT_MESSAGE:
1623            ret = readBtMessage( msgs, in, inlen ); break;
1624
1625        default:
1626            assert( 0 );
1627    }
1628
1629    /* log the raw data that was read */
1630    if( ( ret != READ_ERR ) && ( EVBUFFER_LENGTH( in ) != inlen ) )
1631        fireClientGotData( msgs, inlen - EVBUFFER_LENGTH( in ), FALSE );
1632
1633    return ret;
1634}
1635
1636/**
1637***
1638**/
1639
1640static int
1641ratePulse( void * vmsgs )
1642{
1643    tr_peermsgs * msgs = vmsgs;
1644    const double rateToClient = tr_peerGetPieceSpeed( msgs->peer, TR_PEER_TO_CLIENT );
1645    const int seconds = 10;
1646    const int floor = 8;
1647    const int estimatedBlocksInPeriod = ( rateToClient * seconds * 1024 ) / msgs->torrent->blockSize;
1648
1649    msgs->maxActiveRequests = floor + estimatedBlocksInPeriod;
1650
1651    if( msgs->reqq > 0 )
1652        msgs->maxActiveRequests = MIN( msgs->maxActiveRequests, msgs->reqq );
1653
1654    return TRUE;
1655}
1656
1657static size_t
1658fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1659{
1660    size_t bytesWritten = 0;
1661    struct peer_request req;
1662    const tr_bool haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1663    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1664
1665    /**
1666    ***  Protocol messages
1667    **/
1668
1669    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1670    {
1671        dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
1672        msgs->outMessagesBatchedAt = now;
1673    }
1674    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1675    {
1676        const size_t len = EVBUFFER_LENGTH( msgs->outMessages );
1677        /* flush the protocol messages */
1678        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len );
1679        tr_peerIoWriteBuf( msgs->peer->io, msgs->outMessages, FALSE );
1680        msgs->clientSentAnythingAt = now;
1681        msgs->outMessagesBatchedAt = 0;
1682        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1683        bytesWritten +=  len;
1684    }
1685
1686    /**
1687    ***  Blocks
1688    **/
1689
1690    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io ) >= msgs->torrent->blockSize )
1691        && popNextRequest( msgs, &req ) )
1692    {
1693        if( requestIsValid( msgs, &req )
1694            && tr_cpPieceIsComplete( &msgs->torrent->completion, req.index ) )
1695        {
1696            int err;
1697            static uint8_t * buf = NULL;
1698
1699            if( buf == NULL )
1700                buf = tr_new( uint8_t, MAX_BLOCK_SIZE );
1701
1702            /* send a block */
1703            if(( err = tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf ))) {
1704                fireError( msgs, err );
1705                bytesWritten = 0;
1706                msgs = NULL;
1707            } else {
1708                tr_peerIo * io = msgs->peer->io;
1709                struct evbuffer * out = tr_getBuffer( );
1710                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1711                tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1712                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1713                tr_peerIoWriteUint32( io, out, req.index );
1714                tr_peerIoWriteUint32( io, out, req.offset );
1715                tr_peerIoWriteBytes ( io, out, buf, req.length );
1716                tr_peerIoWriteBuf( io, out, TRUE );
1717                bytesWritten += EVBUFFER_LENGTH( out );
1718                msgs->clientSentAnythingAt = now;
1719                tr_releaseBuffer( out );
1720            }
1721        }
1722        else if( fext ) /* peer needs a reject message */
1723        {
1724            protocolSendReject( msgs, &req );
1725        }
1726    }
1727
1728    /**
1729    ***  Keepalive
1730    **/
1731
1732    if( ( msgs != NULL )
1733        && ( msgs->clientSentAnythingAt != 0 )
1734        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1735    {
1736        dbgmsg( msgs, "sending a keepalive message" );
1737        tr_peerIoWriteUint32( msgs->peer->io, msgs->outMessages, 0 );
1738        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1739    }
1740
1741    return bytesWritten;
1742}
1743
1744static int
1745peerPulse( void * vmsgs )
1746{
1747    tr_peermsgs * msgs = vmsgs;
1748    const time_t  now = time( NULL );
1749
1750    ratePulse( msgs );
1751
1752    pumpRequestQueue( msgs, now );
1753    expireOldRequests( msgs, now );
1754
1755    for( ;; )
1756        if( fillOutputBuffer( msgs, now ) < 1 )
1757            break;
1758
1759    return TRUE; /* loop forever */
1760}
1761
1762void
1763tr_peerMsgsPulse( tr_peermsgs * msgs )
1764{
1765    if( msgs != NULL )
1766        peerPulse( msgs );
1767}
1768
1769static void
1770gotError( tr_peerIo  * io UNUSED,
1771          short        what,
1772          void       * vmsgs )
1773{
1774    if( what & EVBUFFER_TIMEOUT )
1775        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1776    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1777        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1778               what, errno, tr_strerror( errno ) );
1779    fireError( vmsgs, ENOTCONN );
1780}
1781
1782static void
1783sendBitfield( tr_peermsgs * msgs )
1784{
1785    struct evbuffer * out = msgs->outMessages;
1786    tr_bitfield *     field;
1787    tr_piece_index_t  lazyPieces[LAZY_PIECE_COUNT];
1788    size_t            i;
1789    size_t            lazyCount = 0;
1790
1791    field = tr_bitfieldDup( tr_cpPieceBitfield( &msgs->torrent->completion ) );
1792
1793    if( tr_sessionIsLazyBitfieldEnabled( msgs->session ) )
1794    {
1795        /** Lazy bitfields aren't a high priority or secure, so I'm opting for
1796            speed over a truly random sample -- let's limit the pool size to
1797            the first 1000 pieces so large torrents don't bog things down */
1798        size_t poolSize;
1799        const size_t maxPoolSize = MIN( msgs->torrent->info.pieceCount, 1000 );
1800        tr_piece_index_t * pool = tr_new( tr_piece_index_t, maxPoolSize );
1801
1802        /* build the pool */
1803        for( i=poolSize=0; i<maxPoolSize; ++i )
1804            if( tr_bitfieldHas( field, i ) )
1805                pool[poolSize++] = i;
1806
1807        /* pull random piece indices from the pool */
1808        while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) )
1809        {
1810            const int pos = tr_cryptoWeakRandInt( poolSize );
1811            const tr_piece_index_t piece = pool[pos];
1812            tr_bitfieldRem( field, piece );
1813            lazyPieces[lazyCount++] = piece;
1814            pool[pos] = pool[--poolSize];
1815        }
1816
1817        /* cleanup */
1818        tr_free( pool );
1819    }
1820
1821    tr_peerIoWriteUint32( msgs->peer->io, out,
1822                          sizeof( uint8_t ) + field->byteCount );
1823    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_BITFIELD );
1824    tr_peerIoWriteBytes ( msgs->peer->io, out, field->bits, field->byteCount );
1825    dbgmsg( msgs, "sending bitfield... outMessage size is now %zu",
1826            EVBUFFER_LENGTH( out ) );
1827    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1828
1829    for( i = 0; i < lazyCount; ++i )
1830        protocolSendHave( msgs, lazyPieces[i] );
1831
1832    tr_bitfieldFree( field );
1833}
1834
1835static void
1836tellPeerWhatWeHave( tr_peermsgs * msgs )
1837{
1838    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1839
1840    if( fext && ( tr_cpGetStatus( &msgs->torrent->completion ) == TR_SEED ) )
1841    {
1842        protocolSendHaveAll( msgs );
1843    }
1844    else if( fext && ( tr_cpHaveValid( &msgs->torrent->completion ) == 0 ) )
1845    {
1846        protocolSendHaveNone( msgs );
1847    }
1848    else
1849    {
1850        sendBitfield( msgs );
1851    }
1852}
1853
1854/**
1855***
1856**/
1857
1858/* some peers give us error messages if we send
1859   more than this many peers in a single pex message
1860   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
1861#define MAX_PEX_ADDED 50
1862#define MAX_PEX_DROPPED 50
1863
1864typedef struct
1865{
1866    tr_pex *  added;
1867    tr_pex *  dropped;
1868    tr_pex *  elements;
1869    int       addedCount;
1870    int       droppedCount;
1871    int       elementCount;
1872}
1873PexDiffs;
1874
1875static void
1876pexAddedCb( void * vpex,
1877            void * userData )
1878{
1879    PexDiffs * diffs = userData;
1880    tr_pex *   pex = vpex;
1881
1882    if( diffs->addedCount < MAX_PEX_ADDED )
1883    {
1884        diffs->added[diffs->addedCount++] = *pex;
1885        diffs->elements[diffs->elementCount++] = *pex;
1886    }
1887}
1888
1889static inline void
1890pexDroppedCb( void * vpex,
1891              void * userData )
1892{
1893    PexDiffs * diffs = userData;
1894    tr_pex *   pex = vpex;
1895
1896    if( diffs->droppedCount < MAX_PEX_DROPPED )
1897    {
1898        diffs->dropped[diffs->droppedCount++] = *pex;
1899    }
1900}
1901
1902static inline void
1903pexElementCb( void * vpex,
1904              void * userData )
1905{
1906    PexDiffs * diffs = userData;
1907    tr_pex * pex = vpex;
1908
1909    diffs->elements[diffs->elementCount++] = *pex;
1910}
1911
1912static void
1913sendPex( tr_peermsgs * msgs )
1914{
1915    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
1916    {
1917        PexDiffs diffs;
1918        PexDiffs diffs6;
1919        tr_pex * newPex = NULL;
1920        tr_pex * newPex6 = NULL;
1921        const int newCount = tr_peerMgrGetPeers( msgs->session->peerMgr,
1922                                                 msgs->torrent->info.hash,
1923                                                 &newPex, TR_AF_INET );
1924        const int newCount6 = tr_peerMgrGetPeers( msgs->session->peerMgr,
1925                                                  msgs->torrent->info.hash,
1926                                                  &newPex6, TR_AF_INET6 );
1927
1928        /* build the diffs */
1929        diffs.added = tr_new( tr_pex, newCount );
1930        diffs.addedCount = 0;
1931        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
1932        diffs.droppedCount = 0;
1933        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
1934        diffs.elementCount = 0;
1935        tr_set_compare( msgs->pex, msgs->pexCount,
1936                        newPex, newCount,
1937                        tr_pexCompare, sizeof( tr_pex ),
1938                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
1939        diffs6.added = tr_new( tr_pex, newCount6 );
1940        diffs6.addedCount = 0;
1941        diffs6.dropped = tr_new( tr_pex, msgs->pexCount6 );
1942        diffs6.droppedCount = 0;
1943        diffs6.elements = tr_new( tr_pex, newCount6 + msgs->pexCount6 );
1944        diffs6.elementCount = 0;
1945        tr_set_compare( msgs->pex6, msgs->pexCount6,
1946                        newPex6, newCount6,
1947                        tr_pexCompare, sizeof( tr_pex ),
1948                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6 );
1949        dbgmsg(
1950            msgs,
1951            "pex: old peer count %d, new peer count %d, added %d, removed %d",
1952            msgs->pexCount, newCount + newCount6,
1953            diffs.addedCount + diffs6.addedCount,
1954            diffs.droppedCount + diffs6.droppedCount );
1955
1956        if( !diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
1957            !diffs6.droppedCount )
1958        {
1959            tr_free( diffs.elements );
1960            tr_free( diffs6.elements );
1961        }
1962        else
1963        {
1964            int  i;
1965            tr_benc val;
1966            char * benc;
1967            int bencLen;
1968            uint8_t * tmp, *walk;
1969            struct evbuffer * out = msgs->outMessages;
1970
1971            /* update peer */
1972            tr_free( msgs->pex );
1973            msgs->pex = diffs.elements;
1974            msgs->pexCount = diffs.elementCount;
1975            tr_free( msgs->pex6 );
1976            msgs->pex6 = diffs6.elements;
1977            msgs->pexCount6 = diffs6.elementCount;
1978
1979            /* build the pex payload */
1980            tr_bencInitDict( &val, 3 ); /* ipv6 support: left as 3:
1981                                         * speed vs. likelihood? */
1982
1983            /* "added" */
1984            tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
1985            for( i = 0; i < diffs.addedCount; ++i )
1986            {
1987                tr_suspectAddress( &diffs.added[i].addr, "pex" );
1988                memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
1989                memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
1990            }
1991            assert( ( walk - tmp ) == diffs.addedCount * 6 );
1992            tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
1993            tr_free( tmp );
1994
1995            /* "added.f" */
1996            tmp = walk = tr_new( uint8_t, diffs.addedCount );
1997            for( i = 0; i < diffs.addedCount; ++i )
1998                *walk++ = diffs.added[i].flags;
1999            assert( ( walk - tmp ) == diffs.addedCount );
2000            tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2001            tr_free( tmp );
2002
2003            /* "dropped" */
2004            tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2005            for( i = 0; i < diffs.droppedCount; ++i )
2006            {
2007                memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
2008                memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2009            }
2010            assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2011            tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2012            tr_free( tmp );
2013           
2014            /* "added6" */
2015            tmp = walk = tr_new( uint8_t, diffs6.addedCount * 18 );
2016            for( i = 0; i < diffs6.addedCount; ++i )
2017            {
2018                tr_suspectAddress( &diffs6.added[i].addr, "pex6" );
2019                memcpy( walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16 );
2020                walk += 16;
2021                memcpy( walk, &diffs6.added[i].port, 2 );
2022                walk += 2;
2023            }
2024            assert( ( walk - tmp ) == diffs6.addedCount * 18 );
2025            tr_bencDictAddRaw( &val, "added6", tmp, walk - tmp );
2026            tr_free( tmp );
2027           
2028            /* "added6.f" */
2029            tmp = walk = tr_new( uint8_t, diffs6.addedCount );
2030            for( i = 0; i < diffs6.addedCount; ++i )
2031                *walk++ = diffs6.added[i].flags;
2032            assert( ( walk - tmp ) == diffs6.addedCount );
2033            tr_bencDictAddRaw( &val, "added6.f", tmp, walk - tmp );
2034            tr_free( tmp );
2035           
2036            /* "dropped6" */
2037            tmp = walk = tr_new( uint8_t, diffs6.droppedCount * 18 );
2038            for( i = 0; i < diffs6.droppedCount; ++i )
2039            {
2040                memcpy( walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16 );
2041                walk += 16;
2042                memcpy( walk, &diffs6.dropped[i].port, 2 );
2043                walk += 2;
2044            }
2045            assert( ( walk - tmp ) == diffs6.droppedCount * 18);
2046            tr_bencDictAddRaw( &val, "dropped6", tmp, walk - tmp );
2047            tr_free( tmp );
2048
2049            /* write the pex message */
2050            benc = tr_bencSave( &val, &bencLen );
2051            tr_peerIoWriteUint32( msgs->peer->io, out, 2 * sizeof( uint8_t ) + bencLen );
2052            tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_LTEP );
2053            tr_peerIoWriteUint8 ( msgs->peer->io, out, msgs->ut_pex_id );
2054            tr_peerIoWriteBytes ( msgs->peer->io, out, benc, bencLen );
2055            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2056            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", EVBUFFER_LENGTH( out ) );
2057
2058            tr_free( benc );
2059            tr_bencFree( &val );
2060        }
2061
2062        /* cleanup */
2063        tr_free( diffs.added );
2064        tr_free( diffs.dropped );
2065        tr_free( newPex );
2066        tr_free( diffs6.added );
2067        tr_free( diffs6.dropped );
2068        tr_free( newPex6 );
2069
2070        msgs->clientSentPexAt = time( NULL );
2071    }
2072}
2073
2074static inline int
2075pexPulse( void * vpeer )
2076{
2077    sendPex( vpeer );
2078    return TRUE;
2079}
2080
2081/**
2082***
2083**/
2084
2085tr_peermsgs*
2086tr_peerMsgsNew( struct tr_torrent * torrent,
2087                struct tr_peer    * peer,
2088                tr_delivery_func    func,
2089                void              * userData,
2090                tr_publisher_tag  * setme )
2091{
2092    tr_peermsgs * m;
2093
2094    assert( peer );
2095    assert( peer->io );
2096
2097    m = tr_new0( tr_peermsgs, 1 );
2098    m->publisher = TR_PUBLISHER_INIT;
2099    m->peer = peer;
2100    m->session = torrent->session;
2101    m->torrent = torrent;
2102    m->peer->clientIsChoked = 1;
2103    m->peer->peerIsChoked = 1;
2104    m->peer->clientIsInterested = 0;
2105    m->peer->peerIsInterested = 0;
2106    m->peer->have = tr_bitfieldNew( torrent->info.pieceCount );
2107    m->state = AWAITING_BT_LENGTH;
2108    m->pexTimer = tr_timerNew( m->session, pexPulse, m, PEX_INTERVAL );
2109    m->outMessages = evbuffer_new( );
2110    m->outMessagesBatchedAt = 0;
2111    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2112    m->incoming.block = evbuffer_new( );
2113    m->peerAskedFor = REQUEST_LIST_INIT;
2114    m->clientAskedFor = REQUEST_LIST_INIT;
2115    m->clientWillAskFor = REQUEST_LIST_INIT;
2116    peer->msgs = m;
2117
2118    *setme = tr_publisherSubscribe( &m->publisher, func, userData );
2119
2120    if( tr_peerIoSupportsLTEP( peer->io ) )
2121        sendLtepHandshake( m );
2122
2123    tellPeerWhatWeHave( m );
2124
2125    tr_peerIoSetIOFuncs( m->peer->io, canRead, didWrite, gotError, m );
2126    ratePulse( m );
2127
2128    return m;
2129}
2130
2131void
2132tr_peerMsgsFree( tr_peermsgs* msgs )
2133{
2134    if( msgs )
2135    {
2136        tr_timerFree( &msgs->pexTimer );
2137        tr_publisherDestruct( &msgs->publisher );
2138        reqListClear( &msgs->clientWillAskFor );
2139        reqListClear( &msgs->clientAskedFor );
2140        reqListClear( &msgs->peerAskedFor );
2141
2142        evbuffer_free( msgs->incoming.block );
2143        evbuffer_free( msgs->outMessages );
2144        tr_free( msgs->pex6 );
2145        tr_free( msgs->pex );
2146
2147        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2148        tr_free( msgs );
2149    }
2150}
2151
2152void
2153tr_peerMsgsUnsubscribe( tr_peermsgs *    peer,
2154                        tr_publisher_tag tag )
2155{
2156    tr_publisherUnsubscribe( &peer->publisher, tag );
2157}
2158
Note: See TracBrowser for help on using the repository browser.