source: trunk/libtransmission/peer-msgs.c @ 12022

Last change on this file since 12022 was 12022, checked in by jordan, 11 years ago

(trunk libT) #4035 "In seed state, transmission disconnect from leechers" -- fixed.

  • Property svn:keywords set to Date Rev Author Id
File size: 70.1 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 12022 2011-02-24 14:35:45Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdarg.h>
17#include <stdio.h>
18#include <stdlib.h>
19#include <string.h>
20
21#include <event2/bufferevent.h>
22#include <event2/event.h>
23
24#include "transmission.h"
25#include "bencode.h"
26#include "cache.h"
27#include "completion.h"
28#include "crypto.h"
29#ifdef WIN32
30#include "net.h" /* for ECONN */
31#endif
32#include "peer-io.h"
33#include "peer-mgr.h"
34#include "peer-msgs.h"
35#include "session.h"
36#include "stats.h"
37#include "torrent.h"
38#include "torrent-magnet.h"
39#include "tr-dht.h"
40#include "utils.h"
41#include "version.h"
42
43/**
44***
45**/
46
47enum
48{
49    BT_CHOKE                = 0,
50    BT_UNCHOKE              = 1,
51    BT_INTERESTED           = 2,
52    BT_NOT_INTERESTED       = 3,
53    BT_HAVE                 = 4,
54    BT_BITFIELD             = 5,
55    BT_REQUEST              = 6,
56    BT_PIECE                = 7,
57    BT_CANCEL               = 8,
58    BT_PORT                 = 9,
59
60    BT_FEXT_SUGGEST         = 13,
61    BT_FEXT_HAVE_ALL        = 14,
62    BT_FEXT_HAVE_NONE       = 15,
63    BT_FEXT_REJECT          = 16,
64    BT_FEXT_ALLOWED_FAST    = 17,
65
66    BT_LTEP                 = 20,
67
68    LTEP_HANDSHAKE          = 0,
69
70    UT_PEX_ID               = 1,
71    UT_METADATA_ID          = 3,
72
73    MAX_PEX_PEER_COUNT      = 50,
74
75    MIN_CHOKE_PERIOD_SEC    = 10,
76
77    /* idle seconds before we send a keepalive */
78    KEEPALIVE_INTERVAL_SECS = 100,
79
80    PEX_INTERVAL_SECS       = 90, /* sec between sendPex() calls */
81
82    REQQ                    = 512,
83
84    METADATA_REQQ           = 64,
85
86    /* used in lowering the outMessages queue period */
87    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
88    HIGH_PRIORITY_INTERVAL_SECS = 2,
89    LOW_PRIORITY_INTERVAL_SECS = 10,
90
91    /* number of pieces we'll allow in our fast set */
92    MAX_FAST_SET_SIZE = 3,
93
94    /* defined in BEP #9 */
95    METADATA_MSG_TYPE_REQUEST = 0,
96    METADATA_MSG_TYPE_DATA = 1,
97    METADATA_MSG_TYPE_REJECT = 2
98};
99
100enum
101{
102    AWAITING_BT_LENGTH,
103    AWAITING_BT_ID,
104    AWAITING_BT_MESSAGE,
105    AWAITING_BT_PIECE
106};
107
108/**
109***
110**/
111
112struct peer_request
113{
114    uint32_t    index;
115    uint32_t    offset;
116    uint32_t    length;
117};
118
119static void
120blockToReq( const tr_torrent     * tor,
121            tr_block_index_t       block,
122            struct peer_request  * setme )
123{
124    tr_torrentGetBlockLocation( tor, block, &setme->index,
125                                            &setme->offset,
126                                            &setme->length );
127}
128
129/**
130***
131**/
132
133/* this is raw, unchanged data from the peer regarding
134 * the current message that it's sending us. */
135struct tr_incoming
136{
137    uint8_t                id;
138    uint32_t               length; /* includes the +1 for id length */
139    struct peer_request    blockReq; /* metadata for incoming blocks */
140    struct evbuffer *      block; /* piece data for incoming blocks */
141};
142
143/**
144 * Low-level communication state information about a connected peer.
145 *
146 * This structure remembers the low-level protocol states that we're
147 * in with this peer, such as active requests, pex messages, and so on.
148 * Its fields are all private to peer-msgs.c.
149 *
150 * Data not directly involved with sending & receiving messages is
151 * stored in tr_peer, where it can be accessed by both peermsgs and
152 * the peer manager.
153 *
154 * @see struct peer_atom
155 * @see tr_peer
156 */
157struct tr_peermsgs
158{
159    tr_bool         peerSupportsPex;
160    tr_bool         peerSupportsMetadataXfer;
161    tr_bool         clientSentLtepHandshake;
162    tr_bool         peerSentLtepHandshake;
163
164    /*tr_bool         haveFastSet;*/
165
166    int             desiredRequestCount;
167
168    int             prefetchCount;
169
170    /* how long the outMessages batch should be allowed to grow before
171     * it's flushed -- some messages (like requests >:) should be sent
172     * very quickly; others aren't as urgent. */
173    int8_t          outMessagesBatchPeriod;
174
175    uint8_t         state;
176    uint8_t         ut_pex_id;
177    uint8_t         ut_metadata_id;
178    uint16_t        pexCount;
179    uint16_t        pexCount6;
180
181#if 0
182    size_t                 fastsetSize;
183    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
184#endif
185
186    tr_peer *              peer;
187
188    tr_torrent *           torrent;
189
190    tr_peer_callback      * callback;
191    void                  * callbackData;
192
193    struct evbuffer *      outMessages; /* all the non-piece messages */
194
195    struct peer_request    peerAskedFor[REQQ];
196
197    int                    peerAskedForMetadata[METADATA_REQQ];
198    int                    peerAskedForMetadataCount;
199
200    tr_pex               * pex;
201    tr_pex               * pex6;
202
203    /*time_t                 clientSentPexAt;*/
204    time_t                 clientSentAnythingAt;
205
206    /* when we started batching the outMessages */
207    time_t                outMessagesBatchedAt;
208
209    struct tr_incoming    incoming;
210
211    /* if the peer supports the Extension Protocol in BEP 10 and
212       supplied a reqq argument, it's stored here. Otherwise, the
213       value is zero and should be ignored. */
214    int64_t               reqq;
215
216    struct event        * pexTimer;
217};
218
219/**
220***
221**/
222
223#if 0
224static tr_bitfield*
225getHave( const struct tr_peermsgs * msgs )
226{
227    if( msgs->peer->have == NULL )
228        msgs->peer->have = tr_bitfieldNew( msgs->torrent->info.pieceCount );
229    return msgs->peer->have;
230}
231#endif
232
233static inline tr_session*
234getSession( struct tr_peermsgs * msgs )
235{
236    return msgs->torrent->session;
237}
238
239/**
240***
241**/
242
243static void
244myDebug( const char * file, int line,
245         const struct tr_peermsgs * msgs,
246         const char * fmt, ... )
247{
248    FILE * fp = tr_getLog( );
249
250    if( fp )
251    {
252        va_list           args;
253        char              timestr[64];
254        struct evbuffer * buf = evbuffer_new( );
255        char *            base = tr_basename( file );
256
257        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
258                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
259                             tr_torrentName( msgs->torrent ),
260                             tr_peerIoGetAddrStr( msgs->peer->io ),
261                             msgs->peer->client );
262        va_start( args, fmt );
263        evbuffer_add_vprintf( buf, fmt, args );
264        va_end( args );
265        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
266        fputs( (const char*)evbuffer_pullup( buf, -1 ), fp );
267
268        tr_free( base );
269        evbuffer_free( buf );
270    }
271}
272
273#define dbgmsg( msgs, ... ) \
274    do { \
275        if( tr_deepLoggingIsActive( ) ) \
276            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
277    } while( 0 )
278
279/**
280***
281**/
282
283static void
284pokeBatchPeriod( tr_peermsgs * msgs,
285                 int           interval )
286{
287    if( msgs->outMessagesBatchPeriod > interval )
288    {
289        msgs->outMessagesBatchPeriod = interval;
290        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
291    }
292}
293
294static void
295dbgOutMessageLen( tr_peermsgs * msgs )
296{
297    dbgmsg( msgs, "outMessage size is now %zu", evbuffer_get_length( msgs->outMessages ) );
298}
299
300static void
301protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
302{
303    struct evbuffer * out = msgs->outMessages;
304
305    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
306
307    evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
308    evbuffer_add_uint8 ( out, BT_FEXT_REJECT );
309    evbuffer_add_uint32( out, req->index );
310    evbuffer_add_uint32( out, req->offset );
311    evbuffer_add_uint32( out, req->length );
312
313    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
314    dbgOutMessageLen( msgs );
315}
316
317static void
318protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
319{
320    struct evbuffer * out = msgs->outMessages;
321
322    evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
323    evbuffer_add_uint8 ( out, BT_REQUEST );
324    evbuffer_add_uint32( out, req->index );
325    evbuffer_add_uint32( out, req->offset );
326    evbuffer_add_uint32( out, req->length );
327
328    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
329    dbgOutMessageLen( msgs );
330    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
331}
332
333static void
334protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
335{
336    struct evbuffer * out = msgs->outMessages;
337
338    evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
339    evbuffer_add_uint8 ( out, BT_CANCEL );
340    evbuffer_add_uint32( out, req->index );
341    evbuffer_add_uint32( out, req->offset );
342    evbuffer_add_uint32( out, req->length );
343
344    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
345    dbgOutMessageLen( msgs );
346    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
347}
348
349static void
350protocolSendPort(tr_peermsgs *msgs, uint16_t port)
351{
352    struct evbuffer * out = msgs->outMessages;
353
354    dbgmsg( msgs, "sending Port %u", port);
355    evbuffer_add_uint32( out, 3 );
356    evbuffer_add_uint8 ( out, BT_PORT );
357    evbuffer_add_uint16( out, port);
358}
359
360static void
361protocolSendHave( tr_peermsgs * msgs, uint32_t index )
362{
363    struct evbuffer * out = msgs->outMessages;
364
365    evbuffer_add_uint32( out, sizeof(uint8_t) + sizeof(uint32_t) );
366    evbuffer_add_uint8 ( out, BT_HAVE );
367    evbuffer_add_uint32( out, index );
368
369    dbgmsg( msgs, "sending Have %u", index );
370    dbgOutMessageLen( msgs );
371    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
372}
373
374#if 0
375static void
376protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
377{
378    tr_peerIo       * io  = msgs->peer->io;
379    struct evbuffer * out = msgs->outMessages;
380
381    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
382
383    evbuffer_add_uint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
384    evbuffer_add_uint8 ( io, out, BT_FEXT_ALLOWED_FAST );
385    evbuffer_add_uint32( io, out, pieceIndex );
386
387    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
388    dbgOutMessageLen( msgs );
389}
390#endif
391
392static void
393protocolSendChoke( tr_peermsgs * msgs, int choke )
394{
395    struct evbuffer * out = msgs->outMessages;
396
397    evbuffer_add_uint32( out, sizeof( uint8_t ) );
398    evbuffer_add_uint8 ( out, choke ? BT_CHOKE : BT_UNCHOKE );
399
400    dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
401    dbgOutMessageLen( msgs );
402    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
403}
404
405static void
406protocolSendHaveAll( tr_peermsgs * msgs )
407{
408    struct evbuffer * out = msgs->outMessages;
409
410    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
411
412    evbuffer_add_uint32( out, sizeof( uint8_t ) );
413    evbuffer_add_uint8 ( out, BT_FEXT_HAVE_ALL );
414
415    dbgmsg( msgs, "sending HAVE_ALL..." );
416    dbgOutMessageLen( msgs );
417    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
418}
419
420static void
421protocolSendHaveNone( tr_peermsgs * msgs )
422{
423    struct evbuffer * out = msgs->outMessages;
424
425    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
426
427    evbuffer_add_uint32( out, sizeof( uint8_t ) );
428    evbuffer_add_uint8 ( out, BT_FEXT_HAVE_NONE );
429
430    dbgmsg( msgs, "sending HAVE_NONE..." );
431    dbgOutMessageLen( msgs );
432    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
433}
434
435/**
436***  EVENTS
437**/
438
439static void
440publish( tr_peermsgs * msgs, tr_peer_event * e )
441{
442    assert( msgs->peer );
443    assert( msgs->peer->msgs == msgs );
444
445    if( msgs->callback != NULL )
446        msgs->callback( msgs->peer, e, msgs->callbackData );
447}
448
449static void
450fireError( tr_peermsgs * msgs, int err )
451{
452    tr_peer_event e = TR_PEER_EVENT_INIT;
453    e.eventType = TR_PEER_ERROR;
454    e.err = err;
455    publish( msgs, &e );
456}
457
458static void
459fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
460{
461    tr_peer_event e = TR_PEER_EVENT_INIT;
462    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
463    e.pieceIndex = req->index;
464    e.offset = req->offset;
465    e.length = req->length;
466    publish( msgs, &e );
467}
468
469static void
470fireGotRej( tr_peermsgs * msgs, const struct peer_request * req )
471{
472    tr_peer_event e = TR_PEER_EVENT_INIT;
473    e.eventType = TR_PEER_CLIENT_GOT_REJ;
474    e.pieceIndex = req->index;
475    e.offset = req->offset;
476    e.length = req->length;
477    publish( msgs, &e );
478}
479
480static void
481fireGotChoke( tr_peermsgs * msgs )
482{
483    tr_peer_event e = TR_PEER_EVENT_INIT;
484    e.eventType = TR_PEER_CLIENT_GOT_CHOKE;
485    publish( msgs, &e );
486}
487
488static void
489fireClientGotHaveAll( tr_peermsgs * msgs )
490{
491    tr_peer_event e = TR_PEER_EVENT_INIT;
492    e.eventType = TR_PEER_CLIENT_GOT_HAVE_ALL;
493    publish( msgs, &e );
494}
495
496static void
497fireClientGotHaveNone( tr_peermsgs * msgs )
498{
499    tr_peer_event e = TR_PEER_EVENT_INIT;
500    e.eventType = TR_PEER_CLIENT_GOT_HAVE_NONE;
501    publish( msgs, &e );
502}
503
504static void
505fireClientGotData( tr_peermsgs * msgs,
506                   uint32_t      length,
507                   int           wasPieceData )
508{
509    tr_peer_event e = TR_PEER_EVENT_INIT;
510
511    e.length = length;
512    e.eventType = TR_PEER_CLIENT_GOT_DATA;
513    e.wasPieceData = wasPieceData;
514    publish( msgs, &e );
515}
516
517static void
518fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
519{
520    tr_peer_event e = TR_PEER_EVENT_INIT;
521    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
522    e.pieceIndex = pieceIndex;
523    publish( msgs, &e );
524}
525
526static void
527fireClientGotPort( tr_peermsgs * msgs, tr_port port )
528{
529    tr_peer_event e = TR_PEER_EVENT_INIT;
530    e.eventType = TR_PEER_CLIENT_GOT_PORT;
531    e.port = port;
532    publish( msgs, &e );
533}
534
535static void
536fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
537{
538    tr_peer_event e = TR_PEER_EVENT_INIT;
539    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
540    e.pieceIndex = pieceIndex;
541    publish( msgs, &e );
542}
543
544static void
545fireClientGotBitfield( tr_peermsgs * msgs, tr_bitfield * bitfield )
546{
547    tr_peer_event e = TR_PEER_EVENT_INIT;
548    e.eventType = TR_PEER_CLIENT_GOT_BITFIELD;
549    e.bitfield = bitfield;
550    publish( msgs, &e );
551}
552
553static void
554fireClientGotHave( tr_peermsgs * msgs, tr_piece_index_t index )
555{
556    tr_peer_event e = TR_PEER_EVENT_INIT;
557    e.eventType = TR_PEER_CLIENT_GOT_HAVE;
558    e.pieceIndex = index;
559    publish( msgs, &e );
560}
561
562static void
563firePeerGotData( tr_peermsgs  * msgs,
564                 uint32_t       length,
565                 int            wasPieceData )
566{
567    tr_peer_event e = TR_PEER_EVENT_INIT;
568
569    e.length = length;
570    e.eventType = TR_PEER_PEER_GOT_DATA;
571    e.wasPieceData = wasPieceData;
572
573    publish( msgs, &e );
574}
575
576/**
577***  ALLOWED FAST SET
578***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
579**/
580
581size_t
582tr_generateAllowedSet( tr_piece_index_t * setmePieces,
583                       size_t             desiredSetSize,
584                       size_t             pieceCount,
585                       const uint8_t    * infohash,
586                       const tr_address * addr )
587{
588    size_t setSize = 0;
589
590    assert( setmePieces );
591    assert( desiredSetSize <= pieceCount );
592    assert( desiredSetSize );
593    assert( pieceCount );
594    assert( infohash );
595    assert( addr );
596
597    if( addr->type == TR_AF_INET )
598    {
599        uint8_t w[SHA_DIGEST_LENGTH + 4], *walk=w;
600        uint8_t x[SHA_DIGEST_LENGTH];
601
602        uint32_t ui32 = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 );   /* (1) */
603        memcpy( w, &ui32, sizeof( uint32_t ) );
604        walk += sizeof( uint32_t );
605        memcpy( walk, infohash, SHA_DIGEST_LENGTH );                 /* (2) */
606        walk += SHA_DIGEST_LENGTH;
607        tr_sha1( x, w, walk-w, NULL );                               /* (3) */
608        assert( sizeof( w ) == walk-w );
609
610        while( setSize<desiredSetSize )
611        {
612            int i;
613            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
614            {
615                size_t k;
616                uint32_t j = i * 4;                                  /* (5) */
617                uint32_t y = ntohl( *( uint32_t* )( x + j ) );       /* (6) */
618                uint32_t index = y % pieceCount;                     /* (7) */
619
620                for( k=0; k<setSize; ++k )                           /* (8) */
621                    if( setmePieces[k] == index )
622                        break;
623
624                if( k == setSize )
625                    setmePieces[setSize++] = index;                  /* (9) */
626            }
627
628            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
629        }
630    }
631
632    return setSize;
633}
634
635static void
636updateFastSet( tr_peermsgs * msgs UNUSED )
637{
638#if 0
639    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
640    const int peerIsNeedy = msgs->peer->progress < 0.10;
641
642    if( fext && peerIsNeedy && !msgs->haveFastSet )
643    {
644        size_t i;
645        const struct tr_address * addr = tr_peerIoGetAddress( msgs->peer->io, NULL );
646        const tr_info * inf = &msgs->torrent->info;
647        const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
648
649        /* build the fast set */
650        msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
651        msgs->haveFastSet = 1;
652
653        /* send it to the peer */
654        for( i=0; i<msgs->fastsetSize; ++i )
655            protocolSendAllowedFast( msgs, msgs->fastset[i] );
656    }
657#endif
658}
659
660/**
661***  INTEREST
662**/
663
664static void
665sendInterest( tr_peermsgs * msgs, tr_bool clientIsInterested )
666{
667    struct evbuffer * out = msgs->outMessages;
668
669    assert( msgs );
670    assert( tr_isBool( clientIsInterested ) );
671
672    msgs->peer->clientIsInterested = clientIsInterested;
673    dbgmsg( msgs, "Sending %s", clientIsInterested ? "Interested" : "Not Interested" );
674    evbuffer_add_uint32( out, sizeof( uint8_t ) );
675    evbuffer_add_uint8 ( out, clientIsInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
676
677    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
678    dbgOutMessageLen( msgs );
679}
680
681static void
682updateInterest( tr_peermsgs * msgs UNUSED )
683{
684    /* FIXME -- might need to poke the mgr on startup */
685}
686
687void
688tr_peerMsgsSetInterested( tr_peermsgs * msgs, int isInterested )
689{
690    assert( tr_isBool( isInterested ) );
691
692    if( isInterested != msgs->peer->clientIsInterested )
693        sendInterest( msgs, isInterested );
694}
695
696static tr_bool
697popNextMetadataRequest( tr_peermsgs * msgs, int * piece )
698{
699    if( msgs->peerAskedForMetadataCount == 0 )
700        return FALSE;
701
702    *piece = msgs->peerAskedForMetadata[0];
703
704    tr_removeElementFromArray( msgs->peerAskedForMetadata, 0, sizeof( int ),
705                               msgs->peerAskedForMetadataCount-- );
706
707    return TRUE;
708}
709
710static tr_bool
711popNextRequest( tr_peermsgs * msgs, struct peer_request * setme )
712{
713    if( msgs->peer->pendingReqsToClient == 0 )
714        return FALSE;
715
716    *setme = msgs->peerAskedFor[0];
717
718    tr_removeElementFromArray( msgs->peerAskedFor, 0, sizeof( struct peer_request ),
719                               msgs->peer->pendingReqsToClient-- );
720
721    return TRUE;
722}
723
724static void
725cancelAllRequestsToClient( tr_peermsgs * msgs )
726{
727    struct peer_request req;
728    const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->peer->io );
729
730    while( popNextRequest( msgs, &req ))
731        if( mustSendCancel )
732            protocolSendReject( msgs, &req );
733}
734
735void
736tr_peerMsgsSetChoke( tr_peermsgs * msgs,
737                     int           choke )
738{
739    const time_t now = tr_time( );
740    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
741
742    assert( msgs );
743    assert( msgs->peer );
744    assert( choke == 0 || choke == 1 );
745
746    if( msgs->peer->chokeChangedAt > fibrillationTime )
747    {
748        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
749    }
750    else if( msgs->peer->peerIsChoked != choke )
751    {
752        msgs->peer->peerIsChoked = choke;
753        if( choke )
754            cancelAllRequestsToClient( msgs );
755        protocolSendChoke( msgs, choke );
756        msgs->peer->chokeChangedAt = now;
757    }
758}
759
760/**
761***
762**/
763
764void
765tr_peerMsgsHave( tr_peermsgs * msgs,
766                 uint32_t      index )
767{
768    protocolSendHave( msgs, index );
769
770    /* since we have more pieces now, we might not be interested in this peer */
771    updateInterest( msgs );
772}
773
774/**
775***
776**/
777
778static tr_bool
779reqIsValid( const tr_peermsgs * peer,
780            uint32_t            index,
781            uint32_t            offset,
782            uint32_t            length )
783{
784    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
785}
786
787static tr_bool
788requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
789{
790    return reqIsValid( msgs, req->index, req->offset, req->length );
791}
792
793void
794tr_peerMsgsCancel( tr_peermsgs * msgs, tr_block_index_t block )
795{
796    struct peer_request req;
797/*fprintf( stderr, "SENDING CANCEL MESSAGE FOR BLOCK %zu\n\t\tFROM PEER %p ------------------------------------\n", (size_t)block, msgs->peer );*/
798    blockToReq( msgs->torrent, block, &req );
799    protocolSendCancel( msgs, &req );
800}
801
802/**
803***
804**/
805
806static void
807sendLtepHandshake( tr_peermsgs * msgs )
808{
809    tr_benc val, *m;
810    char * buf;
811    int len;
812    tr_bool allow_pex;
813    tr_bool allow_metadata_xfer;
814    struct evbuffer * out = msgs->outMessages;
815    const unsigned char * ipv6 = tr_globalIPv6();
816
817    if( msgs->clientSentLtepHandshake )
818        return;
819
820    dbgmsg( msgs, "sending an ltep handshake" );
821    msgs->clientSentLtepHandshake = 1;
822
823    /* decide if we want to advertise metadata xfer support (BEP 9) */
824    if( tr_torrentIsPrivate( msgs->torrent ) )
825        allow_metadata_xfer = 0;
826    else
827        allow_metadata_xfer = 1;
828
829    /* decide if we want to advertise pex support */
830    if( !tr_torrentAllowsPex( msgs->torrent ) )
831        allow_pex = 0;
832    else if( msgs->peerSentLtepHandshake )
833        allow_pex = msgs->peerSupportsPex ? 1 : 0;
834    else
835        allow_pex = 1;
836
837    tr_bencInitDict( &val, 8 );
838    tr_bencDictAddInt( &val, "e", getSession(msgs)->encryptionMode != TR_CLEAR_PREFERRED );
839    if( ipv6 != NULL )
840        tr_bencDictAddRaw( &val, "ipv6", ipv6, 16 );
841    if( allow_metadata_xfer && tr_torrentHasMetadata( msgs->torrent )
842                            && ( msgs->torrent->infoDictLength > 0 ) )
843        tr_bencDictAddInt( &val, "metadata_size", msgs->torrent->infoDictLength );
844    tr_bencDictAddInt( &val, "p", tr_sessionGetPublicPeerPort( getSession(msgs) ) );
845    tr_bencDictAddInt( &val, "reqq", REQQ );
846    tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
847    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
848    m  = tr_bencDictAddDict( &val, "m", 2 );
849    if( allow_metadata_xfer )
850        tr_bencDictAddInt( m, "ut_metadata", UT_METADATA_ID );
851    if( allow_pex )
852        tr_bencDictAddInt( m, "ut_pex", UT_PEX_ID );
853
854    buf = tr_bencToStr( &val, TR_FMT_BENC, &len );
855
856    evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + len );
857    evbuffer_add_uint8 ( out, BT_LTEP );
858    evbuffer_add_uint8 ( out, LTEP_HANDSHAKE );
859    evbuffer_add       ( out, buf, len );
860    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
861    dbgOutMessageLen( msgs );
862
863    /* cleanup */
864    tr_bencFree( &val );
865    tr_free( buf );
866}
867
868static void
869parseLtepHandshake( tr_peermsgs *     msgs,
870                    int               len,
871                    struct evbuffer * inbuf )
872{
873    int64_t   i;
874    tr_benc   val, * sub;
875    uint8_t * tmp = tr_new( uint8_t, len );
876    const uint8_t *addr;
877    size_t addr_len;
878    tr_pex pex;
879    int8_t seedProbability = -1;
880
881    memset( &pex, 0, sizeof( tr_pex ) );
882
883    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, len );
884    msgs->peerSentLtepHandshake = 1;
885
886    if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) )
887    {
888        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
889        tr_free( tmp );
890        return;
891    }
892
893    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
894
895    /* does the peer prefer encrypted connections? */
896    if( tr_bencDictFindInt( &val, "e", &i ) ) {
897        msgs->peer->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
898                                              : ENCRYPTION_PREFERENCE_NO;
899        if( i )
900            pex.flags |= ADDED_F_ENCRYPTION_FLAG;
901    }
902
903    /* check supported messages for utorrent pex */
904    msgs->peerSupportsPex = 0;
905    msgs->peerSupportsMetadataXfer = 0;
906
907    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
908        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
909            msgs->peerSupportsPex = i != 0;
910            msgs->ut_pex_id = (uint8_t) i;
911            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
912        }
913        if( tr_bencDictFindInt( sub, "ut_metadata", &i ) ) {
914            msgs->peerSupportsMetadataXfer = i != 0;
915            msgs->ut_metadata_id = (uint8_t) i;
916            dbgmsg( msgs, "msgs->ut_metadata_id is %d", (int)msgs->ut_metadata_id );
917        }
918        if( tr_bencDictFindInt( sub, "ut_holepunch", &i ) ) {
919            /* Mysterious µTorrent extension that we don't grok.  However,
920               it implies support for µTP, so use it to indicate that. */
921            tr_peerMgrSetUtpFailed( msgs->torrent,
922                                    tr_peerIoGetAddress( msgs->peer->io, NULL ),
923                                    FALSE );
924        }
925    }
926
927    /* look for metainfo size (BEP 9) */
928    if( tr_bencDictFindInt( &val, "metadata_size", &i ) )
929        tr_torrentSetMetadataSizeHint( msgs->torrent, i );
930
931    /* look for upload_only (BEP 21) */
932    if( tr_bencDictFindInt( &val, "upload_only", &i ) )
933        seedProbability = i==0 ? 0 : 100;
934
935    /* get peer's listening port */
936    if( tr_bencDictFindInt( &val, "p", &i ) ) {
937        pex.port = htons( (uint16_t)i );
938        fireClientGotPort( msgs, pex.port );
939        dbgmsg( msgs, "peer's port is now %d", (int)i );
940    }
941
942    if( tr_peerIoIsIncoming( msgs->peer->io )
943        && tr_bencDictFindRaw( &val, "ipv4", &addr, &addr_len )
944        && ( addr_len == 4 ) )
945    {
946        pex.addr.type = TR_AF_INET;
947        memcpy( &pex.addr.addr.addr4, addr, 4 );
948        tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability );
949    }
950
951    if( tr_peerIoIsIncoming( msgs->peer->io )
952        && tr_bencDictFindRaw( &val, "ipv6", &addr, &addr_len )
953        && ( addr_len == 16 ) )
954    {
955        pex.addr.type = TR_AF_INET6;
956        memcpy( &pex.addr.addr.addr6, addr, 16 );
957        tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability );
958    }
959
960    /* get peer's maximum request queue size */
961    if( tr_bencDictFindInt( &val, "reqq", &i ) )
962        msgs->reqq = i;
963
964    tr_bencFree( &val );
965    tr_free( tmp );
966}
967
968static void
969parseUtMetadata( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
970{
971    tr_benc dict;
972    char * msg_end;
973    char * benc_end;
974    int64_t msg_type = -1;
975    int64_t piece = -1;
976    int64_t total_size = 0;
977    uint8_t * tmp = tr_new( uint8_t, msglen );
978
979    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
980    msg_end = (char*)tmp + msglen;
981
982    if( !tr_bencLoad( tmp, msglen, &dict, &benc_end ) )
983    {
984        tr_bencDictFindInt( &dict, "msg_type", &msg_type );
985        tr_bencDictFindInt( &dict, "piece", &piece );
986        tr_bencDictFindInt( &dict, "total_size", &total_size );
987        tr_bencFree( &dict );
988    }
989
990    dbgmsg( msgs, "got ut_metadata msg: type %d, piece %d, total_size %d",
991            (int)msg_type, (int)piece, (int)total_size );
992
993    if( msg_type == METADATA_MSG_TYPE_REJECT )
994    {
995        /* NOOP */
996    }
997
998    if( ( msg_type == METADATA_MSG_TYPE_DATA )
999        && ( !tr_torrentHasMetadata( msgs->torrent ) )
1000        && ( msg_end - benc_end <= METADATA_PIECE_SIZE )
1001        && ( piece * METADATA_PIECE_SIZE + (msg_end - benc_end) <= total_size ) )
1002    {
1003        const int pieceLen = msg_end - benc_end;
1004        tr_torrentSetMetadataPiece( msgs->torrent, piece, benc_end, pieceLen );
1005    }
1006
1007    if( msg_type == METADATA_MSG_TYPE_REQUEST )
1008    {
1009        if( ( piece >= 0 )
1010            && tr_torrentHasMetadata( msgs->torrent )
1011            && !tr_torrentIsPrivate( msgs->torrent )
1012            && ( msgs->peerAskedForMetadataCount < METADATA_REQQ ) )
1013        {
1014            msgs->peerAskedForMetadata[msgs->peerAskedForMetadataCount++] = piece;
1015        }
1016        else
1017        {
1018            tr_benc tmp;
1019            int payloadLen;
1020            char * payload;
1021            struct evbuffer * out = msgs->outMessages;
1022
1023            /* build the rejection message */
1024            tr_bencInitDict( &tmp, 2 );
1025            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
1026            tr_bencDictAddInt( &tmp, "piece", piece );
1027            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1028            tr_bencFree( &tmp );
1029
1030            /* write it out as a LTEP message to our outMessages buffer */
1031            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen );
1032            evbuffer_add_uint8 ( out, BT_LTEP );
1033            evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1034            evbuffer_add       ( out, payload, payloadLen );
1035            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1036            dbgOutMessageLen( msgs );
1037
1038            tr_free( payload );
1039        }
1040    }
1041
1042    tr_free( tmp );
1043}
1044
1045static void
1046parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1047{
1048    int loaded = 0;
1049    uint8_t * tmp = tr_new( uint8_t, msglen );
1050    tr_benc val;
1051    tr_torrent * tor = msgs->torrent;
1052    const uint8_t * added;
1053    size_t added_len;
1054
1055    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1056
1057    if( tr_torrentAllowsPex( tor )
1058      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) ) )
1059    {
1060        if( tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1061        {
1062            tr_pex * pex;
1063            size_t i, n;
1064            size_t added_f_len = 0;
1065            const uint8_t * added_f = NULL;
1066
1067            tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1068            pex = tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len, &n );
1069
1070            n = MIN( n, MAX_PEX_PEER_COUNT );
1071            for( i=0; i<n; ++i )
1072            {
1073                int seedProbability = -1;
1074                if( i < added_f_len ) seedProbability = ( added_f[i] & ADDED_F_SEED_FLAG ) ? 100 : 0;
1075                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex+i, seedProbability );
1076            }
1077
1078            tr_free( pex );
1079        }
1080
1081        if( tr_bencDictFindRaw( &val, "added6", &added, &added_len ) )
1082        {
1083            tr_pex * pex;
1084            size_t i, n;
1085            size_t added_f_len = 0;
1086            const uint8_t * added_f = NULL;
1087
1088            tr_bencDictFindRaw( &val, "added6.f", &added_f, &added_f_len );
1089            pex = tr_peerMgrCompact6ToPex( added, added_len, added_f, added_f_len, &n );
1090
1091            n = MIN( n, MAX_PEX_PEER_COUNT );
1092            for( i=0; i<n; ++i )
1093            {
1094                int seedProbability = -1;
1095                if( i < added_f_len ) seedProbability = ( added_f[i] & ADDED_F_SEED_FLAG ) ? 100 : 0;
1096                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex+i, seedProbability );
1097            }
1098
1099            tr_free( pex );
1100        }
1101    }
1102
1103    if( loaded )
1104        tr_bencFree( &val );
1105    tr_free( tmp );
1106}
1107
1108static void sendPex( tr_peermsgs * msgs );
1109
1110static void
1111parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer  * inbuf )
1112{
1113    uint8_t ltep_msgid;
1114
1115    tr_peerIoReadUint8( msgs->peer->io, inbuf, &ltep_msgid );
1116    msglen--;
1117
1118    if( ltep_msgid == LTEP_HANDSHAKE )
1119    {
1120        dbgmsg( msgs, "got ltep handshake" );
1121        parseLtepHandshake( msgs, msglen, inbuf );
1122        if( tr_peerIoSupportsLTEP( msgs->peer->io ) )
1123        {
1124            sendLtepHandshake( msgs );
1125            sendPex( msgs );
1126        }
1127    }
1128    else if( ltep_msgid == UT_PEX_ID )
1129    {
1130        dbgmsg( msgs, "got ut pex" );
1131        msgs->peerSupportsPex = 1;
1132        parseUtPex( msgs, msglen, inbuf );
1133    }
1134    else if( ltep_msgid == UT_METADATA_ID )
1135    {
1136        dbgmsg( msgs, "got ut metadata" );
1137        msgs->peerSupportsMetadataXfer = 1;
1138        parseUtMetadata( msgs, msglen, inbuf );
1139    }
1140    else
1141    {
1142        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1143        evbuffer_drain( inbuf, msglen );
1144    }
1145}
1146
1147static int
1148readBtLength( tr_peermsgs *     msgs,
1149              struct evbuffer * inbuf,
1150              size_t            inlen )
1151{
1152    uint32_t len;
1153
1154    if( inlen < sizeof( len ) )
1155        return READ_LATER;
1156
1157    tr_peerIoReadUint32( msgs->peer->io, inbuf, &len );
1158
1159    if( len == 0 ) /* peer sent us a keepalive message */
1160        dbgmsg( msgs, "got KeepAlive" );
1161    else
1162    {
1163        msgs->incoming.length = len;
1164        msgs->state = AWAITING_BT_ID;
1165    }
1166
1167    return READ_NOW;
1168}
1169
1170static int readBtMessage( tr_peermsgs     * msgs,
1171                          struct evbuffer * inbuf,
1172                          size_t            inlen );
1173
1174static int
1175readBtId( tr_peermsgs * msgs, struct evbuffer  * inbuf, size_t inlen )
1176{
1177    uint8_t id;
1178
1179    if( inlen < sizeof( uint8_t ) )
1180        return READ_LATER;
1181
1182    tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
1183    msgs->incoming.id = id;
1184    dbgmsg( msgs, "msgs->incoming.id is now %d; msgs->incoming.length is %zu", id, (size_t)msgs->incoming.length );
1185
1186    if( id == BT_PIECE )
1187    {
1188        msgs->state = AWAITING_BT_PIECE;
1189        return READ_NOW;
1190    }
1191    else if( msgs->incoming.length != 1 )
1192    {
1193        msgs->state = AWAITING_BT_MESSAGE;
1194        return READ_NOW;
1195    }
1196    else return readBtMessage( msgs, inbuf, inlen - 1 );
1197}
1198
1199static void
1200updatePeerProgress( tr_peermsgs * msgs )
1201{
1202    tr_peerUpdateProgress( msgs->torrent, msgs->peer );
1203
1204    updateFastSet( msgs );
1205    updateInterest( msgs );
1206}
1207
1208static void
1209prefetchPieces( tr_peermsgs *msgs )
1210{
1211    int i;
1212
1213    if( !getSession(msgs)->isPrefetchEnabled )
1214        return;
1215
1216    /* Maintain 12 prefetched blocks per unchoked peer */
1217    for( i=msgs->prefetchCount; i<msgs->peer->pendingReqsToClient && i<12; ++i )
1218    {
1219        const struct peer_request * req = msgs->peerAskedFor + i;
1220        if( requestIsValid( msgs, req ) )
1221        {
1222            tr_cachePrefetchBlock( getSession(msgs)->cache, msgs->torrent, req->index, req->offset, req->length );
1223            ++msgs->prefetchCount;
1224        }
1225    }
1226}
1227
1228static void
1229peerMadeRequest( tr_peermsgs *               msgs,
1230                 const struct peer_request * req )
1231{
1232    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1233    const int reqIsValid = requestIsValid( msgs, req );
1234    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( &msgs->torrent->completion, req->index );
1235    const int peerIsChoked = msgs->peer->peerIsChoked;
1236
1237    int allow = FALSE;
1238
1239    if( !reqIsValid )
1240        dbgmsg( msgs, "rejecting an invalid request." );
1241    else if( !clientHasPiece )
1242        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1243    else if( peerIsChoked )
1244        dbgmsg( msgs, "rejecting request from choked peer" );
1245    else if( msgs->peer->pendingReqsToClient + 1 >= REQQ )
1246        dbgmsg( msgs, "rejecting request ... reqq is full" );
1247    else
1248        allow = TRUE;
1249
1250    if( allow ) {
1251        msgs->peerAskedFor[msgs->peer->pendingReqsToClient++] = *req;
1252        prefetchPieces( msgs );
1253    } else if( fext ) {
1254        protocolSendReject( msgs, req );
1255    }
1256}
1257
1258static tr_bool
1259messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1260{
1261    switch( id )
1262    {
1263        case BT_CHOKE:
1264        case BT_UNCHOKE:
1265        case BT_INTERESTED:
1266        case BT_NOT_INTERESTED:
1267        case BT_FEXT_HAVE_ALL:
1268        case BT_FEXT_HAVE_NONE:
1269            return len == 1;
1270
1271        case BT_HAVE:
1272        case BT_FEXT_SUGGEST:
1273        case BT_FEXT_ALLOWED_FAST:
1274            return len == 5;
1275
1276        case BT_BITFIELD:
1277            return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1278
1279        case BT_REQUEST:
1280        case BT_CANCEL:
1281        case BT_FEXT_REJECT:
1282            return len == 13;
1283
1284        case BT_PIECE:
1285            return len > 9 && len <= 16393;
1286
1287        case BT_PORT:
1288            return len == 3;
1289
1290        case BT_LTEP:
1291            return len >= 2;
1292
1293        default:
1294            return FALSE;
1295    }
1296}
1297
1298static int clientGotBlock( tr_peermsgs *               msgs,
1299                           struct evbuffer *           block,
1300                           const struct peer_request * req );
1301
1302static int
1303readBtPiece( tr_peermsgs      * msgs,
1304             struct evbuffer  * inbuf,
1305             size_t             inlen,
1306             size_t           * setme_piece_bytes_read )
1307{
1308    struct peer_request * req = &msgs->incoming.blockReq;
1309
1310    assert( evbuffer_get_length( inbuf ) >= inlen );
1311    dbgmsg( msgs, "In readBtPiece" );
1312
1313    if( !req->length )
1314    {
1315        if( inlen < 8 )
1316            return READ_LATER;
1317
1318        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index );
1319        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset );
1320        req->length = msgs->incoming.length - 9;
1321        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1322        return READ_NOW;
1323    }
1324    else
1325    {
1326        int err;
1327
1328        /* read in another chunk of data */
1329        const size_t nLeft = req->length - evbuffer_get_length( msgs->incoming.block );
1330        size_t n = MIN( nLeft, inlen );
1331        size_t i = n;
1332        void * buf = tr_sessionGetBuffer( getSession( msgs ) );
1333        const size_t buflen = SESSION_BUFFER_SIZE;
1334
1335        while( i > 0 )
1336        {
1337            const size_t thisPass = MIN( i, buflen );
1338            tr_peerIoReadBytes( msgs->peer->io, inbuf, buf, thisPass );
1339            evbuffer_add( msgs->incoming.block, buf, thisPass );
1340            i -= thisPass;
1341        }
1342
1343        tr_sessionReleaseBuffer( getSession( msgs ) );
1344        buf = NULL;
1345
1346        fireClientGotData( msgs, n, TRUE );
1347        *setme_piece_bytes_read += n;
1348        dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1349               n, req->index, req->offset, req->length,
1350               (int)( req->length - evbuffer_get_length( msgs->incoming.block ) ) );
1351        if( evbuffer_get_length( msgs->incoming.block ) < req->length )
1352            return READ_LATER;
1353
1354        /* we've got the whole block ... process it */
1355        err = clientGotBlock( msgs, msgs->incoming.block, req );
1356
1357        /* cleanup */
1358        evbuffer_free( msgs->incoming.block );
1359        msgs->incoming.block = evbuffer_new( );
1360        req->length = 0;
1361        msgs->state = AWAITING_BT_LENGTH;
1362        return err ? READ_ERR : READ_NOW;
1363    }
1364}
1365
1366static void updateDesiredRequestCount( tr_peermsgs * msgs );
1367
1368static int
1369readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1370{
1371    uint32_t      ui32;
1372    uint32_t      msglen = msgs->incoming.length;
1373    const uint8_t id = msgs->incoming.id;
1374#ifndef NDEBUG
1375    const size_t  startBufLen = evbuffer_get_length( inbuf );
1376#endif
1377    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1378
1379    --msglen; /* id length */
1380
1381    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
1382
1383    if( inlen < msglen )
1384        return READ_LATER;
1385
1386    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1387    {
1388        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1389        fireError( msgs, EMSGSIZE );
1390        return READ_ERR;
1391    }
1392
1393    switch( id )
1394    {
1395        case BT_CHOKE:
1396            dbgmsg( msgs, "got Choke" );
1397            msgs->peer->clientIsChoked = 1;
1398            if( !fext )
1399                fireGotChoke( msgs );
1400            break;
1401
1402        case BT_UNCHOKE:
1403            dbgmsg( msgs, "got Unchoke" );
1404            msgs->peer->clientIsChoked = 0;
1405            updateDesiredRequestCount( msgs );
1406            break;
1407
1408        case BT_INTERESTED:
1409            dbgmsg( msgs, "got Interested" );
1410            msgs->peer->peerIsInterested = 1;
1411            break;
1412
1413        case BT_NOT_INTERESTED:
1414            dbgmsg( msgs, "got Not Interested" );
1415            msgs->peer->peerIsInterested = 0;
1416            break;
1417
1418        case BT_HAVE:
1419            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1420            dbgmsg( msgs, "got Have: %u", ui32 );
1421            if( tr_torrentHasMetadata( msgs->torrent )
1422                    && ( ui32 >= msgs->torrent->info.pieceCount ) )
1423            {
1424                fireError( msgs, ERANGE );
1425                return READ_ERR;
1426            }
1427
1428            /* a peer can send the same HAVE message twice... */
1429            if( !tr_bitsetHas( &msgs->peer->have, ui32 ) ) {
1430                tr_bitsetAdd( &msgs->peer->have, ui32 );
1431                fireClientGotHave( msgs, ui32 );
1432            }
1433            updatePeerProgress( msgs );
1434            break;
1435
1436        case BT_BITFIELD: {
1437            tr_bitfield tmp = TR_BITFIELD_INIT;
1438            const size_t bitCount = tr_torrentHasMetadata( msgs->torrent )
1439                                  ? msgs->torrent->info.pieceCount
1440                                  : msglen * 8;
1441            tr_bitfieldConstruct( &tmp, bitCount );
1442            dbgmsg( msgs, "got a bitfield" );
1443            tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp.bits, msglen );
1444            tr_bitsetSetBitfield( &msgs->peer->have, &tmp );
1445            fireClientGotBitfield( msgs, &tmp );
1446            tr_bitfieldDestruct( &tmp );
1447            updatePeerProgress( msgs );
1448            break;
1449        }
1450
1451        case BT_REQUEST:
1452        {
1453            struct peer_request r;
1454            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1455            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1456            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1457            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1458            peerMadeRequest( msgs, &r );
1459            break;
1460        }
1461
1462        case BT_CANCEL:
1463        {
1464            int i;
1465            struct peer_request r;
1466            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1467            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1468            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1469            tr_historyAdd( &msgs->peer->cancelsSentToClient, tr_time( ), 1 );
1470            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1471
1472            for( i=0; i<msgs->peer->pendingReqsToClient; ++i ) {
1473                const struct peer_request * req = msgs->peerAskedFor + i;
1474                if( ( req->index == r.index ) && ( req->offset == r.offset ) && ( req->length == r.length ) )
1475                    break;
1476            }
1477
1478            if( i < msgs->peer->pendingReqsToClient )
1479                tr_removeElementFromArray( msgs->peerAskedFor, i, sizeof( struct peer_request ),
1480                                           msgs->peer->pendingReqsToClient-- );
1481            break;
1482        }
1483
1484        case BT_PIECE:
1485            assert( 0 ); /* handled elsewhere! */
1486            break;
1487
1488        case BT_PORT:
1489            dbgmsg( msgs, "Got a BT_PORT" );
1490            tr_peerIoReadUint16( msgs->peer->io, inbuf, &msgs->peer->dht_port );
1491            if( msgs->peer->dht_port > 0 )
1492                tr_dhtAddNode( getSession(msgs),
1493                               tr_peerAddress( msgs->peer ),
1494                               msgs->peer->dht_port, 0 );
1495            break;
1496
1497        case BT_FEXT_SUGGEST:
1498            dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1499            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1500            if( fext )
1501                fireClientGotSuggest( msgs, ui32 );
1502            else {
1503                fireError( msgs, EMSGSIZE );
1504                return READ_ERR;
1505            }
1506            break;
1507
1508        case BT_FEXT_ALLOWED_FAST:
1509            dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1510            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1511            if( fext )
1512                fireClientGotAllowedFast( msgs, ui32 );
1513            else {
1514                fireError( msgs, EMSGSIZE );
1515                return READ_ERR;
1516            }
1517            break;
1518
1519        case BT_FEXT_HAVE_ALL:
1520            dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1521            if( fext ) {
1522                tr_bitsetSetHaveAll( &msgs->peer->have );
1523                fireClientGotHaveAll( msgs );
1524                updatePeerProgress( msgs );
1525            } else {
1526                fireError( msgs, EMSGSIZE );
1527                return READ_ERR;
1528            }
1529            break;
1530
1531        case BT_FEXT_HAVE_NONE:
1532            dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1533            if( fext ) {
1534                tr_bitsetSetHaveNone( &msgs->peer->have );
1535                fireClientGotHaveNone( msgs );
1536                updatePeerProgress( msgs );
1537            } else {
1538                fireError( msgs, EMSGSIZE );
1539                return READ_ERR;
1540            }
1541            break;
1542
1543        case BT_FEXT_REJECT:
1544        {
1545            struct peer_request r;
1546            dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1547            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1548            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1549            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1550            if( fext )
1551                fireGotRej( msgs, &r );
1552            else {
1553                fireError( msgs, EMSGSIZE );
1554                return READ_ERR;
1555            }
1556            break;
1557        }
1558
1559        case BT_LTEP:
1560            dbgmsg( msgs, "Got a BT_LTEP" );
1561            parseLtep( msgs, msglen, inbuf );
1562            break;
1563
1564        default:
1565            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1566            tr_peerIoDrain( msgs->peer->io, inbuf, msglen );
1567            break;
1568    }
1569
1570    assert( msglen + 1 == msgs->incoming.length );
1571    assert( evbuffer_get_length( inbuf ) == startBufLen - msglen );
1572
1573    msgs->state = AWAITING_BT_LENGTH;
1574    return READ_NOW;
1575}
1576
1577static void
1578addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1579{
1580    if( !msgs->peer->blame )
1581         msgs->peer->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1582    tr_bitfieldAdd( msgs->peer->blame, index );
1583}
1584
1585/* returns 0 on success, or an errno on failure */
1586static int
1587clientGotBlock( tr_peermsgs *               msgs,
1588                struct evbuffer *           data,
1589                const struct peer_request * req )
1590{
1591    int err;
1592    tr_torrent * tor = msgs->torrent;
1593    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1594
1595    assert( msgs );
1596    assert( req );
1597
1598    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) {
1599        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1600                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1601        return EMSGSIZE;
1602    }
1603
1604    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1605
1606    if( !tr_peerMgrDidPeerRequest( msgs->torrent, msgs->peer, block ) ) {
1607        dbgmsg( msgs, "we didn't ask for this message..." );
1608        return 0;
1609    }
1610    if( tr_cpPieceIsComplete( &msgs->torrent->completion, req->index ) ) {
1611        dbgmsg( msgs, "we did ask for this message, but the piece is already complete..." );
1612        return 0;
1613    }
1614
1615    /**
1616    ***  Save the block
1617    **/
1618
1619    if(( err = tr_cacheWriteBlock( getSession(msgs)->cache, tor, req->index, req->offset, req->length, data )))
1620        return err;
1621
1622    addPeerToBlamefield( msgs, req->index );
1623    fireGotBlock( msgs, req );
1624    return 0;
1625}
1626
1627static int peerPulse( void * vmsgs );
1628
1629static void
1630didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1631{
1632    tr_peermsgs * msgs = vmsgs;
1633    firePeerGotData( msgs, bytesWritten, wasPieceData );
1634
1635    if ( tr_isPeerIo( io ) && io->userData )
1636        peerPulse( msgs );
1637}
1638
1639static ReadState
1640canRead( tr_peerIo * io, void * vmsgs, size_t * piece )
1641{
1642    ReadState         ret;
1643    tr_peermsgs *     msgs = vmsgs;
1644    struct evbuffer * in = tr_peerIoGetReadBuffer( io );
1645    const size_t      inlen = evbuffer_get_length( in );
1646
1647    dbgmsg( msgs, "canRead: inlen is %zu, msgs->state is %d", inlen, msgs->state );
1648
1649    if( !inlen )
1650    {
1651        ret = READ_LATER;
1652    }
1653    else if( msgs->state == AWAITING_BT_PIECE )
1654    {
1655        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1656    }
1657    else switch( msgs->state )
1658    {
1659        case AWAITING_BT_LENGTH:
1660            ret = readBtLength ( msgs, in, inlen ); break;
1661
1662        case AWAITING_BT_ID:
1663            ret = readBtId     ( msgs, in, inlen ); break;
1664
1665        case AWAITING_BT_MESSAGE:
1666            ret = readBtMessage( msgs, in, inlen ); break;
1667
1668        default:
1669            ret = READ_ERR;
1670            assert( 0 );
1671    }
1672
1673    dbgmsg( msgs, "canRead: ret is %d", (int)ret );
1674
1675    /* log the raw data that was read */
1676    if( ( ret != READ_ERR ) && ( evbuffer_get_length( in ) != inlen ) )
1677        fireClientGotData( msgs, inlen - evbuffer_get_length( in ), FALSE );
1678
1679    return ret;
1680}
1681
1682int
1683tr_peerMsgsIsReadingBlock( const tr_peermsgs * msgs, tr_block_index_t block )
1684{
1685    if( msgs->state != AWAITING_BT_PIECE )
1686        return FALSE;
1687
1688    return block == _tr_block( msgs->torrent,
1689                               msgs->incoming.blockReq.index,
1690                               msgs->incoming.blockReq.offset );
1691}
1692
1693/**
1694***
1695**/
1696
1697static void
1698updateDesiredRequestCount( tr_peermsgs * msgs )
1699{
1700    const tr_torrent * const torrent = msgs->torrent;
1701
1702    if( tr_torrentIsSeed( msgs->torrent ) )
1703    {
1704        msgs->desiredRequestCount = 0;
1705    }
1706    else if( msgs->peer->clientIsChoked )
1707    {
1708        msgs->desiredRequestCount = 0;
1709    }
1710    else if( !msgs->peer->clientIsInterested )
1711    {
1712        msgs->desiredRequestCount = 0;
1713    }
1714    else
1715    {
1716        int estimatedBlocksInPeriod;
1717        int rate_Bps;
1718        int irate_Bps;
1719        const int floor = 4;
1720        const int seconds = REQUEST_BUF_SECS;
1721        const uint64_t now = tr_time_msec( );
1722
1723        /* Get the rate limit we should use.
1724         * FIXME: this needs to consider all the other peers as well... */
1725        rate_Bps = tr_peerGetPieceSpeed_Bps( msgs->peer, now, TR_PEER_TO_CLIENT );
1726        if( tr_torrentUsesSpeedLimit( torrent, TR_PEER_TO_CLIENT ) )
1727            rate_Bps = MIN( rate_Bps, tr_torrentGetSpeedLimit_Bps( torrent, TR_PEER_TO_CLIENT ) );
1728
1729        /* honor the session limits, if enabled */
1730        if( tr_torrentUsesSessionLimits( torrent ) )
1731            if( tr_sessionGetActiveSpeedLimit_Bps( torrent->session, TR_PEER_TO_CLIENT, &irate_Bps ) )
1732                rate_Bps = MIN( rate_Bps, irate_Bps );
1733
1734        /* use this desired rate to figure out how
1735         * many requests we should send to this peer */
1736        estimatedBlocksInPeriod = ( rate_Bps * seconds ) / torrent->blockSize;
1737        msgs->desiredRequestCount = MAX( floor, estimatedBlocksInPeriod );
1738
1739        /* honor the peer's maximum request count, if specified */
1740        if( msgs->reqq > 0 )
1741            if( msgs->desiredRequestCount > msgs->reqq )
1742                msgs->desiredRequestCount = msgs->reqq;
1743    }
1744}
1745
1746static void
1747updateMetadataRequests( tr_peermsgs * msgs, time_t now )
1748{
1749    int piece;
1750
1751    if( msgs->peerSupportsMetadataXfer
1752        && tr_torrentGetNextMetadataRequest( msgs->torrent, now, &piece ) )
1753    {
1754        tr_benc tmp;
1755        int payloadLen;
1756        char * payload;
1757        struct evbuffer * out = msgs->outMessages;
1758
1759        /* build the data message */
1760        tr_bencInitDict( &tmp, 3 );
1761        tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REQUEST );
1762        tr_bencDictAddInt( &tmp, "piece", piece );
1763        payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1764        tr_bencFree( &tmp );
1765
1766        dbgmsg( msgs, "requesting metadata piece #%d", piece );
1767
1768        /* write it out as a LTEP message to our outMessages buffer */
1769        evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen );
1770        evbuffer_add_uint8 ( out, BT_LTEP );
1771        evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1772        evbuffer_add       ( out, payload, payloadLen );
1773        pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1774        dbgOutMessageLen( msgs );
1775
1776        tr_free( payload );
1777    }
1778}
1779
1780static void
1781updateBlockRequests( tr_peermsgs * msgs )
1782{
1783    if( tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT )
1784        && ( msgs->desiredRequestCount > 0 )
1785        && ( msgs->peer->pendingReqsToPeer <= ( msgs->desiredRequestCount * 0.66 ) ) )
1786    {
1787        int i;
1788        int n;
1789        const int numwant = msgs->desiredRequestCount - msgs->peer->pendingReqsToPeer;
1790        tr_block_index_t * blocks = tr_new( tr_block_index_t, numwant );
1791
1792        tr_peerMgrGetNextRequests( msgs->torrent, msgs->peer, numwant, blocks, &n );
1793
1794        for( i=0; i<n; ++i )
1795        {
1796            struct peer_request req;
1797            blockToReq( msgs->torrent, blocks[i], &req );
1798            protocolSendRequest( msgs, &req );
1799        }
1800
1801        tr_free( blocks );
1802    }
1803}
1804
1805static size_t
1806fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1807{
1808    int piece;
1809    size_t bytesWritten = 0;
1810    struct peer_request req;
1811    const tr_bool haveMessages = evbuffer_get_length( msgs->outMessages ) != 0;
1812    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1813
1814    /**
1815    ***  Protocol messages
1816    **/
1817
1818    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1819    {
1820        dbgmsg( msgs, "started an outMessages batch (length is %zu)", evbuffer_get_length( msgs->outMessages ) );
1821        msgs->outMessagesBatchedAt = now;
1822    }
1823    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1824    {
1825        const size_t len = evbuffer_get_length( msgs->outMessages );
1826        /* flush the protocol messages */
1827        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len );
1828        tr_peerIoWriteBuf( msgs->peer->io, msgs->outMessages, FALSE );
1829        msgs->clientSentAnythingAt = now;
1830        msgs->outMessagesBatchedAt = 0;
1831        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1832        bytesWritten +=  len;
1833    }
1834
1835    /**
1836    ***  Metadata Pieces
1837    **/
1838
1839    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= METADATA_PIECE_SIZE )
1840        && popNextMetadataRequest( msgs, &piece ) )
1841    {
1842        char * data;
1843        int dataLen;
1844        tr_bool ok = FALSE;
1845
1846        data = tr_torrentGetMetadataPiece( msgs->torrent, piece, &dataLen );
1847        if( ( dataLen > 0 ) && ( data != NULL ) )
1848        {
1849            tr_benc tmp;
1850            int payloadLen;
1851            char * payload;
1852            struct evbuffer * out = msgs->outMessages;
1853
1854            /* build the data message */
1855            tr_bencInitDict( &tmp, 3 );
1856            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_DATA );
1857            tr_bencDictAddInt( &tmp, "piece", piece );
1858            tr_bencDictAddInt( &tmp, "total_size", msgs->torrent->infoDictLength );
1859            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1860            tr_bencFree( &tmp );
1861
1862            /* write it out as a LTEP message to our outMessages buffer */
1863            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen + dataLen );
1864            evbuffer_add_uint8 ( out, BT_LTEP );
1865            evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1866            evbuffer_add       ( out, payload, payloadLen );
1867            evbuffer_add       ( out, data, dataLen );
1868            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1869            dbgOutMessageLen( msgs );
1870
1871            tr_free( payload );
1872            tr_free( data );
1873
1874            ok = TRUE;
1875        }
1876
1877        if( !ok ) /* send a rejection message */
1878        {
1879            tr_benc tmp;
1880            int payloadLen;
1881            char * payload;
1882            struct evbuffer * out = msgs->outMessages;
1883
1884            /* build the rejection message */
1885            tr_bencInitDict( &tmp, 2 );
1886            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
1887            tr_bencDictAddInt( &tmp, "piece", piece );
1888            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1889            tr_bencFree( &tmp );
1890
1891            /* write it out as a LTEP message to our outMessages buffer */
1892            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + payloadLen );
1893            evbuffer_add_uint8 ( out, BT_LTEP );
1894            evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1895            evbuffer_add       ( out, payload, payloadLen );
1896            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1897            dbgOutMessageLen( msgs );
1898
1899            tr_free( payload );
1900        }
1901    }
1902
1903    /**
1904    ***  Data Blocks
1905    **/
1906
1907    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= msgs->torrent->blockSize )
1908        && popNextRequest( msgs, &req ) )
1909    {
1910        --msgs->prefetchCount;
1911
1912        if( requestIsValid( msgs, &req )
1913            && tr_cpPieceIsComplete( &msgs->torrent->completion, req.index ) )
1914        {
1915            int err;
1916            const uint32_t msglen = 4 + 1 + 4 + 4 + req.length;
1917            struct evbuffer * out;
1918            struct evbuffer_iovec iovec[1];
1919
1920            out = evbuffer_new( );
1921            evbuffer_expand( out, msglen );
1922
1923            evbuffer_add_uint32( out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1924            evbuffer_add_uint8 ( out, BT_PIECE );
1925            evbuffer_add_uint32( out, req.index );
1926            evbuffer_add_uint32( out, req.offset );
1927
1928            evbuffer_reserve_space( out, req.length, iovec, 1 );
1929            err = tr_cacheReadBlock( getSession(msgs)->cache, msgs->torrent, req.index, req.offset, req.length, iovec[0].iov_base );
1930            iovec[0].iov_len = req.length;
1931            evbuffer_commit_space( out, iovec, 1 );
1932
1933            /* check the piece if it needs checking... */
1934            if( !err && tr_torrentPieceNeedsCheck( msgs->torrent, req.index ) )
1935                if(( err = !tr_torrentCheckPiece( msgs->torrent, req.index )))
1936                    tr_torrentSetLocalError( msgs->torrent, _( "Please Verify Local Data! Piece #%zu is corrupt." ), (size_t)req.index );
1937
1938            if( err )
1939            {
1940                if( fext )
1941                    protocolSendReject( msgs, &req );
1942            }
1943            else
1944            {
1945                const size_t n = evbuffer_get_length( out );
1946                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1947                assert( n == msglen );
1948                tr_peerIoWriteBuf( msgs->peer->io, out, TRUE );
1949                bytesWritten += n;
1950                msgs->clientSentAnythingAt = now;
1951                tr_historyAdd( &msgs->peer->blocksSentToPeer, tr_time( ), 1 );
1952            }
1953
1954            evbuffer_free( out );
1955
1956            if( err )
1957            {
1958                bytesWritten = 0;
1959                msgs = NULL;
1960            }
1961        }
1962        else if( fext ) /* peer needs a reject message */
1963        {
1964            protocolSendReject( msgs, &req );
1965        }
1966
1967        if( msgs != NULL )
1968            prefetchPieces( msgs );
1969    }
1970
1971    /**
1972    ***  Keepalive
1973    **/
1974
1975    if( ( msgs != NULL )
1976        && ( msgs->clientSentAnythingAt != 0 )
1977        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1978    {
1979        dbgmsg( msgs, "sending a keepalive message" );
1980        evbuffer_add_uint32( msgs->outMessages, 0 );
1981        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1982    }
1983
1984    return bytesWritten;
1985}
1986
1987static int
1988peerPulse( void * vmsgs )
1989{
1990    tr_peermsgs * msgs = vmsgs;
1991    const time_t  now = tr_time( );
1992
1993    if ( tr_isPeerIo( msgs->peer->io ) ) {
1994        updateDesiredRequestCount( msgs );
1995        updateBlockRequests( msgs );
1996        updateMetadataRequests( msgs, now );
1997    }
1998
1999    for( ;; )
2000        if( fillOutputBuffer( msgs, now ) < 1 )
2001            break;
2002
2003    return TRUE; /* loop forever */
2004}
2005
2006void
2007tr_peerMsgsPulse( tr_peermsgs * msgs )
2008{
2009    if( msgs != NULL )
2010        peerPulse( msgs );
2011}
2012
2013static void
2014gotError( tr_peerIo  * io UNUSED,
2015          short        what,
2016          void       * vmsgs )
2017{
2018    if( what & BEV_EVENT_TIMEOUT )
2019        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
2020    if( what & ( BEV_EVENT_EOF | BEV_EVENT_ERROR ) )
2021        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
2022               what, errno, tr_strerror( errno ) );
2023    fireError( vmsgs, ENOTCONN );
2024}
2025
2026static void
2027sendBitfield( tr_peermsgs * msgs )
2028{
2029    struct evbuffer * out = msgs->outMessages;
2030    tr_bitfield * bf = tr_cpCreatePieceBitfield( &msgs->torrent->completion );
2031
2032    evbuffer_add_uint32( out, sizeof( uint8_t ) + bf->byteCount );
2033    evbuffer_add_uint8 ( out, BT_BITFIELD );
2034    evbuffer_add       ( out, bf->bits, bf->byteCount );
2035    dbgmsg( msgs, "sending bitfield... outMessage size is now %zu", evbuffer_get_length( out ) );
2036    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
2037
2038    tr_bitfieldFree( bf );
2039}
2040
2041static void
2042tellPeerWhatWeHave( tr_peermsgs * msgs )
2043{
2044    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
2045
2046    if( fext && ( tr_cpBlockBitset( &msgs->torrent->completion )->haveAll ) )
2047    {
2048        protocolSendHaveAll( msgs );
2049    }
2050    else if( fext && ( tr_cpBlockBitset( &msgs->torrent->completion )->haveNone ) )
2051    {
2052        protocolSendHaveNone( msgs );
2053    }
2054    else
2055    {
2056        sendBitfield( msgs );
2057    }
2058}
2059
2060/**
2061***
2062**/
2063
2064/* some peers give us error messages if we send
2065   more than this many peers in a single pex message
2066   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
2067#define MAX_PEX_ADDED 50
2068#define MAX_PEX_DROPPED 50
2069
2070typedef struct
2071{
2072    tr_pex *  added;
2073    tr_pex *  dropped;
2074    tr_pex *  elements;
2075    int       addedCount;
2076    int       droppedCount;
2077    int       elementCount;
2078}
2079PexDiffs;
2080
2081static void
2082pexAddedCb( void * vpex,
2083            void * userData )
2084{
2085    PexDiffs * diffs = userData;
2086    tr_pex *   pex = vpex;
2087
2088    if( diffs->addedCount < MAX_PEX_ADDED )
2089    {
2090        diffs->added[diffs->addedCount++] = *pex;
2091        diffs->elements[diffs->elementCount++] = *pex;
2092    }
2093}
2094
2095static inline void
2096pexDroppedCb( void * vpex,
2097              void * userData )
2098{
2099    PexDiffs * diffs = userData;
2100    tr_pex *   pex = vpex;
2101
2102    if( diffs->droppedCount < MAX_PEX_DROPPED )
2103    {
2104        diffs->dropped[diffs->droppedCount++] = *pex;
2105    }
2106}
2107
2108static inline void
2109pexElementCb( void * vpex,
2110              void * userData )
2111{
2112    PexDiffs * diffs = userData;
2113    tr_pex * pex = vpex;
2114
2115    diffs->elements[diffs->elementCount++] = *pex;
2116}
2117
2118static void
2119sendPex( tr_peermsgs * msgs )
2120{
2121    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
2122    {
2123        PexDiffs diffs;
2124        PexDiffs diffs6;
2125        tr_pex * newPex = NULL;
2126        tr_pex * newPex6 = NULL;
2127        const int newCount = tr_peerMgrGetPeers( msgs->torrent, &newPex, TR_AF_INET, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
2128        const int newCount6 = tr_peerMgrGetPeers( msgs->torrent, &newPex6, TR_AF_INET6, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
2129
2130        /* build the diffs */
2131        diffs.added = tr_new( tr_pex, newCount );
2132        diffs.addedCount = 0;
2133        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
2134        diffs.droppedCount = 0;
2135        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
2136        diffs.elementCount = 0;
2137        tr_set_compare( msgs->pex, msgs->pexCount,
2138                        newPex, newCount,
2139                        tr_pexCompare, sizeof( tr_pex ),
2140                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
2141        diffs6.added = tr_new( tr_pex, newCount6 );
2142        diffs6.addedCount = 0;
2143        diffs6.dropped = tr_new( tr_pex, msgs->pexCount6 );
2144        diffs6.droppedCount = 0;
2145        diffs6.elements = tr_new( tr_pex, newCount6 + msgs->pexCount6 );
2146        diffs6.elementCount = 0;
2147        tr_set_compare( msgs->pex6, msgs->pexCount6,
2148                        newPex6, newCount6,
2149                        tr_pexCompare, sizeof( tr_pex ),
2150                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6 );
2151        dbgmsg(
2152            msgs,
2153            "pex: old peer count %d+%d, new peer count %d+%d, "
2154            "added %d+%d, removed %d+%d",
2155            msgs->pexCount, msgs->pexCount6, newCount, newCount6,
2156            diffs.addedCount, diffs6.addedCount,
2157            diffs.droppedCount, diffs6.droppedCount );
2158
2159        if( !diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
2160            !diffs6.droppedCount )
2161        {
2162            tr_free( diffs.elements );
2163            tr_free( diffs6.elements );
2164        }
2165        else
2166        {
2167            int  i;
2168            tr_benc val;
2169            char * benc;
2170            int bencLen;
2171            uint8_t * tmp, *walk;
2172            struct evbuffer * out = msgs->outMessages;
2173
2174            /* update peer */
2175            tr_free( msgs->pex );
2176            msgs->pex = diffs.elements;
2177            msgs->pexCount = diffs.elementCount;
2178            tr_free( msgs->pex6 );
2179            msgs->pex6 = diffs6.elements;
2180            msgs->pexCount6 = diffs6.elementCount;
2181
2182            /* build the pex payload */
2183            tr_bencInitDict( &val, 3 ); /* ipv6 support: left as 3:
2184                                         * speed vs. likelihood? */
2185
2186            if( diffs.addedCount > 0)
2187            {
2188                /* "added" */
2189                tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
2190                for( i = 0; i < diffs.addedCount; ++i ) {
2191                    memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
2192                    memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
2193                }
2194                assert( ( walk - tmp ) == diffs.addedCount * 6 );
2195                tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
2196                tr_free( tmp );
2197
2198                /* "added.f"
2199                 * unset each holepunch flag because we don't support it. */
2200                tmp = walk = tr_new( uint8_t, diffs.addedCount );
2201                for( i = 0; i < diffs.addedCount; ++i )
2202                    *walk++ = diffs.added[i].flags & ~ADDED_F_HOLEPUNCH;
2203                assert( ( walk - tmp ) == diffs.addedCount );
2204                tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2205                tr_free( tmp );
2206            }
2207
2208            if( diffs.droppedCount > 0 )
2209            {
2210                /* "dropped" */
2211                tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2212                for( i = 0; i < diffs.droppedCount; ++i ) {
2213                    memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
2214                    memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2215                }
2216                assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2217                tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2218                tr_free( tmp );
2219            }
2220
2221            if( diffs6.addedCount > 0 )
2222            {
2223                /* "added6" */
2224                tmp = walk = tr_new( uint8_t, diffs6.addedCount * 18 );
2225                for( i = 0; i < diffs6.addedCount; ++i ) {
2226                    memcpy( walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16 );
2227                    walk += 16;
2228                    memcpy( walk, &diffs6.added[i].port, 2 );
2229                    walk += 2;
2230                }
2231                assert( ( walk - tmp ) == diffs6.addedCount * 18 );
2232                tr_bencDictAddRaw( &val, "added6", tmp, walk - tmp );
2233                tr_free( tmp );
2234
2235                /* "added6.f"
2236                 * unset each holepunch flag because we don't support it. */
2237                tmp = walk = tr_new( uint8_t, diffs6.addedCount );
2238                for( i = 0; i < diffs6.addedCount; ++i )
2239                    *walk++ = diffs6.added[i].flags & ~ADDED_F_HOLEPUNCH;
2240                assert( ( walk - tmp ) == diffs6.addedCount );
2241                tr_bencDictAddRaw( &val, "added6.f", tmp, walk - tmp );
2242                tr_free( tmp );
2243            }
2244
2245            if( diffs6.droppedCount > 0 )
2246            {
2247                /* "dropped6" */
2248                tmp = walk = tr_new( uint8_t, diffs6.droppedCount * 18 );
2249                for( i = 0; i < diffs6.droppedCount; ++i ) {
2250                    memcpy( walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16 );
2251                    walk += 16;
2252                    memcpy( walk, &diffs6.dropped[i].port, 2 );
2253                    walk += 2;
2254                }
2255                assert( ( walk - tmp ) == diffs6.droppedCount * 18);
2256                tr_bencDictAddRaw( &val, "dropped6", tmp, walk - tmp );
2257                tr_free( tmp );
2258            }
2259
2260            /* write the pex message */
2261            benc = tr_bencToStr( &val, TR_FMT_BENC, &bencLen );
2262            evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + bencLen );
2263            evbuffer_add_uint8 ( out, BT_LTEP );
2264            evbuffer_add_uint8 ( out, msgs->ut_pex_id );
2265            evbuffer_add       ( out, benc, bencLen );
2266            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2267            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", evbuffer_get_length( out ) );
2268            dbgOutMessageLen( msgs );
2269
2270            tr_free( benc );
2271            tr_bencFree( &val );
2272        }
2273
2274        /* cleanup */
2275        tr_free( diffs.added );
2276        tr_free( diffs.dropped );
2277        tr_free( newPex );
2278        tr_free( diffs6.added );
2279        tr_free( diffs6.dropped );
2280        tr_free( newPex6 );
2281
2282        /*msgs->clientSentPexAt = tr_time( );*/
2283    }
2284}
2285
2286static void
2287pexPulse( int foo UNUSED, short bar UNUSED, void * vmsgs )
2288{
2289    struct tr_peermsgs * msgs = vmsgs;
2290
2291    sendPex( msgs );
2292
2293    tr_timerAdd( msgs->pexTimer, PEX_INTERVAL_SECS, 0 );
2294}
2295
2296/**
2297***
2298**/
2299
2300tr_peermsgs*
2301tr_peerMsgsNew( struct tr_torrent    * torrent,
2302                struct tr_peer       * peer,
2303                tr_peer_callback     * callback,
2304                void                 * callbackData )
2305{
2306    tr_peermsgs * m;
2307
2308    assert( peer );
2309    assert( peer->io );
2310
2311    m = tr_new0( tr_peermsgs, 1 );
2312    m->callback = callback;
2313    m->callbackData = callbackData;
2314    m->peer = peer;
2315    m->torrent = torrent;
2316    m->peer->clientIsChoked = 1;
2317    m->peer->peerIsChoked = 1;
2318    m->peer->clientIsInterested = 0;
2319    m->peer->peerIsInterested = 0;
2320    m->state = AWAITING_BT_LENGTH;
2321    m->outMessages = evbuffer_new( );
2322    m->outMessagesBatchedAt = 0;
2323    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2324    m->incoming.block = evbuffer_new( );
2325    m->pexTimer = evtimer_new( torrent->session->event_base, pexPulse, m );
2326    peer->msgs = m;
2327    tr_timerAdd( m->pexTimer, PEX_INTERVAL_SECS, 0 );
2328
2329    if( tr_peerIoSupportsUTP( peer->io ) ) {
2330        const tr_address * addr = tr_peerIoGetAddress( peer->io, NULL );
2331        tr_peerMgrSetUtpSupported( torrent, addr );
2332        tr_peerMgrSetUtpFailed( torrent, addr, FALSE );
2333    }
2334
2335    if( tr_peerIoSupportsLTEP( peer->io ) )
2336        sendLtepHandshake( m );
2337
2338    tellPeerWhatWeHave( m );
2339
2340    if( tr_dhtEnabled( torrent->session ) && tr_peerIoSupportsDHT( peer->io ))
2341    {
2342        /* Only send PORT over IPv6 when the IPv6 DHT is running (BEP-32). */
2343        const struct tr_address *addr = tr_peerIoGetAddress( peer->io, NULL );
2344        if( addr->type == TR_AF_INET || tr_globalIPv6() ) {
2345            protocolSendPort( m, tr_dhtPort( torrent->session ) );
2346        }
2347    }
2348
2349    tr_peerIoSetIOFuncs( m->peer->io, canRead, didWrite, gotError, m );
2350    updateDesiredRequestCount( m );
2351
2352    return m;
2353}
2354
2355void
2356tr_peerMsgsFree( tr_peermsgs* msgs )
2357{
2358    if( msgs )
2359    {
2360        event_free( msgs->pexTimer );
2361
2362        evbuffer_free( msgs->incoming.block );
2363        evbuffer_free( msgs->outMessages );
2364        tr_free( msgs->pex6 );
2365        tr_free( msgs->pex );
2366
2367        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2368        tr_free( msgs );
2369    }
2370}
Note: See TracBrowser for help on using the repository browser.