source: trunk/libtransmission/peer-mgr.c @ 12531

Last change on this file since 12531 was 12531, checked in by jordan, 10 years ago

(trunk libT) #4336 "availablility nonsense" -- fix bug in tr_cpMissingBytesInPiece() introduced last week by r12515 for #4332. Add assertions to the nightly build to watch for regressions of this fix.

The bug was that I fixed #4332's off-by-one improperly in tr_cpMissingBlocksInPiece(). The piece's last block has to be calculated separately because its byte size may be different than the other blocks. The mistake in r12515 was that the last block could wind up being counted twice.

  • Property svn:keywords set to Date Rev Author Id
File size: 114.7 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 12531 2011-07-02 13:20:17Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h> /* error codes ERANGE, ... */
15#include <limits.h> /* INT_MAX */
16#include <string.h> /* memcpy, memcmp, strstr */
17#include <stdlib.h> /* qsort */
18
19#include <event2/event.h>
20#include <libutp/utp.h>
21
22#include "transmission.h"
23#include "announcer.h"
24#include "bandwidth.h"
25#include "blocklist.h"
26#include "cache.h"
27#include "clients.h"
28#include "completion.h"
29#include "crypto.h"
30#include "handshake.h"
31#include "net.h"
32#include "peer-io.h"
33#include "peer-mgr.h"
34#include "peer-msgs.h"
35#include "ptrarray.h"
36#include "session.h"
37#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
38#include "torrent.h"
39#include "utils.h"
40#include "webseed.h"
41
/* Timing intervals, limits and flag bits used by the peer manager. */
enum
{
    /* interval between sweeps that cull old atoms */
    ATOM_PERIOD_MSEC = 60 * 1000,

    /* interval between decisions about which peers are choked */
    RECHOKE_PERIOD_MSEC = 10 * 1000,

    /* an optimistically unchoked peer is exempt from rechoking
       for this many calls to rechokeUploads() */
    OPTIMISTIC_UNCHOKE_MULTIPLIER = 4,

    /* interval between bandwidth reallocations */
    BANDWIDTH_PERIOD_MSEC = 500,

    /* interval between passes that age out old piece request lists */
    REFILL_UPKEEP_PERIOD_MSEC = 10 * 1000,

    /* interval between decisions about which peers live and die */
    RECONNECT_PERIOD_MSEC = 500,

    /* with many peers available, idle ones are kept this long */
    MIN_UPLOAD_IDLE_SECS = 60,

    /* with few peers available, idle ones are kept this long */
    MAX_UPLOAD_IDLE_SECS = 60 * 5,

    /* overall cap on the number of peers to ask for per second;
     * the throttle exists to avoid overloading the router */
    MAX_CONNECTIONS_PER_SECOND = 12,

    /* the per-second cap scaled down to a single reconnect pulse */
    MAX_CONNECTIONS_PER_PULSE = (int)(MAX_CONNECTIONS_PER_SECOND * (RECONNECT_PERIOD_MSEC/1000.0)),

    /* a peer may send this many bad pieces before it gets banned */
    MAX_BAD_PIECES_PER_PEER = 5,

    /* a list of request pieces older than this is considered
       stale and has to be rebuilt */
    PIECE_LIST_SHELF_LIFE_SECS = 60,

    /* bit for peer_atom.flags2: the peer is banned */
    MYFLAG_BANNED = 1,

    /* bit for peer_atom.flags2: unreachable for now, but not banned;
     * it's fine if the peer connects to us */
    MYFLAG_UNREACHABLE = 2,

    /* shortest wait before trying to reconnect to a peer */
    MINIMUM_RECONNECT_INTERVAL_SECS = 5,

    /* how long a request we've made may linger before we cancel it */
    REQUEST_TTL_SECS = 120,

    NO_BLOCKS_CANCEL_HISTORY = 120,

    CANCEL_HISTORY_SEC = 60
};
100
101const tr_peer_event TR_PEER_EVENT_INIT = { 0, 0, NULL, 0, 0, 0, false, 0 };
102
103/**
104***
105**/
106
107/**
108 * Peer information that should be kept even before we've connected and
109 * after we've disconnected. These are kept in a pool of peer_atoms to decide
110 * which ones would make good candidates for connecting to, and to watch out
111 * for banned peers.
112 *
113 * @see tr_peer
114 * @see tr_peermsgs
115 */
116struct peer_atom
117{
118    uint8_t     fromFirst;          /* where the peer was first found */
119    uint8_t     fromBest;           /* the "best" value of where the peer has been found */
120    uint8_t     flags;              /* these match the added_f flags */
121    uint8_t     flags2;             /* flags that aren't defined in added_f */
122    int8_t      seedProbability;    /* how likely is this to be a seed... [0..100] or -1 for unknown */
123    int8_t      blocklisted;        /* -1 for unknown, true for blocklisted, false for not blocklisted */
124
125    tr_port     port;
126    bool        utp_failed;         /* We recently failed to connect over uTP */
127    uint16_t    numFails;
128    time_t      time;               /* when the peer's connection status last changed */
129    time_t      piece_data_time;
130
131    time_t      lastConnectionAttemptAt;
132    time_t      lastConnectionAt;
133
134    /* similar to a TTL field, but less rigid --
135     * if the swarm is small, the atom will be kept past this date. */
136    time_t      shelf_date;
137    tr_peer   * peer;               /* will be NULL if not connected */
138    tr_address  addr;
139};
140
141#ifdef NDEBUG
142#define tr_isAtom(a) (TRUE)
143#else
144static bool
145tr_isAtom( const struct peer_atom * atom )
146{
147    return ( atom != NULL )
148        && ( atom->fromFirst < TR_PEER_FROM__MAX )
149        && ( atom->fromBest < TR_PEER_FROM__MAX )
150        && ( tr_address_is_valid( &atom->addr ) );
151}
152#endif
153
154static const char*
155tr_atomAddrStr( const struct peer_atom * atom )
156{
157    return atom ? tr_peerIoAddrStr( &atom->addr, atom->port ) : "[no atom]";
158}
159
160struct block_request
161{
162    tr_block_index_t block;
163    tr_peer * peer;
164    time_t sentAt;
165};
166
167struct weighted_piece
168{
169    tr_piece_index_t index;
170    int16_t salt;
171    int16_t requestCount;
172};
173
/* Records the order in which Torrent::pieces is currently arranged. */
enum piece_sort_state
{
    PIECES_UNSORTED,          /* no particular order is guaranteed */
    PIECES_SORTED_BY_INDEX,   /* ordered with comparePieceByIndex() */
    PIECES_SORTED_BY_WEIGHT   /* ordered with comparePieceByWeight() */
};
180
181/** @brief Opaque, per-torrent data structure for peer connection information */
182typedef struct tr_torrent_peers
183{
184    tr_ptrArray                outgoingHandshakes; /* tr_handshake */
185    tr_ptrArray                pool; /* struct peer_atom */
186    tr_ptrArray                peers; /* tr_peer */
187    tr_ptrArray                webseeds; /* tr_webseed */
188
189    tr_torrent               * tor;
190    struct tr_peerMgr        * manager;
191
192    tr_peer                  * optimistic; /* the optimistic peer, or NULL if none */
193    int                        optimisticUnchokeTimeScaler;
194
195    bool                       isRunning;
196    bool                       needsCompletenessCheck;
197
198    struct block_request     * requests;
199    int                        requestCount;
200    int                        requestAlloc;
201
202    struct weighted_piece    * pieces;
203    int                        pieceCount;
204    enum piece_sort_state      pieceSortState;
205
206    /* An array of pieceCount items stating how many peers have each piece.
207       This is used to help us for downloading pieces "rarest first."
208       This may be NULL if we don't have metainfo yet, or if we're not
209       downloading and don't care about rarity */
210    uint16_t                 * pieceReplication;
211    size_t                     pieceReplicationSize;
212
213    int                        interestedCount;
214    int                        maxPeers;
215    time_t                     lastCancel;
216
217    /* Before the endgame this should be 0. In endgame, is contains the average
218     * number of pending requests per peer. Only peers which have more pending
219     * requests are considered 'fast' are allowed to request a block that's
220     * already been requested from another (slower?) peer. */
221    int                        endgame;
222}
223Torrent;
224
225struct tr_peerMgr
226{
227    tr_session    * session;
228    tr_ptrArray     incomingHandshakes; /* tr_handshake */
229    struct event  * bandwidthTimer;
230    struct event  * rechokeTimer;
231    struct event  * refillUpkeepTimer;
232    struct event  * atomTimer;
233};
234
/* Deep-log a printf-style message tagged with the name of the torrent
 * owned by `t' (a Torrent*). Nothing but the tr_deepLoggingIsActive()
 * test runs when deep logging is off. The macro argument is parenthesized
 * so that any pointer-valued expression may be passed for `t'. */
#define tordbg( t, ... ) \
    do { \
        if( tr_deepLoggingIsActive( ) ) \
            tr_deepLog( __FILE__, __LINE__, tr_torrentName( (t)->tor ), __VA_ARGS__ ); \
    } while( 0 )
240
/* Deep-log a printf-style message that isn't tied to a torrent
 * (NULL is passed as the name). Does nothing beyond the
 * tr_deepLoggingIsActive() test when deep logging is off. */
#define dbgmsg(...) \
    do \
    { \
        if (tr_deepLoggingIsActive()) \
        { \
            tr_deepLog(__FILE__, __LINE__, NULL, __VA_ARGS__); \
        } \
    } \
    while (0)
246
247/**
248***
249**/
250
251static inline void
252managerLock( const struct tr_peerMgr * manager )
253{
254    tr_sessionLock( manager->session );
255}
256
257static inline void
258managerUnlock( const struct tr_peerMgr * manager )
259{
260    tr_sessionUnlock( manager->session );
261}
262
263static inline void
264torrentLock( Torrent * torrent )
265{
266    managerLock( torrent->manager );
267}
268
269static inline void
270torrentUnlock( Torrent * torrent )
271{
272    managerUnlock( torrent->manager );
273}
274
275static inline int
276torrentIsLocked( const Torrent * t )
277{
278    return tr_sessionIsLocked( t->manager->session );
279}
280
281/**
282***
283**/
284
285static int
286handshakeCompareToAddr( const void * va, const void * vb )
287{
288    const tr_handshake * a = va;
289
290    return tr_address_compare( tr_handshakeGetAddr( a, NULL ), vb );
291}
292
293static int
294handshakeCompare( const void * a, const void * b )
295{
296    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
297}
298
299static inline tr_handshake*
300getExistingHandshake( tr_ptrArray * handshakes, const tr_address * addr )
301{
302    if( tr_ptrArrayEmpty( handshakes ) )
303        return NULL;
304
305    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
306}
307
308static int
309comparePeerAtomToAddress( const void * va, const void * vb )
310{
311    const struct peer_atom * a = va;
312
313    return tr_address_compare( &a->addr, vb );
314}
315
316static int
317compareAtomsByAddress( const void * va, const void * vb )
318{
319    const struct peer_atom * b = vb;
320
321    assert( tr_isAtom( b ) );
322
323    return comparePeerAtomToAddress( va, &b->addr );
324}
325
326/**
327***
328**/
329
330const tr_address *
331tr_peerAddress( const tr_peer * peer )
332{
333    return &peer->atom->addr;
334}
335
336static Torrent*
337getExistingTorrent( tr_peerMgr *    manager,
338                    const uint8_t * hash )
339{
340    tr_torrent * tor = tr_torrentFindFromHash( manager->session, hash );
341
342    return tor == NULL ? NULL : tor->torrentPeers;
343}
344
345static int
346peerCompare( const void * a, const void * b )
347{
348    return tr_address_compare( tr_peerAddress( a ), tr_peerAddress( b ) );
349}
350
351static struct peer_atom*
352getExistingAtom( const Torrent    * t,
353                 const tr_address * addr )
354{
355    Torrent * tt = (Torrent*)t;
356    assert( torrentIsLocked( t ) );
357    return tr_ptrArrayFindSorted( &tt->pool, addr, comparePeerAtomToAddress );
358}
359
360static bool
361peerIsInUse( const Torrent * ct, const struct peer_atom * atom )
362{
363    Torrent * t = (Torrent*) ct;
364
365    assert( torrentIsLocked ( t ) );
366
367    return ( atom->peer != NULL )
368        || getExistingHandshake( &t->outgoingHandshakes, &atom->addr )
369        || getExistingHandshake( &t->manager->incomingHandshakes, &atom->addr );
370}
371
372void
373tr_peerConstruct( tr_peer * peer )
374{
375    memset( peer, 0, sizeof( tr_peer ) );
376
377    peer->have = TR_BITFIELD_INIT;
378}
379
380static tr_peer*
381peerNew( struct peer_atom * atom )
382{
383    tr_peer * peer = tr_new( tr_peer, 1 );
384    tr_peerConstruct( peer );
385
386    peer->atom = atom;
387    atom->peer = peer;
388
389    return peer;
390}
391
392static tr_peer*
393getPeer( Torrent * torrent, struct peer_atom * atom )
394{
395    tr_peer * peer;
396
397    assert( torrentIsLocked( torrent ) );
398
399    peer = atom->peer;
400
401    if( peer == NULL )
402    {
403        peer = peerNew( atom );
404        tr_bitfieldConstruct( &peer->have, torrent->tor->blockCount );
405        tr_bitfieldConstruct( &peer->blame, torrent->tor->blockCount );
406        tr_ptrArrayInsertSorted( &torrent->peers, peer, peerCompare );
407    }
408
409    return peer;
410}
411
412static void peerDeclinedAllRequests( Torrent *, const tr_peer * );
413
414void
415tr_peerDestruct( tr_torrent * tor, tr_peer * peer )
416{
417    assert( peer != NULL );
418
419    peerDeclinedAllRequests( tor->torrentPeers, peer );
420
421    if( peer->msgs != NULL )
422        tr_peerMsgsFree( peer->msgs );
423
424    if( peer->io ) {
425        tr_peerIoClear( peer->io );
426        tr_peerIoUnref( peer->io ); /* balanced by the ref in handshakeDoneCB() */
427    }
428
429    tr_bitfieldDestruct( &peer->have );
430    tr_bitfieldDestruct( &peer->blame );
431    tr_free( peer->client );
432
433    if( peer->atom )
434        peer->atom->peer = NULL;
435}
436
437static void
438peerDelete( Torrent * t, tr_peer * peer )
439{
440    tr_peerDestruct( t->tor, peer );
441    tr_free( peer );
442}
443
444static bool
445replicationExists( const Torrent * t )
446{
447    return t->pieceReplication != NULL;
448}
449
450static void
451replicationFree( Torrent * t )
452{
453    tr_free( t->pieceReplication );
454    t->pieceReplication = NULL;
455    t->pieceReplicationSize = 0;
456}
457
458static void
459replicationNew( Torrent * t )
460{
461    tr_piece_index_t piece_i;
462    const tr_piece_index_t piece_count = t->tor->info.pieceCount;
463    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
464    const int peer_count = tr_ptrArraySize( &t->peers );
465
466    assert( !replicationExists( t ) );
467
468    t->pieceReplicationSize = piece_count;
469    t->pieceReplication = tr_new0( uint16_t, piece_count );
470
471    for( piece_i=0; piece_i<piece_count; ++piece_i )
472    {
473        int peer_i;
474        uint16_t r = 0;
475
476        for( peer_i=0; peer_i<peer_count; ++peer_i )
477            if( tr_bitfieldHas( &peers[peer_i]->have, piece_i ) )
478                ++r;
479
480        t->pieceReplication[piece_i] = r;
481    }
482}
483
484static void
485torrentFree( void * vt )
486{
487    Torrent * t = vt;
488
489    assert( t );
490    assert( !t->isRunning );
491    assert( torrentIsLocked( t ) );
492    assert( tr_ptrArrayEmpty( &t->outgoingHandshakes ) );
493    assert( tr_ptrArrayEmpty( &t->peers ) );
494
495    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
496    tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
497    tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
498    tr_ptrArrayDestruct( &t->peers, NULL );
499
500    replicationFree( t );
501
502    tr_free( t->requests );
503    tr_free( t->pieces );
504    tr_free( t );
505}
506
507static void peerCallbackFunc( tr_peer *, const tr_peer_event *, void * );
508
509static Torrent*
510torrentNew( tr_peerMgr * manager, tr_torrent * tor )
511{
512    int       i;
513    Torrent * t;
514
515    t = tr_new0( Torrent, 1 );
516    t->manager = manager;
517    t->tor = tor;
518    t->pool = TR_PTR_ARRAY_INIT;
519    t->peers = TR_PTR_ARRAY_INIT;
520    t->webseeds = TR_PTR_ARRAY_INIT;
521    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
522
523    for( i = 0; i < tor->info.webseedCount; ++i )
524    {
525        tr_webseed * w =
526            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
527        tr_ptrArrayAppend( &t->webseeds, w );
528    }
529
530    return t;
531}
532
533tr_peerMgr*
534tr_peerMgrNew( tr_session * session )
535{
536    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
537    m->session = session;
538    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
539    return m;
540}
541
/* Free the event that `*t' points to and set `*t' to NULL.
 * A timer that is already NULL is left alone. */
