source: trunk/libtransmission/peer-mgr.c @ 12328

Last change on this file since 12328 was 12328, checked in by jordan, 11 years ago

(trunk libT) more heap pruning: avoid four unnecessary malloc() + free() calls per tr_peer.

This commit also changes tr_recentHistory from being a general-purpose tool to being a little more hardcoded for the only purpose it's used, in tr_peerMgr. If its files (history.[ch]) don't find any other "customers" in libtransmission, eventually it should be demoted to being a private helper class inside of peer-mgr.c and have the history.[ch] files removed from the build.

  • Property svn:keywords set to Date Rev Author Id
File size: 114.2 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 12328 2011-04-06 23:27:11Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h> /* error codes ERANGE, ... */
15#include <limits.h> /* INT_MAX */
16#include <string.h> /* memcpy, memcmp, strstr */
17#include <stdlib.h> /* qsort */
18
19#include <event2/event.h>
20#include <libutp/utp.h>
21
22#include "transmission.h"
23#include "announcer.h"
24#include "bandwidth.h"
25#include "blocklist.h"
26#include "cache.h"
27#include "clients.h"
28#include "completion.h"
29#include "crypto.h"
30#include "handshake.h"
31#include "net.h"
32#include "peer-io.h"
33#include "peer-mgr.h"
34#include "peer-msgs.h"
35#include "ptrarray.h"
36#include "session.h"
37#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
38#include "torrent.h"
39#include "utils.h"
40#include "webseed.h"
41
/**
 * Tunable constants used throughout the peer manager.
 * Suffixes give the unit: _MSEC is milliseconds, _SECS / _SEC is seconds.
 */
enum
{
    /* how frequently to cull old atoms */
    ATOM_PERIOD_MSEC = ( 60 * 1000 ),

    /* how frequently to change which peers are choked */
    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),

    /* an optimistically unchoked peer is immune from rechoking
       for this many calls to rechokeUploads(). */
    OPTIMISTIC_UNCHOKE_MULTIPLIER = 4,

    /* how frequently to reallocate bandwidth */
    BANDWIDTH_PERIOD_MSEC = 500,

    /* how frequently to age out old piece request lists */
    REFILL_UPKEEP_PERIOD_MSEC = ( 10 * 1000 ),

    /* how frequently to decide which peers live and die */
    RECONNECT_PERIOD_MSEC = 500,

    /* when many peers are available, keep idle ones this long */
    MIN_UPLOAD_IDLE_SECS = ( 60 ),

    /* when few peers are available, keep idle ones this long */
    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),

    /* max number of peers to ask for per second overall.
     * this throttle is to avoid overloading the router */
    MAX_CONNECTIONS_PER_SECOND = 12,

    /* the per-second limit scaled down to one reconnect pulse.
     * Computed with integer arithmetic only (multiply first, then divide)
     * so that this stays a strictly conforming integer constant expression;
     * a floating-point subexpression is only accepted here as a compiler
     * extension. */
    MAX_CONNECTIONS_PER_PULSE = ( ( MAX_CONNECTIONS_PER_SECOND * RECONNECT_PERIOD_MSEC ) / 1000 ),

    /* number of bad pieces a peer is allowed to send before we ban them */
    MAX_BAD_PIECES_PER_PEER = 5,

    /* amount of time to keep a list of request pieces lying around
       before it's considered too old and needs to be rebuilt */
    PIECE_LIST_SHELF_LIFE_SECS = 60,

    /* use for bitwise operations w/peer_atom.flags2 */
    MYFLAG_BANNED = 1,

    /* use for bitwise operations w/peer_atom.flags2 */
    /* unreachable for now... but not banned.
     * if they try to connect to us it's okay */
    MYFLAG_UNREACHABLE = 2,

    /* the minimum we'll wait before attempting to reconnect to a peer */
    MINIMUM_RECONNECT_INTERVAL_SECS = 5,

    /** how long we'll let requests we've made linger before we cancel them */
    REQUEST_TTL_SECS = 120,

    NO_BLOCKS_CANCEL_HISTORY = 120,

    CANCEL_HISTORY_SEC = 60
};
100
/** @brief A tr_peer_event whose members are all zero / NULL / false. */
101const tr_peer_event TR_PEER_EVENT_INIT = { 0, 0, NULL, 0, 0, 0, false, 0 };
102
103/**
104***
105**/
106
107/**
108 * Peer information that should be kept even before we've connected and
109 * after we've disconnected. These are kept in a pool of peer_atoms to decide
110 * which ones would make good candidates for connecting to, and to watch out
111 * for banned peers.
112 *
113 * @see tr_peer
114 * @see tr_peermsgs
115 */
116struct peer_atom
117{
118    uint8_t     fromFirst;          /* where the peer was first found */
119    uint8_t     fromBest;           /* the "best" value of where the peer has been found */
120    uint8_t     flags;              /* these match the added_f flags */
121    uint8_t     flags2;             /* flags that aren't defined in added_f */
122    int8_t      seedProbability;    /* how likely is this to be a seed... [0..100] or -1 for unknown */
123    int8_t      blocklisted;        /* -1 for unknown, true for blocklisted, false for not blocklisted */
124
125    tr_port     port;               /* port paired with addr, e.g. when formatting in tr_atomAddrStr() */
126    bool        utp_failed;         /* We recently failed to connect over uTP */
127    uint16_t    numFails;           /* presumably the number of failed connection attempts -- TODO confirm */
128    time_t      time;               /* when the peer's connection status last changed */
129    time_t      piece_data_time;    /* presumably when piece data was last exchanged -- TODO confirm */
130
131    time_t      lastConnectionAttemptAt;
132    time_t      lastConnectionAt;
133
134    /* similar to a TTL field, but less rigid --
135     * if the swarm is small, the atom will be kept past this date. */
136    time_t      shelf_date;
137    tr_peer   * peer;               /* will be NULL if not connected */
138    tr_address  addr;               /* the peer's address; the key used to look atoms up in Torrent.pool */
139};
140
141#ifdef NDEBUG
142#define tr_isAtom(a) (TRUE)
143#else
144static bool
145tr_isAtom( const struct peer_atom * atom )
146{
147    return ( atom != NULL )
148        && ( atom->fromFirst < TR_PEER_FROM__MAX )
149        && ( atom->fromBest < TR_PEER_FROM__MAX )
150        && ( tr_address_is_valid( &atom->addr ) );
151}
152#endif
153
154static const char*
155tr_atomAddrStr( const struct peer_atom * atom )
156{
157    return atom ? tr_peerIoAddrStr( &atom->addr, atom->port ) : "[no atom]";
158}
159
/* One outstanding request for a block; see the Torrent.requests array. */
160struct block_request
161{
162    tr_block_index_t block; /* which block was requested (primary sort key in compareReqByBlock) */
163    tr_peer * peer;         /* who it was requested from (secondary sort key) */
164    time_t sentAt;          /* set from tr_time() when the request is added in requestListAdd() */
165};
166
/* One entry of Torrent.pieces: a piece we still want, plus its sort keys. */
167struct weighted_piece
168{
169    tr_piece_index_t index; /* the piece's index in the torrent */
170    int16_t salt;           /* random tie-breaker assigned in pieceListRebuild() */
171    int16_t requestCount;   /* used as the "pending" count by comparePieceByWeight() */
172};
173
/* Records how Torrent.pieces is currently ordered; see pieceListSort(). */
174enum piece_sort_state
175{
176    PIECES_UNSORTED,         /* set by invalidatePieceSorting() */
177    PIECES_SORTED_BY_INDEX,  /* ordered by comparePieceByIndex() */
178    PIECES_SORTED_BY_WEIGHT  /* ordered by comparePieceByWeight() */
179};
180
181/** @brief Opaque, per-torrent data structure for peer connection information */
182typedef struct tr_torrent_peers
183{
184    tr_ptrArray                outgoingHandshakes; /* tr_handshake */
185    tr_ptrArray                pool; /* struct peer_atom */
186    tr_ptrArray                peers; /* tr_peer */
187    tr_ptrArray                webseeds; /* tr_webseed */
188
189    tr_torrent               * tor; /* the torrent this structure belongs to */
190    struct tr_peerMgr        * manager; /* the owning manager; also provides the session lock */
191
192    tr_peer                  * optimistic; /* the optimistic peer, or NULL if none */
193    int                        optimisticUnchokeTimeScaler;
194
195    bool                       isRunning;
196    bool                       needsCompletenessCheck;
197
198    struct block_request     * requests; /* outstanding requests, kept sorted by compareReqByBlock() */
199    int                        requestCount; /* number of entries in use in requests */
200    int                        requestAlloc; /* number of entries allocated for requests */
201
202    struct weighted_piece    * pieces; /* pieces we still want; rebuilt by pieceListRebuild() */
203    int                        pieceCount; /* number of entries in pieces */
204    enum piece_sort_state      pieceSortState; /* current ordering of pieces */
205
206    /* An array of pieceCount items stating how many peers have each piece.
207       This is used to help us for downloading pieces "rarest first."
208       This may be NULL if we don't have metainfo yet, or if we're not
209       downloading and don't care about rarity */
210    uint16_t                 * pieceReplication;
211    size_t                     pieceReplicationSize;
212
213    int                        interestedCount;
214    int                        maxPeers;
215    time_t                     lastCancel;
216
217    /* Before the endgame this should be 0. In endgame, it contains the average
218     * number of pending requests per peer. Only peers which have more pending
219     * requests are considered 'fast' and are allowed to request a block that's
220     * already been requested from another (slower?) peer. */
221    int                        endgame;
222}
223Torrent;
224
/* Session-wide peer manager state; created by tr_peerMgrNew(). */
225struct tr_peerMgr
226{
227    tr_session    * session; /* the owning session; its lock guards the manager */
228    tr_ptrArray     incomingHandshakes; /* tr_handshake */
229    struct event  * bandwidthTimer;     /* the four timers are released by deleteTimers() */
230    struct event  * rechokeTimer;
231    struct event  * refillUpkeepTimer;
232    struct event  * atomTimer;
233};
234
/* Deep-logs a printf-style message tagged with the name of t->tor.
 * Expands to nothing visible unless tr_deepLoggingIsActive() is true.
 * `t' must be a pointer to a Torrent. */
235#define tordbg( t, ... ) \
236    do { \
237        if( tr_deepLoggingIsActive( ) ) \
238            tr_deepLog( __FILE__, __LINE__, tr_torrentName( t->tor ), __VA_ARGS__ ); \
239    } while( 0 )
240
/* Same as tordbg(), but passes NULL instead of a torrent name. */
241#define dbgmsg( ... ) \
242    do { \
243        if( tr_deepLoggingIsActive( ) ) \
244            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
245    } while( 0 )
246
247/**
248***
249**/
250
251static inline void
252managerLock( const struct tr_peerMgr * manager )
253{
254    tr_sessionLock( manager->session );
255}
256
257static inline void
258managerUnlock( const struct tr_peerMgr * manager )
259{
260    tr_sessionUnlock( manager->session );
261}
262
263static inline void
264torrentLock( Torrent * torrent )
265{
266    managerLock( torrent->manager );
267}
268
269static inline void
270torrentUnlock( Torrent * torrent )
271{
272    managerUnlock( torrent->manager );
273}
274
275static inline int
276torrentIsLocked( const Torrent * t )
277{
278    return tr_sessionIsLocked( t->manager->session );
279}
280
281/**
282***
283**/
284
285static int
286handshakeCompareToAddr( const void * va, const void * vb )
287{
288    const tr_handshake * a = va;
289
290    return tr_address_compare( tr_handshakeGetAddr( a, NULL ), vb );
291}
292
293static int
294handshakeCompare( const void * a, const void * b )
295{
296    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
297}
298
299static inline tr_handshake*
300getExistingHandshake( tr_ptrArray * handshakes, const tr_address * addr )
301{
302    if( tr_ptrArrayEmpty( handshakes ) )
303        return NULL;
304
305    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
306}
307
308static int
309comparePeerAtomToAddress( const void * va, const void * vb )
310{
311    const struct peer_atom * a = va;
312
313    return tr_address_compare( &a->addr, vb );
314}
315
316static int
317compareAtomsByAddress( const void * va, const void * vb )
318{
319    const struct peer_atom * b = vb;
320
321    assert( tr_isAtom( b ) );
322
323    return comparePeerAtomToAddress( va, &b->addr );
324}
325
326/**
327***
328**/
329
330const tr_address *
331tr_peerAddress( const tr_peer * peer )
332{
333    return &peer->atom->addr;
334}
335
336static Torrent*
337getExistingTorrent( tr_peerMgr *    manager,
338                    const uint8_t * hash )
339{
340    tr_torrent * tor = tr_torrentFindFromHash( manager->session, hash );
341
342    return tor == NULL ? NULL : tor->torrentPeers;
343}
344
345static int
346peerCompare( const void * a, const void * b )
347{
348    return tr_address_compare( tr_peerAddress( a ), tr_peerAddress( b ) );
349}
350
351static struct peer_atom*
352getExistingAtom( const Torrent    * t,
353                 const tr_address * addr )
354{
355    Torrent * tt = (Torrent*)t;
356    assert( torrentIsLocked( t ) );
357    return tr_ptrArrayFindSorted( &tt->pool, addr, comparePeerAtomToAddress );
358}
359
360static bool
361peerIsInUse( const Torrent * ct, const struct peer_atom * atom )
362{
363    Torrent * t = (Torrent*) ct;
364
365    assert( torrentIsLocked ( t ) );
366
367    return ( atom->peer != NULL )
368        || getExistingHandshake( &t->outgoingHandshakes, &atom->addr )
369        || getExistingHandshake( &t->manager->incomingHandshakes, &atom->addr );
370}
371
372void
373tr_peerConstruct( tr_peer * peer )
374{
375    memset( peer, 0, sizeof( tr_peer ) );
376
377    peer->have = TR_BITFIELD_INIT;
378}
379
380static tr_peer*
381peerNew( struct peer_atom * atom )
382{
383    tr_peer * peer = tr_new( tr_peer, 1 );
384    tr_peerConstruct( peer );
385
386    peer->atom = atom;
387    atom->peer = peer;
388
389    return peer;
390}
391
392static tr_peer*
393getPeer( Torrent * torrent, struct peer_atom * atom )
394{
395    tr_peer * peer;
396
397    assert( torrentIsLocked( torrent ) );
398
399    peer = atom->peer;
400
401    if( peer == NULL )
402    {
403        peer = peerNew( atom );
404        tr_bitfieldConstruct( &peer->have, torrent->tor->blockCount );
405        tr_bitfieldConstruct( &peer->blame, torrent->tor->blockCount );
406        tr_ptrArrayInsertSorted( &torrent->peers, peer, peerCompare );
407    }
408
409    return peer;
410}
411
/* forward declaration: tr_peerDestruct() below calls this before its definition appears */
412static void peerDeclinedAllRequests( Torrent *, const tr_peer * );
413
414void
415tr_peerDestruct( tr_torrent * tor, tr_peer * peer )
416{
417    assert( peer != NULL );
418
419    peerDeclinedAllRequests( tor->torrentPeers, peer );
420
421    if( peer->msgs != NULL )
422        tr_peerMsgsFree( peer->msgs );
423
424    if( peer->io ) {
425        tr_peerIoClear( peer->io );
426        tr_peerIoUnref( peer->io ); /* balanced by the ref in handshakeDoneCB() */
427    }
428
429    tr_bitfieldDestruct( &peer->have );
430    tr_bitfieldDestruct( &peer->blame );
431    tr_free( peer->client );
432
433    if( peer->atom )
434        peer->atom->peer = NULL;
435}
436
437static void
438peerDelete( Torrent * t, tr_peer * peer )
439{
440    tr_peerDestruct( t->tor, peer );
441    tr_free( peer );
442}
443
444static bool
445replicationExists( const Torrent * t )
446{
447    return t->pieceReplication != NULL;
448}
449
450static void
451replicationFree( Torrent * t )
452{
453    tr_free( t->pieceReplication );
454    t->pieceReplication = NULL;
455    t->pieceReplicationSize = 0;
456}
457
458static void
459replicationNew( Torrent * t )
460{
461    tr_piece_index_t piece_i;
462    const tr_piece_index_t piece_count = t->tor->info.pieceCount;
463    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
464    const int peer_count = tr_ptrArraySize( &t->peers );
465
466    assert( !replicationExists( t ) );
467
468    t->pieceReplicationSize = piece_count;
469    t->pieceReplication = tr_new0( uint16_t, piece_count );
470
471    for( piece_i=0; piece_i<piece_count; ++piece_i )
472    {
473        int peer_i;
474        uint16_t r = 0;
475
476        for( peer_i=0; peer_i<peer_count; ++peer_i )
477            if( tr_bitfieldHas( &peers[peer_i]->have, piece_i ) )
478                ++r;
479
480        t->pieceReplication[piece_i] = r;
481    }
482}
483
484static void
485torrentFree( void * vt )
486{
487    Torrent * t = vt;
488
489    assert( t );
490    assert( !t->isRunning );
491    assert( torrentIsLocked( t ) );
492    assert( tr_ptrArrayEmpty( &t->outgoingHandshakes ) );
493    assert( tr_ptrArrayEmpty( &t->peers ) );
494
495    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
496    tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
497    tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
498    tr_ptrArrayDestruct( &t->peers, NULL );
499
500    replicationFree( t );
501
502    tr_free( t->requests );
503    tr_free( t->pieces );
504    tr_free( t );
505}
506
/* forward declaration: torrentNew() below passes this callback to tr_webseedNew() */
507static void peerCallbackFunc( tr_peer *, const tr_peer_event *, void * );
508
509static Torrent*
510torrentNew( tr_peerMgr * manager, tr_torrent * tor )
511{
512    int       i;
513    Torrent * t;
514
515    t = tr_new0( Torrent, 1 );
516    t->manager = manager;
517    t->tor = tor;
518    t->pool = TR_PTR_ARRAY_INIT;
519    t->peers = TR_PTR_ARRAY_INIT;
520    t->webseeds = TR_PTR_ARRAY_INIT;
521    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
522
523    for( i = 0; i < tor->info.webseedCount; ++i )
524    {
525        tr_webseed * w =
526            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
527        tr_ptrArrayAppend( &t->webseeds, w );
528    }
529
530    return t;
531}
532
533tr_peerMgr*
534tr_peerMgrNew( tr_session * session )
535{
536    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
537    m->session = session;
538    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
539    return m;
540}
541
/**
 * Frees the event that @a t points to, if any, and resets the pointer to
 * NULL so that calling this again is harmless.
 */
