source: trunk/libtransmission/peer-msgs.c @ 12013

Last change on this file since 12013 was 12013, checked in by jordan, 11 years ago

(trunk libT) #4051 "Use composition for the tr_history fields in tr_peer" -- fixed.

If we use composition on these objects we can save a handful of pointers per peer. This isn't a big deal, but it's an easy/safe change to do.

  • Property svn:keywords set to Date Rev Author Id
File size: 70.4 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 12013 2011-02-23 06:01:16Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdarg.h>
17#include <stdio.h>
18#include <stdlib.h>
19#include <string.h>
20
21#include <event2/bufferevent.h>
22#include <event2/event.h>
23
24#include "transmission.h"
25#include "bencode.h"
26#include "cache.h"
27#include "completion.h"
28#include "crypto.h"
29#ifdef WIN32
30#include "net.h" /* for ECONN */
31#endif
32#include "peer-io.h"
33#include "peer-mgr.h"
34#include "peer-msgs.h"
35#include "session.h"
36#include "stats.h"
37#include "torrent.h"
38#include "torrent-magnet.h"
39#include "tr-dht.h"
40#include "utils.h"
41#include "version.h"
42
43/**
44***
45**/
46
47enum
48{
49    BT_CHOKE                = 0,
50    BT_UNCHOKE              = 1,
51    BT_INTERESTED           = 2,
52    BT_NOT_INTERESTED       = 3,
53    BT_HAVE                 = 4,
54    BT_BITFIELD             = 5,
55    BT_REQUEST              = 6,
56    BT_PIECE                = 7,
57    BT_CANCEL               = 8,
58    BT_PORT                 = 9,
59
60    BT_FEXT_SUGGEST         = 13,
61    BT_FEXT_HAVE_ALL        = 14,
62    BT_FEXT_HAVE_NONE       = 15,
63    BT_FEXT_REJECT          = 16,
64    BT_FEXT_ALLOWED_FAST    = 17,
65
66    BT_LTEP                 = 20,
67
68    LTEP_HANDSHAKE          = 0,
69
70    UT_PEX_ID               = 1,
71    UT_METADATA_ID          = 3,
72
73    MAX_PEX_PEER_COUNT      = 50,
74
75    MIN_CHOKE_PERIOD_SEC    = 10,
76
77    /* idle seconds before we send a keepalive */
78    KEEPALIVE_INTERVAL_SECS = 100,
79
80    PEX_INTERVAL_SECS       = 90, /* sec between sendPex() calls */
81
82    REQQ                    = 512,
83
84    METADATA_REQQ           = 64,
85
86    /* used in lowering the outMessages queue period */
87    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
88    HIGH_PRIORITY_INTERVAL_SECS = 2,
89    LOW_PRIORITY_INTERVAL_SECS = 10,
90
91    /* number of pieces we'll allow in our fast set */
92    MAX_FAST_SET_SIZE = 3,
93
94    /* defined in BEP #9 */
95    METADATA_MSG_TYPE_REQUEST = 0,
96    METADATA_MSG_TYPE_DATA = 1,
97    METADATA_MSG_TYPE_REJECT = 2
98};
99
100enum
101{
102    AWAITING_BT_LENGTH,
103    AWAITING_BT_ID,
104    AWAITING_BT_MESSAGE,
105    AWAITING_BT_PIECE
106};
107
108/**
109***
110**/
111
112struct peer_request
113{
114    uint32_t    index;
115    uint32_t    offset;
116    uint32_t    length;
117};
118
119static void
120blockToReq( const tr_torrent     * tor,
121            tr_block_index_t       block,
122            struct peer_request  * setme )
123{
124    tr_torrentGetBlockLocation( tor, block, &setme->index,
125                                            &setme->offset,
126                                            &setme->length );
127}
128
129/**
130***
131**/
132
133/* this is raw, unchanged data from the peer regarding
134 * the current message that it's sending us. */
135struct tr_incoming
136{
137    uint8_t                id;
138    uint32_t               length; /* includes the +1 for id length */
139    struct peer_request    blockReq; /* metadata for incoming blocks */
140    struct evbuffer *      block; /* piece data for incoming blocks */
141};
142
143/**
144 * Low-level communication state information about a connected peer.
145 *
146 * This structure remembers the low-level protocol states that we're
147 * in with this peer, such as active requests, pex messages, and so on.
148 * Its fields are all private to peer-msgs.c.
149 *
150 * Data not directly involved with sending & receiving messages is
151 * stored in tr_peer, where it can be accessed by both peermsgs and
152 * the peer manager.
153 *
154 * @see struct peer_atom
155 * @see tr_peer
156 */
157struct tr_peermsgs
158{
159    tr_bool         peerSupportsPex;
160    tr_bool         peerSupportsMetadataXfer;
161    tr_bool         clientSentLtepHandshake;
162    tr_bool         peerSentLtepHandshake;
163
164    /*tr_bool         haveFastSet;*/
165
166    int             desiredRequestCount;
167
168    int             prefetchCount;
169
170    /* how long the outMessages batch should be allowed to grow before
171     * it's flushed -- some messages (like requests >:) should be sent
172     * very quickly; others aren't as urgent. */
173    int8_t          outMessagesBatchPeriod;
174
175    uint8_t         state;
176    uint8_t         ut_pex_id;
177    uint8_t         ut_metadata_id;
178    uint16_t        pexCount;
179    uint16_t        pexCount6;
180
181#if 0
182    size_t                 fastsetSize;
183    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
184#endif
185
186    tr_peer *              peer;
187
188    tr_torrent *           torrent;
189
190    tr_peer_callback      * callback;
191    void                  * callbackData;
192
193    struct evbuffer *      outMessages; /* all the non-piece messages */
194
195    struct peer_request    peerAskedFor[REQQ];
196
197    int                    peerAskedForMetadata[METADATA_REQQ];
198    int                    peerAskedForMetadataCount;
199
200    tr_pex               * pex;
201    tr_pex               * pex6;
202
203    /*time_t                 clientSentPexAt;*/
204    time_t                 clientSentAnythingAt;
205
206    /* when we started batching the outMessages */
207    time_t                outMessagesBatchedAt;
208
209    struct tr_incoming    incoming;
210
211    /* if the peer supports the Extension Protocol in BEP 10 and
212       supplied a reqq argument, it's stored here. Otherwise, the
213       value is zero and should be ignored. */
214    int64_t               reqq;
215
216    struct event        * pexTimer;
217};
218
219/**
220***
221**/
222
223#if 0
224static tr_bitfield*
225getHave( const struct tr_peermsgs * msgs )
226{
227    if( msgs->peer->have == NULL )
228        msgs->peer->have = tr_bitfieldNew( msgs->torrent->info.pieceCount );
229    return msgs->peer->have;
230}
231#endif
232
233static inline tr_session*
234getSession( struct tr_peermsgs * msgs )
235{
236    return msgs->torrent->session;
237}
238
239/**
240***
241**/
242
243static void
244myDebug( const char * file, int line,
245         const struct tr_peermsgs * msgs,
246         const char * fmt, ... )
247{
248    FILE * fp = tr_getLog( );
249
250    if( fp )
251    {
252        va_list           args;
253        char              timestr[64];
254        struct evbuffer * buf = evbuffer_new( );
255        char *            base = tr_basename( file );
256
257        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
258                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
259                             tr_torrentName( msgs->torrent ),
260                             tr_peerIoGetAddrStr( msgs->peer->io ),
261                             msgs->peer->client );
262        va_start( args, fmt );
263        evbuffer_add_vprintf( buf, fmt, args );
264        va_end( args );
265        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
266        fputs( (const char*)evbuffer_pullup( buf, -1 ), fp );
267
268        tr_free( base );
269        evbuffer_free( buf );
270    }
271}
272
273#define dbgmsg( msgs, ... ) \
274    do { \
275        if( tr_deepLoggingIsActive( ) ) \
276            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
277    } while( 0 )
278
279/**
280***
281**/
282
283static void
284pokeBatchPeriod( tr_peermsgs * msgs,
285                 int           interval )
286{
287    if( msgs->outMessagesBatchPeriod > interval )
288    {
289        msgs->outMessagesBatchPeriod = interval;
290        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
291    }
292}
293
294static void
295dbgOutMessageLen( tr_peermsgs * msgs )
296{
297    dbgmsg( msgs, "outMessage size is now %zu", evbuffer_get_length( msgs->outMessages ) );
298}
299
300static void
301protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
302{
303    struct evbuffer * out = msgs->outMessages;
304
305    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
306
307    evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
308    evbuffer_add_uint8 ( out, BT_FEXT_REJECT );
309    evbuffer_add_uint32( out, req->index );
310    evbuffer_add_uint32( out, req->offset );
311    evbuffer_add_uint32( out, req->length );
312
313    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
314    dbgOutMessageLen( msgs );
315}
316
317static void
318protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
319{
320    struct evbuffer * out = msgs->outMessages;
321
322    evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
323    evbuffer_add_uint8 ( out, BT_REQUEST );
324    evbuffer_add_uint32( out, req->index );
325    evbuffer_add_uint32( out, req->offset );
326    evbuffer_add_uint32( out, req->length );
327
328    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
329    dbgOutMessageLen( msgs );
330    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
331}
332
333static void
334protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
335{
336    struct evbuffer * out = msgs->outMessages;
337
338    evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
339    evbuffer_add_uint8 ( out, BT_CANCEL );
340    evbuffer_add_uint32( out, req->index );
341    evbuffer_add_uint32( out, req->offset );
342    evbuffer_add_uint32( out, req->length );
343
344    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
345    dbgOutMessageLen( msgs );
346    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
347}
348
349static void
350protocolSendPort(tr_peermsgs *msgs, uint16_t port)
351{
352    struct evbuffer * out = msgs->outMessages;
353
354    dbgmsg( msgs, "sending Port %u", port);
355    evbuffer_add_uint32( out, 3 );
356    evbuffer_add_uint8 ( out, BT_PORT );
357    evbuffer_add_uint16( out, port);
358}
359
360static void
361protocolSendHave( tr_peermsgs * msgs, uint32_t index )
362{
363    struct evbuffer * out = msgs->outMessages;
364
365    evbuffer_add_uint32( out, sizeof(uint8_t) + sizeof(uint32_t) );
366    evbuffer_add_uint8 ( out, BT_HAVE );
367    evbuffer_add_uint32( out, index );
368
369    dbgmsg( msgs, "sending Have %u", index );
370    dbgOutMessageLen( msgs );
371    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
372}
373
374#if 0
375static void
376protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
377{
378    tr_peerIo       * io  = msgs->peer->io;
379    struct evbuffer * out = msgs->outMessages;
380
381    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
382
383    evbuffer_add_uint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
384    evbuffer_add_uint8 ( io, out, BT_FEXT_ALLOWED_FAST );
385    evbuffer_add_uint32( io, out, pieceIndex );
386
387    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
388    dbgOutMessageLen( msgs );
389}
390#endif
391
392static void
393protocolSendChoke( tr_peermsgs * msgs, int choke )
394{
395    struct evbuffer * out = msgs->outMessages;
396
397    evbuffer_add_uint32( out, sizeof( uint8_t ) );
398    evbuffer_add_uint8 ( out, choke ? BT_CHOKE : BT_UNCHOKE );
399
400    dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
401    dbgOutMessageLen( msgs );
402    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
403}
404
405static void
406protocolSendHaveAll( tr_peermsgs * msgs )
407{
408    struct evbuffer * out = msgs->outMessages;
409
410    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
411
412    evbuffer_add_uint32( out, sizeof( uint8_t ) );
413    evbuffer_add_uint8 ( out, BT_FEXT_HAVE_ALL );
414
415    dbgmsg( msgs, "sending HAVE_ALL..." );
416    dbgOutMessageLen( msgs );
417    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
418}
419
420static void
421protocolSendHaveNone( tr_peermsgs * msgs )
422{
423    struct evbuffer * out = msgs->outMessages;
424
425    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
426
427    evbuffer_add_uint32( out, sizeof( uint8_t ) );
428    evbuffer_add_uint8 ( out, BT_FEXT_HAVE_NONE );
429
430    dbgmsg( msgs, "sending HAVE_NONE..." );
431    dbgOutMessageLen( msgs );
432    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
433}
434
435/**
436***  EVENTS
437**/
438
439static void
440publish( tr_peermsgs * msgs, tr_peer_event * e )
441{
442    assert( msgs->peer );
443    assert( msgs->peer->msgs == msgs );
444
445    if( msgs->callback != NULL )
446        msgs->callback( msgs->peer, e, msgs->callbackData );
447}
448
449static void
450fireError( tr_peermsgs * msgs, int err )
451{
452    tr_peer_event e = TR_PEER_EVENT_INIT;
453    e.eventType = TR_PEER_ERROR;
454    e.err = err;
455    publish( msgs, &e );
456}
457
458static void
459firePeerProgress( tr_peermsgs * msgs )
460{
461    tr_peer_event e = TR_PEER_EVENT_INIT;
462    e.eventType = TR_PEER_PEER_PROGRESS;
463    e.progress = msgs->peer->progress;
464    publish( msgs, &e );
465}
466
467static void
468fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
469{
470    tr_peer_event e = TR_PEER_EVENT_INIT;
471    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
472    e.pieceIndex = req->index;
473    e.offset = req->offset;
474    e.length = req->length;
475    publish( msgs, &e );
476}
477
478static void
479fireGotRej( tr_peermsgs * msgs, const struct peer_request * req )
480{
481    tr_peer_event e = TR_PEER_EVENT_INIT;
482    e.eventType = TR_PEER_CLIENT_GOT_REJ;
483    e.pieceIndex = req->index;
484    e.offset = req->offset;
485    e.length = req->length;
486    publish( msgs, &e );
487}
488
489static void
490fireGotChoke( tr_peermsgs * msgs )
491{
492    tr_peer_event e = TR_PEER_EVENT_INIT;
493    e.eventType = TR_PEER_CLIENT_GOT_CHOKE;
494    publish( msgs, &e );
495}
496
497static void
498fireClientGotHaveAll( tr_peermsgs * msgs )
499{
500    tr_peer_event e = TR_PEER_EVENT_INIT;
501    e.eventType = TR_PEER_CLIENT_GOT_HAVE_ALL;
502    publish( msgs, &e );
503}
504
505static void
506fireClientGotHaveNone( tr_peermsgs * msgs )
507{
508    tr_peer_event e = TR_PEER_EVENT_INIT;
509    e.eventType = TR_PEER_CLIENT_GOT_HAVE_NONE;
510    publish( msgs, &e );
511}
512
513static void
514fireClientGotData( tr_peermsgs * msgs,
515                   uint32_t      length,
516                   int           wasPieceData )
517{
518    tr_peer_event e = TR_PEER_EVENT_INIT;
519
520    e.length = length;
521    e.eventType = TR_PEER_CLIENT_GOT_DATA;
522    e.wasPieceData = wasPieceData;
523    publish( msgs, &e );
524}
525
526static void
527fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
528{
529    tr_peer_event e = TR_PEER_EVENT_INIT;
530    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
531    e.pieceIndex = pieceIndex;
532    publish( msgs, &e );
533}
534
535static void
536fireClientGotPort( tr_peermsgs * msgs, tr_port port )
537{
538    tr_peer_event e = TR_PEER_EVENT_INIT;
539    e.eventType = TR_PEER_CLIENT_GOT_PORT;
540    e.port = port;
541    publish( msgs, &e );
542}
543
544static void
545fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
546{
547    tr_peer_event e = TR_PEER_EVENT_INIT;
548    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
549    e.pieceIndex = pieceIndex;
550    publish( msgs, &e );
551}
552
553static void
554fireClientGotBitfield( tr_peermsgs * msgs, tr_bitfield * bitfield )
555{
556    tr_peer_event e = TR_PEER_EVENT_INIT;
557    e.eventType = TR_PEER_CLIENT_GOT_BITFIELD;
558    e.bitfield = bitfield;
559    publish( msgs, &e );
560}
561
562static void
563fireClientGotHave( tr_peermsgs * msgs, tr_piece_index_t index )
564{
565    tr_peer_event e = TR_PEER_EVENT_INIT;
566    e.eventType = TR_PEER_CLIENT_GOT_HAVE;
567    e.pieceIndex = index;
568    publish( msgs, &e );
569}
570
571static void
572firePeerGotData( tr_peermsgs  * msgs,
573                 uint32_t       length,
574                 int            wasPieceData )
575{
576    tr_peer_event e = TR_PEER_EVENT_INIT;
577
578    e.length = length;
579    e.eventType = TR_PEER_PEER_GOT_DATA;
580    e.wasPieceData = wasPieceData;
581
582    publish( msgs, &e );
583}
584
585/**
586***  ALLOWED FAST SET
587***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
588**/
589
590size_t
591tr_generateAllowedSet( tr_piece_index_t * setmePieces,
592                       size_t             desiredSetSize,
593                       size_t             pieceCount,
594                       const uint8_t    * infohash,
595                       const tr_address * addr )
596{
597    size_t setSize = 0;
598
599    assert( setmePieces );
600    assert( desiredSetSize <= pieceCount );
601    assert( desiredSetSize );
602    assert( pieceCount );
603    assert( infohash );
604    assert( addr );
605
606    if( addr->type == TR_AF_INET )
607    {
608        uint8_t w[SHA_DIGEST_LENGTH + 4], *walk=w;
609        uint8_t x[SHA_DIGEST_LENGTH];
610
611        uint32_t ui32 = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 );   /* (1) */
612        memcpy( w, &ui32, sizeof( uint32_t ) );
613        walk += sizeof( uint32_t );
614        memcpy( walk, infohash, SHA_DIGEST_LENGTH );                 /* (2) */
615        walk += SHA_DIGEST_LENGTH;
616        tr_sha1( x, w, walk-w, NULL );                               /* (3) */
617        assert( sizeof( w ) == walk-w );
618
619        while( setSize<desiredSetSize )
620        {
621            int i;
622            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
623            {
624                size_t k;
625                uint32_t j = i * 4;                                  /* (5) */
626                uint32_t y = ntohl( *( uint32_t* )( x + j ) );       /* (6) */
627                uint32_t index = y % pieceCount;                     /* (7) */
628
629                for( k=0; k<setSize; ++k )                           /* (8) */
630                    if( setmePieces[k] == index )
631                        break;
632
633                if( k == setSize )
634                    setmePieces[setSize++] = index;                  /* (9) */
635            }
636
637            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
638        }
639    }
640
641    return setSize;
642}
643
644static void
645updateFastSet( tr_peermsgs * msgs UNUSED )
646{
647#if 0
648    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
649    const int peerIsNeedy = msgs->peer->progress < 0.10;
650
651    if( fext && peerIsNeedy && !msgs->haveFastSet )
652    {
653        size_t i;
654        const struct tr_address * addr = tr_peerIoGetAddress( msgs->peer->io, NULL );
655        const tr_info * inf = &msgs->torrent->info;
656        const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
657
658        /* build the fast set */
659        msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
660        msgs->haveFastSet = 1;
661
662        /* send it to the peer */
663        for( i=0; i<msgs->fastsetSize; ++i )
664            protocolSendAllowedFast( msgs, msgs->fastset[i] );
665    }
666#endif
667}
668
669/**
670***  INTEREST
671**/
672
673static void
674sendInterest( tr_peermsgs * msgs, tr_bool clientIsInterested )
675{
676    struct evbuffer * out = msgs->outMessages;
677
678    assert( msgs );
679    assert( tr_isBool( clientIsInterested ) );
680
681    msgs->peer->clientIsInterested = clientIsInterested;
682    dbgmsg( msgs, "Sending %s", clientIsInterested ? "Interested" : "Not Interested" );
683    evbuffer_add_uint32( out, sizeof( uint8_t ) );
684    evbuffer_add_uint8 ( out, clientIsInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
685
686    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
687    dbgOutMessageLen( msgs );
688}
689
690static void
691updateInterest( tr_peermsgs * msgs UNUSED )
692{
693    /* FIXME -- might need to poke the mgr on startup */
694}
695
696void
697tr_peerMsgsSetInterested( tr_peermsgs * msgs, int isInterested )
698{
699    assert( tr_isBool( isInterested ) );
700
701    if( isInterested != msgs->peer->clientIsInterested )
702        sendInterest( msgs, isInterested );
703}
704
705static tr_bool
706popNextMetadataRequest( tr_peermsgs * msgs, int * piece )
707{
708    if( msgs->peerAskedForMetadataCount == 0 )
709        return FALSE;
710
711    *piece = msgs->peerAskedForMetadata[0];
712
713    tr_removeElementFromArray( msgs->peerAskedForMetadata, 0, sizeof( int ),
714                               msgs->peerAskedForMetadataCount-- );
715
716    return TRUE;
717}
718
719static tr_bool
720popNextRequest( tr_peermsgs * msgs, struct peer_request * setme )
721{
722    if( msgs->peer->pendingReqsToClient == 0 )
723        return FALSE;
724
725    *setme = msgs->peerAskedFor[0];
726
727    tr_removeElementFromArray( msgs->peerAskedFor, 0, sizeof( struct peer_request ),
728                               msgs->peer->pendingReqsToClient-- );
729
730    return TRUE;
731}
732
733static void
734cancelAllRequestsToClient( tr_peermsgs * msgs )
735{
736    struct peer_request req;
737    const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->peer->io );
738
739    while( popNextRequest( msgs, &req ))
740        if( mustSendCancel )
741            protocolSendReject( msgs, &req );
742}
743
744void
745tr_peerMsgsSetChoke( tr_peermsgs * msgs,
746                     int           choke )
747{
748    const time_t now = tr_time( );
749    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
750
751    assert( msgs );
752    assert( msgs->peer );
753    assert( choke == 0 || choke == 1 );
754
755    if( msgs->peer->chokeChangedAt > fibrillationTime )
756    {
757        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
758    }
759    else if( msgs->peer->peerIsChoked != choke )
760    {
761        msgs->peer->peerIsChoked = choke;
762        if( choke )
763            cancelAllRequestsToClient( msgs );
764        protocolSendChoke( msgs, choke );
765        msgs->peer->chokeChangedAt = now;
766    }
767}
768
769/**
770***
771**/
772
773void
774tr_peerMsgsHave( tr_peermsgs * msgs,
775                 uint32_t      index )
776{
777    protocolSendHave( msgs, index );
778
779    /* since we have more pieces now, we might not be interested in this peer */
780    updateInterest( msgs );
781}
782
783/**
784***
785**/
786
787static tr_bool
788reqIsValid( const tr_peermsgs * peer,
789            uint32_t            index,
790            uint32_t            offset,
791            uint32_t            length )
792{
793    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
794}
795
796static tr_bool
797requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
798{
799    return reqIsValid( msgs, req->index, req->offset, req->length );
800}
801
802void
803tr_peerMsgsCancel( tr_peermsgs * msgs, tr_block_index_t block )
804{
805    struct peer_request req;
806/*fprintf( stderr, "SENDING CANCEL MESSAGE FOR BLOCK %zu\n\t\tFROM PEER %p ------------------------------------\n", (size_t)block, msgs->peer );*/
807    blockToReq( msgs->torrent, block, &req );
808    protocolSendCancel( msgs, &req );
809}
810
811/**
812***
813**/
814
815static void
816sendLtepHandshake( tr_peermsgs * msgs )
817{
818    tr_benc val, *m;
819    char * buf;
820    int len;
821    tr_bool allow_pex;
822    tr_bool allow_metadata_xfer;
823    struct evbuffer * out = msgs->outMessages;
824    const unsigned char * ipv6 = tr_globalIPv6();
825
826    if( msgs->clientSentLtepHandshake )
827        return;
828
829    dbgmsg( msgs, "sending an ltep handshake" );
830    msgs->clientSentLtepHandshake = 1;
831
832    /* decide if we want to advertise metadata xfer support (BEP 9) */
833    if( tr_torrentIsPrivate( msgs->torrent ) )
834        allow_metadata_xfer = 0;
835    else
836        allow_metadata_xfer = 1;
837
838    /* decide if we want to advertise pex support */
839    if( !tr_torrentAllowsPex( msgs->torrent ) )
840        allow_pex = 0;
841    else if( msgs->peerSentLtepHandshake )
842        allow_pex = msgs->peerSupportsPex ? 1 : 0;
843    else
844        allow_pex = 1;
845
846    tr_bencInitDict( &val, 8 );
847    tr_bencDictAddInt( &val, "e", getSession(msgs)->encryptionMode != TR_CLEAR_PREFERRED );
848    if( ipv6 != NULL )
849        tr_bencDictAddRaw( &val, "ipv6", ipv6, 16 );
850    if( allow_metadata_xfer && tr_torrentHasMetadata( msgs->torrent )
851                            && ( msgs->torrent->infoDictLength > 0 ) )
852        tr_bencDictAddInt( &val, "metadata_size", msgs->torrent->infoDictLength );
853    tr_bencDictAddInt( &val, "p", tr_sessionGetPublicPeerPort( getSession(msgs) ) );
854    tr_bencDictAddInt( &val, "reqq", REQQ );
855    tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
856    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
857    m  = tr_bencDictAddDict( &val, "m", 2 );
858    if( allow_metadata_xfer )
859        tr_bencDictAddInt( m, "ut_metadata", UT_METADATA_ID );
860    if( allow_pex )
861        tr_bencDictAddInt( m, "ut_pex", UT_PEX_ID );
862
863    buf = tr_bencToStr( &val, TR_FMT_BENC, &len );
864
865    evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + len );
866    evbuffer_add_uint8 ( out, BT_LTEP );
867    evbuffer_add_uint8 ( out, LTEP_HANDSHAKE );
868    evbuffer_add       ( out, buf, len );
869    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
870    dbgOutMessageLen( msgs );
871
872    /* cleanup */
873    tr_bencFree( &val );
874    tr_free( buf );
875}
876
877static void
878parseLtepHandshake( tr_peermsgs *     msgs,
879                    int               len,
880                    struct evbuffer * inbuf )
881{
882    int64_t   i;
883    tr_benc   val, * sub;
884    uint8_t * tmp = tr_new( uint8_t, len );
885    const uint8_t *addr;
886    size_t addr_len;
887    tr_pex pex;
888    int8_t seedProbability = -1;
889
890    memset( &pex, 0, sizeof( tr_pex ) );
891
892    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, len );
893    msgs->peerSentLtepHandshake = 1;
894
895    if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) )
896    {
897        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
898        tr_free( tmp );
899        return;
900    }
901
902    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
903
904    /* does the peer prefer encrypted connections? */
905    if( tr_bencDictFindInt( &val, "e", &i ) ) {
906        msgs->peer->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
907                                              : ENCRYPTION_PREFERENCE_NO;
908        if( i )
909            pex.flags |= ADDED_F_ENCRYPTION_FLAG;
910    }
911
912    /* check supported messages for utorrent pex */
913    msgs->peerSupportsPex = 0;
914    msgs->peerSupportsMetadataXfer = 0;
915
916    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
917        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
918            msgs->peerSupportsPex = i != 0;
919            msgs->ut_pex_id = (uint8_t) i;
920            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
921        }
922        if( tr_bencDictFindInt( sub, "ut_metadata", &i ) ) {
923            msgs->peerSupportsMetadataXfer = i != 0;
924            msgs->ut_metadata_id = (uint8_t) i;
925            dbgmsg( msgs, "msgs->ut_metadata_id is %d", (int)msgs->ut_metadata_id );
926        }
927        if( tr_bencDictFindInt( sub, "ut_holepunch", &i ) ) {
928            /* Mysterious µTorrent extension that we don't grok.  However,
929               it implies support for µTP, so use it to indicate that. */
930            tr_peerMgrSetUtpFailed( msgs->torrent,
931                                    tr_peerIoGetAddress( msgs->peer->io, NULL ),
932                                    FALSE );
933        }
934    }
935
936    /* look for metainfo size (BEP 9) */
937    if( tr_bencDictFindInt( &val, "metadata_size", &i ) )
938        tr_torrentSetMetadataSizeHint( msgs->torrent, i );
939
940    /* look for upload_only (BEP 21) */
941    if( tr_bencDictFindInt( &val, "upload_only", &i ) )
942        seedProbability = i==0 ? 0 : 100;
943
944    /* get peer's listening port */
945    if( tr_bencDictFindInt( &val, "p", &i ) ) {
946        pex.port = htons( (uint16_t)i );
947        fireClientGotPort( msgs, pex.port );
948        dbgmsg( msgs, "peer's port is now %d", (int)i );
949    }
950
951    if( tr_peerIoIsIncoming( msgs->peer->io )
952        && tr_bencDictFindRaw( &val, "ipv4", &addr, &addr_len )
953        && ( addr_len == 4 ) )
954    {
955        pex.addr.type = TR_AF_INET;
956        memcpy( &pex.addr.addr.addr4, addr, 4 );
957        tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability );
958    }
959
960    if( tr_peerIoIsIncoming( msgs->peer->io )
961        && tr_bencDictFindRaw( &val, "ipv6", &addr, &addr_len )
962        && ( addr_len == 16 ) )
963    {
964        pex.addr.type = TR_AF_INET6;
965        memcpy( &pex.addr.addr.addr6, addr, 16 );
966        tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability );
967    }
968
969    /* get peer's maximum request queue size */
970    if( tr_bencDictFindInt( &val, "reqq", &i ) )
971        msgs->reqq = i;
972
973    tr_bencFree( &val );
974    tr_free( tmp );
975}
976
977static void
978parseUtMetadata( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
979{
980    tr_benc dict;
981    char * msg_end;
982    char * benc_end;
983    int64_t msg_type = -1;
984    int64_t piece = -1;
985    int64_t total_size = 0;
986    uint8_t * tmp = tr_new( uint8_t, msglen );
987
988    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
989    msg_end = (char*)tmp + msglen;
990
991    if( !tr_bencLoad( tmp, msglen, &dict, &benc_end ) )
992    {
993        tr_bencDictFindInt( &dict, "msg_type", &msg_type );
994        tr_bencDictFindInt( &dict, "piece", &piece );
995        tr_bencDictFindInt( &dict, "total_size", &total_size );
996        tr_bencFree( &dict );
997    }
998
999    dbgmsg( msgs, "got ut_metadata msg: type %d, piece %d, total_size %d",
1000            (int)msg_type, (int)piece, (int)total_size );
1001
1002    if( msg_type == METADATA_MSG_TYPE_REJECT )
1003    {
1004        /* NOOP */
1005    }
1006
1007    if( ( msg_type == METADATA_MSG_TYPE_DATA )
1008        && ( !tr_torrentHasMetadata( msgs->torrent ) )
1009        && ( msg_end - benc_end <= METADATA_PIECE_SIZE )
1010        && ( piece * METADATA_PIECE_SIZE + (msg_end - benc_end) <= total_size ) )
1011    {
1012        const int pieceLen = msg_end - benc_end;
1013        tr_torrentSetMetadataPiece( msgs->torrent, piece, benc_end, pieceLen );
1014    }
1015
1016    if( msg_type == METADATA_MSG_TYPE_REQUEST )
1017    {
1018        if( ( piece >= 0 )
1019            && tr_torrentHasMetadata( msgs->torrent )
1020            && !tr_torrentIsPrivate( msgs->torrent )
1021            && ( msgs->peerAskedForMetadataCount < METADATA_REQQ ) )
1022        {
1023            msgs->peerAskedForMetadata[msgs->peerAskedForMetadataCount++] = piece;
1024        }
1025        else
1026        {
1027            tr_benc tmp;
1028            int payloadLen;
1029            char * payload;
1030            struct evbuffer * out = msgs->outMessages;
1031
1032            /* build the rejection message */
1033            tr_bencInitDict( &tmp, 2 );
1034            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
1035            tr_bencDictAddInt( &tmp, "piece", piece );
1036            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1037            tr_bencFree( &tmp );
1038
1039            /* write it out as a LTEP message to our outMessages buffer */
1040            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen );
1041            evbuffer_add_uint8 ( out, BT_LTEP );
1042            evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1043            evbuffer_add       ( out, payload, payloadLen );
1044            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1045            dbgOutMessageLen( msgs );
1046
1047            tr_free( payload );
1048        }
1049    }
1050
1051    tr_free( tmp );
1052}
1053
1054static void
1055parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1056{
1057    int loaded = 0;
1058    uint8_t * tmp = tr_new( uint8_t, msglen );
1059    tr_benc val;
1060    tr_torrent * tor = msgs->torrent;
1061    const uint8_t * added;
1062    size_t added_len;
1063
1064    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1065
1066    if( tr_torrentAllowsPex( tor )
1067      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) ) )
1068    {
1069        if( tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1070        {
1071            tr_pex * pex;
1072            size_t i, n;
1073            size_t added_f_len = 0;
1074            const uint8_t * added_f = NULL;
1075
1076            tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1077            pex = tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len, &n );
1078
1079            n = MIN( n, MAX_PEX_PEER_COUNT );
1080            for( i=0; i<n; ++i )
1081            {
1082                int seedProbability = -1;
1083                if( i < added_f_len ) seedProbability = ( added_f[i] & ADDED_F_SEED_FLAG ) ? 100 : 0;
1084                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex+i, seedProbability );
1085            }
1086
1087            tr_free( pex );
1088        }
1089
1090        if( tr_bencDictFindRaw( &val, "added6", &added, &added_len ) )
1091        {
1092            tr_pex * pex;
1093            size_t i, n;
1094            size_t added_f_len = 0;
1095            const uint8_t * added_f = NULL;
1096
1097            tr_bencDictFindRaw( &val, "added6.f", &added_f, &added_f_len );
1098            pex = tr_peerMgrCompact6ToPex( added, added_len, added_f, added_f_len, &n );
1099
1100            n = MIN( n, MAX_PEX_PEER_COUNT );
1101            for( i=0; i<n; ++i )
1102            {
1103                int seedProbability = -1;
1104                if( i < added_f_len ) seedProbability = ( added_f[i] & ADDED_F_SEED_FLAG ) ? 100 : 0;
1105                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex+i, seedProbability );
1106            }
1107
1108            tr_free( pex );
1109        }
1110    }
1111
1112    if( loaded )
1113        tr_bencFree( &val );
1114    tr_free( tmp );
1115}
1116
1117static void sendPex( tr_peermsgs * msgs );
1118
1119static void
1120parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer  * inbuf )
1121{
1122    uint8_t ltep_msgid;
1123
1124    tr_peerIoReadUint8( msgs->peer->io, inbuf, &ltep_msgid );
1125    msglen--;
1126
1127    if( ltep_msgid == LTEP_HANDSHAKE )
1128    {
1129        dbgmsg( msgs, "got ltep handshake" );
1130        parseLtepHandshake( msgs, msglen, inbuf );
1131        if( tr_peerIoSupportsLTEP( msgs->peer->io ) )
1132        {
1133            sendLtepHandshake( msgs );
1134            sendPex( msgs );
1135        }
1136    }
1137    else if( ltep_msgid == UT_PEX_ID )
1138    {
1139        dbgmsg( msgs, "got ut pex" );
1140        msgs->peerSupportsPex = 1;
1141        parseUtPex( msgs, msglen, inbuf );
1142    }
1143    else if( ltep_msgid == UT_METADATA_ID )
1144    {
1145        dbgmsg( msgs, "got ut metadata" );
1146        msgs->peerSupportsMetadataXfer = 1;
1147        parseUtMetadata( msgs, msglen, inbuf );
1148    }
1149    else
1150    {
1151        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1152        evbuffer_drain( inbuf, msglen );
1153    }
1154}
1155
1156static int
1157readBtLength( tr_peermsgs *     msgs,
1158              struct evbuffer * inbuf,
1159              size_t            inlen )
1160{
1161    uint32_t len;
1162
1163    if( inlen < sizeof( len ) )
1164        return READ_LATER;
1165
1166    tr_peerIoReadUint32( msgs->peer->io, inbuf, &len );
1167
1168    if( len == 0 ) /* peer sent us a keepalive message */
1169        dbgmsg( msgs, "got KeepAlive" );
1170    else
1171    {
1172        msgs->incoming.length = len;
1173        msgs->state = AWAITING_BT_ID;
1174    }
1175
1176    return READ_NOW;
1177}
1178
1179static int readBtMessage( tr_peermsgs     * msgs,
1180                          struct evbuffer * inbuf,
1181                          size_t            inlen );
1182
1183static int
1184readBtId( tr_peermsgs * msgs, struct evbuffer  * inbuf, size_t inlen )
1185{
1186    uint8_t id;
1187
1188    if( inlen < sizeof( uint8_t ) )
1189        return READ_LATER;
1190
1191    tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
1192    msgs->incoming.id = id;
1193    dbgmsg( msgs, "msgs->incoming.id is now %d; msgs->incoming.length is %zu", id, (size_t)msgs->incoming.length );
1194
1195    if( id == BT_PIECE )
1196    {
1197        msgs->state = AWAITING_BT_PIECE;
1198        return READ_NOW;
1199    }
1200    else if( msgs->incoming.length != 1 )
1201    {
1202        msgs->state = AWAITING_BT_MESSAGE;
1203        return READ_NOW;
1204    }
1205    else return readBtMessage( msgs, inbuf, inlen - 1 );
1206}
1207
1208static void
1209updatePeerProgress( tr_peermsgs * msgs )
1210{
1211    msgs->peer->progress = tr_bitsetPercent( &msgs->peer->have );
1212    dbgmsg( msgs, "peer progress is %f", msgs->peer->progress );
1213    updateFastSet( msgs );
1214    updateInterest( msgs );
1215    firePeerProgress( msgs );
1216}
1217
1218static void
1219prefetchPieces( tr_peermsgs *msgs )
1220{
1221    int i;
1222
1223    if( !getSession(msgs)->isPrefetchEnabled )
1224        return;
1225
1226    /* Maintain 12 prefetched blocks per unchoked peer */
1227    for( i=msgs->prefetchCount; i<msgs->peer->pendingReqsToClient && i<12; ++i )
1228    {
1229        const struct peer_request * req = msgs->peerAskedFor + i;
1230        if( requestIsValid( msgs, req ) )
1231        {
1232            tr_cachePrefetchBlock( getSession(msgs)->cache, msgs->torrent, req->index, req->offset, req->length );
1233            ++msgs->prefetchCount;
1234        }
1235    }
1236}
1237
1238static void
1239peerMadeRequest( tr_peermsgs *               msgs,
1240                 const struct peer_request * req )
1241{
1242    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1243    const int reqIsValid = requestIsValid( msgs, req );
1244    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( &msgs->torrent->completion, req->index );
1245    const int peerIsChoked = msgs->peer->peerIsChoked;
1246
1247    int allow = FALSE;
1248
1249    if( !reqIsValid )
1250        dbgmsg( msgs, "rejecting an invalid request." );
1251    else if( !clientHasPiece )
1252        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1253    else if( peerIsChoked )
1254        dbgmsg( msgs, "rejecting request from choked peer" );
1255    else if( msgs->peer->pendingReqsToClient + 1 >= REQQ )
1256        dbgmsg( msgs, "rejecting request ... reqq is full" );
1257    else
1258        allow = TRUE;
1259
1260    if( allow ) {
1261        msgs->peerAskedFor[msgs->peer->pendingReqsToClient++] = *req;
1262        prefetchPieces( msgs );
1263    } else if( fext ) {
1264        protocolSendReject( msgs, req );
1265    }
1266}
1267
1268static tr_bool
1269messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1270{
1271    switch( id )
1272    {
1273        case BT_CHOKE:
1274        case BT_UNCHOKE:
1275        case BT_INTERESTED:
1276        case BT_NOT_INTERESTED:
1277        case BT_FEXT_HAVE_ALL:
1278        case BT_FEXT_HAVE_NONE:
1279            return len == 1;
1280
1281        case BT_HAVE:
1282        case BT_FEXT_SUGGEST:
1283        case BT_FEXT_ALLOWED_FAST:
1284            return len == 5;
1285
1286        case BT_BITFIELD:
1287            return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1288
1289        case BT_REQUEST:
1290        case BT_CANCEL:
1291        case BT_FEXT_REJECT:
1292            return len == 13;
1293
1294        case BT_PIECE:
1295            return len > 9 && len <= 16393;
1296
1297        case BT_PORT:
1298            return len == 3;
1299
1300        case BT_LTEP:
1301            return len >= 2;
1302
1303        default:
1304            return FALSE;
1305    }
1306}
1307
1308static int clientGotBlock( tr_peermsgs *               msgs,
1309                           struct evbuffer *           block,
1310                           const struct peer_request * req );
1311
1312static int
1313readBtPiece( tr_peermsgs      * msgs,
1314             struct evbuffer  * inbuf,
1315             size_t             inlen,
1316             size_t           * setme_piece_bytes_read )
1317{
1318    struct peer_request * req = &msgs->incoming.blockReq;
1319
1320    assert( evbuffer_get_length( inbuf ) >= inlen );
1321    dbgmsg( msgs, "In readBtPiece" );
1322
1323    if( !req->length )
1324    {
1325        if( inlen < 8 )
1326            return READ_LATER;
1327
1328        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index );
1329        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset );
1330        req->length = msgs->incoming.length - 9;
1331        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1332        return READ_NOW;
1333    }
1334    else
1335    {
1336        int err;
1337
1338        /* read in another chunk of data */
1339        const size_t nLeft = req->length - evbuffer_get_length( msgs->incoming.block );
1340        size_t n = MIN( nLeft, inlen );
1341        size_t i = n;
1342        void * buf = tr_sessionGetBuffer( getSession( msgs ) );
1343        const size_t buflen = SESSION_BUFFER_SIZE;
1344
1345        while( i > 0 )
1346        {
1347            const size_t thisPass = MIN( i, buflen );
1348            tr_peerIoReadBytes( msgs->peer->io, inbuf, buf, thisPass );
1349            evbuffer_add( msgs->incoming.block, buf, thisPass );
1350            i -= thisPass;
1351        }
1352
1353        tr_sessionReleaseBuffer( getSession( msgs ) );
1354        buf = NULL;
1355
1356        fireClientGotData( msgs, n, TRUE );
1357        *setme_piece_bytes_read += n;
1358        dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1359               n, req->index, req->offset, req->length,
1360               (int)( req->length - evbuffer_get_length( msgs->incoming.block ) ) );
1361        if( evbuffer_get_length( msgs->incoming.block ) < req->length )
1362            return READ_LATER;
1363
1364        /* we've got the whole block ... process it */
1365        err = clientGotBlock( msgs, msgs->incoming.block, req );
1366
1367        /* cleanup */
1368        evbuffer_free( msgs->incoming.block );
1369        msgs->incoming.block = evbuffer_new( );
1370        req->length = 0;
1371        msgs->state = AWAITING_BT_LENGTH;
1372        return err ? READ_ERR : READ_NOW;
1373    }
1374}
1375
1376static void updateDesiredRequestCount( tr_peermsgs * msgs );
1377
1378static int
1379readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1380{
1381    uint32_t      ui32;
1382    uint32_t      msglen = msgs->incoming.length;
1383    const uint8_t id = msgs->incoming.id;
1384#ifndef NDEBUG
1385    const size_t  startBufLen = evbuffer_get_length( inbuf );
1386#endif
1387    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1388
1389    --msglen; /* id length */
1390
1391    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
1392
1393    if( inlen < msglen )
1394        return READ_LATER;
1395
1396    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1397    {
1398        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1399        fireError( msgs, EMSGSIZE );
1400        return READ_ERR;
1401    }
1402
1403    switch( id )
1404    {
1405        case BT_CHOKE:
1406            dbgmsg( msgs, "got Choke" );
1407            msgs->peer->clientIsChoked = 1;
1408            if( !fext )
1409                fireGotChoke( msgs );
1410            break;
1411
1412        case BT_UNCHOKE:
1413            dbgmsg( msgs, "got Unchoke" );
1414            msgs->peer->clientIsChoked = 0;
1415            updateDesiredRequestCount( msgs );
1416            break;
1417
1418        case BT_INTERESTED:
1419            dbgmsg( msgs, "got Interested" );
1420            msgs->peer->peerIsInterested = 1;
1421            break;
1422
1423        case BT_NOT_INTERESTED:
1424            dbgmsg( msgs, "got Not Interested" );
1425            msgs->peer->peerIsInterested = 0;
1426            break;
1427
1428        case BT_HAVE:
1429            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1430            dbgmsg( msgs, "got Have: %u", ui32 );
1431            if( tr_torrentHasMetadata( msgs->torrent )
1432                    && ( ui32 >= msgs->torrent->info.pieceCount ) )
1433            {
1434                fireError( msgs, ERANGE );
1435                return READ_ERR;
1436            }
1437
1438            /* a peer can send the same HAVE message twice... */
1439            if( !tr_bitsetHas( &msgs->peer->have, ui32 ) ) {
1440                tr_bitsetAdd( &msgs->peer->have, ui32 );
1441                fireClientGotHave( msgs, ui32 );
1442            }
1443            updatePeerProgress( msgs );
1444            break;
1445
1446        case BT_BITFIELD: {
1447            tr_bitfield tmp = TR_BITFIELD_INIT;
1448            const size_t bitCount = tr_torrentHasMetadata( msgs->torrent )
1449                                  ? msgs->torrent->info.pieceCount
1450                                  : msglen * 8;
1451            tr_bitfieldConstruct( &tmp, bitCount );
1452            dbgmsg( msgs, "got a bitfield" );
1453            tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp.bits, msglen );
1454            tr_bitsetSetBitfield( &msgs->peer->have, &tmp );
1455            fireClientGotBitfield( msgs, &tmp );
1456            tr_bitfieldDestruct( &tmp );
1457            updatePeerProgress( msgs );
1458            break;
1459        }
1460
1461        case BT_REQUEST:
1462        {
1463            struct peer_request r;
1464            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1465            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1466            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1467            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1468            peerMadeRequest( msgs, &r );
1469            break;
1470        }
1471
1472        case BT_CANCEL:
1473        {
1474            int i;
1475            struct peer_request r;
1476            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1477            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1478            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1479            tr_historyAdd( &msgs->peer->cancelsSentToClient, tr_time( ), 1 );
1480            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1481
1482            for( i=0; i<msgs->peer->pendingReqsToClient; ++i ) {
1483                const struct peer_request * req = msgs->peerAskedFor + i;
1484                if( ( req->index == r.index ) && ( req->offset == r.offset ) && ( req->length == r.length ) )
1485                    break;
1486            }
1487
1488            if( i < msgs->peer->pendingReqsToClient )
1489                tr_removeElementFromArray( msgs->peerAskedFor, i, sizeof( struct peer_request ),
1490                                           msgs->peer->pendingReqsToClient-- );
1491            break;
1492        }
1493
1494        case BT_PIECE:
1495            assert( 0 ); /* handled elsewhere! */
1496            break;
1497
1498        case BT_PORT:
1499            dbgmsg( msgs, "Got a BT_PORT" );
1500            tr_peerIoReadUint16( msgs->peer->io, inbuf, &msgs->peer->dht_port );
1501            if( msgs->peer->dht_port > 0 )
1502                tr_dhtAddNode( getSession(msgs),
1503                               tr_peerAddress( msgs->peer ),
1504                               msgs->peer->dht_port, 0 );
1505            break;
1506
1507        case BT_FEXT_SUGGEST:
1508            dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1509            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1510            if( fext )
1511                fireClientGotSuggest( msgs, ui32 );
1512            else {
1513                fireError( msgs, EMSGSIZE );
1514                return READ_ERR;
1515            }
1516            break;
1517
1518        case BT_FEXT_ALLOWED_FAST:
1519            dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1520            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1521            if( fext )
1522                fireClientGotAllowedFast( msgs, ui32 );
1523            else {
1524                fireError( msgs, EMSGSIZE );
1525                return READ_ERR;
1526            }
1527            break;
1528
1529        case BT_FEXT_HAVE_ALL:
1530            dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1531            if( fext ) {
1532                tr_bitsetSetHaveAll( &msgs->peer->have );
1533                fireClientGotHaveAll( msgs );
1534                updatePeerProgress( msgs );
1535            } else {
1536                fireError( msgs, EMSGSIZE );
1537                return READ_ERR;
1538            }
1539            break;
1540
1541        case BT_FEXT_HAVE_NONE:
1542            dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1543            if( fext ) {
1544                tr_bitsetSetHaveNone( &msgs->peer->have );
1545                fireClientGotHaveNone( msgs );
1546                updatePeerProgress( msgs );
1547            } else {
1548                fireError( msgs, EMSGSIZE );
1549                return READ_ERR;
1550            }
1551            break;
1552
1553        case BT_FEXT_REJECT:
1554        {
1555            struct peer_request r;
1556            dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1557            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1558            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1559            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1560            if( fext )
1561                fireGotRej( msgs, &r );
1562            else {
1563                fireError( msgs, EMSGSIZE );
1564                return READ_ERR;
1565            }
1566            break;
1567        }
1568
1569        case BT_LTEP:
1570            dbgmsg( msgs, "Got a BT_LTEP" );
1571            parseLtep( msgs, msglen, inbuf );
1572            break;
1573
1574        default:
1575            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1576            tr_peerIoDrain( msgs->peer->io, inbuf, msglen );
1577            break;
1578    }
1579
1580    assert( msglen + 1 == msgs->incoming.length );
1581    assert( evbuffer_get_length( inbuf ) == startBufLen - msglen );
1582
1583    msgs->state = AWAITING_BT_LENGTH;
1584    return READ_NOW;
1585}
1586
1587static void
1588addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1589{
1590    if( !msgs->peer->blame )
1591         msgs->peer->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1592    tr_bitfieldAdd( msgs->peer->blame, index );
1593}
1594
1595/* returns 0 on success, or an errno on failure */
1596static int
1597clientGotBlock( tr_peermsgs *               msgs,
1598                struct evbuffer *           data,
1599                const struct peer_request * req )
1600{
1601    int err;
1602    tr_torrent * tor = msgs->torrent;
1603    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1604
1605    assert( msgs );
1606    assert( req );
1607
1608    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) {
1609        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1610                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1611        return EMSGSIZE;
1612    }
1613
1614    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1615
1616    if( !tr_peerMgrDidPeerRequest( msgs->torrent, msgs->peer, block ) ) {
1617        dbgmsg( msgs, "we didn't ask for this message..." );
1618        return 0;
1619    }
1620    if( tr_cpPieceIsComplete( &msgs->torrent->completion, req->index ) ) {
1621        dbgmsg( msgs, "we did ask for this message, but the piece is already complete..." );
1622        return 0;
1623    }
1624
1625    /**
1626    ***  Save the block
1627    **/
1628
1629    if(( err = tr_cacheWriteBlock( getSession(msgs)->cache, tor, req->index, req->offset, req->length, data )))
1630        return err;
1631
1632    addPeerToBlamefield( msgs, req->index );
1633    fireGotBlock( msgs, req );
1634    return 0;
1635}
1636
1637static int peerPulse( void * vmsgs );
1638
1639static void
1640didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1641{
1642    tr_peermsgs * msgs = vmsgs;
1643    firePeerGotData( msgs, bytesWritten, wasPieceData );
1644
1645    if ( tr_isPeerIo( io ) && io->userData )
1646        peerPulse( msgs );
1647}
1648
1649static ReadState
1650canRead( tr_peerIo * io, void * vmsgs, size_t * piece )
1651{
1652    ReadState         ret;
1653    tr_peermsgs *     msgs = vmsgs;
1654    struct evbuffer * in = tr_peerIoGetReadBuffer( io );
1655    const size_t      inlen = evbuffer_get_length( in );
1656
1657    dbgmsg( msgs, "canRead: inlen is %zu, msgs->state is %d", inlen, msgs->state );
1658
1659    if( !inlen )
1660    {
1661        ret = READ_LATER;
1662    }
1663    else if( msgs->state == AWAITING_BT_PIECE )
1664    {
1665        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1666    }
1667    else switch( msgs->state )
1668    {
1669        case AWAITING_BT_LENGTH:
1670            ret = readBtLength ( msgs, in, inlen ); break;
1671
1672        case AWAITING_BT_ID:
1673            ret = readBtId     ( msgs, in, inlen ); break;
1674
1675        case AWAITING_BT_MESSAGE:
1676            ret = readBtMessage( msgs, in, inlen ); break;
1677
1678        default:
1679            ret = READ_ERR;
1680            assert( 0 );
1681    }
1682
1683    dbgmsg( msgs, "canRead: ret is %d", (int)ret );
1684
1685    /* log the raw data that was read */
1686    if( ( ret != READ_ERR ) && ( evbuffer_get_length( in ) != inlen ) )
1687        fireClientGotData( msgs, inlen - evbuffer_get_length( in ), FALSE );
1688
1689    return ret;
1690}
1691
1692int
1693tr_peerMsgsIsReadingBlock( const tr_peermsgs * msgs, tr_block_index_t block )
1694{
1695    if( msgs->state != AWAITING_BT_PIECE )
1696        return FALSE;
1697
1698    return block == _tr_block( msgs->torrent,
1699                               msgs->incoming.blockReq.index,
1700                               msgs->incoming.blockReq.offset );
1701}
1702
1703/**
1704***
1705**/
1706
1707static void
1708updateDesiredRequestCount( tr_peermsgs * msgs )
1709{
1710    const tr_torrent * const torrent = msgs->torrent;
1711
1712    if( tr_torrentIsSeed( msgs->torrent ) )
1713    {
1714        msgs->desiredRequestCount = 0;
1715    }
1716    else if( msgs->peer->clientIsChoked )
1717    {
1718        msgs->desiredRequestCount = 0;
1719    }
1720    else if( !msgs->peer->clientIsInterested )
1721    {
1722        msgs->desiredRequestCount = 0;
1723    }
1724    else
1725    {
1726        int estimatedBlocksInPeriod;
1727        int rate_Bps;
1728        int irate_Bps;
1729        const int floor = 4;
1730        const int seconds = REQUEST_BUF_SECS;
1731        const uint64_t now = tr_time_msec( );
1732
1733        /* Get the rate limit we should use.
1734         * FIXME: this needs to consider all the other peers as well... */
1735        rate_Bps = tr_peerGetPieceSpeed_Bps( msgs->peer, now, TR_PEER_TO_CLIENT );
1736        if( tr_torrentUsesSpeedLimit( torrent, TR_PEER_TO_CLIENT ) )
1737            rate_Bps = MIN( rate_Bps, tr_torrentGetSpeedLimit_Bps( torrent, TR_PEER_TO_CLIENT ) );
1738
1739        /* honor the session limits, if enabled */
1740        if( tr_torrentUsesSessionLimits( torrent ) )
1741            if( tr_sessionGetActiveSpeedLimit_Bps( torrent->session, TR_PEER_TO_CLIENT, &irate_Bps ) )
1742                rate_Bps = MIN( rate_Bps, irate_Bps );
1743
1744        /* use this desired rate to figure out how
1745         * many requests we should send to this peer */
1746        estimatedBlocksInPeriod = ( rate_Bps * seconds ) / torrent->blockSize;
1747        msgs->desiredRequestCount = MAX( floor, estimatedBlocksInPeriod );
1748
1749        /* honor the peer's maximum request count, if specified */
1750        if( msgs->reqq > 0 )
1751            if( msgs->desiredRequestCount > msgs->reqq )
1752                msgs->desiredRequestCount = msgs->reqq;
1753    }
1754}
1755
1756static void
1757updateMetadataRequests( tr_peermsgs * msgs, time_t now )
1758{
1759    int piece;
1760
1761    if( msgs->peerSupportsMetadataXfer
1762        && tr_torrentGetNextMetadataRequest( msgs->torrent, now, &piece ) )
1763    {
1764        tr_benc tmp;
1765        int payloadLen;
1766        char * payload;
1767        struct evbuffer * out = msgs->outMessages;
1768
1769        /* build the data message */
1770        tr_bencInitDict( &tmp, 3 );
1771        tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REQUEST );
1772        tr_bencDictAddInt( &tmp, "piece", piece );
1773        payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1774        tr_bencFree( &tmp );
1775
1776        dbgmsg( msgs, "requesting metadata piece #%d", piece );
1777
1778        /* write it out as a LTEP message to our outMessages buffer */
1779        evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen );
1780        evbuffer_add_uint8 ( out, BT_LTEP );
1781        evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1782        evbuffer_add       ( out, payload, payloadLen );
1783        pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1784        dbgOutMessageLen( msgs );
1785
1786        tr_free( payload );
1787    }
1788}
1789
1790static void
1791updateBlockRequests( tr_peermsgs * msgs )
1792{
1793    if( tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT )
1794        && ( msgs->desiredRequestCount > 0 )
1795        && ( msgs->peer->pendingReqsToPeer <= ( msgs->desiredRequestCount * 0.66 ) ) )
1796    {
1797        int i;
1798        int n;
1799        const int numwant = msgs->desiredRequestCount - msgs->peer->pendingReqsToPeer;
1800        tr_block_index_t * blocks = tr_new( tr_block_index_t, numwant );
1801
1802        tr_peerMgrGetNextRequests( msgs->torrent, msgs->peer, numwant, blocks, &n );
1803
1804        for( i=0; i<n; ++i )
1805        {
1806            struct peer_request req;
1807            blockToReq( msgs->torrent, blocks[i], &req );
1808            protocolSendRequest( msgs, &req );
1809        }
1810
1811        tr_free( blocks );
1812    }
1813}
1814
1815static size_t
1816fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1817{
1818    int piece;
1819    size_t bytesWritten = 0;
1820    struct peer_request req;
1821    const tr_bool haveMessages = evbuffer_get_length( msgs->outMessages ) != 0;
1822    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1823
1824    /**
1825    ***  Protocol messages
1826    **/
1827
1828    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1829    {
1830        dbgmsg( msgs, "started an outMessages batch (length is %zu)", evbuffer_get_length( msgs->outMessages ) );
1831        msgs->outMessagesBatchedAt = now;
1832    }
1833    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1834    {
1835        const size_t len = evbuffer_get_length( msgs->outMessages );
1836        /* flush the protocol messages */
1837        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len );
1838        tr_peerIoWriteBuf( msgs->peer->io, msgs->outMessages, FALSE );
1839        msgs->clientSentAnythingAt = now;
1840        msgs->outMessagesBatchedAt = 0;
1841        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1842        bytesWritten +=  len;
1843    }
1844
1845    /**
1846    ***  Metadata Pieces
1847    **/
1848
1849    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= METADATA_PIECE_SIZE )
1850        && popNextMetadataRequest( msgs, &piece ) )
1851    {
1852        char * data;
1853        int dataLen;
1854        tr_bool ok = FALSE;
1855
1856        data = tr_torrentGetMetadataPiece( msgs->torrent, piece, &dataLen );
1857        if( ( dataLen > 0 ) && ( data != NULL ) )
1858        {
1859            tr_benc tmp;
1860            int payloadLen;
1861            char * payload;
1862            struct evbuffer * out = msgs->outMessages;
1863
1864            /* build the data message */
1865            tr_bencInitDict( &tmp, 3 );
1866            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_DATA );
1867            tr_bencDictAddInt( &tmp, "piece", piece );
1868            tr_bencDictAddInt( &tmp, "total_size", msgs->torrent->infoDictLength );
1869            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1870            tr_bencFree( &tmp );
1871
1872            /* write it out as a LTEP message to our outMessages buffer */
1873            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen + dataLen );
1874            evbuffer_add_uint8 ( out, BT_LTEP );
1875            evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1876            evbuffer_add       ( out, payload, payloadLen );
1877            evbuffer_add       ( out, data, dataLen );
1878            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1879            dbgOutMessageLen( msgs );
1880
1881            tr_free( payload );
1882            tr_free( data );
1883
1884            ok = TRUE;
1885        }
1886
1887        if( !ok ) /* send a rejection message */
1888        {
1889            tr_benc tmp;
1890            int payloadLen;
1891            char * payload;
1892            struct evbuffer * out = msgs->outMessages;
1893
1894            /* build the rejection message */
1895            tr_bencInitDict( &tmp, 2 );
1896            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
1897            tr_bencDictAddInt( &tmp, "piece", piece );
1898            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1899            tr_bencFree( &tmp );
1900
1901            /* write it out as a LTEP message to our outMessages buffer */
1902            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen );
1903            evbuffer_add_uint8 ( out, BT_LTEP );
1904            evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1905            evbuffer_add       ( out, payload, payloadLen );
1906            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1907            dbgOutMessageLen( msgs );
1908
1909            tr_free( payload );
1910        }
1911    }
1912
1913    /**
1914    ***  Data Blocks
1915    **/
1916
1917    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= msgs->torrent->blockSize )
1918        && popNextRequest( msgs, &req ) )
1919    {
1920        --msgs->prefetchCount;
1921
1922        if( requestIsValid( msgs, &req )
1923            && tr_cpPieceIsComplete( &msgs->torrent->completion, req.index ) )
1924        {
1925            int err;
1926            const uint32_t msglen = 4 + 1 + 4 + 4 + req.length;
1927            struct evbuffer * out;
1928            struct evbuffer_iovec iovec[1];
1929
1930            out = evbuffer_new( );
1931            evbuffer_expand( out, msglen );
1932
1933            evbuffer_add_uint32( out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1934            evbuffer_add_uint8 ( out, BT_PIECE );
1935            evbuffer_add_uint32( out, req.index );
1936            evbuffer_add_uint32( out, req.offset );
1937
1938            evbuffer_reserve_space( out, req.length, iovec, 1 );
1939            err = tr_cacheReadBlock( getSession(msgs)->cache, msgs->torrent, req.index, req.offset, req.length, iovec[0].iov_base );
1940            iovec[0].iov_len = req.length;
1941            evbuffer_commit_space( out, iovec, 1 );
1942
1943            /* check the piece if it needs checking... */
1944            if( !err && tr_torrentPieceNeedsCheck( msgs->torrent, req.index ) )
1945                if(( err = !tr_torrentCheckPiece( msgs->torrent, req.index )))
1946                    tr_torrentSetLocalError( msgs->torrent, _( "Please Verify Local Data! Piece #%zu is corrupt." ), (size_t)req.index );
1947
1948            if( err )
1949            {
1950                if( fext )
1951                    protocolSendReject( msgs, &req );
1952            }
1953            else
1954            {
1955                const size_t n = evbuffer_get_length( out );
1956                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1957                assert( n == msglen );
1958                tr_peerIoWriteBuf( msgs->peer->io, out, TRUE );
1959                bytesWritten += n;
1960                msgs->clientSentAnythingAt = now;
1961                tr_historyAdd( &msgs->peer->blocksSentToPeer, tr_time( ), 1 );
1962            }
1963
1964            evbuffer_free( out );
1965
1966            if( err )
1967            {
1968                bytesWritten = 0;
1969                msgs = NULL;
1970            }
1971        }
1972        else if( fext ) /* peer needs a reject message */
1973        {
1974            protocolSendReject( msgs, &req );
1975        }
1976
1977        if( msgs != NULL )
1978            prefetchPieces( msgs );
1979    }
1980
1981    /**
1982    ***  Keepalive
1983    **/
1984
1985    if( ( msgs != NULL )
1986        && ( msgs->clientSentAnythingAt != 0 )
1987        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1988    {
1989        dbgmsg( msgs, "sending a keepalive message" );
1990        evbuffer_add_uint32( msgs->outMessages, 0 );
1991        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1992    }
1993
1994    return bytesWritten;
1995}
1996
1997static int
1998peerPulse( void * vmsgs )
1999{
2000    tr_peermsgs * msgs = vmsgs;
2001    const time_t  now = tr_time( );
2002
2003    if ( tr_isPeerIo( msgs->peer->io ) ) {
2004        updateDesiredRequestCount( msgs );
2005        updateBlockRequests( msgs );
2006        updateMetadataRequests( msgs, now );
2007    }
2008
2009    for( ;; )
2010        if( fillOutputBuffer( msgs, now ) < 1 )
2011            break;
2012
2013    return TRUE; /* loop forever */
2014}
2015
2016void
2017tr_peerMsgsPulse( tr_peermsgs * msgs )
2018{
2019    if( msgs != NULL )
2020        peerPulse( msgs );
2021}
2022
2023static void
2024gotError( tr_peerIo  * io UNUSED,
2025          short        what,
2026          void       * vmsgs )
2027{
2028    if( what & BEV_EVENT_TIMEOUT )
2029        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
2030    if( what & ( BEV_EVENT_EOF | BEV_EVENT_ERROR ) )
2031        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
2032               what, errno, tr_strerror( errno ) );
2033    fireError( vmsgs, ENOTCONN );
2034}
2035
2036static void
2037sendBitfield( tr_peermsgs * msgs )
2038{
2039    struct evbuffer * out = msgs->outMessages;
2040    tr_bitfield * bf = tr_cpCreatePieceBitfield( &msgs->torrent->completion );
2041
2042    evbuffer_add_uint32( out, sizeof( uint8_t ) + bf->byteCount );
2043    evbuffer_add_uint8 ( out, BT_BITFIELD );
2044    evbuffer_add       ( out, bf->bits, bf->byteCount );
2045    dbgmsg( msgs, "sending bitfield... outMessage size is now %zu", evbuffer_get_length( out ) );
2046    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
2047
2048    tr_bitfieldFree( bf );
2049}
2050
2051static void
2052tellPeerWhatWeHave( tr_peermsgs * msgs )
2053{
2054    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
2055
2056    if( fext && ( tr_cpBlockBitset( &msgs->torrent->completion )->haveAll ) )
2057    {
2058        protocolSendHaveAll( msgs );
2059    }
2060    else if( fext && ( tr_cpBlockBitset( &msgs->torrent->completion )->haveNone ) )
2061    {
2062        protocolSendHaveNone( msgs );
2063    }
2064    else
2065    {
2066        sendBitfield( msgs );
2067    }
2068}
2069
2070/**
2071***
2072**/
2073
2074/* some peers give us error messages if we send
2075   more than this many peers in a single pex message
2076   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
2077#define MAX_PEX_ADDED 50
2078#define MAX_PEX_DROPPED 50
2079
2080typedef struct
2081{
2082    tr_pex *  added;
2083    tr_pex *  dropped;
2084    tr_pex *  elements;
2085    int       addedCount;
2086    int       droppedCount;
2087    int       elementCount;
2088}
2089PexDiffs;
2090
2091static void
2092pexAddedCb( void * vpex,
2093            void * userData )
2094{
2095    PexDiffs * diffs = userData;
2096    tr_pex *   pex = vpex;
2097
2098    if( diffs->addedCount < MAX_PEX_ADDED )
2099    {
2100        diffs->added[diffs->addedCount++] = *pex;
2101        diffs->elements[diffs->elementCount++] = *pex;
2102    }
2103}
2104
2105static inline void
2106pexDroppedCb( void * vpex,
2107              void * userData )
2108{
2109    PexDiffs * diffs = userData;
2110    tr_pex *   pex = vpex;
2111
2112    if( diffs->droppedCount < MAX_PEX_DROPPED )
2113    {
2114        diffs->dropped[diffs->droppedCount++] = *pex;
2115    }
2116}
2117
2118static inline void
2119pexElementCb( void * vpex,
2120              void * userData )
2121{
2122    PexDiffs * diffs = userData;
2123    tr_pex * pex = vpex;
2124
2125    diffs->elements[diffs->elementCount++] = *pex;
2126}
2127
2128static void
2129sendPex( tr_peermsgs * msgs )
2130{
2131    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
2132    {
2133        PexDiffs diffs;
2134        PexDiffs diffs6;
2135        tr_pex * newPex = NULL;
2136        tr_pex * newPex6 = NULL;
2137        const int newCount = tr_peerMgrGetPeers( msgs->torrent, &newPex, TR_AF_INET, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
2138        const int newCount6 = tr_peerMgrGetPeers( msgs->torrent, &newPex6, TR_AF_INET6, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
2139
2140        /* build the diffs */
2141        diffs.added = tr_new( tr_pex, newCount );
2142        diffs.addedCount = 0;
2143        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
2144        diffs.droppedCount = 0;
2145        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
2146        diffs.elementCount = 0;
2147        tr_set_compare( msgs->pex, msgs->pexCount,
2148                        newPex, newCount,
2149                        tr_pexCompare, sizeof( tr_pex ),
2150                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
2151        diffs6.added = tr_new( tr_pex, newCount6 );
2152        diffs6.addedCount = 0;
2153        diffs6.dropped = tr_new( tr_pex, msgs->pexCount6 );
2154        diffs6.droppedCount = 0;
2155        diffs6.elements = tr_new( tr_pex, newCount6 + msgs->pexCount6 );
2156        diffs6.elementCount = 0;
2157        tr_set_compare( msgs->pex6, msgs->pexCount6,
2158                        newPex6, newCount6,
2159                        tr_pexCompare, sizeof( tr_pex ),
2160                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6 );
2161        dbgmsg(
2162            msgs,
2163            "pex: old peer count %d+%d, new peer count %d+%d, "
2164            "added %d+%d, removed %d+%d",
2165            msgs->pexCount, msgs->pexCount6, newCount, newCount6,
2166            diffs.addedCount, diffs6.addedCount,
2167            diffs.droppedCount, diffs6.droppedCount );
2168
2169        if( !diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
2170            !diffs6.droppedCount )
2171        {
2172            tr_free( diffs.elements );
2173            tr_free( diffs6.elements );
2174        }
2175        else
2176        {
2177            int  i;
2178            tr_benc val;
2179            char * benc;
2180            int bencLen;
2181            uint8_t * tmp, *walk;
2182            struct evbuffer * out = msgs->outMessages;
2183
2184            /* update peer */
2185            tr_free( msgs->pex );
2186            msgs->pex = diffs.elements;
2187            msgs->pexCount = diffs.elementCount;
2188            tr_free( msgs->pex6 );
2189            msgs->pex6 = diffs6.elements;
2190            msgs->pexCount6 = diffs6.elementCount;
2191
2192            /* build the pex payload */
2193            tr_bencInitDict( &val, 3 ); /* ipv6 support: left as 3:
2194                                         * speed vs. likelihood? */
2195
2196            if( diffs.addedCount > 0)
2197            {
2198                /* "added" */
2199                tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
2200                for( i = 0; i < diffs.addedCount; ++i ) {
2201                    memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
2202                    memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
2203                }
2204                assert( ( walk - tmp ) == diffs.addedCount * 6 );
2205                tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
2206                tr_free( tmp );
2207
2208                /* "added.f"
2209                 * unset each holepunch flag because we don't support it. */
2210                tmp = walk = tr_new( uint8_t, diffs.addedCount );
2211                for( i = 0; i < diffs.addedCount; ++i )
2212                    *walk++ = diffs.added[i].flags & ~ADDED_F_HOLEPUNCH;
2213                assert( ( walk - tmp ) == diffs.addedCount );
2214                tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2215                tr_free( tmp );
2216            }
2217
2218            if( diffs.droppedCount > 0 )
2219            {
2220                /* "dropped" */
2221                tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2222                for( i = 0; i < diffs.droppedCount; ++i ) {
2223                    memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
2224                    memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2225                }
2226                assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2227                tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2228                tr_free( tmp );
2229            }
2230
2231            if( diffs6.addedCount > 0 )
2232            {
2233                /* "added6" */
2234                tmp = walk = tr_new( uint8_t, diffs6.addedCount * 18 );
2235                for( i = 0; i < diffs6.addedCount; ++i ) {
2236                    memcpy( walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16 );
2237                    walk += 16;
2238                    memcpy( walk, &diffs6.added[i].port, 2 );
2239                    walk += 2;
2240                }
2241                assert( ( walk - tmp ) == diffs6.addedCount * 18 );
2242                tr_bencDictAddRaw( &val, "added6", tmp, walk - tmp );
2243                tr_free( tmp );
2244
2245                /* "added6.f"
2246                 * unset each holepunch flag because we don't support it. */
2247                tmp = walk = tr_new( uint8_t, diffs6.addedCount );
2248                for( i = 0; i < diffs6.addedCount; ++i )
2249                    *walk++ = diffs6.added[i].flags & ~ADDED_F_HOLEPUNCH;
2250                assert( ( walk - tmp ) == diffs6.addedCount );
2251                tr_bencDictAddRaw( &val, "added6.f", tmp, walk - tmp );
2252                tr_free( tmp );
2253            }
2254
2255            if( diffs6.droppedCount > 0 )
2256            {
2257                /* "dropped6" */
2258                tmp = walk = tr_new( uint8_t, diffs6.droppedCount * 18 );
2259                for( i = 0; i < diffs6.droppedCount; ++i ) {
2260                    memcpy( walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16 );
2261                    walk += 16;
2262                    memcpy( walk, &diffs6.dropped[i].port, 2 );
2263                    walk += 2;
2264                }
2265                assert( ( walk - tmp ) == diffs6.droppedCount * 18);
2266                tr_bencDictAddRaw( &val, "dropped6", tmp, walk - tmp );
2267                tr_free( tmp );
2268            }
2269
2270            /* write the pex message */
2271            benc = tr_bencToStr( &val, TR_FMT_BENC, &bencLen );
2272            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + bencLen );
2273            evbuffer_add_uint8 ( out, BT_LTEP );
2274            evbuffer_add_uint8 ( out, msgs->ut_pex_id );
2275            evbuffer_add       ( out, benc, bencLen );
2276            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2277            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", evbuffer_get_length( out ) );
2278            dbgOutMessageLen( msgs );
2279
2280            tr_free( benc );
2281            tr_bencFree( &val );
2282        }
2283
2284        /* cleanup */
2285        tr_free( diffs.added );
2286        tr_free( diffs.dropped );
2287        tr_free( newPex );
2288        tr_free( diffs6.added );
2289        tr_free( diffs6.dropped );
2290        tr_free( newPex6 );
2291
2292        /*msgs->clientSentPexAt = tr_time( );*/
2293    }
2294}
2295
2296static void
2297pexPulse( int foo UNUSED, short bar UNUSED, void * vmsgs )
2298{
2299    struct tr_peermsgs * msgs = vmsgs;
2300
2301    sendPex( msgs );
2302
2303    tr_timerAdd( msgs->pexTimer, PEX_INTERVAL_SECS, 0 );
2304}
2305
2306/**
2307***
2308**/
2309
2310tr_peermsgs*
2311tr_peerMsgsNew( struct tr_torrent    * torrent,
2312                struct tr_peer       * peer,
2313                tr_peer_callback     * callback,
2314                void                 * callbackData )
2315{
2316    tr_peermsgs * m;
2317
2318    assert( peer );
2319    assert( peer->io );
2320
2321    m = tr_new0( tr_peermsgs, 1 );
2322    m->callback = callback;
2323    m->callbackData = callbackData;
2324    m->peer = peer;
2325    m->torrent = torrent;
2326    m->peer->clientIsChoked = 1;
2327    m->peer->peerIsChoked = 1;
2328    m->peer->clientIsInterested = 0;
2329    m->peer->peerIsInterested = 0;
2330    m->state = AWAITING_BT_LENGTH;
2331    m->outMessages = evbuffer_new( );
2332    m->outMessagesBatchedAt = 0;
2333    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2334    m->incoming.block = evbuffer_new( );
2335    m->pexTimer = evtimer_new( torrent->session->event_base, pexPulse, m );
2336    peer->msgs = m;
2337    tr_timerAdd( m->pexTimer, PEX_INTERVAL_SECS, 0 );
2338
2339    if( tr_peerIoSupportsUTP( peer->io ) ) {
2340        const tr_address * addr = tr_peerIoGetAddress( peer->io, NULL );
2341        tr_peerMgrSetUtpSupported( torrent, addr );
2342        tr_peerMgrSetUtpFailed( torrent, addr, FALSE );
2343    }
2344
2345    if( tr_peerIoSupportsLTEP( peer->io ) )
2346        sendLtepHandshake( m );
2347
2348    tellPeerWhatWeHave( m );
2349
2350    if( tr_dhtEnabled( torrent->session ) && tr_peerIoSupportsDHT( peer->io ))
2351    {
2352        /* Only send PORT over IPv6 when the IPv6 DHT is running (BEP-32). */
2353        const struct tr_address *addr = tr_peerIoGetAddress( peer->io, NULL );
2354        if( addr->type == TR_AF_INET || tr_globalIPv6() ) {
2355            protocolSendPort( m, tr_dhtPort( torrent->session ) );
2356        }
2357    }
2358
2359    tr_peerIoSetIOFuncs( m->peer->io, canRead, didWrite, gotError, m );
2360    updateDesiredRequestCount( m );
2361
2362    return m;
2363}
2364
2365void
2366tr_peerMsgsFree( tr_peermsgs* msgs )
2367{
2368    if( msgs )
2369    {
2370        event_free( msgs->pexTimer );
2371
2372        evbuffer_free( msgs->incoming.block );
2373        evbuffer_free( msgs->outMessages );
2374        tr_free( msgs->pex6 );
2375        tr_free( msgs->pex );
2376
2377        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2378        tr_free( msgs );
2379    }
2380}
Note: See TracBrowser for help on using the repository browser.