static void
deleteTimer(struct event **t)
{
    if (*t == NULL)
        return;

    event_free(*t);
    *t = NULL;
}
551
552static void
553deleteTimers( struct tr_peerMgr * m )
554{
555    deleteTimer( &m->atomTimer );
556    deleteTimer( &m->bandwidthTimer );
557    deleteTimer( &m->rechokeTimer );
558    deleteTimer( &m->refillUpkeepTimer );
559}
560
561void
562tr_peerMgrFree( tr_peerMgr * manager )
563{
564    managerLock( manager );
565
566    deleteTimers( manager );
567
568    /* free the handshakes. Abort invokes handshakeDoneCB(), which removes
569     * the item from manager->handshakes, so this is a little roundabout... */
570    while( !tr_ptrArrayEmpty( &manager->incomingHandshakes ) )
571        tr_handshakeAbort( tr_ptrArrayNth( &manager->incomingHandshakes, 0 ) );
572
573    tr_ptrArrayDestruct( &manager->incomingHandshakes, NULL );
574
575    managerUnlock( manager );
576    tr_free( manager );
577}
578
579static int
580clientIsDownloadingFrom( const tr_torrent * tor, const tr_peer * peer )
581{
582    if( !tr_torrentHasMetadata( tor ) )
583        return true;
584
585    return peer->clientIsInterested && !peer->clientIsChoked;
586}
587
588static int
589clientIsUploadingTo( const tr_peer * peer )
590{
591    return peer->peerIsInterested && !peer->peerIsChoked;
592}
593
594/***
595****
596***/
597
598void
599tr_peerMgrOnBlocklistChanged( tr_peerMgr * mgr )
600{
601    tr_torrent * tor = NULL;
602    tr_session * session = mgr->session;
603
604    /* we cache whether or not a peer is blocklisted...
605       since the blocklist has changed, erase that cached value */
606    while(( tor = tr_torrentNext( session, tor )))
607    {
608        int i;
609        Torrent * t = tor->torrentPeers;
610        const int n = tr_ptrArraySize( &t->pool );
611        for( i=0; i<n; ++i ) {
612            struct peer_atom * atom = tr_ptrArrayNth( &t->pool, i );
613            atom->blocklisted = -1;
614        }
615    }
616}
617
618static bool
619isAtomBlocklisted( tr_session * session, struct peer_atom * atom )
620{
621    if( atom->blocklisted < 0 )
622        atom->blocklisted = tr_sessionIsAddressBlocked( session, &atom->addr );
623
624    assert( tr_isBool( atom->blocklisted ) );
625    return atom->blocklisted;
626}
627
628
629/***
630****
631***/
632
633static void
634atomSetSeedProbability( struct peer_atom * atom, int seedProbability )
635{
636    assert( atom != NULL );
637    assert( -1<=seedProbability && seedProbability<=100 );
638
639    atom->seedProbability = seedProbability;
640
641    if( seedProbability == 100 )
642        atom->flags |= ADDED_F_SEED_FLAG;
643    else if( seedProbability != -1 )
644        atom->flags &= ~ADDED_F_SEED_FLAG;
645}
646
647static inline bool
648atomIsSeed( const struct peer_atom * atom )
649{
650    return atom->seedProbability == 100;
651}
652
653static void
654atomSetSeed( const Torrent * t, struct peer_atom * atom )
655{
656    if( !atomIsSeed( atom ) )
657    {
658        tordbg( t, "marking peer %s as a seed", tr_atomAddrStr( atom ) );
659
660        atomSetSeedProbability( atom, 100 );
661    }
662}
663
664
665bool
666tr_peerMgrPeerIsSeed( const tr_torrent  * tor,
667                      const tr_address  * addr )
668{
669    bool isSeed = false;
670    const Torrent * t = tor->torrentPeers;
671    const struct peer_atom * atom = getExistingAtom( t, addr );
672
673    if( atom )
674        isSeed = atomIsSeed( atom );
675
676    return isSeed;
677}
678
679void
680tr_peerMgrSetUtpSupported( tr_torrent * tor, const tr_address * addr )
681{
682    struct peer_atom * atom = getExistingAtom( tor->torrentPeers, addr );
683
684    if( atom )
685        atom->flags |= ADDED_F_UTP_FLAGS;
686}
687
688void
689tr_peerMgrSetUtpFailed( tr_torrent *tor, const tr_address *addr, bool failed )
690{
691    struct peer_atom * atom = getExistingAtom( tor->torrentPeers, addr );
692
693    if( atom )
694        atom->utp_failed = failed;
695}
696
697
698/**
699***  REQUESTS
700***
701*** There are two data structures associated with managing block requests:
702***
703*** 1. Torrent::requests, an array of "struct block_request" which keeps
704***    track of which blocks have been requested, and when, and by which peers.
705***    This list is used for (a) cancelling requests that have been pending
706***    for too long and (b) avoiding duplicate requests before endgame.
707***
708*** 2. Torrent::pieces, an array of "struct weighted_piece" which lists the
709***    pieces that we want to request. It's used to decide which blocks to
710***    return next when tr_peerMgrGetBlockRequests() is called.
711**/
712
713/**
714*** struct block_request
715**/
716
717static int
718compareReqByBlock( const void * va, const void * vb )
719{
720    const struct block_request * a = va;
721    const struct block_request * b = vb;
722
723    /* primary key: block */
724    if( a->block < b->block ) return -1;
725    if( a->block > b->block ) return 1;
726
727    /* secondary key: peer */
728    if( a->peer < b->peer ) return -1;
729    if( a->peer > b->peer ) return 1;
730
731    return 0;
732}
733
734static void
735requestListAdd( Torrent * t, tr_block_index_t block, tr_peer * peer )
736{
737    struct block_request key;
738
739    /* ensure enough room is available... */
740    if( t->requestCount + 1 >= t->requestAlloc )
741    {
742        const int CHUNK_SIZE = 128;
743        t->requestAlloc += CHUNK_SIZE;
744        t->requests = tr_renew( struct block_request,
745                                t->requests, t->requestAlloc );
746    }
747
748    /* populate the record we're inserting */
749    key.block = block;
750    key.peer = peer;
751    key.sentAt = tr_time( );
752
753    /* insert the request to our array... */
754    {
755        bool exact;
756        const int pos = tr_lowerBound( &key, t->requests, t->requestCount,
757                                       sizeof( struct block_request ),
758                                       compareReqByBlock, &exact );
759        assert( !exact );
760        memmove( t->requests + pos + 1,
761                 t->requests + pos,
762                 sizeof( struct block_request ) * ( t->requestCount++ - pos ) );
763        t->requests[pos] = key;
764    }
765
766    if( peer != NULL )
767    {
768        ++peer->pendingReqsToPeer;
769        assert( peer->pendingReqsToPeer >= 0 );
770    }
771
772    /*fprintf( stderr, "added request of block %lu from peer %s... "
773                       "there are now %d block\n",
774                       (unsigned long)block, tr_atomAddrStr( peer->atom ), t->requestCount );*/
775}
776
777static struct block_request *
778requestListLookup( Torrent * t, tr_block_index_t block, const tr_peer * peer )
779{
780    struct block_request key;
781    key.block = block;
782    key.peer = (tr_peer*) peer;
783
784    return bsearch( &key, t->requests, t->requestCount,
785                    sizeof( struct block_request ),
786                    compareReqByBlock );
787}
788
789/**
790 * Find the peers are we currently requesting the block
791 * with index @a block from and append them to @a peerArr.
792 */
793static void
794getBlockRequestPeers( Torrent * t, tr_block_index_t block,
795                      tr_ptrArray * peerArr )
796{
797    bool exact;
798    int i, pos;
799    struct block_request key;
800
801    key.block = block;
802    key.peer = NULL;
803    pos = tr_lowerBound( &key, t->requests, t->requestCount,
804                         sizeof( struct block_request ),
805                         compareReqByBlock, &exact );
806
807    assert( !exact ); /* shouldn't have a request with .peer == NULL */
808
809    for( i = pos; i < t->requestCount; ++i )
810    {
811        if( t->requests[i].block != block )
812            break;
813        tr_ptrArrayAppend( peerArr, t->requests[i].peer );
814    }
815}
816
817static void
818decrementPendingReqCount( const struct block_request * b )
819{
820    if( b->peer != NULL )
821        if( b->peer->pendingReqsToPeer > 0 )
822            --b->peer->pendingReqsToPeer;
823}
824
825static void
826requestListRemove( Torrent * t, tr_block_index_t block, const tr_peer * peer )
827{
828    const struct block_request * b = requestListLookup( t, block, peer );
829    if( b != NULL )
830    {
831        const int pos = b - t->requests;
832        assert( pos < t->requestCount );
833
834        decrementPendingReqCount( b );
835
836        tr_removeElementFromArray( t->requests,
837                                   pos,
838                                   sizeof( struct block_request ),
839                                   t->requestCount-- );
840
841        /*fprintf( stderr, "removing request of block %lu from peer %s... "
842                           "there are now %d block requests left\n",
843                           (unsigned long)block, tr_atomAddrStr( peer->atom ), t->requestCount );*/
844    }
845}
846
847static int
848countActiveWebseeds( const Torrent * t )
849{
850    int activeCount = 0;
851    const tr_webseed ** w = (const tr_webseed **) tr_ptrArrayBase( &t->webseeds );
852    const tr_webseed ** const wend = w + tr_ptrArraySize( &t->webseeds );
853
854    for( ; w!=wend; ++w )
855        if( tr_webseedIsActive( *w ) )
856            ++activeCount;
857
858    return activeCount;
859}
860
861static bool
862testForEndgame( const Torrent * t )
863{
864    /* we consider ourselves to be in endgame if the number of bytes
865       we've got requested is >= the number of bytes left to download */
866    return ( t->requestCount * t->tor->blockSize )
867               >= tr_cpLeftUntilDone( &t->tor->completion );
868}
869
870static void
871updateEndgame( Torrent * t )
872{
873    assert( t->requestCount >= 0 );
874
875    if( !testForEndgame( t ) )
876    {
877        /* not in endgame */
878        t->endgame = 0;
879    }
880    else if( !t->endgame ) /* only recalculate when endgame first begins */
881    {
882        int numDownloading = 0;
883        const tr_peer ** p = (const tr_peer **) tr_ptrArrayBase( &t->peers );
884        const tr_peer ** const pend = p + tr_ptrArraySize( &t->peers );
885
886        /* add the active bittorrent peers... */
887        for( ; p!=pend; ++p )
888            if( (*p)->pendingReqsToPeer > 0 )
889                ++numDownloading;
890
891        /* add the active webseeds... */
892        numDownloading += countActiveWebseeds( t );
893
894        /* average number of pending requests per downloading peer */
895        t->endgame = t->requestCount / MAX( numDownloading, 1 );
896    }
897}
898
899
900/****
901*****
902*****  Piece List Manipulation / Accessors
903*****
904****/
905
906static inline void
907invalidatePieceSorting( Torrent * t )
908{
909    t->pieceSortState = PIECES_UNSORTED;
910}
911
912static const tr_torrent * weightTorrent;
913
914static const uint16_t * weightReplication;
915
916static void
917setComparePieceByWeightTorrent( Torrent * t )
918{
919    if( !replicationExists( t ) )
920        replicationNew( t );
921
922    weightTorrent = t->tor;
923    weightReplication = t->pieceReplication;
924}
925
926/* we try to create a "weight" s.t. high-priority pieces come before others,
927 * and that partially-complete pieces come before empty ones. */
928static int
929comparePieceByWeight( const void * va, const void * vb )
930{
931    const struct weighted_piece * a = va;
932    const struct weighted_piece * b = vb;
933    int ia, ib, missing, pending;
934    const tr_torrent * tor = weightTorrent;
935    const uint16_t * rep = weightReplication;
936
937    /* primary key: weight */
938    missing = tr_cpMissingBlocksInPiece( &tor->completion, a->index );
939    pending = a->requestCount;
940    ia = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
941    missing = tr_cpMissingBlocksInPiece( &tor->completion, b->index );
942    pending = b->requestCount;
943    ib = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
944    if( ia < ib ) return -1;
945    if( ia > ib ) return 1;
946
947    /* secondary key: higher priorities go first */
948    ia = tor->info.pieces[a->index].priority;
949    ib = tor->info.pieces[b->index].priority;
950    if( ia > ib ) return -1;
951    if( ia < ib ) return 1;
952
953    /* tertiary key: rarest first. */
954    ia = rep[a->index];
955    ib = rep[b->index];
956    if( ia < ib ) return -1;
957    if( ia > ib ) return 1;
958
959    /* quaternary key: random */
960    if( a->salt < b->salt ) return -1;
961    if( a->salt > b->salt ) return 1;
962
963    /* okay, they're equal */
964    return 0;
965}
966
967static int
968comparePieceByIndex( const void * va, const void * vb )
969{
970    const struct weighted_piece * a = va;
971    const struct weighted_piece * b = vb;
972    if( a->index < b->index ) return -1;
973    if( a->index > b->index ) return 1;
974    return 0;
975}
976
977static void
978pieceListSort( Torrent * t, enum piece_sort_state state )
979{
980    assert( state==PIECES_SORTED_BY_INDEX
981         || state==PIECES_SORTED_BY_WEIGHT );
982
983
984    if( state == PIECES_SORTED_BY_WEIGHT )
985    {
986        setComparePieceByWeightTorrent( t );
987        qsort( t->pieces, t->pieceCount, sizeof( struct weighted_piece ), comparePieceByWeight );
988    }
989    else
990        qsort( t->pieces, t->pieceCount, sizeof( struct weighted_piece ), comparePieceByIndex );
991
992    t->pieceSortState = state;
993}
994
/**
 * These checks are useful for testing, but too expensive for nightly builds.
 * They're left disabled: both names expand to nothing. To compile the
 * real checks back in, change the `#if 1' below to `#if 0'.
 */
#if 1
#define assertWeightedPiecesAreSorted(t)
#define assertReplicationCountIsExact(t)
#else
/* Outside of the endgame, assert that every adjacent pair in the
 * piece list is in comparePieceByWeight() order. */
static void
assertWeightedPiecesAreSorted( Torrent * t )
{
    if( !t->endgame )
    {
        int i;
        setComparePieceByWeightTorrent( t );
        for( i=0; i<t->pieceCount-1; ++i )
            assert( comparePieceByWeight( &t->pieces[i], &t->pieces[i+1] ) <= 0 );
    }
}

/* Recount, for every piece, how many peers have it, and assert that
 * the replication table holds exactly that count. */