static void
deleteTimer( struct event ** t )
{
    struct event * timer = *t;

    if( timer == NULL )
        return;

    event_free( timer );
    *t = NULL;
}
551
552static void
553deleteTimers( struct tr_peerMgr * m )
554{
555    deleteTimer( &m->atomTimer );
556    deleteTimer( &m->bandwidthTimer );
557    deleteTimer( &m->rechokeTimer );
558    deleteTimer( &m->refillUpkeepTimer );
559}
560
561void
562tr_peerMgrFree( tr_peerMgr * manager )
563{
564    managerLock( manager );
565
566    deleteTimers( manager );
567
568    /* free the handshakes. Abort invokes handshakeDoneCB(), which removes
569     * the item from manager->handshakes, so this is a little roundabout... */
570    while( !tr_ptrArrayEmpty( &manager->incomingHandshakes ) )
571        tr_handshakeAbort( tr_ptrArrayNth( &manager->incomingHandshakes, 0 ) );
572
573    tr_ptrArrayDestruct( &manager->incomingHandshakes, NULL );
574
575    managerUnlock( manager );
576    tr_free( manager );
577}
578
579static int
580clientIsDownloadingFrom( const tr_torrent * tor, const tr_peer * peer )
581{
582    if( !tr_torrentHasMetadata( tor ) )
583        return true;
584
585    return peer->clientIsInterested && !peer->clientIsChoked;
586}
587
588static int
589clientIsUploadingTo( const tr_peer * peer )
590{
591    return peer->peerIsInterested && !peer->peerIsChoked;
592}
593
594/***
595****
596***/
597
598void
599tr_peerMgrOnBlocklistChanged( tr_peerMgr * mgr )
600{
601    tr_torrent * tor = NULL;
602    tr_session * session = mgr->session;
603
604    /* we cache whether or not a peer is blocklisted...
605       since the blocklist has changed, erase that cached value */
606    while(( tor = tr_torrentNext( session, tor )))
607    {
608        int i;
609        Torrent * t = tor->torrentPeers;
610        const int n = tr_ptrArraySize( &t->pool );
611        for( i=0; i<n; ++i ) {
612            struct peer_atom * atom = tr_ptrArrayNth( &t->pool, i );
613            atom->blocklisted = -1;
614        }
615    }
616}
617
618static bool
619isAtomBlocklisted( tr_session * session, struct peer_atom * atom )
620{
621    if( atom->blocklisted < 0 )
622        atom->blocklisted = tr_sessionIsAddressBlocked( session, &atom->addr );
623
624    assert( tr_isBool( atom->blocklisted ) );
625    return atom->blocklisted;
626}
627
628
629/***
630****
631***/
632
633static void
634atomSetSeedProbability( struct peer_atom * atom, int seedProbability )
635{
636    assert( atom != NULL );
637    assert( -1<=seedProbability && seedProbability<=100 );
638
639    atom->seedProbability = seedProbability;
640
641    if( seedProbability == 100 )
642        atom->flags |= ADDED_F_SEED_FLAG;
643    else if( seedProbability != -1 )
644        atom->flags &= ~ADDED_F_SEED_FLAG;
645}
646
647static inline bool
648atomIsSeed( const struct peer_atom * atom )
649{
650    return atom->seedProbability == 100;
651}
652
653static void
654atomSetSeed( const Torrent * t, struct peer_atom * atom )
655{
656    if( !atomIsSeed( atom ) )
657    {
658        tordbg( t, "marking peer %s as a seed", tr_atomAddrStr( atom ) );
659
660        atomSetSeedProbability( atom, 100 );
661    }
662}
663
664
665bool
666tr_peerMgrPeerIsSeed( const tr_torrent  * tor,
667                      const tr_address  * addr )
668{
669    bool isSeed = false;
670    const Torrent * t = tor->torrentPeers;
671    const struct peer_atom * atom = getExistingAtom( t, addr );
672
673    if( atom )
674        isSeed = atomIsSeed( atom );
675
676    return isSeed;
677}
678
679void
680tr_peerMgrSetUtpSupported( tr_torrent * tor, const tr_address * addr )
681{
682    struct peer_atom * atom = getExistingAtom( tor->torrentPeers, addr );
683
684    if( atom )
685        atom->flags |= ADDED_F_UTP_FLAGS;
686}
687
688void
689tr_peerMgrSetUtpFailed( tr_torrent *tor, const tr_address *addr, bool failed )
690{
691    struct peer_atom * atom = getExistingAtom( tor->torrentPeers, addr );
692
693    if( atom )
694        atom->utp_failed = failed;
695}
696
697
698/**
699***  REQUESTS
700***
701*** There are two data structures associated with managing block requests:
702***
703*** 1. Torrent::requests, an array of "struct block_request" which keeps
704***    track of which blocks have been requested, and when, and by which peers.
705***    This list is used for (a) cancelling requests that have been pending
706***    for too long and (b) avoiding duplicate requests before endgame.
707***
708*** 2. Torrent::pieces, an array of "struct weighted_piece" which lists the
709***    pieces that we want to request. It's used to decide which blocks to
710***    return next when tr_peerMgrGetBlockRequests() is called.
711**/
712
713/**
714*** struct block_request
715**/
716
717static int
718compareReqByBlock( const void * va, const void * vb )
719{
720    const struct block_request * a = va;
721    const struct block_request * b = vb;
722
723    /* primary key: block */
724    if( a->block < b->block ) return -1;
725    if( a->block > b->block ) return 1;
726
727    /* secondary key: peer */
728    if( a->peer < b->peer ) return -1;
729    if( a->peer > b->peer ) return 1;
730
731    return 0;
732}
733
734static void
735requestListAdd( Torrent * t, tr_block_index_t block, tr_peer * peer )
736{
737    struct block_request key;
738
739    /* ensure enough room is available... */
740    if( t->requestCount + 1 >= t->requestAlloc )
741    {
742        const int CHUNK_SIZE = 128;
743        t->requestAlloc += CHUNK_SIZE;
744        t->requests = tr_renew( struct block_request,
745                                t->requests, t->requestAlloc );
746    }
747
748    /* populate the record we're inserting */
749    key.block = block;
750    key.peer = peer;
751    key.sentAt = tr_time( );
752
753    /* insert the request to our array... */
754    {
755        bool exact;
756        const int pos = tr_lowerBound( &key, t->requests, t->requestCount,
757                                       sizeof( struct block_request ),
758                                       compareReqByBlock, &exact );
759        assert( !exact );
760        memmove( t->requests + pos + 1,
761                 t->requests + pos,
762                 sizeof( struct block_request ) * ( t->requestCount++ - pos ) );
763        t->requests[pos] = key;
764    }
765
766    if( peer != NULL )
767    {
768        ++peer->pendingReqsToPeer;
769        assert( peer->pendingReqsToPeer >= 0 );
770    }
771
772    /*fprintf( stderr, "added request of block %lu from peer %s... "
773                       "there are now %d block\n",
774                       (unsigned long)block, tr_atomAddrStr( peer->atom ), t->requestCount );*/
775}
776
777static struct block_request *
778requestListLookup( Torrent * t, tr_block_index_t block, const tr_peer * peer )
779{
780    struct block_request key;
781    key.block = block;
782    key.peer = (tr_peer*) peer;
783
784    return bsearch( &key, t->requests, t->requestCount,
785                    sizeof( struct block_request ),
786                    compareReqByBlock );
787}
788
789/**
790 * Find the peers are we currently requesting the block
791 * with index @a block from and append them to @a peerArr.
792 */
793static void
794getBlockRequestPeers( Torrent * t, tr_block_index_t block,
795                      tr_ptrArray * peerArr )
796{
797    bool exact;
798    int i, pos;
799    struct block_request key;
800
801    key.block = block;
802    key.peer = NULL;
803    pos = tr_lowerBound( &key, t->requests, t->requestCount,
804                         sizeof( struct block_request ),
805                         compareReqByBlock, &exact );
806
807    assert( !exact ); /* shouldn't have a request with .peer == NULL */
808
809    for( i = pos; i < t->requestCount; ++i )
810    {
811        if( t->requests[i].block != block )
812            break;
813        tr_ptrArrayAppend( peerArr, t->requests[i].peer );
814    }
815}
816
817static void
818decrementPendingReqCount( const struct block_request * b )
819{
820    if( b->peer != NULL )
821        if( b->peer->pendingReqsToPeer > 0 )
822            --b->peer->pendingReqsToPeer;
823}
824
825static void
826requestListRemove( Torrent * t, tr_block_index_t block, const tr_peer * peer )
827{
828    const struct block_request * b = requestListLookup( t, block, peer );
829    if( b != NULL )
830    {
831        const int pos = b - t->requests;
832        assert( pos < t->requestCount );
833
834        decrementPendingReqCount( b );
835
836        tr_removeElementFromArray( t->requests,
837                                   pos,
838                                   sizeof( struct block_request ),
839                                   t->requestCount-- );
840
841        /*fprintf( stderr, "removing request of block %lu from peer %s... "
842                           "there are now %d block requests left\n",
843                           (unsigned long)block, tr_atomAddrStr( peer->atom ), t->requestCount );*/
844    }
845}
846
847static int
848countActiveWebseeds( const Torrent * t )
849{
850    int activeCount = 0;
851    const tr_webseed ** w = (const tr_webseed **) tr_ptrArrayBase( &t->webseeds );
852    const tr_webseed ** const wend = w + tr_ptrArraySize( &t->webseeds );
853
854    for( ; w!=wend; ++w )
855        if( tr_webseedIsActive( *w ) )
856            ++activeCount;
857
858    return activeCount;
859}
860
861static bool
862testForEndgame( const Torrent * t )
863{
864    /* we consider ourselves to be in endgame if the number of bytes
865       we've got requested is >= the number of bytes left to download */
866    return ( t->requestCount * t->tor->blockSize )
867               >= tr_cpLeftUntilDone( &t->tor->completion );
868}
869
870static void
871updateEndgame( Torrent * t )
872{
873    assert( t->requestCount >= 0 );
874
875    if( !testForEndgame( t ) )
876    {
877        /* not in endgame */
878        t->endgame = 0;
879    }
880    else if( !t->endgame ) /* only recalculate when endgame first begins */
881    {
882        int numDownloading = 0;
883        const tr_peer ** p = (const tr_peer **) tr_ptrArrayBase( &t->peers );
884        const tr_peer ** const pend = p + tr_ptrArraySize( &t->peers );
885
886        /* add the active bittorrent peers... */
887        for( ; p!=pend; ++p )
888            if( (*p)->pendingReqsToPeer > 0 )
889                ++numDownloading;
890
891        /* add the active webseeds... */
892        numDownloading += countActiveWebseeds( t );
893
894        /* average number of pending requests per downloading peer */
895        t->endgame = t->requestCount / MAX( numDownloading, 1 );
896    }
897}
898
899
900/****
901*****
902*****  Piece List Manipulation / Accessors
903*****
904****/
905
906static inline void
907invalidatePieceSorting( Torrent * t )
908{
909    t->pieceSortState = PIECES_UNSORTED;
910}
911
/* File-scope context for comparePieceByWeight(): that callback receives
 * only the two elements being compared, so the torrent and replication
 * array it consults are stashed here by setComparePieceByWeightTorrent()
 * before sorting. */
912static const tr_torrent * weightTorrent;
913
914static const uint16_t * weightReplication;
915
916static void
917setComparePieceByWeightTorrent( Torrent * t )
918{
919    if( !replicationExists( t ) )
920        replicationNew( t );
921
922    weightTorrent = t->tor;
923    weightReplication = t->pieceReplication;
924}
925
926/* we try to create a "weight" s.t. high-priority pieces come before others,
927 * and that partially-complete pieces come before empty ones. */
928static int
929comparePieceByWeight( const void * va, const void * vb )
930{
931    const struct weighted_piece * a = va;
932    const struct weighted_piece * b = vb;
933    int ia, ib, missing, pending;
934    const tr_torrent * tor = weightTorrent;
935    const uint16_t * rep = weightReplication;
936
937    /* primary key: weight */
938    missing = tr_cpMissingBlocksInPiece( &tor->completion, a->index );
939    pending = a->requestCount;
940    ia = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
941    missing = tr_cpMissingBlocksInPiece( &tor->completion, b->index );
942    pending = b->requestCount;
943    ib = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
944    if( ia < ib ) return -1;
945    if( ia > ib ) return 1;
946
947    /* secondary key: higher priorities go first */
948    ia = tor->info.pieces[a->index].priority;
949    ib = tor->info.pieces[b->index].priority;
950    if( ia > ib ) return -1;
951    if( ia < ib ) return 1;
952
953    /* tertiary key: rarest first. */
954    ia = rep[a->index];
955    ib = rep[b->index];
956    if( ia < ib ) return -1;
957    if( ia > ib ) return 1;
958
959    /* quaternary key: random */
960    if( a->salt < b->salt ) return -1;
961    if( a->salt > b->salt ) return 1;
962
963    /* okay, they're equal */
964    return 0;
965}
966
967static int
968comparePieceByIndex( const void * va, const void * vb )
969{
970    const struct weighted_piece * a = va;
971    const struct weighted_piece * b = vb;
972    if( a->index < b->index ) return -1;
973    if( a->index > b->index ) return 1;
974    return 0;
975}
976
977static void
978pieceListSort( Torrent * t, enum piece_sort_state state )
979{
980    assert( state==PIECES_SORTED_BY_INDEX
981         || state==PIECES_SORTED_BY_WEIGHT );
982
983
984    if( state == PIECES_SORTED_BY_WEIGHT )
985    {
986        setComparePieceByWeightTorrent( t );
987        qsort( t->pieces, t->pieceCount, sizeof( struct weighted_piece ), comparePieceByWeight );
988    }
989    else
990        qsort( t->pieces, t->pieceCount, sizeof( struct weighted_piece ), comparePieceByIndex );
991
992    t->pieceSortState = state;
993}
994
/**
 * These functions are useful for testing, but too expensive for nightly builds.
 * Let's leave them disabled but keep an easy hook to compile them back in:
 * change the "#if 1" below to "#if 0".
 */
#if 1
#define assertWeightedPiecesAreSorted(t)
#define assertReplicationCountIsExact(t)
#else
/* Asserts that t->pieces is ordered according to comparePieceByWeight().
 * Skipped entirely while the torrent is in endgame. */
static void
assertWeightedPiecesAreSorted( Torrent * t )
{
    if( !t->endgame )
    {
        int i;
        setComparePieceByWeightTorrent( t );
        for( i=0; i<t->pieceCount-1; ++i )
            assert( comparePieceByWeight( &t->pieces[i], &t->pieces[i+1] ) <= 0 );
    }
}

/* Asserts that every entry of t->pieceReplication equals the number of
 * connected peers whose `have' bitfield contains that piece. */
