source: trunk/libtransmission/peer-msgs.c @ 10332

Last change on this file since 10332 was 10332, checked in by charles, 12 years ago

(trunk) #2993 "'Downloaded' much greater than 'Have' or 'verified'" -- add new congestion-based throttle for 2.0

  • Property svn:keywords set to Date Rev Author Id
File size: 71.1 KB
Line 
1/*
2 * This file Copyright (C) 2007-2010 Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 10332 2010-03-08 04:29:58Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdarg.h>
17#include <stdio.h>
18#include <stdlib.h>
19#include <string.h>
20
21#include <event.h>
22
23#include "transmission.h"
24#include "bencode.h"
25#include "completion.h"
26#include "crypto.h"
27#include "inout.h"
28#ifdef WIN32
29#include "net.h" /* for ECONN */
30#endif
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-msgs.h"
34#include "session.h"
35#include "stats.h"
36#include "torrent.h"
37#include "torrent-magnet.h"
38#include "tr-dht.h"
39#include "utils.h"
40#include "version.h"
41
42/**
43***
44**/
45
46enum
47{
48    BT_CHOKE                = 0,
49    BT_UNCHOKE              = 1,
50    BT_INTERESTED           = 2,
51    BT_NOT_INTERESTED       = 3,
52    BT_HAVE                 = 4,
53    BT_BITFIELD             = 5,
54    BT_REQUEST              = 6,
55    BT_PIECE                = 7,
56    BT_CANCEL               = 8,
57    BT_PORT                 = 9,
58
59    BT_FEXT_SUGGEST         = 13,
60    BT_FEXT_HAVE_ALL        = 14,
61    BT_FEXT_HAVE_NONE       = 15,
62    BT_FEXT_REJECT          = 16,
63    BT_FEXT_ALLOWED_FAST    = 17,
64
65    BT_LTEP                 = 20,
66
67    LTEP_HANDSHAKE          = 0,
68
69    UT_PEX_ID               = 1,
70    UT_METADATA_ID          = 3,
71
72    MAX_PEX_PEER_COUNT      = 50,
73
74    MIN_CHOKE_PERIOD_SEC    = 10,
75
76    /* idle seconds before we send a keepalive */
77    KEEPALIVE_INTERVAL_SECS = 100,
78
79    PEX_INTERVAL_SECS       = 90, /* sec between sendPex() calls */
80
81    REQQ                    = 512,
82
83    METADATA_REQQ           = 64,
84
85    MAX_BLOCK_SIZE          = ( 1024 * 16 ),
86
87    /* used in lowering the outMessages queue period */
88    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
89    HIGH_PRIORITY_INTERVAL_SECS = 2,
90    LOW_PRIORITY_INTERVAL_SECS = 10,
91
92    /* number of pieces to remove from the bitfield when
93     * lazy bitfields are turned on */
94    LAZY_PIECE_COUNT = 26,
95
96    /* number of pieces we'll allow in our fast set */
97    MAX_FAST_SET_SIZE = 3,
98
99    /* defined in BEP #9 */
100    METADATA_MSG_TYPE_REQUEST = 0,
101    METADATA_MSG_TYPE_DATA = 1,
102    METADATA_MSG_TYPE_REJECT = 2
103};
104
105enum
106{
107    AWAITING_BT_LENGTH,
108    AWAITING_BT_ID,
109    AWAITING_BT_MESSAGE,
110    AWAITING_BT_PIECE
111};
112
113/**
114***
115**/
116
117struct peer_request
118{
119    uint32_t    index;
120    uint32_t    offset;
121    uint32_t    length;
122};
123
124static uint32_t
125getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
126{
127    const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
128    const uint64_t blockPos = tor->blockSize * b;
129    assert( blockPos >= piecePos );
130    return (uint32_t)( blockPos - piecePos );
131}
132
133static void
134blockToReq( const tr_torrent     * tor,
135            tr_block_index_t       block,
136            struct peer_request  * setme )
137{
138    assert( setme != NULL );
139
140    setme->index = tr_torBlockPiece( tor, block );
141    setme->offset = getBlockOffsetInPiece( tor, block );
142    setme->length = tr_torBlockCountBytes( tor, block );
143}
144
145/**
146***
147**/
148
149/* this is raw, unchanged data from the peer regarding
150 * the current message that it's sending us. */
151struct tr_incoming
152{
153    uint8_t                id;
154    uint32_t               length; /* includes the +1 for id length */
155    struct peer_request    blockReq; /* metadata for incoming blocks */
156    struct evbuffer *      block; /* piece data for incoming blocks */
157};
158
159/**
160 * Low-level communication state information about a connected peer.
161 *
162 * This structure remembers the low-level protocol states that we're
163 * in with this peer, such as active requests, pex messages, and so on.
164 * Its fields are all private to peer-msgs.c.
165 *
166 * Data not directly involved with sending & receiving messages is
167 * stored in tr_peer, where it can be accessed by both peermsgs and
168 * the peer manager.
169 *
170 * @see struct peer_atom
171 * @see tr_peer
172 */
173struct tr_peermsgs
174{
175    tr_bool         peerSupportsPex;
176    tr_bool         peerSupportsMetadataXfer;
177    tr_bool         clientSentLtepHandshake;
178    tr_bool         peerSentLtepHandshake;
179
180    /*tr_bool         haveFastSet;*/
181
182    int             desiredRequestCount;
183
184    int             prefetchCount;
185
186    /* how long the outMessages batch should be allowed to grow before
187     * it's flushed -- some messages (like requests >:) should be sent
188     * very quickly; others aren't as urgent. */
189    int8_t          outMessagesBatchPeriod;
190
191    uint8_t         state;
192    uint8_t         ut_pex_id;
193    uint8_t         ut_metadata_id;
194    uint16_t        pexCount;
195    uint16_t        pexCount6;
196
197#if 0
198    size_t                 fastsetSize;
199    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
200#endif
201
202    tr_peer *              peer;
203
204    tr_torrent *           torrent;
205
206    tr_publisher           publisher;
207
208    struct evbuffer *      outMessages; /* all the non-piece messages */
209
210    struct peer_request    peerAskedFor[REQQ];
211
212    int                    peerAskedForMetadata[METADATA_REQQ];
213    int                    peerAskedForMetadataCount;
214
215    tr_pex               * pex;
216    tr_pex               * pex6;
217
218    /*time_t                 clientSentPexAt;*/
219    time_t                 clientSentAnythingAt;
220
221    /* when we started batching the outMessages */
222    time_t                outMessagesBatchedAt;
223
224    struct tr_incoming    incoming;
225
226    /* if the peer supports the Extension Protocol in BEP 10 and
227       supplied a reqq argument, it's stored here.  otherwise the
228       value is zero and should be ignored. */
229    int64_t               reqq;
230
231    struct event          pexTimer;
232};
233
234/**
235***
236**/
237
238#if 0
239static tr_bitfield*
240getHave( const struct tr_peermsgs * msgs )
241{
242    if( msgs->peer->have == NULL )
243        msgs->peer->have = tr_bitfieldNew( msgs->torrent->info.pieceCount );
244    return msgs->peer->have;
245}
246#endif
247
248static inline tr_session*
249getSession( struct tr_peermsgs * msgs )
250{
251    return msgs->torrent->session;
252}
253
254/**
255***
256**/
257
258static void
259myDebug( const char * file, int line,
260         const struct tr_peermsgs * msgs,
261         const char * fmt, ... )
262{
263    FILE * fp = tr_getLog( );
264
265    if( fp )
266    {
267        va_list           args;
268        char              timestr[64];
269        struct evbuffer * buf = evbuffer_new( );
270        char *            base = tr_basename( file );
271
272        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
273                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
274                             tr_torrentName( msgs->torrent ),
275                             tr_peerIoGetAddrStr( msgs->peer->io ),
276                             msgs->peer->client );
277        va_start( args, fmt );
278        evbuffer_add_vprintf( buf, fmt, args );
279        va_end( args );
280        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
281        /* FIXME(libevent2) tr_getLog() should return an fd, then use evbuffer_write() here */
282        fwrite( EVBUFFER_DATA( buf ), 1, EVBUFFER_LENGTH( buf ), fp );
283
284        tr_free( base );
285        evbuffer_free( buf );
286    }
287}
288
289#define dbgmsg( msgs, ... ) \
290    do { \
291        if( tr_deepLoggingIsActive( ) ) \
292            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
293    } while( 0 )
294
295/**
296***
297**/
298
299static void
300pokeBatchPeriod( tr_peermsgs * msgs,
301                 int           interval )
302{
303    if( msgs->outMessagesBatchPeriod > interval )
304    {
305        msgs->outMessagesBatchPeriod = interval;
306        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
307    }
308}
309
310static inline void
311dbgOutMessageLen( tr_peermsgs * msgs )
312{
313    dbgmsg( msgs, "outMessage size is now %zu", EVBUFFER_LENGTH( msgs->outMessages ) );
314}
315
316static void
317protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
318{
319    tr_peerIo       * io  = msgs->peer->io;
320    struct evbuffer * out = msgs->outMessages;
321
322    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
323
324    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
325    tr_peerIoWriteUint8 ( io, out, BT_FEXT_REJECT );
326    tr_peerIoWriteUint32( io, out, req->index );
327    tr_peerIoWriteUint32( io, out, req->offset );
328    tr_peerIoWriteUint32( io, out, req->length );
329
330    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
331    dbgOutMessageLen( msgs );
332}
333
334static void
335protocolSendRequest( tr_peermsgs               * msgs,
336                     const struct peer_request * req )
337{
338    tr_peerIo       * io  = msgs->peer->io;
339    struct evbuffer * out = msgs->outMessages;
340
341    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
342    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
343    tr_peerIoWriteUint32( io, out, req->index );
344    tr_peerIoWriteUint32( io, out, req->offset );
345    tr_peerIoWriteUint32( io, out, req->length );
346
347    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
348    dbgOutMessageLen( msgs );
349    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
350}
351
352static void
353protocolSendCancel( tr_peermsgs               * msgs,
354                    const struct peer_request * req )
355{
356    tr_peerIo       * io  = msgs->peer->io;
357    struct evbuffer * out = msgs->outMessages;
358
359    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
360    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
361    tr_peerIoWriteUint32( io, out, req->index );
362    tr_peerIoWriteUint32( io, out, req->offset );
363    tr_peerIoWriteUint32( io, out, req->length );
364
365    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
366    dbgOutMessageLen( msgs );
367    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
368}
369
370static void
371protocolSendPort(tr_peermsgs *msgs, uint16_t port)
372{
373    tr_peerIo       * io  = msgs->peer->io;
374    struct evbuffer * out = msgs->outMessages;
375
376    dbgmsg( msgs, "sending Port %u", port);
377    tr_peerIoWriteUint32( io, out, 3 );
378    tr_peerIoWriteUint8 ( io, out, BT_PORT );
379    tr_peerIoWriteUint16( io, out, port);
380}
381
382static void
383protocolSendHave( tr_peermsgs * msgs,
384                  uint32_t      index )
385{
386    tr_peerIo       * io  = msgs->peer->io;
387    struct evbuffer * out = msgs->outMessages;
388
389    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
390    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
391    tr_peerIoWriteUint32( io, out, index );
392
393    dbgmsg( msgs, "sending Have %u", index );
394    dbgOutMessageLen( msgs );
395    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
396}
397
398#if 0
399static void
400protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
401{
402    tr_peerIo       * io  = msgs->peer->io;
403    struct evbuffer * out = msgs->outMessages;
404
405    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
406
407    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
408    tr_peerIoWriteUint8 ( io, out, BT_FEXT_ALLOWED_FAST );
409    tr_peerIoWriteUint32( io, out, pieceIndex );
410
411    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
412    dbgOutMessageLen( msgs );
413}
414#endif
415
416static void
417protocolSendChoke( tr_peermsgs * msgs,
418                   int           choke )
419{
420    tr_peerIo       * io  = msgs->peer->io;
421    struct evbuffer * out = msgs->outMessages;
422
423    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
424    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
425
426    dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
427    dbgOutMessageLen( msgs );
428    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
429}
430
431static void
432protocolSendHaveAll( tr_peermsgs * msgs )
433{
434    tr_peerIo       * io  = msgs->peer->io;
435    struct evbuffer * out = msgs->outMessages;
436
437    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
438
439    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
440    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_ALL );
441
442    dbgmsg( msgs, "sending HAVE_ALL..." );
443    dbgOutMessageLen( msgs );
444    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
445}
446
447static void
448protocolSendHaveNone( tr_peermsgs * msgs )
449{
450    tr_peerIo       * io  = msgs->peer->io;
451    struct evbuffer * out = msgs->outMessages;
452
453    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
454
455    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
456    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_NONE );
457
458    dbgmsg( msgs, "sending HAVE_NONE..." );
459    dbgOutMessageLen( msgs );
460    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
461}
462
463/**
464***  EVENTS
465**/
466
467static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0, 0, 0 };
468
469static void
470publish( tr_peermsgs * msgs, tr_peer_event * e )
471{
472    assert( msgs->peer );
473    assert( msgs->peer->msgs == msgs );
474
475    tr_publisherPublish( &msgs->publisher, msgs->peer, e );
476}
477
478static void
479fireError( tr_peermsgs * msgs, int err )
480{
481    tr_peer_event e = blankEvent;
482    e.eventType = TR_PEER_ERROR;
483    e.err = err;
484    publish( msgs, &e );
485}
486
487static void
488fireUploadOnly( tr_peermsgs * msgs, tr_bool uploadOnly )
489{
490    tr_peer_event e = blankEvent;
491    e.eventType = TR_PEER_UPLOAD_ONLY;
492    e.uploadOnly = uploadOnly;
493    publish( msgs, &e );
494}
495
496static void
497firePeerProgress( tr_peermsgs * msgs )
498{
499    tr_peer_event e = blankEvent;
500    e.eventType = TR_PEER_PEER_PROGRESS;
501    e.progress = msgs->peer->progress;
502    publish( msgs, &e );
503}
504
505static void
506fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
507{
508    tr_peer_event e = blankEvent;
509    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
510    e.pieceIndex = req->index;
511    e.offset = req->offset;
512    e.length = req->length;
513    publish( msgs, &e );
514}
515
516static void
517fireGotRej( tr_peermsgs * msgs, const struct peer_request * req )
518{
519    tr_peer_event e = blankEvent;
520    e.eventType = TR_PEER_CLIENT_GOT_REJ;
521    e.pieceIndex = req->index;
522    e.offset = req->offset;
523    e.length = req->length;
524    publish( msgs, &e );
525}
526
527static void
528fireGotChoke( tr_peermsgs * msgs )
529{
530    tr_peer_event e = blankEvent;
531    e.eventType = TR_PEER_CLIENT_GOT_CHOKE;
532    publish( msgs, &e );
533}
534
535static void
536fireClientGotData( tr_peermsgs * msgs,
537                   uint32_t      length,
538                   int           wasPieceData )
539{
540    tr_peer_event e = blankEvent;
541
542    e.length = length;
543    e.eventType = TR_PEER_CLIENT_GOT_DATA;
544    e.wasPieceData = wasPieceData;
545    publish( msgs, &e );
546}
547
548static void
549fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
550{
551    tr_peer_event e = blankEvent;
552    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
553    e.pieceIndex = pieceIndex;
554    publish( msgs, &e );
555}
556
557static void
558fireClientGotPort( tr_peermsgs * msgs, tr_port port )
559{
560    tr_peer_event e = blankEvent;
561    e.eventType = TR_PEER_CLIENT_GOT_PORT;
562    e.port = port;
563    publish( msgs, &e );
564}
565
566static void
567fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
568{
569    tr_peer_event e = blankEvent;
570    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
571    e.pieceIndex = pieceIndex;
572    publish( msgs, &e );
573}
574
575static void
576firePeerGotData( tr_peermsgs  * msgs,
577                 uint32_t       length,
578                 int            wasPieceData )
579{
580    tr_peer_event e = blankEvent;
581
582    e.length = length;
583    e.eventType = TR_PEER_PEER_GOT_DATA;
584    e.wasPieceData = wasPieceData;
585
586    publish( msgs, &e );
587}
588
589/**
590***  ALLOWED FAST SET
591***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
592**/
593
594size_t
595tr_generateAllowedSet( tr_piece_index_t * setmePieces,
596                       size_t             desiredSetSize,
597                       size_t             pieceCount,
598                       const uint8_t    * infohash,
599                       const tr_address * addr )
600{
601    size_t setSize = 0;
602
603    assert( setmePieces );
604    assert( desiredSetSize <= pieceCount );
605    assert( desiredSetSize );
606    assert( pieceCount );
607    assert( infohash );
608    assert( addr );
609
610    if( addr->type == TR_AF_INET )
611    {
612        uint8_t w[SHA_DIGEST_LENGTH + 4], *walk=w;
613        uint8_t x[SHA_DIGEST_LENGTH];
614
615        uint32_t ui32 = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 );   /* (1) */
616        memcpy( w, &ui32, sizeof( uint32_t ) );
617        walk += sizeof( uint32_t );
618        memcpy( walk, infohash, SHA_DIGEST_LENGTH );                 /* (2) */
619        walk += SHA_DIGEST_LENGTH;
620        tr_sha1( x, w, walk-w, NULL );                               /* (3) */
621        assert( sizeof( w ) == walk-w );
622
623        while( setSize<desiredSetSize )
624        {
625            int i;
626            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
627            {
628                size_t k;
629                uint32_t j = i * 4;                                  /* (5) */
630                uint32_t y = ntohl( *( uint32_t* )( x + j ) );       /* (6) */
631                uint32_t index = y % pieceCount;                     /* (7) */
632
633                for( k=0; k<setSize; ++k )                           /* (8) */
634                    if( setmePieces[k] == index )
635                        break;
636
637                if( k == setSize )
638                    setmePieces[setSize++] = index;                  /* (9) */
639            }
640
641            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
642        }
643    }
644
645    return setSize;
646}
647
648static void
649updateFastSet( tr_peermsgs * msgs UNUSED )
650{
651#if 0
652    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
653    const int peerIsNeedy = msgs->peer->progress < 0.10;
654
655    if( fext && peerIsNeedy && !msgs->haveFastSet )
656    {
657        size_t i;
658        const struct tr_address * addr = tr_peerIoGetAddress( msgs->peer->io, NULL );
659        const tr_info * inf = &msgs->torrent->info;
660        const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
661
662        /* build the fast set */
663        msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
664        msgs->haveFastSet = 1;
665
666        /* send it to the peer */
667        for( i=0; i<msgs->fastsetSize; ++i )
668            protocolSendAllowedFast( msgs, msgs->fastset[i] );
669    }
670#endif
671}
672
673/**
674***  INTEREST
675**/
676
677static void
678sendInterest( tr_peermsgs * msgs, tr_bool clientIsInterested )
679{
680    struct evbuffer * out = msgs->outMessages;
681
682    assert( msgs );
683    assert( tr_isBool( clientIsInterested ) );
684
685    msgs->peer->clientIsInterested = clientIsInterested;
686    dbgmsg( msgs, "Sending %s", clientIsInterested ? "Interested" : "Not Interested" );
687    tr_peerIoWriteUint32( msgs->peer->io, out, sizeof( uint8_t ) );
688    tr_peerIoWriteUint8 ( msgs->peer->io, out, clientIsInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
689
690    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
691    dbgOutMessageLen( msgs );
692}
693
694static void
695updateInterest( tr_peermsgs * msgs UNUSED )
696{
697    /* FIXME -- might need to poke the mgr on startup */
698}
699
700void
701tr_peerMsgsSetInterested( tr_peermsgs * msgs, int isInterested )
702{
703    assert( tr_isBool( isInterested ) );
704
705    if( isInterested != msgs->peer->clientIsInterested )
706        sendInterest( msgs, isInterested );
707}
708
709static tr_bool
710popNextMetadataRequest( tr_peermsgs * msgs, int * piece )
711{
712    if( msgs->peerAskedForMetadataCount == 0 )
713        return FALSE;
714
715    *piece = msgs->peerAskedForMetadata[0];
716
717    tr_removeElementFromArray( msgs->peerAskedForMetadata, 0, sizeof( int ),
718                               msgs->peerAskedForMetadataCount-- );
719
720    return TRUE;
721}
722
723static tr_bool
724popNextRequest( tr_peermsgs * msgs, struct peer_request * setme )
725{
726    if( msgs->peer->pendingReqsToClient == 0 )
727        return FALSE;
728
729    *setme = msgs->peerAskedFor[0];
730
731    tr_removeElementFromArray( msgs->peerAskedFor, 0, sizeof( struct peer_request ),
732                               msgs->peer->pendingReqsToClient-- );
733
734    return TRUE;
735}
736
737static void
738cancelAllRequestsToClient( tr_peermsgs * msgs )
739{
740    struct peer_request req;
741    const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->peer->io );
742
743    while( popNextRequest( msgs, &req ))
744        if( mustSendCancel )
745            protocolSendReject( msgs, &req );
746}
747
748void
749tr_peerMsgsSetChoke( tr_peermsgs * msgs,
750                     int           choke )
751{
752    const time_t now = tr_time( );
753    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
754
755    assert( msgs );
756    assert( msgs->peer );
757    assert( choke == 0 || choke == 1 );
758
759    if( msgs->peer->chokeChangedAt > fibrillationTime )
760    {
761        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
762    }
763    else if( msgs->peer->peerIsChoked != choke )
764    {
765        msgs->peer->peerIsChoked = choke;
766        if( choke )
767            cancelAllRequestsToClient( msgs );
768        protocolSendChoke( msgs, choke );
769        msgs->peer->chokeChangedAt = now;
770    }
771}
772
773/**
774***
775**/
776
777void
778tr_peerMsgsHave( tr_peermsgs * msgs,
779                 uint32_t      index )
780{
781    protocolSendHave( msgs, index );
782
783    /* since we have more pieces now, we might not be interested in this peer */
784    updateInterest( msgs );
785}
786
787/**
788***
789**/
790
791static tr_bool
792reqIsValid( const tr_peermsgs * peer,
793            uint32_t            index,
794            uint32_t            offset,
795            uint32_t            length )
796{
797    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
798}
799
800static tr_bool
801requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
802{
803    return reqIsValid( msgs, req->index, req->offset, req->length );
804}
805
806void
807tr_peerMsgsCancel( tr_peermsgs * msgs, tr_block_index_t block )
808{
809    struct peer_request req;
810/*fprintf( stderr, "SENDING CANCEL MESSAGE FOR BLOCK %zu\n\t\tFROM PEER %p ------------------------------------\n", (size_t)block, msgs->peer );*/
811    blockToReq( msgs->torrent, block, &req );
812    protocolSendCancel( msgs, &req );
813}
814
815/**
816***
817**/
818
819static void
820sendLtepHandshake( tr_peermsgs * msgs )
821{
822    tr_benc val, *m;
823    char * buf;
824    int len;
825    tr_bool allow_pex;
826    tr_bool allow_metadata_xfer;
827    struct evbuffer * out = msgs->outMessages;
828    const unsigned char * ipv6 = tr_globalIPv6();
829
830    if( msgs->clientSentLtepHandshake )
831        return;
832
833    dbgmsg( msgs, "sending an ltep handshake" );
834    msgs->clientSentLtepHandshake = 1;
835
836    /* decide if we want to advertise metadata xfer support (BEP 9) */
837    if( tr_torrentIsPrivate( msgs->torrent ) )
838        allow_metadata_xfer = 0;
839    else
840        allow_metadata_xfer = 1;
841
842    /* decide if we want to advertise pex support */
843    if( !tr_torrentAllowsPex( msgs->torrent ) )
844        allow_pex = 0;
845    else if( msgs->peerSentLtepHandshake )
846        allow_pex = msgs->peerSupportsPex ? 1 : 0;
847    else
848        allow_pex = 1;
849
850    tr_bencInitDict( &val, 8 );
851    tr_bencDictAddInt( &val, "e", getSession(msgs)->encryptionMode != TR_CLEAR_PREFERRED );
852    if( ipv6 != NULL )
853        tr_bencDictAddRaw( &val, "ipv6", ipv6, 16 );
854    if( allow_metadata_xfer && tr_torrentHasMetadata( msgs->torrent )
855                            && ( msgs->torrent->infoDictLength > 0 ) )
856        tr_bencDictAddInt( &val, "metadata_size", msgs->torrent->infoDictLength );
857    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( getSession(msgs) ) );
858    tr_bencDictAddInt( &val, "reqq", REQQ );
859    tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
860    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
861    m  = tr_bencDictAddDict( &val, "m", 2 );
862    if( allow_metadata_xfer )
863        tr_bencDictAddInt( m, "ut_metadata", UT_METADATA_ID );
864    if( allow_pex )
865        tr_bencDictAddInt( m, "ut_pex", UT_PEX_ID );
866
867    buf = tr_bencToStr( &val, TR_FMT_BENC, &len );
868
869    tr_peerIoWriteUint32( msgs->peer->io, out, 2 * sizeof( uint8_t ) + len );
870    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_LTEP );
871    tr_peerIoWriteUint8 ( msgs->peer->io, out, LTEP_HANDSHAKE );
872    tr_peerIoWriteBytes ( msgs->peer->io, out, buf, len );
873    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
874    dbgOutMessageLen( msgs );
875
876    /* cleanup */
877    tr_bencFree( &val );
878    tr_free( buf );
879}
880
881static void
882parseLtepHandshake( tr_peermsgs *     msgs,
883                    int               len,
884                    struct evbuffer * inbuf )
885{
886    int64_t   i;
887    tr_benc   val, * sub;
888    uint8_t * tmp = tr_new( uint8_t, len );
889    const uint8_t *addr;
890    size_t addr_len;
891    tr_pex pex;
892
893    memset( &pex, 0, sizeof( tr_pex ) );
894
895    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, len );
896    msgs->peerSentLtepHandshake = 1;
897
898    if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) )
899    {
900        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
901        tr_free( tmp );
902        return;
903    }
904
905    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
906
907    /* does the peer prefer encrypted connections? */
908    if( tr_bencDictFindInt( &val, "e", &i ) ) {
909        msgs->peer->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
910                                              : ENCRYPTION_PREFERENCE_NO;
911        if( i )
912            pex.flags |= ADDED_F_ENCRYPTION_FLAG;
913    }
914
915    /* check supported messages for utorrent pex */
916    msgs->peerSupportsPex = 0;
917    msgs->peerSupportsMetadataXfer = 0;
918
919    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
920        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
921            msgs->peerSupportsPex = i != 0;
922            msgs->ut_pex_id = (uint8_t) i;
923            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
924        }
925        if( tr_bencDictFindInt( sub, "ut_metadata", &i ) ) {
926            msgs->peerSupportsMetadataXfer = i != 0;
927            msgs->ut_metadata_id = (uint8_t) i;
928            dbgmsg( msgs, "msgs->ut_metadata_id is %d", (int)msgs->ut_metadata_id );
929        }
930    }
931
932    /* look for metainfo size (BEP 9) */
933    if( tr_bencDictFindInt( &val, "metadata_size", &i ) )
934        tr_torrentSetMetadataSizeHint( msgs->torrent, i );
935
936    /* look for upload_only (BEP 21) */
937    if( tr_bencDictFindInt( &val, "upload_only", &i ) ) {
938        fireUploadOnly( msgs, i!=0 );
939        if( i )
940            pex.flags |= ADDED_F_SEED_FLAG;
941    }
942
943    /* get peer's listening port */
944    if( tr_bencDictFindInt( &val, "p", &i ) ) {
945        pex.port = htons( (uint16_t)i );
946        fireClientGotPort( msgs, pex.port );
947        dbgmsg( msgs, "peer's port is now %d", (int)i );
948    }
949
950    if( tr_peerIoIsIncoming( msgs->peer->io )
951        && tr_bencDictFindRaw( &val, "ipv4", &addr, &addr_len )
952        && ( addr_len == 4 ) )
953    {
954        pex.addr.type = TR_AF_INET;
955        memcpy( &pex.addr.addr.addr4, addr, 4 );
956        tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex );
957    }
958
959    if( tr_peerIoIsIncoming( msgs->peer->io )
960        && tr_bencDictFindRaw( &val, "ipv6", &addr, &addr_len )
961        && ( addr_len == 16 ) )
962    {
963        pex.addr.type = TR_AF_INET6;
964        memcpy( &pex.addr.addr.addr6, addr, 16 );
965        tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex );
966    }
967
968    /* get peer's maximum request queue size */
969    if( tr_bencDictFindInt( &val, "reqq", &i ) )
970        msgs->reqq = i;
971
972    tr_bencFree( &val );
973    tr_free( tmp );
974}
975
976static void
977parseUtMetadata( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
978{
979    tr_benc dict;
980    char * msg_end;
981    char * benc_end;
982    int64_t msg_type = -1;
983    int64_t piece = -1;
984    int64_t total_size = 0;
985    uint8_t * tmp = tr_new( uint8_t, msglen );
986
987    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
988    msg_end = (char*)tmp + msglen;
989
990    if( !tr_bencLoad( tmp, msglen, &dict, &benc_end ) )
991    {
992        tr_bencDictFindInt( &dict, "msg_type", &msg_type );
993        tr_bencDictFindInt( &dict, "piece", &piece );
994        tr_bencDictFindInt( &dict, "total_size", &total_size );
995        tr_bencFree( &dict );
996    }
997
998    dbgmsg( msgs, "got ut_metadata msg: type %d, piece %d, total_size %d",
999            (int)msg_type, (int)piece, (int)total_size );
1000
1001    if( msg_type == METADATA_MSG_TYPE_REJECT )
1002    {
1003        /* NOOP */
1004    }
1005
1006    if( ( msg_type == METADATA_MSG_TYPE_DATA )
1007        && ( !tr_torrentHasMetadata( msgs->torrent ) )
1008        && ( msg_end - benc_end <= METADATA_PIECE_SIZE )
1009        && ( piece * METADATA_PIECE_SIZE + (msg_end - benc_end) <= total_size ) )
1010    {
1011        const int pieceLen = msg_end - benc_end;
1012        tr_torrentSetMetadataPiece( msgs->torrent, piece, benc_end, pieceLen );
1013    }
1014
1015    if( msg_type == METADATA_MSG_TYPE_REQUEST )
1016    {
1017        if( ( piece >= 0 )
1018            && tr_torrentHasMetadata( msgs->torrent )
1019            && !tr_torrentIsPrivate( msgs->torrent )
1020            && ( msgs->peerAskedForMetadataCount < METADATA_REQQ ) )
1021        {
1022            msgs->peerAskedForMetadata[msgs->peerAskedForMetadataCount++] = piece;
1023        }
1024        else
1025        {
1026            tr_benc tmp;
1027            int payloadLen;
1028            char * payload;
1029            tr_peerIo  * io  = msgs->peer->io;
1030            struct evbuffer * out = msgs->outMessages;
1031
1032            /* build the rejection message */
1033            tr_bencInitDict( &tmp, 2 );
1034            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
1035            tr_bencDictAddInt( &tmp, "piece", piece );
1036            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1037            tr_bencFree( &tmp );
1038
1039            /* write it out as a LTEP message to our outMessages buffer */
1040            tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + payloadLen );
1041            tr_peerIoWriteUint8 ( io, out, BT_LTEP );
1042            tr_peerIoWriteUint8 ( io, out, msgs->ut_metadata_id );
1043            tr_peerIoWriteBytes ( io, out, payload, payloadLen );
1044            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1045            dbgOutMessageLen( msgs );
1046
1047            tr_free( payload );
1048        }
1049    }
1050
1051    tr_free( tmp );
1052}
1053
1054static void
1055parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1056{
1057    int loaded = 0;
1058    uint8_t * tmp = tr_new( uint8_t, msglen );
1059    tr_benc val;
1060    tr_torrent * tor = msgs->torrent;
1061    const uint8_t * added;
1062    size_t added_len;
1063
1064    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1065
1066    if( tr_torrentAllowsPex( tor )
1067      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) ) )
1068    {
1069        if( tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1070        {
1071            tr_pex * pex;
1072            size_t i, n;
1073            size_t added_f_len = 0;
1074            const uint8_t * added_f = NULL;
1075
1076            tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1077            pex = tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len, &n );
1078
1079            n = MIN( n, MAX_PEX_PEER_COUNT );
1080            for( i=0; i<n; ++i )
1081                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex + i );
1082
1083            tr_free( pex );
1084        }
1085
1086        if( tr_bencDictFindRaw( &val, "added6", &added, &added_len ) )
1087        {
1088            tr_pex * pex;
1089            size_t i, n;
1090            size_t added_f_len = 0;
1091            const uint8_t * added_f = NULL;
1092
1093            tr_bencDictFindRaw( &val, "added6.f", &added_f, &added_f_len );
1094            pex = tr_peerMgrCompact6ToPex( added, added_len, added_f, added_f_len, &n );
1095
1096            n = MIN( n, MAX_PEX_PEER_COUNT );
1097            for( i=0; i<n; ++i )
1098                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex + i );
1099
1100            tr_free( pex );
1101        }
1102    }
1103
1104    if( loaded )
1105        tr_bencFree( &val );
1106    tr_free( tmp );
1107}
1108
1109static void sendPex( tr_peermsgs * msgs );
1110
1111static void
1112parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer  * inbuf )
1113{
1114    uint8_t ltep_msgid;
1115
1116    tr_peerIoReadUint8( msgs->peer->io, inbuf, &ltep_msgid );
1117    msglen--;
1118
1119    if( ltep_msgid == LTEP_HANDSHAKE )
1120    {
1121        dbgmsg( msgs, "got ltep handshake" );
1122        parseLtepHandshake( msgs, msglen, inbuf );
1123        if( tr_peerIoSupportsLTEP( msgs->peer->io ) )
1124        {
1125            sendLtepHandshake( msgs );
1126            sendPex( msgs );
1127        }
1128    }
1129    else if( ltep_msgid == UT_PEX_ID )
1130    {
1131        dbgmsg( msgs, "got ut pex" );
1132        msgs->peerSupportsPex = 1;
1133        parseUtPex( msgs, msglen, inbuf );
1134    }
1135    else if( ltep_msgid == UT_METADATA_ID )
1136    {
1137        dbgmsg( msgs, "got ut metadata" );
1138        msgs->peerSupportsMetadataXfer = 1;
1139        parseUtMetadata( msgs, msglen, inbuf );
1140    }
1141    else
1142    {
1143        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1144        evbuffer_drain( inbuf, msglen );
1145    }
1146}
1147
1148static int
1149readBtLength( tr_peermsgs *     msgs,
1150              struct evbuffer * inbuf,
1151              size_t            inlen )
1152{
1153    uint32_t len;
1154
1155    if( inlen < sizeof( len ) )
1156        return READ_LATER;
1157
1158    tr_peerIoReadUint32( msgs->peer->io, inbuf, &len );
1159
1160    if( len == 0 ) /* peer sent us a keepalive message */
1161        dbgmsg( msgs, "got KeepAlive" );
1162    else
1163    {
1164        msgs->incoming.length = len;
1165        msgs->state = AWAITING_BT_ID;
1166    }
1167
1168    return READ_NOW;
1169}
1170
1171static int readBtMessage( tr_peermsgs     * msgs,
1172                          struct evbuffer * inbuf,
1173                          size_t            inlen );
1174
1175static int
1176readBtId( tr_peermsgs * msgs, struct evbuffer  * inbuf, size_t inlen )
1177{
1178    uint8_t id;
1179
1180    if( inlen < sizeof( uint8_t ) )
1181        return READ_LATER;
1182
1183    tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
1184    msgs->incoming.id = id;
1185    dbgmsg( msgs, "msgs->incoming.id is now %d; msgs->incoming.length is %zu", id, (size_t)msgs->incoming.length );
1186
1187    if( id == BT_PIECE )
1188    {
1189        msgs->state = AWAITING_BT_PIECE;
1190        return READ_NOW;
1191    }
1192    else if( msgs->incoming.length != 1 )
1193    {
1194        msgs->state = AWAITING_BT_MESSAGE;
1195        return READ_NOW;
1196    }
1197    else return readBtMessage( msgs, inbuf, inlen - 1 );
1198}
1199
1200static void
1201updatePeerProgress( tr_peermsgs * msgs )
1202{
1203    msgs->peer->progress = tr_bitsetPercent( &msgs->peer->have );
1204    dbgmsg( msgs, "peer progress is %f", msgs->peer->progress );
1205    updateFastSet( msgs );
1206    updateInterest( msgs );
1207    firePeerProgress( msgs );
1208}
1209
1210static void
1211peerMadeRequest( tr_peermsgs *               msgs,
1212                 const struct peer_request * req )
1213{
1214    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1215    const int reqIsValid = requestIsValid( msgs, req );
1216    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( &msgs->torrent->completion, req->index );
1217    const int peerIsChoked = msgs->peer->peerIsChoked;
1218
1219    int allow = FALSE;
1220
1221    if( !reqIsValid )
1222        dbgmsg( msgs, "rejecting an invalid request." );
1223    else if( !clientHasPiece )
1224        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1225    else if( peerIsChoked )
1226        dbgmsg( msgs, "rejecting request from choked peer" );
1227    else if( msgs->peer->pendingReqsToClient + 1 >= REQQ )
1228        dbgmsg( msgs, "rejecting request ... reqq is full" );
1229    else
1230        allow = TRUE;
1231
1232    if( allow )
1233        msgs->peerAskedFor[msgs->peer->pendingReqsToClient++] = *req;
1234    else if( fext )
1235        protocolSendReject( msgs, req );
1236}
1237
1238static tr_bool
1239messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1240{
1241    switch( id )
1242    {
1243        case BT_CHOKE:
1244        case BT_UNCHOKE:
1245        case BT_INTERESTED:
1246        case BT_NOT_INTERESTED:
1247        case BT_FEXT_HAVE_ALL:
1248        case BT_FEXT_HAVE_NONE:
1249            return len == 1;
1250
1251        case BT_HAVE:
1252        case BT_FEXT_SUGGEST:
1253        case BT_FEXT_ALLOWED_FAST:
1254            return len == 5;
1255
1256        case BT_BITFIELD:
1257            return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1258
1259        case BT_REQUEST:
1260        case BT_CANCEL:
1261        case BT_FEXT_REJECT:
1262            return len == 13;
1263
1264        case BT_PIECE:
1265            return len > 9 && len <= 16393;
1266
1267        case BT_PORT:
1268            return len == 3;
1269
1270        case BT_LTEP:
1271            return len >= 2;
1272
1273        default:
1274            return FALSE;
1275    }
1276}
1277
1278static int clientGotBlock( tr_peermsgs *               msgs,
1279                           const uint8_t *             block,
1280                           const struct peer_request * req );
1281
1282static int
1283readBtPiece( tr_peermsgs      * msgs,
1284             struct evbuffer  * inbuf,
1285             size_t             inlen,
1286             size_t           * setme_piece_bytes_read )
1287{
1288    struct peer_request * req = &msgs->incoming.blockReq;
1289
1290    assert( EVBUFFER_LENGTH( inbuf ) >= inlen );
1291    dbgmsg( msgs, "In readBtPiece" );
1292
1293    if( !req->length )
1294    {
1295        if( inlen < 8 )
1296            return READ_LATER;
1297
1298        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index );
1299        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset );
1300        req->length = msgs->incoming.length - 9;
1301        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1302        return READ_NOW;
1303    }
1304    else
1305    {
1306        int err;
1307
1308        /* read in another chunk of data */
1309        const size_t nLeft = req->length - EVBUFFER_LENGTH( msgs->incoming.block );
1310        size_t n = MIN( nLeft, inlen );
1311        size_t i = n;
1312        void * buf = tr_sessionGetBuffer( getSession( msgs ) );
1313        const size_t buflen = SESSION_BUFFER_SIZE;
1314
1315        while( i > 0 )
1316        {
1317            const size_t thisPass = MIN( i, buflen );
1318            tr_peerIoReadBytes( msgs->peer->io, inbuf, buf, thisPass );
1319            evbuffer_add( msgs->incoming.block, buf, thisPass );
1320            i -= thisPass;
1321        }
1322
1323        tr_sessionReleaseBuffer( getSession( msgs ) );
1324        buf = NULL;
1325
1326        fireClientGotData( msgs, n, TRUE );
1327        *setme_piece_bytes_read += n;
1328        dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1329               n, req->index, req->offset, req->length,
1330               (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) );
1331        if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length )
1332            return READ_LATER;
1333
1334        /* we've got the whole block ... process it */
1335        err = clientGotBlock( msgs, EVBUFFER_DATA( msgs->incoming.block ), req );
1336
1337        /* cleanup */
1338        evbuffer_free( msgs->incoming.block );
1339        msgs->incoming.block = evbuffer_new( );
1340        req->length = 0;
1341        msgs->state = AWAITING_BT_LENGTH;
1342        return err ? READ_ERR : READ_NOW;
1343    }
1344}
1345
1346static void updateDesiredRequestCount( tr_peermsgs * msgs, uint64_t now );
1347
1348static int
1349readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1350{
1351    uint32_t      ui32;
1352    uint32_t      msglen = msgs->incoming.length;
1353    const uint8_t id = msgs->incoming.id;
1354#ifndef NDEBUG
1355    const size_t  startBufLen = EVBUFFER_LENGTH( inbuf );
1356#endif
1357    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1358
1359    --msglen; /* id length */
1360
1361    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
1362
1363    if( inlen < msglen )
1364        return READ_LATER;
1365
1366    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1367    {
1368        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1369        fireError( msgs, EMSGSIZE );
1370        return READ_ERR;
1371    }
1372
1373    switch( id )
1374    {
1375        case BT_CHOKE:
1376            dbgmsg( msgs, "got Choke" );
1377            msgs->peer->clientIsChoked = 1;
1378            if( !fext )
1379                fireGotChoke( msgs );
1380            break;
1381
1382        case BT_UNCHOKE:
1383            dbgmsg( msgs, "got Unchoke" );
1384            msgs->peer->clientIsChoked = 0;
1385            updateDesiredRequestCount( msgs, tr_date( ) );
1386            break;
1387
1388        case BT_INTERESTED:
1389            dbgmsg( msgs, "got Interested" );
1390            msgs->peer->peerIsInterested = 1;
1391            break;
1392
1393        case BT_NOT_INTERESTED:
1394            dbgmsg( msgs, "got Not Interested" );
1395            msgs->peer->peerIsInterested = 0;
1396            break;
1397
1398        case BT_HAVE:
1399            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1400            dbgmsg( msgs, "got Have: %u", ui32 );
1401            if( tr_torrentHasMetadata( msgs->torrent )
1402                    && ( ui32 >= msgs->torrent->info.pieceCount ) )
1403            {
1404                fireError( msgs, ERANGE );
1405                return READ_ERR;
1406            }
1407            if( tr_bitsetAdd( &msgs->peer->have, ui32 ) )
1408            {
1409                fireError( msgs, ERANGE );
1410                return READ_ERR;
1411            }
1412            updatePeerProgress( msgs );
1413            break;
1414
1415        case BT_BITFIELD: {
1416            const size_t bitCount = tr_torrentHasMetadata( msgs->torrent )
1417                                  ? msgs->torrent->info.pieceCount
1418                                  : msglen * 8;
1419            dbgmsg( msgs, "got a bitfield" );
1420            tr_bitsetReserve( &msgs->peer->have, bitCount );
1421            tr_peerIoReadBytes( msgs->peer->io, inbuf,
1422                                msgs->peer->have.bitfield.bits, msglen );
1423            updatePeerProgress( msgs );
1424            break;
1425        }
1426
1427        case BT_REQUEST:
1428        {
1429            struct peer_request r;
1430            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1431            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1432            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1433            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1434            peerMadeRequest( msgs, &r );
1435            break;
1436        }
1437
1438        case BT_CANCEL:
1439        {
1440            int i;
1441            struct peer_request r;
1442            const uint64_t now_msec = tr_date( );
1443            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1444            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1445            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1446            tr_historyAdd( msgs->torrent->blocksSentToClient, now_msec, 1 );
1447            tr_historyAdd( msgs->peer->cancelsSentToClient, now_msec, 1 );
1448            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1449
1450            for( i=0; i<msgs->peer->pendingReqsToClient; ++i ) {
1451                const struct peer_request * req = msgs->peerAskedFor + i;
1452                if( ( req->index == r.index ) && ( req->offset == r.offset ) && ( req->length == r.length ) )
1453                    break;
1454            }
1455
1456            if( i < msgs->peer->pendingReqsToClient )
1457                tr_removeElementFromArray( msgs->peerAskedFor, i, sizeof( struct peer_request ),
1458                                           msgs->peer->pendingReqsToClient-- );
1459            break;
1460        }
1461
1462        case BT_PIECE:
1463            assert( 0 ); /* handled elsewhere! */
1464            break;
1465
1466        case BT_PORT:
1467            dbgmsg( msgs, "Got a BT_PORT" );
1468            tr_peerIoReadUint16( msgs->peer->io, inbuf, &msgs->peer->dht_port );
1469            if( msgs->peer->dht_port > 0 )
1470                tr_dhtAddNode( getSession(msgs),
1471                               tr_peerAddress( msgs->peer ),
1472                               msgs->peer->dht_port, 0 );
1473            break;
1474
1475        case BT_FEXT_SUGGEST:
1476            dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1477            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1478            if( fext )
1479                fireClientGotSuggest( msgs, ui32 );
1480            else {
1481                fireError( msgs, EMSGSIZE );
1482                return READ_ERR;
1483            }
1484            break;
1485
1486        case BT_FEXT_ALLOWED_FAST:
1487            dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1488            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1489            if( fext )
1490                fireClientGotAllowedFast( msgs, ui32 );
1491            else {
1492                fireError( msgs, EMSGSIZE );
1493                return READ_ERR;
1494            }
1495            break;
1496
1497        case BT_FEXT_HAVE_ALL:
1498            dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1499            if( fext ) {
1500                tr_bitsetSetHaveAll( &msgs->peer->have );
1501                updatePeerProgress( msgs );
1502            } else {
1503                fireError( msgs, EMSGSIZE );
1504                return READ_ERR;
1505            }
1506            break;
1507
1508        case BT_FEXT_HAVE_NONE:
1509            dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1510            if( fext ) {
1511                tr_bitsetSetHaveNone( &msgs->peer->have );
1512                updatePeerProgress( msgs );
1513            } else {
1514                fireError( msgs, EMSGSIZE );
1515                return READ_ERR;
1516            }
1517            break;
1518
1519        case BT_FEXT_REJECT:
1520        {
1521            struct peer_request r;
1522            dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1523            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1524            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1525            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1526            if( fext )
1527                fireGotRej( msgs, &r );
1528            else {
1529                fireError( msgs, EMSGSIZE );
1530                return READ_ERR;
1531            }
1532            break;
1533        }
1534
1535        case BT_LTEP:
1536            dbgmsg( msgs, "Got a BT_LTEP" );
1537            parseLtep( msgs, msglen, inbuf );
1538            break;
1539
1540        default:
1541            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1542            tr_peerIoDrain( msgs->peer->io, inbuf, msglen );
1543            break;
1544    }
1545
1546    assert( msglen + 1 == msgs->incoming.length );
1547    assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen );
1548
1549    msgs->state = AWAITING_BT_LENGTH;
1550    return READ_NOW;
1551}
1552
1553static void
1554addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1555{
1556    if( !msgs->peer->blame )
1557         msgs->peer->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1558    tr_bitfieldAdd( msgs->peer->blame, index );
1559}
1560
1561/* returns 0 on success, or an errno on failure */
1562static int
1563clientGotBlock( tr_peermsgs *               msgs,
1564                const uint8_t *             data,
1565                const struct peer_request * req )
1566{
1567    int err;
1568    tr_torrent * tor = msgs->torrent;
1569    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1570
1571    assert( msgs );
1572    assert( req );
1573
1574    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) {
1575        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1576                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1577        return EMSGSIZE;
1578    }
1579
1580    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1581
1582    if( !tr_peerMgrDidPeerRequest( msgs->torrent, msgs->peer, block ) ) {
1583        dbgmsg( msgs, "we didn't ask for this message..." );
1584        return 0;
1585    }
1586    if( tr_cpPieceIsComplete( &msgs->torrent->completion, req->index ) ) {
1587        dbgmsg( msgs, "we did ask for this message, but the piece is already complete..." );
1588        return 0;
1589    }
1590
1591    /**
1592    ***  Save the block
1593    **/
1594
1595    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1596        return err;
1597
1598    addPeerToBlamefield( msgs, req->index );
1599    fireGotBlock( msgs, req );
1600    return 0;
1601}
1602
1603static int peerPulse( void * vmsgs );
1604
1605static void
1606didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1607{
1608    tr_peermsgs * msgs = vmsgs;
1609    firePeerGotData( msgs, bytesWritten, wasPieceData );
1610
1611    if ( tr_isPeerIo( io ) && io->userData )
1612        peerPulse( msgs );
1613}
1614
1615static ReadState
1616canRead( tr_peerIo * io, void * vmsgs, size_t * piece )
1617{
1618    ReadState         ret;
1619    tr_peermsgs *     msgs = vmsgs;
1620    struct evbuffer * in = tr_peerIoGetReadBuffer( io );
1621    const size_t      inlen = EVBUFFER_LENGTH( in );
1622
1623    dbgmsg( msgs, "canRead: inlen is %zu, msgs->state is %d", inlen, msgs->state );
1624
1625    if( !inlen )
1626    {
1627        ret = READ_LATER;
1628    }
1629    else if( msgs->state == AWAITING_BT_PIECE )
1630    {
1631        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1632    }
1633    else switch( msgs->state )
1634    {
1635        case AWAITING_BT_LENGTH:
1636            ret = readBtLength ( msgs, in, inlen ); break;
1637
1638        case AWAITING_BT_ID:
1639            ret = readBtId     ( msgs, in, inlen ); break;
1640
1641        case AWAITING_BT_MESSAGE:
1642            ret = readBtMessage( msgs, in, inlen ); break;
1643
1644        default:
1645            ret = READ_ERR;
1646            assert( 0 );
1647    }
1648
1649    dbgmsg( msgs, "canRead: ret is %d", (int)ret );
1650
1651    /* log the raw data that was read */
1652    if( ( ret != READ_ERR ) && ( EVBUFFER_LENGTH( in ) != inlen ) )
1653        fireClientGotData( msgs, inlen - EVBUFFER_LENGTH( in ), FALSE );
1654
1655    return ret;
1656}
1657
1658int
1659tr_peerMsgsIsReadingBlock( const tr_peermsgs * msgs, tr_block_index_t block )
1660{
1661    if( msgs->state != AWAITING_BT_PIECE )
1662        return FALSE;
1663
1664    return block == _tr_block( msgs->torrent,
1665                               msgs->incoming.blockReq.index,
1666                               msgs->incoming.blockReq.offset );
1667}
1668
1669/**
1670***
1671**/
1672
1673static void
1674updateDesiredRequestCount( tr_peermsgs * msgs, uint64_t now )
1675{
1676    const tr_torrent * const torrent = msgs->torrent;
1677
1678    if( tr_torrentIsSeed( msgs->torrent ) )
1679    {
1680        msgs->desiredRequestCount = 0;
1681    }
1682    else if( msgs->peer->clientIsChoked )
1683    {
1684        msgs->desiredRequestCount = 0;
1685    }
1686    else if( !msgs->peer->clientIsInterested )
1687    {
1688        msgs->desiredRequestCount = 0;
1689    }
1690    else
1691    {
1692        int irate;
1693        int estimatedBlocksInPeriod;
1694        double rate;
1695        const int floor = 4;
1696        const int seconds = REQUEST_BUF_SECS;
1697
1698        /* Get the rate limit we should use.
1699         * FIXME: this needs to consider all the other peers as well... */
1700        rate = tr_peerGetPieceSpeed( msgs->peer, now, TR_PEER_TO_CLIENT );
1701        if( tr_torrentUsesSpeedLimit( torrent, TR_PEER_TO_CLIENT ) )
1702            rate = MIN( rate, tr_torrentGetSpeedLimit( torrent, TR_PEER_TO_CLIENT ) );
1703
1704        /* honor the session limits, if enabled */
1705        if( tr_torrentUsesSessionLimits( torrent ) )
1706            if( tr_sessionGetActiveSpeedLimit( torrent->session, TR_PEER_TO_CLIENT, &irate ) )
1707                rate = MIN( rate, irate );
1708
1709        /* use this desired rate to figure out how
1710         * many requests we should send to this peer */
1711        estimatedBlocksInPeriod = ( rate * seconds * 1024 ) / torrent->blockSize;
1712        msgs->desiredRequestCount = MAX( floor, estimatedBlocksInPeriod );
1713
1714        /* honor the peer's maximum request count, if specified */
1715        if( msgs->reqq > 0 )
1716            if( msgs->desiredRequestCount > msgs->reqq )
1717                msgs->desiredRequestCount = msgs->reqq;
1718    }
1719}
1720
1721static void
1722updateMetadataRequests( tr_peermsgs * msgs, time_t now )
1723{
1724    int piece;
1725
1726    if( msgs->peerSupportsMetadataXfer
1727        && tr_torrentGetNextMetadataRequest( msgs->torrent, now, &piece ) )
1728    {
1729        tr_benc tmp;
1730        int payloadLen;
1731        char * payload;
1732        tr_peerIo  * io  = msgs->peer->io;
1733        struct evbuffer * out = msgs->outMessages;
1734
1735        /* build the data message */
1736        tr_bencInitDict( &tmp, 3 );
1737        tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REQUEST );
1738        tr_bencDictAddInt( &tmp, "piece", piece );
1739        payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1740        tr_bencFree( &tmp );
1741
1742        dbgmsg( msgs, "requesting metadata piece #%d", piece );
1743
1744        /* write it out as a LTEP message to our outMessages buffer */
1745        tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + payloadLen );
1746        tr_peerIoWriteUint8 ( io, out, BT_LTEP );
1747        tr_peerIoWriteUint8 ( io, out, msgs->ut_metadata_id );
1748        tr_peerIoWriteBytes ( io, out, payload, payloadLen );
1749        pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1750        dbgOutMessageLen( msgs );
1751
1752        tr_free( payload );
1753    }
1754}
1755
1756static void
1757updateBlockRequests( tr_peermsgs * msgs )
1758{
1759    if( tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT )
1760        && ( msgs->desiredRequestCount > 0 )
1761        && ( msgs->peer->pendingReqsToPeer <= ( msgs->desiredRequestCount * 0.66 ) ) )
1762    {
1763        int i;
1764        int n;
1765        const int numwant = msgs->desiredRequestCount - msgs->peer->pendingReqsToPeer;
1766        tr_block_index_t * blocks = tr_new( tr_block_index_t, numwant );
1767
1768        tr_peerMgrGetNextRequests( msgs->torrent, msgs->peer, numwant, blocks, &n );
1769
1770        for( i=0; i<n; ++i )
1771        {
1772            struct peer_request req;
1773            blockToReq( msgs->torrent, blocks[i], &req );
1774            protocolSendRequest( msgs, &req );
1775        }
1776
1777        tr_free( blocks );
1778    }
1779}
1780
1781static void
1782prefetchPieces( tr_peermsgs *msgs )
1783{
1784    int i;
1785
1786    /* Maintain 12 prefetched blocks per unchoked peer */
1787    for( i=msgs->prefetchCount; i<msgs->peer->pendingReqsToClient && i<12; ++i )
1788    {
1789        const struct peer_request * req = msgs->peerAskedFor + i;
1790        tr_ioPrefetch( msgs->torrent, req->index, req->offset, req->length );
1791        ++msgs->prefetchCount;
1792    }
1793}
1794
1795static size_t
1796fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1797{
1798    int piece;
1799    size_t bytesWritten = 0;
1800    struct peer_request req;
1801    const tr_bool haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1802    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1803
1804    /**
1805    ***  Protocol messages
1806    **/
1807
1808    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1809    {
1810        dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
1811        msgs->outMessagesBatchedAt = now;
1812    }
1813    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1814    {
1815        const size_t len = EVBUFFER_LENGTH( msgs->outMessages );
1816        /* flush the protocol messages */
1817        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len );
1818        tr_peerIoWriteBuf( msgs->peer->io, msgs->outMessages, FALSE );
1819        msgs->clientSentAnythingAt = now;
1820        msgs->outMessagesBatchedAt = 0;
1821        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1822        bytesWritten +=  len;
1823    }
1824
1825    /**
1826    ***  Metadata Pieces
1827    **/
1828
1829    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= METADATA_PIECE_SIZE )
1830        && popNextMetadataRequest( msgs, &piece ) )
1831    {
1832        char * data;
1833        int dataLen;
1834        tr_bool ok = FALSE;
1835
1836        data = tr_torrentGetMetadataPiece( msgs->torrent, piece, &dataLen );
1837        if( ( dataLen > 0 ) && ( data != NULL ) )
1838        {
1839            tr_benc tmp;
1840            int payloadLen;
1841            char * payload;
1842            tr_peerIo  * io  = msgs->peer->io;
1843            struct evbuffer * out = msgs->outMessages;
1844
1845            /* build the data message */
1846            tr_bencInitDict( &tmp, 3 );
1847            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_DATA );
1848            tr_bencDictAddInt( &tmp, "piece", piece );
1849            tr_bencDictAddInt( &tmp, "total_size", msgs->torrent->infoDictLength );
1850            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1851            tr_bencFree( &tmp );
1852
1853            /* write it out as a LTEP message to our outMessages buffer */
1854            tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + payloadLen + dataLen );
1855            tr_peerIoWriteUint8 ( io, out, BT_LTEP );
1856            tr_peerIoWriteUint8 ( io, out, msgs->ut_metadata_id );
1857            tr_peerIoWriteBytes ( io, out, payload, payloadLen );
1858            tr_peerIoWriteBytes ( io, out, data, dataLen );
1859            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1860            dbgOutMessageLen( msgs );
1861
1862            tr_free( payload );
1863            tr_free( data );
1864
1865            ok = TRUE;
1866        }
1867
1868        if( !ok ) /* send a rejection message */
1869        {
1870            tr_benc tmp;
1871            int payloadLen;
1872            char * payload;
1873            tr_peerIo  * io  = msgs->peer->io;
1874            struct evbuffer * out = msgs->outMessages;
1875
1876            /* build the rejection message */
1877            tr_bencInitDict( &tmp, 2 );
1878            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
1879            tr_bencDictAddInt( &tmp, "piece", piece );
1880            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1881            tr_bencFree( &tmp );
1882
1883            /* write it out as a LTEP message to our outMessages buffer */
1884            tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + payloadLen );
1885            tr_peerIoWriteUint8 ( io, out, BT_LTEP );
1886            tr_peerIoWriteUint8 ( io, out, msgs->ut_metadata_id );
1887            tr_peerIoWriteBytes ( io, out, payload, payloadLen );
1888            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1889            dbgOutMessageLen( msgs );
1890
1891            tr_free( payload );
1892        }
1893    }
1894
1895    /**
1896    ***  Data Blocks
1897    **/
1898
1899    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= msgs->torrent->blockSize )
1900        && popNextRequest( msgs, &req ) )
1901    {
1902        --msgs->prefetchCount;
1903
1904        if( requestIsValid( msgs, &req )
1905            && tr_cpPieceIsComplete( &msgs->torrent->completion, req.index ) )
1906        {
1907            /* FIXME(libevent2) use evbuffer_reserve_space() + evbuffer_commit_space() */
1908            int err;
1909            const uint32_t msglen = 4 + 1 + 4 + 4 + req.length;
1910            struct evbuffer * out;
1911            tr_peerIo * io = msgs->peer->io;
1912
1913            out = evbuffer_new( );
1914            evbuffer_expand( out, msglen );
1915
1916            tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1917            tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1918            tr_peerIoWriteUint32( io, out, req.index );
1919            tr_peerIoWriteUint32( io, out, req.offset );
1920
1921            err = tr_ioRead( msgs->torrent, req.index, req.offset, req.length, EVBUFFER_DATA(out)+EVBUFFER_LENGTH(out) );
1922            if( err )
1923            {
1924                if( fext )
1925                    protocolSendReject( msgs, &req );
1926            }
1927            else
1928            {
1929                const uint64_t now_msec = tr_date( );
1930                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1931                EVBUFFER_LENGTH(out) += req.length;
1932                assert( EVBUFFER_LENGTH( out ) == msglen );
1933                tr_peerIoWriteBuf( io, out, TRUE );
1934                bytesWritten += EVBUFFER_LENGTH( out );
1935                msgs->clientSentAnythingAt = now;
1936                tr_historyAdd( msgs->torrent->blocksSentToClient, now_msec, 1 );
1937                tr_historyAdd( msgs->peer->blocksSentToPeer, now_msec, 1 );
1938            }
1939
1940            evbuffer_free( out );
1941
1942            if( err )
1943            {
1944                bytesWritten = 0;
1945                msgs = NULL;
1946            }
1947        }
1948        else if( fext ) /* peer needs a reject message */
1949        {
1950            protocolSendReject( msgs, &req );
1951        }
1952
1953        if( msgs != NULL )
1954            prefetchPieces( msgs );
1955    }
1956
1957    /**
1958    ***  Keepalive
1959    **/
1960
1961    if( ( msgs != NULL )
1962        && ( msgs->clientSentAnythingAt != 0 )
1963        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1964    {
1965        dbgmsg( msgs, "sending a keepalive message" );
1966        tr_peerIoWriteUint32( msgs->peer->io, msgs->outMessages, 0 );
1967        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1968    }
1969
1970    return bytesWritten;
1971}
1972
1973static int
1974peerPulse( void * vmsgs )
1975{
1976    tr_peermsgs * msgs = vmsgs;
1977    const time_t  now = tr_time( );
1978
1979    if ( tr_isPeerIo( msgs->peer->io ) ) {
1980        updateDesiredRequestCount( msgs, tr_date( ) );
1981        updateBlockRequests( msgs );
1982        updateMetadataRequests( msgs, now );
1983    }
1984
1985    for( ;; )
1986        if( fillOutputBuffer( msgs, now ) < 1 )
1987            break;
1988
1989    return TRUE; /* loop forever */
1990}
1991
1992void
1993tr_peerMsgsPulse( tr_peermsgs * msgs )
1994{
1995    if( msgs != NULL )
1996        peerPulse( msgs );
1997}
1998
1999static void
2000gotError( tr_peerIo  * io UNUSED,
2001          short        what,
2002          void       * vmsgs )
2003{
2004    if( what & EVBUFFER_TIMEOUT )
2005        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
2006    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
2007        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
2008               what, errno, tr_strerror( errno ) );
2009    fireError( vmsgs, ENOTCONN );
2010}
2011
2012static void
2013sendBitfield( tr_peermsgs * msgs )
2014{
2015    struct evbuffer * out = msgs->outMessages;
2016    tr_bitfield *     field;
2017    tr_piece_index_t  lazyPieces[LAZY_PIECE_COUNT];
2018    size_t            i;
2019    size_t            lazyCount = 0;
2020
2021    field = tr_bitfieldDup( tr_cpPieceBitfield( &msgs->torrent->completion ) );
2022
2023    if( tr_sessionIsLazyBitfieldEnabled( getSession( msgs ) ) )
2024    {
2025        /** Lazy bitfields aren't a high priority or secure, so I'm opting for
2026            speed over a truly random sample -- let's limit the pool size to
2027            the first 1000 pieces so large torrents don't bog things down */
2028        size_t poolSize;
2029        const size_t maxPoolSize = MIN( msgs->torrent->info.pieceCount, 1000 );
2030        tr_piece_index_t * pool = tr_new( tr_piece_index_t, maxPoolSize );
2031
2032        /* build the pool */
2033        for( i=poolSize=0; i<maxPoolSize; ++i )
2034            if( tr_bitfieldHas( field, i ) )
2035                pool[poolSize++] = i;
2036
2037        /* pull random piece indices from the pool */
2038        while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) )
2039        {
2040            const int pos = tr_cryptoWeakRandInt( poolSize );
2041            const tr_piece_index_t piece = pool[pos];
2042            tr_bitfieldRem( field, piece );
2043            lazyPieces[lazyCount++] = piece;
2044            pool[pos] = pool[--poolSize];
2045        }
2046
2047        /* cleanup */
2048        tr_free( pool );
2049    }
2050
2051    tr_peerIoWriteUint32( msgs->peer->io, out,
2052                          sizeof( uint8_t ) + field->byteCount );
2053    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_BITFIELD );
2054    /* FIXME(libevent2): use evbuffer_add_reference() */
2055    tr_peerIoWriteBytes ( msgs->peer->io, out, field->bits, field->byteCount );
2056    dbgmsg( msgs, "sending bitfield... outMessage size is now %zu",
2057            EVBUFFER_LENGTH( out ) );
2058    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
2059
2060    for( i = 0; i < lazyCount; ++i )
2061        protocolSendHave( msgs, lazyPieces[i] );
2062
2063    tr_bitfieldFree( field );
2064}
2065
2066static void
2067tellPeerWhatWeHave( tr_peermsgs * msgs )
2068{
2069    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
2070
2071    if( fext && ( tr_cpGetStatus( &msgs->torrent->completion ) == TR_SEED ) )
2072    {
2073        protocolSendHaveAll( msgs );
2074    }
2075    else if( fext && ( tr_cpHaveValid( &msgs->torrent->completion ) == 0 ) )
2076    {
2077        protocolSendHaveNone( msgs );
2078    }
2079    else
2080    {
2081        sendBitfield( msgs );
2082    }
2083}
2084
2085/**
2086***
2087**/
2088
2089/* some peers give us error messages if we send
2090   more than this many peers in a single pex message
2091   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
2092#define MAX_PEX_ADDED 50
2093#define MAX_PEX_DROPPED 50
2094
2095typedef struct
2096{
2097    tr_pex *  added;
2098    tr_pex *  dropped;
2099    tr_pex *  elements;
2100    int       addedCount;
2101    int       droppedCount;
2102    int       elementCount;
2103}
2104PexDiffs;
2105
2106static void
2107pexAddedCb( void * vpex,
2108            void * userData )
2109{
2110    PexDiffs * diffs = userData;
2111    tr_pex *   pex = vpex;
2112
2113    if( diffs->addedCount < MAX_PEX_ADDED )
2114    {
2115        diffs->added[diffs->addedCount++] = *pex;
2116        diffs->elements[diffs->elementCount++] = *pex;
2117    }
2118}
2119
2120static inline void
2121pexDroppedCb( void * vpex,
2122              void * userData )
2123{
2124    PexDiffs * diffs = userData;
2125    tr_pex *   pex = vpex;
2126
2127    if( diffs->droppedCount < MAX_PEX_DROPPED )
2128    {
2129        diffs->dropped[diffs->droppedCount++] = *pex;
2130    }
2131}
2132
2133static inline void
2134pexElementCb( void * vpex,
2135              void * userData )
2136{
2137    PexDiffs * diffs = userData;
2138    tr_pex * pex = vpex;
2139
2140    diffs->elements[diffs->elementCount++] = *pex;
2141}
2142
2143static void
2144sendPex( tr_peermsgs * msgs )
2145{
2146    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
2147    {
2148        PexDiffs diffs;
2149        PexDiffs diffs6;
2150        tr_pex * newPex = NULL;
2151        tr_pex * newPex6 = NULL;
2152        const int newCount = tr_peerMgrGetPeers( msgs->torrent, &newPex, TR_AF_INET, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
2153        const int newCount6 = tr_peerMgrGetPeers( msgs->torrent, &newPex6, TR_AF_INET6, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
2154
2155        /* build the diffs */
2156        diffs.added = tr_new( tr_pex, newCount );
2157        diffs.addedCount = 0;
2158        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
2159        diffs.droppedCount = 0;
2160        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
2161        diffs.elementCount = 0;
2162        tr_set_compare( msgs->pex, msgs->pexCount,
2163                        newPex, newCount,
2164                        tr_pexCompare, sizeof( tr_pex ),
2165                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
2166        diffs6.added = tr_new( tr_pex, newCount6 );
2167        diffs6.addedCount = 0;
2168        diffs6.dropped = tr_new( tr_pex, msgs->pexCount6 );
2169        diffs6.droppedCount = 0;
2170        diffs6.elements = tr_new( tr_pex, newCount6 + msgs->pexCount6 );
2171        diffs6.elementCount = 0;
2172        tr_set_compare( msgs->pex6, msgs->pexCount6,
2173                        newPex6, newCount6,
2174                        tr_pexCompare, sizeof( tr_pex ),
2175                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6 );
2176        dbgmsg(
2177            msgs,
2178            "pex: old peer count %d+%d, new peer count %d+%d, "
2179            "added %d+%d, removed %d+%d",
2180            msgs->pexCount, msgs->pexCount6, newCount, newCount6,
2181            diffs.addedCount, diffs6.addedCount,
2182            diffs.droppedCount, diffs6.droppedCount );
2183
2184        if( !diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
2185            !diffs6.droppedCount )
2186        {
2187            tr_free( diffs.elements );
2188            tr_free( diffs6.elements );
2189        }
2190        else
2191        {
2192            int  i;
2193            tr_benc val;
2194            char * benc;
2195            int bencLen;
2196            uint8_t * tmp, *walk;
2197            tr_peerIo       * io  = msgs->peer->io;
2198            struct evbuffer * out = msgs->outMessages;
2199
2200            /* update peer */
2201            tr_free( msgs->pex );
2202            msgs->pex = diffs.elements;
2203            msgs->pexCount = diffs.elementCount;
2204            tr_free( msgs->pex6 );
2205            msgs->pex6 = diffs6.elements;
2206            msgs->pexCount6 = diffs6.elementCount;
2207
2208            /* build the pex payload */
2209            tr_bencInitDict( &val, 3 ); /* ipv6 support: left as 3:
2210                                         * speed vs. likelihood? */
2211
2212            if( diffs.addedCount > 0)
2213            {
2214                /* "added" */
2215                tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
2216                for( i = 0; i < diffs.addedCount; ++i ) {
2217                    memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
2218                    memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
2219                }
2220                assert( ( walk - tmp ) == diffs.addedCount * 6 );
2221                tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
2222                tr_free( tmp );
2223
2224                /* "added.f" */
2225                tmp = walk = tr_new( uint8_t, diffs.addedCount );
2226                for( i = 0; i < diffs.addedCount; ++i )
2227                    *walk++ = diffs.added[i].flags;
2228                assert( ( walk - tmp ) == diffs.addedCount );
2229                tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2230                tr_free( tmp );
2231            }
2232
2233            if( diffs.droppedCount > 0 )
2234            {
2235                /* "dropped" */
2236                tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2237                for( i = 0; i < diffs.droppedCount; ++i ) {
2238                    memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
2239                    memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2240                }
2241                assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2242                tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2243                tr_free( tmp );
2244            }
2245
2246            if( diffs6.addedCount > 0 )
2247            {
2248                /* "added6" */
2249                tmp = walk = tr_new( uint8_t, diffs6.addedCount * 18 );
2250                for( i = 0; i < diffs6.addedCount; ++i ) {
2251                    memcpy( walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16 );
2252                    walk += 16;
2253                    memcpy( walk, &diffs6.added[i].port, 2 );
2254                    walk += 2;
2255                }
2256                assert( ( walk - tmp ) == diffs6.addedCount * 18 );
2257                tr_bencDictAddRaw( &val, "added6", tmp, walk - tmp );
2258                tr_free( tmp );
2259
2260                /* "added6.f" */
2261                tmp = walk = tr_new( uint8_t, diffs6.addedCount );
2262                for( i = 0; i < diffs6.addedCount; ++i )
2263                    *walk++ = diffs6.added[i].flags;
2264                assert( ( walk - tmp ) == diffs6.addedCount );
2265                tr_bencDictAddRaw( &val, "added6.f", tmp, walk - tmp );
2266                tr_free( tmp );
2267            }
2268
2269            if( diffs6.droppedCount > 0 )
2270            {
2271                /* "dropped6" */
2272                tmp = walk = tr_new( uint8_t, diffs6.droppedCount * 18 );
2273                for( i = 0; i < diffs6.droppedCount; ++i ) {
2274                    memcpy( walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16 );
2275                    walk += 16;
2276                    memcpy( walk, &diffs6.dropped[i].port, 2 );
2277                    walk += 2;
2278                }
2279                assert( ( walk - tmp ) == diffs6.droppedCount * 18);
2280                tr_bencDictAddRaw( &val, "dropped6", tmp, walk - tmp );
2281                tr_free( tmp );
2282            }
2283
2284            /* write the pex message */
2285            benc = tr_bencToStr( &val, TR_FMT_BENC, &bencLen );
2286            tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + bencLen );
2287            tr_peerIoWriteUint8 ( io, out, BT_LTEP );
2288            tr_peerIoWriteUint8 ( io, out, msgs->ut_pex_id );
2289            tr_peerIoWriteBytes ( io, out, benc, bencLen );
2290            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2291            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", EVBUFFER_LENGTH( out ) );
2292            dbgOutMessageLen( msgs );
2293
2294            tr_free( benc );
2295            tr_bencFree( &val );
2296        }
2297
2298        /* cleanup */
2299        tr_free( diffs.added );
2300        tr_free( diffs.dropped );
2301        tr_free( newPex );
2302        tr_free( diffs6.added );
2303        tr_free( diffs6.dropped );
2304        tr_free( newPex6 );
2305
2306        /*msgs->clientSentPexAt = tr_time( );*/
2307    }
2308}
2309
2310static void
2311pexPulse( int foo UNUSED, short bar UNUSED, void * vmsgs )
2312{
2313    struct tr_peermsgs * msgs = vmsgs;
2314
2315    sendPex( msgs );
2316
2317    tr_timerAdd( &msgs->pexTimer, PEX_INTERVAL_SECS, 0 );
2318}
2319
2320/**
2321***
2322**/
2323
2324tr_peermsgs*
2325tr_peerMsgsNew( struct tr_torrent * torrent,
2326                struct tr_peer    * peer,
2327                tr_delivery_func    func,
2328                void              * userData,
2329                tr_publisher_tag  * setme )
2330{
2331    tr_peermsgs * m;
2332
2333    assert( peer );
2334    assert( peer->io );
2335
2336    m = tr_new0( tr_peermsgs, 1 );
2337    m->publisher = TR_PUBLISHER_INIT;
2338    m->peer = peer;
2339    m->torrent = torrent;
2340    m->peer->clientIsChoked = 1;
2341    m->peer->peerIsChoked = 1;
2342    m->peer->clientIsInterested = 0;
2343    m->peer->peerIsInterested = 0;
2344    m->state = AWAITING_BT_LENGTH;
2345    m->outMessages = evbuffer_new( );
2346    m->outMessagesBatchedAt = 0;
2347    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2348    m->incoming.block = evbuffer_new( );
2349    evtimer_set( &m->pexTimer, pexPulse, m );
2350    tr_timerAdd( &m->pexTimer, PEX_INTERVAL_SECS, 0 );
2351    peer->msgs = m;
2352
2353    *setme = tr_publisherSubscribe( &m->publisher, func, userData );
2354
2355    if( tr_peerIoSupportsLTEP( peer->io ) )
2356        sendLtepHandshake( m );
2357
2358    if( tr_dhtEnabled( torrent->session ) && tr_peerIoSupportsDHT( peer->io ))
2359    {
2360        /* Only send PORT over IPv6 when the IPv6 DHT is running (BEP-32). */
2361        const struct tr_address *addr = tr_peerIoGetAddress( peer->io, NULL );
2362        if( addr->type == TR_AF_INET || tr_globalIPv6() ) {
2363            protocolSendPort( m, tr_dhtPort( torrent->session ) );
2364        }
2365    }
2366
2367    tellPeerWhatWeHave( m );
2368
2369    tr_peerIoSetIOFuncs( m->peer->io, canRead, didWrite, gotError, m );
2370    updateDesiredRequestCount( m, tr_date( ) );
2371
2372    return m;
2373}
2374
2375void
2376tr_peerMsgsFree( tr_peermsgs* msgs )
2377{
2378    if( msgs )
2379    {
2380        evtimer_del( &msgs->pexTimer );
2381        tr_publisherDestruct( &msgs->publisher );
2382
2383        evbuffer_free( msgs->incoming.block );
2384        evbuffer_free( msgs->outMessages );
2385        tr_free( msgs->pex6 );
2386        tr_free( msgs->pex );
2387
2388        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2389        tr_free( msgs );
2390    }
2391}
2392
2393void
2394tr_peerMsgsUnsubscribe( tr_peermsgs *    peer,
2395                        tr_publisher_tag tag )
2396{
2397    tr_publisherUnsubscribe( &peer->publisher, tag );
2398}
Note: See TracBrowser for help on using the repository browser.