static void
assertReplicationCountIsExact( Torrent * t )
{
    /* This assert might fail due to errors in the implementations of other
     * clients. It happens when receiving duplicate bitfields/HaveAll/HaveNone
     * from a client. If such behavior is noticed,
     * a bug report should be filed against the faulty client. */

    size_t piece_i;
    const uint16_t * rep = t->pieceReplication;
    const size_t piece_count = t->pieceReplicationSize;
    const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
    const int peer_count = tr_ptrArraySize( &t->peers );

    assert( piece_count == t->tor->info.pieceCount );

    for( piece_i=0; piece_i<piece_count; ++piece_i )
    {
        int peer_i;
        uint16_t r = 0;

        /* `have' is a bitfield, so it's queried with tr_bitfieldHas(),
         * the same way replicationNew() counts */
        for( peer_i=0; peer_i<peer_count; ++peer_i )
            if( tr_bitfieldHas( &peers[peer_i]->have, piece_i ) )
                ++r;

        assert( rep[piece_i] == r );
    }
}
#endif
1043
1044static struct weighted_piece *
1045pieceListLookup( Torrent * t, tr_piece_index_t index )
1046{
1047    int i;
1048
1049    for( i=0; i<t->pieceCount; ++i )
1050        if( t->pieces[i].index == index )
1051            return &t->pieces[i];
1052
1053    return NULL;
1054}
1055
1056static void
1057pieceListRebuild( Torrent * t )
1058{
1059
1060    if( !tr_torrentIsSeed( t->tor ) )
1061    {
1062        tr_piece_index_t i;
1063        tr_piece_index_t * pool;
1064        tr_piece_index_t poolCount = 0;
1065        const tr_torrent * tor = t->tor;
1066        const tr_info * inf = tr_torrentInfo( tor );
1067        struct weighted_piece * pieces;
1068        int pieceCount;
1069
1070        /* build the new list */
1071        pool = tr_new( tr_piece_index_t, inf->pieceCount );
1072        for( i=0; i<inf->pieceCount; ++i )
1073            if( !inf->pieces[i].dnd )
1074                if( !tr_cpPieceIsComplete( &tor->completion, i ) )
1075                    pool[poolCount++] = i;
1076        pieceCount = poolCount;
1077        pieces = tr_new0( struct weighted_piece, pieceCount );
1078        for( i=0; i<poolCount; ++i ) {
1079            struct weighted_piece * piece = pieces + i;
1080            piece->index = pool[i];
1081            piece->requestCount = 0;
1082            piece->salt = tr_cryptoWeakRandInt( 4096 );
1083        }
1084
1085        /* if we already had a list of pieces, merge it into
1086         * the new list so we don't lose its requestCounts */
1087        if( t->pieces != NULL )
1088        {
1089            struct weighted_piece * o = t->pieces;
1090            struct weighted_piece * oend = o + t->pieceCount;
1091            struct weighted_piece * n = pieces;
1092            struct weighted_piece * nend = n + pieceCount;
1093
1094            pieceListSort( t, PIECES_SORTED_BY_INDEX );
1095
1096            while( o!=oend && n!=nend ) {
1097                if( o->index < n->index )
1098                    ++o;
1099                else if( o->index > n->index )
1100                    ++n;
1101                else
1102                    *n++ = *o++;
1103            }
1104
1105            tr_free( t->pieces );
1106        }
1107
1108        t->pieces = pieces;
1109        t->pieceCount = pieceCount;
1110
1111        pieceListSort( t, PIECES_SORTED_BY_WEIGHT );
1112
1113        /* cleanup */
1114        tr_free( pool );
1115    }
1116}
1117
1118static void
1119pieceListRemovePiece( Torrent * t, tr_piece_index_t piece )
1120{
1121    struct weighted_piece * p;
1122
1123    if(( p = pieceListLookup( t, piece )))
1124    {
1125        const int pos = p - t->pieces;
1126
1127        tr_removeElementFromArray( t->pieces,
1128                                   pos,
1129                                   sizeof( struct weighted_piece ),
1130                                   t->pieceCount-- );
1131
1132        if( t->pieceCount == 0 )
1133        {
1134            tr_free( t->pieces );
1135            t->pieces = NULL;
1136        }
1137    }
1138}
1139
1140static void
1141pieceListResortPiece( Torrent * t, struct weighted_piece * p )
1142{
1143    int pos;
1144    bool isSorted = true;
1145
1146    if( p == NULL )
1147        return;
1148
1149    /* is the torrent already sorted? */
1150    pos = p - t->pieces;
1151    setComparePieceByWeightTorrent( t );
1152    if( isSorted && ( pos > 0 ) && ( comparePieceByWeight( p-1, p ) > 0 ) )
1153        isSorted = false;
1154    if( isSorted && ( pos < t->pieceCount - 1 ) && ( comparePieceByWeight( p, p+1 ) > 0 ) )
1155        isSorted = false;
1156
1157    if( t->pieceSortState != PIECES_SORTED_BY_WEIGHT )
1158    {
1159       pieceListSort( t, PIECES_SORTED_BY_WEIGHT);
1160       isSorted = true;
1161    }
1162
1163    /* if it's not sorted, move it around */
1164    if( !isSorted )
1165    {
1166        bool exact;
1167        const struct weighted_piece tmp = *p;
1168
1169        tr_removeElementFromArray( t->pieces,
1170                                   pos,
1171                                   sizeof( struct weighted_piece ),
1172                                   t->pieceCount-- );
1173
1174        pos = tr_lowerBound( &tmp, t->pieces, t->pieceCount,
1175                             sizeof( struct weighted_piece ),
1176                             comparePieceByWeight, &exact );
1177
1178        memmove( &t->pieces[pos + 1],
1179                 &t->pieces[pos],
1180                 sizeof( struct weighted_piece ) * ( t->pieceCount++ - pos ) );
1181
1182        t->pieces[pos] = tmp;
1183    }
1184
1185    assertWeightedPiecesAreSorted( t );
1186}
1187
1188static void
1189pieceListRemoveRequest( Torrent * t, tr_block_index_t block )
1190{
1191    struct weighted_piece * p;
1192    const tr_piece_index_t index = tr_torBlockPiece( t->tor, block );
1193
1194    if( ((p = pieceListLookup( t, index ))) && ( p->requestCount > 0 ) )
1195    {
1196        --p->requestCount;
1197        pieceListResortPiece( t, p );
1198    }
1199}
1200
1201
1202/****
1203*****
1204*****  Replication count ( for rarest first policy )
1205*****
1206****/
1207
1208/**
1209 * Increase the replication count of this piece and sort it if the
1210 * piece list is already sorted
1211 */
1212static void
1213tr_incrReplicationOfPiece( Torrent * t, const size_t index )
1214{
1215    assert( replicationExists( t ) );
1216    assert( t->pieceReplicationSize == t->tor->info.pieceCount );
1217
1218    /* One more replication of this piece is present in the swarm */
1219    ++t->pieceReplication[index];
1220
1221    /* we only resort the piece if the list is already sorted */
1222    if( t->pieceSortState == PIECES_SORTED_BY_WEIGHT )
1223        pieceListResortPiece( t, pieceListLookup( t, index ) );
1224}
1225
1226/**
1227 * Increases the replication count of pieces present in the bitfield
1228 */
1229static void
1230tr_incrReplicationFromBitfield( Torrent * t, const tr_bitfield * b )
1231{
1232    size_t i;
1233    uint16_t * rep = t->pieceReplication;
1234    const size_t n = t->tor->info.pieceCount;
1235
1236    assert( replicationExists( t ) );
1237
1238    for( i=0; i<n; ++i )
1239        if( tr_bitfieldHas( b, i ) )
1240            ++rep[i];
1241
1242    if( t->pieceSortState == PIECES_SORTED_BY_WEIGHT )
1243        invalidatePieceSorting( t );
1244}
1245
1246/**
1247 * Increase the replication count of every piece
1248 */
1249static void
1250tr_incrReplication( Torrent * t )
1251{
1252    int i;
1253    const int n = t->pieceReplicationSize;
1254
1255    assert( replicationExists( t ) );
1256    assert( t->pieceReplicationSize == t->tor->info.pieceCount );
1257
1258    for( i=0; i<n; ++i )
1259        ++t->pieceReplication[i];
1260}
1261
1262/**
1263 * Decrease the replication count of pieces present in the bitset.
1264 */
1265static void
1266tr_decrReplicationFromBitfield( Torrent * t, const tr_bitfield * b )
1267{
1268    int i;
1269    const int n = t->pieceReplicationSize;
1270
1271    assert( replicationExists( t ) );
1272    assert( t->pieceReplicationSize == t->tor->info.pieceCount );
1273
1274    if( tr_bitfieldHasAll( b ) )
1275    {
1276        for( i=0; i<n; ++i )
1277            --t->pieceReplication[i];
1278    }
1279    else if ( !tr_bitfieldHasNone( b ) )
1280    {
1281        for( i=0; i<n; ++i )
1282            if( tr_bitfieldHas( b, i ) )
1283                --t->pieceReplication[i];
1284
1285        if( t->pieceSortState == PIECES_SORTED_BY_WEIGHT )
1286            invalidatePieceSorting( t );
1287    }
1288}
1289
1290/**
1291***
1292**/
1293
1294void
1295tr_peerMgrRebuildRequests( tr_torrent * tor )
1296{
1297    assert( tr_isTorrent( tor ) );
1298
1299    pieceListRebuild( tor->torrentPeers );
1300}
1301
/**
 * Choose up to `numwant` blocks to request from `peer`.
 *
 * Walks the weight-sorted piece list and, for each piece the peer has,
 * picks blocks that we don't have yet. Each chosen block is written to
 * `setme`, registered in the torrent's request list, and counted against
 * its piece. Afterwards the pieces that were walked are re-sorted.
 *
 * @param tor     the torrent to pick blocks for
 * @param peer    the peer the requests will go to; must be interesting
 *                to us and must not be choking us
 * @param numwant maximum number of blocks to pick; must be > 0
 * @param setme   output array that receives the chosen block indices;
 *                at most `numwant` entries are written
 * @param numgot  receives the number of entries written to `setme`
 */
void
tr_peerMgrGetNextRequests( tr_torrent           * tor,
                           tr_peer              * peer,
                           int                    numwant,
                           tr_block_index_t     * setme,
                           int                  * numgot )
{
    int i;
    int got;
    Torrent * t;
    struct weighted_piece * pieces;
    const tr_bitfield * const have = &peer->have;

    /* sanity clause */
    assert( tr_isTorrent( tor ) );
    assert( peer->clientIsInterested );
    assert( !peer->clientIsChoked );
    assert( numwant > 0 );

    /* walk through the pieces and find blocks that should be requested */
    got = 0;
    t = tor->torrentPeers;

    /* prep the pieces list */
    if( t->pieces == NULL )
        pieceListRebuild( t );

    if( t->pieceSortState != PIECES_SORTED_BY_WEIGHT )
        pieceListSort( t, PIECES_SORTED_BY_WEIGHT );

    assertReplicationCountIsExact( t );
    assertWeightedPiecesAreSorted( t );

    updateEndgame( t );
    pieces = t->pieces;
    /* stop as soon as enough blocks were found or the list is exhausted */
    for( i=0; i<t->pieceCount && got<numwant; ++i )
    {
        struct weighted_piece * p = pieces + i;

        /* if the peer has this piece that we want... */
        if( tr_bitfieldHas( have, p->index ) )
        {
            tr_block_index_t b;
            tr_block_index_t first;
            tr_block_index_t last;
            tr_ptrArray peerArr = TR_PTR_ARRAY_INIT;

            tr_torGetPieceBlockRange( tor, p->index, &first, &last );

            for( b=first; b<=last && got<numwant; ++b )
            {
                int peerCount;
                tr_peer ** peers;

                /* don't request blocks we've already got */
                if( tr_cpBlockIsComplete( &tor->completion, b ) )
                    continue;

                /* always add peer if this block has no peers yet */
                tr_ptrArrayClear( &peerArr );
                getBlockRequestPeers( t, b, &peerArr );
                peers = (tr_peer **) tr_ptrArrayPeek( &peerArr, &peerCount );
                if( peerCount != 0 )
                {
                    /* don't make a second block request until the endgame */
                    if( !t->endgame )
                        continue;

                    /* don't have more than two peers requesting this block */
                    if( peerCount > 1 )
                        continue;

                    /* don't send the same request to the same peer twice */
                    if( peer == peers[0] )
                        continue;

                    /* in the endgame allow an additional peer to download a
                       block but only if the peer seems to be handling requests
                       relatively fast */
                    if( peer->pendingReqsToPeer + numwant - got < t->endgame )
                        continue;
                }

                /* update the caller's table */
                setme[got++] = b;

                /* update our own tables */
                requestListAdd( t, b, peer );
                ++p->requestCount;
            }

            tr_ptrArrayDestruct( &peerArr, NULL );
        }
    }

    /* In most cases we've just changed the weights of a small number of pieces.
     * So rather than qsort()ing the entire array, it's faster to apply an
     * adaptive insertion sort algorithm. */
    if( got > 0 )
    {
        /* not enough requests || last piece modified */
        if ( i == t->pieceCount ) --i;

        /* walk backwards over the pieces visited above, sliding each one
         * forward into its place among the entries that follow it */
        setComparePieceByWeightTorrent( t );
        while( --i >= 0 )
        {
            bool exact;

            /* relative position! */
            const int newpos = tr_lowerBound( &t->pieces[i], &t->pieces[i + 1],
                                              t->pieceCount - (i + 1),
                                              sizeof( struct weighted_piece ),
                                              comparePieceByWeight, &exact );
            if( newpos > 0 )
            {
                /* shift the next `newpos` entries down one slot and
                 * drop the saved piece into the slot that opens up */
                const struct weighted_piece piece = t->pieces[i];
                memmove( &t->pieces[i],
                         &t->pieces[i + 1],
                         sizeof( struct weighted_piece ) * ( newpos ) );
                t->pieces[i + newpos] = piece;
            }
        }
    }

    assertWeightedPiecesAreSorted( t );
    *numgot = got;
}
1429
1430bool
1431tr_peerMgrDidPeerRequest( const tr_torrent  * tor,
1432                          const tr_peer     * peer,
1433                          tr_block_index_t    block )
1434{
1435    const Torrent * t = tor->torrentPeers;
1436    return requestListLookup( (Torrent*)t, block, peer ) != NULL;
1437}
1438
1439/* cancel requests that are too old */
1440static void
1441refillUpkeep( int foo UNUSED, short bar UNUSED, void * vmgr )
1442{
1443    time_t now;
1444    time_t too_old;
1445    tr_torrent * tor;
1446    int cancel_buflen = 0;
1447    struct block_request * cancel = NULL;
1448    tr_peerMgr * mgr = vmgr;
1449    managerLock( mgr );
1450
1451    now = tr_time( );
1452    too_old = now - REQUEST_TTL_SECS;
1453
1454    /* alloc the temporary "cancel" buffer */
1455    tor = NULL;
1456    while(( tor = tr_torrentNext( mgr->session, tor )))
1457        cancel_buflen = MAX( cancel_buflen, tor->torrentPeers->requestCount );
1458    if( cancel_buflen > 0 )
1459        cancel = tr_new( struct block_request, cancel_buflen );
1460
1461    /* prune requests that are too old */
1462    tor = NULL;
1463    while(( tor = tr_torrentNext( mgr->session, tor )))
1464    {
1465        Torrent * t = tor->torrentPeers;
1466        const int n = t->requestCount;
1467        if( n > 0 )
1468        {
1469            int keepCount = 0;
1470            int cancelCount = 0;
1471            const struct block_request * it;
1472            const struct block_request * end;
1473
1474            for( it=t->requests, end=it+n; it!=end; ++it )
1475            {
1476                if( ( it->sentAt <= too_old ) && it->peer->msgs && !tr_peerMsgsIsReadingBlock( it->peer->msgs, it->block ) )
1477                    cancel[cancelCount++] = *it;
1478                else
1479                {
1480                    if( it != &t->requests[keepCount] )
1481                        t->requests[keepCount] = *it;
1482                    keepCount++;
1483                }
1484            }
1485
1486            /* prune out the ones we aren't keeping */
1487            t->requestCount = keepCount;
1488
1489            /* send cancel messages for all the "cancel" ones */
1490            for( it=cancel, end=it+cancelCount; it!=end; ++it ) {
1491                if( ( it->peer != NULL ) && ( it->peer->msgs != NULL ) ) {
1492                    tr_historyAdd( &it->peer->cancelsSentToPeer, now, 1 );
1493                    tr_peerMsgsCancel( it->peer->msgs, it->block );
1494                    decrementPendingReqCount( it );
1495                }
1496            }
1497
1498            /* decrement the pending request counts for the timed-out blocks */
1499            for( it=cancel, end=it+cancelCount; it!=end; ++it )
1500                pieceListRemoveRequest( t, it->block );
1501        }
1502    }
1503
1504    tr_free( cancel );
1505    tr_timerAddMsec( mgr->refillUpkeepTimer, REFILL_UPKEEP_PERIOD_MSEC );
1506    managerUnlock( mgr );
1507}
1508
1509static void
1510addStrike( Torrent * t, tr_peer * peer )
1511{
1512    tordbg( t, "increasing peer %s strike count to %d",
1513            tr_atomAddrStr( peer->atom ), peer->strikes + 1 );
1514
1515    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
1516    {
1517        struct peer_atom * atom = peer->atom;
1518        atom->flags2 |= MYFLAG_BANNED;
1519        peer->doPurge = 1;
1520        tordbg( t, "banning peer %s", tr_atomAddrStr( atom ) );
1521    }
1522}
1523
1524static void
1525gotBadPiece( Torrent * t, tr_piece_index_t pieceIndex )
1526{
1527    tr_torrent *   tor = t->tor;
1528    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
1529
1530    tor->corruptCur += byteCount;
1531    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1532
1533    tr_announcerAddBytes( tor, TR_ANN_CORRUPT, byteCount );
1534}
1535
/*
 * Called when a peer sends a "suggest piece" or "allowed fast" message
 * (see the TR_PEER_CLIENT_GOT_SUGGEST / TR_PEER_CLIENT_GOT_ALLOWED_FAST
 * cases of peerCallbackFunc()).
 *
 * Currently a no-op: the whole body is compiled out with #if 0, which is
 * why every parameter is marked UNUSED. The disabled code would request
 * the missing blocks of the suggested piece from the peer.
 */
static void
peerSuggestedPiece( Torrent            * t UNUSED,
                    tr_peer            * peer UNUSED,
                    tr_piece_index_t     pieceIndex UNUSED,
                    int                  isFastAllowed UNUSED )
{
#if 0
    assert( t );
    assert( peer );
    assert( peer->msgs );

    /* is this a valid piece? */
    if(  pieceIndex >= t->tor->info.pieceCount )
        return;

    /* don't ask for it if we've already got it */
    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
        return;

    /* don't ask for it if they don't have it */
    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
        return;

    /* don't ask for it if we're choked and it's not fast */
    if( !isFastAllowed && peer->clientIsChoked )
        return;

    /* request the blocks that we don't have in this piece */
    {
        tr_block_index_t b;
        tr_block_index_t first;
        tr_block_index_t last;
        const tr_torrent * tor = t->tor;

        tr_torGetPieceBlockRange( t->tor, pieceIndex, &first, &last );

        for( b=first; b<=last; ++b )
        {
            if( !tr_cpBlockIsComplete( tor->completion, b ) )
            {
                const uint32_t offset = getBlockOffsetInPiece( tor, b );
                const uint32_t length = tr_torBlockCountBytes( tor, b );
                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length );
                incrementPieceRequests( t, pieceIndex );
            }
        }
    }