static void
assertReplicationCountIsExact( Torrent * t )
{
    /* This assert might fail due to errors of implementations in other
     * clients. It happens when receiving duplicate bitfields/HaveAll/HaveNone
     * from a client. If such a behavior is noticed,
     * a bug report should be filed with the faulty client. */

    size_t piece_i;
    const uint16_t * rep = t->pieceReplication;
    const size_t piece_count = t->pieceReplicationSize;
    const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
    const int peer_count = tr_ptrArraySize( &t->peers );

    assert( piece_count == t->tor->info.pieceCount );

    for( piece_i=0; piece_i<piece_count; ++piece_i )
    {
        int peer_i;
        uint16_t r = 0;

        /* use the same bitfield accessor as replicationNew() */
        for( peer_i=0; peer_i<peer_count; ++peer_i )
            if( tr_bitfieldHas( &peers[peer_i]->have, piece_i ) )
                ++r;

        assert( rep[piece_i] == r );
    }
}
#endif
1043
1044static struct weighted_piece *
1045pieceListLookup( Torrent * t, tr_piece_index_t index )
1046{
1047    int i;
1048
1049    for( i=0; i<t->pieceCount; ++i )
1050        if( t->pieces[i].index == index )
1051            return &t->pieces[i];
1052
1053    return NULL;
1054}
1055
1056static void
1057pieceListRebuild( Torrent * t )
1058{
1059
1060    if( !tr_torrentIsSeed( t->tor ) )
1061    {
1062        tr_piece_index_t i;
1063        tr_piece_index_t * pool;
1064        tr_piece_index_t poolCount = 0;
1065        const tr_torrent * tor = t->tor;
1066        const tr_info * inf = tr_torrentInfo( tor );
1067        struct weighted_piece * pieces;
1068        int pieceCount;
1069
1070        /* build the new list */
1071        pool = tr_new( tr_piece_index_t, inf->pieceCount );
1072        for( i=0; i<inf->pieceCount; ++i )
1073            if( !inf->pieces[i].dnd )
1074                if( !tr_cpPieceIsComplete( &tor->completion, i ) )
1075                    pool[poolCount++] = i;
1076        pieceCount = poolCount;
1077        pieces = tr_new0( struct weighted_piece, pieceCount );
1078        for( i=0; i<poolCount; ++i ) {
1079            struct weighted_piece * piece = pieces + i;
1080            piece->index = pool[i];
1081            piece->requestCount = 0;
1082            piece->salt = tr_cryptoWeakRandInt( 4096 );
1083        }
1084
1085        /* if we already had a list of pieces, merge it into
1086         * the new list so we don't lose its requestCounts */
1087        if( t->pieces != NULL )
1088        {
1089            struct weighted_piece * o = t->pieces;
1090            struct weighted_piece * oend = o + t->pieceCount;
1091            struct weighted_piece * n = pieces;
1092            struct weighted_piece * nend = n + pieceCount;
1093
1094            pieceListSort( t, PIECES_SORTED_BY_INDEX );
1095
1096            while( o!=oend && n!=nend ) {
1097                if( o->index < n->index )
1098                    ++o;
1099                else if( o->index > n->index )
1100                    ++n;
1101                else
1102                    *n++ = *o++;
1103            }
1104
1105            tr_free( t->pieces );
1106        }
1107
1108        t->pieces = pieces;
1109        t->pieceCount = pieceCount;
1110
1111        pieceListSort( t, PIECES_SORTED_BY_WEIGHT );
1112
1113        /* cleanup */
1114        tr_free( pool );
1115    }
1116}
1117
1118static void
1119pieceListRemovePiece( Torrent * t, tr_piece_index_t piece )
1120{
1121    struct weighted_piece * p;
1122
1123    if(( p = pieceListLookup( t, piece )))
1124    {
1125        const int pos = p - t->pieces;
1126
1127        tr_removeElementFromArray( t->pieces,
1128                                   pos,
1129                                   sizeof( struct weighted_piece ),
1130                                   t->pieceCount-- );
1131
1132        if( t->pieceCount == 0 )
1133        {
1134            tr_free( t->pieces );
1135            t->pieces = NULL;
1136        }
1137    }
1138}
1139
1140static void
1141pieceListResortPiece( Torrent * t, struct weighted_piece * p )
1142{
1143    int pos;
1144    bool isSorted = true;
1145
1146    if( p == NULL )
1147        return;
1148
1149    /* is the torrent already sorted? */
1150    pos = p - t->pieces;
1151    setComparePieceByWeightTorrent( t );
1152    if( isSorted && ( pos > 0 ) && ( comparePieceByWeight( p-1, p ) > 0 ) )
1153        isSorted = false;
1154    if( isSorted && ( pos < t->pieceCount - 1 ) && ( comparePieceByWeight( p, p+1 ) > 0 ) )
1155        isSorted = false;
1156
1157    if( t->pieceSortState != PIECES_SORTED_BY_WEIGHT )
1158    {
1159       pieceListSort( t, PIECES_SORTED_BY_WEIGHT);
1160       isSorted = true;
1161    }
1162
1163    /* if it's not sorted, move it around */
1164    if( !isSorted )
1165    {
1166        bool exact;
1167        const struct weighted_piece tmp = *p;
1168
1169        tr_removeElementFromArray( t->pieces,
1170                                   pos,
1171                                   sizeof( struct weighted_piece ),
1172                                   t->pieceCount-- );
1173
1174        pos = tr_lowerBound( &tmp, t->pieces, t->pieceCount,
1175                             sizeof( struct weighted_piece ),
1176                             comparePieceByWeight, &exact );
1177
1178        memmove( &t->pieces[pos + 1],
1179                 &t->pieces[pos],
1180                 sizeof( struct weighted_piece ) * ( t->pieceCount++ - pos ) );
1181
1182        t->pieces[pos] = tmp;
1183    }
1184
1185    assertWeightedPiecesAreSorted( t );
1186}
1187
1188static void
1189pieceListRemoveRequest( Torrent * t, tr_block_index_t block )
1190{
1191    struct weighted_piece * p;
1192    const tr_piece_index_t index = tr_torBlockPiece( t->tor, block );
1193
1194    if( ((p = pieceListLookup( t, index ))) && ( p->requestCount > 0 ) )
1195    {
1196        --p->requestCount;
1197        pieceListResortPiece( t, p );
1198    }
1199}
1200
1201
1202/****
1203*****
1204*****  Replication count ( for rarest first policy )
1205*****
1206****/
1207
1208/**
1209 * Increase the replication count of this piece and sort it if the
1210 * piece list is already sorted
1211 */
1212static void
1213tr_incrReplicationOfPiece( Torrent * t, const size_t index )
1214{
1215    assert( replicationExists( t ) );
1216    assert( t->pieceReplicationSize == t->tor->info.pieceCount );
1217
1218    /* One more replication of this piece is present in the swarm */
1219    ++t->pieceReplication[index];
1220
1221    /* we only resort the piece if the list is already sorted */
1222    if( t->pieceSortState == PIECES_SORTED_BY_WEIGHT )
1223        pieceListResortPiece( t, pieceListLookup( t, index ) );
1224}
1225
1226/**
1227 * Increases the replication count of pieces present in the bitfield
1228 */
1229static void
1230tr_incrReplicationFromBitfield( Torrent * t, const tr_bitfield * b )
1231{
1232    size_t i;
1233    uint16_t * rep = t->pieceReplication;
1234    const size_t n = t->tor->info.pieceCount;
1235
1236    assert( replicationExists( t ) );
1237
1238    for( i=0; i<n; ++i )
1239        if( tr_bitfieldHas( b, i ) )
1240            ++rep[i];
1241
1242    if( t->pieceSortState == PIECES_SORTED_BY_WEIGHT )
1243        invalidatePieceSorting( t );
1244}
1245
1246/**
1247 * Increase the replication count of every piece
1248 */
1249static void
1250tr_incrReplication( Torrent * t )
1251{
1252    int i;
1253    const int n = t->pieceReplicationSize;
1254
1255    assert( replicationExists( t ) );
1256    assert( t->pieceReplicationSize == t->tor->info.pieceCount );
1257
1258    for( i=0; i<n; ++i )
1259        ++t->pieceReplication[i];
1260}
1261
1262/**
1263 * Decrease the replication count of pieces present in the bitset.
1264 */
1265static void
1266tr_decrReplicationFromBitfield( Torrent * t, const tr_bitfield * b )
1267{
1268    int i;
1269    const int n = t->pieceReplicationSize;
1270
1271    assert( replicationExists( t ) );
1272    assert( t->pieceReplicationSize == t->tor->info.pieceCount );
1273
1274    if( tr_bitfieldHasAll( b ) )
1275    {
1276        for( i=0; i<n; ++i )
1277            --t->pieceReplication[i];
1278    }
1279    else if ( !tr_bitfieldHasNone( b ) )
1280    {
1281        for( i=0; i<n; ++i )
1282            if( tr_bitfieldHas( b, i ) )
1283                --t->pieceReplication[i];
1284
1285        if( t->pieceSortState == PIECES_SORTED_BY_WEIGHT )
1286            invalidatePieceSorting( t );
1287    }
1288}
1289
1290/**
1291***
1292**/
1293
1294void
1295tr_peerMgrRebuildRequests( tr_torrent * tor )
1296{
1297    assert( tr_isTorrent( tor ) );
1298
1299    pieceListRebuild( tor->torrentPeers );
1300}
1301
/**
 * Picks up to `numwant` blocks that should be requested from `peer`.
 *
 * The torrent's weighted-piece list is walked in order. For every piece
 * that the peer has, the piece's incomplete blocks are considered; each
 * block that is chosen is written to `setme`, recorded with
 * requestListAdd(), and counted in its piece's requestCount. Afterwards
 * the pieces whose weights changed are moved back into sorted position.
 *
 * @param tor     the torrent to pick blocks for
 * @param peer    the peer to be asked; asserted to be one that we are
 *                interested in and that is not choking us
 * @param numwant upper limit on the number of blocks to pick; must be > 0
 * @param setme   output array of block indices; at most `numwant`
 *                entries are written
 * @param numgot  set to the number of entries written into `setme`
 */
void
tr_peerMgrGetNextRequests( tr_torrent           * tor,
                           tr_peer              * peer,
                           int                    numwant,
                           tr_block_index_t     * setme,
                           int                  * numgot )
{
    int i;
    int got;
    Torrent * t;
    struct weighted_piece * pieces;
    const tr_bitfield * const have = &peer->have;

    /* sanity clause */
    assert( tr_isTorrent( tor ) );
    assert( peer->clientIsInterested );
    assert( !peer->clientIsChoked );
    assert( numwant > 0 );

    /* walk through the pieces and find blocks that should be requested */
    got = 0;
    t = tor->torrentPeers;

    /* prep the pieces list */
    if( t->pieces == NULL )
        pieceListRebuild( t );

    if( t->pieceSortState != PIECES_SORTED_BY_WEIGHT )
        pieceListSort( t, PIECES_SORTED_BY_WEIGHT );

    assertReplicationCountIsExact( t );
    assertWeightedPiecesAreSorted( t );

    updateEndgame( t );
    pieces = t->pieces;
    /* stop as soon as we run out of pieces or have picked enough blocks */
    for( i=0; i<t->pieceCount && got<numwant; ++i )
    {
        struct weighted_piece * p = pieces + i;

        /* if the peer has this piece that we want... */
        if( tr_bitfieldHas( have, p->index ) )
        {
            tr_block_index_t b;
            tr_block_index_t first;
            tr_block_index_t last;
            /* scratch array, reused for every block of this piece */
            tr_ptrArray peerArr = TR_PTR_ARRAY_INIT;

            tr_torGetPieceBlockRange( tor, p->index, &first, &last );

            for( b=first; b<=last && got<numwant; ++b )
            {
                int peerCount;
                tr_peer ** peers;

                /* don't request blocks we've already got */
                if( tr_cpBlockIsComplete( &tor->completion, b ) )
                    continue;

                /* always add peer if this block has no peers yet */
                tr_ptrArrayClear( &peerArr );
                getBlockRequestPeers( t, b, &peerArr );
                peers = (tr_peer **) tr_ptrArrayPeek( &peerArr, &peerCount );
                if( peerCount != 0 )
                {
                    /* don't make a second block request until the endgame */
                    if( !t->endgame )
                        continue;

                    /* don't have more than two peers requesting this block */
                    if( peerCount > 1 )
                        continue;

                    /* don't send the same request to the same peer twice */
                    if( peer == peers[0] )
                        continue;

                    /* in the endgame allow an additional peer to download a
                       block but only if the peer seems to be handling requests
                       relatively fast */
                    /* (t->endgame doubles as the numeric threshold here) */
                    if( peer->pendingReqsToPeer + numwant - got < t->endgame )
                        continue;
                }

                /* update the caller's table */
                setme[got++] = b;

                /* update our own tables */
                requestListAdd( t, b, peer );
                ++p->requestCount;
            }

            tr_ptrArrayDestruct( &peerArr, NULL );
        }
    }

    /* In most cases we've just changed the weights of a small number of pieces.
     * So rather than qsort()ing the entire array, it's faster to apply an
     * adaptive insertion sort algorithm. */
    if( got > 0 )
    {
        /* not enough requests || last piece modified */
        /* (i is one past the last piece examined; stepping back here makes
         * the walk below begin at the last entry that has a successor) */
        if ( i == t->pieceCount ) --i;

        setComparePieceByWeightTorrent( t );
        /* go backwards over the examined pieces, pushing each one
         * toward the end of the list as far as it needs to go */
        while( --i >= 0 )
        {
            bool exact;

            /* relative position! */
            /* the search covers only the entries after pieces[i], so newpos
             * is an offset from pieces[i+1] rather than an absolute index */
            const int newpos = tr_lowerBound( &t->pieces[i], &t->pieces[i + 1],
                                              t->pieceCount - (i + 1),
                                              sizeof( struct weighted_piece ),
                                              comparePieceByWeight, &exact );
            if( newpos > 0 )
            {
                /* shift the next `newpos` entries down by one slot and
                 * put the saved entry into the gap that opens up */
                const struct weighted_piece piece = t->pieces[i];
                memmove( &t->pieces[i],
                         &t->pieces[i + 1],
                         sizeof( struct weighted_piece ) * ( newpos ) );
                t->pieces[i + newpos] = piece;
            }
        }
    }

    assertWeightedPiecesAreSorted( t );
    *numgot = got;
}
1429
1430bool
1431tr_peerMgrDidPeerRequest( const tr_torrent  * tor,
1432                          const tr_peer     * peer,
1433                          tr_block_index_t    block )
1434{
1435    const Torrent * t = tor->torrentPeers;
1436    return requestListLookup( (Torrent*)t, block, peer ) != NULL;
1437}
1438
1439/* cancel requests that are too old */
1440static void
1441refillUpkeep( int foo UNUSED, short bar UNUSED, void * vmgr )
1442{
1443    time_t now;
1444    time_t too_old;
1445    tr_torrent * tor;
1446    tr_peerMgr * mgr = vmgr;
1447    managerLock( mgr );
1448
1449    now = tr_time( );
1450    too_old = now - REQUEST_TTL_SECS;
1451
1452    tor = NULL;
1453    while(( tor = tr_torrentNext( mgr->session, tor )))
1454    {
1455        Torrent * t = tor->torrentPeers;
1456        const int n = t->requestCount;
1457        if( n > 0 )
1458        {
1459            int keepCount = 0;
1460            int cancelCount = 0;
1461            const struct block_request * it;
1462            const struct block_request * end;
1463            struct block_request * cancel = tr_new( struct block_request, n );
1464
1465            for( it=t->requests, end=it+n; it!=end; ++it )
1466            {
1467                if( ( it->sentAt <= too_old ) && it->peer->msgs && !tr_peerMsgsIsReadingBlock( it->peer->msgs, it->block ) )
1468                    cancel[cancelCount++] = *it;
1469                else
1470                {
1471                    if( it != &t->requests[keepCount] )
1472                        t->requests[keepCount] = *it;
1473                    keepCount++;
1474                }
1475            }
1476
1477            /* prune out the ones we aren't keeping */
1478            t->requestCount = keepCount;
1479
1480            /* send cancel messages for all the "cancel" ones */
1481            for( it=cancel, end=it+cancelCount; it!=end; ++it ) {
1482                if( ( it->peer != NULL ) && ( it->peer->msgs != NULL ) ) {
1483                    tr_historyAdd( &it->peer->cancelsSentToPeer, now, 1 );
1484                    tr_peerMsgsCancel( it->peer->msgs, it->block );
1485                    decrementPendingReqCount( it );
1486                }
1487            }
1488
1489            /* decrement the pending request counts for the timed-out blocks */
1490            for( it=cancel, end=it+cancelCount; it!=end; ++it )
1491                pieceListRemoveRequest( t, it->block );
1492
1493            /* cleanup loop */
1494            tr_free( cancel );
1495        }
1496    }
1497
1498    tr_timerAddMsec( mgr->refillUpkeepTimer, REFILL_UPKEEP_PERIOD_MSEC );
1499    managerUnlock( mgr );
1500}
1501
1502static void
1503addStrike( Torrent * t, tr_peer * peer )
1504{
1505    tordbg( t, "increasing peer %s strike count to %d",
1506            tr_atomAddrStr( peer->atom ), peer->strikes + 1 );
1507
1508    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
1509    {
1510        struct peer_atom * atom = peer->atom;
1511        atom->flags2 |= MYFLAG_BANNED;
1512        peer->doPurge = 1;
1513        tordbg( t, "banning peer %s", tr_atomAddrStr( atom ) );
1514    }
1515}
1516
1517static void
1518gotBadPiece( Torrent * t, tr_piece_index_t pieceIndex )
1519{
1520    tr_torrent *   tor = t->tor;
1521    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
1522
1523    tor->corruptCur += byteCount;
1524    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1525
1526    tr_announcerAddBytes( tor, TR_ANN_CORRUPT, byteCount );
1527}
1528
/**
 * Handler for a piece that a peer suggested, or allowed us to fetch
 * "fast" (isFastAllowed nonzero).
 *
 * Currently a no-op: every parameter is marked UNUSED and the entire
 * body is compiled out with `#if 0`.
 *
 * NOTE(review): the disabled code passes `t->tor->completion` and
 * `peer->have` directly, whereas the live code in this file passes their
 * addresses (`&tor->completion`, `&peer->have`). It would presumably need
 * to be updated before it could be re-enabled.
 */
