source: trunk/libtransmission/peer-mgr.c @ 12228

Last change on this file since 12228 was 12228, checked in by jordan, 11 years ago

(trunk libT) still fiddling around with #includes -- this time removing unncecessary libT includes from libT .c files

  • Property svn:keywords set to Date Rev Author Id
File size: 113.4 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 12228 2011-03-25 01:41:57Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h> /* error codes ERANGE, ... */
15#include <limits.h> /* INT_MAX */
16#include <string.h> /* memcpy, memcmp, strstr */
17#include <stdlib.h> /* qsort */
18
19#include <event2/event.h>
20#include <libutp/utp.h>
21
22#include "transmission.h"
23#include "announcer.h"
24#include "bandwidth.h"
25#include "blocklist.h"
26#include "cache.h"
27#include "clients.h"
28#include "completion.h"
29#include "crypto.h"
30#include "handshake.h"
31#include "net.h"
32#include "peer-io.h"
33#include "peer-mgr.h"
34#include "peer-msgs.h"
35#include "ptrarray.h"
36#include "session.h"
37#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
38#include "torrent.h"
39#include "utils.h"
40#include "webseed.h"
41
42enum
43{
44    /* how frequently to cull old atoms */
45    ATOM_PERIOD_MSEC = ( 60 * 1000 ),
46
47    /* how frequently to change which peers are choked */
48    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
49
50    /* an optimistically unchoked peer is immune from rechoking
51       for this many calls to rechokeUploads(). */
52    OPTIMISTIC_UNCHOKE_MULTIPLIER = 4,
53
54    /* how frequently to reallocate bandwidth */
55    BANDWIDTH_PERIOD_MSEC = 500,
56
57    /* how frequently to age out old piece request lists */
58    REFILL_UPKEEP_PERIOD_MSEC = ( 10 * 1000 ),
59
60    /* how frequently to decide which peers live and die */
61    RECONNECT_PERIOD_MSEC = 500,
62
63    /* when many peers are available, keep idle ones this long */
64    MIN_UPLOAD_IDLE_SECS = ( 60 ),
65
66    /* when few peers are available, keep idle ones this long */
67    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
68
69    /* max number of peers to ask for per second overall.
70     * this throttle is to avoid overloading the router */
71    MAX_CONNECTIONS_PER_SECOND = 12,
72
73    MAX_CONNECTIONS_PER_PULSE = (int)(MAX_CONNECTIONS_PER_SECOND * (RECONNECT_PERIOD_MSEC/1000.0)),
74
75    /* number of bad pieces a peer is allowed to send before we ban them */
76    MAX_BAD_PIECES_PER_PEER = 5,
77
78    /* amount of time to keep a list of request pieces lying around
79       before it's considered too old and needs to be rebuilt */
80    PIECE_LIST_SHELF_LIFE_SECS = 60,
81
82    /* use for bitwise operations w/peer_atom.flags2 */
83    MYFLAG_BANNED = 1,
84
85    /* use for bitwise operations w/peer_atom.flags2 */
86    /* unreachable for now... but not banned.
87     * if they try to connect to us it's okay */
88    MYFLAG_UNREACHABLE = 2,
89
90    /* the minimum we'll wait before attempting to reconnect to a peer */
91    MINIMUM_RECONNECT_INTERVAL_SECS = 5,
92
93    /** how long we'll let requests we've made linger before we cancel them */
94    REQUEST_TTL_SECS = 120,
95
96    NO_BLOCKS_CANCEL_HISTORY = 120,
97
98    CANCEL_HISTORY_SEC = 60
99};
100
101const tr_peer_event TR_PEER_EVENT_INIT = { 0, 0, NULL, 0, 0, 0, false, 0 };
102
103/**
104***
105**/
106
107/**
108 * Peer information that should be kept even before we've connected and
109 * after we've disconnected. These are kept in a pool of peer_atoms to decide
110 * which ones would make good candidates for connecting to, and to watch out
111 * for banned peers.
112 *
113 * @see tr_peer
114 * @see tr_peermsgs
115 */
116struct peer_atom
117{
118    uint8_t     fromFirst;          /* where the peer was first found */
119    uint8_t     fromBest;           /* the "best" value of where the peer has been found */
120    uint8_t     flags;              /* these match the added_f flags */
121    uint8_t     flags2;             /* flags that aren't defined in added_f */
122    int8_t      seedProbability;    /* how likely is this to be a seed... [0..100] or -1 for unknown */
123    int8_t      blocklisted;        /* -1 for unknown, true for blocklisted, false for not blocklisted */
124
125    tr_port     port;
126    bool        utp_failed;         /* We recently failed to connect over uTP */
127    uint16_t    numFails;
128    time_t      time;               /* when the peer's connection status last changed */
129    time_t      piece_data_time;
130
131    time_t      lastConnectionAttemptAt;
132    time_t      lastConnectionAt;
133
134    /* similar to a TTL field, but less rigid --
135     * if the swarm is small, the atom will be kept past this date. */
136    time_t      shelf_date;
137    tr_peer   * peer;               /* will be NULL if not connected */
138    tr_address  addr;
139};
140
141#ifdef NDEBUG
142#define tr_isAtom(a) (TRUE)
143#else
144static bool
145tr_isAtom( const struct peer_atom * atom )
146{
147    return ( atom != NULL )
148        && ( atom->fromFirst < TR_PEER_FROM__MAX )
149        && ( atom->fromBest < TR_PEER_FROM__MAX )
150        && ( tr_isAddress( &atom->addr ) );
151}
152#endif
153
154static const char*
155tr_atomAddrStr( const struct peer_atom * atom )
156{
157    return atom ? tr_peerIoAddrStr( &atom->addr, atom->port ) : "[no atom]";
158}
159
160struct block_request
161{
162    tr_block_index_t block;
163    tr_peer * peer;
164    time_t sentAt;
165};
166
167struct weighted_piece
168{
169    tr_piece_index_t index;
170    int16_t salt;
171    int16_t requestCount;
172};
173
174enum piece_sort_state
175{
176    PIECES_UNSORTED,
177    PIECES_SORTED_BY_INDEX,
178    PIECES_SORTED_BY_WEIGHT
179};
180
181/** @brief Opaque, per-torrent data structure for peer connection information */
182typedef struct tr_torrent_peers
183{
184    tr_ptrArray                outgoingHandshakes; /* tr_handshake */
185    tr_ptrArray                pool; /* struct peer_atom */
186    tr_ptrArray                peers; /* tr_peer */
187    tr_ptrArray                webseeds; /* tr_webseed */
188
189    tr_torrent               * tor;
190    struct tr_peerMgr        * manager;
191
192    tr_peer                  * optimistic; /* the optimistic peer, or NULL if none */
193    int                        optimisticUnchokeTimeScaler;
194
195    bool                       isRunning;
196    bool                       needsCompletenessCheck;
197
198    struct block_request     * requests;
199    int                        requestCount;
200    int                        requestAlloc;
201
202    struct weighted_piece    * pieces;
203    int                        pieceCount;
204    enum piece_sort_state      pieceSortState;
205
206    /* An array of pieceCount items stating how many peers have each piece.
207       This is used to help us for downloading pieces "rarest first."
208       This may be NULL if we don't have metainfo yet, or if we're not
209       downloading and don't care about rarity */
210    uint16_t                 * pieceReplication;
211    size_t                     pieceReplicationSize;
212
213    int                        interestedCount;
214    int                        maxPeers;
215    time_t                     lastCancel;
216
217    /* Before the endgame this should be 0. In endgame, is contains the average
218     * number of pending requests per peer. Only peers which have more pending
219     * requests are considered 'fast' are allowed to request a block that's
220     * already been requested from another (slower?) peer. */
221    int                        endgame;
222}
223Torrent;
224
225struct tr_peerMgr
226{
227    tr_session    * session;
228    tr_ptrArray     incomingHandshakes; /* tr_handshake */
229    struct event  * bandwidthTimer;
230    struct event  * rechokeTimer;
231    struct event  * refillUpkeepTimer;
232    struct event  * atomTimer;
233};
234
235#define tordbg( t, ... ) \
236    do { \
237        if( tr_deepLoggingIsActive( ) ) \
238            tr_deepLog( __FILE__, __LINE__, tr_torrentName( t->tor ), __VA_ARGS__ ); \
239    } while( 0 )
240
241#define dbgmsg( ... ) \
242    do { \
243        if( tr_deepLoggingIsActive( ) ) \
244            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
245    } while( 0 )
246
247/**
248***
249**/
250
251static inline void
252managerLock( const struct tr_peerMgr * manager )
253{
254    tr_sessionLock( manager->session );
255}
256
257static inline void
258managerUnlock( const struct tr_peerMgr * manager )
259{
260    tr_sessionUnlock( manager->session );
261}
262
263static inline void
264torrentLock( Torrent * torrent )
265{
266    managerLock( torrent->manager );
267}
268
269static inline void
270torrentUnlock( Torrent * torrent )
271{
272    managerUnlock( torrent->manager );
273}
274
275static inline int
276torrentIsLocked( const Torrent * t )
277{
278    return tr_sessionIsLocked( t->manager->session );
279}
280
281/**
282***
283**/
284
285static int
286handshakeCompareToAddr( const void * va, const void * vb )
287{
288    const tr_handshake * a = va;
289
290    return tr_compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
291}
292
293static int
294handshakeCompare( const void * a, const void * b )
295{
296    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
297}
298
299static inline tr_handshake*
300getExistingHandshake( tr_ptrArray * handshakes, const tr_address * addr )
301{
302    if( tr_ptrArrayEmpty( handshakes ) )
303        return NULL;
304
305    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
306}
307
308static int
309comparePeerAtomToAddress( const void * va, const void * vb )
310{
311    const struct peer_atom * a = va;
312
313    return tr_compareAddresses( &a->addr, vb );
314}
315
316static int
317compareAtomsByAddress( const void * va, const void * vb )
318{
319    const struct peer_atom * b = vb;
320
321    assert( tr_isAtom( b ) );
322
323    return comparePeerAtomToAddress( va, &b->addr );
324}
325
326/**
327***
328**/
329
330const tr_address *
331tr_peerAddress( const tr_peer * peer )
332{
333    return &peer->atom->addr;
334}
335
336static Torrent*
337getExistingTorrent( tr_peerMgr *    manager,
338                    const uint8_t * hash )
339{
340    tr_torrent * tor = tr_torrentFindFromHash( manager->session, hash );
341
342    return tor == NULL ? NULL : tor->torrentPeers;
343}
344
345static int
346peerCompare( const void * a, const void * b )
347{
348    return tr_compareAddresses( tr_peerAddress( a ), tr_peerAddress( b ) );
349}
350
351static struct peer_atom*
352getExistingAtom( const Torrent    * t,
353                 const tr_address * addr )
354{
355    Torrent * tt = (Torrent*)t;
356    assert( torrentIsLocked( t ) );
357    return tr_ptrArrayFindSorted( &tt->pool, addr, comparePeerAtomToAddress );
358}
359
360static bool
361peerIsInUse( const Torrent * ct, const struct peer_atom * atom )
362{
363    Torrent * t = (Torrent*) ct;
364
365    assert( torrentIsLocked ( t ) );
366
367    return ( atom->peer != NULL )
368        || getExistingHandshake( &t->outgoingHandshakes, &atom->addr )
369        || getExistingHandshake( &t->manager->incomingHandshakes, &atom->addr );
370}
371
372void
373tr_peerConstruct( tr_peer * peer )
374{
375    memset( peer, 0, sizeof( tr_peer ) );
376
377    peer->have = TR_BITSET_INIT;
378
379    tr_historyConstruct( &peer->blocksSentToClient,  CANCEL_HISTORY_SEC, ( RECHOKE_PERIOD_MSEC / 1000 ) );
380    tr_historyConstruct( &peer->blocksSentToPeer,    CANCEL_HISTORY_SEC, ( RECHOKE_PERIOD_MSEC / 1000 ) );
381    tr_historyConstruct( &peer->cancelsSentToClient, CANCEL_HISTORY_SEC, ( RECHOKE_PERIOD_MSEC / 1000 ) );
382    tr_historyConstruct( &peer->cancelsSentToPeer,   CANCEL_HISTORY_SEC, ( REFILL_UPKEEP_PERIOD_MSEC / 1000 ) );
383}
384
385static tr_peer*
386peerNew( struct peer_atom * atom )
387{
388    tr_peer * peer = tr_new( tr_peer, 1 );
389    tr_peerConstruct( peer );
390
391    peer->atom = atom;
392    atom->peer = peer;
393
394    return peer;
395}
396
397static tr_peer*
398getPeer( Torrent * torrent, struct peer_atom * atom )
399{
400    tr_peer * peer;
401
402    assert( torrentIsLocked( torrent ) );
403
404    peer = atom->peer;
405
406    if( peer == NULL )
407    {
408        peer = peerNew( atom );
409        tr_ptrArrayInsertSorted( &torrent->peers, peer, peerCompare );
410    }
411
412    return peer;
413}
414
415static void peerDeclinedAllRequests( Torrent *, const tr_peer * );
416
417void
418tr_peerDestruct( tr_torrent * tor, tr_peer * peer )
419{
420    assert( peer != NULL );
421
422    peerDeclinedAllRequests( tor->torrentPeers, peer );
423
424    if( peer->msgs != NULL )
425        tr_peerMsgsFree( peer->msgs );
426
427    if( peer->io ) {
428        tr_peerIoClear( peer->io );
429        tr_peerIoUnref( peer->io ); /* balanced by the ref in handshakeDoneCB() */
430    }
431
432    tr_historyDestruct( &peer->blocksSentToClient  );
433    tr_historyDestruct( &peer->blocksSentToPeer    );
434    tr_historyDestruct( &peer->cancelsSentToClient );
435    tr_historyDestruct( &peer->cancelsSentToPeer   );
436
437    tr_bitsetDestruct( &peer->have );
438    tr_bitfieldFree( peer->blame );
439    tr_free( peer->client );
440
441    if( peer->atom )
442        peer->atom->peer = NULL;
443}
444
445static void
446peerDelete( Torrent * t, tr_peer * peer )
447{
448    tr_peerDestruct( t->tor, peer );
449    tr_free( peer );
450}
451
452static bool
453replicationExists( const Torrent * t )
454{
455    return t->pieceReplication != NULL;
456}
457
458static void
459replicationFree( Torrent * t )
460{
461    tr_free( t->pieceReplication );
462    t->pieceReplication = NULL;
463    t->pieceReplicationSize = 0;
464}
465
466static void
467replicationNew( Torrent * t )
468{
469    tr_piece_index_t piece_i;
470    const tr_piece_index_t piece_count = t->tor->info.pieceCount;
471    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
472    const int peer_count = tr_ptrArraySize( &t->peers );
473
474    assert( !replicationExists( t ) );
475
476    t->pieceReplicationSize = piece_count;
477    t->pieceReplication = tr_new0( uint16_t, piece_count );
478
479    for( piece_i=0; piece_i<piece_count; ++piece_i )
480    {
481        int peer_i;
482        uint16_t r = 0;
483
484        for( peer_i=0; peer_i<peer_count; ++peer_i )
485            if( tr_bitsetHas( &peers[peer_i]->have, piece_i ) )
486                ++r;
487
488        t->pieceReplication[piece_i] = r;
489    }
490}
491
492static void
493torrentFree( void * vt )
494{
495    Torrent * t = vt;
496
497    assert( t );
498    assert( !t->isRunning );
499    assert( torrentIsLocked( t ) );
500    assert( tr_ptrArrayEmpty( &t->outgoingHandshakes ) );
501    assert( tr_ptrArrayEmpty( &t->peers ) );
502
503    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
504    tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
505    tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
506    tr_ptrArrayDestruct( &t->peers, NULL );
507
508    replicationFree( t );
509
510    tr_free( t->requests );
511    tr_free( t->pieces );
512    tr_free( t );
513}
514
515static void peerCallbackFunc( tr_peer *, const tr_peer_event *, void * );
516
517static Torrent*
518torrentNew( tr_peerMgr * manager, tr_torrent * tor )
519{
520    int       i;
521    Torrent * t;
522
523    t = tr_new0( Torrent, 1 );
524    t->manager = manager;
525    t->tor = tor;
526    t->pool = TR_PTR_ARRAY_INIT;
527    t->peers = TR_PTR_ARRAY_INIT;
528    t->webseeds = TR_PTR_ARRAY_INIT;
529    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
530
531    for( i = 0; i < tor->info.webseedCount; ++i )
532    {
533        tr_webseed * w =
534            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
535        tr_ptrArrayAppend( &t->webseeds, w );
536    }
537
538    return t;
539}
540
541tr_peerMgr*
542tr_peerMgrNew( tr_session * session )
543{
544    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
545    m->session = session;
546    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
547    return m;
548}
549
550static void
551deleteTimer( struct event ** t )
552{
553    if( *t != NULL )
554    {
555        event_free( *t );
556        *t = NULL;
557    }
558}
559
560static void
561deleteTimers( struct tr_peerMgr * m )
562{
563    deleteTimer( &m->atomTimer );
564    deleteTimer( &m->bandwidthTimer );
565    deleteTimer( &m->rechokeTimer );
566    deleteTimer( &m->refillUpkeepTimer );
567}
568
569void
570tr_peerMgrFree( tr_peerMgr * manager )
571{
572    managerLock( manager );
573
574    deleteTimers( manager );
575
576    /* free the handshakes. Abort invokes handshakeDoneCB(), which removes
577     * the item from manager->handshakes, so this is a little roundabout... */
578    while( !tr_ptrArrayEmpty( &manager->incomingHandshakes ) )
579        tr_handshakeAbort( tr_ptrArrayNth( &manager->incomingHandshakes, 0 ) );
580
581    tr_ptrArrayDestruct( &manager->incomingHandshakes, NULL );
582
583    managerUnlock( manager );
584    tr_free( manager );
585}
586
587static int
588clientIsDownloadingFrom( const tr_torrent * tor, const tr_peer * peer )
589{
590    if( !tr_torrentHasMetadata( tor ) )
591        return true;
592
593    return peer->clientIsInterested && !peer->clientIsChoked;
594}
595
596static int
597clientIsUploadingTo( const tr_peer * peer )
598{
599    return peer->peerIsInterested && !peer->peerIsChoked;
600}
601
602/***
603****
604***/
605
606void
607tr_peerMgrOnBlocklistChanged( tr_peerMgr * mgr )
608{
609    tr_torrent * tor = NULL;
610    tr_session * session = mgr->session;
611
612    /* we cache whether or not a peer is blocklisted...
613       since the blocklist has changed, erase that cached value */
614    while(( tor = tr_torrentNext( session, tor )))
615    {
616        int i;
617        Torrent * t = tor->torrentPeers;
618        const int n = tr_ptrArraySize( &t->pool );
619        for( i=0; i<n; ++i ) {
620            struct peer_atom * atom = tr_ptrArrayNth( &t->pool, i );
621            atom->blocklisted = -1;
622        }
623    }
624}
625
626static bool
627isAtomBlocklisted( tr_session * session, struct peer_atom * atom )
628{
629    if( atom->blocklisted < 0 )
630        atom->blocklisted = tr_sessionIsAddressBlocked( session, &atom->addr );
631
632    assert( tr_isBool( atom->blocklisted ) );
633    return atom->blocklisted;
634}
635
636
637/***
638****
639***/
640
641static void
642atomSetSeedProbability( struct peer_atom * atom, int seedProbability )
643{
644    assert( atom != NULL );
645    assert( -1<=seedProbability && seedProbability<=100 );
646
647    atom->seedProbability = seedProbability;
648
649    if( seedProbability == 100 )
650        atom->flags |= ADDED_F_SEED_FLAG;
651    else if( seedProbability != -1 )
652        atom->flags &= ~ADDED_F_SEED_FLAG;
653}
654
655static inline bool
656atomIsSeed( const struct peer_atom * atom )
657{
658    return atom->seedProbability == 100;
659}
660
661static void
662atomSetSeed( const Torrent * t, struct peer_atom * atom )
663{
664    if( !atomIsSeed( atom ) )
665    {
666        tordbg( t, "marking peer %s as a seed", tr_atomAddrStr( atom ) );
667
668        atomSetSeedProbability( atom, 100 );
669    }
670}
671
672
673bool
674tr_peerMgrPeerIsSeed( const tr_torrent  * tor,
675                      const tr_address  * addr )
676{
677    bool isSeed = false;
678    const Torrent * t = tor->torrentPeers;
679    const struct peer_atom * atom = getExistingAtom( t, addr );
680
681    if( atom )
682        isSeed = atomIsSeed( atom );
683
684    return isSeed;
685}
686
687void
688tr_peerMgrSetUtpSupported( tr_torrent * tor, const tr_address * addr )
689{
690    struct peer_atom * atom = getExistingAtom( tor->torrentPeers, addr );
691
692    if( atom )
693        atom->flags |= ADDED_F_UTP_FLAGS;
694}
695
696void
697tr_peerMgrSetUtpFailed( tr_torrent *tor, const tr_address *addr, bool failed )
698{
699    struct peer_atom * atom = getExistingAtom( tor->torrentPeers, addr );
700
701    if( atom )
702        atom->utp_failed = failed;
703}
704
705
706/**
707***  REQUESTS
708***
709*** There are two data structures associated with managing block requests:
710***
711*** 1. Torrent::requests, an array of "struct block_request" which keeps
712***    track of which blocks have been requested, and when, and by which peers.
713***    This is list is used for (a) cancelling requests that have been pending
714***    for too long and (b) avoiding duplicate requests before endgame.
715***
716*** 2. Torrent::pieces, an array of "struct weighted_piece" which lists the
717***    pieces that we want to request. It's used to decide which blocks to
718***    return next when tr_peerMgrGetBlockRequests() is called.
719**/
720
721/**
722*** struct block_request
723**/
724
725static int
726compareReqByBlock( const void * va, const void * vb )
727{
728    const struct block_request * a = va;
729    const struct block_request * b = vb;
730
731    /* primary key: block */
732    if( a->block < b->block ) return -1;
733    if( a->block > b->block ) return 1;
734
735    /* secondary key: peer */
736    if( a->peer < b->peer ) return -1;
737    if( a->peer > b->peer ) return 1;
738
739    return 0;
740}
741
742static void
743requestListAdd( Torrent * t, tr_block_index_t block, tr_peer * peer )
744{
745    struct block_request key;
746
747    /* ensure enough room is available... */
748    if( t->requestCount + 1 >= t->requestAlloc )
749    {
750        const int CHUNK_SIZE = 128;
751        t->requestAlloc += CHUNK_SIZE;
752        t->requests = tr_renew( struct block_request,
753                                t->requests, t->requestAlloc );
754    }
755
756    /* populate the record we're inserting */
757    key.block = block;
758    key.peer = peer;
759    key.sentAt = tr_time( );
760
761    /* insert the request to our array... */
762    {
763        bool exact;
764        const int pos = tr_lowerBound( &key, t->requests, t->requestCount,
765                                       sizeof( struct block_request ),
766                                       compareReqByBlock, &exact );
767        assert( !exact );
768        memmove( t->requests + pos + 1,
769                 t->requests + pos,
770                 sizeof( struct block_request ) * ( t->requestCount++ - pos ) );
771        t->requests[pos] = key;
772    }
773
774    if( peer != NULL )
775    {
776        ++peer->pendingReqsToPeer;
777        assert( peer->pendingReqsToPeer >= 0 );
778    }
779
780    /*fprintf( stderr, "added request of block %lu from peer %s... "
781                       "there are now %d block\n",
782                       (unsigned long)block, tr_atomAddrStr( peer->atom ), t->requestCount );*/
783}
784
785static struct block_request *
786requestListLookup( Torrent * t, tr_block_index_t block, const tr_peer * peer )
787{
788    struct block_request key;
789    key.block = block;
790    key.peer = (tr_peer*) peer;
791
792    return bsearch( &key, t->requests, t->requestCount,
793                    sizeof( struct block_request ),
794                    compareReqByBlock );
795}
796
797/**
798 * Find the peers are we currently requesting the block
799 * with index @a block from and append them to @a peerArr.
800 */
801static void
802getBlockRequestPeers( Torrent * t, tr_block_index_t block,
803                      tr_ptrArray * peerArr )
804{
805    bool exact;
806    int i, pos;
807    struct block_request key;
808
809    key.block = block;
810    key.peer = NULL;
811    pos = tr_lowerBound( &key, t->requests, t->requestCount,
812                         sizeof( struct block_request ),
813                         compareReqByBlock, &exact );
814
815    assert( !exact ); /* shouldn't have a request with .peer == NULL */
816
817    for( i = pos; i < t->requestCount; ++i )
818    {
819        if( t->requests[i].block != block )
820            break;
821        tr_ptrArrayAppend( peerArr, t->requests[i].peer );
822    }
823}
824
825static void
826decrementPendingReqCount( const struct block_request * b )
827{
828    if( b->peer != NULL )
829        if( b->peer->pendingReqsToPeer > 0 )
830            --b->peer->pendingReqsToPeer;
831}
832
833static void
834requestListRemove( Torrent * t, tr_block_index_t block, const tr_peer * peer )
835{
836    const struct block_request * b = requestListLookup( t, block, peer );
837    if( b != NULL )
838    {
839        const int pos = b - t->requests;
840        assert( pos < t->requestCount );
841
842        decrementPendingReqCount( b );
843
844        tr_removeElementFromArray( t->requests,
845                                   pos,
846                                   sizeof( struct block_request ),
847                                   t->requestCount-- );
848
849        /*fprintf( stderr, "removing request of block %lu from peer %s... "
850                           "there are now %d block requests left\n",
851                           (unsigned long)block, tr_atomAddrStr( peer->atom ), t->requestCount );*/
852    }
853}
854
855static int
856countActiveWebseeds( const Torrent * t )
857{
858    int activeCount = 0;
859    const tr_webseed ** w = (const tr_webseed **) tr_ptrArrayBase( &t->webseeds );
860    const tr_webseed ** const wend = w + tr_ptrArraySize( &t->webseeds );
861
862    for( ; w!=wend; ++w )
863        if( tr_webseedIsActive( *w ) )
864            ++activeCount;
865
866    return activeCount;
867}
868
869static void
870updateEndgame( Torrent * t )
871{
872    const tr_torrent * tor = t->tor;
873    const tr_block_index_t missing = tr_cpBlocksMissing( &tor->completion );
874
875    assert( t->requestCount >= 0 );
876
877    if( (tr_block_index_t) t->requestCount < missing )
878    {
879        /* not in endgame */
880        t->endgame = 0;
881    }
882    else if( !t->endgame ) /* only recalculate when endgame first begins */
883    {
884        int numDownloading = 0;
885        const tr_peer ** p = (const tr_peer **) tr_ptrArrayBase( &t->peers );
886        const tr_peer ** const pend = p + tr_ptrArraySize( &t->peers );
887
888        /* add the active bittorrent peers... */
889        for( ; p!=pend; ++p )
890            if( (*p)->pendingReqsToPeer > 0 )
891                ++numDownloading;
892
893        /* add the active webseeds... */
894        numDownloading += countActiveWebseeds( t );
895
896        /* average number of pending requests per downloading peer */
897        t->endgame = t->requestCount / MAX( numDownloading, 1 );
898    }
899}
900
901
902/****
903*****
904*****  Piece List Manipulation / Accessors
905*****
906****/
907
908static inline void
909invalidatePieceSorting( Torrent * t )
910{
911    t->pieceSortState = PIECES_UNSORTED;
912}
913
914static const tr_torrent * weightTorrent;
915
916static const uint16_t * weightReplication;
917
918static void
919setComparePieceByWeightTorrent( Torrent * t )
920{
921    if( !replicationExists( t ) )
922        replicationNew( t );
923
924    weightTorrent = t->tor;
925    weightReplication = t->pieceReplication;
926}
927
928/* we try to create a "weight" s.t. high-priority pieces come before others,
929 * and that partially-complete pieces come before empty ones. */
930static int
931comparePieceByWeight( const void * va, const void * vb )
932{
933    const struct weighted_piece * a = va;
934    const struct weighted_piece * b = vb;
935    int ia, ib, missing, pending;
936    const tr_torrent * tor = weightTorrent;
937    const uint16_t * rep = weightReplication;
938
939    /* primary key: weight */
940    missing = tr_cpMissingBlocksInPiece( &tor->completion, a->index );
941    pending = a->requestCount;
942    ia = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
943    missing = tr_cpMissingBlocksInPiece( &tor->completion, b->index );
944    pending = b->requestCount;
945    ib = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
946    if( ia < ib ) return -1;
947    if( ia > ib ) return 1;
948
949    /* secondary key: higher priorities go first */
950    ia = tor->info.pieces[a->index].priority;
951    ib = tor->info.pieces[b->index].priority;
952    if( ia > ib ) return -1;
953    if( ia < ib ) return 1;
954
955    /* tertiary key: rarest first. */
956    ia = rep[a->index];
957    ib = rep[b->index];
958    if( ia < ib ) return -1;
959    if( ia > ib ) return 1;
960
961    /* quaternary key: random */
962    if( a->salt < b->salt ) return -1;
963    if( a->salt > b->salt ) return 1;
964
965    /* okay, they're equal */
966    return 0;
967}
968
969static int
970comparePieceByIndex( const void * va, const void * vb )
971{
972    const struct weighted_piece * a = va;
973    const struct weighted_piece * b = vb;
974    if( a->index < b->index ) return -1;
975    if( a->index > b->index ) return 1;
976    return 0;
977}
978
979static void
980pieceListSort( Torrent * t, enum piece_sort_state state )
981{
982    assert( state==PIECES_SORTED_BY_INDEX
983         || state==PIECES_SORTED_BY_WEIGHT );
984
985
986    if( state == PIECES_SORTED_BY_WEIGHT )
987    {
988        setComparePieceByWeightTorrent( t );
989        qsort( t->pieces, t->pieceCount, sizeof( struct weighted_piece ), comparePieceByWeight );
990    }
991    else
992        qsort( t->pieces, t->pieceCount, sizeof( struct weighted_piece ), comparePieceByIndex );
993
994    t->pieceSortState = state;
995}
996
997/**
998 * These functions are useful for testing, but too expensive for nightly builds.
999 * let's leave it disabled but add an easy hook to compile it back in
1000 */
1001#if 1
1002#define assertWeightedPiecesAreSorted(t)
1003#define assertReplicationCountIsExact(t)
1004#else
1005static void
1006assertWeightedPiecesAreSorted( Torrent * t )
1007{
1008    if( !t->endgame )
1009    {
1010        int i;
1011        setComparePieceByWeightTorrent( t );
1012        for( i=0; i<t->pieceCount-1; ++i )
1013            assert( comparePieceByWeight( &t->pieces[i], &t->pieces[i+1] ) <= 0 );
1014    }
1015}
1016static void
1017assertReplicationCountIsExact( Torrent * t )
1018{
1019    /* This assert might fail due to errors of implementations in other
1020     * clients. It happens when receiving duplicate bitfields/HaveAll/HaveNone
1021     * from a client. If a such a behavior is noticed,
1022     * a bug report should be filled to the faulty client. */
1023
1024    size_t piece_i;
1025    const uint16_t * rep = t->pieceReplication;
1026    const size_t piece_count = t->pieceReplicationSize;
1027    const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1028    const int peer_count = tr_ptrArraySize( &t->peers );
1029
1030    assert( piece_count == t->tor->info.pieceCount );
1031
1032    for( piece_i=0; piece_i<piece_count; ++piece_i )
1033    {
1034        int peer_i;
1035        uint16_t r = 0;
1036
1037        for( peer_i=0; peer_i<peer_count; ++peer_i )
1038            if( tr_bitsetHas( &peers[peer_i]->have, piece_i ) )
1039                ++r;
1040
1041        assert( rep[piece_i] == r );
1042    }
1043}
1044#endif
1045
1046static struct weighted_piece *
1047pieceListLookup( Torrent * t, tr_piece_index_t index )
1048{
1049    int i;
1050
1051    for( i=0; i<t->pieceCount; ++i )
1052        if( t->pieces[i].index == index )
1053            return &t->pieces[i];
1054
1055    return NULL;
1056}
1057
1058static void
1059pieceListRebuild( Torrent * t )
1060{
1061
1062    if( !tr_torrentIsSeed( t->tor ) )
1063    {
1064        tr_piece_index_t i;
1065        tr_piece_index_t * pool;
1066        tr_piece_index_t poolCount = 0;
1067        const tr_torrent * tor = t->tor;
1068        const tr_info * inf = tr_torrentInfo( tor );
1069        struct weighted_piece * pieces;
1070        int pieceCount;
1071
1072        /* build the new list */
1073        pool = tr_new( tr_piece_index_t, inf->pieceCount );
1074        for( i=0; i<inf->pieceCount; ++i )
1075            if( !inf->pieces[i].dnd )
1076                if( !tr_cpPieceIsComplete( &tor->completion, i ) )
1077                    pool[poolCount++] = i;
1078        pieceCount = poolCount;
1079        pieces = tr_new0( struct weighted_piece, pieceCount );
1080        for( i=0; i<poolCount; ++i ) {
1081            struct weighted_piece * piece = pieces + i;
1082            piece->index = pool[i];
1083            piece->requestCount = 0;
1084            piece->salt = tr_cryptoWeakRandInt( 4096 );
1085        }
1086
1087        /* if we already had a list of pieces, merge it into
1088         * the new list so we don't lose its requestCounts */
1089        if( t->pieces != NULL )
1090        {
1091            struct weighted_piece * o = t->pieces;
1092            struct weighted_piece * oend = o + t->pieceCount;
1093            struct weighted_piece * n = pieces;
1094            struct weighted_piece * nend = n + pieceCount;
1095
1096            pieceListSort( t, PIECES_SORTED_BY_INDEX );
1097
1098            while( o!=oend && n!=nend ) {
1099                if( o->index < n->index )
1100                    ++o;
1101                else if( o->index > n->index )
1102                    ++n;
1103                else
1104                    *n++ = *o++;
1105            }
1106
1107            tr_free( t->pieces );
1108        }
1109
1110        t->pieces = pieces;
1111        t->pieceCount = pieceCount;
1112
1113        pieceListSort( t, PIECES_SORTED_BY_WEIGHT );
1114
1115        /* cleanup */
1116        tr_free( pool );
1117    }
1118}
1119
1120static void
1121pieceListRemovePiece( Torrent * t, tr_piece_index_t piece )
1122{
1123    struct weighted_piece * p;
1124
1125    if(( p = pieceListLookup( t, piece )))
1126    {
1127        const int pos = p - t->pieces;
1128
1129        tr_removeElementFromArray( t->pieces,
1130                                   pos,
1131                                   sizeof( struct weighted_piece ),
1132                                   t->pieceCount-- );
1133
1134        if( t->pieceCount == 0 )
1135        {
1136            tr_free( t->pieces );
1137            t->pieces = NULL;
1138        }
1139    }
1140}
1141
1142static void
1143pieceListResortPiece( Torrent * t, struct weighted_piece * p )
1144{
1145    int pos;
1146    bool isSorted = true;
1147
1148    if( p == NULL )
1149        return;
1150
1151    /* is the torrent already sorted? */
1152    pos = p - t->pieces;
1153    setComparePieceByWeightTorrent( t );
1154    if( isSorted && ( pos > 0 ) && ( comparePieceByWeight( p-1, p ) > 0 ) )
1155        isSorted = false;
1156    if( isSorted && ( pos < t->pieceCount - 1 ) && ( comparePieceByWeight( p, p+1 ) > 0 ) )
1157        isSorted = false;
1158
1159    if( t->pieceSortState != PIECES_SORTED_BY_WEIGHT )
1160    {
1161       pieceListSort( t, PIECES_SORTED_BY_WEIGHT);
1162       isSorted = true;
1163    }
1164
1165    /* if it's not sorted, move it around */
1166    if( !isSorted )
1167    {
1168        bool exact;
1169        const struct weighted_piece tmp = *p;
1170
1171        tr_removeElementFromArray( t->pieces,
1172                                   pos,
1173                                   sizeof( struct weighted_piece ),
1174                                   t->pieceCount-- );
1175
1176        pos = tr_lowerBound( &tmp, t->pieces, t->pieceCount,
1177                             sizeof( struct weighted_piece ),
1178                             comparePieceByWeight, &exact );
1179
1180        memmove( &t->pieces[pos + 1],
1181                 &t->pieces[pos],
1182                 sizeof( struct weighted_piece ) * ( t->pieceCount++ - pos ) );
1183
1184        t->pieces[pos] = tmp;
1185    }
1186
1187    assertWeightedPiecesAreSorted( t );
1188}
1189
1190static void
1191pieceListRemoveRequest( Torrent * t, tr_block_index_t block )
1192{
1193    struct weighted_piece * p;
1194    const tr_piece_index_t index = tr_torBlockPiece( t->tor, block );
1195
1196    if( ((p = pieceListLookup( t, index ))) && ( p->requestCount > 0 ) )
1197    {
1198        --p->requestCount;
1199        pieceListResortPiece( t, p );
1200    }
1201}
1202
1203
1204/****
1205*****
1206*****  Replication count ( for rarest first policy )
1207*****
1208****/
1209
1210/**
1211 * Increase the replication count of this piece and sort it if the
1212 * piece list is already sorted
1213 */
1214static void
1215tr_incrReplicationOfPiece( Torrent * t, const size_t index )
1216{
1217    assert( replicationExists( t ) );
1218    assert( t->pieceReplicationSize == t->tor->info.pieceCount );
1219
1220    /* One more replication of this piece is present in the swarm */
1221    ++t->pieceReplication[index];
1222
1223    /* we only resort the piece if the list is already sorted */
1224    if( t->pieceSortState == PIECES_SORTED_BY_WEIGHT )
1225        pieceListResortPiece( t, pieceListLookup( t, index ) );
1226}
1227
1228/**
1229 * Increases the replication count of pieces present in the bitfield
1230 */
1231static void
1232tr_incrReplicationFromBitfield( Torrent * t, const tr_bitfield * b )
1233{
1234    size_t i;
1235    uint16_t * rep = t->pieceReplication;
1236    const size_t n = t->tor->info.pieceCount;
1237
1238    assert( replicationExists( t ) );
1239    assert( n == t->pieceReplicationSize );
1240    assert( tr_bitfieldTestFast( b, n-1 ) );
1241
1242    for( i=0; i<n; ++i )
1243        if( tr_bitfieldHas( b, i ) )
1244            ++rep[i];
1245
1246    if( t->pieceSortState == PIECES_SORTED_BY_WEIGHT )
1247        invalidatePieceSorting( t );
1248}
1249
1250/**
1251 * Increase the replication count of every piece
1252 */
1253static void
1254tr_incrReplication( Torrent * t )
1255{
1256    int i;
1257    const int n = t->pieceReplicationSize;
1258
1259    assert( replicationExists( t ) );
1260    assert( t->pieceReplicationSize == t->tor->info.pieceCount );
1261
1262    for( i=0; i<n; ++i )
1263        ++t->pieceReplication[i];
1264}
1265
1266/**
1267 * Decrease the replication count of pieces present in the bitset.
1268 */
1269static void
1270tr_decrReplicationFromBitset( Torrent * t, const tr_bitset * bitset )
1271{
1272    int i;
1273    const int n = t->pieceReplicationSize;
1274
1275    assert( replicationExists( t ) );
1276    assert( t->pieceReplicationSize == t->tor->info.pieceCount );
1277
1278    if( bitset->haveAll )
1279    {
1280        for( i=0; i<n; ++i )
1281            --t->pieceReplication[i];
1282    }
1283    else if ( !bitset->haveNone )
1284    {
1285        const tr_bitfield * const b = &bitset->bitfield;
1286
1287        for( i=0; i<n; ++i )
1288            if( tr_bitfieldHas( b, i ) )
1289                --t->pieceReplication[i];
1290
1291        if( t->pieceSortState == PIECES_SORTED_BY_WEIGHT )
1292            invalidatePieceSorting( t );
1293    }
1294}
1295
1296/**
1297***
1298**/
1299
1300void
1301tr_peerMgrRebuildRequests( tr_torrent * tor )
1302{
1303    assert( tr_isTorrent( tor ) );
1304
1305    pieceListRebuild( tor->torrentPeers );
1306}
1307
1308void
1309tr_peerMgrGetNextRequests( tr_torrent           * tor,
1310                           tr_peer              * peer,
1311                           int                    numwant,
1312                           tr_block_index_t     * setme,
1313                           int                  * numgot )
1314{
1315    int i;
1316    int got;
1317    Torrent * t;
1318    struct weighted_piece * pieces;
1319    const tr_bitset * have = &peer->have;
1320
1321    /* sanity clause */
1322    assert( tr_isTorrent( tor ) );
1323    assert( peer->clientIsInterested );
1324    assert( !peer->clientIsChoked );
1325    assert( numwant > 0 );
1326
1327    /* walk through the pieces and find blocks that should be requested */
1328    got = 0;
1329    t = tor->torrentPeers;
1330
1331    /* prep the pieces list */
1332    if( t->pieces == NULL )
1333        pieceListRebuild( t );
1334
1335    if( t->pieceSortState != PIECES_SORTED_BY_WEIGHT )
1336        pieceListSort( t, PIECES_SORTED_BY_WEIGHT );
1337
1338    assertReplicationCountIsExact( t );
1339    assertWeightedPiecesAreSorted( t );
1340
1341    updateEndgame( t );
1342    pieces = t->pieces;
1343    for( i=0; i<t->pieceCount && got<numwant; ++i )
1344    {
1345        struct weighted_piece * p = pieces + i;
1346
1347        /* if the peer has this piece that we want... */
1348        if( tr_bitsetHas( have, p->index ) )
1349        {
1350            tr_block_index_t b;
1351            tr_block_index_t first;
1352            tr_block_index_t last;
1353            tr_ptrArray peerArr = TR_PTR_ARRAY_INIT;
1354
1355            tr_torGetPieceBlockRange( tor, p->index, &first, &last );
1356
1357            for( b=first; b<=last && got<numwant; ++b )
1358            {
1359                int peerCount;
1360                tr_peer ** peers;
1361
1362                /* don't request blocks we've already got */
1363                if( tr_cpBlockIsComplete( &tor->completion, b ) )
1364                    continue;
1365
1366                /* always add peer if this block has no peers yet */
1367                tr_ptrArrayClear( &peerArr );
1368                getBlockRequestPeers( t, b, &peerArr );
1369                peers = (tr_peer **) tr_ptrArrayPeek( &peerArr, &peerCount );
1370                if( peerCount != 0 )
1371                {
1372                    /* don't make a second block request until the endgame */
1373                    if( !t->endgame )
1374                        continue;
1375
1376                    /* don't have more than two peers requesting this block */
1377                    if( peerCount > 1 )
1378                        continue;
1379
1380                    /* don't send the same request to the same peer twice */
1381                    if( peer == peers[0] )
1382                        continue;
1383
1384                    /* in the endgame allow an additional peer to download a
1385                       block but only if the peer seems to be handling requests
1386                       relatively fast */
1387                    if( peer->pendingReqsToPeer + numwant - got < t->endgame )
1388                        continue;
1389                }
1390
1391                /* update the caller's table */
1392                setme[got++] = b;
1393
1394                /* update our own tables */
1395                requestListAdd( t, b, peer );
1396                ++p->requestCount;
1397            }
1398
1399            tr_ptrArrayDestruct( &peerArr, NULL );
1400        }
1401    }
1402
1403    /* In most cases we've just changed the weights of a small number of pieces.
1404     * So rather than qsort()ing the entire array, it's faster to apply an
1405     * adaptive insertion sort algorithm. */
1406    if( got > 0 )
1407    {
1408        /* not enough requests || last piece modified */
1409        if ( i == t->pieceCount ) --i;
1410
1411        setComparePieceByWeightTorrent( t );
1412        while( --i >= 0 )
1413        {
1414            bool exact;
1415
1416            /* relative position! */
1417            const int newpos = tr_lowerBound( &t->pieces[i], &t->pieces[i + 1],
1418                                              t->pieceCount - (i + 1),
1419                                              sizeof( struct weighted_piece ),
1420                                              comparePieceByWeight, &exact );
1421            if( newpos > 0 )
1422            {
1423                const struct weighted_piece piece = t->pieces[i];
1424                memmove( &t->pieces[i],
1425                         &t->pieces[i + 1],
1426                         sizeof( struct weighted_piece ) * ( newpos ) );
1427                t->pieces[i + newpos] = piece;
1428            }
1429        }
1430    }
1431
1432    assertWeightedPiecesAreSorted( t );
1433    *numgot = got;
1434}
1435
1436bool
1437tr_peerMgrDidPeerRequest( const tr_torrent  * tor,
1438                          const tr_peer     * peer,
1439                          tr_block_index_t    block )
1440{
1441    const Torrent * t = tor->torrentPeers;
1442    return requestListLookup( (Torrent*)t, block, peer ) != NULL;
1443}
1444
1445/* cancel requests that are too old */
1446static void
1447refillUpkeep( int foo UNUSED, short bar UNUSED, void * vmgr )
1448{
1449    time_t now;
1450    time_t too_old;
1451    tr_torrent * tor;
1452    tr_peerMgr * mgr = vmgr;
1453    managerLock( mgr );
1454
1455    now = tr_time( );
1456    too_old = now - REQUEST_TTL_SECS;
1457
1458    tor = NULL;
1459    while(( tor = tr_torrentNext( mgr->session, tor )))
1460    {
1461        Torrent * t = tor->torrentPeers;
1462        const int n = t->requestCount;
1463        if( n > 0 )
1464        {
1465            int keepCount = 0;
1466            int cancelCount = 0;
1467            struct block_request * cancel = tr_new( struct block_request, n );
1468            const struct block_request * it;
1469            const struct block_request * end;
1470
1471            for( it=t->requests, end=it+n; it!=end; ++it )
1472            {
1473                if( ( it->sentAt <= too_old ) && it->peer->msgs && !tr_peerMsgsIsReadingBlock( it->peer->msgs, it->block ) )
1474                    cancel[cancelCount++] = *it;
1475                else
1476                {
1477                    if( it != &t->requests[keepCount] )
1478                        t->requests[keepCount] = *it;
1479                    keepCount++;
1480                }
1481            }
1482
1483            /* prune out the ones we aren't keeping */
1484            t->requestCount = keepCount;
1485
1486            /* send cancel messages for all the "cancel" ones */
1487            for( it=cancel, end=it+cancelCount; it!=end; ++it ) {
1488                if( ( it->peer != NULL ) && ( it->peer->msgs != NULL ) ) {
1489                    tr_historyAdd( &it->peer->cancelsSentToPeer, now, 1 );
1490                    tr_peerMsgsCancel( it->peer->msgs, it->block );
1491                    decrementPendingReqCount( it );
1492                }
1493            }
1494
1495            /* decrement the pending request counts for the timed-out blocks */
1496            for( it=cancel, end=it+cancelCount; it!=end; ++it )
1497                pieceListRemoveRequest( t, it->block );
1498
1499            /* cleanup loop */
1500            tr_free( cancel );
1501        }
1502    }
1503
1504    tr_timerAddMsec( mgr->refillUpkeepTimer, REFILL_UPKEEP_PERIOD_MSEC );
1505    managerUnlock( mgr );
1506}
1507
1508static void
1509addStrike( Torrent * t, tr_peer * peer )
1510{
1511    tordbg( t, "increasing peer %s strike count to %d",
1512            tr_atomAddrStr( peer->atom ), peer->strikes + 1 );
1513
1514    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
1515    {
1516        struct peer_atom * atom = peer->atom;
1517        atom->flags2 |= MYFLAG_BANNED;
1518        peer->doPurge = 1;
1519        tordbg( t, "banning peer %s", tr_atomAddrStr( atom ) );
1520    }
1521}
1522
1523static void
1524gotBadPiece( Torrent * t, tr_piece_index_t pieceIndex )
1525{
1526    tr_torrent *   tor = t->tor;
1527    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
1528
1529    tor->corruptCur += byteCount;
1530    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1531
1532    tr_announcerAddBytes( tor, TR_ANN_CORRUPT, byteCount );
1533}
1534
1535static void
1536peerSuggestedPiece( Torrent            * t UNUSED,
1537                    tr_peer            * peer UNUSED,
1538                    tr_piece_index_t     pieceIndex UNUSED,
1539                    int                  isFastAllowed UNUSED )
1540{
1541#if 0
1542    assert( t );
1543    assert( peer );
1544    assert( peer->msgs );
1545
1546    /* is this a valid piece? */
1547    if(  pieceIndex >= t->tor->info.pieceCount )
1548        return;
1549
1550    /* don't ask for it if we've already got it */
1551    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
1552        return;
1553
1554    /* don't ask for it if they don't have it */
1555    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
1556        return;
1557
1558    /* don't ask for it if we're choked and it's not fast */
1559    if( !isFastAllowed && peer->clientIsChoked )
1560        return;
1561
1562    /* request the blocks that we don't have in this piece */
1563    {
1564        tr_block_index_t b;
1565        tr_block_index_t first;
1566        tr_block_index_t last;
1567        const tr_torrent * tor = t->tor;
1568
1569        tr_torGetPieceBlockRange( t->tor, pieceIndex, &first, &last );
1570
1571        for( b=first; b<=last; ++b )
1572        {
1573            if( !tr_cpBlockIsComplete( tor->completion, b ) )
1574            {
1575                const uint32_t offset = getBlockOffsetInPiece( tor, b );
1576                const uint32_t length = tr_torBlockCountBytes( tor, b );
1577                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length );
1578                incrementPieceRequests( t, pieceIndex );
1579            }
1580        }
1581    }
1582#endif
1583}
1584
1585static void
1586removeRequestFromTables( Torrent * t, tr_block_index_t block, const tr_peer * peer )
1587{
1588    requestListRemove( t, block, peer );
1589    pieceListRemoveRequest( t, block );
1590}
1591
1592/* peer choked us, or maybe it disconnected.
1593   either way we need to remove all its requests */
1594static void
1595peerDeclinedAllRequests( Torrent * t, const tr_peer * peer )
1596{
1597    int i, n;
1598    tr_block_index_t * blocks = tr_new( tr_block_index_t, t->requestCount );
1599
1600    for( i=n=0; i<t->requestCount; ++i )
1601        if( peer == t->requests[i].peer )
1602            blocks[n++] = t->requests[i].block;
1603
1604    for( i=0; i<n; ++i )
1605        removeRequestFromTables( t, blocks[i], peer );
1606
1607    tr_free( blocks );
1608}
1609
1610static void tr_peerMgrSetBlame( tr_torrent *, tr_piece_index_t, int );
1611
1612static void
1613peerCallbackFunc( tr_peer * peer, const tr_peer_event * e, void * vt )
1614{
1615    Torrent * t = vt;
1616
1617    torrentLock( t );
1618
1619    assert( peer != NULL );
1620
1621    switch( e->eventType )
1622    {
1623        case TR_PEER_PEER_GOT_DATA:
1624        {
1625            const time_t now = tr_time( );
1626            tr_torrent * tor = t->tor;
1627
1628            if( e->wasPieceData )
1629            {
1630                tor->uploadedCur += e->length;
1631                tr_announcerAddBytes( tor, TR_ANN_UP, e->length );
1632                tr_torrentSetActivityDate( tor, now );
1633                tr_torrentSetDirty( tor );
1634            }
1635
1636            /* update the stats */
1637            if( e->wasPieceData )
1638                tr_statsAddUploaded( tor->session, e->length );
1639
1640            /* update our atom */
1641            if( peer->atom && e->wasPieceData )
1642                peer->atom->piece_data_time = now;
1643
1644            break;
1645        }
1646
1647        case TR_PEER_CLIENT_GOT_HAVE:
1648            if( replicationExists( t ) ) {
1649                tr_incrReplicationOfPiece( t, e->pieceIndex );
1650                assertReplicationCountIsExact( t );
1651            }
1652            break;
1653
1654        case TR_PEER_CLIENT_GOT_HAVE_ALL:
1655            if( replicationExists( t ) ) {
1656                tr_incrReplication( t );
1657                assertReplicationCountIsExact( t );
1658            }
1659            break;
1660
1661        case TR_PEER_CLIENT_GOT_HAVE_NONE:
1662            /* noop */
1663            break;
1664
1665        case TR_PEER_CLIENT_GOT_BITFIELD:
1666            assert( e->bitfield != NULL );
1667            if( replicationExists( t ) ) {
1668                tr_incrReplicationFromBitfield( t, e->bitfield );
1669                assertReplicationCountIsExact( t );
1670            }
1671            break;
1672
1673        case TR_PEER_CLIENT_GOT_REJ: {
1674            tr_block_index_t b = _tr_block( t->tor, e->pieceIndex, e->offset );
1675            if( b < t->tor->blockCount )
1676                removeRequestFromTables( t, b, peer );
1677            else
1678                tordbg( t, "Peer %s sent an out-of-range reject message",
1679                           tr_atomAddrStr( peer->atom ) );
1680            break;
1681        }
1682
1683        case TR_PEER_CLIENT_GOT_CHOKE:
1684            peerDeclinedAllRequests( t, peer );
1685            break;
1686
1687        case TR_PEER_CLIENT_GOT_PORT:
1688            if( peer->atom )
1689                peer->atom->port = e->port;
1690            break;
1691
1692        case TR_PEER_CLIENT_GOT_SUGGEST:
1693            peerSuggestedPiece( t, peer, e->pieceIndex, false );
1694            break;
1695
1696        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1697            peerSuggestedPiece( t, peer, e->pieceIndex, true );
1698            break;
1699
1700        case TR_PEER_CLIENT_GOT_DATA:
1701        {
1702            const time_t now = tr_time( );
1703            tr_torrent * tor = t->tor;
1704
1705            if( e->wasPieceData )
1706            {
1707                tor->downloadedCur += e->length;
1708                tr_torrentSetActivityDate( tor, now );
1709                tr_torrentSetDirty( tor );
1710            }
1711
1712            /* update the stats */
1713            if( e->wasPieceData )
1714                tr_statsAddDownloaded( tor->session, e->length );
1715
1716            /* update our atom */
1717            if( peer->atom && e->wasPieceData )
1718                peer->atom->piece_data_time = now;
1719
1720            break;
1721        }
1722
1723        case TR_PEER_CLIENT_GOT_BLOCK:
1724        {
1725            tr_torrent * tor = t->tor;
1726            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1727            int i, peerCount;
1728            tr_peer ** peers;
1729            tr_ptrArray peerArr = TR_PTR_ARRAY_INIT;
1730
1731            removeRequestFromTables( t, block, peer );
1732            getBlockRequestPeers( t, block, &peerArr );
1733            peers = (tr_peer **) tr_ptrArrayPeek( &peerArr, &peerCount );
1734
1735            /* remove additional block requests and send cancel to peers */
1736            for( i=0; i<peerCount; i++ ) {
1737                tr_peer * p = peers[i];
1738                assert( p != peer );
1739                if( p->msgs ) {
1740                    tr_historyAdd( &p->cancelsSentToPeer, tr_time( ), 1 );
1741                    tr_peerMsgsCancel( p->msgs, block );
1742                }
1743                removeRequestFromTables( t, block, p );
1744            }
1745
1746            tr_ptrArrayDestruct( &peerArr, false );
1747
1748            tr_historyAdd( &peer->blocksSentToClient, tr_time( ), 1 );
1749
1750            if( tr_cpBlockIsComplete( &tor->completion, block ) )
1751            {
1752                /* we already have this block... */
1753                const uint32_t n = tr_torBlockCountBytes( tor, block );
1754                tor->downloadedCur -= MIN( tor->downloadedCur, n );
1755                tordbg( t, "we have this block already..." );
1756            }
1757            else
1758            {
1759                tr_cpBlockAdd( &tor->completion, block );
1760                pieceListResortPiece( t, pieceListLookup( t, e->pieceIndex ) );
1761                tr_torrentSetDirty( tor );
1762
1763                if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
1764                {
1765                    const tr_piece_index_t p = e->pieceIndex;
1766                    const bool ok = tr_torrentCheckPiece( tor, p );
1767
1768                    tordbg( t, "[LAZY] checked just-completed piece %zu", (size_t)p );
1769
1770                    if( !ok )
1771                    {
1772                        tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1773                                   (unsigned long)p );
1774                    }
1775
1776                    tr_peerMgrSetBlame( tor, p, ok );
1777
1778                    if( !ok )
1779                    {
1780                        gotBadPiece( t, p );
1781                    }
1782                    else
1783                    {
1784                        int i;
1785                        int peerCount;
1786                        tr_peer ** peers;
1787                        tr_file_index_t fileIndex;
1788
1789                        /* only add this to downloadedCur if we got it from a peer --
1790                         * webseeds shouldn't count against our ratio. As one tracker
1791                         * admin put it, "Those pieces are downloaded directly from the
1792                         * content distributor, not the peers, it is the tracker's job
1793                         * to manage the swarms, not the web server and does not fit
1794                         * into the jurisdiction of the tracker." */
1795                        if( peer->msgs != NULL ) {
1796                            const uint32_t n = tr_torPieceCountBytes( tor, p );
1797                            tr_announcerAddBytes( tor, TR_ANN_DOWN, n );
1798                        }
1799
1800                        peerCount = tr_ptrArraySize( &t->peers );
1801                        peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1802                        for( i=0; i<peerCount; ++i )
1803                            tr_peerMsgsHave( peers[i]->msgs, p );
1804
1805                        for( fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex ) {
1806                            const tr_file * file = &tor->info.files[fileIndex];
1807                            if( ( file->firstPiece <= p ) && ( p <= file->lastPiece ) ) {
1808                                if( tr_cpFileIsComplete( &tor->completion, fileIndex ) ) {
1809                                    tr_cacheFlushFile( tor->session->cache, tor, fileIndex );
1810                                    tr_torrentFileCompleted( tor, fileIndex );
1811                                }
1812                            }
1813                        }
1814
1815                        pieceListRemovePiece( t, p );
1816                    }
1817                }
1818
1819                t->needsCompletenessCheck = true;
1820            }
1821            break;
1822        }
1823
1824        case TR_PEER_ERROR:
1825            if( ( e->err == ERANGE ) || ( e->err == EMSGSIZE ) || ( e->err == ENOTCONN ) )
1826            {
1827                /* some protocol error from the peer */
1828                peer->doPurge = 1;
1829                tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error",
1830                        tr_atomAddrStr( peer->atom ) );
1831            }
1832            else
1833            {
1834                tordbg( t, "unhandled error: %s", tr_strerror( e->err ) );
1835            }
1836            break;
1837
1838        default:
1839            assert( 0 );
1840    }
1841
1842    torrentUnlock( t );
1843}
1844
1845static int
1846getDefaultShelfLife( uint8_t from )
1847{
1848    /* in general, peers obtained from firsthand contact
1849     * are better than those from secondhand, etc etc */
1850    switch( from )
1851    {
1852        case TR_PEER_FROM_INCOMING : return 60 * 60 * 6;
1853        case TR_PEER_FROM_LTEP     : return 60 * 60 * 6;
1854        case TR_PEER_FROM_TRACKER  : return 60 * 60 * 3;
1855        case TR_PEER_FROM_DHT      : return 60 * 60 * 3;
1856        case TR_PEER_FROM_PEX      : return 60 * 60 * 2;
1857        case TR_PEER_FROM_RESUME   : return 60 * 60;
1858        case TR_PEER_FROM_LPD      : return 10 * 60;
1859        default                    : return 60 * 60;
1860    }
1861}
1862
1863static void
1864ensureAtomExists( Torrent           * t,
1865                  const tr_address  * addr,
1866                  const tr_port       port,
1867                  const uint8_t       flags,
1868                  const int8_t        seedProbability,
1869                  const uint8_t       from )
1870{
1871    struct peer_atom * a;
1872
1873    assert( tr_isAddress( addr ) );
1874    assert( from < TR_PEER_FROM__MAX );
1875
1876    a = getExistingAtom( t, addr );
1877
1878    if( a == NULL )
1879    {
1880        const int jitter = tr_cryptoWeakRandInt( 60*10 );
1881        a = tr_new0( struct peer_atom, 1 );
1882        a->addr = *addr;
1883        a->port = port;
1884        a->flags = flags;
1885        a->fromFirst = from;
1886        a->fromBest = from;
1887        a->shelf_date = tr_time( ) + getDefaultShelfLife( from ) + jitter;
1888        a->blocklisted = -1;
1889        atomSetSeedProbability( a, seedProbability );
1890        tr_ptrArrayInsertSorted( &t->pool, a, compareAtomsByAddress );
1891
1892        tordbg( t, "got a new atom: %s", tr_atomAddrStr( a ) );
1893    }
1894    else
1895    {
1896        if( from < a->fromBest )
1897            a->fromBest = from;
1898
1899        if( a->seedProbability == -1 )
1900            atomSetSeedProbability( a, seedProbability );
1901
1902        a->flags |= flags;
1903    }
1904}
1905
1906static int
1907getMaxPeerCount( const tr_torrent * tor )
1908{
1909    return tor->maxConnectedPeers;
1910}
1911
1912static int
1913getPeerCount( const Torrent * t )
1914{
1915    return tr_ptrArraySize( &t->peers );/* + tr_ptrArraySize( &t->outgoingHandshakes ); */
1916}
1917
1918/* FIXME: this is kind of a mess. */
1919static bool
1920myHandshakeDoneCB( tr_handshake  * handshake,
1921                   tr_peerIo     * io,
1922                   bool            readAnythingFromPeer,
1923                   bool            isConnected,
1924                   const uint8_t * peer_id,
1925                   void          * vmanager )
1926{
1927    bool               ok = isConnected;
1928    bool               success = false;
1929    tr_port            port;
1930    const tr_address * addr;
1931    tr_peerMgr       * manager = vmanager;
1932    Torrent          * t;
1933    tr_handshake     * ours;
1934
1935    assert( io );
1936    assert( tr_isBool( ok ) );
1937
1938    t = tr_peerIoHasTorrentHash( io )
1939        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1940        : NULL;
1941
1942    if( tr_peerIoIsIncoming ( io ) )
1943        ours = tr_ptrArrayRemoveSorted( &manager->incomingHandshakes,
1944                                        handshake, handshakeCompare );
1945    else if( t )
1946        ours = tr_ptrArrayRemoveSorted( &t->outgoingHandshakes,
1947                                        handshake, handshakeCompare );
1948    else
1949        ours = handshake;
1950
1951    assert( ours );
1952    assert( ours == handshake );
1953
1954    if( t )
1955        torrentLock( t );
1956
1957    addr = tr_peerIoGetAddress( io, &port );
1958
1959    if( !ok || !t || !t->isRunning )
1960    {
1961        if( t )
1962        {
1963            struct peer_atom * atom = getExistingAtom( t, addr );
1964            if( atom )
1965            {
1966                ++atom->numFails;
1967
1968                if( !readAnythingFromPeer )
1969                {
1970                    tordbg( t, "marking peer %s as unreachable... numFails is %d", tr_atomAddrStr( atom ), (int)atom->numFails );
1971                    atom->flags2 |= MYFLAG_UNREACHABLE;
1972                }
1973            }
1974        }
1975    }
1976    else /* looking good */
1977    {
1978        struct peer_atom * atom;
1979
1980        ensureAtomExists( t, addr, port, 0, -1, TR_PEER_FROM_INCOMING );
1981        atom = getExistingAtom( t, addr );
1982        atom->time = tr_time( );
1983        atom->piece_data_time = 0;
1984        atom->lastConnectionAt = tr_time( );
1985
1986        if( !tr_peerIoIsIncoming( io ) )
1987        {
1988            atom->flags |= ADDED_F_CONNECTABLE;
1989            atom->flags2 &= ~MYFLAG_UNREACHABLE;
1990        }
1991
1992        /* In principle, this flag specifies whether the peer groks uTP,
1993           not whether it's currently connected over uTP. */
1994        if( io->utp_socket )
1995            atom->flags |= ADDED_F_UTP_FLAGS;
1996
1997        if( atom->flags2 & MYFLAG_BANNED )
1998        {
1999            tordbg( t, "banned peer %s tried to reconnect",
2000                    tr_atomAddrStr( atom ) );
2001        }
2002        else if( tr_peerIoIsIncoming( io )
2003               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
2004
2005        {
2006        }
2007        else
2008        {
2009            tr_peer * peer = atom->peer;
2010
2011            if( peer ) /* we already have this peer */
2012            {
2013            }
2014            else
2015            {
2016                peer = getPeer( t, atom );
2017                tr_free( peer->client );
2018
2019                if( !peer_id )
2020                    peer->client = NULL;
2021                else {
2022                    char client[128];
2023                    tr_clientForId( client, sizeof( client ), peer_id );
2024                    peer->client = tr_strdup( client );
2025                }
2026
2027                peer->io = tr_handshakeStealIO( handshake ); /* this steals its refcount too, which is
2028                                                                balanced by our unref in peerDelete()  */
2029                tr_peerIoSetParent( peer->io, t->tor->bandwidth );
2030                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t );
2031
2032                success = true;
2033            }
2034        }
2035    }
2036
2037    if( t )
2038        torrentUnlock( t );
2039
2040    return success;
2041}
2042
2043void
2044tr_peerMgrAddIncoming( tr_peerMgr * manager,
2045                       tr_address * addr,
2046                       tr_port      port,
2047                       int          socket,
2048                       struct UTPSocket * utp_socket )
2049{
2050    tr_session * session;
2051
2052    managerLock( manager );
2053
2054    assert( tr_isSession( manager->session ) );
2055    session = manager->session;
2056
2057    if( tr_sessionIsAddressBlocked( session, addr ) )
2058    {
2059        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_ntop_non_ts( addr ) );
2060        if(socket >= 0)
2061            tr_netClose( session, socket );
2062        else
2063            UTP_Close( utp_socket );
2064    }
2065    else if( getExistingHandshake( &manager->incomingHandshakes, addr ) )
2066    {
2067        if(socket >= 0)
2068            tr_netClose( session, socket );
2069        else
2070            UTP_Close( utp_socket );
2071    }
2072    else /* we don't have a connection to them yet... */
2073    {
2074        tr_peerIo *    io;
2075        tr_handshake * handshake;
2076
2077        io = tr_peerIoNewIncoming( session, session->bandwidth, addr, port, socket, utp_socket );
2078
2079        handshake = tr_handshakeNew( io,
2080                                     session->encryptionMode,
2081                                     myHandshakeDoneCB,
2082                                     manager );
2083
2084        tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewIncoming() */
2085
2086        tr_ptrArrayInsertSorted( &manager->incomingHandshakes, handshake,
2087                                 handshakeCompare );
2088    }
2089
2090    managerUnlock( manager );
2091}
2092
2093void
2094tr_peerMgrAddPex( tr_torrent * tor, uint8_t from,
2095                  const tr_pex * pex, int8_t seedProbability )
2096{
2097    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
2098    {
2099        Torrent * t = tor->torrentPeers;
2100        managerLock( t->manager );
2101
2102        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
2103            if( tr_isValidPeerAddress( &pex->addr, pex->port ) )
2104                ensureAtomExists( t, &pex->addr, pex->port, pex->flags, seedProbability, from );
2105
2106        managerUnlock( t->manager );
2107    }
2108}
2109
2110void
2111tr_peerMgrMarkAllAsSeeds( tr_torrent * tor )
2112{
2113    Torrent * t = tor->torrentPeers;
2114    const int n = tr_ptrArraySize( &t->pool );
2115    struct peer_atom ** it = (struct peer_atom**) tr_ptrArrayBase( &t->pool );
2116    struct peer_atom ** end = it + n;
2117
2118    while( it != end )
2119        atomSetSeed( t, *it++ );
2120}
2121
2122tr_pex *
2123tr_peerMgrCompactToPex( const void *    compact,
2124                        size_t          compactLen,
2125                        const uint8_t * added_f,
2126                        size_t          added_f_len,
2127                        size_t *        pexCount )
2128{
2129    size_t          i;
2130    size_t          n = compactLen / 6;
2131    const uint8_t * walk = compact;
2132    tr_pex *        pex = tr_new0( tr_pex, n );
2133
2134    for( i = 0; i < n; ++i )
2135    {
2136        pex[i].addr.type = TR_AF_INET;
2137        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
2138        memcpy( &pex[i].port, walk, 2 ); walk += 2;
2139        if( added_f && ( n == added_f_len ) )
2140            pex[i].flags = added_f[i];
2141    }
2142
2143    *pexCount = n;
2144    return pex;
2145}
2146
2147tr_pex *
2148tr_peerMgrCompact6ToPex( const void    * compact,
2149                         size_t          compactLen,
2150                         const uint8_t * added_f,
2151                         size_t          added_f_len,
2152                         size_t        * pexCount )
2153{
2154    size_t          i;
2155    size_t          n = compactLen / 18;
2156    const uint8_t * walk = compact;
2157    tr_pex *        pex = tr_new0( tr_pex, n );
2158
2159    for( i = 0; i < n; ++i )
2160    {
2161        pex[i].addr.type = TR_AF_INET6;
2162        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
2163        memcpy( &pex[i].port, walk, 2 ); walk += 2;
2164        if( added_f && ( n == added_f_len ) )
2165            pex[i].flags = added_f[i];
2166    }
2167
2168    *pexCount = n;
2169    return pex;
2170}
2171
2172tr_pex *
2173tr_peerMgrArrayToPex( const void * array,
2174                      size_t       arrayLen,
2175                      size_t      * pexCount )
2176{
2177    size_t          i;
2178    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
2179    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
2180    const uint8_t * walk = array;
2181    tr_pex        * pex = tr_new0( tr_pex, n );
2182
2183    for( i = 0 ; i < n ; i++ ) {
2184        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
2185        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
2186        pex[i].flags = 0x00;
2187        walk += sizeof( tr_address ) + 2;
2188    }
2189
2190    *pexCount = n;
2191    return pex;
2192}
2193
2194/**
2195***
2196**/
2197
2198static void
2199tr_peerMgrSetBlame( tr_torrent     * tor,
2200                    tr_piece_index_t pieceIndex,
2201                    int              success )
2202{
2203    if( !success )
2204    {
2205        int        peerCount, i;
2206        Torrent *  t = tor->torrentPeers;
2207        tr_peer ** peers;
2208
2209        assert( torrentIsLocked( t ) );
2210
2211        peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
2212        for( i = 0; i < peerCount; ++i )
2213        {
2214            tr_peer * peer = peers[i];
2215            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
2216            {
2217                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
2218                        tr_atomAddrStr( peer->atom ),
2219                        pieceIndex, (int)peer->strikes + 1 );
2220                addStrike( t, peer );
2221            }
2222        }
2223    }
2224}
2225
2226int
2227tr_pexCompare( const void * va, const void * vb )
2228{
2229    const tr_pex * a = va;
2230    const tr_pex * b = vb;
2231    int i;
2232
2233    assert( tr_isPex( a ) );
2234    assert( tr_isPex( b ) );
2235
2236    if(( i = tr_compareAddresses( &a->addr, &b->addr )))
2237        return i;
2238
2239    if( a->port != b->port )
2240        return a->port < b->port ? -1 : 1;
2241
2242    return 0;
2243}
2244
2245#if 0
2246static int
2247peerPrefersCrypto( const tr_peer * peer )
2248{
2249    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
2250        return true;
2251
2252    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
2253        return false;
2254
2255    return tr_peerIoIsEncrypted( peer->io );
2256}
2257#endif
2258
2259/* better goes first */
2260static int
2261compareAtomsByUsefulness( const void * va, const void *vb )
2262{
2263    const struct peer_atom * a = * (const struct peer_atom**) va;
2264    const struct peer_atom * b = * (const struct peer_atom**) vb;
2265
2266    assert( tr_isAtom( a ) );
2267    assert( tr_isAtom( b ) );
2268
2269    if( a->piece_data_time != b->piece_data_time )
2270        return a->piece_data_time > b->piece_data_time ? -1 : 1;
2271    if( a->fromBest != b->fromBest )
2272        return a->fromBest < b->fromBest ? -1 : 1;
2273    if( a->numFails != b->numFails )
2274        return a->numFails < b->numFails ? -1 : 1;
2275
2276    return 0;
2277}
2278
2279int
2280tr_peerMgrGetPeers( tr_torrent   * tor,
2281                    tr_pex      ** setme_pex,
2282                    uint8_t        af,
2283                    uint8_t        list_mode,
2284                    int            maxCount )
2285{
2286    int i;
2287    int n;
2288    int count = 0;
2289    int atomCount = 0;
2290    const Torrent * t = tor->torrentPeers;
2291    struct peer_atom ** atoms = NULL;
2292    tr_pex * pex;
2293    tr_pex * walk;
2294
2295    assert( tr_isTorrent( tor ) );
2296    assert( setme_pex != NULL );
2297    assert( af==TR_AF_INET || af==TR_AF_INET6 );
2298    assert( list_mode==TR_PEERS_CONNECTED || list_mode==TR_PEERS_ALL );
2299
2300    managerLock( t->manager );
2301
2302    /**
2303    ***  build a list of atoms
2304    **/
2305
2306    if( list_mode == TR_PEERS_CONNECTED ) /* connected peers only */
2307    {
2308        int i;
2309        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
2310        atomCount = tr_ptrArraySize( &t->peers );
2311        atoms = tr_new( struct peer_atom *, atomCount );
2312        for( i=0; i<atomCount; ++i )
2313            atoms[i] = peers[i]->atom;
2314    }
2315    else /* TR_PEERS_ALL */
2316    {
2317        const struct peer_atom ** atomsBase = (const struct peer_atom**) tr_ptrArrayBase( &t->pool );
2318        atomCount = tr_ptrArraySize( &t->pool );
2319        atoms = tr_memdup( atomsBase, atomCount * sizeof( struct peer_atom * ) );
2320    }
2321
2322    qsort( atoms, atomCount, sizeof( struct peer_atom * ), compareAtomsByUsefulness );
2323
2324    /**
2325    ***  add the first N of them into our return list
2326    **/
2327
2328    n = MIN( atomCount, maxCount );
2329    pex = walk = tr_new0( tr_pex, n );
2330
2331    for( i=0; i<atomCount && count<n; ++i )
2332    {
2333        const struct peer_atom * atom = atoms[i];
2334        if( atom->addr.type == af )
2335        {
2336            assert( tr_isAddress( &atom->addr ) );
2337            walk->addr = atom->addr;
2338            walk->port = atom->port;
2339            walk->flags = atom->flags;
2340            ++count;
2341            ++walk;
2342        }
2343    }
2344
2345    qsort( pex, count, sizeof( tr_pex ), tr_pexCompare );
2346
2347    assert( ( walk - pex ) == count );
2348    *setme_pex = pex;
2349
2350    /* cleanup */
2351    tr_free( atoms );
2352    managerUnlock( t->manager );
2353    return count;
2354}
2355
2356static void atomPulse      ( int, short, void * );
2357static void bandwidthPulse ( int, short, void * );
2358static void rechokePulse   ( int, short, void * );
2359static void reconnectPulse ( int, short, void * );
2360
2361static struct event *
2362createTimer( tr_session * session, int msec, void (*callback)(int, short, void *), void * cbdata )
2363{
2364    struct event * timer = evtimer_new( session->event_base, callback, cbdata );
2365    tr_timerAddMsec( timer, msec );
2366    return timer;
2367}
2368
2369static void
2370ensureMgrTimersExist( struct tr_peerMgr * m )
2371{
2372    if( m->atomTimer == NULL )
2373        m->atomTimer = createTimer( m->session, ATOM_PERIOD_MSEC, atomPulse, m );
2374
2375    if( m->bandwidthTimer == NULL )
2376        m->bandwidthTimer = createTimer( m->session, BANDWIDTH_PERIOD_MSEC, bandwidthPulse, m );
2377
2378    if( m->rechokeTimer == NULL )
2379        m->rechokeTimer = createTimer( m->session, RECHOKE_PERIOD_MSEC, rechokePulse, m );
2380
2381    if( m->refillUpkeepTimer == NULL )
2382        m->refillUpkeepTimer = createTimer( m->session, REFILL_UPKEEP_PERIOD_MSEC, refillUpkeep, m );
2383}
2384
2385void
2386tr_peerMgrStartTorrent( tr_torrent * tor )
2387{
2388    Torrent * t = tor->torrentPeers;
2389
2390    assert( tr_isTorrent( tor ) );
2391    assert( tr_torrentIsLocked( tor ) );
2392
2393    ensureMgrTimersExist( t->manager );
2394
2395    t->isRunning = true;
2396    t->maxPeers = t->tor->maxConnectedPeers;
2397    t->pieceSortState = PIECES_UNSORTED;
2398
2399    rechokePulse( 0, 0, t->manager );
2400}
2401
2402static void
2403stopTorrent( Torrent * t )
2404{
2405    int i, n;
2406
2407    t->isRunning = false;
2408
2409    replicationFree( t );
2410    invalidatePieceSorting( t );
2411
2412    /* disconnect the peers. */
2413    for( i=0, n=tr_ptrArraySize( &t->peers ); i<n; ++i )
2414        peerDelete( t, tr_ptrArrayNth( &t->peers, i ) );
2415    tr_ptrArrayClear( &t->peers );
2416
2417    /* disconnect the handshakes. handshakeAbort calls handshakeDoneCB(),
2418     * which removes the handshake from t->outgoingHandshakes... */
2419    while( !tr_ptrArrayEmpty( &t->outgoingHandshakes ) )
2420        tr_handshakeAbort( tr_ptrArrayNth( &t->outgoingHandshakes, 0 ) );
2421}
2422
2423void
2424tr_peerMgrStopTorrent( tr_torrent * tor )
2425{
2426    assert( tr_isTorrent( tor ) );
2427    assert( tr_torrentIsLocked( tor ) );
2428
2429    stopTorrent( tor->torrentPeers );
2430}
2431
2432void
2433tr_peerMgrAddTorrent( tr_peerMgr * manager, tr_torrent * tor )
2434{
2435    assert( tr_isTorrent( tor ) );
2436    assert( tr_torrentIsLocked( tor ) );
2437    assert( tor->torrentPeers == NULL );
2438
2439    tor->torrentPeers = torrentNew( manager, tor );
2440}
2441
2442void
2443tr_peerMgrRemoveTorrent( tr_torrent * tor )
2444{
2445    assert( tr_isTorrent( tor ) );
2446    assert( tr_torrentIsLocked( tor ) );
2447
2448    stopTorrent( tor->torrentPeers );
2449    torrentFree( tor->torrentPeers );
2450}
2451
2452void
2453tr_peerUpdateProgress( tr_torrent * tor, tr_peer * peer )
2454{
2455    const tr_bitset * have = &peer->have;
2456
2457    if( have->haveAll )
2458    {
2459        peer->progress = 1.0;
2460    }
2461    else if( have->haveNone )
2462    {
2463        peer->progress = 0.0;
2464    }
2465    else
2466    {
2467        const float trueCount = tr_bitfieldCountTrueBits( &have->bitfield );
2468
2469        if( tr_torrentHasMetadata( tor ) )
2470            peer->progress = trueCount / tor->info.pieceCount;
2471        else /* without pieceCount, this result is only a best guess... */
2472            peer->progress = trueCount / ( have->bitfield.bitCount + 1 );
2473    }
2474
2475    if( peer->atom && ( peer->progress >= 1.0 ) )
2476        atomSetSeed( tor->torrentPeers, peer->atom );
2477}
2478
2479void
2480tr_peerMgrOnTorrentGotMetainfo( tr_torrent * tor )
2481{
2482    int i;
2483    const int peerCount = tr_ptrArraySize( &tor->torrentPeers->peers );
2484    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &tor->torrentPeers->peers );
2485
2486    /* some peer_msgs' progress fields may not be accurate if we
2487       didn't have the metadata before now... so refresh them all... */
2488    for( i=0; i<peerCount; ++i )
2489        tr_peerUpdateProgress( tor, peers[i] );
2490}
2491
2492void
2493tr_peerMgrTorrentAvailability( const tr_torrent * tor, int8_t * tab, unsigned int tabCount )
2494{
2495    assert( tr_isTorrent( tor ) );
2496    assert( torrentIsLocked( tor->torrentPeers ) );
2497    assert( tab != NULL );
2498    assert( tabCount > 0 );
2499
2500    memset( tab, 0, tabCount );
2501
2502    if( tr_torrentHasMetadata( tor ) )
2503    {
2504        tr_piece_index_t i;
2505        const int peerCount = tr_ptrArraySize( &tor->torrentPeers->peers );
2506        const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &tor->torrentPeers->peers );
2507        const float interval = tor->info.pieceCount / (float)tabCount;
2508        const bool isSeed = tr_cpGetStatus( &tor->completion ) == TR_SEED;
2509
2510        for( i=0; i<tabCount; ++i )
2511        {
2512            const int piece = i * interval;
2513
2514            if( isSeed || tr_cpPieceIsComplete( &tor->completion, piece ) )
2515                tab[i] = -1;
2516            else if( peerCount ) {
2517                int j;
2518                for( j=0; j<peerCount; ++j )
2519                    if( tr_bitsetHas( &peers[j]->have, piece ) )
2520                        ++tab[i];
2521            }
2522        }
2523    }
2524}
2525
2526/* Returns the pieces that are available from peers */
2527tr_bitfield*
2528tr_peerMgrGetAvailable( const tr_torrent * tor )
2529{
2530    int i;
2531    Torrent * t = tor->torrentPeers;
2532    const int peerCount = tr_ptrArraySize( &t->peers );
2533    const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
2534    tr_bitfield * pieces = tr_bitfieldNew( t->tor->info.pieceCount );
2535
2536    assert( tr_torrentIsLocked( tor ) );
2537
2538    for( i=0; i<peerCount; ++i )
2539        tr_bitsetOr( pieces, &peers[i]->have );
2540
2541    return pieces;
2542}
2543
2544void
2545tr_peerMgrTorrentStats( tr_torrent  * tor,
2546                        int         * setmePeersConnected,
2547                        int         * setmeSeedsConnected,
2548                        int         * setmeWebseedsSendingToUs,
2549                        int         * setmePeersSendingToUs,
2550                        int         * setmePeersGettingFromUs,
2551                        int         * setmePeersFrom )
2552{
2553    int i, size;
2554    const Torrent * t = tor->torrentPeers;
2555    const tr_peer ** peers;
2556
2557    assert( tr_torrentIsLocked( tor ) );
2558
2559    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
2560    size = tr_ptrArraySize( &t->peers );
2561
2562    *setmePeersConnected       = 0;
2563    *setmeSeedsConnected       = 0;
2564    *setmePeersGettingFromUs   = 0;
2565    *setmePeersSendingToUs     = 0;
2566    *setmeWebseedsSendingToUs  = 0;
2567
2568    for( i=0; i<TR_PEER_FROM__MAX; ++i )
2569        setmePeersFrom[i] = 0;
2570
2571    for( i=0; i<size; ++i )
2572    {
2573        const tr_peer * peer = peers[i];
2574        const struct peer_atom * atom = peer->atom;
2575
2576        if( peer->io == NULL ) /* not connected */
2577            continue;
2578
2579        ++*setmePeersConnected;
2580
2581        ++setmePeersFrom[atom->fromFirst];
2582
2583        if( clientIsDownloadingFrom( tor, peer ) )
2584            ++*setmePeersSendingToUs;
2585
2586        if( clientIsUploadingTo( peer ) )
2587            ++*setmePeersGettingFromUs;
2588
2589        if( atomIsSeed( atom ) )
2590            ++*setmeSeedsConnected;
2591    }
2592
2593    *setmeWebseedsSendingToUs = countActiveWebseeds( t );
2594}
2595
2596double*
2597tr_peerMgrWebSpeeds_KBps( const tr_torrent * tor )
2598{
2599    int i;
2600    const Torrent * t = tor->torrentPeers;
2601    const int webseedCount = tr_ptrArraySize( &t->webseeds );
2602    const tr_webseed ** webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2603    const uint64_t now = tr_time_msec( );
2604    double * ret = tr_new0( double, webseedCount );
2605
2606    assert( tr_isTorrent( tor ) );
2607    assert( tr_torrentIsLocked( tor ) );
2608    assert( t->manager != NULL );
2609    assert( webseedCount == tor->info.webseedCount );
2610
2611    for( i=0; i<webseedCount; ++i ) {
2612        int Bps;
2613        if( tr_webseedGetSpeed_Bps( webseeds[i], now, &Bps ) )
2614            ret[i] = Bps / (double)tr_speed_K;
2615        else
2616            ret[i] = -1.0;
2617    }
2618
2619    return ret;
2620}
2621
2622int
2623tr_peerGetPieceSpeed_Bps( const tr_peer * peer, uint64_t now, tr_direction direction )
2624{
2625    return peer->io ? tr_peerIoGetPieceSpeed_Bps( peer->io, now, direction ) : 0.0;
2626}
2627
2628static bool
2629peerIsSeed( const tr_peer * peer )
2630{
2631    if( peer->progress >= 1.0 )
2632        return true;
2633
2634    if( peer->atom && atomIsSeed( peer->atom ) )
2635        return true;
2636
2637    return false;
2638}
2639
2640struct tr_peer_stat *
2641tr_peerMgrPeerStats( const tr_torrent * tor, int * setmeCount )
2642{
2643    int i;
2644    const Torrent * t = tor->torrentPeers;
2645    const int size = tr_ptrArraySize( &t->peers );
2646    const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
2647    const uint64_t now_msec = tr_time_msec( );
2648    const time_t now = tr_time();
2649    tr_peer_stat * ret = tr_new0( tr_peer_stat, size );
2650
2651    assert( tr_isTorrent( tor ) );
2652    assert( tr_torrentIsLocked( tor ) );
2653    assert( t->manager );
2654
2655    for( i=0; i<size; ++i )
2656    {
2657        char *                   pch;
2658        const tr_peer *          peer = peers[i];
2659        const struct peer_atom * atom = peer->atom;
2660        tr_peer_stat *           stat = ret + i;
2661
2662        tr_ntop( &atom->addr, stat->addr, sizeof( stat->addr ) );
2663        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
2664                   sizeof( stat->client ) );
2665        stat->port                = ntohs( peer->atom->port );
2666        stat->from                = atom->fromFirst;
2667        stat->progress            = peer->progress;
2668        stat->isUTP               = peer->io->utp_socket != NULL;
2669        stat->isEncrypted         = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
2670        stat->rateToPeer_KBps     = toSpeedKBps( tr_peerGetPieceSpeed_Bps( peer, now_msec, TR_CLIENT_TO_PEER ) );
2671        stat->rateToClient_KBps   = toSpeedKBps( tr_peerGetPieceSpeed_Bps( peer, now_msec, TR_PEER_TO_CLIENT ) );
2672        stat->peerIsChoked        = peer->peerIsChoked;
2673        stat->peerIsInterested    = peer->peerIsInterested;
2674        stat->clientIsChoked      = peer->clientIsChoked;
2675        stat->clientIsInterested  = peer->clientIsInterested;
2676        stat->isIncoming          = tr_peerIoIsIncoming( peer->io );
2677        stat->isDownloadingFrom   = clientIsDownloadingFrom( tor, peer );
2678        stat->isUploadingTo       = clientIsUploadingTo( peer );
2679        stat->isSeed              = peerIsSeed( peer );
2680
2681        stat->blocksToPeer        = tr_historyGet( &peer->blocksSentToPeer,    now, CANCEL_HISTORY_SEC );
2682        stat->blocksToClient      = tr_historyGet( &peer->blocksSentToClient,  now, CANCEL_HISTORY_SEC );
2683        stat->cancelsToPeer       = tr_historyGet( &peer->cancelsSentToPeer,   now, CANCEL_HISTORY_SEC );
2684        stat->cancelsToClient     = tr_historyGet( &peer->cancelsSentToClient, now, CANCEL_HISTORY_SEC );
2685
2686        stat->pendingReqsToPeer   = peer->pendingReqsToPeer;
2687        stat->pendingReqsToClient = peer->pendingReqsToClient;
2688
2689        pch = stat->flagStr;
2690        if( stat->isUTP ) *pch++ = 'T';
2691        if( t->optimistic == peer ) *pch++ = 'O';
2692        if( stat->isDownloadingFrom ) *pch++ = 'D';
2693        else if( stat->clientIsInterested ) *pch++ = 'd';
2694        if( stat->isUploadingTo ) *pch++ = 'U';
2695        else if( stat->peerIsInterested ) *pch++ = 'u';
2696        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
2697        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
2698        if( stat->isEncrypted ) *pch++ = 'E';
2699        if( stat->from == TR_PEER_FROM_DHT ) *pch++ = 'H';
2700        else if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
2701        if( stat->isIncoming ) *pch++ = 'I';
2702        *pch = '\0';
2703    }
2704
2705    *setmeCount = size;
2706
2707    return ret;
2708}
2709
2710/**
2711***
2712**/
2713
2714void
2715tr_peerMgrClearInterest( tr_torrent * tor )
2716{
2717    int i;
2718    Torrent * t = tor->torrentPeers;
2719    const int peerCount = tr_ptrArraySize( &t->peers );
2720
2721    assert( tr_isTorrent( tor ) );
2722    assert( tr_torrentIsLocked( tor ) );
2723
2724    for( i=0; i<peerCount; ++i )
2725    {
2726        const tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2727        tr_peerMsgsSetInterested( peer->msgs, false );
2728    }
2729}
2730
2731/* do we still want this piece and does the peer have it? */
2732static bool
2733isPieceInteresting( const tr_torrent * tor, const tr_peer * peer, tr_piece_index_t index )
2734{
2735    return ( !tor->info.pieces[index].dnd ) /* we want it */
2736        && ( !tr_cpPieceIsComplete( &tor->completion, index ) )  /* we don't have it */
2737        && ( tr_bitsetHas( &peer->have, index ) ); /* peer has it */
2738}
2739
2740/* does this peer have any pieces that we want? */
2741static bool
2742isPeerInteresting( const tr_torrent * tor, const tr_peer * peer )
2743{
2744    tr_piece_index_t i, n;
2745
2746    if ( tr_torrentIsSeed( tor ) )
2747        return false;
2748
2749    if( !tr_torrentIsPieceTransferAllowed( tor, TR_PEER_TO_CLIENT ) )
2750        return false;
2751
2752    for( i=0, n=tor->info.pieceCount; i<n; ++i )
2753        if( isPieceInteresting( tor, peer, i ) )
2754            return true;
2755
2756    return false;
2757}
2758
2759/* determines who we send "interested" messages to */
2760static void
2761rechokeDownloads( Torrent * t )
2762{
2763    int i;
2764    const time_t now = tr_time( );
2765    const int MIN_INTERESTING_PEERS = 5;
2766    const int peerCount = tr_ptrArraySize( &t->peers );
2767    int maxPeers;
2768
2769    int badCount         = 0;
2770    int goodCount        = 0;
2771    int untestedCount    = 0;
2772    tr_peer ** bad       = tr_new( tr_peer*, peerCount );
2773    tr_peer ** good      = tr_new( tr_peer*, peerCount );
2774    tr_peer ** untested  = tr_new( tr_peer*, peerCount );
2775
2776    /* decide how many peers to be interested in */
2777    {
2778        int blocks = 0;
2779        int cancels = 0;
2780        time_t timeSinceCancel;
2781
2782        /* Count up how many blocks & cancels each peer has.
2783         *
2784         * There are two situations where we send out cancels --
2785         *
2786         * 1. We've got unresponsive peers, which is handled by deciding
2787         *    -which- peers to be interested in.
2788         *
2789         * 2. We've hit our bandwidth cap, which is handled by deciding
2790         *    -how many- peers to be interested in.
2791         *
2792         * We're working on 2. here, so we need to ignore unresponsive
2793         * peers in our calculations lest they confuse Transmission into
2794         * thinking it's hit its bandwidth cap.
2795         */
2796        for( i=0; i<peerCount; ++i )
2797        {
2798            const tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2799            const int b = tr_historyGet( &peer->blocksSentToClient, now, CANCEL_HISTORY_SEC );
2800            const int c = tr_historyGet( &peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC );
2801
2802            if( b == 0 ) /* ignore unresponsive peers, as described above */
2803                continue;
2804
2805            blocks += b;
2806            cancels += c;
2807        }
2808
2809        if( cancels > 0 )
2810        {
2811            /* cancelRate: of the block requests we've recently made, the percentage we cancelled.
2812             * higher values indicate more congestion. */
2813            const double cancelRate = cancels / (double)(cancels + blocks);
2814            const double mult = 1 - MIN( cancelRate, 0.5 );
2815            maxPeers = t->interestedCount * mult;
2816            tordbg( t, "cancel rate is %.3f -- reducing the "
2817                       "number of peers we're interested in by %.0f percent",
2818                       cancelRate, mult * 100 );
2819            t->lastCancel = now;
2820        }
2821
2822        timeSinceCancel = now - t->lastCancel;
2823        if( timeSinceCancel )
2824        {
2825            const int maxIncrease = 15;
2826            const time_t maxHistory = 2 * CANCEL_HISTORY_SEC;
2827            const double mult = MIN( timeSinceCancel, maxHistory ) / (double) maxHistory;
2828            const int inc = maxIncrease * mult;
2829            maxPeers = t->maxPeers + inc;
2830            tordbg( t, "time since last cancel is %li -- increasing the "
2831                       "number of peers we're interested in by %d",
2832                       timeSinceCancel, inc );
2833        }
2834    }
2835
2836    /* don't let the previous section's number tweaking go too far... */
2837    if( maxPeers < MIN_INTERESTING_PEERS )
2838        maxPeers = MIN_INTERESTING_PEERS;
2839    if( maxPeers > t->tor->maxConnectedPeers )
2840        maxPeers = t->tor->maxConnectedPeers;
2841
2842    t->maxPeers = maxPeers;
2843
2844    /* separate the peers into "good" (ones with a low cancel-to-block ratio),
2845     * untested peers, and "bad" (ones with a high cancel-to-block ratio).
2846     * That's the order in which we'll choose who to show interest in */
2847    {
2848        /* Randomize the peer array so the peers in the three groups will be unsorted... */
2849        int n = peerCount;
2850        tr_peer ** peers = tr_memdup( tr_ptrArrayBase( &t->peers ), n * sizeof( tr_peer * ) );
2851
2852        while( n > 0 )
2853        {
2854            const int i = tr_cryptoWeakRandInt( n );
2855            tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2856
2857            if( !isPeerInteresting( t->tor, peer ) )
2858            {
2859                tr_peerMsgsSetInterested( peer->msgs, false );
2860            }
2861            else
2862            {
2863                const int blocks = tr_historyGet( &peer->blocksSentToClient, now, CANCEL_HISTORY_SEC );
2864                const int cancels = tr_historyGet( &peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC );
2865
2866                if( !blocks && !cancels )
2867                    untested[untestedCount++] = peer;
2868                else if( !cancels )
2869                    good[goodCount++] = peer;
2870                else if( !blocks )
2871                    bad[badCount++] = peer;
2872                else if( ( cancels * 10 ) < blocks )
2873                    good[goodCount++] = peer;
2874                else
2875                    bad[badCount++] = peer;
2876            }
2877
2878            tr_removeElementFromArray( peers, i, sizeof(tr_peer*), n-- );
2879        }
2880
2881        tr_free( peers );
2882    }
2883
2884    t->interestedCount = 0;
2885
2886    /* We've decided (1) how many peers to be interested in,
2887     * and (2) which peers are the best candidates,
2888     * Now it's time to update our `interest' flags. */
2889    for( i=0; i<goodCount; ++i ) {
2890        const bool b = t->interestedCount < maxPeers;
2891        tr_peerMsgsSetInterested( good[i]->msgs, b );
2892        if( b )
2893            ++t->interestedCount;
2894    }
2895    for( i=0; i<untestedCount; ++i ) {
2896        const bool b = t->interestedCount < maxPeers;
2897        tr_peerMsgsSetInterested( untested[i]->msgs, b );
2898        if( b )
2899            ++t->interestedCount;
2900    }
2901    for( i=0; i<badCount; ++i ) {
2902        const bool b = t->interestedCount < maxPeers;
2903        tr_peerMsgsSetInterested( bad[i]->msgs, b );
2904        if( b )
2905            ++t->interestedCount;
2906    }
2907
2908/*fprintf( stderr, "num interested: %d\n", t->interestedCount );*/
2909
2910    /* cleanup */
2911    tr_free( untested );
2912    tr_free( good );
2913    tr_free( bad );
2914}
2915
2916/**
2917***
2918**/
2919
2920struct ChokeData
2921{
2922    bool            isInterested;
2923    bool            wasChoked;
2924    bool            isChoked;
2925    int             rate;
2926    int             salt;
2927    tr_peer *       peer;
2928};
2929
2930static int
2931compareChoke( const void * va, const void * vb )
2932{
2933    const struct ChokeData * a = va;
2934    const struct ChokeData * b = vb;
2935
2936    if( a->rate != b->rate ) /* prefer higher overall speeds */
2937        return a->rate > b->rate ? -1 : 1;
2938
2939    if( a->wasChoked != b->wasChoked ) /* prefer unchoked */
2940        return a->wasChoked ? 1 : -1;
2941
2942    if( a->salt != b->salt ) /* random order */
2943        return a->salt - b->salt;
2944
2945    return 0;
2946}
2947
2948/* is this a new connection? */
2949static int
2950isNew( const tr_peer * peer )
2951{
2952    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
2953}
2954
2955/* get a rate for deciding which peers to choke and unchoke. */
2956static int
2957getRate( const tr_torrent * tor, struct peer_atom * atom, uint64_t now )
2958{
2959    int Bps;
2960
2961    if( tr_torrentIsSeed( tor ) )
2962        Bps = tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_CLIENT_TO_PEER );
2963
2964    /* downloading a private torrent... take upload speed into account
2965     * because there may only be a small window of opportunity to share */
2966    else if( tr_torrentIsPrivate( tor ) )
2967        Bps = tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_PEER_TO_CLIENT )
2968            + tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_CLIENT_TO_PEER );
2969
2970    /* downloading a public torrent */
2971    else
2972        Bps = tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_PEER_TO_CLIENT );
2973
2974    /* convert it to bytes per second */
2975    return Bps;
2976}
2977
2978static inline bool
2979isBandwidthMaxedOut( const tr_bandwidth * b,
2980                     const uint64_t now_msec, tr_direction dir )
2981{
2982    if( !tr_bandwidthIsLimited( b, dir ) )
2983        return false;
2984    else {
2985        const int got = tr_bandwidthGetPieceSpeed_Bps( b, now_msec, dir );
2986        const int want = tr_bandwidthGetDesiredSpeed_Bps( b, dir );
2987        return got >= want;
2988    }
2989}
2990
2991static void
2992rechokeUploads( Torrent * t, const uint64_t now )
2993{
2994    int i, size, unchokedInterested;
2995    const int peerCount = tr_ptrArraySize( &t->peers );
2996    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
2997    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
2998    const tr_session * session = t->manager->session;
2999    const int chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
3000    const bool isMaxedOut = isBandwidthMaxedOut( t->tor->bandwidth, now, TR_UP );
3001
3002    assert( torrentIsLocked( t ) );
3003
3004    /* an optimistic unchoke peer's "optimistic"
3005     * state lasts for N calls to rechokeUploads(). */
3006    if( t->optimisticUnchokeTimeScaler > 0 )
3007        t->optimisticUnchokeTimeScaler--;
3008    else
3009        t->optimistic = NULL;
3010
3011    /* sort the peers by preference and rate */
3012    for( i = 0, size = 0; i < peerCount; ++i )
3013    {
3014        tr_peer * peer = peers[i];
3015        struct peer_atom * atom = peer->atom;
3016
3017        if( peerIsSeed( peer ) ) /* choke seeds and partial seeds */
3018        {
3019            tr_peerMsgsSetChoke( peer->msgs, true );
3020        }
3021        else if( chokeAll ) /* choke everyone if we're not uploading */
3022        {
3023            tr_peerMsgsSetChoke( peer->msgs, true );
3024        }
3025        else if( peer != t->optimistic )
3026        {
3027            struct ChokeData * n = &choke[size++];
3028            n->peer         = peer;
3029            n->isInterested = peer->peerIsInterested;
3030            n->wasChoked    = peer->peerIsChoked;
3031            n->rate         = getRate( t->tor, atom, now );
3032            n->salt         = tr_cryptoWeakRandInt( INT_MAX );
3033            n->isChoked     = true;
3034        }
3035    }
3036
3037    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
3038
3039    /**
3040     * Reciprocation and number of uploads capping is managed by unchoking
3041     * the N peers which have the best upload rate and are interested.
3042     * This maximizes the client's download rate. These N peers are
3043     * referred to as downloaders, because they are interested in downloading
3044     * from the client.
3045     *
3046     * Peers which have a better upload rate (as compared to the downloaders)
3047     * but aren't interested get unchoked. If they become interested, the
3048     * downloader with the worst upload rate gets choked. If a client has
3049     * a complete file, it uses its upload rate rather than its download
3050     * rate to decide which peers to unchoke.
3051     *
3052     * If our bandwidth is maxed out, don't unchoke any more peers.
3053     */
3054    unchokedInterested = 0;
3055    for( i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i ) {
3056        choke[i].isChoked = isMaxedOut ? choke[i].wasChoked : false;
3057        if( choke[i].isInterested )
3058            ++unchokedInterested;
3059    }
3060
3061    /* optimistic unchoke */
3062    if( !t->optimistic && !isMaxedOut && (i<size) )
3063    {
3064        int n;
3065        struct ChokeData * c;
3066        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
3067
3068        for( ; i<size; ++i )
3069        {
3070            if( choke[i].isInterested )
3071            {
3072                const tr_peer * peer = choke[i].peer;
3073                int x = 1, y;
3074                if( isNew( peer ) ) x *= 3;
3075                for( y=0; y<x; ++y )
3076                    tr_ptrArrayAppend( &randPool, &choke[i] );
3077            }
3078        }
3079
3080        if(( n = tr_ptrArraySize( &randPool )))
3081        {
3082            c = tr_ptrArrayNth( &randPool, tr_cryptoWeakRandInt( n ));
3083            c->isChoked = false;
3084            t->optimistic = c->peer;
3085            t->optimisticUnchokeTimeScaler = OPTIMISTIC_UNCHOKE_MULTIPLIER;
3086        }
3087
3088        tr_ptrArrayDestruct( &randPool, NULL );
3089    }
3090
3091    for( i=0; i<size; ++i )
3092        tr_peerMsgsSetChoke( choke[i].peer->msgs, choke[i].isChoked );
3093
3094    /* cleanup */
3095    tr_free( choke );
3096}
3097
3098static void
3099rechokePulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3100{
3101    tr_torrent * tor = NULL;
3102    tr_peerMgr * mgr = vmgr;
3103    const uint64_t now = tr_time_msec( );
3104
3105    managerLock( mgr );
3106
3107    while(( tor = tr_torrentNext( mgr->session, tor ))) {
3108        if( tor->isRunning ) {
3109            rechokeUploads( tor->torrentPeers, now );
3110            if( !tr_torrentIsSeed( tor ) )
3111                rechokeDownloads( tor->torrentPeers );
3112        }
3113    }
3114
3115    tr_timerAddMsec( mgr->rechokeTimer, RECHOKE_PERIOD_MSEC );
3116    managerUnlock( mgr );
3117}
3118
3119/***
3120****
3121****  Life and Death
3122****
3123***/
3124
3125static bool
3126shouldPeerBeClosed( const Torrent    * t,
3127                    const tr_peer    * peer,
3128                    int                peerCount,
3129                    const time_t       now )
3130{
3131    const tr_torrent *       tor = t->tor;
3132    const struct peer_atom * atom = peer->atom;
3133
3134    /* if it's marked for purging, close it */
3135    if( peer->doPurge )
3136    {
3137        tordbg( t, "purging peer %s because its doPurge flag is set",
3138                tr_atomAddrStr( atom ) );
3139        return true;
3140    }
3141
3142    /* disconnect if we're both seeds and enough time has passed for PEX */
3143    if( tr_torrentIsSeed( tor ) && peerIsSeed( peer ) )
3144        return !tr_torrentAllowsPex(tor) || (now-atom->time>=30);
3145
3146    /* disconnect if it's been too long since piece data has been transferred.
3147     * this is on a sliding scale based on number of available peers... */
3148    {
3149        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
3150        /* if we have >= relaxIfFewerThan, strictness is 100%.
3151         * if we have zero connections, strictness is 0% */
3152        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
3153                               ? 1.0
3154                               : peerCount / (float)relaxStrictnessIfFewerThanN;
3155        const int lo = MIN_UPLOAD_IDLE_SECS;
3156        const int hi = MAX_UPLOAD_IDLE_SECS;
3157        const int limit = hi - ( ( hi - lo ) * strictness );
3158        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
3159/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
3160        if( idleTime > limit ) {
3161            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
3162                       tr_atomAddrStr( atom ), idleTime );
3163            return true;
3164        }
3165    }
3166
3167    return false;
3168}
3169
3170static tr_peer **
3171getPeersToClose( Torrent * t, const time_t now_sec, int * setmeSize )
3172{
3173    int i, peerCount, outsize;
3174    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( &t->peers, &peerCount );
3175    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
3176
3177    assert( torrentIsLocked( t ) );
3178
3179    for( i = outsize = 0; i < peerCount; ++i )
3180        if( shouldPeerBeClosed( t, peers[i], peerCount, now_sec ) )
3181            ret[outsize++] = peers[i];
3182
3183    *setmeSize = outsize;
3184    return ret;
3185}
3186
3187static int
3188getReconnectIntervalSecs( const struct peer_atom * atom, const time_t now )
3189{
3190    int sec;
3191
3192    /* if we were recently connected to this peer and transferring piece
3193     * data, try to reconnect to them sooner rather that later -- we don't
3194     * want network troubles to get in the way of a good peer. */
3195    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
3196        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
3197
3198    /* don't allow reconnects more often than our minimum */
3199    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
3200        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
3201
3202    /* otherwise, the interval depends on how many times we've tried
3203     * and failed to connect to the peer */
3204    else switch( atom->numFails ) {
3205        case 0: sec = 0; break;
3206        case 1: sec = 5; break;
3207        case 2: sec = 2 * 60; break;
3208        case 3: sec = 15 * 60; break;
3209        case 4: sec = 30 * 60; break;
3210        case 5: sec = 60 * 60; break;
3211        default: sec = 120 * 60; break;
3212    }
3213
3214    /* penalize peers that were unreachable the last time we tried */
3215    if( atom->flags2 & MYFLAG_UNREACHABLE )
3216        sec += sec;
3217
3218    dbgmsg( "reconnect interval for %s is %d seconds", tr_atomAddrStr( atom ), sec );
3219    return sec;
3220}
3221
3222static void
3223removePeer( Torrent * t, tr_peer * peer )
3224{
3225    tr_peer * removed;
3226    struct peer_atom * atom = peer->atom;
3227
3228    assert( torrentIsLocked( t ) );
3229    assert( atom );
3230
3231    atom->time = tr_time( );
3232
3233    removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare );
3234
3235    if( replicationExists( t ) )
3236        tr_decrReplicationFromBitset( t, &peer->have );
3237
3238    assert( removed == peer );
3239    peerDelete( t, removed );
3240}
3241
3242static void
3243closePeer( Torrent * t, tr_peer * peer )
3244{
3245    struct peer_atom * atom;
3246
3247    assert( t != NULL );
3248    assert( peer != NULL );
3249
3250    atom = peer->atom;
3251
3252    /* if we transferred piece data, then they might be good peers,
3253       so reset their `numFails' weight to zero. otherwise we connected
3254       to them fruitlessly, so mark it as another fail */
3255    if( atom->piece_data_time ) {
3256        tordbg( t, "resetting atom %s numFails to 0", tr_atomAddrStr(atom) );
3257        atom->numFails = 0;
3258    } else {
3259        ++atom->numFails;
3260        tordbg( t, "incremented atom %s numFails to %d", tr_atomAddrStr(atom), (int)atom->numFails );
3261    }
3262
3263    tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
3264    removePeer( t, peer );
3265}
3266
3267static void
3268removeAllPeers( Torrent * t )
3269{
3270    while( !tr_ptrArrayEmpty( &t->peers ) )
3271        removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) );
3272}
3273
3274static void
3275closeBadPeers( Torrent * t, const time_t now_sec )
3276{
3277    int i;
3278    int peerCount;
3279    struct tr_peer ** peers = getPeersToClose( t, now_sec, &peerCount );
3280    for( i=0; i<peerCount; ++i )
3281        closePeer( t, peers[i] );
3282    tr_free( peers );
3283}
3284
3285struct peer_liveliness
3286{
3287    tr_peer * peer;
3288    void * clientData;
3289    time_t pieceDataTime;
3290    time_t time;
3291    int speed;
3292    bool doPurge;
3293};
3294
3295static int
3296comparePeerLiveliness( const void * va, const void * vb )
3297{
3298    const struct peer_liveliness * a = va;
3299    const struct peer_liveliness * b = vb;
3300
3301    if( a->doPurge != b->doPurge )
3302        return a->doPurge ? 1 : -1;
3303
3304    if( a->speed != b->speed ) /* faster goes first */
3305        return a->speed > b->speed ? -1 : 1;
3306
3307    /* the one to give us data more recently goes first */
3308    if( a->pieceDataTime != b->pieceDataTime )
3309        return a->pieceDataTime > b->pieceDataTime ? -1 : 1;
3310
3311    /* the one we connected to most recently goes first */
3312    if( a->time != b->time )
3313        return a->time > b->time ? -1 : 1;
3314
3315    return 0;
3316}
3317
3318static void
3319sortPeersByLivelinessImpl( tr_peer  ** peers,
3320                           void     ** clientData,
3321                           int         n,
3322                           uint64_t    now,
3323                           int (*compare) ( const void *va, const void *vb ) )
3324{
3325    int i;
3326    struct peer_liveliness *lives, *l;
3327
3328    /* build a sortable array of peer + extra info */
3329    lives = l = tr_new0( struct peer_liveliness, n );
3330    for( i=0; i<n; ++i, ++l )
3331    {
3332        tr_peer * p = peers[i];
3333        l->peer = p;
3334        l->doPurge = p->doPurge;
3335        l->pieceDataTime = p->atom->piece_data_time;
3336        l->time = p->atom->time;
3337        l->speed = tr_peerGetPieceSpeed_Bps( p, now, TR_UP )
3338                 + tr_peerGetPieceSpeed_Bps( p, now, TR_DOWN );
3339        if( clientData )
3340            l->clientData = clientData[i];
3341    }
3342
3343    /* sort 'em */
3344    assert( n == ( l - lives ) );
3345    qsort( lives, n, sizeof( struct peer_liveliness ), compare );
3346
3347    /* build the peer array */
3348    for( i=0, l=lives; i<n; ++i, ++l ) {
3349        peers[i] = l->peer;
3350        if( clientData )
3351            clientData[i] = l->clientData;
3352    }
3353    assert( n == ( l - lives ) );
3354
3355    /* cleanup */
3356    tr_free( lives );
3357}
3358
3359static void
3360sortPeersByLiveliness( tr_peer ** peers, void ** clientData, int n, uint64_t now )
3361{
3362    sortPeersByLivelinessImpl( peers, clientData, n, now, comparePeerLiveliness );
3363}
3364
3365
3366static void
3367enforceTorrentPeerLimit( Torrent * t, uint64_t now )
3368{
3369    int n = tr_ptrArraySize( &t->peers );
3370    const int max = tr_torrentGetPeerLimit( t->tor );
3371    if( n > max )
3372    {
3373        void * base = tr_ptrArrayBase( &t->peers );
3374        tr_peer ** peers = tr_memdup( base, n*sizeof( tr_peer* ) );
3375        sortPeersByLiveliness( peers, NULL, n, now );
3376        while( n > max )
3377            closePeer( t, peers[--n] );
3378        tr_free( peers );
3379    }
3380}
3381
3382static void
3383enforceSessionPeerLimit( tr_session * session, uint64_t now )
3384{
3385    int n = 0;
3386    tr_torrent * tor = NULL;
3387    const int max = tr_sessionGetPeerLimit( session );
3388
3389    /* count the total number of peers */
3390    while(( tor = tr_torrentNext( session, tor )))
3391        n += tr_ptrArraySize( &tor->torrentPeers->peers );
3392
3393    /* if there are too many, prune out the worst */
3394    if( n > max )
3395    {
3396        tr_peer ** peers = tr_new( tr_peer*, n );
3397        Torrent ** torrents = tr_new( Torrent*, n );
3398
3399        /* populate the peer array */
3400        n = 0;
3401        tor = NULL;
3402        while(( tor = tr_torrentNext( session, tor ))) {
3403            int i;
3404            Torrent * t = tor->torrentPeers;
3405            const int tn = tr_ptrArraySize( &t->peers );
3406            for( i=0; i<tn; ++i, ++n ) {
3407                peers[n] = tr_ptrArrayNth( &t->peers, i );
3408                torrents[n] = t;
3409            }
3410        }
3411
3412        /* sort 'em */
3413        sortPeersByLiveliness( peers, (void**)torrents, n, now );
3414
3415        /* cull out the crappiest */
3416        while( n-- > max )
3417            closePeer( torrents[n], peers[n] );
3418
3419        /* cleanup */
3420        tr_free( torrents );
3421        tr_free( peers );
3422    }
3423}
3424
3425static void makeNewPeerConnections( tr_peerMgr * mgr, const int max );
3426
3427static void
3428reconnectPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3429{
3430    tr_torrent * tor;
3431    tr_peerMgr * mgr = vmgr;
3432    const time_t now_sec = tr_time( );
3433    const uint64_t now_msec = tr_time_msec( );
3434
3435    /**
3436    ***  enforce the per-session and per-torrent peer limits
3437    **/
3438
3439    /* if we're over the per-torrent peer limits, cull some peers */
3440    tor = NULL;
3441    while(( tor = tr_torrentNext( mgr->session, tor )))
3442        if( tor->isRunning )
3443            enforceTorrentPeerLimit( tor->torrentPeers, now_msec );
3444
3445    /* if we're over the per-session peer limits, cull some peers */
3446    enforceSessionPeerLimit( mgr->session, now_msec );
3447
3448    /* remove crappy peers */
3449    tor = NULL;
3450    while(( tor = tr_torrentNext( mgr->session, tor )))
3451        if( !tor->torrentPeers->isRunning )
3452            removeAllPeers( tor->torrentPeers );
3453        else
3454            closeBadPeers( tor->torrentPeers, now_sec );
3455
3456    /* try to make new peer connections */
3457    makeNewPeerConnections( mgr, MAX_CONNECTIONS_PER_PULSE );
3458}
3459
3460/****
3461*****
3462*****  BANDWIDTH ALLOCATION
3463*****
3464****/
3465
3466static void
3467pumpAllPeers( tr_peerMgr * mgr )
3468{
3469    tr_torrent * tor = NULL;
3470
3471    while(( tor = tr_torrentNext( mgr->session, tor )))
3472    {
3473        int j;
3474        Torrent * t = tor->torrentPeers;
3475
3476        for( j=0; j<tr_ptrArraySize( &t->peers ); ++j )
3477        {
3478            tr_peer * peer = tr_ptrArrayNth( &t->peers, j );
3479            tr_peerMsgsPulse( peer->msgs );
3480        }
3481    }
3482}
3483
3484static void
3485bandwidthPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3486{
3487    tr_torrent * tor;
3488    tr_peerMgr * mgr = vmgr;
3489    managerLock( mgr );
3490
3491    /* FIXME: this next line probably isn't necessary... */
3492    pumpAllPeers( mgr );
3493
3494    /* allocate bandwidth to the peers */
3495    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
3496    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
3497
3498    /* possibly stop torrents that have seeded enough */
3499    tor = NULL;
3500    while(( tor = tr_torrentNext( mgr->session, tor )))
3501        tr_torrentCheckSeedLimit( tor );
3502
3503    /* run the completeness check for any torrents that need it */
3504    tor = NULL;
3505    while(( tor = tr_torrentNext( mgr->session, tor ))) {
3506        if( tor->torrentPeers->needsCompletenessCheck ) {
3507            tor->torrentPeers->needsCompletenessCheck  = false;
3508            tr_torrentRecheckCompleteness( tor );
3509        }
3510    }
3511
3512    /* stop torrents that are ready to stop, but couldn't be stopped earlier
3513     * during the peer-io callback call chain */
3514    tor = NULL;
3515    while(( tor = tr_torrentNext( mgr->session, tor )))
3516        if( tor->isStopping )
3517            tr_torrentStop( tor );
3518
3519    reconnectPulse( 0, 0, mgr );
3520
3521    tr_timerAddMsec( mgr->bandwidthTimer, BANDWIDTH_PERIOD_MSEC );
3522    managerUnlock( mgr );
3523}
3524
3525/***
3526****
3527***/
3528
3529static int
3530compareAtomPtrsByAddress( const void * va, const void *vb )
3531{
3532    const struct peer_atom * a = * (const struct peer_atom**) va;
3533    const struct peer_atom * b = * (const struct peer_atom**) vb;
3534
3535    assert( tr_isAtom( a ) );
3536    assert( tr_isAtom( b ) );
3537
3538    return tr_compareAddresses( &a->addr, &b->addr );
3539}
3540
3541/* best come first, worst go last */
3542static int
3543compareAtomPtrsByShelfDate( const void * va, const void *vb )
3544{
3545    time_t atime;
3546    time_t btime;
3547    const struct peer_atom * a = * (const struct peer_atom**) va;
3548    const struct peer_atom * b = * (const struct peer_atom**) vb;
3549    const int data_time_cutoff_secs = 60 * 60;
3550    const time_t tr_now = tr_time( );
3551
3552    assert( tr_isAtom( a ) );
3553    assert( tr_isAtom( b ) );
3554
3555    /* primary key: the last piece data time *if* it was within the last hour */
3556    atime = a->piece_data_time; if( atime + data_time_cutoff_secs < tr_now ) atime = 0;
3557    btime = b->piece_data_time; if( btime + data_time_cutoff_secs < tr_now ) btime = 0;
3558    if( atime != btime )
3559        return atime > btime ? -1 : 1;
3560
3561    /* secondary key: shelf date. */
3562    if( a->shelf_date != b->shelf_date )
3563        return a->shelf_date > b->shelf_date ? -1 : 1;
3564
3565    return 0;
3566}
3567
3568static int
3569getMaxAtomCount( const tr_torrent * tor )
3570{
3571    const int n = tor->maxConnectedPeers;
3572    /* approximate fit of the old jump discontinuous function */
3573    if( n >= 55 ) return     n + 150;
3574    if( n >= 20 ) return 2 * n + 95;
3575    return               4 * n + 55;
3576}
3577
3578static void
3579atomPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3580{
3581    tr_torrent * tor = NULL;
3582    tr_peerMgr * mgr = vmgr;
3583    managerLock( mgr );
3584
3585    while(( tor = tr_torrentNext( mgr->session, tor )))
3586    {
3587        int atomCount;
3588        Torrent * t = tor->torrentPeers;
3589        const int maxAtomCount = getMaxAtomCount( tor );
3590        struct peer_atom ** atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
3591
3592        if( atomCount > maxAtomCount ) /* we've got too many atoms... time to prune */
3593        {
3594            int i;
3595            int keepCount = 0;
3596            int testCount = 0;
3597            struct peer_atom ** keep = tr_new( struct peer_atom*, atomCount );
3598            struct peer_atom ** test = tr_new( struct peer_atom*, atomCount );
3599
3600            /* keep the ones that are in use */
3601            for( i=0; i<atomCount; ++i ) {
3602                struct peer_atom * atom = atoms[i];
3603                if( peerIsInUse( t, atom ) )
3604                    keep[keepCount++] = atom;
3605                else
3606                    test[testCount++] = atom;
3607            }
3608
3609            /* if there's room, keep the best of what's left */
3610            i = 0;
3611            if( keepCount < maxAtomCount ) {
3612                qsort( test, testCount, sizeof( struct peer_atom * ), compareAtomPtrsByShelfDate );
3613                while( i<testCount && keepCount<maxAtomCount )
3614                    keep[keepCount++] = test[i++];
3615            }
3616
3617            /* free the culled atoms */
3618            while( i<testCount )
3619                tr_free( test[i++] );
3620
3621            /* rebuild Torrent.pool with what's left */
3622            tr_ptrArrayDestruct( &t->pool, NULL );
3623            t->pool = TR_PTR_ARRAY_INIT;
3624            qsort( keep, keepCount, sizeof( struct peer_atom * ), compareAtomPtrsByAddress );
3625            for( i=0; i<keepCount; ++i )
3626                tr_ptrArrayAppend( &t->pool, keep[i] );
3627
3628            tordbg( t, "max atom count is %d... pruned from %d to %d\n", maxAtomCount, atomCount, keepCount );
3629
3630            /* cleanup */
3631            tr_free( test );
3632            tr_free( keep );
3633        }
3634    }
3635
3636    tr_timerAddMsec( mgr->atomTimer, ATOM_PERIOD_MSEC );
3637    managerUnlock( mgr );
3638}
3639
3640/***
3641****
3642****
3643****
3644***/
3645
3646/* is this atom someone that we'd want to initiate a connection to? */
3647static bool
3648isPeerCandidate( const tr_torrent * tor, struct peer_atom * atom, const time_t now )
3649{
3650    /* not if we're both seeds */
3651    if( tr_torrentIsSeed( tor ) && atomIsSeed( atom ) )
3652        return false;
3653
3654    /* not if we've already got a connection to them... */
3655    if( peerIsInUse( tor->torrentPeers, atom ) )
3656        return false;
3657
3658    /* not if we just tried them already */
3659    if( ( now - atom->time ) < getReconnectIntervalSecs( atom, now ) )
3660        return false;
3661
3662    /* not if they're blocklisted */
3663    if( isAtomBlocklisted( tor->session, atom ) )
3664        return false;
3665
3666    /* not if they're banned... */
3667    if( atom->flags2 & MYFLAG_BANNED )
3668        return false;
3669
3670    return true;
3671}
3672
3673struct peer_candidate
3674{
3675    uint64_t score;
3676    tr_torrent * tor;
3677    struct peer_atom * atom;
3678};
3679
3680static bool
3681torrentWasRecentlyStarted( const tr_torrent * tor )
3682{
3683    return difftime( tr_time( ), tor->startDate ) < 120;
3684}
3685
3686static inline uint64_t
3687addValToKey( uint64_t value, int width, uint64_t addme )
3688{
3689    value = (value << (uint64_t)width);
3690    value |= addme;
3691    return value;
3692}
3693
3694/* smaller value is better */
3695static uint64_t
3696getPeerCandidateScore( const tr_torrent * tor, const struct peer_atom * atom, uint8_t salt  )
3697{
3698    uint64_t i;
3699    uint64_t score = 0;
3700    const bool failed = atom->lastConnectionAt < atom->lastConnectionAttemptAt;
3701
3702    /* prefer peers we've connected to, or never tried, over peers we failed to connect to. */
3703    i = failed ? 1 : 0;
3704    score = addValToKey( score, 1, i );
3705
3706    /* prefer the one we attempted least recently (to cycle through all peers) */
3707    i = atom->lastConnectionAttemptAt;
3708    score = addValToKey( score, 32, i );
3709
3710    /* prefer peers belonging to a torrent of a higher priority */
3711    switch( tr_torrentGetPriority( tor ) ) {
3712        case TR_PRI_HIGH:    i = 0; break;
3713        case TR_PRI_NORMAL:  i = 1; break;
3714        case TR_PRI_LOW:     i = 2; break;
3715    }
3716    score = addValToKey( score, 4, i );
3717
3718    /* prefer recently-started torrents */
3719    i = torrentWasRecentlyStarted( tor ) ? 0 : 1;
3720    score = addValToKey( score, 1, i );
3721
3722    /* prefer torrents we're downloading with */
3723    i = tr_torrentIsSeed( tor ) ? 1 : 0;
3724    score = addValToKey( score, 1, i );
3725
3726    /* prefer peers that are known to be connectible */
3727    i = ( atom->flags & ADDED_F_CONNECTABLE ) ? 0 : 1;
3728    score = addValToKey( score, 1, i );
3729
3730    /* prefer peers that we might have a chance of uploading to...
3731       so lower seed probability is better */
3732    if( atom->seedProbability == 100 ) i = 101;
3733    else if( atom->seedProbability == -1 ) i = 100;
3734    else i = atom->seedProbability;
3735    score = addValToKey( score, 8, i );
3736
3737    /* Prefer peers that we got from more trusted sources.
3738     * lower `fromBest' values indicate more trusted sources */
3739    score = addValToKey( score, 4, atom->fromBest );
3740
3741    /* salt */
3742    score = addValToKey( score, 8, salt );
3743
3744    return score;
3745}
3746
3747/* sort an array of peer candidates */
3748static int
3749comparePeerCandidates( const void * va, const void * vb )
3750{
3751    const struct peer_candidate * a = va;
3752    const struct peer_candidate * b = vb;
3753
3754    if( a->score < b->score ) return -1;
3755    if( a->score > b->score ) return 1;
3756
3757    return 0;
3758}
3759
3760/** @return an array of all the atoms we might want to connect to */
3761static struct peer_candidate*
3762getPeerCandidates( tr_session * session, int * candidateCount )
3763{
3764    int n;
3765    tr_torrent * tor;
3766    struct peer_candidate * candidates;
3767    struct peer_candidate * walk;
3768    const time_t now = tr_time( );
3769    const uint64_t now_msec = tr_time_msec( );
3770    /* leave 5% of connection slots for incoming connections -- ticket #2609 */
3771    const int maxCandidates = tr_sessionGetPeerLimit( session ) * 0.95;
3772
3773    /* don't start any new handshakes if we're full up */
3774    n = 0;
3775    tor= NULL;
3776    while(( tor = tr_torrentNext( session, tor )))
3777        n += tr_ptrArraySize( &tor->torrentPeers->peers );
3778    if( maxCandidates <= n ) {
3779        *candidateCount = 0;
3780        return NULL;
3781    }
3782
3783    /* allocate an array of candidates */
3784    n = 0;
3785    tor= NULL;
3786    while(( tor = tr_torrentNext( session, tor )))
3787        n += tr_ptrArraySize( &tor->torrentPeers->pool );
3788    walk = candidates = tr_new( struct peer_candidate, n );
3789
3790    /* populate the candidate array */
3791    tor = NULL;
3792    while(( tor = tr_torrentNext( session, tor )))
3793    {
3794        int i, nAtoms;
3795        struct peer_atom ** atoms;
3796
3797        if( !tor->torrentPeers->isRunning )
3798            continue;
3799
3800        /* if we've already got enough peers in this torrent... */
3801        if( tr_torrentGetPeerLimit( tor ) <= tr_ptrArraySize( &tor->torrentPeers->peers ) )
3802            continue;
3803
3804        /* if we've already got enough speed in this torrent... */
3805        if( tr_torrentIsSeed( tor ) && isBandwidthMaxedOut( tor->bandwidth, now_msec, TR_UP ) )
3806            continue;
3807
3808        atoms = (struct peer_atom**) tr_ptrArrayPeek( &tor->torrentPeers->pool, &nAtoms );
3809        for( i=0; i<nAtoms; ++i )
3810        {
3811            struct peer_atom * atom = atoms[i];
3812
3813            if( isPeerCandidate( tor, atom, now ) )
3814            {
3815                const uint8_t salt = tr_cryptoWeakRandInt( 1024 );
3816                walk->tor = tor;
3817                walk->atom = atom;
3818                walk->score = getPeerCandidateScore( tor, atom, salt );
3819                ++walk;
3820            }
3821        }
3822    }
3823
3824    *candidateCount = walk - candidates;
3825    if( *candidateCount > 1 )
3826        qsort( candidates, *candidateCount, sizeof( struct peer_candidate ), comparePeerCandidates );
3827    return candidates;
3828}
3829
3830static void
3831initiateConnection( tr_peerMgr * mgr, Torrent * t, struct peer_atom * atom )
3832{
3833    tr_peerIo * io;
3834    const time_t now = tr_time( );
3835    bool utp = tr_sessionIsUTPEnabled(mgr->session) && !atom->utp_failed;
3836
3837    if( atom->fromFirst == TR_PEER_FROM_PEX )
3838        /* PEX has explicit signalling for uTP support.  If an atom
3839           originally came from PEX and doesn't have the uTP flag, skip the
3840           uTP connection attempt.  Are we being optimistic here? */
3841        utp = utp && (atom->flags & ADDED_F_UTP_FLAGS);
3842
3843    tordbg( t, "Starting an OUTGOING%s connection with %s",
3844            utp ? " µTP" : "",
3845            tr_atomAddrStr( atom ) );
3846
3847    io = tr_peerIoNewOutgoing( mgr->session,
3848                               mgr->session->bandwidth,
3849                               &atom->addr,
3850                               atom->port,
3851                               t->tor->info.hash,
3852                               t->tor->completeness == TR_SEED,
3853                               utp );
3854
3855    if( io == NULL )
3856    {
3857        tordbg( t, "peerIo not created; marking peer %s as unreachable",
3858                tr_atomAddrStr( atom ) );
3859        atom->flags2 |= MYFLAG_UNREACHABLE;
3860        atom->numFails++;
3861    }
3862    else
3863    {
3864        tr_handshake * handshake = tr_handshakeNew( io,
3865                                                    mgr->session->encryptionMode,
3866                                                    myHandshakeDoneCB,
3867                                                    mgr );
3868
3869        assert( tr_peerIoGetTorrentHash( io ) );
3870
3871        tr_peerIoUnref( io ); /* balanced by the initial ref
3872                                 in tr_peerIoNewOutgoing() */
3873
3874        tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
3875                                 handshakeCompare );
3876    }
3877
3878    atom->lastConnectionAttemptAt = now;
3879    atom->time = now;
3880}
3881
3882static void
3883initiateCandidateConnection( tr_peerMgr * mgr, struct peer_candidate * c )
3884{
3885#if 0
3886    fprintf( stderr, "Starting an OUTGOING connection with %s - [%s] seedProbability==%d; %s, %s\n",
3887             tr_atomAddrStr( c->atom ),
3888             tr_torrentName( c->tor ),
3889             (int)c->atom->seedProbability,
3890             tr_torrentIsPrivate( c->tor ) ? "private" : "public",
3891             tr_torrentIsSeed( c->tor ) ? "seed" : "downloader" );
3892#endif
3893
3894    initiateConnection( mgr, c->tor->torrentPeers, c->atom );
3895}
3896
3897static void
3898makeNewPeerConnections( struct tr_peerMgr * mgr, const int max )
3899{
3900    int i, n;
3901    struct peer_candidate * candidates;
3902
3903    candidates = getPeerCandidates( mgr->session, &n );
3904
3905    for( i=0; i<n && i<max; ++i )
3906        initiateCandidateConnection( mgr, &candidates[i] );
3907
3908    tr_free( candidates );
3909}
Note: See TracBrowser for help on using the repository browser.