#endif
}
1585
1586static void
1587removeRequestFromTables( Torrent * t, tr_block_index_t block, const tr_peer * peer )
1588{
1589    requestListRemove( t, block, peer );
1590    pieceListRemoveRequest( t, block );
1591}
1592
1593/* peer choked us, or maybe it disconnected.
1594   either way we need to remove all its requests */
1595static void
1596peerDeclinedAllRequests( Torrent * t, const tr_peer * peer )
1597{
1598    int i, n;
1599    tr_block_index_t * blocks = tr_new( tr_block_index_t, t->requestCount );
1600
1601    for( i=n=0; i<t->requestCount; ++i )
1602        if( peer == t->requests[i].peer )
1603            blocks[n++] = t->requests[i].block;
1604
1605    for( i=0; i<n; ++i )
1606        removeRequestFromTables( t, blocks[i], peer );
1607
1608    tr_free( blocks );
1609}
1610
1611static void tr_peerMgrSetBlame( tr_torrent *, tr_piece_index_t, int );
1612
1613static void
1614peerCallbackFunc( tr_peer * peer, const tr_peer_event * e, void * vt )
1615{
1616    Torrent * t = vt;
1617
1618    torrentLock( t );
1619
1620    assert( peer != NULL );
1621
1622    switch( e->eventType )
1623    {
1624        case TR_PEER_PEER_GOT_DATA:
1625        {
1626            const time_t now = tr_time( );
1627            tr_torrent * tor = t->tor;
1628
1629            if( e->wasPieceData )
1630            {
1631                tor->uploadedCur += e->length;
1632                tr_announcerAddBytes( tor, TR_ANN_UP, e->length );
1633                tr_torrentSetActivityDate( tor, now );
1634                tr_torrentSetDirty( tor );
1635            }
1636
1637            /* update the stats */
1638            if( e->wasPieceData )
1639                tr_statsAddUploaded( tor->session, e->length );
1640
1641            /* update our atom */
1642            if( peer->atom && e->wasPieceData )
1643                peer->atom->piece_data_time = now;
1644
1645            break;
1646        }
1647
1648        case TR_PEER_CLIENT_GOT_HAVE:
1649            if( replicationExists( t ) ) {
1650                tr_incrReplicationOfPiece( t, e->pieceIndex );
1651                assertReplicationCountIsExact( t );
1652            }
1653            break;
1654
1655        case TR_PEER_CLIENT_GOT_HAVE_ALL:
1656            if( replicationExists( t ) ) {
1657                tr_incrReplication( t );
1658                assertReplicationCountIsExact( t );
1659            }
1660            break;
1661
1662        case TR_PEER_CLIENT_GOT_HAVE_NONE:
1663            /* noop */
1664            break;
1665
1666        case TR_PEER_CLIENT_GOT_BITFIELD:
1667            assert( e->bitfield != NULL );
1668            if( replicationExists( t ) ) {
1669                tr_incrReplicationFromBitfield( t, e->bitfield );
1670                assertReplicationCountIsExact( t );
1671            }
1672            break;
1673
1674        case TR_PEER_CLIENT_GOT_REJ: {
1675            tr_block_index_t b = _tr_block( t->tor, e->pieceIndex, e->offset );
1676            if( b < t->tor->blockCount )
1677                removeRequestFromTables( t, b, peer );
1678            else
1679                tordbg( t, "Peer %s sent an out-of-range reject message",
1680                           tr_atomAddrStr( peer->atom ) );
1681            break;
1682        }
1683
1684        case TR_PEER_CLIENT_GOT_CHOKE:
1685            peerDeclinedAllRequests( t, peer );
1686            break;
1687
1688        case TR_PEER_CLIENT_GOT_PORT:
1689            if( peer->atom )
1690                peer->atom->port = e->port;
1691            break;
1692
1693        case TR_PEER_CLIENT_GOT_SUGGEST:
1694            peerSuggestedPiece( t, peer, e->pieceIndex, false );
1695            break;
1696
1697        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1698            peerSuggestedPiece( t, peer, e->pieceIndex, true );
1699            break;
1700
1701        case TR_PEER_CLIENT_GOT_DATA:
1702        {
1703            const time_t now = tr_time( );
1704            tr_torrent * tor = t->tor;
1705
1706            if( e->wasPieceData )
1707            {
1708                tor->downloadedCur += e->length;
1709                tr_torrentSetActivityDate( tor, now );
1710                tr_torrentSetDirty( tor );
1711            }
1712
1713            /* update the stats */
1714            if( e->wasPieceData )
1715                tr_statsAddDownloaded( tor->session, e->length );
1716
1717            /* update our atom */
1718            if( peer->atom && e->wasPieceData )
1719                peer->atom->piece_data_time = now;
1720
1721            break;
1722        }
1723
1724        case TR_PEER_CLIENT_GOT_BLOCK:
1725        {
1726            tr_torrent * tor = t->tor;
1727            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1728            int i, peerCount;
1729            tr_peer ** peers;
1730            tr_ptrArray peerArr = TR_PTR_ARRAY_INIT;
1731
1732            removeRequestFromTables( t, block, peer );
1733            getBlockRequestPeers( t, block, &peerArr );
1734            peers = (tr_peer **) tr_ptrArrayPeek( &peerArr, &peerCount );
1735
1736            /* remove additional block requests and send cancel to peers */
1737            for( i=0; i<peerCount; i++ ) {
1738                tr_peer * p = peers[i];
1739                assert( p != peer );
1740                if( p->msgs ) {
1741                    tr_historyAdd( &p->cancelsSentToPeer, tr_time( ), 1 );
1742                    tr_peerMsgsCancel( p->msgs, block );
1743                }
1744                removeRequestFromTables( t, block, p );
1745            }
1746
1747            tr_ptrArrayDestruct( &peerArr, false );
1748
1749            tr_historyAdd( &peer->blocksSentToClient, tr_time( ), 1 );
1750
1751            if( tr_cpBlockIsComplete( &tor->completion, block ) )
1752            {
1753                /* we already have this block... */
1754                const uint32_t n = tr_torBlockCountBytes( tor, block );
1755                tor->downloadedCur -= MIN( tor->downloadedCur, n );
1756                tordbg( t, "we have this block already..." );
1757            }
1758            else
1759            {
1760                tr_cpBlockAdd( &tor->completion, block );
1761                pieceListResortPiece( t, pieceListLookup( t, e->pieceIndex ) );
1762                tr_torrentSetDirty( tor );
1763
1764                if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
1765                {
1766                    const tr_piece_index_t p = e->pieceIndex;
1767                    const bool ok = tr_torrentCheckPiece( tor, p );
1768
1769                    tordbg( t, "[LAZY] checked just-completed piece %zu", (size_t)p );
1770
1771                    if( !ok )
1772                    {
1773                        tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1774                                   (unsigned long)p );
1775                    }
1776
1777                    tr_peerMgrSetBlame( tor, p, ok );
1778
1779                    if( !ok )
1780                    {
1781                        gotBadPiece( t, p );
1782                    }
1783                    else
1784                    {
1785                        int i;
1786                        int peerCount;
1787                        tr_peer ** peers;
1788                        tr_file_index_t fileIndex;
1789
1790                        /* only add this to downloadedCur if we got it from a peer --
1791                         * webseeds shouldn't count against our ratio. As one tracker
1792                         * admin put it, "Those pieces are downloaded directly from the
1793                         * content distributor, not the peers, it is the tracker's job
1794                         * to manage the swarms, not the web server and does not fit
1795                         * into the jurisdiction of the tracker." */
1796                        if( peer->msgs != NULL ) {
1797                            const uint32_t n = tr_torPieceCountBytes( tor, p );
1798                            tr_announcerAddBytes( tor, TR_ANN_DOWN, n );
1799                        }
1800
1801                        peerCount = tr_ptrArraySize( &t->peers );
1802                        peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1803                        for( i=0; i<peerCount; ++i )
1804                            tr_peerMsgsHave( peers[i]->msgs, p );
1805
1806                        for( fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex ) {
1807                            const tr_file * file = &tor->info.files[fileIndex];
1808                            if( ( file->firstPiece <= p ) && ( p <= file->lastPiece ) ) {
1809                                if( tr_cpFileIsComplete( &tor->completion, fileIndex ) ) {
1810                                    tr_cacheFlushFile( tor->session->cache, tor, fileIndex );
1811                                    tr_torrentFileCompleted( tor, fileIndex );
1812                                }
1813                            }
1814                        }
1815
1816                        pieceListRemovePiece( t, p );
1817                    }
1818                }
1819
1820                t->needsCompletenessCheck = true;
1821            }
1822            break;
1823        }
1824
1825        case TR_PEER_ERROR:
1826            if( ( e->err == ERANGE ) || ( e->err == EMSGSIZE ) || ( e->err == ENOTCONN ) )
1827            {
1828                /* some protocol error from the peer */
1829                peer->doPurge = 1;
1830                tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error",
1831                        tr_atomAddrStr( peer->atom ) );
1832            }
1833            else
1834            {
1835                tordbg( t, "unhandled error: %s", tr_strerror( e->err ) );
1836            }
1837            break;
1838
1839        default:
1840            assert( 0 );
1841    }
1842
1843    torrentUnlock( t );
1844}
1845
1846static int
1847getDefaultShelfLife( uint8_t from )
1848{
1849    /* in general, peers obtained from firsthand contact
1850     * are better than those from secondhand, etc etc */
1851    switch( from )
1852    {
1853        case TR_PEER_FROM_INCOMING : return 60 * 60 * 6;
1854        case TR_PEER_FROM_LTEP     : return 60 * 60 * 6;
1855        case TR_PEER_FROM_TRACKER  : return 60 * 60 * 3;
1856        case TR_PEER_FROM_DHT      : return 60 * 60 * 3;
1857        case TR_PEER_FROM_PEX      : return 60 * 60 * 2;
1858        case TR_PEER_FROM_RESUME   : return 60 * 60;
1859        case TR_PEER_FROM_LPD      : return 10 * 60;
1860        default                    : return 60 * 60;
1861    }
1862}
1863
1864static void
1865ensureAtomExists( Torrent           * t,
1866                  const tr_address  * addr,
1867                  const tr_port       port,
1868                  const uint8_t       flags,
1869                  const int8_t        seedProbability,
1870                  const uint8_t       from )
1871{
1872    struct peer_atom * a;
1873
1874    assert( tr_address_is_valid( addr ) );
1875    assert( from < TR_PEER_FROM__MAX );
1876
1877    a = getExistingAtom( t, addr );
1878
1879    if( a == NULL )
1880    {
1881        const int jitter = tr_cryptoWeakRandInt( 60*10 );
1882        a = tr_new0( struct peer_atom, 1 );
1883        a->addr = *addr;
1884        a->port = port;
1885        a->flags = flags;
1886        a->fromFirst = from;
1887        a->fromBest = from;
1888        a->shelf_date = tr_time( ) + getDefaultShelfLife( from ) + jitter;
1889        a->blocklisted = -1;
1890        atomSetSeedProbability( a, seedProbability );
1891        tr_ptrArrayInsertSorted( &t->pool, a, compareAtomsByAddress );
1892
1893        tordbg( t, "got a new atom: %s", tr_atomAddrStr( a ) );
1894    }
1895    else
1896    {
1897        if( from < a->fromBest )
1898            a->fromBest = from;
1899
1900        if( a->seedProbability == -1 )
1901            atomSetSeedProbability( a, seedProbability );
1902
1903        a->flags |= flags;
1904    }
1905}
1906
1907static int
1908getMaxPeerCount( const tr_torrent * tor )
1909{
1910    return tor->maxConnectedPeers;
1911}
1912
1913static int
1914getPeerCount( const Torrent * t )
1915{
1916    return tr_ptrArraySize( &t->peers );/* + tr_ptrArraySize( &t->outgoingHandshakes ); */
1917}
1918
1919/* FIXME: this is kind of a mess. */
1920static bool
1921myHandshakeDoneCB( tr_handshake  * handshake,
1922                   tr_peerIo     * io,
1923                   bool            readAnythingFromPeer,
1924                   bool            isConnected,
1925                   const uint8_t * peer_id,
1926                   void          * vmanager )
1927{
1928    bool               ok = isConnected;
1929    bool               success = false;
1930    tr_port            port;
1931    const tr_address * addr;
1932    tr_peerMgr       * manager = vmanager;
1933    Torrent          * t;
1934    tr_handshake     * ours;
1935
1936    assert( io );
1937    assert( tr_isBool( ok ) );
1938
1939    t = tr_peerIoHasTorrentHash( io )
1940        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1941        : NULL;
1942
1943    if( tr_peerIoIsIncoming ( io ) )
1944        ours = tr_ptrArrayRemoveSorted( &manager->incomingHandshakes,
1945                                        handshake, handshakeCompare );
1946    else if( t )
1947        ours = tr_ptrArrayRemoveSorted( &t->outgoingHandshakes,
1948                                        handshake, handshakeCompare );
1949    else
1950        ours = handshake;
1951
1952    assert( ours );
1953    assert( ours == handshake );
1954
1955    if( t )
1956        torrentLock( t );
1957
1958    addr = tr_peerIoGetAddress( io, &port );
1959
1960    if( !ok || !t || !t->isRunning )
1961    {
1962        if( t )
1963        {
1964            struct peer_atom * atom = getExistingAtom( t, addr );
1965            if( atom )
1966            {
1967                ++atom->numFails;
1968
1969                if( !readAnythingFromPeer )
1970                {
1971                    tordbg( t, "marking peer %s as unreachable... numFails is %d", tr_atomAddrStr( atom ), (int)atom->numFails );
1972                    atom->flags2 |= MYFLAG_UNREACHABLE;
1973                }
1974            }
1975        }
1976    }
1977    else /* looking good */
1978    {
1979        struct peer_atom * atom;
1980
1981        ensureAtomExists( t, addr, port, 0, -1, TR_PEER_FROM_INCOMING );
1982        atom = getExistingAtom( t, addr );
1983        atom->time = tr_time( );
1984        atom->piece_data_time = 0;
1985        atom->lastConnectionAt = tr_time( );
1986
1987        if( !tr_peerIoIsIncoming( io ) )
1988        {
1989            atom->flags |= ADDED_F_CONNECTABLE;
1990            atom->flags2 &= ~MYFLAG_UNREACHABLE;
1991        }
1992
1993        /* In principle, this flag specifies whether the peer groks uTP,
1994           not whether it's currently connected over uTP. */
1995        if( io->utp_socket )
1996            atom->flags |= ADDED_F_UTP_FLAGS;
1997
1998        if( atom->flags2 & MYFLAG_BANNED )
1999        {
2000            tordbg( t, "banned peer %s tried to reconnect",
2001                    tr_atomAddrStr( atom ) );
2002        }
2003        else if( tr_peerIoIsIncoming( io )
2004               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
2005
2006        {
2007        }
2008        else
2009        {
2010            tr_peer * peer = atom->peer;
2011
2012            if( peer ) /* we already have this peer */
2013            {
2014            }
2015            else
2016            {
2017                peer = getPeer( t, atom );
2018                tr_free( peer->client );
2019
2020                if( !peer_id )
2021                    peer->client = NULL;
2022                else {
2023                    char client[128];
2024                    tr_clientForId( client, sizeof( client ), peer_id );
2025                    peer->client = tr_strdup( client );
2026                }
2027
2028                peer->io = tr_handshakeStealIO( handshake ); /* this steals its refcount too, which is
2029                                                                balanced by our unref in peerDelete()  */
2030                tr_peerIoSetParent( peer->io, &t->tor->bandwidth );
2031                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t );
2032
2033                success = true;
2034            }
2035        }
2036    }
2037
2038    if( t )
2039        torrentUnlock( t );
2040
2041    return success;
2042}
2043
2044void
2045tr_peerMgrAddIncoming( tr_peerMgr * manager,
2046                       tr_address * addr,
2047                       tr_port      port,
2048                       int          socket,
2049                       struct UTPSocket * utp_socket )
2050{
2051    tr_session * session;
2052
2053    managerLock( manager );
2054
2055    assert( tr_isSession( manager->session ) );
2056    session = manager->session;
2057
2058    if( tr_sessionIsAddressBlocked( session, addr ) )
2059    {
2060        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_address_to_string( addr ) );
2061        if(socket >= 0)
2062            tr_netClose( session, socket );
2063        else
2064            UTP_Close( utp_socket );
2065    }
2066    else if( getExistingHandshake( &manager->incomingHandshakes, addr ) )
2067    {
2068        if(socket >= 0)
2069            tr_netClose( session, socket );
2070        else
2071            UTP_Close( utp_socket );
2072    }
2073    else /* we don't have a connection to them yet... */
2074    {
2075        tr_peerIo *    io;
2076        tr_handshake * handshake;
2077
2078        io = tr_peerIoNewIncoming( session, &session->bandwidth, addr, port, socket, utp_socket );
2079
2080        handshake = tr_handshakeNew( io,
2081                                     session->encryptionMode,
2082                                     myHandshakeDoneCB,
2083                                     manager );
2084
2085        tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewIncoming() */
2086
2087        tr_ptrArrayInsertSorted( &manager->incomingHandshakes, handshake,
2088                                 handshakeCompare );
2089    }
2090
2091    managerUnlock( manager );
2092}
2093
2094void
2095tr_peerMgrAddPex( tr_torrent * tor, uint8_t from,
2096                  const tr_pex * pex, int8_t seedProbability )
2097{
2098    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
2099    {
2100        Torrent * t = tor->torrentPeers;
2101        managerLock( t->manager );
2102
2103        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
2104            if( tr_address_is_valid_for_peers( &pex->addr, pex->port ) )
2105                ensureAtomExists( t, &pex->addr, pex->port, pex->flags, seedProbability, from );
2106
2107        managerUnlock( t->manager );
2108    }
2109}
2110
2111void
2112tr_peerMgrMarkAllAsSeeds( tr_torrent * tor )
2113{
2114    Torrent * t = tor->torrentPeers;
2115    const int n = tr_ptrArraySize( &t->pool );
2116    struct peer_atom ** it = (struct peer_atom**) tr_ptrArrayBase( &t->pool );
2117    struct peer_atom ** end = it + n;
2118
2119    while( it != end )
2120        atomSetSeed( t, *it++ );
2121}
2122
2123tr_pex *
2124tr_peerMgrCompactToPex( const void *    compact,
2125                        size_t          compactLen,
2126                        const uint8_t * added_f,
2127                        size_t          added_f_len,
2128                        size_t *        pexCount )
2129{
2130    size_t          i;
2131    size_t          n = compactLen / 6;
2132    const uint8_t * walk = compact;
2133    tr_pex *        pex = tr_new0( tr_pex, n );
2134
2135    for( i = 0; i < n; ++i )
2136    {
2137        pex[i].addr.type = TR_AF_INET;
2138        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
2139        memcpy( &pex[i].port, walk, 2 ); walk += 2;
2140        if( added_f && ( n == added_f_len ) )
2141            pex[i].flags = added_f[i];
2142    }
2143
2144    *pexCount = n;
2145    return pex;
2146}
2147
2148tr_pex *
2149tr_peerMgrCompact6ToPex( const void    * compact,
2150                         size_t          compactLen,
2151                         const uint8_t * added_f,
2152                         size_t          added_f_len,
2153                         size_t        * pexCount )
2154{
2155    size_t          i;
2156    size_t          n = compactLen / 18;
2157    const uint8_t * walk = compact;
2158    tr_pex *        pex = tr_new0( tr_pex, n );
2159
2160    for( i = 0; i < n; ++i )
2161    {
2162        pex[i].addr.type = TR_AF_INET6;
2163        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
2164        memcpy( &pex[i].port, walk, 2 ); walk += 2;
2165        if( added_f && ( n == added_f_len ) )
2166            pex[i].flags = added_f[i];
2167    }
2168
2169    *pexCount = n;
2170    return pex;
2171}
2172
2173tr_pex *
2174tr_peerMgrArrayToPex( const void * array,
2175                      size_t       arrayLen,
2176                      size_t      * pexCount )
2177{
2178    size_t          i;
2179    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
2180    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
2181    const uint8_t * walk = array;
2182    tr_pex        * pex = tr_new0( tr_pex, n );
2183
2184    for( i = 0 ; i < n ; i++ ) {
2185        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
2186        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
2187        pex[i].flags = 0x00;
2188        walk += sizeof( tr_address ) + 2;
2189    }
2190
2191    *pexCount = n;
2192    return pex;
2193}
2194
2195/**
2196***
2197**/
2198
2199static void
2200tr_peerMgrSetBlame( tr_torrent     * tor,
2201                    tr_piece_index_t pieceIndex,
2202                    int              success )
2203{
2204    if( !success )
2205    {
2206        int        peerCount, i;
2207        Torrent *  t = tor->torrentPeers;
2208        tr_peer ** peers;
2209
2210        assert( torrentIsLocked( t ) );
2211
2212        peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
2213        for( i = 0; i < peerCount; ++i )
2214        {
2215            tr_peer * peer = peers[i];
2216            if( tr_bitfieldHas( &peer->blame, pieceIndex ) )
2217            {
2218                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
2219                        tr_atomAddrStr( peer->atom ),
2220                        pieceIndex, (int)peer->strikes + 1 );
2221                addStrike( t, peer );
2222            }
2223        }
2224    }
2225}
2226
2227int
2228tr_pexCompare( const void * va, const void * vb )
2229{
2230    const tr_pex * a = va;
2231    const tr_pex * b = vb;
2232    int i;
2233
2234    assert( tr_isPex( a ) );
2235    assert( tr_isPex( b ) );
2236
2237    if(( i = tr_address_compare( &a->addr, &b->addr )))
2238        return i;
2239
2240    if( a->port != b->port )
2241        return a->port < b->port ? -1 : 1;
2242
2243    return 0;
2244}
2245
#if 0
/* Disabled code: an explicit encryption preference wins; otherwise
 * fall back to whether the peer's current connection is encrypted. */