static void
peerSuggestedPiece( Torrent            * t UNUSED,
                    tr_peer            * peer UNUSED,
                    tr_piece_index_t     pieceIndex UNUSED,
                    int                  isFastAllowed UNUSED )
{
#if 0
    assert( t );
    assert( peer );
    assert( peer->msgs );

    /* is this a valid piece? */
    if(  pieceIndex >= t->tor->info.pieceCount )
        return;

    /* don't ask for it if we've already got it */
    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
        return;

    /* don't ask for it if they don't have it */
    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
        return;

    /* don't ask for it if we're choked and it's not fast */
    if( !isFastAllowed && peer->clientIsChoked )
        return;

    /* request the blocks that we don't have in this piece */
    {
        tr_block_index_t b;
        tr_block_index_t first;
        tr_block_index_t last;
        const tr_torrent * tor = t->tor;

        tr_torGetPieceBlockRange( t->tor, pieceIndex, &first, &last );

        for( b=first; b<=last; ++b )
        {
            if( !tr_cpBlockIsComplete( tor->completion, b ) )
            {
                const uint32_t offset = getBlockOffsetInPiece( tor, b );
                const uint32_t length = tr_torBlockCountBytes( tor, b );
                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length );
                incrementPieceRequests( t, pieceIndex );
            }
        }
    }
#endif
}
1578
1579static void
1580removeRequestFromTables( Torrent * t, tr_block_index_t block, const tr_peer * peer )
1581{
1582    requestListRemove( t, block, peer );
1583    pieceListRemoveRequest( t, block );
1584}
1585
1586/* peer choked us, or maybe it disconnected.
1587   either way we need to remove all its requests */
1588static void
1589peerDeclinedAllRequests( Torrent * t, const tr_peer * peer )
1590{
1591    int i, n;
1592    tr_block_index_t * blocks = tr_new( tr_block_index_t, t->requestCount );
1593
1594    for( i=n=0; i<t->requestCount; ++i )
1595        if( peer == t->requests[i].peer )
1596            blocks[n++] = t->requests[i].block;
1597
1598    for( i=0; i<n; ++i )
1599        removeRequestFromTables( t, blocks[i], peer );
1600
1601    tr_free( blocks );
1602}
1603
1604static void tr_peerMgrSetBlame( tr_torrent *, tr_piece_index_t, int );
1605
1606static void
1607peerCallbackFunc( tr_peer * peer, const tr_peer_event * e, void * vt )
1608{
1609    Torrent * t = vt;
1610
1611    torrentLock( t );
1612
1613    assert( peer != NULL );
1614
1615    switch( e->eventType )
1616    {
1617        case TR_PEER_PEER_GOT_DATA:
1618        {
1619            const time_t now = tr_time( );
1620            tr_torrent * tor = t->tor;
1621
1622            if( e->wasPieceData )
1623            {
1624                tor->uploadedCur += e->length;
1625                tr_announcerAddBytes( tor, TR_ANN_UP, e->length );
1626                tr_torrentSetActivityDate( tor, now );
1627                tr_torrentSetDirty( tor );
1628            }
1629
1630            /* update the stats */
1631            if( e->wasPieceData )
1632                tr_statsAddUploaded( tor->session, e->length );
1633
1634            /* update our atom */
1635            if( peer->atom && e->wasPieceData )
1636                peer->atom->piece_data_time = now;
1637
1638            break;
1639        }
1640
1641        case TR_PEER_CLIENT_GOT_HAVE:
1642            if( replicationExists( t ) ) {
1643                tr_incrReplicationOfPiece( t, e->pieceIndex );
1644                assertReplicationCountIsExact( t );
1645            }
1646            break;
1647
1648        case TR_PEER_CLIENT_GOT_HAVE_ALL:
1649            if( replicationExists( t ) ) {
1650                tr_incrReplication( t );
1651                assertReplicationCountIsExact( t );
1652            }
1653            break;
1654
1655        case TR_PEER_CLIENT_GOT_HAVE_NONE:
1656            /* noop */
1657            break;
1658
1659        case TR_PEER_CLIENT_GOT_BITFIELD:
1660            assert( e->bitfield != NULL );
1661            if( replicationExists( t ) ) {
1662                tr_incrReplicationFromBitfield( t, e->bitfield );
1663                assertReplicationCountIsExact( t );
1664            }
1665            break;
1666
1667        case TR_PEER_CLIENT_GOT_REJ: {
1668            tr_block_index_t b = _tr_block( t->tor, e->pieceIndex, e->offset );
1669            if( b < t->tor->blockCount )
1670                removeRequestFromTables( t, b, peer );
1671            else
1672                tordbg( t, "Peer %s sent an out-of-range reject message",
1673                           tr_atomAddrStr( peer->atom ) );
1674            break;
1675        }
1676
1677        case TR_PEER_CLIENT_GOT_CHOKE:
1678            peerDeclinedAllRequests( t, peer );
1679            break;
1680
1681        case TR_PEER_CLIENT_GOT_PORT:
1682            if( peer->atom )
1683                peer->atom->port = e->port;
1684            break;
1685
1686        case TR_PEER_CLIENT_GOT_SUGGEST:
1687            peerSuggestedPiece( t, peer, e->pieceIndex, false );
1688            break;
1689
1690        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1691            peerSuggestedPiece( t, peer, e->pieceIndex, true );
1692            break;
1693
1694        case TR_PEER_CLIENT_GOT_DATA:
1695        {
1696            const time_t now = tr_time( );
1697            tr_torrent * tor = t->tor;
1698
1699            if( e->wasPieceData )
1700            {
1701                tor->downloadedCur += e->length;
1702                tr_torrentSetActivityDate( tor, now );
1703                tr_torrentSetDirty( tor );
1704            }
1705
1706            /* update the stats */
1707            if( e->wasPieceData )
1708                tr_statsAddDownloaded( tor->session, e->length );
1709
1710            /* update our atom */
1711            if( peer->atom && e->wasPieceData )
1712                peer->atom->piece_data_time = now;
1713
1714            break;
1715        }
1716
1717        case TR_PEER_CLIENT_GOT_BLOCK:
1718        {
1719            tr_torrent * tor = t->tor;
1720            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1721            int i, peerCount;
1722            tr_peer ** peers;
1723            tr_ptrArray peerArr = TR_PTR_ARRAY_INIT;
1724
1725            removeRequestFromTables( t, block, peer );
1726            getBlockRequestPeers( t, block, &peerArr );
1727            peers = (tr_peer **) tr_ptrArrayPeek( &peerArr, &peerCount );
1728
1729            /* remove additional block requests and send cancel to peers */
1730            for( i=0; i<peerCount; i++ ) {
1731                tr_peer * p = peers[i];
1732                assert( p != peer );
1733                if( p->msgs ) {
1734                    tr_historyAdd( &p->cancelsSentToPeer, tr_time( ), 1 );
1735                    tr_peerMsgsCancel( p->msgs, block );
1736                }
1737                removeRequestFromTables( t, block, p );
1738            }
1739
1740            tr_ptrArrayDestruct( &peerArr, false );
1741
1742            tr_historyAdd( &peer->blocksSentToClient, tr_time( ), 1 );
1743
1744            if( tr_cpBlockIsComplete( &tor->completion, block ) )
1745            {
1746                /* we already have this block... */
1747                const uint32_t n = tr_torBlockCountBytes( tor, block );
1748                tor->downloadedCur -= MIN( tor->downloadedCur, n );
1749                tordbg( t, "we have this block already..." );
1750            }
1751            else
1752            {
1753                tr_cpBlockAdd( &tor->completion, block );
1754                pieceListResortPiece( t, pieceListLookup( t, e->pieceIndex ) );
1755                tr_torrentSetDirty( tor );
1756
1757                if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
1758                {
1759                    const tr_piece_index_t p = e->pieceIndex;
1760                    const bool ok = tr_torrentCheckPiece( tor, p );
1761
1762                    tordbg( t, "[LAZY] checked just-completed piece %zu", (size_t)p );
1763
1764                    if( !ok )
1765                    {
1766                        tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1767                                   (unsigned long)p );
1768                    }
1769
1770                    tr_peerMgrSetBlame( tor, p, ok );
1771
1772                    if( !ok )
1773                    {
1774                        gotBadPiece( t, p );
1775                    }
1776                    else
1777                    {
1778                        int i;
1779                        int peerCount;
1780                        tr_peer ** peers;
1781                        tr_file_index_t fileIndex;
1782
1783                        /* only add this to downloadedCur if we got it from a peer --
1784                         * webseeds shouldn't count against our ratio. As one tracker
1785                         * admin put it, "Those pieces are downloaded directly from the
1786                         * content distributor, not the peers, it is the tracker's job
1787                         * to manage the swarms, not the web server and does not fit
1788                         * into the jurisdiction of the tracker." */
1789                        if( peer->msgs != NULL ) {
1790                            const uint32_t n = tr_torPieceCountBytes( tor, p );
1791                            tr_announcerAddBytes( tor, TR_ANN_DOWN, n );
1792                        }
1793
1794                        peerCount = tr_ptrArraySize( &t->peers );
1795                        peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1796                        for( i=0; i<peerCount; ++i )
1797                            tr_peerMsgsHave( peers[i]->msgs, p );
1798
1799                        for( fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex ) {
1800                            const tr_file * file = &tor->info.files[fileIndex];
1801                            if( ( file->firstPiece <= p ) && ( p <= file->lastPiece ) ) {
1802                                if( tr_cpFileIsComplete( &tor->completion, fileIndex ) ) {
1803                                    tr_cacheFlushFile( tor->session->cache, tor, fileIndex );
1804                                    tr_torrentFileCompleted( tor, fileIndex );
1805                                }
1806                            }
1807                        }
1808
1809                        pieceListRemovePiece( t, p );
1810                    }
1811                }
1812
1813                t->needsCompletenessCheck = true;
1814            }
1815            break;
1816        }
1817
1818        case TR_PEER_ERROR:
1819            if( ( e->err == ERANGE ) || ( e->err == EMSGSIZE ) || ( e->err == ENOTCONN ) )
1820            {
1821                /* some protocol error from the peer */
1822                peer->doPurge = 1;
1823                tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error",
1824                        tr_atomAddrStr( peer->atom ) );
1825            }
1826            else
1827            {
1828                tordbg( t, "unhandled error: %s", tr_strerror( e->err ) );
1829            }
1830            break;
1831
1832        default:
1833            assert( 0 );
1834    }
1835
1836    torrentUnlock( t );
1837}
1838
1839static int
1840getDefaultShelfLife( uint8_t from )
1841{
1842    /* in general, peers obtained from firsthand contact
1843     * are better than those from secondhand, etc etc */
1844    switch( from )
1845    {
1846        case TR_PEER_FROM_INCOMING : return 60 * 60 * 6;
1847        case TR_PEER_FROM_LTEP     : return 60 * 60 * 6;
1848        case TR_PEER_FROM_TRACKER  : return 60 * 60 * 3;
1849        case TR_PEER_FROM_DHT      : return 60 * 60 * 3;
1850        case TR_PEER_FROM_PEX      : return 60 * 60 * 2;
1851        case TR_PEER_FROM_RESUME   : return 60 * 60;
1852        case TR_PEER_FROM_LPD      : return 10 * 60;
1853        default                    : return 60 * 60;
1854    }
1855}
1856
1857static void
1858ensureAtomExists( Torrent           * t,
1859                  const tr_address  * addr,
1860                  const tr_port       port,
1861                  const uint8_t       flags,
1862                  const int8_t        seedProbability,
1863                  const uint8_t       from )
1864{
1865    struct peer_atom * a;
1866
1867    assert( tr_address_is_valid( addr ) );
1868    assert( from < TR_PEER_FROM__MAX );
1869
1870    a = getExistingAtom( t, addr );
1871
1872    if( a == NULL )
1873    {
1874        const int jitter = tr_cryptoWeakRandInt( 60*10 );
1875        a = tr_new0( struct peer_atom, 1 );
1876        a->addr = *addr;
1877        a->port = port;
1878        a->flags = flags;
1879        a->fromFirst = from;
1880        a->fromBest = from;
1881        a->shelf_date = tr_time( ) + getDefaultShelfLife( from ) + jitter;
1882        a->blocklisted = -1;
1883        atomSetSeedProbability( a, seedProbability );
1884        tr_ptrArrayInsertSorted( &t->pool, a, compareAtomsByAddress );
1885
1886        tordbg( t, "got a new atom: %s", tr_atomAddrStr( a ) );
1887    }
1888    else
1889    {
1890        if( from < a->fromBest )
1891            a->fromBest = from;
1892
1893        if( a->seedProbability == -1 )
1894            atomSetSeedProbability( a, seedProbability );
1895
1896        a->flags |= flags;
1897    }
1898}
1899
1900static int
1901getMaxPeerCount( const tr_torrent * tor )
1902{
1903    return tor->maxConnectedPeers;
1904}
1905
1906static int
1907getPeerCount( const Torrent * t )
1908{
1909    return tr_ptrArraySize( &t->peers );/* + tr_ptrArraySize( &t->outgoingHandshakes ); */
1910}
1911
1912/* FIXME: this is kind of a mess. */
1913static bool
1914myHandshakeDoneCB( tr_handshake  * handshake,
1915                   tr_peerIo     * io,
1916                   bool            readAnythingFromPeer,
1917                   bool            isConnected,
1918                   const uint8_t * peer_id,
1919                   void          * vmanager )
1920{
1921    bool               ok = isConnected;
1922    bool               success = false;
1923    tr_port            port;
1924    const tr_address * addr;
1925    tr_peerMgr       * manager = vmanager;
1926    Torrent          * t;
1927    tr_handshake     * ours;
1928
1929    assert( io );
1930    assert( tr_isBool( ok ) );
1931
1932    t = tr_peerIoHasTorrentHash( io )
1933        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1934        : NULL;
1935
1936    if( tr_peerIoIsIncoming ( io ) )
1937        ours = tr_ptrArrayRemoveSorted( &manager->incomingHandshakes,
1938                                        handshake, handshakeCompare );
1939    else if( t )
1940        ours = tr_ptrArrayRemoveSorted( &t->outgoingHandshakes,
1941                                        handshake, handshakeCompare );
1942    else
1943        ours = handshake;
1944
1945    assert( ours );
1946    assert( ours == handshake );
1947
1948    if( t )
1949        torrentLock( t );
1950
1951    addr = tr_peerIoGetAddress( io, &port );
1952
1953    if( !ok || !t || !t->isRunning )
1954    {
1955        if( t )
1956        {
1957            struct peer_atom * atom = getExistingAtom( t, addr );
1958            if( atom )
1959            {
1960                ++atom->numFails;
1961
1962                if( !readAnythingFromPeer )
1963                {
1964                    tordbg( t, "marking peer %s as unreachable... numFails is %d", tr_atomAddrStr( atom ), (int)atom->numFails );
1965                    atom->flags2 |= MYFLAG_UNREACHABLE;
1966                }
1967            }
1968        }
1969    }
1970    else /* looking good */
1971    {
1972        struct peer_atom * atom;
1973
1974        ensureAtomExists( t, addr, port, 0, -1, TR_PEER_FROM_INCOMING );
1975        atom = getExistingAtom( t, addr );
1976        atom->time = tr_time( );
1977        atom->piece_data_time = 0;
1978        atom->lastConnectionAt = tr_time( );
1979
1980        if( !tr_peerIoIsIncoming( io ) )
1981        {
1982            atom->flags |= ADDED_F_CONNECTABLE;
1983            atom->flags2 &= ~MYFLAG_UNREACHABLE;
1984        }
1985
1986        /* In principle, this flag specifies whether the peer groks uTP,
1987           not whether it's currently connected over uTP. */
1988        if( io->utp_socket )
1989            atom->flags |= ADDED_F_UTP_FLAGS;
1990
1991        if( atom->flags2 & MYFLAG_BANNED )
1992        {
1993            tordbg( t, "banned peer %s tried to reconnect",
1994                    tr_atomAddrStr( atom ) );
1995        }
1996        else if( tr_peerIoIsIncoming( io )
1997               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1998
1999        {
2000        }
2001        else
2002        {
2003            tr_peer * peer = atom->peer;
2004
2005            if( peer ) /* we already have this peer */
2006            {
2007            }
2008            else
2009            {
2010                peer = getPeer( t, atom );
2011                tr_free( peer->client );
2012
2013                if( !peer_id )
2014                    peer->client = NULL;
2015                else {
2016                    char client[128];
2017                    tr_clientForId( client, sizeof( client ), peer_id );
2018                    peer->client = tr_strdup( client );
2019                }
2020
2021                peer->io = tr_handshakeStealIO( handshake ); /* this steals its refcount too, which is
2022                                                                balanced by our unref in peerDelete()  */
2023                tr_peerIoSetParent( peer->io, &t->tor->bandwidth );
2024                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t );
2025
2026                success = true;
2027            }
2028        }
2029    }
2030
2031    if( t )
2032        torrentUnlock( t );
2033
2034    return success;
2035}
2036
2037void
2038tr_peerMgrAddIncoming( tr_peerMgr * manager,
2039                       tr_address * addr,
2040                       tr_port      port,
2041                       int          socket,
2042                       struct UTPSocket * utp_socket )
2043{
2044    tr_session * session;
2045
2046    managerLock( manager );
2047
2048    assert( tr_isSession( manager->session ) );
2049    session = manager->session;
2050
2051    if( tr_sessionIsAddressBlocked( session, addr ) )
2052    {
2053        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_address_to_string( addr ) );
2054        if(socket >= 0)
2055            tr_netClose( session, socket );
2056        else
2057            UTP_Close( utp_socket );
2058    }
2059    else if( getExistingHandshake( &manager->incomingHandshakes, addr ) )
2060    {
2061        if(socket >= 0)
2062            tr_netClose( session, socket );
2063        else
2064            UTP_Close( utp_socket );
2065    }
2066    else /* we don't have a connection to them yet... */
2067    {
2068        tr_peerIo *    io;
2069        tr_handshake * handshake;
2070
2071        io = tr_peerIoNewIncoming( session, &session->bandwidth, addr, port, socket, utp_socket );
2072
2073        handshake = tr_handshakeNew( io,
2074                                     session->encryptionMode,
2075                                     myHandshakeDoneCB,
2076                                     manager );
2077
2078        tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewIncoming() */
2079
2080        tr_ptrArrayInsertSorted( &manager->incomingHandshakes, handshake,
2081                                 handshakeCompare );
2082    }
2083
2084    managerUnlock( manager );
2085}
2086
2087void
2088tr_peerMgrAddPex( tr_torrent * tor, uint8_t from,
2089                  const tr_pex * pex, int8_t seedProbability )
2090{
2091    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
2092    {
2093        Torrent * t = tor->torrentPeers;
2094        managerLock( t->manager );
2095
2096        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
2097            if( tr_address_is_valid_for_peers( &pex->addr, pex->port ) )
2098                ensureAtomExists( t, &pex->addr, pex->port, pex->flags, seedProbability, from );
2099
2100        managerUnlock( t->manager );
2101    }
2102}
2103
2104void
2105tr_peerMgrMarkAllAsSeeds( tr_torrent * tor )
2106{
2107    Torrent * t = tor->torrentPeers;
2108    const int n = tr_ptrArraySize( &t->pool );
2109    struct peer_atom ** it = (struct peer_atom**) tr_ptrArrayBase( &t->pool );
2110    struct peer_atom ** end = it + n;
2111
2112    while( it != end )
2113        atomSetSeed( t, *it++ );
2114}
2115
2116tr_pex *
2117tr_peerMgrCompactToPex( const void *    compact,
2118                        size_t          compactLen,
2119                        const uint8_t * added_f,
2120                        size_t          added_f_len,
2121                        size_t *        pexCount )
2122{
2123    size_t          i;
2124    size_t          n = compactLen / 6;
2125    const uint8_t * walk = compact;
2126    tr_pex *        pex = tr_new0( tr_pex, n );
2127
2128    for( i = 0; i < n; ++i )
2129    {
2130        pex[i].addr.type = TR_AF_INET;
2131        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
2132        memcpy( &pex[i].port, walk, 2 ); walk += 2;
2133        if( added_f && ( n == added_f_len ) )
2134            pex[i].flags = added_f[i];
2135    }
2136
2137    *pexCount = n;
2138    return pex;
2139}
2140
2141tr_pex *
2142tr_peerMgrCompact6ToPex( const void    * compact,
2143                         size_t          compactLen,
2144                         const uint8_t * added_f,
2145                         size_t          added_f_len,
2146                         size_t        * pexCount )
2147{
2148    size_t          i;
2149    size_t          n = compactLen / 18;
2150    const uint8_t * walk = compact;
2151    tr_pex *        pex = tr_new0( tr_pex, n );
2152
2153    for( i = 0; i < n; ++i )
2154    {
2155        pex[i].addr.type = TR_AF_INET6;
2156        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
2157        memcpy( &pex[i].port, walk, 2 ); walk += 2;
2158        if( added_f && ( n == added_f_len ) )
2159            pex[i].flags = added_f[i];
2160    }
2161
2162    *pexCount = n;
2163    return pex;
2164}
2165
2166tr_pex *
2167tr_peerMgrArrayToPex( const void * array,
2168                      size_t       arrayLen,
2169                      size_t      * pexCount )
2170{
2171    size_t          i;
2172    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
2173    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
2174    const uint8_t * walk = array;
2175    tr_pex        * pex = tr_new0( tr_pex, n );
2176
2177    for( i = 0 ; i < n ; i++ ) {
2178        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
2179        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
2180        pex[i].flags = 0x00;
2181        walk += sizeof( tr_address ) + 2;
2182    }
2183
2184    *pexCount = n;
2185    return pex;
2186}
2187
2188/**
2189***
2190**/
2191
2192static void
2193tr_peerMgrSetBlame( tr_torrent     * tor,
2194                    tr_piece_index_t pieceIndex,
2195                    int              success )
2196{
2197    if( !success )
2198    {
2199        int        peerCount, i;
2200        Torrent *  t = tor->torrentPeers;
2201        tr_peer ** peers;
2202
2203        assert( torrentIsLocked( t ) );
2204
2205        peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
2206        for( i = 0; i < peerCount; ++i )
2207        {
2208            tr_peer * peer = peers[i];
2209            if( tr_bitfieldHas( &peer->blame, pieceIndex ) )
2210            {
2211                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
2212                        tr_atomAddrStr( peer->atom ),
2213                        pieceIndex, (int)peer->strikes + 1 );
2214                addStrike( t, peer );
2215            }
2216        }
2217    }
2218}
2219
2220int
2221tr_pexCompare( const void * va, const void * vb )
2222{
2223    const tr_pex * a = va;
2224    const tr_pex * b = vb;
2225    int i;
2226
2227    assert( tr_isPex( a ) );
2228    assert( tr_isPex( b ) );
2229
2230    if(( i = tr_address_compare( &a->addr, &b->addr )))
2231        return i;
2232
2233    if( a->port != b->port )
2234        return a->port < b->port ? -1 : 1;
2235
2236    return 0;
2237}
2238
#if 0
/* Currently compiled out. An explicit encryption preference on the peer
 * wins; otherwise fall back to whether the peer's io is encrypted. */
