source: trunk/libtransmission/peer-msgs.c @ 7419

Last change on this file since 7419 was 7419, checked in by charles, 12 years ago

(trunk libT) really fuck up the peer i/o code. also this breaks the mac build until someone removes iobuf.c from libtransmission's list of files.

  • Property svn:keywords set to Date Rev Author Id
File size: 65.2 KB
Line 
1/*
2 * This file Copyright (C) 2007-2008 Charles Kerr <charles@transmissionbt.com>
3 *
4 * This file is licensed by the GPL version 2.  Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-msgs.c 7419 2008-12-16 22:08:17Z charles $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <limits.h> /* INT_MAX */
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19
20#include <event.h>
21
22#include "transmission.h"
23#include "bencode.h"
24#include "completion.h"
25#include "crypto.h"
26#include "inout.h"
27#ifdef WIN32
28#include "net.h" /* for ECONN */
29#endif
30#include "peer-io.h"
31#include "peer-mgr.h"
32#include "peer-mgr-private.h"
33#include "peer-msgs.h"
34#include "ratecontrol.h"
35#include "stats.h"
36#include "torrent.h"
37#include "trevent.h"
38#include "utils.h"
39
40/**
41***
42**/
43
44enum
45{
46    BT_CHOKE                = 0,
47    BT_UNCHOKE              = 1,
48    BT_INTERESTED           = 2,
49    BT_NOT_INTERESTED       = 3,
50    BT_HAVE                 = 4,
51    BT_BITFIELD             = 5,
52    BT_REQUEST              = 6,
53    BT_PIECE                = 7,
54    BT_CANCEL               = 8,
55    BT_PORT                 = 9,
56
57    BT_FEXT_SUGGEST         = 13,
58    BT_FEXT_HAVE_ALL        = 14,
59    BT_FEXT_HAVE_NONE       = 15,
60    BT_FEXT_REJECT          = 16,
61    BT_FEXT_ALLOWED_FAST    = 17,
62
63    BT_LTEP                 = 20,
64
65    LTEP_HANDSHAKE          = 0,
66
67    TR_LTEP_PEX             = 1,
68
69
70
71    MIN_CHOKE_PERIOD_SEC    = ( 10 ),
72
73    /* idle seconds before we send a keepalive */
74    KEEPALIVE_INTERVAL_SECS = 100,
75
76    PEX_INTERVAL            = ( 90 * 1000 ), /* msec between sendPex() calls */
77    PEER_PULSE_INTERVAL     = ( 250 ),       /* msec between peerPulse() calls
78                                               */
79
80    MAX_QUEUE_SIZE          = ( 100 ),
81
82    /* how long an unsent request can stay queued before it's returned
83       back to the peer-mgr's pool of requests */
84    QUEUED_REQUEST_TTL_SECS = 20,
85
86    /* how long a sent request can stay queued before it's returned
87       back to the peer-mgr's pool of requests */
88    SENT_REQUEST_TTL_SECS = 240,
89
90    /* used in lowering the outMessages queue period */
91    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
92    HIGH_PRIORITY_INTERVAL_SECS = 2,
93    LOW_PRIORITY_INTERVAL_SECS = 20,
94
95    /* number of pieces to remove from the bitfield when
96     * lazy bitfields are turned on */
97    LAZY_PIECE_COUNT = 26,
98
99    /* number of pieces we'll allow in our fast set */
100    MAX_FAST_SET_SIZE = 3
101};
102
103/**
104***  REQUEST MANAGEMENT
105**/
106
107enum
108{
109    AWAITING_BT_LENGTH,
110    AWAITING_BT_ID,
111    AWAITING_BT_MESSAGE,
112    AWAITING_BT_PIECE
113};
114
115struct peer_request
116{
117    uint32_t    index;
118    uint32_t    offset;
119    uint32_t    length;
120    time_t      time_requested;
121};
122
123static int
124compareRequest( const void * va, const void * vb )
125{
126    const struct peer_request * a = va;
127    const struct peer_request * b = vb;
128
129    if( a->index != b->index )
130        return a->index < b->index ? -1 : 1;
131
132    if( a->offset != b->offset )
133        return a->offset < b->offset ? -1 : 1;
134
135    if( a->length != b->length )
136        return a->length < b->length ? -1 : 1;
137
138    return 0;
139}
140
141struct request_list
142{
143    uint16_t               count;
144    uint16_t               max;
145    struct peer_request *  requests;
146};
147
148static const struct request_list REQUEST_LIST_INIT = { 0, 0, NULL };
149
150static void
151reqListReserve( struct request_list * list,
152                uint16_t              max )
153{
154    if( list->max < max )
155    {
156        list->max = max;
157        list->requests = tr_renew( struct peer_request,
158                                   list->requests,
159                                   list->max );
160    }
161}
162
163static void
164reqListClear( struct request_list * list )
165{
166    tr_free( list->requests );
167    *list = REQUEST_LIST_INIT;
168}
169
170static void
171reqListCopy( struct request_list * dest, const struct request_list * src )
172{
173    dest->count = dest->max = src->count;
174    dest->requests = tr_memdup( src->requests, dest->count * sizeof( struct peer_request ) );
175}
176
177static void
178reqListRemoveOne( struct request_list * list,
179                  int                   i )
180{
181    assert( 0 <= i && i < list->count );
182
183    memmove( &list->requests[i],
184             &list->requests[i + 1],
185             sizeof( struct peer_request ) * ( --list->count - i ) );
186}
187
188static void
189reqListAppend( struct request_list *       list,
190               const struct peer_request * req )
191{
192    if( ++list->count >= list->max )
193        reqListReserve( list, list->max + 8 );
194
195    list->requests[list->count - 1] = *req;
196}
197
198static int
199reqListPop( struct request_list * list,
200            struct peer_request * setme )
201{
202    int success;
203
204    if( !list->count )
205        success = FALSE;
206    else {
207        *setme = list->requests[0];
208        reqListRemoveOne( list, 0 );
209        success = TRUE;
210    }
211
212    return success;
213}
214
215static int
216reqListFind( struct request_list *       list,
217             const struct peer_request * key )
218{
219    uint16_t i;
220
221    for( i = 0; i < list->count; ++i )
222        if( !compareRequest( key, list->requests + i ) )
223            return i;
224
225    return -1;
226}
227
228static int
229reqListRemove( struct request_list *       list,
230               const struct peer_request * key )
231{
232    int success;
233    const int i = reqListFind( list, key );
234
235    if( i < 0 )
236        success = FALSE;
237    else {
238        reqListRemoveOne( list, i );
239        success = TRUE;
240    }
241
242    return success;
243}
244
245/**
246***
247**/
248
249/* this is raw, unchanged data from the peer regarding
250 * the current message that it's sending us. */
251struct tr_incoming
252{
253    uint8_t                id;
254    uint32_t               length; /* includes the +1 for id length */
255    struct peer_request    blockReq; /* metadata for incoming blocks */
256    struct evbuffer *      block; /* piece data for incoming blocks */
257};
258
259/**
260 * Low-level communication state information about a connected peer.
261 *
262 * This structure remembers the low-level protocol states that we're
263 * in with this peer, such as active requests, pex messages, and so on.
264 * Its fields are all private to peer-msgs.c.
265 *
266 * Data not directly involved with sending & receiving messages is
267 * stored in tr_peer, where it can be accessed by both peermsgs and
268 * the peer manager.
269 *
270 * @see struct peer_atom
271 * @see tr_peer
272 */
273struct tr_peermsgs
274{
275    tr_bool         peerSentBitfield;
276    tr_bool         peerSupportsPex;
277    tr_bool         clientSentLtepHandshake;
278    tr_bool         peerSentLtepHandshake;
279    tr_bool         haveFastSet;
280
281    uint8_t         state;
282    uint8_t         ut_pex_id;
283    uint16_t        pexCount;
284    uint16_t        pexCount6;
285    uint16_t        minActiveRequests;
286    uint16_t        maxActiveRequests;
287
288    size_t                 fastsetSize;
289    tr_piece_index_t       fastset[MAX_FAST_SET_SIZE];
290
291    /* how long the outMessages batch should be allowed to grow before
292     * it's flushed -- some messages (like requests >:) should be sent
293     * very quickly; others aren't as urgent. */
294    int                    outMessagesBatchPeriod;
295
296    tr_peer *              peer;
297
298    tr_session *           session;
299    tr_torrent *           torrent;
300
301    tr_publisher_t *       publisher;
302
303    struct evbuffer *      outMessages; /* all the non-piece messages */
304
305    struct request_list    peerAskedFor;
306    struct request_list    clientAskedFor;
307    struct request_list    clientWillAskFor;
308
309    tr_timer             * pexTimer;
310    tr_pex               * pex;
311    tr_pex               * pex6;
312
313    time_t                 clientSentPexAt;
314    time_t                 clientSentAnythingAt;
315
316    /* when we started batching the outMessages */
317    time_t                outMessagesBatchedAt;
318
319    struct tr_incoming    incoming;
320
321    /* if the peer supports the Extension Protocol in BEP 10 and
322       supplied a reqq argument, it's stored here.  otherwise the
323       value is zero and should be ignored. */
324    int64_t               reqq;
325};
326
327/**
328***
329**/
330
331static void
332myDebug( const char * file, int line,
333         const struct tr_peermsgs * msgs,
334         const char * fmt, ... )
335{
336    FILE * fp = tr_getLog( );
337
338    if( fp )
339    {
340        va_list           args;
341        char              timestr[64];
342        struct evbuffer * buf = evbuffer_new( );
343        char *            base = tr_basename( file );
344
345        evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
346                             tr_getLogTimeStr( timestr, sizeof( timestr ) ),
347                             msgs->torrent->info.name,
348                             tr_peerIoGetAddrStr( msgs->peer->io ),
349                             msgs->peer->client );
350        va_start( args, fmt );
351        evbuffer_add_vprintf( buf, fmt, args );
352        va_end( args );
353        evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
354        fwrite( EVBUFFER_DATA( buf ), 1, EVBUFFER_LENGTH( buf ), fp );
355
356        tr_free( base );
357        evbuffer_free( buf );
358    }
359}
360
361#define dbgmsg( msgs, ... ) \
362    do { \
363        if( tr_deepLoggingIsActive( ) ) \
364            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
365    } while( 0 )
366
367/**
368***
369**/
370
371static void
372pokeBatchPeriod( tr_peermsgs * msgs,
373                 int           interval )
374{
375    if( msgs->outMessagesBatchPeriod > interval )
376    {
377        msgs->outMessagesBatchPeriod = interval;
378        dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
379    }
380}
381
382static void
383dbgOutMessageLen( tr_peermsgs * msgs )
384{
385    dbgmsg( msgs, "outMessage size is now %zu", EVBUFFER_LENGTH( msgs->outMessages ) );
386}
387
388static void
389protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
390{
391    tr_peerIo       * io  = msgs->peer->io;
392    struct evbuffer * out = msgs->outMessages;
393
394    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
395
396    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
397    tr_peerIoWriteUint8 ( io, out, BT_FEXT_REJECT );
398    tr_peerIoWriteUint32( io, out, req->index );
399    tr_peerIoWriteUint32( io, out, req->offset );
400    tr_peerIoWriteUint32( io, out, req->length );
401
402    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
403    dbgOutMessageLen( msgs );
404}
405
406static void
407protocolSendRequest( tr_peermsgs *               msgs,
408                     const struct peer_request * req )
409{
410    tr_peerIo       * io  = msgs->peer->io;
411    struct evbuffer * out = msgs->outMessages;
412
413    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
414    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
415    tr_peerIoWriteUint32( io, out, req->index );
416    tr_peerIoWriteUint32( io, out, req->offset );
417    tr_peerIoWriteUint32( io, out, req->length );
418
419    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
420    dbgOutMessageLen( msgs );
421    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
422}
423
424static void
425protocolSendCancel( tr_peermsgs *               msgs,
426                    const struct peer_request * req )
427{
428    tr_peerIo       * io  = msgs->peer->io;
429    struct evbuffer * out = msgs->outMessages;
430
431    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
432    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
433    tr_peerIoWriteUint32( io, out, req->index );
434    tr_peerIoWriteUint32( io, out, req->offset );
435    tr_peerIoWriteUint32( io, out, req->length );
436
437    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
438    dbgOutMessageLen( msgs );
439    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
440}
441
442static void
443protocolSendHave( tr_peermsgs * msgs,
444                  uint32_t      index )
445{
446    tr_peerIo       * io  = msgs->peer->io;
447    struct evbuffer * out = msgs->outMessages;
448
449    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
450    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
451    tr_peerIoWriteUint32( io, out, index );
452
453    dbgmsg( msgs, "sending Have %u...", index );
454    dbgOutMessageLen( msgs );
455    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
456}
457
458#if 0
459static void
460protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
461{
462    tr_peerIo       * io  = msgs->peer->io;
463    struct evbuffer * out = msgs->outMessages;
464
465    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
466
467    tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
468    tr_peerIoWriteUint8 ( io, out, BT_FEXT_ALLOWED_FAST );
469    tr_peerIoWriteUint32( io, out, pieceIndex );
470
471    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
472    dbgOutMessageLen( msgs );
473}
474#endif
475
476static void
477protocolSendChoke( tr_peermsgs * msgs,
478                   int           choke )
479{
480    tr_peerIo       * io  = msgs->peer->io;
481    struct evbuffer * out = msgs->outMessages;
482
483    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
484    tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
485
486    dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
487    dbgOutMessageLen( msgs );
488    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
489}
490
491static void
492protocolSendHaveAll( tr_peermsgs * msgs )
493{
494    tr_peerIo       * io  = msgs->peer->io;
495    struct evbuffer * out = msgs->outMessages;
496
497    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
498
499    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
500    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_ALL );
501
502    dbgmsg( msgs, "sending HAVE_ALL..." );
503    dbgOutMessageLen( msgs );
504    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
505}
506
507static void
508protocolSendHaveNone( tr_peermsgs * msgs )
509{
510    tr_peerIo       * io  = msgs->peer->io;
511    struct evbuffer * out = msgs->outMessages;
512
513    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
514
515    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
516    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_NONE );
517
518    dbgmsg( msgs, "sending HAVE_NONE..." );
519    dbgOutMessageLen( msgs );
520    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
521}
522
523/**
524***  EVENTS
525**/
526
527static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0, 0 };
528
529static void
530publish( tr_peermsgs * msgs, tr_peer_event * e )
531{
532    assert( msgs->peer );
533    assert( msgs->peer->msgs == msgs );
534
535    tr_publisherPublish( msgs->publisher, msgs->peer, e );
536}
537
538static void
539fireError( tr_peermsgs * msgs, int err )
540{
541    tr_peer_event e = blankEvent;
542    e.eventType = TR_PEER_ERROR;
543    e.err = err;
544    publish( msgs, &e );
545}
546
547static void
548fireUploadOnly( tr_peermsgs * msgs, tr_bool uploadOnly )
549{
550    tr_peer_event e = blankEvent;
551    e.eventType = TR_PEER_UPLOAD_ONLY;
552    e.uploadOnly = uploadOnly;
553    publish( msgs, &e );
554}
555
556static void
557fireNeedReq( tr_peermsgs * msgs )
558{
559    tr_peer_event e = blankEvent;
560    e.eventType = TR_PEER_NEED_REQ;
561    publish( msgs, &e );
562}
563
564static void
565firePeerProgress( tr_peermsgs * msgs )
566{
567    tr_peer_event e = blankEvent;
568    e.eventType = TR_PEER_PEER_PROGRESS;
569    e.progress = msgs->peer->progress;
570    publish( msgs, &e );
571}
572
573static void
574fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
575{
576    tr_peer_event e = blankEvent;
577    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
578    e.pieceIndex = req->index;
579    e.offset = req->offset;
580    e.length = req->length;
581    publish( msgs, &e );
582}
583
584static void
585fireClientGotData( tr_peermsgs * msgs,
586                   uint32_t      length,
587                   int           wasPieceData )
588{
589    tr_peer_event e = blankEvent;
590
591    e.length = length;
592    e.eventType = TR_PEER_CLIENT_GOT_DATA;
593    e.wasPieceData = wasPieceData;
594    publish( msgs, &e );
595}
596
597static void
598fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
599{
600    tr_peer_event e = blankEvent;
601    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
602    e.pieceIndex = pieceIndex;
603    publish( msgs, &e );
604}
605
606static void
607fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
608{
609    tr_peer_event e = blankEvent;
610    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
611    e.pieceIndex = pieceIndex;
612    publish( msgs, &e );
613}
614
615static void
616firePeerGotData( tr_peermsgs  * msgs,
617                 uint32_t       length,
618                 int            wasPieceData )
619{
620    tr_peer_event e = blankEvent;
621
622    e.length = length;
623    e.eventType = TR_PEER_PEER_GOT_DATA;
624    e.wasPieceData = wasPieceData;
625
626    publish( msgs, &e );
627}
628
629static void
630fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
631{
632    tr_peer_event e = blankEvent;
633    e.eventType = TR_PEER_CANCEL;
634    e.pieceIndex = req->index;
635    e.offset = req->offset;
636    e.length = req->length;
637    publish( msgs, &e );
638}
639
640/**
641***  ALLOWED FAST SET
642***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
643**/
644
645size_t
646tr_generateAllowedSet( tr_piece_index_t * setmePieces,
647                       size_t             desiredSetSize,
648                       size_t             pieceCount,
649                       const uint8_t    * infohash,
650                       const tr_address * addr )
651{
652    size_t setSize = 0;
653
654    assert( setmePieces );
655    assert( desiredSetSize <= pieceCount );
656    assert( desiredSetSize );
657    assert( pieceCount );
658    assert( infohash );
659    assert( addr );
660
661    if( addr->type == TR_AF_INET )
662    {
663        uint8_t w[SHA_DIGEST_LENGTH + 4];
664        uint8_t x[SHA_DIGEST_LENGTH];
665
666        *(uint32_t*)w = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 );   /* (1) */
667        memcpy( w + 4, infohash, SHA_DIGEST_LENGTH );                /* (2) */
668        tr_sha1( x, w, sizeof( w ), NULL );                          /* (3) */
669
670        while( setSize<desiredSetSize )
671        {
672            int i;
673            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
674            {
675                size_t k;
676                uint32_t j = i * 4;                                  /* (5) */
677                uint32_t y = ntohl( *( uint32_t* )( x + j ) );       /* (6) */
678                uint32_t index = y % pieceCount;                     /* (7) */
679
680                for( k=0; k<setSize; ++k )                           /* (8) */
681                    if( setmePieces[k] == index )
682                        break;
683
684                if( k == setSize )
685                    setmePieces[setSize++] = index;                  /* (9) */
686            }
687
688            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
689        }
690    }
691
692    return setSize;
693}
694
695static void
696updateFastSet( tr_peermsgs * msgs UNUSED )
697{
698#if 0
699    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
700    const int peerIsNeedy = msgs->peer->progress < 0.10;
701
702    if( fext && peerIsNeedy && !msgs->haveFastSet )
703    {
704        size_t i;
705        const struct tr_address * addr = tr_peerIoGetAddress( msgs->peer->io, NULL );
706        const tr_info * inf = &msgs->torrent->info;
707        const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
708
709        /* build the fast set */
710        msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
711        msgs->haveFastSet = 1;
712
713        /* send it to the peer */
714        for( i=0; i<msgs->fastsetSize; ++i )
715            protocolSendAllowedFast( msgs, msgs->fastset[i] );
716    }
717#endif
718}
719
720/**
721***  INTEREST
722**/
723
724static int
725isPieceInteresting( const tr_peermsgs * msgs,
726                    tr_piece_index_t    piece )
727{
728    const tr_torrent * torrent = msgs->torrent;
729
730    return ( !torrent->info.pieces[piece].dnd )                 /* we want it */
731          && ( !tr_cpPieceIsComplete( torrent->completion, piece ) ) /* !have */
732          && ( tr_bitfieldHas( msgs->peer->have, piece ) );    /* peer has it */
733}
734
735/* "interested" means we'll ask for piece data if they unchoke us */
736static int
737isPeerInteresting( const tr_peermsgs * msgs )
738{
739    tr_piece_index_t    i;
740    const tr_torrent *  torrent;
741    const tr_bitfield * bitfield;
742    const int           clientIsSeed = tr_torrentIsSeed( msgs->torrent );
743
744    if( clientIsSeed )
745        return FALSE;
746
747    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
748        return FALSE;
749
750    torrent = msgs->torrent;
751    bitfield = tr_cpPieceBitfield( torrent->completion );
752
753    if( !msgs->peer->have )
754        return TRUE;
755
756    assert( bitfield->byteCount == msgs->peer->have->byteCount );
757
758    for( i = 0; i < torrent->info.pieceCount; ++i )
759        if( isPieceInteresting( msgs, i ) )
760            return TRUE;
761
762    return FALSE;
763}
764
765static void
766sendInterest( tr_peermsgs * msgs,
767              int           weAreInterested )
768{
769    struct evbuffer * out = msgs->outMessages;
770
771    assert( msgs );
772    assert( weAreInterested == 0 || weAreInterested == 1 );
773
774    msgs->peer->clientIsInterested = weAreInterested;
775    dbgmsg( msgs, "Sending %s", weAreInterested ? "Interested" : "Not Interested" );
776    tr_peerIoWriteUint32( msgs->peer->io, out, sizeof( uint8_t ) );
777    tr_peerIoWriteUint8 ( msgs->peer->io, out, weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
778    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
779    dbgOutMessageLen( msgs );
780}
781
782static void
783updateInterest( tr_peermsgs * msgs )
784{
785    const int i = isPeerInteresting( msgs );
786
787    if( i != msgs->peer->clientIsInterested )
788        sendInterest( msgs, i );
789    if( i )
790        fireNeedReq( msgs );
791}
792
793static int
794popNextRequest( tr_peermsgs *         msgs,
795                struct peer_request * setme )
796{
797    return reqListPop( &msgs->peerAskedFor, setme );
798}
799
800static void
801cancelAllRequestsToClient( tr_peermsgs * msgs )
802{
803    struct peer_request req;
804    const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->peer->io );
805
806    while( popNextRequest( msgs, &req ) )
807        if( mustSendCancel )
808            protocolSendReject( msgs, &req );
809}
810
811void
812tr_peerMsgsSetChoke( tr_peermsgs * msgs,
813                     int           choke )
814{
815    const time_t now = time( NULL );
816    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
817
818    assert( msgs );
819    assert( msgs->peer );
820    assert( choke == 0 || choke == 1 );
821
822    if( msgs->peer->chokeChangedAt > fibrillationTime )
823    {
824        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
825    }
826    else if( msgs->peer->peerIsChoked != choke )
827    {
828        msgs->peer->peerIsChoked = choke;
829        if( choke )
830            cancelAllRequestsToClient( msgs );
831        protocolSendChoke( msgs, choke );
832        msgs->peer->chokeChangedAt = now;
833    }
834}
835
836/**
837***
838**/
839
840void
841tr_peerMsgsHave( tr_peermsgs * msgs,
842                 uint32_t      index )
843{
844    protocolSendHave( msgs, index );
845
846    /* since we have more pieces now, we might not be interested in this peer */
847    updateInterest( msgs );
848}
849
850/**
851***
852**/
853
854static int
855reqIsValid( const tr_peermsgs * peer,
856            uint32_t            index,
857            uint32_t            offset,
858            uint32_t            length )
859{
860    return tr_torrentReqIsValid( peer->torrent, index, offset, length );
861}
862
863static int
864requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
865{
866    return reqIsValid( msgs, req->index, req->offset, req->length );
867}
868
869static void
870expireOldRequests( tr_peermsgs * msgs, const time_t now  )
871{
872    int i;
873    time_t oldestAllowed;
874    struct request_list tmp = REQUEST_LIST_INIT;
875    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
876    dbgmsg( msgs, "entering `expire old requests' block" );
877
878    /* cancel requests that have been queued for too long */
879    oldestAllowed = now - QUEUED_REQUEST_TTL_SECS;
880    reqListCopy( &tmp, &msgs->clientWillAskFor );
881    for( i = 0; i < tmp.count; ++i ) {
882        const struct peer_request * req = &tmp.requests[i];
883        if( req->time_requested < oldestAllowed )
884            tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
885    }
886    reqListClear( &tmp );
887
888    /* if the peer doesn't support "Reject Request",
889     * cancel requests that were sent too long ago. */
890    if( !fext ) {
891        oldestAllowed = now - SENT_REQUEST_TTL_SECS;
892        reqListCopy( &tmp, &msgs->clientAskedFor );
893        for( i=0; i<tmp.count; ++i ) {
894            const struct peer_request * req = &tmp.requests[i];
895            if( req->time_requested < oldestAllowed )
896                tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
897        }
898        reqListClear( &tmp );
899    }
900
901    dbgmsg( msgs, "leaving `expire old requests' block" );
902}
903
904static void
905pumpRequestQueue( tr_peermsgs * msgs, const time_t now )
906{
907    const int           max = msgs->maxActiveRequests;
908    const int           min = msgs->minActiveRequests;
909    int                 sent = 0;
910    int                 count = msgs->clientAskedFor.count;
911    struct peer_request req;
912
913    if( count > min )
914        return;
915    if( msgs->peer->clientIsChoked )
916        return;
917    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
918        return;
919
920    while( ( count < max ) && reqListPop( &msgs->clientWillAskFor, &req ) )
921    {
922        const tr_block_index_t block = _tr_block( msgs->torrent, req.index, req.offset );
923
924        assert( requestIsValid( msgs, &req ) );
925        assert( tr_bitfieldHas( msgs->peer->have, req.index ) );
926
927        /* don't ask for it if we've already got it... this block may have
928         * come in from a different peer after we cancelled a request for it */
929        if( !tr_cpBlockIsComplete( msgs->torrent->completion, block ) )
930        {
931            protocolSendRequest( msgs, &req );
932            req.time_requested = now;
933            reqListAppend( &msgs->clientAskedFor, &req );
934
935            ++count;
936            ++sent;
937        }
938    }
939
940    if( sent )
941        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
942                sent,
943                msgs->clientAskedFor.count,
944                msgs->clientWillAskFor.count );
945
946    if( count < max )
947        fireNeedReq( msgs );
948}
949
950static int
951requestQueueIsFull( const tr_peermsgs * msgs )
952{
953    const int req_max = msgs->maxActiveRequests;
954    return msgs->clientWillAskFor.count >= req_max;
955}
956
957tr_addreq_t
958tr_peerMsgsAddRequest( tr_peermsgs *    msgs,
959                       uint32_t         index,
960                       uint32_t         offset,
961                       uint32_t         length,
962                       int              doForce )
963{
964    struct peer_request req;
965
966    assert( msgs );
967    assert( msgs->torrent );
968    assert( reqIsValid( msgs, index, offset, length ) );
969
970    /**
971    ***  Reasons to decline the request
972    **/
973
974    /* don't send requests to choked clients */
975    if( !doForce && msgs->peer->clientIsChoked ) {
976        dbgmsg( msgs, "declining request because they're choking us" );
977        return TR_ADDREQ_CLIENT_CHOKED;
978    }
979
980    /* peer doesn't have this piece */
981    if( !doForce && !tr_bitfieldHas( msgs->peer->have, index ) )
982        return TR_ADDREQ_MISSING;
983
984    /* peer's queue is full */
985    if( !doForce && requestQueueIsFull( msgs ) ) {
986        dbgmsg( msgs, "declining request because we're full" );
987        return TR_ADDREQ_FULL;
988    }
989
990    /* have we already asked for this piece? */
991    req.index = index;
992    req.offset = offset;
993    req.length = length;
994    if( !doForce ) {
995        if( reqListFind( &msgs->clientAskedFor, &req ) != -1 ) {
996            dbgmsg( msgs, "declining because it's a duplicate" );
997            return TR_ADDREQ_DUPLICATE;
998        }
999        if( reqListFind( &msgs->clientWillAskFor, &req ) != -1 ) {
1000            dbgmsg( msgs, "declining because it's a duplicate" );
1001            return TR_ADDREQ_DUPLICATE;
1002        }
1003    }
1004
1005    /**
1006    ***  Accept this request
1007    **/
1008
1009    dbgmsg( msgs, "adding req for %"PRIu32":%"PRIu32"->%"PRIu32" to our `will request' list",
1010            index, offset, length );
1011    req.time_requested = time( NULL );
1012    reqListAppend( &msgs->clientWillAskFor, &req );
1013    return TR_ADDREQ_OK;
1014}
1015
1016static void
1017cancelAllRequestsToPeer( tr_peermsgs * msgs, tr_bool sendCancel )
1018{
1019    int i;
1020    struct request_list a = msgs->clientWillAskFor;
1021    struct request_list b = msgs->clientAskedFor;
1022    dbgmsg( msgs, "cancelling all requests to peer" );
1023
1024    msgs->clientAskedFor = REQUEST_LIST_INIT;
1025    msgs->clientWillAskFor = REQUEST_LIST_INIT;
1026
1027    for( i=0; i<a.count; ++i )
1028        fireCancelledReq( msgs, &a.requests[i] );
1029
1030    for( i = 0; i < b.count; ++i ) {
1031        fireCancelledReq( msgs, &b.requests[i] );
1032        if( sendCancel )
1033            protocolSendCancel( msgs, &b.requests[i] );
1034    }
1035
1036    reqListClear( &a );
1037    reqListClear( &b );
1038}
1039
1040void
1041tr_peerMsgsCancel( tr_peermsgs * msgs,
1042                   uint32_t      pieceIndex,
1043                   uint32_t      offset,
1044                   uint32_t      length )
1045{
1046    struct peer_request req;
1047
1048    assert( msgs != NULL );
1049    assert( length > 0 );
1050
1051
1052    /* have we asked the peer for this piece? */
1053    req.index = pieceIndex;
1054    req.offset = offset;
1055    req.length = length;
1056
1057    /* if it's only in the queue and hasn't been sent yet, free it */
1058    if( reqListRemove( &msgs->clientWillAskFor, &req ) ) {
1059        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32, pieceIndex, offset, length );
1060        fireCancelledReq( msgs, &req );
1061    }
1062
1063    /* if it's already been sent, send a cancel message too */
1064    if( reqListRemove( &msgs->clientAskedFor, &req ) ) {
1065        dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32, pieceIndex, offset, length );
1066        protocolSendCancel( msgs, &req );
1067        fireCancelledReq( msgs, &req );
1068    }
1069}
1070
1071
1072/**
1073***
1074**/
1075
1076static void
1077sendLtepHandshake( tr_peermsgs * msgs )
1078{
1079    tr_benc val, *m;
1080    char * buf;
1081    int len;
1082    int pex;
1083    struct evbuffer * out = msgs->outMessages;
1084
1085    if( msgs->clientSentLtepHandshake )
1086        return;
1087
1088    dbgmsg( msgs, "sending an ltep handshake" );
1089    msgs->clientSentLtepHandshake = 1;
1090
1091    /* decide if we want to advertise pex support */
1092    if( !tr_torrentAllowsPex( msgs->torrent ) )
1093        pex = 0;
1094    else if( msgs->peerSentLtepHandshake )
1095        pex = msgs->peerSupportsPex ? 1 : 0;
1096    else
1097        pex = 1;
1098
1099    tr_bencInitDict( &val, 5 );
1100    tr_bencDictAddInt( &val, "e", msgs->session->encryptionMode != TR_CLEAR_PREFERRED );
1101    tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( msgs->session ) );
1102    tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
1103    tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
1104    m  = tr_bencDictAddDict( &val, "m", 1 );
1105    if( pex )
1106        tr_bencDictAddInt( m, "ut_pex", TR_LTEP_PEX );
1107    buf = tr_bencSave( &val, &len );
1108
1109    tr_peerIoWriteUint32( msgs->peer->io, out, 2 * sizeof( uint8_t ) + len );
1110    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_LTEP );
1111    tr_peerIoWriteUint8 ( msgs->peer->io, out, LTEP_HANDSHAKE );
1112    tr_peerIoWriteBytes ( msgs->peer->io, out, buf, len );
1113    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1114    dbgOutMessageLen( msgs );
1115
1116    /* cleanup */
1117    tr_bencFree( &val );
1118    tr_free( buf );
1119}
1120
1121static void
1122parseLtepHandshake( tr_peermsgs *     msgs,
1123                    int               len,
1124                    struct evbuffer * inbuf )
1125{
1126    int64_t   i;
1127    tr_benc   val, * sub;
1128    uint8_t * tmp = tr_new( uint8_t, len );
1129
1130    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, len );
1131    msgs->peerSentLtepHandshake = 1;
1132
1133    if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) )
1134    {
1135        dbgmsg( msgs, "GET  extended-handshake, couldn't get dictionary" );
1136        tr_free( tmp );
1137        return;
1138    }
1139
1140    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len,  tmp );
1141
1142    /* does the peer prefer encrypted connections? */
1143    if( tr_bencDictFindInt( &val, "e", &i ) )
1144        msgs->peer->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
1145                                              : ENCRYPTION_PREFERENCE_NO;
1146
1147    /* check supported messages for utorrent pex */
1148    msgs->peerSupportsPex = 0;
1149    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
1150        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
1151            msgs->ut_pex_id = (uint8_t) i;
1152            msgs->peerSupportsPex = msgs->ut_pex_id == 0 ? 0 : 1;
1153            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
1154        }
1155    }
1156
1157    /* look for upload_only (BEP 21) */
1158    if( tr_bencDictFindInt( &val, "upload_only", &i ) )
1159        fireUploadOnly( msgs, i!=0 );
1160
1161    /* get peer's listening port */
1162    if( tr_bencDictFindInt( &val, "p", &i ) ) {
1163        msgs->peer->port = htons( (uint16_t)i );
1164        dbgmsg( msgs, "msgs->port is now %hu", msgs->peer->port );
1165    }
1166
1167    /* get peer's maximum request queue size */
1168    if( tr_bencDictFindInt( &val, "reqq", &i ) )
1169        msgs->reqq = i;
1170
1171    tr_bencFree( &val );
1172    tr_free( tmp );
1173}
1174
1175static void
1176parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1177{
1178    int loaded = 0;
1179    uint8_t * tmp = tr_new( uint8_t, msglen );
1180    tr_benc val;
1181    const tr_torrent * tor = msgs->torrent;
1182    const uint8_t * added;
1183    size_t added_len;
1184
1185    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1186
1187    if( tr_torrentAllowsPex( tor )
1188      && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) ) )
1189    {
1190        if( tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1191        {
1192            const uint8_t * added_f = NULL;
1193            tr_pex *        pex;
1194            size_t          i, n;
1195            size_t          added_f_len = 0;
1196            tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1197            pex =
1198                tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len,
1199                                        &n );
1200            for( i = 0; i < n; ++i )
1201                tr_peerMgrAddPex( msgs->session->peerMgr, tor->info.hash,
1202                                  TR_PEER_FROM_PEX, pex + i );
1203            tr_free( pex );
1204        }
1205       
1206        if( tr_bencDictFindRaw( &val, "added6", &added, &added_len ) )
1207        {
1208            const uint8_t * added_f = NULL;
1209            tr_pex *        pex;
1210            size_t          i, n;
1211            size_t          added_f_len = 0;
1212            tr_bencDictFindRaw( &val, "added6.f", &added_f, &added_f_len );
1213            pex =
1214                tr_peerMgrCompact6ToPex( added, added_len, added_f, added_f_len,
1215                                         &n );
1216            for( i = 0; i < n; ++i )
1217                tr_peerMgrAddPex( msgs->session->peerMgr, tor->info.hash,
1218                                  TR_PEER_FROM_PEX, pex + i );
1219            tr_free( pex );
1220        }
1221       
1222    }
1223
1224    if( loaded )
1225        tr_bencFree( &val );
1226    tr_free( tmp );
1227}
1228
1229static void sendPex( tr_peermsgs * msgs );
1230
1231static void
1232parseLtep( tr_peermsgs *     msgs,
1233           int               msglen,
1234           struct evbuffer * inbuf )
1235{
1236    uint8_t ltep_msgid;
1237
1238    tr_peerIoReadUint8( msgs->peer->io, inbuf, &ltep_msgid );
1239    msglen--;
1240
1241    if( ltep_msgid == LTEP_HANDSHAKE )
1242    {
1243        dbgmsg( msgs, "got ltep handshake" );
1244        parseLtepHandshake( msgs, msglen, inbuf );
1245        if( tr_peerIoSupportsLTEP( msgs->peer->io ) )
1246        {
1247            sendLtepHandshake( msgs );
1248            sendPex( msgs );
1249        }
1250    }
1251    else if( ltep_msgid == TR_LTEP_PEX )
1252    {
1253        dbgmsg( msgs, "got ut pex" );
1254        msgs->peerSupportsPex = 1;
1255        parseUtPex( msgs, msglen, inbuf );
1256    }
1257    else
1258    {
1259        dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1260        evbuffer_drain( inbuf, msglen );
1261    }
1262}
1263
1264static int
1265readBtLength( tr_peermsgs *     msgs,
1266              struct evbuffer * inbuf,
1267              size_t            inlen )
1268{
1269    uint32_t len;
1270
1271    if( inlen < sizeof( len ) )
1272        return READ_LATER;
1273
1274    tr_peerIoReadUint32( msgs->peer->io, inbuf, &len );
1275
1276    if( len == 0 ) /* peer sent us a keepalive message */
1277        dbgmsg( msgs, "got KeepAlive" );
1278    else
1279    {
1280        msgs->incoming.length = len;
1281        msgs->state = AWAITING_BT_ID;
1282    }
1283
1284    return READ_NOW;
1285}
1286
1287static int readBtMessage( tr_peermsgs *     msgs,
1288                          struct evbuffer * inbuf,
1289                          size_t            inlen );
1290
1291static int
1292readBtId( tr_peermsgs *     msgs,
1293          struct evbuffer * inbuf,
1294          size_t            inlen )
1295{
1296    uint8_t id;
1297
1298    if( inlen < sizeof( uint8_t ) )
1299        return READ_LATER;
1300
1301    tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
1302    msgs->incoming.id = id;
1303
1304    if( id == BT_PIECE )
1305    {
1306        msgs->state = AWAITING_BT_PIECE;
1307        return READ_NOW;
1308    }
1309    else if( msgs->incoming.length != 1 )
1310    {
1311        msgs->state = AWAITING_BT_MESSAGE;
1312        return READ_NOW;
1313    }
1314    else return readBtMessage( msgs, inbuf, inlen - 1 );
1315}
1316
1317static void
1318updatePeerProgress( tr_peermsgs * msgs )
1319{
1320    msgs->peer->progress = tr_bitfieldCountTrueBits( msgs->peer->have ) / (float)msgs->torrent->info.pieceCount;
1321    dbgmsg( msgs, "peer progress is %f", msgs->peer->progress );
1322    updateFastSet( msgs );
1323    updateInterest( msgs );
1324    firePeerProgress( msgs );
1325}
1326
1327static void
1328peerMadeRequest( tr_peermsgs *               msgs,
1329                 const struct peer_request * req )
1330{
1331    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1332    const int reqIsValid = requestIsValid( msgs, req );
1333    const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( msgs->torrent->completion, req->index );
1334    const int peerIsChoked = msgs->peer->peerIsChoked;
1335
1336    int allow = FALSE;
1337
1338    if( !reqIsValid )
1339        dbgmsg( msgs, "rejecting an invalid request." );
1340    else if( !clientHasPiece )
1341        dbgmsg( msgs, "rejecting request for a piece we don't have." );
1342    else if( peerIsChoked )
1343        dbgmsg( msgs, "rejecting request from choked peer" );
1344    else
1345        allow = TRUE;
1346
1347    if( allow )
1348        reqListAppend( &msgs->peerAskedFor, req );
1349    else if( fext )
1350        protocolSendReject( msgs, req );
1351}
1352
1353static int
1354messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1355{
1356    switch( id )
1357    {
1358        case BT_CHOKE:
1359        case BT_UNCHOKE:
1360        case BT_INTERESTED:
1361        case BT_NOT_INTERESTED:
1362        case BT_FEXT_HAVE_ALL:
1363        case BT_FEXT_HAVE_NONE:
1364            return len == 1;
1365
1366        case BT_HAVE:
1367        case BT_FEXT_SUGGEST:
1368        case BT_FEXT_ALLOWED_FAST:
1369            return len == 5;
1370
1371        case BT_BITFIELD:
1372            return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1373
1374        case BT_REQUEST:
1375        case BT_CANCEL:
1376        case BT_FEXT_REJECT:
1377            return len == 13;
1378
1379        case BT_PIECE:
1380            return len > 9 && len <= 16393;
1381
1382        case BT_PORT:
1383            return len == 3;
1384
1385        case BT_LTEP:
1386            return len >= 2;
1387
1388        default:
1389            return FALSE;
1390    }
1391}
1392
1393static int clientGotBlock( tr_peermsgs *               msgs,
1394                           const uint8_t *             block,
1395                           const struct peer_request * req );
1396
1397static int
1398readBtPiece( tr_peermsgs      * msgs,
1399             struct evbuffer  * inbuf,
1400             size_t             inlen,
1401             size_t           * setme_piece_bytes_read )
1402{
1403    struct peer_request * req = &msgs->incoming.blockReq;
1404
1405    assert( EVBUFFER_LENGTH( inbuf ) >= inlen );
1406    dbgmsg( msgs, "In readBtPiece" );
1407
1408    if( !req->length )
1409    {
1410        if( inlen < 8 )
1411            return READ_LATER;
1412
1413        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index );
1414        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset );
1415        req->length = msgs->incoming.length - 9;
1416        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1417        return READ_NOW;
1418    }
1419    else
1420    {
1421        int err;
1422
1423        /* read in another chunk of data */
1424        const size_t nLeft = req->length - EVBUFFER_LENGTH( msgs->incoming.block );
1425        size_t n = MIN( nLeft, inlen );
1426        uint8_t * buf = tr_new( uint8_t, n );
1427        assert( EVBUFFER_LENGTH( inbuf ) >= n );
1428        tr_peerIoReadBytes( msgs->peer->io, inbuf, buf, n );
1429        evbuffer_add( msgs->incoming.block, buf, n );
1430        fireClientGotData( msgs, n, TRUE );
1431        *setme_piece_bytes_read += n;
1432        tr_free( buf );
1433        dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1434               n, req->index, req->offset, req->length,
1435               (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) );
1436        if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length )
1437            return READ_LATER;
1438
1439        /* we've got the whole block ... process it */
1440        err = clientGotBlock( msgs, EVBUFFER_DATA( msgs->incoming.block ), req );
1441
1442        /* cleanup */
1443        evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH( msgs->incoming.block ) );
1444        req->length = 0;
1445        msgs->state = AWAITING_BT_LENGTH;
1446        if( !err )
1447            return READ_NOW;
1448        else {
1449            fireError( msgs, err );
1450            return READ_ERR;
1451        }
1452    }
1453}
1454
1455static int
1456readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1457{
1458    uint32_t      ui32;
1459    uint32_t      msglen = msgs->incoming.length;
1460    const uint8_t id = msgs->incoming.id;
1461    const size_t  startBufLen = EVBUFFER_LENGTH( inbuf );
1462    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1463
1464    --msglen; /* id length */
1465
1466    if( inlen < msglen )
1467        return READ_LATER;
1468
1469    dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
1470
1471    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1472    {
1473        dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1474        fireError( msgs, EMSGSIZE );
1475        return READ_ERR;
1476    }
1477
1478    switch( id )
1479    {
1480        case BT_CHOKE:
1481            dbgmsg( msgs, "got Choke" );
1482            msgs->peer->clientIsChoked = 1;
1483            if( !fext )
1484                cancelAllRequestsToPeer( msgs, FALSE );
1485            break;
1486
1487        case BT_UNCHOKE:
1488            dbgmsg( msgs, "got Unchoke" );
1489            msgs->peer->clientIsChoked = 0;
1490            fireNeedReq( msgs );
1491            break;
1492
1493        case BT_INTERESTED:
1494            dbgmsg( msgs, "got Interested" );
1495            msgs->peer->peerIsInterested = 1;
1496            break;
1497
1498        case BT_NOT_INTERESTED:
1499            dbgmsg( msgs, "got Not Interested" );
1500            msgs->peer->peerIsInterested = 0;
1501            break;
1502
1503        case BT_HAVE:
1504            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1505            dbgmsg( msgs, "got Have: %u", ui32 );
1506            if( tr_bitfieldAdd( msgs->peer->have, ui32 ) ) {
1507                fireError( msgs, ERANGE );
1508                return READ_ERR;
1509            }
1510            updatePeerProgress( msgs );
1511            tr_rcTransferred( msgs->torrent->swarmSpeed,
1512                              msgs->torrent->info.pieceSize );
1513            break;
1514
1515        case BT_BITFIELD:
1516        {
1517            dbgmsg( msgs, "got a bitfield" );
1518            msgs->peerSentBitfield = 1;
1519            tr_peerIoReadBytes( msgs->peer->io, inbuf, msgs->peer->have->bits, msglen );
1520            updatePeerProgress( msgs );
1521            fireNeedReq( msgs );
1522            break;
1523        }
1524
1525        case BT_REQUEST:
1526        {
1527            struct peer_request r;
1528            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1529            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1530            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1531            dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1532            peerMadeRequest( msgs, &r );
1533            break;
1534        }
1535
1536        case BT_CANCEL:
1537        {
1538            struct peer_request r;
1539            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1540            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1541            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1542            dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1543            if( reqListRemove( &msgs->peerAskedFor, &r ) && fext )
1544                protocolSendReject( msgs, &r );
1545            break;
1546        }
1547
1548        case BT_PIECE:
1549            assert( 0 ); /* handled elsewhere! */
1550            break;
1551
1552        case BT_PORT:
1553            dbgmsg( msgs, "Got a BT_PORT" );
1554            tr_peerIoReadUint16( msgs->peer->io, inbuf, &msgs->peer->port );
1555            break;
1556
1557        case BT_FEXT_SUGGEST:
1558            dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1559            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1560            if( fext )
1561                fireClientGotSuggest( msgs, ui32 );
1562            else {
1563                fireError( msgs, EMSGSIZE );
1564                return READ_ERR;
1565            }
1566            break;
1567
1568        case BT_FEXT_ALLOWED_FAST:
1569            dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1570            tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1571            if( fext )
1572                fireClientGotAllowedFast( msgs, ui32 );
1573            else {
1574                fireError( msgs, EMSGSIZE );
1575                return READ_ERR;
1576            }
1577            break;
1578
1579        case BT_FEXT_HAVE_ALL:
1580            dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1581            if( fext ) {
1582                tr_bitfieldAddRange( msgs->peer->have, 0, msgs->torrent->info.pieceCount );
1583                updatePeerProgress( msgs );
1584            } else {
1585                fireError( msgs, EMSGSIZE );
1586                return READ_ERR;
1587            }
1588            break;
1589
1590
1591        case BT_FEXT_HAVE_NONE:
1592            dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1593            if( fext ) {
1594                tr_bitfieldClear( msgs->peer->have );
1595                updatePeerProgress( msgs );
1596            } else {
1597                fireError( msgs, EMSGSIZE );
1598                return READ_ERR;
1599            }
1600            break;
1601
1602        case BT_FEXT_REJECT:
1603        {
1604            struct peer_request r;
1605            dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1606            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1607            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1608            tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1609            if( fext )
1610                reqListRemove( &msgs->clientAskedFor, &r );
1611            else {
1612                fireError( msgs, EMSGSIZE );
1613                return READ_ERR;
1614            }
1615            break;
1616        }
1617
1618        case BT_LTEP:
1619            dbgmsg( msgs, "Got a BT_LTEP" );
1620            parseLtep( msgs, msglen, inbuf );
1621            break;
1622
1623        default:
1624            dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1625            tr_peerIoDrain( msgs->peer->io, inbuf, msglen );
1626            break;
1627    }
1628
1629    assert( msglen + 1 == msgs->incoming.length );
1630    assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen );
1631
1632    msgs->state = AWAITING_BT_LENGTH;
1633    return READ_NOW;
1634}
1635
1636static void
1637decrementDownloadedCount( tr_peermsgs * msgs, uint32_t byteCount )
1638{
1639    tr_torrent * tor = msgs->torrent;
1640
1641    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1642}
1643
1644static void
1645clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
1646{
1647    decrementDownloadedCount( msgs, req->length );
1648}
1649
1650static void
1651addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1652{
1653    if( !msgs->peer->blame )
1654         msgs->peer->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1655    tr_bitfieldAdd( msgs->peer->blame, index );
1656}
1657
1658/* returns 0 on success, or an errno on failure */
1659static int
1660clientGotBlock( tr_peermsgs *               msgs,
1661                const uint8_t *             data,
1662                const struct peer_request * req )
1663{
1664    int err;
1665    tr_torrent * tor = msgs->torrent;
1666    const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1667
1668    assert( msgs );
1669    assert( req );
1670
1671    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) {
1672        dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1673                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1674        return EMSGSIZE;
1675    }
1676
1677    /* save the block */
1678    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1679
1680    /**
1681    *** Remove the block from our `we asked for this' list
1682    **/
1683
1684    if( !reqListRemove( &msgs->clientAskedFor, req ) ) {
1685        clientGotUnwantedBlock( msgs, req );
1686        dbgmsg( msgs, "we didn't ask for this message..." );
1687        return 0;
1688    }
1689
1690    dbgmsg( msgs, "peer has %d more blocks we've asked for",
1691            msgs->clientAskedFor.count );
1692
1693    /**
1694    *** Error checks
1695    **/
1696
1697    if( tr_cpBlockIsComplete( tor->completion, block ) ) {
1698        dbgmsg( msgs, "we have this block already..." );
1699        clientGotUnwantedBlock( msgs, req );
1700        return 0;
1701    }
1702
1703    /**
1704    ***  Save the block
1705    **/
1706
1707    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
1708        return err;
1709
1710    addPeerToBlamefield( msgs, req->index );
1711    fireGotBlock( msgs, req );
1712    return 0;
1713}
1714
1715static int peerPulse( void * vmsgs );
1716
1717static void
1718didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1719{
1720    tr_peermsgs * msgs = vmsgs;
1721    firePeerGotData( msgs, bytesWritten, wasPieceData );
1722    peerPulse( msgs );
1723}
1724
1725static ReadState
1726canRead( tr_peerIo * io, void * vmsgs, size_t * piece )
1727{
1728    ReadState         ret;
1729    tr_peermsgs *     msgs = vmsgs;
1730    struct evbuffer * in = tr_peerIoGetReadBuffer( io );
1731    const size_t      inlen = EVBUFFER_LENGTH( in );
1732
1733    if( !inlen )
1734    {
1735        ret = READ_LATER;
1736    }
1737    else if( msgs->state == AWAITING_BT_PIECE )
1738    {
1739        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1740    }
1741    else switch( msgs->state )
1742    {
1743        case AWAITING_BT_LENGTH:
1744            ret = readBtLength ( msgs, in, inlen ); break;
1745
1746        case AWAITING_BT_ID:
1747            ret = readBtId     ( msgs, in, inlen ); break;
1748
1749        case AWAITING_BT_MESSAGE:
1750            ret = readBtMessage( msgs, in, inlen ); break;
1751
1752        default:
1753            assert( 0 );
1754    }
1755
1756    /* log the raw data that was read */
1757    if( ( ret != READ_ERR ) && ( EVBUFFER_LENGTH( in ) != inlen ) )
1758        fireClientGotData( msgs, inlen - EVBUFFER_LENGTH( in ), FALSE );
1759
1760    return ret;
1761}
1762
1763/**
1764***
1765**/
1766
1767static int
1768ratePulse( void * vmsgs )
1769{
1770    tr_peermsgs * msgs = vmsgs;
1771    const double rateToClient = tr_peerGetPieceSpeed( msgs->peer, TR_PEER_TO_CLIENT );
1772    const int seconds = 10;
1773    const int estimatedBlocksInPeriod = ( rateToClient * seconds * 1024 ) / msgs->torrent->blockSize;
1774
1775    msgs->minActiveRequests = 8;
1776    msgs->maxActiveRequests = msgs->minActiveRequests + estimatedBlocksInPeriod;
1777
1778    if( msgs->reqq > 0 )
1779        msgs->maxActiveRequests = MIN( msgs->maxActiveRequests, msgs->reqq );
1780
1781    return TRUE;
1782}
1783
1784static size_t
1785fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1786{
1787    size_t bytesWritten = 0;
1788    struct peer_request req;
1789    const int haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1790    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1791
1792    /**
1793    ***  Protocol messages
1794    **/
1795
1796    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1797    {
1798        dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
1799        msgs->outMessagesBatchedAt = now;
1800    }
1801    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1802    {
1803        const size_t len = EVBUFFER_LENGTH( msgs->outMessages );
1804        /* flush the protocol messages */
1805        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len );
1806        tr_peerIoWriteBuf( msgs->peer->io, msgs->outMessages, FALSE );
1807        msgs->clientSentAnythingAt = now;
1808        msgs->outMessagesBatchedAt = 0;
1809        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1810        bytesWritten +=  len;
1811    }
1812
1813    /**
1814    ***  Blocks
1815    **/
1816
1817    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io ) >= msgs->torrent->blockSize )
1818        && popNextRequest( msgs, &req ) )
1819    {
1820        if( requestIsValid( msgs, &req )
1821            && tr_cpPieceIsComplete( msgs->torrent->completion, req.index ) )
1822        {
1823            /* send a block */
1824            uint8_t * buf = tr_new( uint8_t, req.length );
1825            const int err = tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf );
1826            if( err ) {
1827                fireError( msgs, err );
1828                bytesWritten = 0;
1829                msgs = NULL;
1830            } else {
1831                tr_peerIo * io = msgs->peer->io;
1832                struct evbuffer * out = evbuffer_new( );
1833                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1834                tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1835                tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1836                tr_peerIoWriteUint32( io, out, req.index );
1837                tr_peerIoWriteUint32( io, out, req.offset );
1838                tr_peerIoWriteBytes ( io, out, buf, req.length );
1839                tr_peerIoWriteBuf( io, out, TRUE );
1840                bytesWritten += EVBUFFER_LENGTH( out );
1841                evbuffer_free( out );
1842                msgs->clientSentAnythingAt = now;
1843            }
1844            tr_free( buf );
1845        }
1846        else if( fext ) /* peer needs a reject message */
1847        {
1848            protocolSendReject( msgs, &req );
1849        }
1850    }
1851
1852    /**
1853    ***  Keepalive
1854    **/
1855
1856    if( ( msgs != NULL )
1857        && ( msgs->clientSentAnythingAt != 0 )
1858        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1859    {
1860        dbgmsg( msgs, "sending a keepalive message" );
1861        tr_peerIoWriteUint32( msgs->peer->io, msgs->outMessages, 0 );
1862        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1863    }
1864
1865    return bytesWritten;
1866}
1867
1868static int
1869peerPulse( void * vmsgs )
1870{
1871    tr_peermsgs * msgs = vmsgs;
1872    const time_t  now = time( NULL );
1873
1874    ratePulse( msgs );
1875
1876    pumpRequestQueue( msgs, now );
1877    expireOldRequests( msgs, now );
1878
1879    for( ;; )
1880        if( fillOutputBuffer( msgs, now ) < 1 )
1881            break;
1882
1883    return TRUE; /* loop forever */
1884}
1885
1886void
1887tr_peerMsgsPulse( tr_peermsgs * msgs )
1888{
1889    if( msgs )
1890        peerPulse( msgs );
1891}
1892
1893static void
1894gotError( tr_peerIo  * io UNUSED,
1895          short        what,
1896          void       * vmsgs )
1897{
1898    if( what & EVBUFFER_TIMEOUT )
1899        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1900    if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
1901        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1902               what, errno, tr_strerror( errno ) );
1903    fireError( vmsgs, ENOTCONN );
1904}
1905
1906static void
1907sendBitfield( tr_peermsgs * msgs )
1908{
1909    struct evbuffer * out = msgs->outMessages;
1910    tr_bitfield *     field;
1911    tr_piece_index_t  lazyPieces[LAZY_PIECE_COUNT];
1912    size_t            i;
1913    size_t            lazyCount = 0;
1914
1915    field = tr_bitfieldDup( tr_cpPieceBitfield( msgs->torrent->completion ) );
1916
1917    if( tr_sessionIsLazyBitfieldEnabled( msgs->session ) )
1918    {
1919        /** Lazy bitfields aren't a high priority or secure, so I'm opting for
1920            speed over a truly random sample -- let's limit the pool size to
1921            the first 1000 pieces so large torrents don't bog things down */
1922        size_t poolSize;
1923        const size_t maxPoolSize = MIN( msgs->torrent->info.pieceCount, 1000 );
1924        tr_piece_index_t * pool = tr_new( tr_piece_index_t, maxPoolSize );
1925
1926        /* build the pool */
1927        for( i=poolSize=0; i<maxPoolSize; ++i )
1928            if( tr_bitfieldHas( field, i ) )
1929                pool[poolSize++] = i;
1930
1931        /* pull random piece indices from the pool */
1932        while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) )
1933        {
1934            const int pos = tr_cryptoWeakRandInt( poolSize );
1935            const tr_piece_index_t piece = pool[pos];
1936            tr_bitfieldRem( field, piece );
1937            lazyPieces[lazyCount++] = piece;
1938            pool[pos] = pool[--poolSize];
1939        }
1940
1941        /* cleanup */
1942        tr_free( pool );
1943    }
1944
1945    tr_peerIoWriteUint32( msgs->peer->io, out,
1946                          sizeof( uint8_t ) + field->byteCount );
1947    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_BITFIELD );
1948    tr_peerIoWriteBytes ( msgs->peer->io, out, field->bits, field->byteCount );
1949    dbgmsg( msgs, "sending bitfield... outMessage size is now %zu",
1950            EVBUFFER_LENGTH( out ) );
1951    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1952
1953    for( i = 0; i < lazyCount; ++i )
1954        protocolSendHave( msgs, lazyPieces[i] );
1955
1956    tr_bitfieldFree( field );
1957}
1958
1959static void
1960tellPeerWhatWeHave( tr_peermsgs * msgs )
1961{
1962    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1963
1964    if( fext && ( tr_cpGetStatus( msgs->torrent->completion ) == TR_SEED ) )
1965    {
1966        protocolSendHaveAll( msgs );
1967    }
1968    else if( fext && ( tr_cpHaveValid( msgs->torrent->completion ) == 0 ) )
1969    {
1970        protocolSendHaveNone( msgs );
1971    }
1972    else
1973    {
1974        sendBitfield( msgs );
1975    }
1976}
1977
1978/**
1979***
1980**/
1981
1982/* some peers give us error messages if we send
1983   more than this many peers in a single pex message
1984   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
1985#define MAX_PEX_ADDED 50
1986#define MAX_PEX_DROPPED 50
1987
1988typedef struct
1989{
1990    tr_pex *  added;
1991    tr_pex *  dropped;
1992    tr_pex *  elements;
1993    int       addedCount;
1994    int       droppedCount;
1995    int       elementCount;
1996}
1997PexDiffs;
1998
1999static void
2000pexAddedCb( void * vpex,
2001            void * userData )
2002{
2003    PexDiffs * diffs = userData;
2004    tr_pex *   pex = vpex;
2005
2006    if( diffs->addedCount < MAX_PEX_ADDED )
2007    {
2008        diffs->added[diffs->addedCount++] = *pex;
2009        diffs->elements[diffs->elementCount++] = *pex;
2010    }
2011}
2012
2013static void
2014pexDroppedCb( void * vpex,
2015              void * userData )
2016{
2017    PexDiffs * diffs = userData;
2018    tr_pex *   pex = vpex;
2019
2020    if( diffs->droppedCount < MAX_PEX_DROPPED )
2021    {
2022        diffs->dropped[diffs->droppedCount++] = *pex;
2023    }
2024}
2025
2026static void
2027pexElementCb( void * vpex,
2028              void * userData )
2029{
2030    PexDiffs * diffs = userData;
2031    tr_pex *   pex = vpex;
2032
2033    diffs->elements[diffs->elementCount++] = *pex;
2034}
2035
2036static void
2037sendPex( tr_peermsgs * msgs )
2038{
2039    if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
2040    {
2041        PexDiffs diffs;
2042        PexDiffs diffs6;
2043        tr_pex * newPex = NULL;
2044        tr_pex * newPex6 = NULL;
2045        const int newCount = tr_peerMgrGetPeers( msgs->session->peerMgr,
2046                                                 msgs->torrent->info.hash,
2047                                                 &newPex, TR_AF_INET );
2048        const int newCount6 = tr_peerMgrGetPeers( msgs->session->peerMgr,
2049                                                  msgs->torrent->info.hash,
2050                                                  &newPex6, TR_AF_INET6 );
2051
2052        /* build the diffs */
2053        diffs.added = tr_new( tr_pex, newCount );
2054        diffs.addedCount = 0;
2055        diffs.dropped = tr_new( tr_pex, msgs->pexCount );
2056        diffs.droppedCount = 0;
2057        diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
2058        diffs.elementCount = 0;
2059        tr_set_compare( msgs->pex, msgs->pexCount,
2060                        newPex, newCount,
2061                        tr_pexCompare, sizeof( tr_pex ),
2062                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
2063        diffs6.added = tr_new( tr_pex, newCount6 );
2064        diffs6.addedCount = 0;
2065        diffs6.dropped = tr_new( tr_pex, msgs->pexCount6 );
2066        diffs6.droppedCount = 0;
2067        diffs6.elements = tr_new( tr_pex, newCount6 + msgs->pexCount6 );
2068        diffs6.elementCount = 0;
2069        tr_set_compare( msgs->pex6, msgs->pexCount6,
2070                        newPex6, newCount6,
2071                        tr_pexCompare, sizeof( tr_pex ),
2072                        pexDroppedCb, pexAddedCb, pexElementCb, &diffs6 );
2073        dbgmsg(
2074            msgs,
2075            "pex: old peer count %d, new peer count %d, added %d, removed %d",
2076            msgs->pexCount, newCount + newCount6,
2077            diffs.addedCount + diffs6.addedCount,
2078            diffs.droppedCount + diffs6.droppedCount );
2079
2080        if( !diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
2081            !diffs6.droppedCount )
2082        {
2083            tr_free( diffs.elements );
2084            tr_free( diffs6.elements );
2085        }
2086        else
2087        {
2088            int  i;
2089            tr_benc val;
2090            char * benc;
2091            int bencLen;
2092            uint8_t * tmp, *walk;
2093            struct evbuffer * out = msgs->outMessages;
2094
2095            /* update peer */
2096            tr_free( msgs->pex );
2097            msgs->pex = diffs.elements;
2098            msgs->pexCount = diffs.elementCount;
2099            tr_free( msgs->pex6 );
2100            msgs->pex6 = diffs6.elements;
2101            msgs->pexCount6 = diffs6.elementCount;
2102
2103            /* build the pex payload */
2104            tr_bencInitDict( &val, 3 ); /* ipv6 support: left as 3:
2105                                         * speed vs. likelihood? */
2106
2107            /* "added" */
2108            tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
2109            for( i = 0; i < diffs.addedCount; ++i )
2110            {
2111                memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
2112                memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
2113            }
2114            assert( ( walk - tmp ) == diffs.addedCount * 6 );
2115            tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
2116            tr_free( tmp );
2117
2118            /* "added.f" */
2119            tmp = walk = tr_new( uint8_t, diffs.addedCount );
2120            for( i = 0; i < diffs.addedCount; ++i )
2121                *walk++ = diffs.added[i].flags;
2122            assert( ( walk - tmp ) == diffs.addedCount );
2123            tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2124            tr_free( tmp );
2125
2126            /* "dropped" */
2127            tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2128            for( i = 0; i < diffs.droppedCount; ++i )
2129            {
2130                memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
2131                memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2132            }
2133            assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2134            tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2135            tr_free( tmp );
2136           
2137            /* "added6" */
2138            tmp = walk = tr_new( uint8_t, diffs6.addedCount * 18 );
2139            for( i = 0; i < diffs6.addedCount; ++i )
2140            {
2141                memcpy( walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16 );
2142                walk += 16;
2143                memcpy( walk, &diffs6.added[i].port, 2 );
2144                walk += 2;
2145            }
2146            assert( ( walk - tmp ) == diffs6.addedCount * 18 );
2147            tr_bencDictAddRaw( &val, "added6", tmp, walk - tmp );
2148            tr_free( tmp );
2149           
2150            /* "added6.f" */
2151            tmp = walk = tr_new( uint8_t, diffs6.addedCount );
2152            for( i = 0; i < diffs6.addedCount; ++i )
2153                *walk++ = diffs6.added[i].flags;
2154            assert( ( walk - tmp ) == diffs6.addedCount );
2155            tr_bencDictAddRaw( &val, "added6.f", tmp, walk - tmp );
2156            tr_free( tmp );
2157           
2158            /* "dropped6" */
2159            tmp = walk = tr_new( uint8_t, diffs6.droppedCount * 18 );
2160            for( i = 0; i < diffs6.droppedCount; ++i )
2161            {
2162                memcpy( walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16 );
2163                walk += 16;
2164                memcpy( walk, &diffs6.dropped[i].port, 2 );
2165                walk += 2;
2166            }
2167            assert( ( walk - tmp ) == diffs6.droppedCount * 18);
2168            tr_bencDictAddRaw( &val, "dropped6", tmp, walk - tmp );
2169            tr_free( tmp );
2170
2171            /* write the pex message */
2172            benc = tr_bencSave( &val, &bencLen );
2173            tr_peerIoWriteUint32( msgs->peer->io, out, 2 * sizeof( uint8_t ) + bencLen );
2174            tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_LTEP );
2175            tr_peerIoWriteUint8 ( msgs->peer->io, out, msgs->ut_pex_id );
2176            tr_peerIoWriteBytes ( msgs->peer->io, out, benc, bencLen );
2177            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2178            dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", EVBUFFER_LENGTH( out ) );
2179
2180            tr_free( benc );
2181            tr_bencFree( &val );
2182        }
2183
2184        /* cleanup */
2185        tr_free( diffs.added );
2186        tr_free( diffs.dropped );
2187        tr_free( newPex );
2188        tr_free( diffs6.added );
2189        tr_free( diffs6.dropped );
2190        tr_free( newPex6 );
2191
2192        msgs->clientSentPexAt = time( NULL );
2193    }
2194}
2195
2196static int
2197pexPulse( void * vpeer )
2198{
2199    sendPex( vpeer );
2200    return TRUE;
2201}
2202
2203/**
2204***
2205**/
2206
2207tr_peermsgs*
2208tr_peerMsgsNew( struct tr_torrent * torrent,
2209                struct tr_peer *    peer,
2210                tr_delivery_func    func,
2211                void *              userData,
2212                tr_publisher_tag *  setme )
2213{
2214    tr_peermsgs * m;
2215
2216    assert( peer );
2217    assert( peer->io );
2218
2219    m = tr_new0( tr_peermsgs, 1 );
2220    m->publisher = tr_publisherNew( );
2221    m->peer = peer;
2222    m->session = torrent->session;
2223    m->torrent = torrent;
2224    m->peer->clientIsChoked = 1;
2225    m->peer->peerIsChoked = 1;
2226    m->peer->clientIsInterested = 0;
2227    m->peer->peerIsInterested = 0;
2228    m->peer->have = tr_bitfieldNew( torrent->info.pieceCount );
2229    m->state = AWAITING_BT_LENGTH;
2230    m->pexTimer = tr_timerNew( m->session, pexPulse, m, PEX_INTERVAL );
2231    m->outMessages = evbuffer_new( );
2232    m->outMessagesBatchedAt = 0;
2233    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2234    m->incoming.block = evbuffer_new( );
2235    m->peerAskedFor = REQUEST_LIST_INIT;
2236    m->clientAskedFor = REQUEST_LIST_INIT;
2237    m->clientWillAskFor = REQUEST_LIST_INIT;
2238    peer->msgs = m;
2239
2240    *setme = tr_publisherSubscribe( m->publisher, func, userData );
2241
2242    if( tr_peerIoSupportsLTEP( peer->io ) )
2243        sendLtepHandshake( m );
2244
2245    tellPeerWhatWeHave( m );
2246
2247    tr_peerIoSetIOFuncs( m->peer->io, canRead, didWrite, gotError, m );
2248    ratePulse( m );
2249
2250    return m;
2251}
2252
2253void
2254tr_peerMsgsFree( tr_peermsgs* msgs )
2255{
2256    if( msgs )
2257    {
2258        tr_timerFree( &msgs->pexTimer );
2259        tr_publisherFree( &msgs->publisher );
2260        reqListClear( &msgs->clientWillAskFor );
2261        reqListClear( &msgs->clientAskedFor );
2262        reqListClear( &msgs->peerAskedFor );
2263
2264        evbuffer_free( msgs->incoming.block );
2265        evbuffer_free( msgs->outMessages );
2266        tr_free( msgs->pex6 );
2267        tr_free( msgs->pex );
2268
2269        memset( msgs, ~0, sizeof( tr_peermsgs ) );
2270        tr_free( msgs );
2271    }
2272}
2273
2274void
2275tr_peerMsgsUnsubscribe( tr_peermsgs *    peer,
2276                        tr_publisher_tag tag )
2277{
2278    tr_publisherUnsubscribe( peer->publisher, tag );
2279}
2280
Note: See TracBrowser for help on using the repository browser.