static int
peerPrefersCrypto( const tr_peer * peer )
{
    const int preference = peer->encryption_preference;

    if( preference == ENCRYPTION_PREFERENCE_YES )
        return true;
    else if( preference == ENCRYPTION_PREFERENCE_NO )
        return false;
    else
        return tr_peerIoIsEncrypted( peer->io );
}
#endif
2259
2260/* better goes first */
2261static int
2262compareAtomsByUsefulness( const void * va, const void *vb )
2263{
2264    const struct peer_atom * a = * (const struct peer_atom**) va;
2265    const struct peer_atom * b = * (const struct peer_atom**) vb;
2266
2267    assert( tr_isAtom( a ) );
2268    assert( tr_isAtom( b ) );
2269
2270    if( a->piece_data_time != b->piece_data_time )
2271        return a->piece_data_time > b->piece_data_time ? -1 : 1;
2272    if( a->fromBest != b->fromBest )
2273        return a->fromBest < b->fromBest ? -1 : 1;
2274    if( a->numFails != b->numFails )
2275        return a->numFails < b->numFails ? -1 : 1;
2276
2277    return 0;
2278}
2279
2280static bool
2281isAtomInteresting( const tr_torrent * tor, struct peer_atom * atom )
2282{
2283    if( tr_torrentIsSeed( tor ) && atomIsSeed( atom ) )
2284        return false;
2285
2286    if( peerIsInUse( tor->torrentPeers, atom ) )
2287        return true;
2288
2289    if( isAtomBlocklisted( tor->session, atom ) )
2290        return false;
2291
2292    if( atom->flags2 & MYFLAG_BANNED )
2293        return false;
2294
2295    return true;
2296}
2297
2298int
2299tr_peerMgrGetPeers( tr_torrent   * tor,
2300                    tr_pex      ** setme_pex,
2301                    uint8_t        af,
2302                    uint8_t        list_mode,
2303                    int            maxCount )
2304{
2305    int i;
2306    int n;
2307    int count = 0;
2308    int atomCount = 0;
2309    const Torrent * t = tor->torrentPeers;
2310    struct peer_atom ** atoms = NULL;
2311    tr_pex * pex;
2312    tr_pex * walk;
2313
2314    assert( tr_isTorrent( tor ) );
2315    assert( setme_pex != NULL );
2316    assert( af==TR_AF_INET || af==TR_AF_INET6 );
2317    assert( list_mode==TR_PEERS_CONNECTED || list_mode==TR_PEERS_INTERESTING );
2318
2319    managerLock( t->manager );
2320
2321    /**
2322    ***  build a list of atoms
2323    **/
2324
2325    if( list_mode == TR_PEERS_CONNECTED ) /* connected peers only */
2326    {
2327        int i;
2328        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
2329        atomCount = tr_ptrArraySize( &t->peers );
2330        atoms = tr_new( struct peer_atom *, atomCount );
2331        for( i=0; i<atomCount; ++i )
2332            atoms[i] = peers[i]->atom;
2333    }
2334    else /* TR_PEERS_INTERESTING */
2335    {
2336        int i;
2337        struct peer_atom ** atomBase = (struct peer_atom**) tr_ptrArrayBase( &t->pool );
2338        n = tr_ptrArraySize( &t->pool );
2339        atoms = tr_new( struct peer_atom *, n );
2340        for( i=0; i<n; ++i )
2341            if( isAtomInteresting( tor, atomBase[i] ) )
2342                atoms[atomCount++] = atomBase[i];
2343    }
2344
2345    qsort( atoms, atomCount, sizeof( struct peer_atom * ), compareAtomsByUsefulness );
2346
2347    /**
2348    ***  add the first N of them into our return list
2349    **/
2350
2351    n = MIN( atomCount, maxCount );
2352    pex = walk = tr_new0( tr_pex, n );
2353
2354    for( i=0; i<atomCount && count<n; ++i )
2355    {
2356        const struct peer_atom * atom = atoms[i];
2357        if( atom->addr.type == af )
2358        {
2359            assert( tr_address_is_valid( &atom->addr ) );
2360            walk->addr = atom->addr;
2361            walk->port = atom->port;
2362            walk->flags = atom->flags;
2363            ++count;
2364            ++walk;
2365        }
2366    }
2367
2368    qsort( pex, count, sizeof( tr_pex ), tr_pexCompare );
2369
2370    assert( ( walk - pex ) == count );
2371    *setme_pex = pex;
2372
2373    /* cleanup */
2374    tr_free( atoms );
2375    managerUnlock( t->manager );
2376    return count;
2377}
2378
2379static void atomPulse      ( int, short, void * );
2380static void bandwidthPulse ( int, short, void * );
2381static void rechokePulse   ( int, short, void * );
2382static void reconnectPulse ( int, short, void * );
2383
2384static struct event *
2385createTimer( tr_session * session, int msec, void (*callback)(int, short, void *), void * cbdata )
2386{
2387    struct event * timer = evtimer_new( session->event_base, callback, cbdata );
2388    tr_timerAddMsec( timer, msec );
2389    return timer;
2390}
2391
2392static void
2393ensureMgrTimersExist( struct tr_peerMgr * m )
2394{
2395    if( m->atomTimer == NULL )
2396        m->atomTimer = createTimer( m->session, ATOM_PERIOD_MSEC, atomPulse, m );
2397
2398    if( m->bandwidthTimer == NULL )
2399        m->bandwidthTimer = createTimer( m->session, BANDWIDTH_PERIOD_MSEC, bandwidthPulse, m );
2400
2401    if( m->rechokeTimer == NULL )
2402        m->rechokeTimer = createTimer( m->session, RECHOKE_PERIOD_MSEC, rechokePulse, m );
2403
2404    if( m->refillUpkeepTimer == NULL )
2405        m->refillUpkeepTimer = createTimer( m->session, REFILL_UPKEEP_PERIOD_MSEC, refillUpkeep, m );
2406}
2407
2408void
2409tr_peerMgrStartTorrent( tr_torrent * tor )
2410{
2411    Torrent * t = tor->torrentPeers;
2412
2413    assert( tr_isTorrent( tor ) );
2414    assert( tr_torrentIsLocked( tor ) );
2415
2416    ensureMgrTimersExist( t->manager );
2417
2418    t->isRunning = true;
2419    t->maxPeers = t->tor->maxConnectedPeers;
2420    t->pieceSortState = PIECES_UNSORTED;
2421
2422    rechokePulse( 0, 0, t->manager );
2423}
2424
2425static void
2426stopTorrent( Torrent * t )
2427{
2428    tr_peer * peer;
2429
2430    t->isRunning = false;
2431
2432    replicationFree( t );
2433    invalidatePieceSorting( t );
2434
2435    /* disconnect the peers. */
2436    while(( peer = tr_ptrArrayPop( &t->peers )))
2437        peerDelete( t, peer );
2438
2439    /* disconnect the handshakes. handshakeAbort calls handshakeDoneCB(),
2440     * which removes the handshake from t->outgoingHandshakes... */
2441    while( !tr_ptrArrayEmpty( &t->outgoingHandshakes ) )
2442        tr_handshakeAbort( tr_ptrArrayNth( &t->outgoingHandshakes, 0 ) );
2443}
2444
2445void
2446tr_peerMgrStopTorrent( tr_torrent * tor )
2447{
2448    assert( tr_isTorrent( tor ) );
2449    assert( tr_torrentIsLocked( tor ) );
2450
2451    stopTorrent( tor->torrentPeers );
2452}
2453
2454void
2455tr_peerMgrAddTorrent( tr_peerMgr * manager, tr_torrent * tor )
2456{
2457    assert( tr_isTorrent( tor ) );
2458    assert( tr_torrentIsLocked( tor ) );
2459    assert( tor->torrentPeers == NULL );
2460
2461    tor->torrentPeers = torrentNew( manager, tor );
2462}
2463
2464void
2465tr_peerMgrRemoveTorrent( tr_torrent * tor )
2466{
2467    assert( tr_isTorrent( tor ) );
2468    assert( tr_torrentIsLocked( tor ) );
2469
2470    stopTorrent( tor->torrentPeers );
2471    torrentFree( tor->torrentPeers );
2472}
2473
2474void
2475tr_peerUpdateProgress( tr_torrent * tor, tr_peer * peer )
2476{
2477    const tr_bitfield * have = &peer->have;
2478
2479    if( tr_bitfieldHasAll( have ) )
2480    {
2481        peer->progress = 1.0;
2482    }
2483    else if( tr_bitfieldHasNone( have ) )
2484    {
2485        peer->progress = 0.0;
2486    }
2487    else
2488    {
2489        const float true_count = tr_bitfieldCountTrueBits( have );
2490
2491        if( tr_torrentHasMetadata( tor ) )
2492            peer->progress = true_count / tor->info.pieceCount;
2493        else /* without pieceCount, this result is only a best guess... */
2494            peer->progress = true_count / ( have->bit_count + 1 );
2495    }
2496
2497    if( peer->atom && ( peer->progress >= 1.0 ) )
2498        atomSetSeed( tor->torrentPeers, peer->atom );
2499}
2500
2501void
2502tr_peerMgrOnTorrentGotMetainfo( tr_torrent * tor )
2503{
2504    int i;
2505    const int peerCount = tr_ptrArraySize( &tor->torrentPeers->peers );
2506    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &tor->torrentPeers->peers );
2507
2508    /* some peer_msgs' progress fields may not be accurate if we
2509       didn't have the metadata before now... so refresh them all... */
2510    for( i=0; i<peerCount; ++i )
2511        tr_peerUpdateProgress( tor, peers[i] );
2512}
2513
2514void
2515tr_peerMgrTorrentAvailability( const tr_torrent * tor, int8_t * tab, unsigned int tabCount )
2516{
2517    assert( tr_isTorrent( tor ) );
2518    assert( torrentIsLocked( tor->torrentPeers ) );
2519    assert( tab != NULL );
2520    assert( tabCount > 0 );
2521
2522    memset( tab, 0, tabCount );
2523
2524    if( tr_torrentHasMetadata( tor ) )
2525    {
2526        tr_piece_index_t i;
2527        const int peerCount = tr_ptrArraySize( &tor->torrentPeers->peers );
2528        const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &tor->torrentPeers->peers );
2529        const float interval = tor->info.pieceCount / (float)tabCount;
2530        const bool isSeed = tr_cpGetStatus( &tor->completion ) == TR_SEED;
2531
2532        for( i=0; i<tabCount; ++i )
2533        {
2534            const int piece = i * interval;
2535
2536            if( isSeed || tr_cpPieceIsComplete( &tor->completion, piece ) )
2537                tab[i] = -1;
2538            else if( peerCount ) {
2539                int j;
2540                for( j=0; j<peerCount; ++j )
2541                    if( tr_bitfieldHas( &peers[j]->have, piece ) )
2542                        ++tab[i];
2543            }
2544        }
2545    }
2546}
2547
2548static bool
2549peerIsSeed( const tr_peer * peer )
2550{
2551    if( peer->progress >= 1.0 )
2552        return true;
2553
2554    if( peer->atom && atomIsSeed( peer->atom ) )
2555        return true;
2556
2557    return false;
2558}
2559
2560/* count how many bytes we want that connected peers have */
2561uint64_t
2562tr_peerMgrGetDesiredAvailable( const tr_torrent * tor )
2563{
2564    size_t i;
2565    size_t n;
2566    uint64_t desiredAvailable;
2567    const Torrent * t = tor->torrentPeers;
2568
2569    /* common shortcuts... */
2570
2571    if( tr_torrentIsSeed( t->tor ) )
2572        return 0;
2573
2574    if( !tr_torrentHasMetadata( tor ) )
2575        return 0;
2576
2577    n = tr_ptrArraySize( &t->peers );
2578    if( n == 0 )
2579        return 0;
2580    else {
2581        const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
2582        for( i=0; i<n; ++i )
2583            if( peers[i]->atom && atomIsSeed( peers[i]->atom ) )
2584                return tr_cpLeftUntilDone( &tor->completion );
2585    }
2586
2587    if( !t->pieceReplication || !t->pieceReplicationSize )
2588        return 0;
2589
2590    /* do it the hard way */
2591
2592    desiredAvailable = 0;
2593    for( i=0, n=MIN(tor->info.pieceCount, t->pieceReplicationSize); i<n; ++i )
2594        if( !tor->info.pieces[i].dnd && ( t->pieceReplication[i] > 0 ) )
2595            desiredAvailable += tr_cpMissingBytesInPiece( &t->tor->completion, i );
2596
2597    assert( desiredAvailable <= tor->info.totalSize );
2598    return desiredAvailable;
2599}
2600
2601void
2602tr_peerMgrTorrentStats( tr_torrent  * tor,
2603                        int         * setmePeersConnected,
2604                        int         * setmeWebseedsSendingToUs,
2605                        int         * setmePeersSendingToUs,
2606                        int         * setmePeersGettingFromUs,
2607                        int         * setmePeersFrom )
2608{
2609    int i, size;
2610    const Torrent * t = tor->torrentPeers;
2611    const tr_peer ** peers;
2612
2613    assert( tr_torrentIsLocked( tor ) );
2614
2615    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
2616    size = tr_ptrArraySize( &t->peers );
2617
2618    *setmePeersConnected       = 0;
2619    *setmePeersGettingFromUs   = 0;
2620    *setmePeersSendingToUs     = 0;
2621    *setmeWebseedsSendingToUs  = 0;
2622
2623    for( i=0; i<TR_PEER_FROM__MAX; ++i )
2624        setmePeersFrom[i] = 0;
2625
2626    for( i=0; i<size; ++i )
2627    {
2628        const tr_peer * peer = peers[i];
2629        const struct peer_atom * atom = peer->atom;
2630
2631        if( peer->io == NULL ) /* not connected */
2632            continue;
2633
2634        ++*setmePeersConnected;
2635
2636        ++setmePeersFrom[atom->fromFirst];
2637
2638        if( clientIsDownloadingFrom( tor, peer ) )
2639            ++*setmePeersSendingToUs;
2640
2641        if( clientIsUploadingTo( peer ) )
2642            ++*setmePeersGettingFromUs;
2643    }
2644
2645    *setmeWebseedsSendingToUs = countActiveWebseeds( t );
2646}
2647
2648double*
2649tr_peerMgrWebSpeeds_KBps( const tr_torrent * tor )
2650{
2651    int i;
2652    const Torrent * t = tor->torrentPeers;
2653    const int webseedCount = tr_ptrArraySize( &t->webseeds );
2654    const tr_webseed ** webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2655    const uint64_t now = tr_time_msec( );
2656    double * ret = tr_new0( double, webseedCount );
2657
2658    assert( tr_isTorrent( tor ) );
2659    assert( tr_torrentIsLocked( tor ) );
2660    assert( t->manager != NULL );
2661    assert( webseedCount == tor->info.webseedCount );
2662
2663    for( i=0; i<webseedCount; ++i ) {
2664        int Bps;
2665        if( tr_webseedGetSpeed_Bps( webseeds[i], now, &Bps ) )
2666            ret[i] = Bps / (double)tr_speed_K;
2667        else
2668            ret[i] = -1.0;
2669    }
2670
2671    return ret;
2672}
2673
2674int
2675tr_peerGetPieceSpeed_Bps( const tr_peer * peer, uint64_t now, tr_direction direction )
2676{
2677    return peer->io ? tr_peerIoGetPieceSpeed_Bps( peer->io, now, direction ) : 0.0;
2678}
2679
2680struct tr_peer_stat *
2681tr_peerMgrPeerStats( const tr_torrent * tor, int * setmeCount )
2682{
2683    int i;
2684    const Torrent * t = tor->torrentPeers;
2685    const int size = tr_ptrArraySize( &t->peers );
2686    const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
2687    const uint64_t now_msec = tr_time_msec( );
2688    const time_t now = tr_time();
2689    tr_peer_stat * ret = tr_new0( tr_peer_stat, size );
2690
2691    assert( tr_isTorrent( tor ) );
2692    assert( tr_torrentIsLocked( tor ) );
2693    assert( t->manager );
2694
2695    for( i=0; i<size; ++i )
2696    {
2697        char *                   pch;
2698        const tr_peer *          peer = peers[i];
2699        const struct peer_atom * atom = peer->atom;
2700        tr_peer_stat *           stat = ret + i;
2701
2702        tr_address_to_string_with_buf( &atom->addr, stat->addr, sizeof( stat->addr ) );
2703        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
2704                   sizeof( stat->client ) );
2705        stat->port                = ntohs( peer->atom->port );
2706        stat->from                = atom->fromFirst;
2707        stat->progress            = peer->progress;
2708        stat->isUTP               = peer->io->utp_socket != NULL;
2709        stat->isEncrypted         = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
2710        stat->rateToPeer_KBps     = toSpeedKBps( tr_peerGetPieceSpeed_Bps( peer, now_msec, TR_CLIENT_TO_PEER ) );
2711        stat->rateToClient_KBps   = toSpeedKBps( tr_peerGetPieceSpeed_Bps( peer, now_msec, TR_PEER_TO_CLIENT ) );
2712        stat->peerIsChoked        = peer->peerIsChoked;
2713        stat->peerIsInterested    = peer->peerIsInterested;
2714        stat->clientIsChoked      = peer->clientIsChoked;
2715        stat->clientIsInterested  = peer->clientIsInterested;
2716        stat->isIncoming          = tr_peerIoIsIncoming( peer->io );
2717        stat->isDownloadingFrom   = clientIsDownloadingFrom( tor, peer );
2718        stat->isUploadingTo       = clientIsUploadingTo( peer );
2719        stat->isSeed              = peerIsSeed( peer );
2720
2721        stat->blocksToPeer        = tr_historyGet( &peer->blocksSentToPeer,    now, CANCEL_HISTORY_SEC );
2722        stat->blocksToClient      = tr_historyGet( &peer->blocksSentToClient,  now, CANCEL_HISTORY_SEC );
2723        stat->cancelsToPeer       = tr_historyGet( &peer->cancelsSentToPeer,   now, CANCEL_HISTORY_SEC );
2724        stat->cancelsToClient     = tr_historyGet( &peer->cancelsSentToClient, now, CANCEL_HISTORY_SEC );
2725
2726        stat->pendingReqsToPeer   = peer->pendingReqsToPeer;
2727        stat->pendingReqsToClient = peer->pendingReqsToClient;
2728
2729        pch = stat->flagStr;
2730        if( stat->isUTP ) *pch++ = 'T';
2731        if( t->optimistic == peer ) *pch++ = 'O';
2732        if( stat->isDownloadingFrom ) *pch++ = 'D';
2733        else if( stat->clientIsInterested ) *pch++ = 'd';
2734        if( stat->isUploadingTo ) *pch++ = 'U';
2735        else if( stat->peerIsInterested ) *pch++ = 'u';
2736        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
2737        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
2738        if( stat->isEncrypted ) *pch++ = 'E';
2739        if( stat->from == TR_PEER_FROM_DHT ) *pch++ = 'H';
2740        else if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
2741        if( stat->isIncoming ) *pch++ = 'I';
2742        *pch = '\0';
2743    }
2744
2745    *setmeCount = size;
2746
2747    return ret;
2748}
2749
2750/***
2751****
2752****
2753***/
2754
2755void
2756tr_peerMgrClearInterest( tr_torrent * tor )
2757{
2758    int i;
2759    Torrent * t = tor->torrentPeers;
2760    const int peerCount = tr_ptrArraySize( &t->peers );
2761
2762    assert( tr_isTorrent( tor ) );
2763    assert( tr_torrentIsLocked( tor ) );
2764
2765    for( i=0; i<peerCount; ++i )
2766    {
2767        const tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2768        tr_peerMsgsSetInterested( peer->msgs, false );
2769    }
2770}
2771
2772/* does this peer have any pieces that we want? */
2773static bool
2774isPeerInteresting( const tr_torrent  * const tor,
2775                   const tr_bitfield * const interesting_pieces,
2776                   const tr_peer     * const peer )
2777{
2778    tr_piece_index_t i, n;
2779
2780    /* these cases should have already been handled by the calling code... */
2781    assert( !tr_torrentIsSeed( tor ) );
2782    assert( tr_torrentIsPieceTransferAllowed( tor, TR_PEER_TO_CLIENT ) );
2783
2784    if( peerIsSeed( peer ) )
2785        return true;
2786
2787    for( i=0, n=tor->info.pieceCount; i<n; ++i )
2788        if( tr_bitfieldHas( interesting_pieces, i ) && tr_bitfieldHas( &peer->have, i ) )
2789            return true;
2790
2791    return false;
2792}
2793
/* How a peer has behaved recently with respect to cancelled requests.
 * compare_rechoke_info() sorts by these numeric values, lowest first,
 * so the declaration order is significant. */