static int
peerPrefersCrypto( const tr_peer * peer )
{
    switch( peer->encryption_preference )
    {
        case ENCRYPTION_PREFERENCE_YES:
            return true;

        case ENCRYPTION_PREFERENCE_NO:
            return false;

        default:
            return tr_peerIoIsEncrypted( peer->io );
    }
}
#endif
2252
2253/* better goes first */
2254static int
2255compareAtomsByUsefulness( const void * va, const void *vb )
2256{
2257    const struct peer_atom * a = * (const struct peer_atom**) va;
2258    const struct peer_atom * b = * (const struct peer_atom**) vb;
2259
2260    assert( tr_isAtom( a ) );
2261    assert( tr_isAtom( b ) );
2262
2263    if( a->piece_data_time != b->piece_data_time )
2264        return a->piece_data_time > b->piece_data_time ? -1 : 1;
2265    if( a->fromBest != b->fromBest )
2266        return a->fromBest < b->fromBest ? -1 : 1;
2267    if( a->numFails != b->numFails )
2268        return a->numFails < b->numFails ? -1 : 1;
2269
2270    return 0;
2271}
2272
2273static bool
2274isAtomInteresting( const tr_torrent * tor, struct peer_atom * atom )
2275{
2276    if( tr_torrentIsSeed( tor ) && atomIsSeed( atom ) )
2277        return false;
2278
2279    if( peerIsInUse( tor->torrentPeers, atom ) )
2280        return true;
2281
2282    if( isAtomBlocklisted( tor->session, atom ) )
2283        return false;
2284
2285    if( atom->flags2 & MYFLAG_BANNED )
2286        return false;
2287
2288    return true;
2289}
2290
2291int
2292tr_peerMgrGetPeers( tr_torrent   * tor,
2293                    tr_pex      ** setme_pex,
2294                    uint8_t        af,
2295                    uint8_t        list_mode,
2296                    int            maxCount )
2297{
2298    int i;
2299    int n;
2300    int count = 0;
2301    int atomCount = 0;
2302    const Torrent * t = tor->torrentPeers;
2303    struct peer_atom ** atoms = NULL;
2304    tr_pex * pex;
2305    tr_pex * walk;
2306
2307    assert( tr_isTorrent( tor ) );
2308    assert( setme_pex != NULL );
2309    assert( af==TR_AF_INET || af==TR_AF_INET6 );
2310    assert( list_mode==TR_PEERS_CONNECTED || list_mode==TR_PEERS_INTERESTING );
2311
2312    managerLock( t->manager );
2313
2314    /**
2315    ***  build a list of atoms
2316    **/
2317
2318    if( list_mode == TR_PEERS_CONNECTED ) /* connected peers only */
2319    {
2320        int i;
2321        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
2322        atomCount = tr_ptrArraySize( &t->peers );
2323        atoms = tr_new( struct peer_atom *, atomCount );
2324        for( i=0; i<atomCount; ++i )
2325            atoms[i] = peers[i]->atom;
2326    }
2327    else /* TR_PEERS_INTERESTING */
2328    {
2329        int i;
2330        struct peer_atom ** atomBase = (struct peer_atom**) tr_ptrArrayBase( &t->pool );
2331        n = tr_ptrArraySize( &t->pool );
2332        atoms = tr_new( struct peer_atom *, n );
2333        for( i=0; i<n; ++i )
2334            if( isAtomInteresting( tor, atomBase[i] ) )
2335                atoms[atomCount++] = atomBase[i];
2336    }
2337
2338    qsort( atoms, atomCount, sizeof( struct peer_atom * ), compareAtomsByUsefulness );
2339
2340    /**
2341    ***  add the first N of them into our return list
2342    **/
2343
2344    n = MIN( atomCount, maxCount );
2345    pex = walk = tr_new0( tr_pex, n );
2346
2347    for( i=0; i<atomCount && count<n; ++i )
2348    {
2349        const struct peer_atom * atom = atoms[i];
2350        if( atom->addr.type == af )
2351        {
2352            assert( tr_address_is_valid( &atom->addr ) );
2353            walk->addr = atom->addr;
2354            walk->port = atom->port;
2355            walk->flags = atom->flags;
2356            ++count;
2357            ++walk;
2358        }
2359    }
2360
2361    qsort( pex, count, sizeof( tr_pex ), tr_pexCompare );
2362
2363    assert( ( walk - pex ) == count );
2364    *setme_pex = pex;
2365
2366    /* cleanup */
2367    tr_free( atoms );
2368    managerUnlock( t->manager );
2369    return count;
2370}
2371
2372static void atomPulse      ( int, short, void * );
2373static void bandwidthPulse ( int, short, void * );
2374static void rechokePulse   ( int, short, void * );
2375static void reconnectPulse ( int, short, void * );
2376
2377static struct event *
2378createTimer( tr_session * session, int msec, void (*callback)(int, short, void *), void * cbdata )
2379{
2380    struct event * timer = evtimer_new( session->event_base, callback, cbdata );
2381    tr_timerAddMsec( timer, msec );
2382    return timer;
2383}
2384
2385static void
2386ensureMgrTimersExist( struct tr_peerMgr * m )
2387{
2388    if( m->atomTimer == NULL )
2389        m->atomTimer = createTimer( m->session, ATOM_PERIOD_MSEC, atomPulse, m );
2390
2391    if( m->bandwidthTimer == NULL )
2392        m->bandwidthTimer = createTimer( m->session, BANDWIDTH_PERIOD_MSEC, bandwidthPulse, m );
2393
2394    if( m->rechokeTimer == NULL )
2395        m->rechokeTimer = createTimer( m->session, RECHOKE_PERIOD_MSEC, rechokePulse, m );
2396
2397    if( m->refillUpkeepTimer == NULL )
2398        m->refillUpkeepTimer = createTimer( m->session, REFILL_UPKEEP_PERIOD_MSEC, refillUpkeep, m );
2399}
2400
2401void
2402tr_peerMgrStartTorrent( tr_torrent * tor )
2403{
2404    Torrent * t = tor->torrentPeers;
2405
2406    assert( tr_isTorrent( tor ) );
2407    assert( tr_torrentIsLocked( tor ) );
2408
2409    ensureMgrTimersExist( t->manager );
2410
2411    t->isRunning = true;
2412    t->maxPeers = t->tor->maxConnectedPeers;
2413    t->pieceSortState = PIECES_UNSORTED;
2414
2415    rechokePulse( 0, 0, t->manager );
2416}
2417
2418static void
2419stopTorrent( Torrent * t )
2420{
2421    tr_peer * peer;
2422
2423    t->isRunning = false;
2424
2425    replicationFree( t );
2426    invalidatePieceSorting( t );
2427
2428    /* disconnect the peers. */
2429    while(( peer = tr_ptrArrayPop( &t->peers )))
2430        peerDelete( t, peer );
2431
2432    /* disconnect the handshakes. handshakeAbort calls handshakeDoneCB(),
2433     * which removes the handshake from t->outgoingHandshakes... */
2434    while( !tr_ptrArrayEmpty( &t->outgoingHandshakes ) )
2435        tr_handshakeAbort( tr_ptrArrayNth( &t->outgoingHandshakes, 0 ) );
2436}
2437
2438void
2439tr_peerMgrStopTorrent( tr_torrent * tor )
2440{
2441    assert( tr_isTorrent( tor ) );
2442    assert( tr_torrentIsLocked( tor ) );
2443
2444    stopTorrent( tor->torrentPeers );
2445}
2446
2447void
2448tr_peerMgrAddTorrent( tr_peerMgr * manager, tr_torrent * tor )
2449{
2450    assert( tr_isTorrent( tor ) );
2451    assert( tr_torrentIsLocked( tor ) );
2452    assert( tor->torrentPeers == NULL );
2453
2454    tor->torrentPeers = torrentNew( manager, tor );
2455}
2456
2457void
2458tr_peerMgrRemoveTorrent( tr_torrent * tor )
2459{
2460    assert( tr_isTorrent( tor ) );
2461    assert( tr_torrentIsLocked( tor ) );
2462
2463    stopTorrent( tor->torrentPeers );
2464    torrentFree( tor->torrentPeers );
2465}
2466
2467void
2468tr_peerUpdateProgress( tr_torrent * tor, tr_peer * peer )
2469{
2470    const tr_bitfield * have = &peer->have;
2471
2472    if( tr_bitfieldHasAll( have ) )
2473    {
2474        peer->progress = 1.0;
2475    }
2476    else if( tr_bitfieldHasNone( have ) )
2477    {
2478        peer->progress = 0.0;
2479    }
2480    else
2481    {
2482        const float true_count = tr_bitfieldCountTrueBits( have );
2483
2484        if( tr_torrentHasMetadata( tor ) )
2485            peer->progress = true_count / tor->info.pieceCount;
2486        else /* without pieceCount, this result is only a best guess... */
2487            peer->progress = true_count / ( have->bit_count + 1 );
2488    }
2489
2490    if( peer->atom && ( peer->progress >= 1.0 ) )
2491        atomSetSeed( tor->torrentPeers, peer->atom );
2492}
2493
2494void
2495tr_peerMgrOnTorrentGotMetainfo( tr_torrent * tor )
2496{
2497    int i;
2498    const int peerCount = tr_ptrArraySize( &tor->torrentPeers->peers );
2499    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &tor->torrentPeers->peers );
2500
2501    /* some peer_msgs' progress fields may not be accurate if we
2502       didn't have the metadata before now... so refresh them all... */
2503    for( i=0; i<peerCount; ++i )
2504        tr_peerUpdateProgress( tor, peers[i] );
2505}
2506
2507void
2508tr_peerMgrTorrentAvailability( const tr_torrent * tor, int8_t * tab, unsigned int tabCount )
2509{
2510    assert( tr_isTorrent( tor ) );
2511    assert( torrentIsLocked( tor->torrentPeers ) );
2512    assert( tab != NULL );
2513    assert( tabCount > 0 );
2514
2515    memset( tab, 0, tabCount );
2516
2517    if( tr_torrentHasMetadata( tor ) )
2518    {
2519        tr_piece_index_t i;
2520        const int peerCount = tr_ptrArraySize( &tor->torrentPeers->peers );
2521        const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &tor->torrentPeers->peers );
2522        const float interval = tor->info.pieceCount / (float)tabCount;
2523        const bool isSeed = tr_cpGetStatus( &tor->completion ) == TR_SEED;
2524
2525        for( i=0; i<tabCount; ++i )
2526        {
2527            const int piece = i * interval;
2528
2529            if( isSeed || tr_cpPieceIsComplete( &tor->completion, piece ) )
2530                tab[i] = -1;
2531            else if( peerCount ) {
2532                int j;
2533                for( j=0; j<peerCount; ++j )
2534                    if( tr_bitfieldHas( &peers[j]->have, piece ) )
2535                        ++tab[i];
2536            }
2537        }
2538    }
2539}
2540
2541static bool
2542peerIsSeed( const tr_peer * peer )
2543{
2544    if( peer->progress >= 1.0 )
2545        return true;
2546
2547    if( peer->atom && atomIsSeed( peer->atom ) )
2548        return true;
2549
2550    return false;
2551}
2552
2553/* count how many pieces we want that connected peers have */
2554uint64_t
2555tr_peerMgrGetDesiredAvailable( const tr_torrent * tor )
2556{
2557    size_t i;
2558    size_t n;
2559    uint64_t desiredAvailable;
2560    const Torrent * t = tor->torrentPeers;
2561
2562    /* common shortcuts... */
2563
2564    if( tr_torrentIsSeed( t->tor ) )
2565        return 0;
2566
2567    if( !tr_torrentHasMetadata( tor ) )
2568        return 0;
2569
2570    n = tr_ptrArraySize( &t->peers );
2571    if( n == 0 )
2572        return 0;
2573    else {
2574        const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
2575        for( i=0; i<n; ++i )
2576            if( peers[i]->atom && atomIsSeed( peers[i]->atom ) )
2577                return tr_cpLeftUntilDone( &tor->completion );
2578    }
2579
2580    if( !t->pieceReplication || !t->pieceReplicationSize )
2581        return 0;
2582
2583    /* do it the hard way */
2584
2585    desiredAvailable = 0;
2586    for( i=0, n=tor->info.pieceCount; i<n; ++i )
2587    {
2588        if( tor->info.pieces[i].dnd )
2589            continue;
2590        if( t->pieceReplicationSize >= i )
2591            continue;
2592        if( t->pieceReplication[i] == 0 )
2593            continue;
2594
2595        desiredAvailable += tr_cpMissingBytesInPiece( &t->tor->completion, i );
2596    }
2597
2598    return desiredAvailable;
2599}
2600
2601void
2602tr_peerMgrTorrentStats( tr_torrent  * tor,
2603                        int         * setmePeersConnected,
2604                        int         * setmeWebseedsSendingToUs,
2605                        int         * setmePeersSendingToUs,
2606                        int         * setmePeersGettingFromUs,
2607                        int         * setmePeersFrom )
2608{
2609    int i, size;
2610    const Torrent * t = tor->torrentPeers;
2611    const tr_peer ** peers;
2612
2613    assert( tr_torrentIsLocked( tor ) );
2614
2615    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
2616    size = tr_ptrArraySize( &t->peers );
2617
2618    *setmePeersConnected       = 0;
2619    *setmePeersGettingFromUs   = 0;
2620    *setmePeersSendingToUs     = 0;
2621    *setmeWebseedsSendingToUs  = 0;
2622
2623    for( i=0; i<TR_PEER_FROM__MAX; ++i )
2624        setmePeersFrom[i] = 0;
2625
2626    for( i=0; i<size; ++i )
2627    {
2628        const tr_peer * peer = peers[i];
2629        const struct peer_atom * atom = peer->atom;
2630
2631        if( peer->io == NULL ) /* not connected */
2632            continue;
2633
2634        ++*setmePeersConnected;
2635
2636        ++setmePeersFrom[atom->fromFirst];
2637
2638        if( clientIsDownloadingFrom( tor, peer ) )
2639            ++*setmePeersSendingToUs;
2640
2641        if( clientIsUploadingTo( peer ) )
2642            ++*setmePeersGettingFromUs;
2643    }
2644
2645    *setmeWebseedsSendingToUs = countActiveWebseeds( t );
2646}
2647
2648double*
2649tr_peerMgrWebSpeeds_KBps( const tr_torrent * tor )
2650{
2651    int i;
2652    const Torrent * t = tor->torrentPeers;
2653    const int webseedCount = tr_ptrArraySize( &t->webseeds );
2654    const tr_webseed ** webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2655    const uint64_t now = tr_time_msec( );
2656    double * ret = tr_new0( double, webseedCount );
2657
2658    assert( tr_isTorrent( tor ) );
2659    assert( tr_torrentIsLocked( tor ) );
2660    assert( t->manager != NULL );
2661    assert( webseedCount == tor->info.webseedCount );
2662
2663    for( i=0; i<webseedCount; ++i ) {
2664        int Bps;
2665        if( tr_webseedGetSpeed_Bps( webseeds[i], now, &Bps ) )
2666            ret[i] = Bps / (double)tr_speed_K;
2667        else
2668            ret[i] = -1.0;
2669    }
2670
2671    return ret;
2672}
2673
2674int
2675tr_peerGetPieceSpeed_Bps( const tr_peer * peer, uint64_t now, tr_direction direction )
2676{
2677    return peer->io ? tr_peerIoGetPieceSpeed_Bps( peer->io, now, direction ) : 0.0;
2678}
2679
2680struct tr_peer_stat *
2681tr_peerMgrPeerStats( const tr_torrent * tor, int * setmeCount )
2682{
2683    int i;
2684    const Torrent * t = tor->torrentPeers;
2685    const int size = tr_ptrArraySize( &t->peers );
2686    const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
2687    const uint64_t now_msec = tr_time_msec( );
2688    const time_t now = tr_time();
2689    tr_peer_stat * ret = tr_new0( tr_peer_stat, size );
2690
2691    assert( tr_isTorrent( tor ) );
2692    assert( tr_torrentIsLocked( tor ) );
2693    assert( t->manager );
2694
2695    for( i=0; i<size; ++i )
2696    {
2697        char *                   pch;
2698        const tr_peer *          peer = peers[i];
2699        const struct peer_atom * atom = peer->atom;
2700        tr_peer_stat *           stat = ret + i;
2701
2702        tr_address_to_string_with_buf( &atom->addr, stat->addr, sizeof( stat->addr ) );
2703        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
2704                   sizeof( stat->client ) );
2705        stat->port                = ntohs( peer->atom->port );
2706        stat->from                = atom->fromFirst;
2707        stat->progress            = peer->progress;
2708        stat->isUTP               = peer->io->utp_socket != NULL;
2709        stat->isEncrypted         = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
2710        stat->rateToPeer_KBps     = toSpeedKBps( tr_peerGetPieceSpeed_Bps( peer, now_msec, TR_CLIENT_TO_PEER ) );
2711        stat->rateToClient_KBps   = toSpeedKBps( tr_peerGetPieceSpeed_Bps( peer, now_msec, TR_PEER_TO_CLIENT ) );
2712        stat->peerIsChoked        = peer->peerIsChoked;
2713        stat->peerIsInterested    = peer->peerIsInterested;
2714        stat->clientIsChoked      = peer->clientIsChoked;
2715        stat->clientIsInterested  = peer->clientIsInterested;
2716        stat->isIncoming          = tr_peerIoIsIncoming( peer->io );
2717        stat->isDownloadingFrom   = clientIsDownloadingFrom( tor, peer );
2718        stat->isUploadingTo       = clientIsUploadingTo( peer );
2719        stat->isSeed              = peerIsSeed( peer );
2720
2721        stat->blocksToPeer        = tr_historyGet( &peer->blocksSentToPeer,    now, CANCEL_HISTORY_SEC );
2722        stat->blocksToClient      = tr_historyGet( &peer->blocksSentToClient,  now, CANCEL_HISTORY_SEC );
2723        stat->cancelsToPeer       = tr_historyGet( &peer->cancelsSentToPeer,   now, CANCEL_HISTORY_SEC );
2724        stat->cancelsToClient     = tr_historyGet( &peer->cancelsSentToClient, now, CANCEL_HISTORY_SEC );
2725
2726        stat->pendingReqsToPeer   = peer->pendingReqsToPeer;
2727        stat->pendingReqsToClient = peer->pendingReqsToClient;
2728
2729        pch = stat->flagStr;
2730        if( stat->isUTP ) *pch++ = 'T';
2731        if( t->optimistic == peer ) *pch++ = 'O';
2732        if( stat->isDownloadingFrom ) *pch++ = 'D';
2733        else if( stat->clientIsInterested ) *pch++ = 'd';
2734        if( stat->isUploadingTo ) *pch++ = 'U';
2735        else if( stat->peerIsInterested ) *pch++ = 'u';
2736        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
2737        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
2738        if( stat->isEncrypted ) *pch++ = 'E';
2739        if( stat->from == TR_PEER_FROM_DHT ) *pch++ = 'H';
2740        else if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
2741        if( stat->isIncoming ) *pch++ = 'I';
2742        *pch = '\0';
2743    }
2744
2745    *setmeCount = size;
2746
2747    return ret;
2748}
2749
2750/**
2751***
2752**/
2753
2754void
2755tr_peerMgrClearInterest( tr_torrent * tor )
2756{
2757    int i;
2758    Torrent * t = tor->torrentPeers;
2759    const int peerCount = tr_ptrArraySize( &t->peers );
2760
2761    assert( tr_isTorrent( tor ) );
2762    assert( tr_torrentIsLocked( tor ) );
2763
2764    for( i=0; i<peerCount; ++i )
2765    {
2766        const tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2767        tr_peerMsgsSetInterested( peer->msgs, false );
2768    }
2769}
2770
2771/* do we still want this piece and does the peer have it? */
2772static bool
2773isPieceInteresting( const tr_torrent * tor, const tr_peer * peer, tr_piece_index_t index )
2774{
2775    return ( !tor->info.pieces[index].dnd ) /* we want it */
2776        && ( !tr_cpPieceIsComplete( &tor->completion, index ) )  /* we don't have it */
2777        && ( tr_bitfieldHas( &peer->have, index ) ); /* peer has it */
2778}
2779
2780/* does this peer have any pieces that we want? */
2781static bool
2782isPeerInteresting( const tr_torrent * tor, const tr_peer * peer )
2783{
2784    tr_piece_index_t i, n;
2785
2786    if ( tr_torrentIsSeed( tor ) )
2787        return false;
2788
2789    if( !tr_torrentIsPieceTransferAllowed( tor, TR_PEER_TO_CLIENT ) )
2790        return false;
2791
2792    for( i=0, n=tor->info.pieceCount; i<n; ++i )
2793        if( isPieceInteresting( tor, peer, i ) )
2794            return true;
2795
2796    return false;
2797}
2798
2799/* determines who we send "interested" messages to */
2800static void
2801rechokeDownloads( Torrent * t )
2802{
2803    int i;
2804    const time_t now = tr_time( );
2805    const int MIN_INTERESTING_PEERS = 5;
2806    const int peerCount = tr_ptrArraySize( &t->peers );
2807    int maxPeers         = 0;
2808
2809    int badCount         = 0;
2810    int goodCount        = 0;
2811    int untestedCount    = 0;
2812    tr_peer ** bad       = tr_new( tr_peer*, peerCount );
2813    tr_peer ** good      = tr_new( tr_peer*, peerCount );
2814    tr_peer ** untested  = tr_new( tr_peer*, peerCount );
2815
2816    /* decide how many peers to be interested in */
2817    {
2818        int blocks = 0;
2819        int cancels = 0;
2820        time_t timeSinceCancel;
2821
2822        /* Count up how many blocks & cancels each peer has.
2823         *
2824         * There are two situations where we send out cancels --
2825         *
2826         * 1. We've got unresponsive peers, which is handled by deciding
2827         *    -which- peers to be interested in.
2828         *
2829         * 2. We've hit our bandwidth cap, which is handled by deciding
2830         *    -how many- peers to be interested in.
2831         *
2832         * We're working on 2. here, so we need to ignore unresponsive
2833         * peers in our calculations lest they confuse Transmission into
2834         * thinking it's hit its bandwidth cap.
2835         */
2836        for( i=0; i<peerCount; ++i )
2837        {
2838            const tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2839            const int b = tr_historyGet( &peer->blocksSentToClient, now, CANCEL_HISTORY_SEC );
2840            const int c = tr_historyGet( &peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC );
2841
2842            if( b == 0 ) /* ignore unresponsive peers, as described above */
2843                continue;
2844
2845            blocks += b;
2846            cancels += c;
2847        }
2848
2849        if( cancels > 0 )
2850        {
2851            /* cancelRate: of the block requests we've recently made, the percentage we cancelled.
2852             * higher values indicate more congestion. */
2853            const double cancelRate = cancels / (double)(cancels + blocks);
2854            const double mult = 1 - MIN( cancelRate, 0.5 );
2855            maxPeers = t->interestedCount * mult;
2856            tordbg( t, "cancel rate is %.3f -- reducing the "
2857                       "number of peers we're interested in by %.0f percent",
2858                       cancelRate, mult * 100 );
2859            t->lastCancel = now;
2860        }
2861
2862        timeSinceCancel = now - t->lastCancel;
2863        if( timeSinceCancel )
2864        {
2865            const int maxIncrease = 15;
2866            const time_t maxHistory = 2 * CANCEL_HISTORY_SEC;
2867            const double mult = MIN( timeSinceCancel, maxHistory ) / (double) maxHistory;
2868            const int inc = maxIncrease * mult;
2869            maxPeers = t->maxPeers + inc;
2870            tordbg( t, "time since last cancel is %li -- increasing the "
2871                       "number of peers we're interested in by %d",
2872                       timeSinceCancel, inc );
2873        }
2874    }
2875
2876    /* don't let the previous section's number tweaking go too far... */
2877    if( maxPeers < MIN_INTERESTING_PEERS )
2878        maxPeers = MIN_INTERESTING_PEERS;
2879    if( maxPeers > t->tor->maxConnectedPeers )
2880        maxPeers = t->tor->maxConnectedPeers;
2881
2882    t->maxPeers = maxPeers;
2883
2884    /* separate the peers into "good" (ones with a low cancel-to-block ratio),
2885     * untested peers, and "bad" (ones with a high cancel-to-block ratio).
2886     * That's the order in which we'll choose who to show interest in */
2887    {
2888        /* Randomize the peer array so the peers in the three groups will be unsorted... */
2889        int n = peerCount;
2890        tr_peer ** peers = tr_memdup( tr_ptrArrayBase( &t->peers ), n * sizeof( tr_peer * ) );
2891
2892        while( n > 0 )
2893        {
2894            const int i = tr_cryptoWeakRandInt( n );
2895            tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2896
2897            if( !isPeerInteresting( t->tor, peer ) )
2898            {
2899                tr_peerMsgsSetInterested( peer->msgs, false );
2900            }
2901            else
2902            {
2903                const int blocks = tr_historyGet( &peer->blocksSentToClient, now, CANCEL_HISTORY_SEC );
2904                const int cancels = tr_historyGet( &peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC );
2905
2906                if( !blocks && !cancels )
2907                    untested[untestedCount++] = peer;
2908                else if( !cancels )
2909                    good[goodCount++] = peer;
2910                else if( !blocks )
2911                    bad[badCount++] = peer;
2912                else if( ( cancels * 10 ) < blocks )
2913                    good[goodCount++] = peer;
2914                else
2915                    bad[badCount++] = peer;
2916            }
2917
2918            /* remove 'peer' from the array 'peers' */
2919            if( i != n - 1 )
2920                peers[i] = peers[n-1];
2921            --n;
2922        }
2923
2924        tr_free( peers );
2925    }
2926
2927    t->interestedCount = 0;
2928
2929    /* We've decided (1) how many peers to be interested in,
2930     * and (2) which peers are the best candidates,
2931     * Now it's time to update our `interest' flags. */
2932    for( i=0; i<goodCount; ++i ) {
2933        const bool b = t->interestedCount < maxPeers;
2934        tr_peerMsgsSetInterested( good[i]->msgs, b );
2935        if( b )
2936            ++t->interestedCount;
2937    }
2938    for( i=0; i<untestedCount; ++i ) {
2939        const bool b = t->interestedCount < maxPeers;
2940        tr_peerMsgsSetInterested( untested[i]->msgs, b );
2941        if( b )
2942            ++t->interestedCount;
2943    }
2944    for( i=0; i<badCount; ++i ) {
2945        const bool b = t->interestedCount < maxPeers;
2946        tr_peerMsgsSetInterested( bad[i]->msgs, b );
2947        if( b )
2948            ++t->interestedCount;
2949    }
2950
2951/*fprintf( stderr, "num interested: %d\n", t->interestedCount );*/
2952
2953    /* cleanup */
2954    tr_free( untested );
2955    tr_free( good );
2956    tr_free( bad );
2957}
2958
2959/**
2960***
2961**/
2962
2963struct ChokeData
2964{
2965    bool            isInterested;
2966    bool            wasChoked;
2967    bool            isChoked;
2968    int             rate;
2969    int             salt;
2970    tr_peer *       peer;
2971};
2972
2973static int
2974compareChoke( const void * va, const void * vb )
2975{
2976    const struct ChokeData * a = va;
2977    const struct ChokeData * b = vb;
2978
2979    if( a->rate != b->rate ) /* prefer higher overall speeds */
2980        return a->rate > b->rate ? -1 : 1;
2981
2982    if( a->wasChoked != b->wasChoked ) /* prefer unchoked */
2983        return a->wasChoked ? 1 : -1;
2984
2985    if( a->salt != b->salt ) /* random order */
2986        return a->salt - b->salt;
2987
2988    return 0;
2989}
2990
2991/* is this a new connection? */
2992static int
2993isNew( const tr_peer * peer )
2994{
2995    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
2996}
2997
2998/* get a rate for deciding which peers to choke and unchoke. */
2999static int
3000getRate( const tr_torrent * tor, struct peer_atom * atom, uint64_t now )
3001{
3002    int Bps;
3003
3004    if( tr_torrentIsSeed( tor ) )
3005        Bps = tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_CLIENT_TO_PEER );
3006
3007    /* downloading a private torrent... take upload speed into account
3008     * because there may only be a small window of opportunity to share */
3009    else if( tr_torrentIsPrivate( tor ) )
3010        Bps = tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_PEER_TO_CLIENT )
3011            + tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_CLIENT_TO_PEER );
3012
3013    /* downloading a public torrent */
3014    else
3015        Bps = tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_PEER_TO_CLIENT );
3016
3017    /* convert it to bytes per second */
3018    return Bps;
3019}
3020
3021static inline bool
3022isBandwidthMaxedOut( const tr_bandwidth * b,
3023                     const uint64_t now_msec, tr_direction dir )
3024{
3025    if( !tr_bandwidthIsLimited( b, dir ) )
3026        return false;
3027    else {
3028        const int got = tr_bandwidthGetPieceSpeed_Bps( b, now_msec, dir );
3029        const int want = tr_bandwidthGetDesiredSpeed_Bps( b, dir );
3030        return got >= want;
3031    }
3032}
3033
3034static void
3035rechokeUploads( Torrent * t, const uint64_t now )
3036{
3037    int i, size, unchokedInterested;
3038    const int peerCount = tr_ptrArraySize( &t->peers );
3039    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
3040    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
3041    const tr_session * session = t->manager->session;
3042    const int chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
3043    const bool isMaxedOut = isBandwidthMaxedOut( &t->tor->bandwidth, now, TR_UP );
3044
3045    assert( torrentIsLocked( t ) );
3046
3047    /* an optimistic unchoke peer's "optimistic"
3048     * state lasts for N calls to rechokeUploads(). */
3049    if( t->optimisticUnchokeTimeScaler > 0 )
3050        t->optimisticUnchokeTimeScaler--;
3051    else
3052        t->optimistic = NULL;
3053
3054    /* sort the peers by preference and rate */
3055    for( i = 0, size = 0; i < peerCount; ++i )
3056    {
3057        tr_peer * peer = peers[i];
3058        struct peer_atom * atom = peer->atom;
3059
3060        if( peerIsSeed( peer ) ) /* choke seeds and partial seeds */
3061        {
3062            tr_peerMsgsSetChoke( peer->msgs, true );
3063        }
3064        else if( chokeAll ) /* choke everyone if we're not uploading */
3065        {
3066            tr_peerMsgsSetChoke( peer->msgs, true );
3067        }
3068        else if( peer != t->optimistic )
3069        {
3070            struct ChokeData * n = &choke[size++];
3071            n->peer         = peer;
3072            n->isInterested = peer->peerIsInterested;
3073            n->wasChoked    = peer->peerIsChoked;
3074            n->rate         = getRate( t->tor, atom, now );
3075            n->salt         = tr_cryptoWeakRandInt( INT_MAX );
3076            n->isChoked     = true;
3077        }
3078    }
3079
3080    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
3081
3082    /**
3083     * Reciprocation and number of uploads capping is managed by unchoking
3084     * the N peers which have the best upload rate and are interested.
3085     * This maximizes the client's download rate. These N peers are
3086     * referred to as downloaders, because they are interested in downloading
3087     * from the client.
3088     *
3089     * Peers which have a better upload rate (as compared to the downloaders)
3090     * but aren't interested get unchoked. If they become interested, the
3091     * downloader with the worst upload rate gets choked. If a client has
3092     * a complete file, it uses its upload rate rather than its download
3093     * rate to decide which peers to unchoke.
3094     *
3095     * If our bandwidth is maxed out, don't unchoke any more peers.
3096     */
3097    unchokedInterested = 0;
3098    for( i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i ) {
3099        choke[i].isChoked = isMaxedOut ? choke[i].wasChoked : false;
3100        if( choke[i].isInterested )
3101            ++unchokedInterested;
3102    }
3103
3104    /* optimistic unchoke */
3105    if( !t->optimistic && !isMaxedOut && (i<size) )
3106    {
3107        int n;
3108        struct ChokeData * c;
3109        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
3110
3111        for( ; i<size; ++i )
3112        {
3113            if( choke[i].isInterested )
3114            {
3115                const tr_peer * peer = choke[i].peer;
3116                int x = 1, y;
3117                if( isNew( peer ) ) x *= 3;
3118                for( y=0; y<x; ++y )
3119                    tr_ptrArrayAppend( &randPool, &choke[i] );
3120            }
3121        }
3122
3123        if(( n = tr_ptrArraySize( &randPool )))
3124        {
3125            c = tr_ptrArrayNth( &randPool, tr_cryptoWeakRandInt( n ));
3126            c->isChoked = false;
3127            t->optimistic = c->peer;
3128            t->optimisticUnchokeTimeScaler = OPTIMISTIC_UNCHOKE_MULTIPLIER;
3129        }
3130
3131        tr_ptrArrayDestruct( &randPool, NULL );
3132    }
3133
3134    for( i=0; i<size; ++i )
3135        tr_peerMsgsSetChoke( choke[i].peer->msgs, choke[i].isChoked );
3136
3137    /* cleanup */
3138    tr_free( choke );
3139}
3140
3141static void
3142rechokePulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3143{
3144    tr_torrent * tor = NULL;
3145    tr_peerMgr * mgr = vmgr;
3146    const uint64_t now = tr_time_msec( );
3147
3148    managerLock( mgr );
3149
3150    while(( tor = tr_torrentNext( mgr->session, tor ))) {
3151        if( tor->isRunning ) {
3152            Torrent * t = tor->torrentPeers;
3153            if( tr_ptrArrayEmpty( &t->peers ) )
3154                continue;
3155            rechokeUploads( t, now );
3156            if( !tr_torrentIsSeed( tor ) )
3157                rechokeDownloads( t );
3158        }
3159    }
3160
3161    tr_timerAddMsec( mgr->rechokeTimer, RECHOKE_PERIOD_MSEC );
3162    managerUnlock( mgr );
3163}
3164
3165/***
3166****
3167****  Life and Death
3168****
3169***/
3170
3171static bool
3172shouldPeerBeClosed( const Torrent    * t,
3173                    const tr_peer    * peer,
3174                    int                peerCount,
3175                    const time_t       now )
3176{
3177    const tr_torrent *       tor = t->tor;
3178    const struct peer_atom * atom = peer->atom;
3179
3180    /* if it's marked for purging, close it */
3181    if( peer->doPurge )
3182    {
3183        tordbg( t, "purging peer %s because its doPurge flag is set",
3184                tr_atomAddrStr( atom ) );
3185        return true;
3186    }
3187
3188    /* disconnect if we're both seeds and enough time has passed for PEX */
3189    if( tr_torrentIsSeed( tor ) && peerIsSeed( peer ) )
3190        return !tr_torrentAllowsPex(tor) || (now-atom->time>=30);
3191
3192    /* disconnect if it's been too long since piece data has been transferred.
3193     * this is on a sliding scale based on number of available peers... */
3194    {
3195        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
3196        /* if we have >= relaxIfFewerThan, strictness is 100%.
3197         * if we have zero connections, strictness is 0% */
3198        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
3199                               ? 1.0
3200                               : peerCount / (float)relaxStrictnessIfFewerThanN;
3201        const int lo = MIN_UPLOAD_IDLE_SECS;
3202        const int hi = MAX_UPLOAD_IDLE_SECS;
3203        const int limit = hi - ( ( hi - lo ) * strictness );
3204        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
3205/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
3206        if( idleTime > limit ) {
3207            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
3208                       tr_atomAddrStr( atom ), idleTime );
3209            return true;
3210        }
3211    }
3212
3213    return false;
3214}
3215
3216static tr_peer **
3217getPeersToClose( Torrent * t, const time_t now_sec, int * setmeSize )
3218{
3219    int i, peerCount, outsize;
3220    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( &t->peers, &peerCount );
3221    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
3222
3223    assert( torrentIsLocked( t ) );
3224
3225    for( i = outsize = 0; i < peerCount; ++i )
3226        if( shouldPeerBeClosed( t, peers[i], peerCount, now_sec ) )
3227            ret[outsize++] = peers[i];
3228
3229    *setmeSize = outsize;
3230    return ret;
3231}
3232
3233static int
3234getReconnectIntervalSecs( const struct peer_atom * atom, const time_t now )
3235{
3236    int sec;
3237
3238    /* if we were recently connected to this peer and transferring piece
3239     * data, try to reconnect to them sooner rather that later -- we don't
3240     * want network troubles to get in the way of a good peer. */
3241    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
3242        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
3243
3244    /* don't allow reconnects more often than our minimum */
3245    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
3246        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
3247
3248    /* otherwise, the interval depends on how many times we've tried
3249     * and failed to connect to the peer */
3250    else switch( atom->numFails ) {
3251        case 0: sec = 0; break;
3252        case 1: sec = 5; break;
3253        case 2: sec = 2 * 60; break;
3254        case 3: sec = 15 * 60; break;
3255        case 4: sec = 30 * 60; break;
3256        case 5: sec = 60 * 60; break;
3257        default: sec = 120 * 60; break;
3258    }
3259
3260    /* penalize peers that were unreachable the last time we tried */
3261    if( atom->flags2 & MYFLAG_UNREACHABLE )
3262        sec += sec;
3263
3264    dbgmsg( "reconnect interval for %s is %d seconds", tr_atomAddrStr( atom ), sec );
3265    return sec;
3266}
3267
3268static void
3269removePeer( Torrent * t, tr_peer * peer )
3270{
3271    tr_peer * removed;
3272    struct peer_atom * atom = peer->atom;
3273
3274    assert( torrentIsLocked( t ) );
3275    assert( atom );
3276
3277    atom->time = tr_time( );
3278
3279    removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare );
3280
3281    if( replicationExists( t ) )
3282        tr_decrReplicationFromBitfield( t, &peer->have );
3283
3284    assert( removed == peer );
3285    peerDelete( t, removed );
3286}
3287
3288static void
3289closePeer( Torrent * t, tr_peer * peer )
3290{
3291    struct peer_atom * atom;
3292
3293    assert( t != NULL );
3294    assert( peer != NULL );
3295
3296    atom = peer->atom;
3297
3298    /* if we transferred piece data, then they might be good peers,
3299       so reset their `numFails' weight to zero. otherwise we connected
3300       to them fruitlessly, so mark it as another fail */
3301    if( atom->piece_data_time ) {
3302        tordbg( t, "resetting atom %s numFails to 0", tr_atomAddrStr(atom) );
3303        atom->numFails = 0;
3304    } else {
3305        ++atom->numFails;
3306        tordbg( t, "incremented atom %s numFails to %d", tr_atomAddrStr(atom), (int)atom->numFails );
3307    }
3308
3309    tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
3310    removePeer( t, peer );
3311}
3312
3313static void
3314removeAllPeers( Torrent * t )
3315{
3316    while( !tr_ptrArrayEmpty( &t->peers ) )
3317        removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) );
3318}
3319
3320static void
3321closeBadPeers( Torrent * t, const time_t now_sec )
3322{
3323    int i;
3324    int peerCount;
3325    struct tr_peer ** peers = getPeersToClose( t, now_sec, &peerCount );
3326    for( i=0; i<peerCount; ++i )
3327        closePeer( t, peers[i] );
3328    tr_free( peers );
3329}
3330
3331struct peer_liveliness
3332{
3333    tr_peer * peer;
3334    void * clientData;
3335    time_t pieceDataTime;
3336    time_t time;
3337    int speed;
3338    bool doPurge;
3339};
3340
3341static int
3342comparePeerLiveliness( const void * va, const void * vb )
3343{
3344    const struct peer_liveliness * a = va;
3345    const struct peer_liveliness * b = vb;
3346
3347    if( a->doPurge != b->doPurge )
3348        return a->doPurge ? 1 : -1;
3349
3350    if( a->speed != b->speed ) /* faster goes first */
3351        return a->speed > b->speed ? -1 : 1;
3352
3353    /* the one to give us data more recently goes first */
3354    if( a->pieceDataTime != b->pieceDataTime )
3355        return a->pieceDataTime > b->pieceDataTime ? -1 : 1;
3356
3357    /* the one we connected to most recently goes first */
3358    if( a->time != b->time )
3359        return a->time > b->time ? -1 : 1;
3360
3361    return 0;
3362}
3363
3364static void
3365sortPeersByLivelinessImpl( tr_peer  ** peers,
3366                           void     ** clientData,
3367                           int         n,
3368                           uint64_t    now,
3369                           int (*compare) ( const void *va, const void *vb ) )
3370{
3371    int i;
3372    struct peer_liveliness *lives, *l;
3373
3374    /* build a sortable array of peer + extra info */
3375    lives = l = tr_new0( struct peer_liveliness, n );
3376    for( i=0; i<n; ++i, ++l )
3377    {
3378        tr_peer * p = peers[i];
3379        l->peer = p;
3380        l->doPurge = p->doPurge;
3381        l->pieceDataTime = p->atom->piece_data_time;
3382        l->time = p->atom->time;
3383        l->speed = tr_peerGetPieceSpeed_Bps( p, now, TR_UP )
3384                 + tr_peerGetPieceSpeed_Bps( p, now, TR_DOWN );
3385        if( clientData )
3386            l->clientData = clientData[i];
3387    }
3388
3389    /* sort 'em */
3390    assert( n == ( l - lives ) );
3391    qsort( lives, n, sizeof( struct peer_liveliness ), compare );
3392
3393    /* build the peer array */
3394    for( i=0, l=lives; i<n; ++i, ++l ) {
3395        peers[i] = l->peer;
3396        if( clientData )
3397            clientData[i] = l->clientData;
3398    }
3399    assert( n == ( l - lives ) );
3400
3401    /* cleanup */
3402    tr_free( lives );
3403}
3404
3405static void
3406sortPeersByLiveliness( tr_peer ** peers, void ** clientData, int n, uint64_t now )
3407{
3408    sortPeersByLivelinessImpl( peers, clientData, n, now, comparePeerLiveliness );
3409}
3410
3411
3412static void
3413enforceTorrentPeerLimit( Torrent * t, uint64_t now )
3414{
3415    int n = tr_ptrArraySize( &t->peers );
3416    const int max = tr_torrentGetPeerLimit( t->tor );
3417    if( n > max )
3418    {
3419        void * base = tr_ptrArrayBase( &t->peers );
3420        tr_peer ** peers = tr_memdup( base, n*sizeof( tr_peer* ) );
3421        sortPeersByLiveliness( peers, NULL, n, now );
3422        while( n > max )
3423            closePeer( t, peers[--n] );
3424        tr_free( peers );
3425    }
3426}
3427
3428static void
3429enforceSessionPeerLimit( tr_session * session, uint64_t now )
3430{
3431    int n = 0;
3432    tr_torrent * tor = NULL;
3433    const int max = tr_sessionGetPeerLimit( session );
3434
3435    /* count the total number of peers */
3436    while(( tor = tr_torrentNext( session, tor )))
3437        n += tr_ptrArraySize( &tor->torrentPeers->peers );
3438
3439    /* if there are too many, prune out the worst */
3440    if( n > max )
3441    {
3442        tr_peer ** peers = tr_new( tr_peer*, n );
3443        Torrent ** torrents = tr_new( Torrent*, n );
3444
3445        /* populate the peer array */
3446        n = 0;
3447        tor = NULL;
3448        while(( tor = tr_torrentNext( session, tor ))) {
3449            int i;
3450            Torrent * t = tor->torrentPeers;
3451            const int tn = tr_ptrArraySize( &t->peers );
3452            for( i=0; i<tn; ++i, ++n ) {
3453                peers[n] = tr_ptrArrayNth( &t->peers, i );
3454                torrents[n] = t;
3455            }
3456        }
3457
3458        /* sort 'em */
3459        sortPeersByLiveliness( peers, (void**)torrents, n, now );
3460
3461        /* cull out the crappiest */
3462        while( n-- > max )
3463            closePeer( torrents[n], peers[n] );
3464
3465        /* cleanup */
3466        tr_free( torrents );
3467        tr_free( peers );
3468    }
3469}
3470
3471static void makeNewPeerConnections( tr_peerMgr * mgr, const int max );
3472
3473static void
3474reconnectPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3475{
3476    tr_torrent * tor;
3477    tr_peerMgr * mgr = vmgr;
3478    const time_t now_sec = tr_time( );
3479    const uint64_t now_msec = tr_time_msec( );
3480
3481    /**
3482    ***  enforce the per-session and per-torrent peer limits
3483    **/
3484
3485    /* if we're over the per-torrent peer limits, cull some peers */
3486    tor = NULL;
3487    while(( tor = tr_torrentNext( mgr->session, tor )))
3488        if( tor->isRunning )
3489            enforceTorrentPeerLimit( tor->torrentPeers, now_msec );
3490
3491    /* if we're over the per-session peer limits, cull some peers */
3492    enforceSessionPeerLimit( mgr->session, now_msec );
3493
3494    /* remove crappy peers */
3495    tor = NULL;
3496    while(( tor = tr_torrentNext( mgr->session, tor )))
3497        if( !tor->torrentPeers->isRunning )
3498            removeAllPeers( tor->torrentPeers );
3499        else
3500            closeBadPeers( tor->torrentPeers, now_sec );
3501
3502    /* try to make new peer connections */
3503    makeNewPeerConnections( mgr, MAX_CONNECTIONS_PER_PULSE );
3504}
3505
3506/****
3507*****
3508*****  BANDWIDTH ALLOCATION
3509*****
3510****/
3511
3512static void
3513pumpAllPeers( tr_peerMgr * mgr )
3514{
3515    tr_torrent * tor = NULL;
3516
3517    while(( tor = tr_torrentNext( mgr->session, tor )))
3518    {
3519        int j;
3520        Torrent * t = tor->torrentPeers;
3521
3522        for( j=0; j<tr_ptrArraySize( &t->peers ); ++j )
3523        {
3524            tr_peer * peer = tr_ptrArrayNth( &t->peers, j );
3525            tr_peerMsgsPulse( peer->msgs );
3526        }
3527    }
3528}
3529
3530static void
3531bandwidthPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3532{
3533    tr_torrent * tor;
3534    tr_peerMgr * mgr = vmgr;
3535    managerLock( mgr );
3536
3537    /* FIXME: this next line probably isn't necessary... */
3538    pumpAllPeers( mgr );
3539
3540    /* allocate bandwidth to the peers */
3541    tr_bandwidthAllocate( &mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
3542    tr_bandwidthAllocate( &mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
3543
3544    /* possibly stop torrents that have seeded enough */
3545    tor = NULL;
3546    while(( tor = tr_torrentNext( mgr->session, tor )))
3547        tr_torrentCheckSeedLimit( tor );
3548
3549    /* run the completeness check for any torrents that need it */
3550    tor = NULL;
3551    while(( tor = tr_torrentNext( mgr->session, tor ))) {
3552        if( tor->torrentPeers->needsCompletenessCheck ) {
3553            tor->torrentPeers->needsCompletenessCheck  = false;
3554            tr_torrentRecheckCompleteness( tor );
3555        }
3556    }
3557
3558    /* stop torrents that are ready to stop, but couldn't be stopped earlier
3559     * during the peer-io callback call chain */
3560    tor = NULL;
3561    while(( tor = tr_torrentNext( mgr->session, tor )))
3562        if( tor->isStopping )
3563            tr_torrentStop( tor );
3564
3565    reconnectPulse( 0, 0, mgr );
3566
3567    tr_timerAddMsec( mgr->bandwidthTimer, BANDWIDTH_PERIOD_MSEC );
3568    managerUnlock( mgr );
3569}
3570
3571/***
3572****
3573***/
3574
3575static int
3576compareAtomPtrsByAddress( const void * va, const void *vb )
3577{
3578    const struct peer_atom * a = * (const struct peer_atom**) va;
3579    const struct peer_atom * b = * (const struct peer_atom**) vb;
3580
3581    assert( tr_isAtom( a ) );
3582    assert( tr_isAtom( b ) );
3583
3584    return tr_address_compare( &a->addr, &b->addr );
3585}
3586
3587/* best come first, worst go last */
3588static int
3589compareAtomPtrsByShelfDate( const void * va, const void *vb )
3590{
3591    time_t atime;
3592    time_t btime;
3593    const struct peer_atom * a = * (const struct peer_atom**) va;
3594    const struct peer_atom * b = * (const struct peer_atom**) vb;
3595    const int data_time_cutoff_secs = 60 * 60;
3596    const time_t tr_now = tr_time( );
3597
3598    assert( tr_isAtom( a ) );
3599    assert( tr_isAtom( b ) );
3600
3601    /* primary key: the last piece data time *if* it was within the last hour */
3602    atime = a->piece_data_time; if( atime + data_time_cutoff_secs < tr_now ) atime = 0;
3603    btime = b->piece_data_time; if( btime + data_time_cutoff_secs < tr_now ) btime = 0;
3604    if( atime != btime )
3605        return atime > btime ? -1 : 1;
3606
3607    /* secondary key: shelf date. */
3608    if( a->shelf_date != b->shelf_date )
3609        return a->shelf_date > b->shelf_date ? -1 : 1;
3610
3611    return 0;
3612}
3613
3614static int
3615getMaxAtomCount( const tr_torrent * tor )
3616{
3617    const int n = tor->maxConnectedPeers;
3618    /* approximate fit of the old jump discontinuous function */
3619    if( n >= 55 ) return     n + 150;
3620    if( n >= 20 ) return 2 * n + 95;
3621    return               4 * n + 55;
3622}
3623
3624static void
3625atomPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3626{
3627    tr_torrent * tor = NULL;
3628    tr_peerMgr * mgr = vmgr;
3629    managerLock( mgr );
3630
3631    while(( tor = tr_torrentNext( mgr->session, tor )))
3632    {
3633        int atomCount;
3634        Torrent * t = tor->torrentPeers;
3635        const int maxAtomCount = getMaxAtomCount( tor );
3636        struct peer_atom ** atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
3637
3638        if( atomCount > maxAtomCount ) /* we've got too many atoms... time to prune */
3639        {
3640            int i;
3641            int keepCount = 0;
3642            int testCount = 0;
3643            struct peer_atom ** keep = tr_new( struct peer_atom*, atomCount );
3644            struct peer_atom ** test = tr_new( struct peer_atom*, atomCount );
3645
3646            /* keep the ones that are in use */
3647            for( i=0; i<atomCount; ++i ) {
3648                struct peer_atom * atom = atoms[i];
3649                if( peerIsInUse( t, atom ) )
3650                    keep[keepCount++] = atom;
3651                else
3652                    test[testCount++] = atom;
3653            }
3654
3655            /* if there's room, keep the best of what's left */
3656            i = 0;
3657            if( keepCount < maxAtomCount ) {
3658                qsort( test, testCount, sizeof( struct peer_atom * ), compareAtomPtrsByShelfDate );
3659                while( i<testCount && keepCount<maxAtomCount )
3660                    keep[keepCount++] = test[i++];
3661            }
3662
3663            /* free the culled atoms */
3664            while( i<testCount )
3665                tr_free( test[i++] );
3666
3667            /* rebuild Torrent.pool with what's left */
3668            tr_ptrArrayDestruct( &t->pool, NULL );
3669            t->pool = TR_PTR_ARRAY_INIT;
3670            qsort( keep, keepCount, sizeof( struct peer_atom * ), compareAtomPtrsByAddress );
3671            for( i=0; i<keepCount; ++i )
3672                tr_ptrArrayAppend( &t->pool, keep[i] );
3673
3674            tordbg( t, "max atom count is %d... pruned from %d to %d\n", maxAtomCount, atomCount, keepCount );
3675
3676            /* cleanup */
3677            tr_free( test );
3678            tr_free( keep );
3679        }
3680    }
3681
3682    tr_timerAddMsec( mgr->atomTimer, ATOM_PERIOD_MSEC );
3683    managerUnlock( mgr );
3684}
3685
3686/***
3687****
3688****
3689****
3690***/
3691
3692/* is this atom someone that we'd want to initiate a connection to? */
3693static bool
3694isPeerCandidate( const tr_torrent * tor, struct peer_atom * atom, const time_t now )
3695{
3696    /* not if we're both seeds */
3697    if( tr_torrentIsSeed( tor ) && atomIsSeed( atom ) )
3698        return false;
3699
3700    /* not if we've already got a connection to them... */
3701    if( peerIsInUse( tor->torrentPeers, atom ) )
3702        return false;
3703
3704    /* not if we just tried them already */
3705    if( ( now - atom->time ) < getReconnectIntervalSecs( atom, now ) )
3706        return false;
3707
3708    /* not if they're blocklisted */
3709    if( isAtomBlocklisted( tor->session, atom ) )
3710        return false;
3711
3712    /* not if they're banned... */
3713    if( atom->flags2 & MYFLAG_BANNED )
3714        return false;
3715
3716    return true;
3717}
3718
3719struct peer_candidate
3720{
3721    uint64_t score;
3722    tr_torrent * tor;
3723    struct peer_atom * atom;
3724};
3725
3726static bool
3727torrentWasRecentlyStarted( const tr_torrent * tor )
3728{
3729    return difftime( tr_time( ), tor->startDate ) < 120;
3730}
3731
/**
 * Append a bit field to a packed sort key.
 *
 * The key is shifted left by `width' bits and `addme' is stored in the
 * low bits that this frees up.  Fields appended earlier therefore end
 * up more significant than fields appended later.
 *
 * If `addme' does not fit in `width' bits it is clamped to the largest
 * value the field can hold.  This keeps an oversized value from
 * overwriting the more significant fields already in the key, and it
 * still ranks the value last within its own field.
 *
 * @param value the key built so far
 * @param width size of the new field in bits; must be between 1 and 63
 * @param addme the value to store in the new field
 * @return the extended key
 */
static inline uint64_t
addValToKey( uint64_t value, int width, uint64_t addme )
{
    uint64_t fieldMax;

    /* shifting a 64-bit value by 64 or more bits is undefined */
    assert( width > 0 );
    assert( width < 64 );

    fieldMax = ( (uint64_t)1 << width ) - 1;
    if( addme > fieldMax )
        addme = fieldMax;

    return ( value << width ) | addme;
}
3739
3740/* smaller value is better */
3741static uint64_t
3742getPeerCandidateScore( const tr_torrent * tor, const struct peer_atom * atom, uint8_t salt  )
3743{
3744    uint64_t i;
3745    uint64_t score = 0;
3746    const bool failed = atom->lastConnectionAt < atom->lastConnectionAttemptAt;
3747
3748    /* prefer peers we've connected to, or never tried, over peers we failed to connect to. */
3749    i = failed ? 1 : 0;
3750    score = addValToKey( score, 1, i );
3751
3752    /* prefer the one we attempted least recently (to cycle through all peers) */
3753    i = atom->lastConnectionAttemptAt;
3754    score = addValToKey( score, 32, i );
3755
3756    /* prefer peers belonging to a torrent of a higher priority */
3757    switch( tr_torrentGetPriority( tor ) ) {
3758        case TR_PRI_HIGH:    i = 0; break;
3759        case TR_PRI_NORMAL:  i = 1; break;
3760        case TR_PRI_LOW:     i = 2; break;
3761    }
3762    score = addValToKey( score, 4, i );
3763
3764    /* prefer recently-started torrents */
3765    i = torrentWasRecentlyStarted( tor ) ? 0 : 1;
3766    score = addValToKey( score, 1, i );
3767
3768    /* prefer torrents we're downloading with */
3769    i = tr_torrentIsSeed( tor ) ? 1 : 0;
3770    score = addValToKey( score, 1, i );
3771
3772    /* prefer peers that are known to be connectible */
3773    i = ( atom->flags & ADDED_F_CONNECTABLE ) ? 0 : 1;
3774    score = addValToKey( score, 1, i );
3775
3776    /* prefer peers that we might have a chance of uploading to...
3777       so lower seed probability is better */
3778    if( atom->seedProbability == 100 ) i = 101;
3779    else if( atom->seedProbability == -1 ) i = 100;
3780    else i = atom->seedProbability;
3781    score = addValToKey( score, 8, i );
3782
3783    /* Prefer peers that we got from more trusted sources.
3784     * lower `fromBest' values indicate more trusted sources */
3785    score = addValToKey( score, 4, atom->fromBest );
3786
3787    /* salt */
3788    score = addValToKey( score, 8, salt );
3789
3790    return score;
3791}
3792
3793/* sort an array of peer candidates */
3794static int
3795comparePeerCandidates( const void * va, const void * vb )
3796{
3797    const struct peer_candidate * a = va;
3798    const struct peer_candidate * b = vb;
3799
3800    if( a->score < b->score ) return -1;
3801    if( a->score > b->score ) return 1;
3802
3803    return 0;
3804}
3805
3806/** @return an array of all the atoms we might want to connect to */
3807static struct peer_candidate*
3808getPeerCandidates( tr_session * session, int * candidateCount )
3809{
3810    int n;
3811    tr_torrent * tor;
3812    struct peer_candidate * candidates;
3813    struct peer_candidate * walk;
3814    const time_t now = tr_time( );
3815    const uint64_t now_msec = tr_time_msec( );
3816    /* leave 5% of connection slots for incoming connections -- ticket #2609 */
3817    const int maxCandidates = tr_sessionGetPeerLimit( session ) * 0.95;
3818
3819    /* don't start any new handshakes if we're full up */
3820    n = 0;
3821    tor= NULL;
3822    while(( tor = tr_torrentNext( session, tor )))
3823        n += tr_ptrArraySize( &tor->torrentPeers->peers );
3824    if( maxCandidates <= n ) {
3825        *candidateCount = 0;
3826        return NULL;
3827    }
3828
3829    /* allocate an array of candidates */
3830    n = 0;
3831    tor= NULL;
3832    while(( tor = tr_torrentNext( session, tor )))
3833        n += tr_ptrArraySize( &tor->torrentPeers->pool );
3834    walk = candidates = tr_new( struct peer_candidate, n );
3835
3836    /* populate the candidate array */
3837    tor = NULL;
3838    while(( tor = tr_torrentNext( session, tor )))
3839    {
3840        int i, nAtoms;
3841        struct peer_atom ** atoms;
3842
3843        if( !tor->torrentPeers->isRunning )
3844            continue;
3845
3846        /* if we've already got enough peers in this torrent... */
3847        if( tr_torrentGetPeerLimit( tor ) <= tr_ptrArraySize( &tor->torrentPeers->peers ) )
3848            continue;
3849
3850        /* if we've already got enough speed in this torrent... */
3851        if( tr_torrentIsSeed( tor ) && isBandwidthMaxedOut( &tor->bandwidth, now_msec, TR_UP ) )
3852            continue;
3853
3854        atoms = (struct peer_atom**) tr_ptrArrayPeek( &tor->torrentPeers->pool, &nAtoms );
3855        for( i=0; i<nAtoms; ++i )
3856        {
3857            struct peer_atom * atom = atoms[i];
3858
3859            if( isPeerCandidate( tor, atom, now ) )
3860            {
3861                const uint8_t salt = tr_cryptoWeakRandInt( 1024 );
3862                walk->tor = tor;
3863                walk->atom = atom;
3864                walk->score = getPeerCandidateScore( tor, atom, salt );
3865                ++walk;
3866            }
3867        }
3868    }
3869
3870    *candidateCount = walk - candidates;
3871    if( *candidateCount > 1 )
3872        qsort( candidates, *candidateCount, sizeof( struct peer_candidate ), comparePeerCandidates );
3873    return candidates;
3874}
3875
3876static void
3877initiateConnection( tr_peerMgr * mgr, Torrent * t, struct peer_atom * atom )
3878{
3879    tr_peerIo * io;
3880    const time_t now = tr_time( );
3881    bool utp = tr_sessionIsUTPEnabled(mgr->session) && !atom->utp_failed;
3882
3883    if( atom->fromFirst == TR_PEER_FROM_PEX )
3884        /* PEX has explicit signalling for uTP support.  If an atom
3885           originally came from PEX and doesn't have the uTP flag, skip the
3886           uTP connection attempt.  Are we being optimistic here? */
3887        utp = utp && (atom->flags & ADDED_F_UTP_FLAGS);
3888
3889    tordbg( t, "Starting an OUTGOING%s connection with %s",
3890            utp ? " µTP" : "",
3891            tr_atomAddrStr( atom ) );
3892
3893    io = tr_peerIoNewOutgoing( mgr->session,
3894                               &mgr->session->bandwidth,
3895                               &atom->addr,
3896                               atom->port,
3897                               t->tor->info.hash,
3898                               t->tor->completeness == TR_SEED,
3899                               utp );
3900
3901    if( io == NULL )
3902    {
3903        tordbg( t, "peerIo not created; marking peer %s as unreachable",
3904                tr_atomAddrStr( atom ) );
3905        atom->flags2 |= MYFLAG_UNREACHABLE;
3906        atom->numFails++;
3907    }
3908    else
3909    {
3910        tr_handshake * handshake = tr_handshakeNew( io,
3911                                                    mgr->session->encryptionMode,
3912                                                    myHandshakeDoneCB,
3913                                                    mgr );
3914
3915        assert( tr_peerIoGetTorrentHash( io ) );
3916
3917        tr_peerIoUnref( io ); /* balanced by the initial ref
3918                                 in tr_peerIoNewOutgoing() */
3919
3920        tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
3921                                 handshakeCompare );
3922    }
3923
3924    atom->lastConnectionAttemptAt = now;
3925    atom->time = now;
3926}
3927
3928static void
3929initiateCandidateConnection( tr_peerMgr * mgr, struct peer_candidate * c )
3930{
3931#if 0
3932    fprintf( stderr, "Starting an OUTGOING connection with %s - [%s] seedProbability==%d; %s, %s\n",
3933             tr_atomAddrStr( c->atom ),
3934             tr_torrentName( c->tor ),
3935             (int)c->atom->seedProbability,
3936             tr_torrentIsPrivate( c->tor ) ? "private" : "public",
3937             tr_torrentIsSeed( c->tor ) ? "seed" : "downloader" );
3938#endif
3939
3940    initiateConnection( mgr, c->tor->torrentPeers, c->atom );
3941}
3942
3943static void
3944makeNewPeerConnections( struct tr_peerMgr * mgr, const int max )
3945{
3946    int i, n;
3947    struct peer_candidate * candidates;
3948
3949    candidates = getPeerCandidates( mgr->session, &n );
3950
3951    for( i=0; i<n && i<max; ++i )
3952        initiateCandidateConnection( mgr, &candidates[i] );
3953
3954    tr_free( candidates );
3955}
Note: See TracBrowser for help on using the repository browser.