typedef enum
{
    RECHOKE_STATE_GOOD     = 0,
    RECHOKE_STATE_UNTESTED = 1,
    RECHOKE_STATE_BAD      = 2
}
tr_rechoke_state;
2801
2802struct tr_rechoke_info
2803{
2804    tr_peer * peer;
2805    int salt;
2806    int rechoke_state;
2807};
2808
2809static int
2810compare_rechoke_info( const void * va, const void * vb )
2811{
2812    const struct tr_rechoke_info * a = va;
2813    const struct tr_rechoke_info * b = vb;
2814
2815    if( a->rechoke_state != b->rechoke_state )
2816        return a->rechoke_state - b->rechoke_state;
2817
2818    return a->salt - b->salt;
2819}
2820
2821/* determines who we send "interested" messages to */
2822static void
2823rechokeDownloads( Torrent * t )
2824{
2825    int i;
2826    int maxPeers = 0;
2827    int rechoke_count = 0;
2828    struct tr_rechoke_info * rechoke = NULL;
2829    const int MIN_INTERESTING_PEERS = 5;
2830    const int peerCount = tr_ptrArraySize( &t->peers );
2831    const time_t now = tr_time( );
2832
2833    /* some cases where this function isn't necessary */
2834    if( tr_torrentIsSeed( t->tor ) )
2835        return;
2836    if ( !tr_torrentIsPieceTransferAllowed( t->tor, TR_PEER_TO_CLIENT ) )
2837        return;
2838
2839    /* decide HOW MANY peers to be interested in */
2840    {
2841        int blocks = 0;
2842        int cancels = 0;
2843        time_t timeSinceCancel;
2844
2845        /* Count up how many blocks & cancels each peer has.
2846         *
2847         * There are two situations where we send out cancels --
2848         *
2849         * 1. We've got unresponsive peers, which is handled by deciding
2850         *    -which- peers to be interested in.
2851         *
2852         * 2. We've hit our bandwidth cap, which is handled by deciding
2853         *    -how many- peers to be interested in.
2854         *
2855         * We're working on 2. here, so we need to ignore unresponsive
2856         * peers in our calculations lest they confuse Transmission into
2857         * thinking it's hit its bandwidth cap.
2858         */
2859        for( i=0; i<peerCount; ++i )
2860        {
2861            const tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2862            const int b = tr_historyGet( &peer->blocksSentToClient, now, CANCEL_HISTORY_SEC );
2863            const int c = tr_historyGet( &peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC );
2864
2865            if( b == 0 ) /* ignore unresponsive peers, as described above */
2866                continue;
2867
2868            blocks += b;
2869            cancels += c;
2870        }
2871
2872        if( cancels > 0 )
2873        {
2874            /* cancelRate: of the block requests we've recently made, the percentage we cancelled.
2875             * higher values indicate more congestion. */
2876            const double cancelRate = cancels / (double)(cancels + blocks);
2877            const double mult = 1 - MIN( cancelRate, 0.5 );
2878            maxPeers = t->interestedCount * mult;
2879            tordbg( t, "cancel rate is %.3f -- reducing the "
2880                       "number of peers we're interested in by %.0f percent",
2881                       cancelRate, mult * 100 );
2882            t->lastCancel = now;
2883        }
2884
2885        timeSinceCancel = now - t->lastCancel;
2886        if( timeSinceCancel )
2887        {
2888            const int maxIncrease = 15;
2889            const time_t maxHistory = 2 * CANCEL_HISTORY_SEC;
2890            const double mult = MIN( timeSinceCancel, maxHistory ) / (double) maxHistory;
2891            const int inc = maxIncrease * mult;
2892            maxPeers = t->maxPeers + inc;
2893            tordbg( t, "time since last cancel is %li -- increasing the "
2894                       "number of peers we're interested in by %d",
2895                       timeSinceCancel, inc );
2896        }
2897    }
2898
2899    /* don't let the previous section's number tweaking go too far... */
2900    if( maxPeers < MIN_INTERESTING_PEERS )
2901        maxPeers = MIN_INTERESTING_PEERS;
2902    if( maxPeers > t->tor->maxConnectedPeers )
2903        maxPeers = t->tor->maxConnectedPeers;
2904
2905    t->maxPeers = maxPeers;
2906
2907    if( peerCount > 0 )
2908    {
2909        const tr_torrent * const tor = t->tor;
2910        const int n = tor->info.pieceCount;
2911        tr_bitfield interesting_pieces = TR_BITFIELD_INIT;
2912
2913        /* build a bitfield of interesting pieces... */
2914        tr_bitfieldConstruct( &interesting_pieces, n );
2915        for( i=0; i<n; i++ )
2916            if( !tor->info.pieces[i].dnd && !tr_cpPieceIsComplete( &tor->completion, i ) )
2917                tr_bitfieldAdd( &interesting_pieces, i );
2918
2919        /* decide WHICH peers to be interested in (based on their cancel-to-block ratio) */
2920        for( i=0; i<peerCount; ++i )
2921        {
2922            tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2923
2924            if( !isPeerInteresting( t->tor, &interesting_pieces, peer ) )
2925            {
2926                tr_peerMsgsSetInterested( peer->msgs, false );
2927            }
2928            else
2929            {
2930                tr_rechoke_state rechoke_state;
2931                const int blocks = tr_historyGet( &peer->blocksSentToClient, now, CANCEL_HISTORY_SEC );
2932                const int cancels = tr_historyGet( &peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC );
2933
2934                if( !blocks && !cancels )
2935                    rechoke_state = RECHOKE_STATE_UNTESTED;
2936                else if( !cancels )
2937                    rechoke_state = RECHOKE_STATE_GOOD;
2938                else if( !blocks )
2939                    rechoke_state = RECHOKE_STATE_BAD;
2940                else if( ( cancels * 10 ) < blocks )
2941                    rechoke_state = RECHOKE_STATE_GOOD;
2942                else
2943                    rechoke_state = RECHOKE_STATE_BAD;
2944
2945                if( rechoke == NULL )
2946                    rechoke = tr_new( struct tr_rechoke_info, peerCount );
2947
2948                 rechoke[rechoke_count].peer = peer;
2949                 rechoke[rechoke_count].rechoke_state = rechoke_state;
2950                 rechoke[rechoke_count].salt = tr_cryptoWeakRandInt( INT_MAX );
2951                 rechoke_count++;
2952            }
2953
2954        }
2955
2956        tr_bitfieldDestruct( &interesting_pieces );
2957    }
2958
2959    /* now that we know which & how many peers to be interested in... update the peer interest */
2960    qsort( rechoke, rechoke_count, sizeof( struct tr_rechoke_info ), compare_rechoke_info );
2961    t->interestedCount = MIN( maxPeers, rechoke_count );
2962    for( i=0; i<rechoke_count; ++i )
2963        tr_peerMsgsSetInterested( rechoke[i].peer->msgs, i<t->interestedCount );
2964
2965    /* cleanup */
2966    tr_free( rechoke );
2967}
2968
2969/**
2970***
2971**/
2972
2973struct ChokeData
2974{
2975    bool            isInterested;
2976    bool            wasChoked;
2977    bool            isChoked;
2978    int             rate;
2979    int             salt;
2980    tr_peer *       peer;
2981};
2982
2983static int
2984compareChoke( const void * va, const void * vb )
2985{
2986    const struct ChokeData * a = va;
2987    const struct ChokeData * b = vb;
2988
2989    if( a->rate != b->rate ) /* prefer higher overall speeds */
2990        return a->rate > b->rate ? -1 : 1;
2991
2992    if( a->wasChoked != b->wasChoked ) /* prefer unchoked */
2993        return a->wasChoked ? 1 : -1;
2994
2995    if( a->salt != b->salt ) /* random order */
2996        return a->salt - b->salt;
2997
2998    return 0;
2999}
3000
3001/* is this a new connection? */
3002static int
3003isNew( const tr_peer * peer )
3004{
3005    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
3006}
3007
3008/* get a rate for deciding which peers to choke and unchoke. */
3009static int
3010getRate( const tr_torrent * tor, struct peer_atom * atom, uint64_t now )
3011{
3012    int Bps;
3013
3014    if( tr_torrentIsSeed( tor ) )
3015        Bps = tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_CLIENT_TO_PEER );
3016
3017    /* downloading a private torrent... take upload speed into account
3018     * because there may only be a small window of opportunity to share */
3019    else if( tr_torrentIsPrivate( tor ) )
3020        Bps = tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_PEER_TO_CLIENT )
3021            + tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_CLIENT_TO_PEER );
3022
3023    /* downloading a public torrent */
3024    else
3025        Bps = tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_PEER_TO_CLIENT );
3026
3027    /* convert it to bytes per second */
3028    return Bps;
3029}
3030
3031static inline bool
3032isBandwidthMaxedOut( const tr_bandwidth * b,
3033                     const uint64_t now_msec, tr_direction dir )
3034{
3035    if( !tr_bandwidthIsLimited( b, dir ) )
3036        return false;
3037    else {
3038        const int got = tr_bandwidthGetPieceSpeed_Bps( b, now_msec, dir );
3039        const int want = tr_bandwidthGetDesiredSpeed_Bps( b, dir );
3040        return got >= want;
3041    }
3042}
3043
3044static void
3045rechokeUploads( Torrent * t, const uint64_t now )
3046{
3047    int i, size, unchokedInterested;
3048    const int peerCount = tr_ptrArraySize( &t->peers );
3049    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
3050    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
3051    const tr_session * session = t->manager->session;
3052    const int chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
3053    const bool isMaxedOut = isBandwidthMaxedOut( &t->tor->bandwidth, now, TR_UP );
3054
3055    assert( torrentIsLocked( t ) );
3056
3057    /* an optimistic unchoke peer's "optimistic"
3058     * state lasts for N calls to rechokeUploads(). */
3059    if( t->optimisticUnchokeTimeScaler > 0 )
3060        t->optimisticUnchokeTimeScaler--;
3061    else
3062        t->optimistic = NULL;
3063
3064    /* sort the peers by preference and rate */
3065    for( i = 0, size = 0; i < peerCount; ++i )
3066    {
3067        tr_peer * peer = peers[i];
3068        struct peer_atom * atom = peer->atom;
3069
3070        if( peerIsSeed( peer ) ) /* choke seeds and partial seeds */
3071        {
3072            tr_peerMsgsSetChoke( peer->msgs, true );
3073        }
3074        else if( chokeAll ) /* choke everyone if we're not uploading */
3075        {
3076            tr_peerMsgsSetChoke( peer->msgs, true );
3077        }
3078        else if( peer != t->optimistic )
3079        {
3080            struct ChokeData * n = &choke[size++];
3081            n->peer         = peer;
3082            n->isInterested = peer->peerIsInterested;
3083            n->wasChoked    = peer->peerIsChoked;
3084            n->rate         = getRate( t->tor, atom, now );
3085            n->salt         = tr_cryptoWeakRandInt( INT_MAX );
3086            n->isChoked     = true;
3087        }
3088    }
3089
3090    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
3091
3092    /**
3093     * Reciprocation and number of uploads capping is managed by unchoking
3094     * the N peers which have the best upload rate and are interested.
3095     * This maximizes the client's download rate. These N peers are
3096     * referred to as downloaders, because they are interested in downloading
3097     * from the client.
3098     *
3099     * Peers which have a better upload rate (as compared to the downloaders)
3100     * but aren't interested get unchoked. If they become interested, the
3101     * downloader with the worst upload rate gets choked. If a client has
3102     * a complete file, it uses its upload rate rather than its download
3103     * rate to decide which peers to unchoke.
3104     *
3105     * If our bandwidth is maxed out, don't unchoke any more peers.
3106     */
3107    unchokedInterested = 0;
3108    for( i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i ) {
3109        choke[i].isChoked = isMaxedOut ? choke[i].wasChoked : false;
3110        if( choke[i].isInterested )
3111            ++unchokedInterested;
3112    }
3113
3114    /* optimistic unchoke */
3115    if( !t->optimistic && !isMaxedOut && (i<size) )
3116    {
3117        int n;
3118        struct ChokeData * c;
3119        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
3120
3121        for( ; i<size; ++i )
3122        {
3123            if( choke[i].isInterested )
3124            {
3125                const tr_peer * peer = choke[i].peer;
3126                int x = 1, y;
3127                if( isNew( peer ) ) x *= 3;
3128                for( y=0; y<x; ++y )
3129                    tr_ptrArrayAppend( &randPool, &choke[i] );
3130            }
3131        }
3132
3133        if(( n = tr_ptrArraySize( &randPool )))
3134        {
3135            c = tr_ptrArrayNth( &randPool, tr_cryptoWeakRandInt( n ));
3136            c->isChoked = false;
3137            t->optimistic = c->peer;
3138            t->optimisticUnchokeTimeScaler = OPTIMISTIC_UNCHOKE_MULTIPLIER;
3139        }
3140
3141        tr_ptrArrayDestruct( &randPool, NULL );
3142    }
3143
3144    for( i=0; i<size; ++i )
3145        tr_peerMsgsSetChoke( choke[i].peer->msgs, choke[i].isChoked );
3146
3147    /* cleanup */
3148    tr_free( choke );
3149}
3150
3151static void
3152rechokePulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3153{
3154    tr_torrent * tor = NULL;
3155    tr_peerMgr * mgr = vmgr;
3156    const uint64_t now = tr_time_msec( );
3157
3158    managerLock( mgr );
3159
3160    while(( tor = tr_torrentNext( mgr->session, tor ))) {
3161        if( tor->isRunning ) {
3162            Torrent * t = tor->torrentPeers;
3163            if( !tr_ptrArrayEmpty( &t->peers ) ) {
3164                rechokeUploads( t, now );
3165                rechokeDownloads( t );
3166            }
3167        }
3168    }
3169
3170    tr_timerAddMsec( mgr->rechokeTimer, RECHOKE_PERIOD_MSEC );
3171    managerUnlock( mgr );
3172}
3173
3174/***
3175****
3176****  Life and Death
3177****
3178***/
3179
3180static bool
3181shouldPeerBeClosed( const Torrent    * t,
3182                    const tr_peer    * peer,
3183                    int                peerCount,
3184                    const time_t       now )
3185{
3186    const tr_torrent *       tor = t->tor;
3187    const struct peer_atom * atom = peer->atom;
3188
3189    /* if it's marked for purging, close it */
3190    if( peer->doPurge )
3191    {
3192        tordbg( t, "purging peer %s because its doPurge flag is set",
3193                tr_atomAddrStr( atom ) );
3194        return true;
3195    }
3196
3197    /* disconnect if we're both seeds and enough time has passed for PEX */
3198    if( tr_torrentIsSeed( tor ) && peerIsSeed( peer ) )
3199        return !tr_torrentAllowsPex(tor) || (now-atom->time>=30);
3200
3201    /* disconnect if it's been too long since piece data has been transferred.
3202     * this is on a sliding scale based on number of available peers... */
3203    {
3204        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
3205        /* if we have >= relaxIfFewerThan, strictness is 100%.
3206         * if we have zero connections, strictness is 0% */
3207        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
3208                               ? 1.0
3209                               : peerCount / (float)relaxStrictnessIfFewerThanN;
3210        const int lo = MIN_UPLOAD_IDLE_SECS;
3211        const int hi = MAX_UPLOAD_IDLE_SECS;
3212        const int limit = hi - ( ( hi - lo ) * strictness );
3213        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
3214/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
3215        if( idleTime > limit ) {
3216            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
3217                       tr_atomAddrStr( atom ), idleTime );
3218            return true;
3219        }
3220    }
3221
3222    return false;
3223}
3224
3225static tr_peer **
3226getPeersToClose( Torrent * t, const time_t now_sec, int * setmeSize )
3227{
3228    int i, peerCount, outsize;
3229    struct tr_peer ** ret = NULL;
3230    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( &t->peers, &peerCount );
3231
3232    assert( torrentIsLocked( t ) );
3233
3234    for( i = outsize = 0; i < peerCount; ++i ) {
3235        if( shouldPeerBeClosed( t, peers[i], peerCount, now_sec ) ) {
3236            if( ret == NULL )
3237                ret = tr_new( tr_peer *, peerCount );
3238            ret[outsize++] = peers[i];
3239        }
3240    }
3241
3242    *setmeSize = outsize;
3243    return ret;
3244}
3245
3246static int
3247getReconnectIntervalSecs( const struct peer_atom * atom, const time_t now )
3248{
3249    int sec;
3250
3251    /* if we were recently connected to this peer and transferring piece
3252     * data, try to reconnect to them sooner rather that later -- we don't
3253     * want network troubles to get in the way of a good peer. */
3254    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
3255        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
3256
3257    /* don't allow reconnects more often than our minimum */
3258    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
3259        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
3260
3261    /* otherwise, the interval depends on how many times we've tried
3262     * and failed to connect to the peer */
3263    else switch( atom->numFails ) {
3264        case 0: sec = 0; break;
3265        case 1: sec = 5; break;
3266        case 2: sec = 2 * 60; break;
3267        case 3: sec = 15 * 60; break;
3268        case 4: sec = 30 * 60; break;
3269        case 5: sec = 60 * 60; break;
3270        default: sec = 120 * 60; break;
3271    }
3272
3273    /* penalize peers that were unreachable the last time we tried */
3274    if( atom->flags2 & MYFLAG_UNREACHABLE )
3275        sec += sec;
3276
3277    dbgmsg( "reconnect interval for %s is %d seconds", tr_atomAddrStr( atom ), sec );
3278    return sec;
3279}
3280
3281static void
3282removePeer( Torrent * t, tr_peer * peer )
3283{
3284    tr_peer * removed;
3285    struct peer_atom * atom = peer->atom;
3286
3287    assert( torrentIsLocked( t ) );
3288    assert( atom );
3289
3290    atom->time = tr_time( );
3291
3292    removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare );
3293
3294    if( replicationExists( t ) )
3295        tr_decrReplicationFromBitfield( t, &peer->have );
3296
3297    assert( removed == peer );
3298    peerDelete( t, removed );
3299}
3300
3301static void
3302closePeer( Torrent * t, tr_peer * peer )
3303{
3304    struct peer_atom * atom;
3305
3306    assert( t != NULL );
3307    assert( peer != NULL );
3308
3309    atom = peer->atom;
3310
3311    /* if we transferred piece data, then they might be good peers,
3312       so reset their `numFails' weight to zero. otherwise we connected
3313       to them fruitlessly, so mark it as another fail */
3314    if( atom->piece_data_time ) {
3315        tordbg( t, "resetting atom %s numFails to 0", tr_atomAddrStr(atom) );
3316        atom->numFails = 0;
3317    } else {
3318        ++atom->numFails;
3319        tordbg( t, "incremented atom %s numFails to %d", tr_atomAddrStr(atom), (int)atom->numFails );
3320    }
3321
3322    tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
3323    removePeer( t, peer );
3324}
3325
3326static void
3327removeAllPeers( Torrent * t )
3328{
3329    while( !tr_ptrArrayEmpty( &t->peers ) )
3330        removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) );
3331}
3332
3333static void
3334closeBadPeers( Torrent * t, const time_t now_sec )
3335{
3336    if( !tr_ptrArrayEmpty( &t->peers ) )
3337    {
3338        int i;
3339        int peerCount;
3340        struct tr_peer ** peers = getPeersToClose( t, now_sec, &peerCount );
3341        for( i=0; i<peerCount; ++i )
3342            closePeer( t, peers[i] );
3343        tr_free( peers );
3344    }
3345}
3346
3347struct peer_liveliness
3348{
3349    tr_peer * peer;
3350    void * clientData;
3351    time_t pieceDataTime;
3352    time_t time;
3353    int speed;
3354    bool doPurge;
3355};
3356
3357static int
3358comparePeerLiveliness( const void * va, const void * vb )
3359{
3360    const struct peer_liveliness * a = va;
3361    const struct peer_liveliness * b = vb;
3362
3363    if( a->doPurge != b->doPurge )
3364        return a->doPurge ? 1 : -1;
3365
3366    if( a->speed != b->speed ) /* faster goes first */
3367        return a->speed > b->speed ? -1 : 1;
3368
3369    /* the one to give us data more recently goes first */
3370    if( a->pieceDataTime != b->pieceDataTime )
3371        return a->pieceDataTime > b->pieceDataTime ? -1 : 1;
3372
3373    /* the one we connected to most recently goes first */
3374    if( a->time != b->time )
3375        return a->time > b->time ? -1 : 1;
3376
3377    return 0;
3378}
3379
3380static void
3381sortPeersByLivelinessImpl( tr_peer  ** peers,
3382                           void     ** clientData,
3383                           int         n,
3384                           uint64_t    now,
3385                           int (*compare) ( const void *va, const void *vb ) )
3386{
3387    int i;
3388    struct peer_liveliness *lives, *l;
3389
3390    /* build a sortable array of peer + extra info */
3391    lives = l = tr_new0( struct peer_liveliness, n );
3392    for( i=0; i<n; ++i, ++l )
3393    {
3394        tr_peer * p = peers[i];
3395        l->peer = p;
3396        l->doPurge = p->doPurge;
3397        l->pieceDataTime = p->atom->piece_data_time;
3398        l->time = p->atom->time;
3399        l->speed = tr_peerGetPieceSpeed_Bps( p, now, TR_UP )
3400                 + tr_peerGetPieceSpeed_Bps( p, now, TR_DOWN );
3401        if( clientData )
3402            l->clientData = clientData[i];
3403    }
3404
3405    /* sort 'em */
3406    assert( n == ( l - lives ) );
3407    qsort( lives, n, sizeof( struct peer_liveliness ), compare );
3408
3409    /* build the peer array */
3410    for( i=0, l=lives; i<n; ++i, ++l ) {
3411        peers[i] = l->peer;
3412        if( clientData )
3413            clientData[i] = l->clientData;
3414    }
3415    assert( n == ( l - lives ) );
3416
3417    /* cleanup */
3418    tr_free( lives );
3419}
3420
3421static void
3422sortPeersByLiveliness( tr_peer ** peers, void ** clientData, int n, uint64_t now )
3423{
3424    sortPeersByLivelinessImpl( peers, clientData, n, now, comparePeerLiveliness );
3425}
3426
3427
3428static void
3429enforceTorrentPeerLimit( Torrent * t, uint64_t now )
3430{
3431    int n = tr_ptrArraySize( &t->peers );
3432    const int max = tr_torrentGetPeerLimit( t->tor );
3433    if( n > max )
3434    {
3435        void * base = tr_ptrArrayBase( &t->peers );
3436        tr_peer ** peers = tr_memdup( base, n*sizeof( tr_peer* ) );
3437        sortPeersByLiveliness( peers, NULL, n, now );
3438        while( n > max )
3439            closePeer( t, peers[--n] );
3440        tr_free( peers );
3441    }
3442}
3443
3444static void
3445enforceSessionPeerLimit( tr_session * session, uint64_t now )
3446{
3447    int n = 0;
3448    tr_torrent * tor = NULL;
3449    const int max = tr_sessionGetPeerLimit( session );
3450
3451    /* count the total number of peers */
3452    while(( tor = tr_torrentNext( session, tor )))
3453        n += tr_ptrArraySize( &tor->torrentPeers->peers );
3454
3455    /* if there are too many, prune out the worst */
3456    if( n > max )
3457    {
3458        tr_peer ** peers = tr_new( tr_peer*, n );
3459        Torrent ** torrents = tr_new( Torrent*, n );
3460
3461        /* populate the peer array */
3462        n = 0;
3463        tor = NULL;
3464        while(( tor = tr_torrentNext( session, tor ))) {
3465            int i;
3466            Torrent * t = tor->torrentPeers;
3467            const int tn = tr_ptrArraySize( &t->peers );
3468            for( i=0; i<tn; ++i, ++n ) {
3469                peers[n] = tr_ptrArrayNth( &t->peers, i );
3470                torrents[n] = t;
3471            }
3472        }
3473
3474        /* sort 'em */
3475        sortPeersByLiveliness( peers, (void**)torrents, n, now );
3476
3477        /* cull out the crappiest */
3478        while( n-- > max )
3479            closePeer( torrents[n], peers[n] );
3480
3481        /* cleanup */
3482        tr_free( torrents );
3483        tr_free( peers );
3484    }
3485}
3486
3487static void makeNewPeerConnections( tr_peerMgr * mgr, const int max );
3488
3489static void
3490reconnectPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3491{
3492    tr_torrent * tor;
3493    tr_peerMgr * mgr = vmgr;
3494    const time_t now_sec = tr_time( );
3495    const uint64_t now_msec = tr_time_msec( );
3496
3497    /**
3498    ***  enforce the per-session and per-torrent peer limits
3499    **/
3500
3501    /* if we're over the per-torrent peer limits, cull some peers */
3502    tor = NULL;
3503    while(( tor = tr_torrentNext( mgr->session, tor )))
3504        if( tor->isRunning )
3505            enforceTorrentPeerLimit( tor->torrentPeers, now_msec );
3506
3507    /* if we're over the per-session peer limits, cull some peers */
3508    enforceSessionPeerLimit( mgr->session, now_msec );
3509
3510    /* remove crappy peers */
3511    tor = NULL;
3512    while(( tor = tr_torrentNext( mgr->session, tor )))
3513        if( !tor->torrentPeers->isRunning )
3514            removeAllPeers( tor->torrentPeers );
3515        else
3516            closeBadPeers( tor->torrentPeers, now_sec );
3517
3518    /* try to make new peer connections */
3519    makeNewPeerConnections( mgr, MAX_CONNECTIONS_PER_PULSE );
3520}
3521
3522/****
3523*****
3524*****  BANDWIDTH ALLOCATION
3525*****
3526****/
3527
3528static void
3529pumpAllPeers( tr_peerMgr * mgr )
3530{
3531    tr_torrent * tor = NULL;
3532
3533    while(( tor = tr_torrentNext( mgr->session, tor )))
3534    {
3535        int j;
3536        Torrent * t = tor->torrentPeers;
3537
3538        for( j=0; j<tr_ptrArraySize( &t->peers ); ++j )
3539        {
3540            tr_peer * peer = tr_ptrArrayNth( &t->peers, j );
3541            tr_peerMsgsPulse( peer->msgs );
3542        }
3543    }
3544}
3545
3546static void
3547bandwidthPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3548{
3549    tr_torrent * tor;
3550    tr_peerMgr * mgr = vmgr;
3551    managerLock( mgr );
3552
3553    /* FIXME: this next line probably isn't necessary... */
3554    pumpAllPeers( mgr );
3555
3556    /* allocate bandwidth to the peers */
3557    tr_bandwidthAllocate( &mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
3558    tr_bandwidthAllocate( &mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
3559
3560    /* torrent upkeep */
3561    tor = NULL;
3562    while(( tor = tr_torrentNext( mgr->session, tor )))
3563    {
3564        /* possibly stop torrents that have seeded enough */
3565        tr_torrentCheckSeedLimit( tor );
3566
3567        /* run the completeness check for any torrents that need it */
3568        if( tor->torrentPeers->needsCompletenessCheck ) {
3569            tor->torrentPeers->needsCompletenessCheck  = false;
3570            tr_torrentRecheckCompleteness( tor );
3571        }
3572
3573        /* stop torrents that are ready to stop, but couldn't be stopped
3574           earlier during the peer-io callback call chain */
3575        if( tor->isStopping )
3576            tr_torrentStop( tor );
3577    }
3578
3579    reconnectPulse( 0, 0, mgr );
3580
3581    tr_timerAddMsec( mgr->bandwidthTimer, BANDWIDTH_PERIOD_MSEC );
3582    managerUnlock( mgr );
3583}
3584
3585/***
3586****
3587***/
3588
3589static int
3590compareAtomPtrsByAddress( const void * va, const void *vb )
3591{
3592    const struct peer_atom * a = * (const struct peer_atom**) va;
3593    const struct peer_atom * b = * (const struct peer_atom**) vb;
3594
3595    assert( tr_isAtom( a ) );
3596    assert( tr_isAtom( b ) );
3597
3598    return tr_address_compare( &a->addr, &b->addr );
3599}
3600
3601/* best come first, worst go last */
3602static int
3603compareAtomPtrsByShelfDate( const void * va, const void *vb )
3604{
3605    time_t atime;
3606    time_t btime;
3607    const struct peer_atom * a = * (const struct peer_atom**) va;
3608    const struct peer_atom * b = * (const struct peer_atom**) vb;
3609    const int data_time_cutoff_secs = 60 * 60;
3610    const time_t tr_now = tr_time( );
3611
3612    assert( tr_isAtom( a ) );
3613    assert( tr_isAtom( b ) );
3614
3615    /* primary key: the last piece data time *if* it was within the last hour */
3616    atime = a->piece_data_time; if( atime + data_time_cutoff_secs < tr_now ) atime = 0;
3617    btime = b->piece_data_time; if( btime + data_time_cutoff_secs < tr_now ) btime = 0;
3618    if( atime != btime )
3619        return atime > btime ? -1 : 1;
3620
3621    /* secondary key: shelf date. */
3622    if( a->shelf_date != b->shelf_date )
3623        return a->shelf_date > b->shelf_date ? -1 : 1;
3624
3625    return 0;
3626}
3627
3628static int
3629getMaxAtomCount( const tr_torrent * tor )
3630{
3631    const int n = tor->maxConnectedPeers;
3632    /* approximate fit of the old jump discontinuous function */
3633    if( n >= 55 ) return     n + 150;
3634    if( n >= 20 ) return 2 * n + 95;
3635    return               4 * n + 55;
3636}
3637
3638static void
3639atomPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3640{
3641    tr_torrent * tor = NULL;
3642    tr_peerMgr * mgr = vmgr;
3643    managerLock( mgr );
3644
3645    while(( tor = tr_torrentNext( mgr->session, tor )))
3646    {
3647        int atomCount;
3648        Torrent * t = tor->torrentPeers;
3649        const int maxAtomCount = getMaxAtomCount( tor );
3650        struct peer_atom ** atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
3651
3652        if( atomCount > maxAtomCount ) /* we've got too many atoms... time to prune */
3653        {
3654            int i;
3655            int keepCount = 0;
3656            int testCount = 0;
3657            struct peer_atom ** keep = tr_new( struct peer_atom*, atomCount );
3658            struct peer_atom ** test = tr_new( struct peer_atom*, atomCount );
3659
3660            /* keep the ones that are in use */
3661            for( i=0; i<atomCount; ++i ) {
3662                struct peer_atom * atom = atoms[i];
3663                if( peerIsInUse( t, atom ) )
3664                    keep[keepCount++] = atom;
3665                else
3666                    test[testCount++] = atom;
3667            }
3668
3669            /* if there's room, keep the best of what's left */
3670            i = 0;
3671            if( keepCount < maxAtomCount ) {
3672                qsort( test, testCount, sizeof( struct peer_atom * ), compareAtomPtrsByShelfDate );
3673                while( i<testCount && keepCount<maxAtomCount )
3674                    keep[keepCount++] = test[i++];
3675            }
3676
3677            /* free the culled atoms */
3678            while( i<testCount )
3679                tr_free( test[i++] );
3680
3681            /* rebuild Torrent.pool with what's left */
3682            tr_ptrArrayDestruct( &t->pool, NULL );
3683            t->pool = TR_PTR_ARRAY_INIT;
3684            qsort( keep, keepCount, sizeof( struct peer_atom * ), compareAtomPtrsByAddress );
3685            for( i=0; i<keepCount; ++i )
3686                tr_ptrArrayAppend( &t->pool, keep[i] );
3687
3688            tordbg( t, "max atom count is %d... pruned from %d to %d\n", maxAtomCount, atomCount, keepCount );
3689
3690            /* cleanup */
3691            tr_free( test );
3692            tr_free( keep );
3693        }
3694    }
3695
3696    tr_timerAddMsec( mgr->atomTimer, ATOM_PERIOD_MSEC );
3697    managerUnlock( mgr );
3698}
3699
3700/***
3701****
3702****
3703****
3704***/
3705
3706/* is this atom someone that we'd want to initiate a connection to? */
3707static bool
3708isPeerCandidate( const tr_torrent * tor, struct peer_atom * atom, const time_t now )
3709{
3710    /* not if we're both seeds */
3711    if( tr_torrentIsSeed( tor ) && atomIsSeed( atom ) )
3712        return false;
3713
3714    /* not if we've already got a connection to them... */
3715    if( peerIsInUse( tor->torrentPeers, atom ) )
3716        return false;
3717
3718    /* not if we just tried them already */
3719    if( ( now - atom->time ) < getReconnectIntervalSecs( atom, now ) )
3720        return false;
3721
3722    /* not if they're blocklisted */
3723    if( isAtomBlocklisted( tor->session, atom ) )
3724        return false;
3725
3726    /* not if they're banned... */
3727    if( atom->flags2 & MYFLAG_BANNED )
3728        return false;
3729
3730    return true;
3731}
3732
3733struct peer_candidate
3734{
3735    uint64_t score;
3736    tr_torrent * tor;
3737    struct peer_atom * atom;
3738};
3739
3740static bool
3741torrentWasRecentlyStarted( const tr_torrent * tor )
3742{
3743    return difftime( tr_time( ), tor->startDate ) < 120;
3744}
3745
/* Append a field to the low end of a packed sort key: shift `value' left
 * by `width' bits and OR `addme' into the result.
 * `addme' is not masked, so it must fit in `width' bits or it will
 * overlap the bits already in `value'. */
static inline uint64_t
addValToKey( uint64_t value, int width, uint64_t addme )
{
    return ( value << (uint64_t)width ) | addme;
}
3753
3754/* smaller value is better */
3755static uint64_t
3756getPeerCandidateScore( const tr_torrent * tor, const struct peer_atom * atom, uint8_t salt  )
3757{
3758    uint64_t i;
3759    uint64_t score = 0;
3760    const bool failed = atom->lastConnectionAt < atom->lastConnectionAttemptAt;
3761
3762    /* prefer peers we've connected to, or never tried, over peers we failed to connect to. */
3763    i = failed ? 1 : 0;
3764    score = addValToKey( score, 1, i );
3765
3766    /* prefer the one we attempted least recently (to cycle through all peers) */
3767    i = atom->lastConnectionAttemptAt;
3768    score = addValToKey( score, 32, i );
3769
3770    /* prefer peers belonging to a torrent of a higher priority */
3771    switch( tr_torrentGetPriority( tor ) ) {
3772        case TR_PRI_HIGH:    i = 0; break;
3773        case TR_PRI_NORMAL:  i = 1; break;
3774        case TR_PRI_LOW:     i = 2; break;
3775    }
3776    score = addValToKey( score, 4, i );
3777
3778    /* prefer recently-started torrents */
3779    i = torrentWasRecentlyStarted( tor ) ? 0 : 1;
3780    score = addValToKey( score, 1, i );
3781
3782    /* prefer torrents we're downloading with */
3783    i = tr_torrentIsSeed( tor ) ? 1 : 0;
3784    score = addValToKey( score, 1, i );
3785
3786    /* prefer peers that are known to be connectible */
3787    i = ( atom->flags & ADDED_F_CONNECTABLE ) ? 0 : 1;
3788    score = addValToKey( score, 1, i );
3789
3790    /* prefer peers that we might have a chance of uploading to...
3791       so lower seed probability is better */
3792    if( atom->seedProbability == 100 ) i = 101;
3793    else if( atom->seedProbability == -1 ) i = 100;
3794    else i = atom->seedProbability;
3795    score = addValToKey( score, 8, i );
3796
3797    /* Prefer peers that we got from more trusted sources.
3798     * lower `fromBest' values indicate more trusted sources */
3799    score = addValToKey( score, 4, atom->fromBest );
3800
3801    /* salt */
3802    score = addValToKey( score, 8, salt );
3803
3804    return score;
3805}
3806
3807/* sort an array of peer candidates */
3808static int
3809comparePeerCandidates( const void * va, const void * vb )
3810{
3811    const struct peer_candidate * a = va;
3812    const struct peer_candidate * b = vb;
3813
3814    if( a->score < b->score ) return -1;
3815    if( a->score > b->score ) return 1;
3816
3817    return 0;
3818}
3819
3820/** @return an array of all the atoms we might want to connect to */
3821static struct peer_candidate*
3822getPeerCandidates( tr_session * session, int * candidateCount )
3823{
3824    int atomCount;
3825    int peerCount;
3826    tr_torrent * tor;
3827    struct peer_candidate * candidates;
3828    struct peer_candidate * walk;
3829    const time_t now = tr_time( );
3830    const uint64_t now_msec = tr_time_msec( );
3831    /* leave 5% of connection slots for incoming connections -- ticket #2609 */
3832    const int maxCandidates = tr_sessionGetPeerLimit( session ) * 0.95;
3833
3834    /* count how many peers and atoms we've got */
3835    tor= NULL;
3836    atomCount = 0;
3837    peerCount = 0;
3838    while(( tor = tr_torrentNext( session, tor ))) {
3839        atomCount += tr_ptrArraySize( &tor->torrentPeers->pool );
3840        peerCount += tr_ptrArraySize( &tor->torrentPeers->peers );
3841    }
3842
3843    /* don't start any new handshakes if we're full up */
3844    if( maxCandidates <= peerCount ) {
3845        *candidateCount = 0;
3846        return NULL;
3847    }
3848
3849    /* allocate an array of candidates */
3850    walk = candidates = tr_new( struct peer_candidate, atomCount );
3851
3852    /* populate the candidate array */
3853    tor = NULL;
3854    while(( tor = tr_torrentNext( session, tor )))
3855    {
3856        int i, nAtoms;
3857        struct peer_atom ** atoms;
3858
3859        if( !tor->torrentPeers->isRunning )
3860            continue;
3861
3862        /* if we've already got enough peers in this torrent... */
3863        if( tr_torrentGetPeerLimit( tor ) <= tr_ptrArraySize( &tor->torrentPeers->peers ) )
3864            continue;
3865
3866        /* if we've already got enough speed in this torrent... */
3867        if( tr_torrentIsSeed( tor ) && isBandwidthMaxedOut( &tor->bandwidth, now_msec, TR_UP ) )
3868            continue;
3869
3870        atoms = (struct peer_atom**) tr_ptrArrayPeek( &tor->torrentPeers->pool, &nAtoms );
3871        for( i=0; i<nAtoms; ++i )
3872        {
3873            struct peer_atom * atom = atoms[i];
3874
3875            if( isPeerCandidate( tor, atom, now ) )
3876            {
3877                const uint8_t salt = tr_cryptoWeakRandInt( 1024 );
3878                walk->tor = tor;
3879                walk->atom = atom;
3880                walk->score = getPeerCandidateScore( tor, atom, salt );
3881                ++walk;
3882            }
3883        }
3884    }
3885
3886    *candidateCount = walk - candidates;
3887    if( *candidateCount > 1 )
3888        qsort( candidates, *candidateCount, sizeof( struct peer_candidate ), comparePeerCandidates );
3889    return candidates;
3890}
3891
3892static void
3893initiateConnection( tr_peerMgr * mgr, Torrent * t, struct peer_atom * atom )
3894{
3895    tr_peerIo * io;
3896    const time_t now = tr_time( );
3897    bool utp = tr_sessionIsUTPEnabled(mgr->session) && !atom->utp_failed;
3898
3899    if( atom->fromFirst == TR_PEER_FROM_PEX )
3900        /* PEX has explicit signalling for uTP support.  If an atom
3901           originally came from PEX and doesn't have the uTP flag, skip the
3902           uTP connection attempt.  Are we being optimistic here? */
3903        utp = utp && (atom->flags & ADDED_F_UTP_FLAGS);
3904
3905    tordbg( t, "Starting an OUTGOING%s connection with %s",
3906            utp ? " µTP" : "",
3907            tr_atomAddrStr( atom ) );
3908
3909    io = tr_peerIoNewOutgoing( mgr->session,
3910                               &mgr->session->bandwidth,
3911                               &atom->addr,
3912                               atom->port,
3913                               t->tor->info.hash,
3914                               t->tor->completeness == TR_SEED,
3915                               utp );
3916
3917    if( io == NULL )
3918    {
3919        tordbg( t, "peerIo not created; marking peer %s as unreachable",
3920                tr_atomAddrStr( atom ) );
3921        atom->flags2 |= MYFLAG_UNREACHABLE;
3922        atom->numFails++;
3923    }
3924    else
3925    {
3926        tr_handshake * handshake = tr_handshakeNew( io,
3927                                                    mgr->session->encryptionMode,
3928                                                    myHandshakeDoneCB,
3929                                                    mgr );
3930
3931        assert( tr_peerIoGetTorrentHash( io ) );
3932
3933        tr_peerIoUnref( io ); /* balanced by the initial ref
3934                                 in tr_peerIoNewOutgoing() */
3935
3936        tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
3937                                 handshakeCompare );
3938    }
3939
3940    atom->lastConnectionAttemptAt = now;
3941    atom->time = now;
3942}
3943
3944static void
3945initiateCandidateConnection( tr_peerMgr * mgr, struct peer_candidate * c )
3946{
3947#if 0
3948    fprintf( stderr, "Starting an OUTGOING connection with %s - [%s] seedProbability==%d; %s, %s\n",
3949             tr_atomAddrStr( c->atom ),
3950             tr_torrentName( c->tor ),
3951             (int)c->atom->seedProbability,
3952             tr_torrentIsPrivate( c->tor ) ? "private" : "public",
3953             tr_torrentIsSeed( c->tor ) ? "seed" : "downloader" );
3954#endif
3955
3956    initiateConnection( mgr, c->tor->torrentPeers, c->atom );
3957}
3958
3959static void
3960makeNewPeerConnections( struct tr_peerMgr * mgr, const int max )
3961{
3962    int i, n;
3963    struct peer_candidate * candidates;
3964
3965    candidates = getPeerCandidates( mgr->session, &n );
3966
3967    for( i=0; i<n && i<max; ++i )
3968        initiateCandidateConnection( mgr, &candidates[i] );
3969
3970    tr_free( candidates );
3971}
Note: See TracBrowser for help on using the repository browser.