source: trunk/libtransmission/peer-mgr.c @ 13652

Last change on this file since 13652 was 13652, checked in by jordan, 9 years ago

remove a couple of unnecessary torrentIsLocked() assertions for #5168

  • Property svn:keywords set to Date Rev Author Id
File size: 117.7 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2 (b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 13652 2012-12-12 20:57:13Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h> /* error codes ERANGE, ... */
15#include <limits.h> /* INT_MAX */
16#include <string.h> /* memcpy, memcmp, strstr */
17#include <stdlib.h> /* qsort */
18
19#include <event2/event.h>
20
21#include <libutp/utp.h>
22
23#include "transmission.h"
24#include "announcer.h"
25#include "bandwidth.h"
26#include "blocklist.h"
27#include "cache.h"
28#include "clients.h"
29#include "completion.h"
30#include "crypto.h"
31#include "handshake.h"
32#include "net.h"
33#include "peer-io.h"
34#include "peer-mgr.h"
35#include "peer-msgs.h"
36#include "ptrarray.h"
37#include "session.h"
38#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
39#include "torrent.h"
40#include "tr-utp.h"
41#include "utils.h"
42#include "webseed.h"
43
44enum
45{
46    /* how frequently to cull old atoms */
47    ATOM_PERIOD_MSEC = (60 * 1000),
48
49    /* how frequently to change which peers are choked */
50    RECHOKE_PERIOD_MSEC = (10 * 1000),
51
52    /* an optimistically unchoked peer is immune from rechoking
53       for this many calls to rechokeUploads (). */
54    OPTIMISTIC_UNCHOKE_MULTIPLIER = 4,
55
56    /* how frequently to reallocate bandwidth */
57    BANDWIDTH_PERIOD_MSEC = 500,
58
59    /* how frequently to age out old piece request lists */
60    REFILL_UPKEEP_PERIOD_MSEC = (10 * 1000),
61
62    /* how frequently to decide which peers live and die */
63    RECONNECT_PERIOD_MSEC = 500,
64
65    /* when many peers are available, keep idle ones this long */
66    MIN_UPLOAD_IDLE_SECS = (60),
67
68    /* when few peers are available, keep idle ones this long */
69    MAX_UPLOAD_IDLE_SECS = (60 * 5),
70
71    /* max number of peers to ask for per second overall.
72     * this throttle is to avoid overloading the router */
73    MAX_CONNECTIONS_PER_SECOND = 12,
74
75    MAX_CONNECTIONS_PER_PULSE = (int)(MAX_CONNECTIONS_PER_SECOND * (RECONNECT_PERIOD_MSEC/1000.0)),
76
77    /* number of bad pieces a peer is allowed to send before we ban them */
78    MAX_BAD_PIECES_PER_PEER = 5,
79
80    /* amount of time to keep a list of request pieces lying around
81       before it's considered too old and needs to be rebuilt */
82    PIECE_LIST_SHELF_LIFE_SECS = 60,
83
84    /* use for bitwise operations w/peer_atom.flags2 */
85    MYFLAG_BANNED = 1,
86
87    /* use for bitwise operations w/peer_atom.flags2 */
88    /* unreachable for now... but not banned.
89     * if they try to connect to us it's okay */
90    MYFLAG_UNREACHABLE = 2,
91
92    /* the minimum we'll wait before attempting to reconnect to a peer */
93    MINIMUM_RECONNECT_INTERVAL_SECS = 5,
94
95    /** how long we'll let requests we've made linger before we cancel them */
96    REQUEST_TTL_SECS = 120,
97
98    NO_BLOCKS_CANCEL_HISTORY = 120,
99
100    CANCEL_HISTORY_SEC = 60
101};
102
103const tr_peer_event TR_PEER_EVENT_INIT = { 0, 0, NULL, 0, 0, 0, false, 0 };
104
105/**
106***
107**/
108
109/**
110 * Peer information that should be kept even before we've connected and
111 * after we've disconnected. These are kept in a pool of peer_atoms to decide
112 * which ones would make good candidates for connecting to, and to watch out
113 * for banned peers.
114 *
115 * @see tr_peer
116 * @see tr_peermsgs
117 */
118struct peer_atom
119{
120    uint8_t     fromFirst;          /* where the peer was first found */
121    uint8_t     fromBest;           /* the "best" value of where the peer has been found */
122    uint8_t     flags;              /* these match the added_f flags */
123    uint8_t     flags2;             /* flags that aren't defined in added_f */
124    int8_t      seedProbability;    /* how likely is this to be a seed... [0..100] or -1 for unknown */
125    int8_t      blocklisted;        /* -1 for unknown, true for blocklisted, false for not blocklisted */
126
127    tr_port     port;
128    bool        utp_failed;         /* We recently failed to connect over uTP */
129    uint16_t    numFails;
130    time_t      time;               /* when the peer's connection status last changed */
131    time_t      piece_data_time;
132
133    time_t      lastConnectionAttemptAt;
134    time_t      lastConnectionAt;
135
136    /* similar to a TTL field, but less rigid --
137     * if the swarm is small, the atom will be kept past this date. */
138    time_t      shelf_date;
139    tr_peer   * peer;               /* will be NULL if not connected */
140    tr_address  addr;
141};
142
143#ifdef NDEBUG
144#define tr_isAtom(a) (TRUE)
145#else
146static bool
147tr_isAtom (const struct peer_atom * atom)
148{
149    return (atom != NULL)
150        && (atom->fromFirst < TR_PEER_FROM__MAX)
151        && (atom->fromBest < TR_PEER_FROM__MAX)
152        && (tr_address_is_valid (&atom->addr));
153}
154#endif
155
156static const char*
157tr_atomAddrStr (const struct peer_atom * atom)
158{
159    return atom ? tr_peerIoAddrStr (&atom->addr, atom->port) : "[no atom]";
160}
161
162struct block_request
163{
164    tr_block_index_t block;
165    tr_peer * peer;
166    time_t sentAt;
167};
168
169struct weighted_piece
170{
171    tr_piece_index_t index;
172    int16_t salt;
173    int16_t requestCount;
174};
175
176enum piece_sort_state
177{
178    PIECES_UNSORTED,
179    PIECES_SORTED_BY_INDEX,
180    PIECES_SORTED_BY_WEIGHT
181};
182
183/** @brief Opaque, per-torrent data structure for peer connection information */
184typedef struct tr_torrent_peers
185{
186    tr_ptrArray                outgoingHandshakes; /* tr_handshake */
187    tr_ptrArray                pool; /* struct peer_atom */
188    tr_ptrArray                peers; /* tr_peer */
189    tr_ptrArray                webseeds; /* tr_webseed */
190
191    tr_torrent               * tor;
192    struct tr_peerMgr        * manager;
193
194    tr_peer                  * optimistic; /* the optimistic peer, or NULL if none */
195    int                        optimisticUnchokeTimeScaler;
196
197    bool                       isRunning;
198    bool                       needsCompletenessCheck;
199
200    struct block_request     * requests;
201    int                        requestCount;
202    int                        requestAlloc;
203
204    struct weighted_piece    * pieces;
205    int                        pieceCount;
206    enum piece_sort_state      pieceSortState;
207
208    /* An array of pieceCount items stating how many peers have each piece.
209       This is used to help us for downloading pieces "rarest first."
210       This may be NULL if we don't have metainfo yet, or if we're not
211       downloading and don't care about rarity */
212    uint16_t                 * pieceReplication;
213    size_t                     pieceReplicationSize;
214
215    int                        interestedCount;
216    int                        maxPeers;
217    time_t                     lastCancel;
218
219    /* Before the endgame this should be 0. In endgame, is contains the average
220     * number of pending requests per peer. Only peers which have more pending
221     * requests are considered 'fast' are allowed to request a block that's
222     * already been requested from another (slower?) peer. */
223    int                        endgame;
224}
225Torrent;
226
227struct tr_peerMgr
228{
229    tr_session    * session;
230    tr_ptrArray     incomingHandshakes; /* tr_handshake */
231    struct event  * bandwidthTimer;
232    struct event  * rechokeTimer;
233    struct event  * refillUpkeepTimer;
234    struct event  * atomTimer;
235};
236
237#define tordbg(t, ...) \
238  do \
239    { \
240      if (tr_deepLoggingIsActive ()) \
241        tr_deepLog (__FILE__, __LINE__, tr_torrentName (t->tor), __VA_ARGS__); \
242    } \
243  while (0)
244
245#define dbgmsg(...) \
246  do \
247    { \
248      if (tr_deepLoggingIsActive ()) \
249        tr_deepLog (__FILE__, __LINE__, NULL, __VA_ARGS__); \
250    } \
251  while (0)
252
253/**
254***
255**/
256
257static inline void
258managerLock (const struct tr_peerMgr * manager)
259{
260    tr_sessionLock (manager->session);
261}
262
263static inline void
264managerUnlock (const struct tr_peerMgr * manager)
265{
266    tr_sessionUnlock (manager->session);
267}
268
269static inline void
270torrentLock (Torrent * torrent)
271{
272    managerLock (torrent->manager);
273}
274
275static inline void
276torrentUnlock (Torrent * torrent)
277{
278    managerUnlock (torrent->manager);
279}
280
281static inline int
282torrentIsLocked (const Torrent * t)
283{
284    return tr_sessionIsLocked (t->manager->session);
285}
286
287/**
288***
289**/
290
291static int
292handshakeCompareToAddr (const void * va, const void * vb)
293{
294    const tr_handshake * a = va;
295
296    return tr_address_compare (tr_handshakeGetAddr (a, NULL), vb);
297}
298
299static int
300handshakeCompare (const void * a, const void * b)
301{
302    return handshakeCompareToAddr (a, tr_handshakeGetAddr (b, NULL));
303}
304
305static inline tr_handshake*
306getExistingHandshake (tr_ptrArray * handshakes, const tr_address * addr)
307{
308    if (tr_ptrArrayEmpty (handshakes))
309        return NULL;
310
311    return tr_ptrArrayFindSorted (handshakes, addr, handshakeCompareToAddr);
312}
313
314static int
315comparePeerAtomToAddress (const void * va, const void * vb)
316{
317    const struct peer_atom * a = va;
318
319    return tr_address_compare (&a->addr, vb);
320}
321
322static int
323compareAtomsByAddress (const void * va, const void * vb)
324{
325    const struct peer_atom * b = vb;
326
327    assert (tr_isAtom (b));
328
329    return comparePeerAtomToAddress (va, &b->addr);
330}
331
332/**
333***
334**/
335
336const tr_address *
337tr_peerAddress (const tr_peer * peer)
338{
339    return &peer->atom->addr;
340}
341
342static Torrent*
343getExistingTorrent (tr_peerMgr *    manager,
344                    const uint8_t * hash)
345{
346    tr_torrent * tor = tr_torrentFindFromHash (manager->session, hash);
347
348    return tor == NULL ? NULL : tor->torrentPeers;
349}
350
351static int
352peerCompare (const void * a, const void * b)
353{
354    return tr_address_compare (tr_peerAddress (a), tr_peerAddress (b));
355}
356
357static struct peer_atom*
358getExistingAtom (const Torrent    * t,
359                 const tr_address * addr)
360{
361    Torrent * tt = (Torrent*)t;
362    return tr_ptrArrayFindSorted (&tt->pool, addr, comparePeerAtomToAddress);
363}
364
365static bool
366peerIsInUse (const Torrent * ct, const struct peer_atom * atom)
367{
368    Torrent * t = (Torrent*) ct;
369
370    assert (torrentIsLocked (t));
371
372    return (atom->peer != NULL)
373        || getExistingHandshake (&t->outgoingHandshakes, &atom->addr)
374        || getExistingHandshake (&t->manager->incomingHandshakes, &atom->addr);
375}
376
377void
378tr_peerConstruct (tr_peer * peer)
379{
380    memset (peer, 0, sizeof (tr_peer));
381
382    peer->have = TR_BITFIELD_INIT;
383}
384
385static tr_peer*
386peerNew (struct peer_atom * atom)
387{
388    tr_peer * peer = tr_new (tr_peer, 1);
389    tr_peerConstruct (peer);
390
391    peer->atom = atom;
392    atom->peer = peer;
393
394    return peer;
395}
396
397static tr_peer*
398getPeer (Torrent * torrent, struct peer_atom * atom)
399{
400    tr_peer * peer;
401
402    assert (torrentIsLocked (torrent));
403
404    peer = atom->peer;
405
406    if (peer == NULL)
407    {
408        peer = peerNew (atom);
409        tr_bitfieldConstruct (&peer->have, torrent->tor->info.pieceCount);
410        tr_bitfieldConstruct (&peer->blame, torrent->tor->blockCount);
411        tr_ptrArrayInsertSorted (&torrent->peers, peer, peerCompare);
412    }
413
414    return peer;
415}
416
417static void peerDeclinedAllRequests (Torrent *, const tr_peer *);
418
419void
420tr_peerDestruct (tr_torrent * tor, tr_peer * peer)
421{
422    assert (peer != NULL);
423
424    peerDeclinedAllRequests (tor->torrentPeers, peer);
425
426    if (peer->msgs != NULL)
427        tr_peerMsgsFree (peer->msgs);
428
429    if (peer->io) {
430        tr_peerIoClear (peer->io);
431        tr_peerIoUnref (peer->io); /* balanced by the ref in handshakeDoneCB () */
432    }
433
434    tr_bitfieldDestruct (&peer->have);
435    tr_bitfieldDestruct (&peer->blame);
436    tr_free (peer->client);
437
438    if (peer->atom)
439        peer->atom->peer = NULL;
440}
441
442static void
443peerDelete (Torrent * t, tr_peer * peer)
444{
445    tr_peerDestruct (t->tor, peer);
446    tr_free (peer);
447}
448
449static bool
450replicationExists (const Torrent * t)
451{
452    return t->pieceReplication != NULL;
453}
454
455static void
456replicationFree (Torrent * t)
457{
458    tr_free (t->pieceReplication);
459    t->pieceReplication = NULL;
460    t->pieceReplicationSize = 0;
461}
462
463static void
464replicationNew (Torrent * t)
465{
466    tr_piece_index_t piece_i;
467    const tr_piece_index_t piece_count = t->tor->info.pieceCount;
468    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase (&t->peers);
469    const int peer_count = tr_ptrArraySize (&t->peers);
470
471    assert (!replicationExists (t));
472
473    t->pieceReplicationSize = piece_count;
474    t->pieceReplication = tr_new0 (uint16_t, piece_count);
475
476    for (piece_i=0; piece_i<piece_count; ++piece_i)
477    {
478        int peer_i;
479        uint16_t r = 0;
480
481        for (peer_i=0; peer_i<peer_count; ++peer_i)
482            if (tr_bitfieldHas (&peers[peer_i]->have, piece_i))
483                ++r;
484
485        t->pieceReplication[piece_i] = r;
486    }
487}
488
489static void
490torrentFree (void * vt)
491{
492    Torrent * t = vt;
493
494    assert (t);
495    assert (!t->isRunning);
496    assert (torrentIsLocked (t));
497    assert (tr_ptrArrayEmpty (&t->outgoingHandshakes));
498    assert (tr_ptrArrayEmpty (&t->peers));
499
500    tr_ptrArrayDestruct (&t->webseeds, (PtrArrayForeachFunc)tr_webseedFree);
501    tr_ptrArrayDestruct (&t->pool, (PtrArrayForeachFunc)tr_free);
502    tr_ptrArrayDestruct (&t->outgoingHandshakes, NULL);
503    tr_ptrArrayDestruct (&t->peers, NULL);
504
505    replicationFree (t);
506
507    tr_free (t->requests);
508    tr_free (t->pieces);
509    tr_free (t);
510}
511
512static void peerCallbackFunc (tr_peer *, const tr_peer_event *, void *);
513
514static void
515rebuildWebseedArray (Torrent * t, tr_torrent * tor)
516{
517    int i;
518    const tr_info * inf = &tor->info;
519
520    /* clear the array */
521    tr_ptrArrayDestruct (&t->webseeds, (PtrArrayForeachFunc)tr_webseedFree);
522    t->webseeds = TR_PTR_ARRAY_INIT;
523
524    /* repopulate it */
525    for (i = 0; i < inf->webseedCount; ++i)
526    {
527        tr_webseed * w = tr_webseedNew (tor, inf->webseeds[i], peerCallbackFunc, t);
528        tr_ptrArrayAppend (&t->webseeds, w);
529    }
530}
531
532static Torrent*
533torrentNew (tr_peerMgr * manager, tr_torrent * tor)
534{
535    Torrent * t;
536
537    t = tr_new0 (Torrent, 1);
538    t->manager = manager;
539    t->tor = tor;
540    t->pool = TR_PTR_ARRAY_INIT;
541    t->peers = TR_PTR_ARRAY_INIT;
542    t->webseeds = TR_PTR_ARRAY_INIT;
543    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
544
545    rebuildWebseedArray (t, tor);
546
547    return t;
548}
549
550static void ensureMgrTimersExist (struct tr_peerMgr * m);
551
552tr_peerMgr*
553tr_peerMgrNew (tr_session * session)
554{
555    tr_peerMgr * m = tr_new0 (tr_peerMgr, 1);
556    m->session = session;
557    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
558    ensureMgrTimersExist (m);
559    return m;
560}
561
562static void
563deleteTimer (struct event ** t)
564{
565    if (*t != NULL)
566    {
567        event_free (*t);
568        *t = NULL;
569    }
570}
571
572static void
573deleteTimers (struct tr_peerMgr * m)
574{
575    deleteTimer (&m->atomTimer);
576    deleteTimer (&m->bandwidthTimer);
577    deleteTimer (&m->rechokeTimer);
578    deleteTimer (&m->refillUpkeepTimer);
579}
580
581void
582tr_peerMgrFree (tr_peerMgr * manager)
583{
584    managerLock (manager);
585
586    deleteTimers (manager);
587
588    /* free the handshakes. Abort invokes handshakeDoneCB (), which removes
589     * the item from manager->handshakes, so this is a little roundabout... */
590    while (!tr_ptrArrayEmpty (&manager->incomingHandshakes))
591        tr_handshakeAbort (tr_ptrArrayNth (&manager->incomingHandshakes, 0));
592
593    tr_ptrArrayDestruct (&manager->incomingHandshakes, NULL);
594
595    managerUnlock (manager);
596    tr_free (manager);
597}
598
599static int
600clientIsDownloadingFrom (const tr_torrent * tor, const tr_peer * peer)
601{
602    if (!tr_torrentHasMetadata (tor))
603        return true;
604
605    return peer->clientIsInterested && !peer->clientIsChoked;
606}
607
608static int
609clientIsUploadingTo (const tr_peer * peer)
610{
611    return peer->peerIsInterested && !peer->peerIsChoked;
612}
613
614/***
615****
616***/
617
618void
619tr_peerMgrOnBlocklistChanged (tr_peerMgr * mgr)
620{
621    tr_torrent * tor = NULL;
622    tr_session * session = mgr->session;
623
624    /* we cache whether or not a peer is blocklisted...
625       since the blocklist has changed, erase that cached value */
626    while ((tor = tr_torrentNext (session, tor)))
627    {
628        int i;
629        Torrent * t = tor->torrentPeers;
630        const int n = tr_ptrArraySize (&t->pool);
631        for (i=0; i<n; ++i) {
632            struct peer_atom * atom = tr_ptrArrayNth (&t->pool, i);
633            atom->blocklisted = -1;
634        }
635    }
636}
637
638static bool
639isAtomBlocklisted (tr_session * session, struct peer_atom * atom)
640{
641    if (atom->blocklisted < 0)
642        atom->blocklisted = tr_sessionIsAddressBlocked (session, &atom->addr);
643
644    assert (tr_isBool (atom->blocklisted));
645    return atom->blocklisted;
646}
647
648
649/***
650****
651***/
652
653static void
654atomSetSeedProbability (struct peer_atom * atom, int seedProbability)
655{
656    assert (atom != NULL);
657    assert (-1<=seedProbability && seedProbability<=100);
658
659    atom->seedProbability = seedProbability;
660
661    if (seedProbability == 100)
662        atom->flags |= ADDED_F_SEED_FLAG;
663    else if (seedProbability != -1)
664        atom->flags &= ~ADDED_F_SEED_FLAG;
665}
666
667static inline bool
668atomIsSeed (const struct peer_atom * atom)
669{
670    return atom->seedProbability == 100;
671}
672
673static void
674atomSetSeed (const Torrent * t, struct peer_atom * atom)
675{
676    if (!atomIsSeed (atom))
677    {
678        tordbg (t, "marking peer %s as a seed", tr_atomAddrStr (atom));
679
680        atomSetSeedProbability (atom, 100);
681    }
682}
683
684
685bool
686tr_peerMgrPeerIsSeed (const tr_torrent  * tor,
687                      const tr_address  * addr)
688{
689    bool isSeed = false;
690    const Torrent * t = tor->torrentPeers;
691    const struct peer_atom * atom = getExistingAtom (t, addr);
692
693    if (atom)
694        isSeed = atomIsSeed (atom);
695
696    return isSeed;
697}
698
699void
700tr_peerMgrSetUtpSupported (tr_torrent * tor, const tr_address * addr)
701{
702    struct peer_atom * atom = getExistingAtom (tor->torrentPeers, addr);
703
704    if (atom)
705        atom->flags |= ADDED_F_UTP_FLAGS;
706}
707
708void
709tr_peerMgrSetUtpFailed (tr_torrent *tor, const tr_address *addr, bool failed)
710{
711    struct peer_atom * atom = getExistingAtom (tor->torrentPeers, addr);
712
713    if (atom)
714        atom->utp_failed = failed;
715}
716
717
718/**
719***  REQUESTS
720***
721*** There are two data structures associated with managing block requests:
722***
723*** 1. Torrent::requests, an array of "struct block_request" which keeps
724***    track of which blocks have been requested, and when, and by which peers.
725***    This is list is used for (a) cancelling requests that have been pending
726***    for too long and (b) avoiding duplicate requests before endgame.
727***
728*** 2. Torrent::pieces, an array of "struct weighted_piece" which lists the
729***    pieces that we want to request. It's used to decide which blocks to
730***    return next when tr_peerMgrGetBlockRequests () is called.
731**/
732
733/**
734*** struct block_request
735**/
736
737static int
738compareReqByBlock (const void * va, const void * vb)
739{
740    const struct block_request * a = va;
741    const struct block_request * b = vb;
742
743    /* primary key: block */
744    if (a->block < b->block) return -1;
745    if (a->block > b->block) return 1;
746
747    /* secondary key: peer */
748    if (a->peer < b->peer) return -1;
749    if (a->peer > b->peer) return 1;
750
751    return 0;
752}
753
754static void
755requestListAdd (Torrent * t, tr_block_index_t block, tr_peer * peer)
756{
757    struct block_request key;
758
759    /* ensure enough room is available... */
760    if (t->requestCount + 1 >= t->requestAlloc)
761    {
762        const int CHUNK_SIZE = 128;
763        t->requestAlloc += CHUNK_SIZE;
764        t->requests = tr_renew (struct block_request,
765                                t->requests, t->requestAlloc);
766    }
767
768    /* populate the record we're inserting */
769    key.block = block;
770    key.peer = peer;
771    key.sentAt = tr_time ();
772
773    /* insert the request to our array... */
774    {
775        bool exact;
776        const int pos = tr_lowerBound (&key, t->requests, t->requestCount,
777                                       sizeof (struct block_request),
778                                       compareReqByBlock, &exact);
779        assert (!exact);
780        memmove (t->requests + pos + 1,
781                 t->requests + pos,
782                 sizeof (struct block_request) * (t->requestCount++ - pos));
783        t->requests[pos] = key;
784    }
785
786    if (peer != NULL)
787    {
788        ++peer->pendingReqsToPeer;
789        assert (peer->pendingReqsToPeer >= 0);
790    }
791
792    /*fprintf (stderr, "added request of block %lu from peer %s... "
793                       "there are now %d block\n",
794                     (unsigned long)block, tr_atomAddrStr (peer->atom), t->requestCount);*/
795}
796
797static struct block_request *
798requestListLookup (Torrent * t, tr_block_index_t block, const tr_peer * peer)
799{
800    struct block_request key;
801    key.block = block;
802    key.peer = (tr_peer*) peer;
803
804    return bsearch (&key, t->requests, t->requestCount,
805                    sizeof (struct block_request),
806                    compareReqByBlock);
807}
808
809/**
810 * Find the peers are we currently requesting the block
811 * with index @a block from and append them to @a peerArr.
812 */
813static void
814getBlockRequestPeers (Torrent * t, tr_block_index_t block,
815                      tr_ptrArray * peerArr)
816{
817    bool exact;
818    int i, pos;
819    struct block_request key;
820
821    key.block = block;
822    key.peer = NULL;
823    pos = tr_lowerBound (&key, t->requests, t->requestCount,
824                         sizeof (struct block_request),
825                         compareReqByBlock, &exact);
826
827    assert (!exact); /* shouldn't have a request with .peer == NULL */
828
829    for (i = pos; i < t->requestCount; ++i)
830    {
831        if (t->requests[i].block != block)
832            break;
833        tr_ptrArrayAppend (peerArr, t->requests[i].peer);
834    }
835}
836
837static void
838decrementPendingReqCount (const struct block_request * b)
839{
840    if (b->peer != NULL)
841        if (b->peer->pendingReqsToPeer > 0)
842            --b->peer->pendingReqsToPeer;
843}
844
845static void
846requestListRemove (Torrent * t, tr_block_index_t block, const tr_peer * peer)
847{
848    const struct block_request * b = requestListLookup (t, block, peer);
849    if (b != NULL)
850    {
851        const int pos = b - t->requests;
852        assert (pos < t->requestCount);
853
854        decrementPendingReqCount (b);
855
856        tr_removeElementFromArray (t->requests,
857                                   pos,
858                                   sizeof (struct block_request),
859                                   t->requestCount--);
860
861        /*fprintf (stderr, "removing request of block %lu from peer %s... "
862                           "there are now %d block requests left\n",
863                         (unsigned long)block, tr_atomAddrStr (peer->atom), t->requestCount);*/
864    }
865}
866
867static int
868countActiveWebseeds (const Torrent * t)
869{
870    int activeCount = 0;
871    const tr_webseed ** w = (const tr_webseed **) tr_ptrArrayBase (&t->webseeds);
872    const tr_webseed ** const wend = w + tr_ptrArraySize (&t->webseeds);
873
874    for (; w!=wend; ++w)
875        if (tr_webseedIsActive (*w))
876            ++activeCount;
877
878    return activeCount;
879}
880
881static bool
882testForEndgame (const Torrent * t)
883{
884    /* we consider ourselves to be in endgame if the number of bytes
885       we've got requested is >= the number of bytes left to download */
886    return (t->requestCount * t->tor->blockSize)
887               >= tr_cpLeftUntilDone (&t->tor->completion);
888}
889
890static void
891updateEndgame (Torrent * t)
892{
893    assert (t->requestCount >= 0);
894
895    if (!testForEndgame (t))
896    {
897        /* not in endgame */
898        t->endgame = 0;
899    }
900    else if (!t->endgame) /* only recalculate when endgame first begins */
901    {
902        int numDownloading = 0;
903        const tr_peer ** p = (const tr_peer **) tr_ptrArrayBase (&t->peers);
904        const tr_peer ** const pend = p + tr_ptrArraySize (&t->peers);
905
906        /* add the active bittorrent peers... */
907        for (; p!=pend; ++p)
908            if ((*p)->pendingReqsToPeer > 0)
909                ++numDownloading;
910
911        /* add the active webseeds... */
912        numDownloading += countActiveWebseeds (t);
913
914        /* average number of pending requests per downloading peer */
915        t->endgame = t->requestCount / MAX (numDownloading, 1);
916    }
917}
918
919
920/****
921*****
922*****  Piece List Manipulation / Accessors
923*****
924****/
925
926static inline void
927invalidatePieceSorting (Torrent * t)
928{
929    t->pieceSortState = PIECES_UNSORTED;
930}
931
932static const tr_torrent * weightTorrent;
933
934static const uint16_t * weightReplication;
935
936static void
937setComparePieceByWeightTorrent (Torrent * t)
938{
939    if (!replicationExists (t))
940        replicationNew (t);
941
942    weightTorrent = t->tor;
943    weightReplication = t->pieceReplication;
944}
945
946/* we try to create a "weight" s.t. high-priority pieces come before others,
947 * and that partially-complete pieces come before empty ones. */
948static int
949comparePieceByWeight (const void * va, const void * vb)
950{
951    const struct weighted_piece * a = va;
952    const struct weighted_piece * b = vb;
953    int ia, ib, missing, pending;
954    const tr_torrent * tor = weightTorrent;
955    const uint16_t * rep = weightReplication;
956
957    /* primary key: weight */
958    missing = tr_cpMissingBlocksInPiece (&tor->completion, a->index);
959    pending = a->requestCount;
960    ia = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
961    missing = tr_cpMissingBlocksInPiece (&tor->completion, b->index);
962    pending = b->requestCount;
963    ib = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
964    if (ia < ib) return -1;
965    if (ia > ib) return 1;
966
967    /* secondary key: higher priorities go first */
968    ia = tor->info.pieces[a->index].priority;
969    ib = tor->info.pieces[b->index].priority;
970    if (ia > ib) return -1;
971    if (ia < ib) return 1;
972
973    /* tertiary key: rarest first. */
974    ia = rep[a->index];
975    ib = rep[b->index];
976    if (ia < ib) return -1;
977    if (ia > ib) return 1;
978
979    /* quaternary key: random */
980    if (a->salt < b->salt) return -1;
981    if (a->salt > b->salt) return 1;
982
983    /* okay, they're equal */
984    return 0;
985}
986
987static int
988comparePieceByIndex (const void * va, const void * vb)
989{
990    const struct weighted_piece * a = va;
991    const struct weighted_piece * b = vb;
992    if (a->index < b->index) return -1;
993    if (a->index > b->index) return 1;
994    return 0;
995}
996
997static void
998pieceListSort (Torrent * t, enum piece_sort_state state)
999{
1000    assert (state==PIECES_SORTED_BY_INDEX
1001         || state==PIECES_SORTED_BY_WEIGHT);
1002
1003
1004    if (state == PIECES_SORTED_BY_WEIGHT)
1005    {
1006        setComparePieceByWeightTorrent (t);
1007        qsort (t->pieces, t->pieceCount, sizeof (struct weighted_piece), comparePieceByWeight);
1008    }
1009    else
1010        qsort (t->pieces, t->pieceCount, sizeof (struct weighted_piece), comparePieceByIndex);
1011
1012    t->pieceSortState = state;
1013}
1014
1015/**
1016 * These functions are useful for testing, but too expensive for nightly builds.
1017 * let's leave it disabled but add an easy hook to compile it back in
1018 */
1019#if 1
1020#define assertWeightedPiecesAreSorted(t)
1021#define assertReplicationCountIsExact(t)
1022#else
1023static void
1024assertWeightedPiecesAreSorted (Torrent * t)
1025{
1026    if (!t->endgame)
1027    {
1028        int i;
1029        setComparePieceByWeightTorrent (t);
1030        for (i=0; i<t->pieceCount-1; ++i)
1031            assert (comparePieceByWeight (&t->pieces[i], &t->pieces[i+1]) <= 0);
1032    }
1033}
1034static void
1035assertReplicationCountIsExact (Torrent * t)
1036{
1037    /* This assert might fail due to errors of implementations in other
1038     * clients. It happens when receiving duplicate bitfields/HaveAll/HaveNone
1039     * from a client. If a such a behavior is noticed,
1040     * a bug report should be filled to the faulty client. */
1041
1042    size_t piece_i;
1043    const uint16_t * rep = t->pieceReplication;
1044    const size_t piece_count = t->pieceReplicationSize;
1045    const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase (&t->peers);
1046    const int peer_count = tr_ptrArraySize (&t->peers);
1047
1048    assert (piece_count == t->tor->info.pieceCount);
1049
1050    for (piece_i=0; piece_i<piece_count; ++piece_i)
1051    {
1052        int peer_i;
1053        uint16_t r = 0;
1054
1055        for (peer_i=0; peer_i<peer_count; ++peer_i)
1056            if (tr_bitsetHas (&peers[peer_i]->have, piece_i))
1057                ++r;
1058
1059        assert (rep[piece_i] == r);
1060    }
1061}
1062#endif
1063
1064static struct weighted_piece *
1065pieceListLookup (Torrent * t, tr_piece_index_t index)
1066{
1067    int i;
1068
1069    for (i=0; i<t->pieceCount; ++i)
1070        if (t->pieces[i].index == index)
1071            return &t->pieces[i];
1072
1073    return NULL;
1074}
1075
1076static void
1077pieceListRebuild (Torrent * t)
1078{
1079
1080    if (!tr_torrentIsSeed (t->tor))
1081    {
1082        tr_piece_index_t i;
1083        tr_piece_index_t * pool;
1084        tr_piece_index_t poolCount = 0;
1085        const tr_torrent * tor = t->tor;
1086        const tr_info * inf = tr_torrentInfo (tor);
1087        struct weighted_piece * pieces;
1088        int pieceCount;
1089
1090        /* build the new list */
1091        pool = tr_new (tr_piece_index_t, inf->pieceCount);
1092        for (i=0; i<inf->pieceCount; ++i)
1093            if (!inf->pieces[i].dnd)
1094                if (!tr_cpPieceIsComplete (&tor->completion, i))
1095                    pool[poolCount++] = i;
1096        pieceCount = poolCount;
1097        pieces = tr_new0 (struct weighted_piece, pieceCount);
1098        for (i=0; i<poolCount; ++i) {
1099            struct weighted_piece * piece = pieces + i;
1100            piece->index = pool[i];
1101            piece->requestCount = 0;
1102            piece->salt = tr_cryptoWeakRandInt (4096);
1103        }
1104
1105        /* if we already had a list of pieces, merge it into
1106         * the new list so we don't lose its requestCounts */
1107        if (t->pieces != NULL)
1108        {
1109            struct weighted_piece * o = t->pieces;
1110            struct weighted_piece * oend = o + t->pieceCount;
1111            struct weighted_piece * n = pieces;
1112            struct weighted_piece * nend = n + pieceCount;
1113
1114            pieceListSort (t, PIECES_SORTED_BY_INDEX);
1115
1116            while (o!=oend && n!=nend) {
1117                if (o->index < n->index)
1118                    ++o;
1119                else if (o->index > n->index)
1120                    ++n;
1121                else
1122                    *n++ = *o++;
1123            }
1124
1125            tr_free (t->pieces);
1126        }
1127
1128        t->pieces = pieces;
1129        t->pieceCount = pieceCount;
1130
1131        pieceListSort (t, PIECES_SORTED_BY_WEIGHT);
1132
1133        /* cleanup */
1134        tr_free (pool);
1135    }
1136}
1137
1138static void
1139pieceListRemovePiece (Torrent * t, tr_piece_index_t piece)
1140{
1141    struct weighted_piece * p;
1142
1143    if ((p = pieceListLookup (t, piece)))
1144    {
1145        const int pos = p - t->pieces;
1146
1147        tr_removeElementFromArray (t->pieces,
1148                                   pos,
1149                                   sizeof (struct weighted_piece),
1150                                   t->pieceCount--);
1151
1152        if (t->pieceCount == 0)
1153        {
1154            tr_free (t->pieces);
1155            t->pieces = NULL;
1156        }
1157    }
1158}
1159
1160static void
1161pieceListResortPiece (Torrent * t, struct weighted_piece * p)
1162{
1163    int pos;
1164    bool isSorted = true;
1165
1166    if (p == NULL)
1167        return;
1168
1169    /* is the torrent already sorted? */
1170    pos = p - t->pieces;
1171    setComparePieceByWeightTorrent (t);
1172    if (isSorted && (pos > 0) && (comparePieceByWeight (p-1, p) > 0))
1173        isSorted = false;
1174    if (isSorted && (pos < t->pieceCount - 1) && (comparePieceByWeight (p, p+1) > 0))
1175        isSorted = false;
1176
1177    if (t->pieceSortState != PIECES_SORTED_BY_WEIGHT)
1178    {
1179       pieceListSort (t, PIECES_SORTED_BY_WEIGHT);
1180       isSorted = true;
1181    }
1182
1183    /* if it's not sorted, move it around */
1184    if (!isSorted)
1185    {
1186        bool exact;
1187        const struct weighted_piece tmp = *p;
1188
1189        tr_removeElementFromArray (t->pieces,
1190                                   pos,
1191                                   sizeof (struct weighted_piece),
1192                                   t->pieceCount--);
1193
1194        pos = tr_lowerBound (&tmp, t->pieces, t->pieceCount,
1195                             sizeof (struct weighted_piece),
1196                             comparePieceByWeight, &exact);
1197
1198        memmove (&t->pieces[pos + 1],
1199                 &t->pieces[pos],
1200                 sizeof (struct weighted_piece) * (t->pieceCount++ - pos));
1201
1202        t->pieces[pos] = tmp;
1203    }
1204
1205    assertWeightedPiecesAreSorted (t);
1206}
1207
1208static void
1209pieceListRemoveRequest (Torrent * t, tr_block_index_t block)
1210{
1211    struct weighted_piece * p;
1212    const tr_piece_index_t index = tr_torBlockPiece (t->tor, block);
1213
1214    if (((p = pieceListLookup (t, index))) && (p->requestCount > 0))
1215    {
1216        --p->requestCount;
1217        pieceListResortPiece (t, p);
1218    }
1219}
1220
1221
1222/****
1223*****
1224*****  Replication count (for rarest first policy)
1225*****
1226****/
1227
1228/**
1229 * Increase the replication count of this piece and sort it if the
1230 * piece list is already sorted
1231 */
1232static void
1233tr_incrReplicationOfPiece (Torrent * t, const size_t index)
1234{
1235    assert (replicationExists (t));
1236    assert (t->pieceReplicationSize == t->tor->info.pieceCount);
1237
1238    /* One more replication of this piece is present in the swarm */
1239    ++t->pieceReplication[index];
1240
1241    /* we only resort the piece if the list is already sorted */
1242    if (t->pieceSortState == PIECES_SORTED_BY_WEIGHT)
1243        pieceListResortPiece (t, pieceListLookup (t, index));
1244}
1245
1246/**
1247 * Increases the replication count of pieces present in the bitfield
1248 */
1249static void
1250tr_incrReplicationFromBitfield (Torrent * t, const tr_bitfield * b)
1251{
1252    size_t i;
1253    uint16_t * rep = t->pieceReplication;
1254    const size_t n = t->tor->info.pieceCount;
1255
1256    assert (replicationExists (t));
1257
1258    for (i=0; i<n; ++i)
1259        if (tr_bitfieldHas (b, i))
1260            ++rep[i];
1261
1262    if (t->pieceSortState == PIECES_SORTED_BY_WEIGHT)
1263        invalidatePieceSorting (t);
1264}
1265
1266/**
1267 * Increase the replication count of every piece
1268 */
1269static void
1270tr_incrReplication (Torrent * t)
1271{
1272    int i;
1273    const int n = t->pieceReplicationSize;
1274
1275    assert (replicationExists (t));
1276    assert (t->pieceReplicationSize == t->tor->info.pieceCount);
1277
1278    for (i=0; i<n; ++i)
1279        ++t->pieceReplication[i];
1280}
1281
1282/**
1283 * Decrease the replication count of pieces present in the bitset.
1284 */
1285static void
1286tr_decrReplicationFromBitfield (Torrent * t, const tr_bitfield * b)
1287{
1288    int i;
1289    const int n = t->pieceReplicationSize;
1290
1291    assert (replicationExists (t));
1292    assert (t->pieceReplicationSize == t->tor->info.pieceCount);
1293
1294    if (tr_bitfieldHasAll (b))
1295    {
1296        for (i=0; i<n; ++i)
1297            --t->pieceReplication[i];
1298    }
1299    else if (!tr_bitfieldHasNone (b))
1300    {
1301        for (i=0; i<n; ++i)
1302            if (tr_bitfieldHas (b, i))
1303                --t->pieceReplication[i];
1304
1305        if (t->pieceSortState == PIECES_SORTED_BY_WEIGHT)
1306            invalidatePieceSorting (t);
1307    }
1308}
1309
1310/**
1311***
1312**/
1313
1314void
1315tr_peerMgrRebuildRequests (tr_torrent * tor)
1316{
1317    assert (tr_isTorrent (tor));
1318
1319    pieceListRebuild (tor->torrentPeers);
1320}
1321
1322void
1323tr_peerMgrGetNextRequests (tr_torrent           * tor,
1324                           tr_peer              * peer,
1325                           int                    numwant,
1326                           tr_block_index_t     * setme,
1327                           int                  * numgot,
1328                           bool                   get_intervals)
1329{
1330    int i;
1331    int got;
1332    Torrent * t;
1333    struct weighted_piece * pieces;
1334    const tr_bitfield * const have = &peer->have;
1335
1336    /* sanity clause */
1337    assert (tr_isTorrent (tor));
1338    assert (peer->clientIsInterested);
1339    assert (!peer->clientIsChoked);
1340    assert (numwant > 0);
1341
1342    /* walk through the pieces and find blocks that should be requested */
1343    got = 0;
1344    t = tor->torrentPeers;
1345
1346    /* prep the pieces list */
1347    if (t->pieces == NULL)
1348        pieceListRebuild (t);
1349
1350    if (t->pieceSortState != PIECES_SORTED_BY_WEIGHT)
1351        pieceListSort (t, PIECES_SORTED_BY_WEIGHT);
1352
1353    assertReplicationCountIsExact (t);
1354    assertWeightedPiecesAreSorted (t);
1355
1356    updateEndgame (t);
1357    pieces = t->pieces;
1358    for (i=0; i<t->pieceCount && got<numwant; ++i)
1359    {
1360        struct weighted_piece * p = pieces + i;
1361
1362        /* if the peer has this piece that we want... */
1363        if (tr_bitfieldHas (have, p->index))
1364        {
1365            tr_block_index_t b;
1366            tr_block_index_t first;
1367            tr_block_index_t last;
1368            tr_ptrArray peerArr = TR_PTR_ARRAY_INIT;
1369
1370            tr_torGetPieceBlockRange (tor, p->index, &first, &last);
1371
1372            for (b=first; b<=last && (got<numwant || (get_intervals && setme[2*got-1] == b-1)); ++b)
1373            {
1374                int peerCount;
1375                tr_peer ** peers;
1376
1377                /* don't request blocks we've already got */
1378                if (tr_cpBlockIsComplete (&tor->completion, b))
1379                    continue;
1380
1381                /* always add peer if this block has no peers yet */
1382                tr_ptrArrayClear (&peerArr);
1383                getBlockRequestPeers (t, b, &peerArr);
1384                peers = (tr_peer **) tr_ptrArrayPeek (&peerArr, &peerCount);
1385                if (peerCount != 0)
1386                {
1387                    /* don't make a second block request until the endgame */
1388                    if (!t->endgame)
1389                        continue;
1390
1391                    /* don't have more than two peers requesting this block */
1392                    if (peerCount > 1)
1393                        continue;
1394
1395                    /* don't send the same request to the same peer twice */
1396                    if (peer == peers[0])
1397                        continue;
1398
1399                    /* in the endgame allow an additional peer to download a
1400                       block but only if the peer seems to be handling requests
1401                       relatively fast */
1402                    if (peer->pendingReqsToPeer + numwant - got < t->endgame)
1403                        continue;
1404                }
1405
1406                /* update the caller's table */
1407                if (!get_intervals) {
1408                    setme[got++] = b;
1409                }
1410                /* if intervals are requested two array entries are necessarry:
1411                   one for the interval's starting block and one for its end block */
1412                else if (got && setme[2 * got - 1] == b - 1 && b != first) {
1413                    /* expand the last interval */
1414                    ++setme[2 * got - 1];
1415                }
1416                else {
1417                    /* begin a new interval */
1418                    setme[2 * got] = setme[2 * got + 1] = b;
1419                    ++got;
1420                }
1421
1422                /* update our own tables */
1423                requestListAdd (t, b, peer);
1424                ++p->requestCount;
1425            }
1426
1427            tr_ptrArrayDestruct (&peerArr, NULL);
1428        }
1429    }
1430
1431    /* In most cases we've just changed the weights of a small number of pieces.
1432     * So rather than qsort ()ing the entire array, it's faster to apply an
1433     * adaptive insertion sort algorithm. */
1434    if (got > 0)
1435    {
1436        /* not enough requests || last piece modified */
1437        if (i == t->pieceCount) --i;
1438
1439        setComparePieceByWeightTorrent (t);
1440        while (--i >= 0)
1441        {
1442            bool exact;
1443
1444            /* relative position! */
1445            const int newpos = tr_lowerBound (&t->pieces[i], &t->pieces[i + 1],
1446                                              t->pieceCount - (i + 1),
1447                                              sizeof (struct weighted_piece),
1448                                              comparePieceByWeight, &exact);
1449            if (newpos > 0)
1450            {
1451                const struct weighted_piece piece = t->pieces[i];
1452                memmove (&t->pieces[i],
1453                         &t->pieces[i + 1],
1454                         sizeof (struct weighted_piece) * (newpos));
1455                t->pieces[i + newpos] = piece;
1456            }
1457        }
1458    }
1459
1460    assertWeightedPiecesAreSorted (t);
1461    *numgot = got;
1462}
1463
1464bool
1465tr_peerMgrDidPeerRequest (const tr_torrent  * tor,
1466                          const tr_peer     * peer,
1467                          tr_block_index_t    block)
1468{
1469    const Torrent * t = tor->torrentPeers;
1470    return requestListLookup ((Torrent*)t, block, peer) != NULL;
1471}
1472
1473/* cancel requests that are too old */
1474static void
1475refillUpkeep (int foo UNUSED, short bar UNUSED, void * vmgr)
1476{
1477    time_t now;
1478    time_t too_old;
1479    tr_torrent * tor;
1480    int cancel_buflen = 0;
1481    struct block_request * cancel = NULL;
1482    tr_peerMgr * mgr = vmgr;
1483    managerLock (mgr);
1484
1485    now = tr_time ();
1486    too_old = now - REQUEST_TTL_SECS;
1487
1488    /* alloc the temporary "cancel" buffer */
1489    tor = NULL;
1490    while ((tor = tr_torrentNext (mgr->session, tor)))
1491        cancel_buflen = MAX (cancel_buflen, tor->torrentPeers->requestCount);
1492    if (cancel_buflen > 0)
1493        cancel = tr_new (struct block_request, cancel_buflen);
1494
1495    /* prune requests that are too old */
1496    tor = NULL;
1497    while ((tor = tr_torrentNext (mgr->session, tor)))
1498    {
1499        Torrent * t = tor->torrentPeers;
1500        const int n = t->requestCount;
1501        if (n > 0)
1502        {
1503            int keepCount = 0;
1504            int cancelCount = 0;
1505            const struct block_request * it;
1506            const struct block_request * end;
1507
1508            for (it=t->requests, end=it+n; it!=end; ++it)
1509            {
1510                if ((it->sentAt <= too_old) && it->peer->msgs && !tr_peerMsgsIsReadingBlock (it->peer->msgs, it->block))
1511                    cancel[cancelCount++] = *it;
1512                else
1513                {
1514                    if (it != &t->requests[keepCount])
1515                        t->requests[keepCount] = *it;
1516                    keepCount++;
1517                }
1518            }
1519
1520            /* prune out the ones we aren't keeping */
1521            t->requestCount = keepCount;
1522
1523            /* send cancel messages for all the "cancel" ones */
1524            for (it=cancel, end=it+cancelCount; it!=end; ++it) {
1525                if ((it->peer != NULL) && (it->peer->msgs != NULL)) {
1526                    tr_historyAdd (&it->peer->cancelsSentToPeer, now, 1);
1527                    tr_peerMsgsCancel (it->peer->msgs, it->block);
1528                    decrementPendingReqCount (it);
1529                }
1530            }
1531
1532            /* decrement the pending request counts for the timed-out blocks */
1533            for (it=cancel, end=it+cancelCount; it!=end; ++it)
1534                pieceListRemoveRequest (t, it->block);
1535        }
1536    }
1537
1538    tr_free (cancel);
1539    tr_timerAddMsec (mgr->refillUpkeepTimer, REFILL_UPKEEP_PERIOD_MSEC);
1540    managerUnlock (mgr);
1541}
1542
1543static void
1544addStrike (Torrent * t, tr_peer * peer)
1545{
1546    tordbg (t, "increasing peer %s strike count to %d",
1547            tr_atomAddrStr (peer->atom), peer->strikes + 1);
1548
1549    if (++peer->strikes >= MAX_BAD_PIECES_PER_PEER)
1550    {
1551        struct peer_atom * atom = peer->atom;
1552        atom->flags2 |= MYFLAG_BANNED;
1553        peer->doPurge = 1;
1554        tordbg (t, "banning peer %s", tr_atomAddrStr (atom));
1555    }
1556}
1557
1558static void
1559gotBadPiece (Torrent * t, tr_piece_index_t pieceIndex)
1560{
1561    tr_torrent *   tor = t->tor;
1562    const uint32_t byteCount = tr_torPieceCountBytes (tor, pieceIndex);
1563
1564    tor->corruptCur += byteCount;
1565    tor->downloadedCur -= MIN (tor->downloadedCur, byteCount);
1566
1567    tr_announcerAddBytes (tor, TR_ANN_CORRUPT, byteCount);
1568}
1569
1570static void
1571peerSuggestedPiece (Torrent            * t UNUSED,
1572                    tr_peer            * peer UNUSED,
1573                    tr_piece_index_t     pieceIndex UNUSED,
1574                    int                  isFastAllowed UNUSED)
1575{
1576#if 0
1577    assert (t);
1578    assert (peer);
1579    assert (peer->msgs);
1580
1581    /* is this a valid piece? */
1582    if (pieceIndex >= t->tor->info.pieceCount)
1583        return;
1584
1585    /* don't ask for it if we've already got it */
1586    if (tr_cpPieceIsComplete (t->tor->completion, pieceIndex))
1587        return;
1588
1589    /* don't ask for it if they don't have it */
1590    if (!tr_bitfieldHas (peer->have, pieceIndex))
1591        return;
1592
1593    /* don't ask for it if we're choked and it's not fast */
1594    if (!isFastAllowed && peer->clientIsChoked)
1595        return;
1596
1597    /* request the blocks that we don't have in this piece */
1598    {
1599        tr_block_index_t b;
1600        tr_block_index_t first;
1601        tr_block_index_t last;
1602        const tr_torrent * tor = t->tor;
1603
1604        tr_torGetPieceBlockRange (t->tor, pieceIndex, &first, &last);
1605
1606        for (b=first; b<=last; ++b)
1607        {
1608            if (!tr_cpBlockIsComplete (tor->completion, b))
1609            {
1610                const uint32_t offset = getBlockOffsetInPiece (tor, b);
1611                const uint32_t length = tr_torBlockCountBytes (tor, b);
1612                tr_peerMsgsAddRequest (peer->msgs, pieceIndex, offset, length);
1613                incrementPieceRequests (t, pieceIndex);
1614            }
1615        }
1616    }
1617#endif
1618}
1619
1620static void
1621removeRequestFromTables (Torrent * t, tr_block_index_t block, const tr_peer * peer)
1622{
1623    requestListRemove (t, block, peer);
1624    pieceListRemoveRequest (t, block);
1625}
1626
1627/* peer choked us, or maybe it disconnected.
1628   either way we need to remove all its requests */
1629static void
1630peerDeclinedAllRequests (Torrent * t, const tr_peer * peer)
1631{
1632    int i, n;
1633    tr_block_index_t * blocks = tr_new (tr_block_index_t, t->requestCount);
1634
1635    for (i=n=0; i<t->requestCount; ++i)
1636        if (peer == t->requests[i].peer)
1637            blocks[n++] = t->requests[i].block;
1638
1639    for (i=0; i<n; ++i)
1640        removeRequestFromTables (t, blocks[i], peer);
1641
1642    tr_free (blocks);
1643}
1644
1645static void tr_peerMgrSetBlame (tr_torrent *, tr_piece_index_t, int);
1646
1647static void
1648peerCallbackFunc (tr_peer * peer, const tr_peer_event * e, void * vt)
1649{
1650    Torrent * t = vt;
1651
1652    torrentLock (t);
1653
1654    assert (peer != NULL);
1655
1656    switch (e->eventType)
1657    {
1658        case TR_PEER_PEER_GOT_DATA:
1659        {
1660            const time_t now = tr_time ();
1661            tr_torrent * tor = t->tor;
1662
1663            if (e->wasPieceData)
1664            {
1665                tor->uploadedCur += e->length;
1666                tr_announcerAddBytes (tor, TR_ANN_UP, e->length);
1667                tr_torrentSetActivityDate (tor, now);
1668                tr_torrentSetDirty (tor);
1669            }
1670
1671            /* update the stats */
1672            if (e->wasPieceData)
1673                tr_statsAddUploaded (tor->session, e->length);
1674
1675            /* update our atom */
1676            if (peer->atom && e->wasPieceData)
1677                peer->atom->piece_data_time = now;
1678
1679            break;
1680        }
1681
1682        case TR_PEER_CLIENT_GOT_HAVE:
1683            if (replicationExists (t)) {
1684                tr_incrReplicationOfPiece (t, e->pieceIndex);
1685                assertReplicationCountIsExact (t);
1686            }
1687            break;
1688
1689        case TR_PEER_CLIENT_GOT_HAVE_ALL:
1690            if (replicationExists (t)) {
1691                tr_incrReplication (t);
1692                assertReplicationCountIsExact (t);
1693            }
1694            break;
1695
1696        case TR_PEER_CLIENT_GOT_HAVE_NONE:
1697            /* noop */
1698            break;
1699
1700        case TR_PEER_CLIENT_GOT_BITFIELD:
1701            assert (e->bitfield != NULL);
1702            if (replicationExists (t)) {
1703                tr_incrReplicationFromBitfield (t, e->bitfield);
1704                assertReplicationCountIsExact (t);
1705            }
1706            break;
1707
1708        case TR_PEER_CLIENT_GOT_REJ: {
1709            tr_block_index_t b = _tr_block (t->tor, e->pieceIndex, e->offset);
1710            if (b < t->tor->blockCount)
1711                removeRequestFromTables (t, b, peer);
1712            else
1713                tordbg (t, "Peer %s sent an out-of-range reject message",
1714                           tr_atomAddrStr (peer->atom));
1715            break;
1716        }
1717
1718        case TR_PEER_CLIENT_GOT_CHOKE:
1719            peerDeclinedAllRequests (t, peer);
1720            break;
1721
1722        case TR_PEER_CLIENT_GOT_PORT:
1723            if (peer->atom)
1724                peer->atom->port = e->port;
1725            break;
1726
1727        case TR_PEER_CLIENT_GOT_SUGGEST:
1728            peerSuggestedPiece (t, peer, e->pieceIndex, false);
1729            break;
1730
1731        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1732            peerSuggestedPiece (t, peer, e->pieceIndex, true);
1733            break;
1734
1735        case TR_PEER_CLIENT_GOT_DATA:
1736        {
1737            const time_t now = tr_time ();
1738            tr_torrent * tor = t->tor;
1739
1740            if (e->wasPieceData)
1741            {
1742                tor->downloadedCur += e->length;
1743                tr_torrentSetActivityDate (tor, now);
1744                tr_torrentSetDirty (tor);
1745            }
1746
1747            /* update the stats */
1748            if (e->wasPieceData)
1749                tr_statsAddDownloaded (tor->session, e->length);
1750
1751            /* update our atom */
1752            if (peer->atom && e->wasPieceData)
1753                peer->atom->piece_data_time = now;
1754
1755            break;
1756        }
1757
1758        case TR_PEER_CLIENT_GOT_BLOCK:
1759        {
1760            tr_torrent * tor = t->tor;
1761            tr_block_index_t block = _tr_block (tor, e->pieceIndex, e->offset);
1762            int i, peerCount;
1763            tr_peer ** peers;
1764            tr_ptrArray peerArr = TR_PTR_ARRAY_INIT;
1765
1766            removeRequestFromTables (t, block, peer);
1767            getBlockRequestPeers (t, block, &peerArr);
1768            peers = (tr_peer **) tr_ptrArrayPeek (&peerArr, &peerCount);
1769
1770            /* remove additional block requests and send cancel to peers */
1771            for (i=0; i<peerCount; i++) {
1772                tr_peer * p = peers[i];
1773                assert (p != peer);
1774                if (p->msgs) {
1775                    tr_historyAdd (&p->cancelsSentToPeer, tr_time (), 1);
1776                    tr_peerMsgsCancel (p->msgs, block);
1777                }
1778                removeRequestFromTables (t, block, p);
1779            }
1780
1781            tr_ptrArrayDestruct (&peerArr, false);
1782
1783            tr_historyAdd (&peer->blocksSentToClient, tr_time (), 1);
1784
1785            if (tr_cpBlockIsComplete (&tor->completion, block))
1786            {
1787                /* we already have this block... */
1788                const uint32_t n = tr_torBlockCountBytes (tor, block);
1789                tor->downloadedCur -= MIN (tor->downloadedCur, n);
1790                tordbg (t, "we have this block already...");
1791            }
1792            else
1793            {
1794                tr_cpBlockAdd (&tor->completion, block);
1795                pieceListResortPiece (t, pieceListLookup (t, e->pieceIndex));
1796                tr_torrentSetDirty (tor);
1797
1798                if (tr_cpPieceIsComplete (&tor->completion, e->pieceIndex))
1799                {
1800                    const tr_piece_index_t p = e->pieceIndex;
1801                    const bool ok = tr_torrentCheckPiece (tor, p);
1802
1803                    tordbg (t, "[LAZY] checked just-completed piece %zu", (size_t)p);
1804
1805                    if (!ok)
1806                    {
1807                        tr_torerr (tor, _("Piece %lu, which was just downloaded, failed its checksum test"),
1808                                 (unsigned long)p);
1809                    }
1810
1811                    tr_peerMgrSetBlame (tor, p, ok);
1812
1813                    if (!ok)
1814                    {
1815                        gotBadPiece (t, p);
1816                    }
1817                    else
1818                    {
1819                        int i;
1820                        int peerCount;
1821                        tr_peer ** peers;
1822                        tr_file_index_t fileIndex;
1823
1824                        /* only add this to downloadedCur if we got it from a peer --
1825                         * webseeds shouldn't count against our ratio. As one tracker
1826                         * admin put it, "Those pieces are downloaded directly from the
1827                         * content distributor, not the peers, it is the tracker's job
1828                         * to manage the swarms, not the web server and does not fit
1829                         * into the jurisdiction of the tracker." */
1830                        if (peer->msgs != NULL) {
1831                            const uint32_t n = tr_torPieceCountBytes (tor, p);
1832                            tr_announcerAddBytes (tor, TR_ANN_DOWN, n);
1833                        }
1834
1835                        peerCount = tr_ptrArraySize (&t->peers);
1836                        peers = (tr_peer**) tr_ptrArrayBase (&t->peers);
1837                        for (i=0; i<peerCount; ++i)
1838                            tr_peerMsgsHave (peers[i]->msgs, p);
1839
1840                        for (fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex) {
1841                            const tr_file * file = &tor->info.files[fileIndex];
1842                            if ((file->firstPiece <= p) && (p <= file->lastPiece)) {
1843                                if (tr_cpFileIsComplete (&tor->completion, fileIndex)) {
1844                                    tr_cacheFlushFile (tor->session->cache, tor, fileIndex);
1845                                    tr_torrentFileCompleted (tor, fileIndex);
1846                                }
1847                            }
1848                        }
1849
1850                        pieceListRemovePiece (t, p);
1851                    }
1852                }
1853
1854                t->needsCompletenessCheck = true;
1855            }
1856            break;
1857        }
1858
1859        case TR_PEER_ERROR:
1860            if ((e->err == ERANGE) || (e->err == EMSGSIZE) || (e->err == ENOTCONN))
1861            {
1862                /* some protocol error from the peer */
1863                peer->doPurge = 1;
1864                tordbg (t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error",
1865                        tr_atomAddrStr (peer->atom));
1866            }
1867            else
1868            {
1869                tordbg (t, "unhandled error: %s", tr_strerror (e->err));
1870            }
1871            break;
1872
1873        default:
1874            assert (0);
1875    }
1876
1877    torrentUnlock (t);
1878}
1879
1880static int
1881getDefaultShelfLife (uint8_t from)
1882{
1883    /* in general, peers obtained from firsthand contact
1884     * are better than those from secondhand, etc etc */
1885    switch (from)
1886    {
1887        case TR_PEER_FROM_INCOMING : return 60 * 60 * 6;
1888        case TR_PEER_FROM_LTEP     : return 60 * 60 * 6;
1889        case TR_PEER_FROM_TRACKER  : return 60 * 60 * 3;
1890        case TR_PEER_FROM_DHT      : return 60 * 60 * 3;
1891        case TR_PEER_FROM_PEX      : return 60 * 60 * 2;
1892        case TR_PEER_FROM_RESUME   : return 60 * 60;
1893        case TR_PEER_FROM_LPD      : return 10 * 60;
1894        default                    : return 60 * 60;
1895    }
1896}
1897
1898static void
1899ensureAtomExists (Torrent           * t,
1900                  const tr_address  * addr,
1901                  const tr_port       port,
1902                  const uint8_t       flags,
1903                  const int8_t        seedProbability,
1904                  const uint8_t       from)
1905{
1906    struct peer_atom * a;
1907
1908    assert (tr_address_is_valid (addr));
1909    assert (from < TR_PEER_FROM__MAX);
1910
1911    a = getExistingAtom (t, addr);
1912
1913    if (a == NULL)
1914    {
1915        const int jitter = tr_cryptoWeakRandInt (60*10);
1916        a = tr_new0 (struct peer_atom, 1);
1917        a->addr = *addr;
1918        a->port = port;
1919        a->flags = flags;
1920        a->fromFirst = from;
1921        a->fromBest = from;
1922        a->shelf_date = tr_time () + getDefaultShelfLife (from) + jitter;
1923        a->blocklisted = -1;
1924        atomSetSeedProbability (a, seedProbability);
1925        tr_ptrArrayInsertSorted (&t->pool, a, compareAtomsByAddress);
1926
1927        tordbg (t, "got a new atom: %s", tr_atomAddrStr (a));
1928    }
1929    else
1930    {
1931        if (from < a->fromBest)
1932            a->fromBest = from;
1933
1934        if (a->seedProbability == -1)
1935            atomSetSeedProbability (a, seedProbability);
1936
1937        a->flags |= flags;
1938    }
1939}
1940
1941static int
1942getMaxPeerCount (const tr_torrent * tor)
1943{
1944    return tor->maxConnectedPeers;
1945}
1946
1947static int
1948getPeerCount (const Torrent * t)
1949{
1950    return tr_ptrArraySize (&t->peers);/* + tr_ptrArraySize (&t->outgoingHandshakes); */
1951}
1952
1953/* FIXME: this is kind of a mess. */
1954static bool
1955myHandshakeDoneCB (tr_handshake  * handshake,
1956                   tr_peerIo     * io,
1957                   bool            readAnythingFromPeer,
1958                   bool            isConnected,
1959                   const uint8_t * peer_id,
1960                   void          * vmanager)
1961{
1962    bool               ok = isConnected;
1963    bool               success = false;
1964    tr_port            port;
1965    const tr_address * addr;
1966    tr_peerMgr       * manager = vmanager;
1967    Torrent          * t;
1968    tr_handshake     * ours;
1969
1970    assert (io);
1971    assert (tr_isBool (ok));
1972
1973    t = tr_peerIoHasTorrentHash (io)
1974        ? getExistingTorrent (manager, tr_peerIoGetTorrentHash (io))
1975        : NULL;
1976
1977    if (tr_peerIoIsIncoming (io))
1978        ours = tr_ptrArrayRemoveSorted (&manager->incomingHandshakes,
1979                                        handshake, handshakeCompare);
1980    else if (t)
1981        ours = tr_ptrArrayRemoveSorted (&t->outgoingHandshakes,
1982                                        handshake, handshakeCompare);
1983    else
1984        ours = handshake;
1985
1986    assert (ours);
1987    assert (ours == handshake);
1988
1989    if (t)
1990        torrentLock (t);
1991
1992    addr = tr_peerIoGetAddress (io, &port);
1993
1994    if (!ok || !t || !t->isRunning)
1995    {
1996        if (t)
1997        {
1998            struct peer_atom * atom = getExistingAtom (t, addr);
1999            if (atom)
2000            {
2001                ++atom->numFails;
2002
2003                if (!readAnythingFromPeer)
2004                {
2005                    tordbg (t, "marking peer %s as unreachable... numFails is %d", tr_atomAddrStr (atom), (int)atom->numFails);
2006                    atom->flags2 |= MYFLAG_UNREACHABLE;
2007                }
2008            }
2009        }
2010    }
2011    else /* looking good */
2012    {
2013        struct peer_atom * atom;
2014
2015        ensureAtomExists (t, addr, port, 0, -1, TR_PEER_FROM_INCOMING);
2016        atom = getExistingAtom (t, addr);
2017        atom->time = tr_time ();
2018        atom->piece_data_time = 0;
2019        atom->lastConnectionAt = tr_time ();
2020
2021        if (!tr_peerIoIsIncoming (io))
2022        {
2023            atom->flags |= ADDED_F_CONNECTABLE;
2024            atom->flags2 &= ~MYFLAG_UNREACHABLE;
2025        }
2026
2027        /* In principle, this flag specifies whether the peer groks uTP,
2028           not whether it's currently connected over uTP. */
2029        if (io->utp_socket)
2030            atom->flags |= ADDED_F_UTP_FLAGS;
2031
2032        if (atom->flags2 & MYFLAG_BANNED)
2033        {
2034            tordbg (t, "banned peer %s tried to reconnect",
2035                    tr_atomAddrStr (atom));
2036        }
2037        else if (tr_peerIoIsIncoming (io)
2038               && (getPeerCount (t) >= getMaxPeerCount (t->tor)))
2039
2040        {
2041        }
2042        else
2043        {
2044            tr_peer * peer = atom->peer;
2045
2046            if (peer) /* we already have this peer */
2047            {
2048            }
2049            else
2050            {
2051                peer = getPeer (t, atom);
2052                tr_free (peer->client);
2053
2054                if (!peer_id)
2055                    peer->client = NULL;
2056                else {
2057                    char client[128];
2058                    tr_clientForId (client, sizeof (client), peer_id);
2059                    peer->client = tr_strdup (client);
2060                }
2061
2062                peer->io = tr_handshakeStealIO (handshake); /* this steals its refcount too, which is
2063                                                                balanced by our unref in peerDelete ()  */
2064                tr_peerIoSetParent (peer->io, &t->tor->bandwidth);
2065                tr_peerMsgsNew (t->tor, peer, peerCallbackFunc, t);
2066
2067                success = true;
2068            }
2069        }
2070    }
2071
2072    if (t)
2073        torrentUnlock (t);
2074
2075    return success;
2076}
2077
2078void
2079tr_peerMgrAddIncoming (tr_peerMgr * manager,
2080                       tr_address * addr,
2081                       tr_port      port,
2082                       int          socket,
2083                       struct UTPSocket * utp_socket)
2084{
2085    tr_session * session;
2086
2087    managerLock (manager);
2088
2089    assert (tr_isSession (manager->session));
2090    session = manager->session;
2091
2092    if (tr_sessionIsAddressBlocked (session, addr))
2093    {
2094        tr_dbg ("Banned IP address \"%s\" tried to connect to us", tr_address_to_string (addr));
2095        if (socket >= 0)
2096            tr_netClose (session, socket);
2097        else
2098            UTP_Close (utp_socket);
2099    }
2100    else if (getExistingHandshake (&manager->incomingHandshakes, addr))
2101    {
2102        if (socket >= 0)
2103            tr_netClose (session, socket);
2104        else
2105            UTP_Close (utp_socket);
2106    }
2107    else /* we don't have a connection to them yet... */
2108    {
2109        tr_peerIo *    io;
2110        tr_handshake * handshake;
2111
2112        io = tr_peerIoNewIncoming (session, &session->bandwidth, addr, port, socket, utp_socket);
2113
2114        handshake = tr_handshakeNew (io,
2115                                     session->encryptionMode,
2116                                     myHandshakeDoneCB,
2117                                     manager);
2118
2119        tr_peerIoUnref (io); /* balanced by the implicit ref in tr_peerIoNewIncoming () */
2120
2121        tr_ptrArrayInsertSorted (&manager->incomingHandshakes, handshake,
2122                                 handshakeCompare);
2123    }
2124
2125    managerUnlock (manager);
2126}
2127
2128void
2129tr_peerMgrAddPex (tr_torrent * tor, uint8_t from,
2130                  const tr_pex * pex, int8_t seedProbability)
2131{
2132    if (tr_isPex (pex)) /* safeguard against corrupt data */
2133    {
2134        Torrent * t = tor->torrentPeers;
2135        managerLock (t->manager);
2136
2137        if (!tr_sessionIsAddressBlocked (t->manager->session, &pex->addr))
2138            if (tr_address_is_valid_for_peers (&pex->addr, pex->port))
2139                ensureAtomExists (t, &pex->addr, pex->port, pex->flags, seedProbability, from);
2140
2141        managerUnlock (t->manager);
2142    }
2143}
2144
2145void
2146tr_peerMgrMarkAllAsSeeds (tr_torrent * tor)
2147{
2148    Torrent * t = tor->torrentPeers;
2149    const int n = tr_ptrArraySize (&t->pool);
2150    struct peer_atom ** it = (struct peer_atom**) tr_ptrArrayBase (&t->pool);
2151    struct peer_atom ** end = it + n;
2152
2153    while (it != end)
2154        atomSetSeed (t, *it++);
2155}
2156
2157tr_pex *
2158tr_peerMgrCompactToPex (const void *    compact,
2159                        size_t          compactLen,
2160                        const uint8_t * added_f,
2161                        size_t          added_f_len,
2162                        size_t *        pexCount)
2163{
2164    size_t          i;
2165    size_t          n = compactLen / 6;
2166    const uint8_t * walk = compact;
2167    tr_pex *        pex = tr_new0 (tr_pex, n);
2168
2169    for (i = 0; i < n; ++i)
2170    {
2171        pex[i].addr.type = TR_AF_INET;
2172        memcpy (&pex[i].addr.addr, walk, 4); walk += 4;
2173        memcpy (&pex[i].port, walk, 2); walk += 2;
2174        if (added_f && (n == added_f_len))
2175            pex[i].flags = added_f[i];
2176    }
2177
2178    *pexCount = n;
2179    return pex;
2180}
2181
2182tr_pex *
2183tr_peerMgrCompact6ToPex (const void    * compact,
2184                         size_t          compactLen,
2185                         const uint8_t * added_f,
2186                         size_t          added_f_len,
2187                         size_t        * pexCount)
2188{
2189    size_t          i;
2190    size_t          n = compactLen / 18;
2191    const uint8_t * walk = compact;
2192    tr_pex *        pex = tr_new0 (tr_pex, n);
2193
2194    for (i = 0; i < n; ++i)
2195    {
2196        pex[i].addr.type = TR_AF_INET6;
2197        memcpy (&pex[i].addr.addr.addr6.s6_addr, walk, 16); walk += 16;
2198        memcpy (&pex[i].port, walk, 2); walk += 2;
2199        if (added_f && (n == added_f_len))
2200            pex[i].flags = added_f[i];
2201    }
2202
2203    *pexCount = n;
2204    return pex;
2205}
2206
2207tr_pex *
2208tr_peerMgrArrayToPex (const void * array,
2209                      size_t       arrayLen,
2210                      size_t      * pexCount)
2211{
2212    size_t          i;
2213    size_t          n = arrayLen / (sizeof (tr_address) + 2);
2214    /*size_t          n = arrayLen / sizeof (tr_peerArrayElement);*/
2215    const uint8_t * walk = array;
2216    tr_pex        * pex = tr_new0 (tr_pex, n);
2217
2218    for (i = 0 ; i < n ; i++) {
2219        memcpy (&pex[i].addr, walk, sizeof (tr_address));
2220        memcpy (&pex[i].port, walk + sizeof (tr_address), 2);
2221        pex[i].flags = 0x00;
2222        walk += sizeof (tr_address) + 2;
2223    }
2224
2225    *pexCount = n;
2226    return pex;
2227}
2228
2229/**
2230***
2231**/
2232
2233static void
2234tr_peerMgrSetBlame (tr_torrent     * tor,
2235                    tr_piece_index_t pieceIndex,
2236                    int              success)
2237{
2238    if (!success)
2239    {
2240        int        peerCount, i;
2241        Torrent *  t = tor->torrentPeers;
2242        tr_peer ** peers;
2243
2244        assert (torrentIsLocked (t));
2245
2246        peers = (tr_peer **) tr_ptrArrayPeek (&t->peers, &peerCount);
2247        for (i = 0; i < peerCount; ++i)
2248        {
2249            tr_peer * peer = peers[i];
2250            if (tr_bitfieldHas (&peer->blame, pieceIndex))
2251            {
2252                tordbg (t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
2253                        tr_atomAddrStr (peer->atom),
2254                        pieceIndex, (int)peer->strikes + 1);
2255                addStrike (t, peer);
2256            }
2257        }
2258    }
2259}
2260
2261int
2262tr_pexCompare (const void * va, const void * vb)
2263{
2264    const tr_pex * a = va;
2265    const tr_pex * b = vb;
2266    int i;
2267
2268    assert (tr_isPex (a));
2269    assert (tr_isPex (b));
2270
2271    if ((i = tr_address_compare (&a->addr, &b->addr)))
2272        return i;
2273
2274    if (a->port != b->port)
2275        return a->port < b->port ? -1 : 1;
2276
2277    return 0;
2278}
2279
2280/* better goes first */
2281static int
2282compareAtomsByUsefulness (const void * va, const void *vb)
2283{
2284    const struct peer_atom * a = * (const struct peer_atom**) va;
2285    const struct peer_atom * b = * (const struct peer_atom**) vb;
2286
2287    assert (tr_isAtom (a));
2288    assert (tr_isAtom (b));
2289
2290    if (a->piece_data_time != b->piece_data_time)
2291        return a->piece_data_time > b->piece_data_time ? -1 : 1;
2292    if (a->fromBest != b->fromBest)
2293        return a->fromBest < b->fromBest ? -1 : 1;
2294    if (a->numFails != b->numFails)
2295        return a->numFails < b->numFails ? -1 : 1;
2296
2297    return 0;
2298}
2299
2300static bool
2301isAtomInteresting (const tr_torrent * tor, struct peer_atom * atom)
2302{
2303    if (tr_torrentIsSeed (tor) && atomIsSeed (atom))
2304        return false;
2305
2306    if (peerIsInUse (tor->torrentPeers, atom))
2307        return true;
2308
2309    if (isAtomBlocklisted (tor->session, atom))
2310        return false;
2311
2312    if (atom->flags2 & MYFLAG_BANNED)
2313        return false;
2314
2315    return true;
2316}
2317
2318int
2319tr_peerMgrGetPeers (tr_torrent   * tor,
2320                    tr_pex      ** setme_pex,
2321                    uint8_t        af,
2322                    uint8_t        list_mode,
2323                    int            maxCount)
2324{
2325    int i;
2326    int n;
2327    int count = 0;
2328    int atomCount = 0;
2329    const Torrent * t = tor->torrentPeers;
2330    struct peer_atom ** atoms = NULL;
2331    tr_pex * pex;
2332    tr_pex * walk;
2333
2334    assert (tr_isTorrent (tor));
2335    assert (setme_pex != NULL);
2336    assert (af==TR_AF_INET || af==TR_AF_INET6);
2337    assert (list_mode==TR_PEERS_CONNECTED || list_mode==TR_PEERS_INTERESTING);
2338
2339    managerLock (t->manager);
2340
2341    /**
2342    ***  build a list of atoms
2343    **/
2344
2345    if (list_mode == TR_PEERS_CONNECTED) /* connected peers only */
2346    {
2347        int i;
2348        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayBase (&t->peers);
2349        atomCount = tr_ptrArraySize (&t->peers);
2350        atoms = tr_new (struct peer_atom *, atomCount);
2351        for (i=0; i<atomCount; ++i)
2352            atoms[i] = peers[i]->atom;
2353    }
2354    else /* TR_PEERS_INTERESTING */
2355    {
2356        int i;
2357        struct peer_atom ** atomBase = (struct peer_atom**) tr_ptrArrayBase (&t->pool);
2358        n = tr_ptrArraySize (&t->pool);
2359        atoms = tr_new (struct peer_atom *, n);
2360        for (i=0; i<n; ++i)
2361            if (isAtomInteresting (tor, atomBase[i]))
2362                atoms[atomCount++] = atomBase[i];
2363    }
2364
2365    qsort (atoms, atomCount, sizeof (struct peer_atom *), compareAtomsByUsefulness);
2366
2367    /**
2368    ***  add the first N of them into our return list
2369    **/
2370
2371    n = MIN (atomCount, maxCount);
2372    pex = walk = tr_new0 (tr_pex, n);
2373
2374    for (i=0; i<atomCount && count<n; ++i)
2375    {
2376        const struct peer_atom * atom = atoms[i];
2377        if (atom->addr.type == af)
2378        {
2379            assert (tr_address_is_valid (&atom->addr));
2380            walk->addr = atom->addr;
2381            walk->port = atom->port;
2382            walk->flags = atom->flags;
2383            ++count;
2384            ++walk;
2385        }
2386    }
2387
2388    qsort (pex, count, sizeof (tr_pex), tr_pexCompare);
2389
2390    assert ((walk - pex) == count);
2391    *setme_pex = pex;
2392
2393    /* cleanup */
2394    tr_free (atoms);
2395    managerUnlock (t->manager);
2396    return count;
2397}
2398
2399static void atomPulse    (int, short, void *);
2400static void bandwidthPulse (int, short, void *);
2401static void rechokePulse (int, short, void *);
2402static void reconnectPulse (int, short, void *);
2403
2404static struct event *
2405createTimer (tr_session * session, int msec, void (*callback)(int, short, void *), void * cbdata)
2406{
2407    struct event * timer = evtimer_new (session->event_base, callback, cbdata);
2408    tr_timerAddMsec (timer, msec);
2409    return timer;
2410}
2411
2412static void
2413ensureMgrTimersExist (struct tr_peerMgr * m)
2414{
2415    if (m->atomTimer == NULL)
2416        m->atomTimer = createTimer (m->session, ATOM_PERIOD_MSEC, atomPulse, m);
2417
2418    if (m->bandwidthTimer == NULL)
2419        m->bandwidthTimer = createTimer (m->session, BANDWIDTH_PERIOD_MSEC, bandwidthPulse, m);
2420
2421    if (m->rechokeTimer == NULL)
2422        m->rechokeTimer = createTimer (m->session, RECHOKE_PERIOD_MSEC, rechokePulse, m);
2423
2424    if (m->refillUpkeepTimer == NULL)
2425        m->refillUpkeepTimer = createTimer (m->session, REFILL_UPKEEP_PERIOD_MSEC, refillUpkeep, m);
2426}
2427
2428void
2429tr_peerMgrStartTorrent (tr_torrent * tor)
2430{
2431    Torrent * t = tor->torrentPeers;
2432
2433    assert (tr_isTorrent (tor));
2434    assert (tr_torrentIsLocked (tor));
2435
2436    ensureMgrTimersExist (t->manager);
2437
2438    t->isRunning = true;
2439    t->maxPeers = t->tor->maxConnectedPeers;
2440    t->pieceSortState = PIECES_UNSORTED;
2441
2442    rechokePulse (0, 0, t->manager);
2443}
2444
2445static void
2446stopTorrent (Torrent * t)
2447{
2448    tr_peer * peer;
2449
2450    t->isRunning = false;
2451
2452    replicationFree (t);
2453    invalidatePieceSorting (t);
2454
2455    /* disconnect the peers. */
2456    while ((peer = tr_ptrArrayPop (&t->peers)))
2457        peerDelete (t, peer);
2458
2459    /* disconnect the handshakes. handshakeAbort calls handshakeDoneCB (),
2460     * which removes the handshake from t->outgoingHandshakes... */
2461    while (!tr_ptrArrayEmpty (&t->outgoingHandshakes))
2462        tr_handshakeAbort (tr_ptrArrayNth (&t->outgoingHandshakes, 0));
2463}
2464
2465void
2466tr_peerMgrStopTorrent (tr_torrent * tor)
2467{
2468    assert (tr_isTorrent (tor));
2469    assert (tr_torrentIsLocked (tor));
2470
2471    stopTorrent (tor->torrentPeers);
2472}
2473
2474void
2475tr_peerMgrAddTorrent (tr_peerMgr * manager, tr_torrent * tor)
2476{
2477    assert (tr_isTorrent (tor));
2478    assert (tr_torrentIsLocked (tor));
2479    assert (tor->torrentPeers == NULL);
2480
2481    tor->torrentPeers = torrentNew (manager, tor);
2482}
2483
2484void
2485tr_peerMgrRemoveTorrent (tr_torrent * tor)
2486{
2487    assert (tr_isTorrent (tor));
2488    assert (tr_torrentIsLocked (tor));
2489
2490    stopTorrent (tor->torrentPeers);
2491    torrentFree (tor->torrentPeers);
2492}
2493
2494void
2495tr_peerUpdateProgress (tr_torrent * tor, tr_peer * peer)
2496{
2497    const tr_bitfield * have = &peer->have;
2498
2499    if (tr_bitfieldHasAll (have))
2500    {
2501        peer->progress = 1.0;
2502    }
2503    else if (tr_bitfieldHasNone (have))
2504    {
2505        peer->progress = 0.0;
2506    }
2507    else
2508    {
2509        const float true_count = tr_bitfieldCountTrueBits (have);
2510
2511        if (tr_torrentHasMetadata (tor))
2512        {
2513            peer->progress = true_count / tor->info.pieceCount;
2514        }
2515        else /* without pieceCount, this result is only a best guess... */
2516        {
2517            peer->progress = true_count / (have->bit_count + 1);
2518        }
2519    }
2520
2521    /* clamp the progress range */
2522    if (peer->progress < 0.0)
2523        peer->progress = 0.0;
2524    if (peer->progress > 1.0)
2525        peer->progress = 1.0;
2526
2527    if (peer->atom && (peer->progress >= 1.0))
2528        atomSetSeed (tor->torrentPeers, peer->atom);
2529}
2530
2531void
2532tr_peerMgrOnTorrentGotMetainfo (tr_torrent * tor)
2533{
2534    int i;
2535    int peerCount;
2536    tr_peer ** peers;
2537
2538    /* the webseed list may have changed... */
2539    rebuildWebseedArray (tor->torrentPeers, tor);
2540
2541    /* some peer_msgs' progress fields may not be accurate if we
2542       didn't have the metadata before now... so refresh them all... */
2543    peerCount = tr_ptrArraySize (&tor->torrentPeers->peers);
2544    peers = (tr_peer**) tr_ptrArrayBase (&tor->torrentPeers->peers);
2545    for (i=0; i<peerCount; ++i)
2546        tr_peerUpdateProgress (tor, peers[i]);
2547
2548}
2549
2550void
2551tr_peerMgrTorrentAvailability (tr_torrent * tor, int8_t * tab, unsigned int tabCount)
2552{
2553  assert (tab != NULL);
2554  assert (tabCount > 0);
2555
2556  if (tr_torrentRef (tor))
2557    {
2558      memset (tab, 0, tabCount);
2559
2560      if (tr_torrentHasMetadata (tor))
2561        {
2562          tr_piece_index_t i;
2563          const int peerCount = tr_ptrArraySize (&tor->torrentPeers->peers);
2564          const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase (&tor->torrentPeers->peers);
2565          const float interval = tor->info.pieceCount / (float)tabCount;
2566          const bool isSeed = tr_cpGetStatus (&tor->completion) == TR_SEED;
2567
2568          for (i=0; i<tabCount; ++i)
2569            {
2570              const int piece = i * interval;
2571
2572              if (isSeed || tr_cpPieceIsComplete (&tor->completion, piece))
2573                {
2574                  tab[i] = -1;
2575                }
2576              else if (peerCount)
2577                {
2578                  int j;
2579                  for (j=0; j<peerCount; ++j)
2580                    if (tr_bitfieldHas (&peers[j]->have, piece))
2581                      ++tab[i];
2582                }
2583            }
2584        }
2585
2586      tr_torrentUnref (tor);
2587    }
2588}
2589
2590static bool
2591peerIsSeed (const tr_peer * peer)
2592{
2593    if (peer->progress >= 1.0)
2594        return true;
2595
2596    if (peer->atom && atomIsSeed (peer->atom))
2597        return true;
2598
2599    return false;
2600}
2601
2602/* count how many bytes we want that connected peers have */
2603uint64_t
2604tr_peerMgrGetDesiredAvailable (const tr_torrent * tor)
2605{
2606    size_t i;
2607    size_t n;
2608    uint64_t desiredAvailable;
2609    const Torrent * t = tor->torrentPeers;
2610
2611    /* common shortcuts... */
2612
2613    if (tr_torrentIsSeed (t->tor))
2614        return 0;
2615
2616    if (!tr_torrentHasMetadata (tor))
2617        return 0;
2618
2619    n = tr_ptrArraySize (&t->peers);
2620    if (n == 0)
2621        return 0;
2622    else {
2623        const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase (&t->peers);
2624        for (i=0; i<n; ++i)
2625            if (peers[i]->atom && atomIsSeed (peers[i]->atom))
2626                return tr_cpLeftUntilDone (&tor->completion);
2627    }
2628
2629    if (!t->pieceReplication || !t->pieceReplicationSize)
2630        return 0;
2631
2632    /* do it the hard way */
2633
2634    desiredAvailable = 0;
2635    for (i=0, n=MIN (tor->info.pieceCount, t->pieceReplicationSize); i<n; ++i)
2636        if (!tor->info.pieces[i].dnd && (t->pieceReplication[i] > 0))
2637            desiredAvailable += tr_cpMissingBytesInPiece (&t->tor->completion, i);
2638
2639    assert (desiredAvailable <= tor->info.totalSize);
2640    return desiredAvailable;
2641}
2642
2643void
2644tr_peerMgrTorrentStats (tr_torrent  * tor,
2645                        int         * setmePeersConnected,
2646                        int         * setmeWebseedsSendingToUs,
2647                        int         * setmePeersSendingToUs,
2648                        int         * setmePeersGettingFromUs,
2649                        int         * setmePeersFrom)
2650{
2651  *setmePeersConnected       = 0;
2652  *setmePeersGettingFromUs   = 0;
2653  *setmePeersSendingToUs     = 0;
2654  *setmeWebseedsSendingToUs  = 0;
2655
2656  if (tr_torrentRef (tor))
2657    {
2658      int i;
2659      const Torrent * t = tor->torrentPeers;
2660      const int size = tr_ptrArraySize (&t->peers);
2661      const tr_peer ** peers = (const tr_peer **) tr_ptrArrayBase (&t->peers);
2662
2663      for (i=0; i<TR_PEER_FROM__MAX; ++i)
2664        setmePeersFrom[i] = 0;
2665
2666      for (i=0; i<size; ++i)
2667        {
2668          const tr_peer * peer = peers[i];
2669          const struct peer_atom * atom = peer->atom;
2670
2671          if (peer->io == NULL) /* not connected */
2672            continue;
2673
2674          ++*setmePeersConnected;
2675
2676          ++setmePeersFrom[atom->fromFirst];
2677
2678          if (clientIsDownloadingFrom (tor, peer))
2679            ++*setmePeersSendingToUs;
2680
2681          if (clientIsUploadingTo (peer))
2682            ++*setmePeersGettingFromUs;
2683        }
2684
2685      *setmeWebseedsSendingToUs = countActiveWebseeds (t);
2686      tr_torrentUnref (tor);
2687    }
2688}
2689
2690double*
2691tr_peerMgrWebSpeeds_KBps (tr_torrent * tor)
2692{
2693  double * ret = NULL;
2694
2695  if (tr_torrentRef (tor))
2696    {
2697      int i;
2698      const Torrent * t = tor->torrentPeers;
2699      const int webseedCount = tr_ptrArraySize (&t->webseeds);
2700      const tr_webseed ** webseeds = (const tr_webseed**) tr_ptrArrayBase (&t->webseeds);
2701      const uint64_t now = tr_time_msec ();
2702
2703      ret = tr_new0 (double, webseedCount);
2704
2705      assert (t->manager != NULL);
2706      assert (webseedCount == tor->info.webseedCount);
2707
2708      for (i=0; i<webseedCount; ++i)
2709        {
2710          unsigned int Bps;
2711          if (tr_webseedGetSpeed_Bps (webseeds[i], now, &Bps))
2712            ret[i] = Bps / (double)tr_speed_K;
2713          else
2714            ret[i] = -1.0;
2715        }
2716
2717      tr_torrentUnref (tor);
2718    }
2719
2720  return ret;
2721}
2722
2723unsigned int
2724tr_peerGetPieceSpeed_Bps (const tr_peer * peer, uint64_t now, tr_direction direction)
2725{
2726    return peer->io ? tr_peerIoGetPieceSpeed_Bps (peer->io, now, direction) : 0.0;
2727}
2728
2729struct tr_peer_stat *
2730tr_peerMgrPeerStats (tr_torrent * tor, int * setmeCount)
2731{
2732  int size = 0;
2733  tr_peer_stat * ret = NULL;
2734
2735  if (tr_torrentRef (tor))
2736    {
2737      int i;
2738      const Torrent * t = tor->torrentPeers;
2739      const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase (&t->peers);
2740      const uint64_t now_msec = tr_time_msec ();
2741      const time_t now = tr_time ();
2742
2743      assert (t->manager != NULL);
2744
2745      size = tr_ptrArraySize (&t->peers);
2746      ret = tr_new0 (tr_peer_stat, size);
2747
2748      for (i=0; i<size; ++i)
2749        {
2750          char *                   pch;
2751          const tr_peer *          peer = peers[i];
2752          const struct peer_atom * atom = peer->atom;
2753          tr_peer_stat *           stat = ret + i;
2754
2755          tr_address_to_string_with_buf (&atom->addr, stat->addr, sizeof (stat->addr));
2756          tr_strlcpy (stat->client, (peer->client ? peer->client : ""), sizeof (stat->client));
2757          stat->port                = ntohs (peer->atom->port);
2758          stat->from                = atom->fromFirst;
2759          stat->progress            = peer->progress;
2760          stat->isUTP               = peer->io->utp_socket != NULL;
2761          stat->isEncrypted         = tr_peerIoIsEncrypted (peer->io) ? 1 : 0;
2762          stat->rateToPeer_KBps     = toSpeedKBps (tr_peerGetPieceSpeed_Bps (peer, now_msec, TR_CLIENT_TO_PEER));
2763          stat->rateToClient_KBps   = toSpeedKBps (tr_peerGetPieceSpeed_Bps (peer, now_msec, TR_PEER_TO_CLIENT));
2764          stat->peerIsChoked        = peer->peerIsChoked;
2765          stat->peerIsInterested    = peer->peerIsInterested;
2766          stat->clientIsChoked      = peer->clientIsChoked;
2767          stat->clientIsInterested  = peer->clientIsInterested;
2768          stat->isIncoming          = tr_peerIoIsIncoming (peer->io);
2769          stat->isDownloadingFrom   = clientIsDownloadingFrom (tor, peer);
2770          stat->isUploadingTo       = clientIsUploadingTo (peer);
2771          stat->isSeed              = peerIsSeed (peer);
2772
2773          stat->blocksToPeer        = tr_historyGet (&peer->blocksSentToPeer,    now, CANCEL_HISTORY_SEC);
2774          stat->blocksToClient      = tr_historyGet (&peer->blocksSentToClient,  now, CANCEL_HISTORY_SEC);
2775          stat->cancelsToPeer       = tr_historyGet (&peer->cancelsSentToPeer,   now, CANCEL_HISTORY_SEC);
2776          stat->cancelsToClient     = tr_historyGet (&peer->cancelsSentToClient, now, CANCEL_HISTORY_SEC);
2777
2778          stat->pendingReqsToPeer   = peer->pendingReqsToPeer;
2779          stat->pendingReqsToClient = peer->pendingReqsToClient;
2780
2781          pch = stat->flagStr;
2782          if (stat->isUTP) *pch++ = 'T';
2783          if (t->optimistic == peer) *pch++ = 'O';
2784          if (stat->isDownloadingFrom) *pch++ = 'D';
2785          else if (stat->clientIsInterested) *pch++ = 'd';
2786          if (stat->isUploadingTo) *pch++ = 'U';
2787          else if (stat->peerIsInterested) *pch++ = 'u';
2788          if (!stat->clientIsChoked && !stat->clientIsInterested) *pch++ = 'K';
2789          if (!stat->peerIsChoked && !stat->peerIsInterested) *pch++ = '?';
2790          if (stat->isEncrypted) *pch++ = 'E';
2791          if (stat->from == TR_PEER_FROM_DHT) *pch++ = 'H';
2792          else if (stat->from == TR_PEER_FROM_PEX) *pch++ = 'X';
2793          if (stat->isIncoming) *pch++ = 'I';
2794          *pch = '\0';
2795        }
2796
2797      tr_torrentUnref (tor);
2798    }
2799
2800  *setmeCount = size;
2801  return ret;
2802}
2803
2804/***
2805****
2806****
2807***/
2808
2809void
2810tr_peerMgrClearInterest (tr_torrent * tor)
2811{
2812    int i;
2813    Torrent * t = tor->torrentPeers;
2814    const int peerCount = tr_ptrArraySize (&t->peers);
2815
2816    assert (tr_isTorrent (tor));
2817    assert (tr_torrentIsLocked (tor));
2818
2819    for (i=0; i<peerCount; ++i)
2820    {
2821        const tr_peer * peer = tr_ptrArrayNth (&t->peers, i);
2822        tr_peerMsgsSetInterested (peer->msgs, false);
2823    }
2824}
2825
2826/* does this peer have any pieces that we want? */
2827static bool
2828isPeerInteresting (tr_torrent     * const tor,
2829                   const bool     * const piece_is_interesting,
2830                   const tr_peer  * const peer)
2831{
2832  tr_piece_index_t i, n;
2833
2834  /* these cases should have already been handled by the calling code... */
2835  assert (!tr_torrentIsSeed (tor));
2836  assert (tr_torrentIsPieceTransferAllowed (tor, TR_PEER_TO_CLIENT));
2837
2838  if (peerIsSeed (peer))
2839    return true;
2840
2841  for (i=0, n=tor->info.pieceCount; i<n; ++i)
2842    if (piece_is_interesting[i] && tr_bitfieldHas (&peer->have, i))
2843      return true;
2844
2845  return false;
2846}
2847
2848typedef enum
2849{
2850    RECHOKE_STATE_GOOD,
2851    RECHOKE_STATE_UNTESTED,
2852    RECHOKE_STATE_BAD
2853}
2854tr_rechoke_state;
2855
2856struct tr_rechoke_info
2857{
2858    tr_peer * peer;
2859    int salt;
2860    int rechoke_state;
2861};
2862
2863static int
2864compare_rechoke_info (const void * va, const void * vb)
2865{
2866    const struct tr_rechoke_info * a = va;
2867    const struct tr_rechoke_info * b = vb;
2868
2869    if (a->rechoke_state != b->rechoke_state)
2870        return a->rechoke_state - b->rechoke_state;
2871
2872    return a->salt - b->salt;
2873}
2874
2875/* determines who we send "interested" messages to */
2876static void
2877rechokeDownloads (Torrent * t)
2878{
2879    int i;
2880    int maxPeers = 0;
2881    int rechoke_count = 0;
2882    struct tr_rechoke_info * rechoke = NULL;
2883    const int MIN_INTERESTING_PEERS = 5;
2884    const int peerCount = tr_ptrArraySize (&t->peers);
2885    const time_t now = tr_time ();
2886
2887    /* some cases where this function isn't necessary */
2888    if (tr_torrentIsSeed (t->tor))
2889        return;
2890    if (!tr_torrentIsPieceTransferAllowed (t->tor, TR_PEER_TO_CLIENT))
2891        return;
2892
2893    /* decide HOW MANY peers to be interested in */
2894    {
2895        int blocks = 0;
2896        int cancels = 0;
2897        time_t timeSinceCancel;
2898
2899        /* Count up how many blocks & cancels each peer has.
2900         *
2901         * There are two situations where we send out cancels --
2902         *
2903         * 1. We've got unresponsive peers, which is handled by deciding
2904         *    -which- peers to be interested in.
2905         *
2906         * 2. We've hit our bandwidth cap, which is handled by deciding
2907         *    -how many- peers to be interested in.
2908         *
2909         * We're working on 2. here, so we need to ignore unresponsive
2910         * peers in our calculations lest they confuse Transmission into
2911         * thinking it's hit its bandwidth cap.
2912         */
2913        for (i=0; i<peerCount; ++i)
2914        {
2915            const tr_peer * peer = tr_ptrArrayNth (&t->peers, i);
2916            const int b = tr_historyGet (&peer->blocksSentToClient, now, CANCEL_HISTORY_SEC);
2917            const int c = tr_historyGet (&peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC);
2918
2919            if (b == 0) /* ignore unresponsive peers, as described above */
2920                continue;
2921
2922            blocks += b;
2923            cancels += c;
2924        }
2925
2926        if (cancels > 0)
2927        {
2928            /* cancelRate: of the block requests we've recently made, the percentage we cancelled.
2929             * higher values indicate more congestion. */
2930            const double cancelRate = cancels / (double)(cancels + blocks);
2931            const double mult = 1 - MIN (cancelRate, 0.5);
2932            maxPeers = t->interestedCount * mult;
2933            tordbg (t, "cancel rate is %.3f -- reducing the "
2934                       "number of peers we're interested in by %.0f percent",
2935                       cancelRate, mult * 100);
2936            t->lastCancel = now;
2937        }
2938
2939        timeSinceCancel = now - t->lastCancel;
2940        if (timeSinceCancel)
2941        {
2942            const int maxIncrease = 15;
2943            const time_t maxHistory = 2 * CANCEL_HISTORY_SEC;
2944            const double mult = MIN (timeSinceCancel, maxHistory) / (double) maxHistory;
2945            const int inc = maxIncrease * mult;
2946            maxPeers = t->maxPeers + inc;
2947            tordbg (t, "time since last cancel is %li -- increasing the "
2948                       "number of peers we're interested in by %d",
2949                       timeSinceCancel, inc);
2950        }
2951    }
2952
2953    /* don't let the previous section's number tweaking go too far... */
2954    if (maxPeers < MIN_INTERESTING_PEERS)
2955        maxPeers = MIN_INTERESTING_PEERS;
2956    if (maxPeers > t->tor->maxConnectedPeers)
2957        maxPeers = t->tor->maxConnectedPeers;
2958
2959    t->maxPeers = maxPeers;
2960
2961    if (peerCount > 0)
2962    {
2963        bool * piece_is_interesting;
2964        const tr_torrent * const tor = t->tor;
2965        const int n = tor->info.pieceCount;
2966
2967        /* build a bitfield of interesting pieces... */
2968        piece_is_interesting = tr_new (bool, n);
2969        for (i=0; i<n; i++)
2970            piece_is_interesting[i] = !tor->info.pieces[i].dnd && !tr_cpPieceIsComplete (&tor->completion, i);
2971
2972        /* decide WHICH peers to be interested in (based on their cancel-to-block ratio) */
2973        for (i=0; i<peerCount; ++i)
2974        {
2975            tr_peer * peer = tr_ptrArrayNth (&t->peers, i);
2976
2977            if (!isPeerInteresting (t->tor, piece_is_interesting, peer))
2978            {
2979                tr_peerMsgsSetInterested (peer->msgs, false);
2980            }
2981            else
2982            {
2983                tr_rechoke_state rechoke_state;
2984                const int blocks = tr_historyGet (&peer->blocksSentToClient, now, CANCEL_HISTORY_SEC);
2985                const int cancels = tr_historyGet (&peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC);
2986
2987                if (!blocks && !cancels)
2988                    rechoke_state = RECHOKE_STATE_UNTESTED;
2989                else if (!cancels)
2990                    rechoke_state = RECHOKE_STATE_GOOD;
2991                else if (!blocks)
2992                    rechoke_state = RECHOKE_STATE_BAD;
2993                else if ((cancels * 10) < blocks)
2994                    rechoke_state = RECHOKE_STATE_GOOD;
2995                else
2996                    rechoke_state = RECHOKE_STATE_BAD;
2997
2998                if (rechoke == NULL)
2999                    rechoke = tr_new (struct tr_rechoke_info, peerCount);
3000
3001                 rechoke[rechoke_count].peer = peer;
3002                 rechoke[rechoke_count].rechoke_state = rechoke_state;
3003                 rechoke[rechoke_count].salt = tr_cryptoWeakRandInt (INT_MAX);
3004                 rechoke_count++;
3005            }
3006
3007        }
3008
3009        tr_free (piece_is_interesting);
3010    }
3011
3012    /* now that we know which & how many peers to be interested in... update the peer interest */
3013    qsort (rechoke, rechoke_count, sizeof (struct tr_rechoke_info), compare_rechoke_info);
3014    t->interestedCount = MIN (maxPeers, rechoke_count);
3015    for (i=0; i<rechoke_count; ++i)
3016        tr_peerMsgsSetInterested (rechoke[i].peer->msgs, i<t->interestedCount);
3017
3018    /* cleanup */
3019    tr_free (rechoke);
3020}
3021
3022/**
3023***
3024**/
3025
3026struct ChokeData
3027{
3028    bool            isInterested;
3029    bool            wasChoked;
3030    bool            isChoked;
3031    int             rate;
3032    int             salt;
3033    tr_peer *       peer;
3034};
3035
3036static int
3037compareChoke (const void * va, const void * vb)
3038{
3039    const struct ChokeData * a = va;
3040    const struct ChokeData * b = vb;
3041
3042    if (a->rate != b->rate) /* prefer higher overall speeds */
3043        return a->rate > b->rate ? -1 : 1;
3044
3045    if (a->wasChoked != b->wasChoked) /* prefer unchoked */
3046        return a->wasChoked ? 1 : -1;
3047
3048    if (a->salt != b->salt) /* random order */
3049        return a->salt - b->salt;
3050
3051    return 0;
3052}
3053
3054/* is this a new connection? */
3055static int
3056isNew (const tr_peer * peer)
3057{
3058    return peer && peer->io && tr_peerIoGetAge (peer->io) < 45;
3059}
3060
3061/* get a rate for deciding which peers to choke and unchoke. */
3062static int
3063getRate (const tr_torrent * tor, struct peer_atom * atom, uint64_t now)
3064{
3065    unsigned int Bps;
3066
3067    if (tr_torrentIsSeed (tor))
3068        Bps = tr_peerGetPieceSpeed_Bps (atom->peer, now, TR_CLIENT_TO_PEER);
3069
3070    /* downloading a private torrent... take upload speed into account
3071     * because there may only be a small window of opportunity to share */
3072    else if (tr_torrentIsPrivate (tor))
3073        Bps = tr_peerGetPieceSpeed_Bps (atom->peer, now, TR_PEER_TO_CLIENT)
3074            + tr_peerGetPieceSpeed_Bps (atom->peer, now, TR_CLIENT_TO_PEER);
3075
3076    /* downloading a public torrent */
3077    else
3078        Bps = tr_peerGetPieceSpeed_Bps (atom->peer, now, TR_PEER_TO_CLIENT);
3079
3080    /* convert it to bytes per second */
3081    return Bps;
3082}
3083
3084static inline bool
3085isBandwidthMaxedOut (const tr_bandwidth * b,
3086                     const uint64_t now_msec, tr_direction dir)
3087{
3088    if (!tr_bandwidthIsLimited (b, dir))
3089        return false;
3090    else {
3091        const unsigned int got = tr_bandwidthGetPieceSpeed_Bps (b, now_msec, dir);
3092        const unsigned int want = tr_bandwidthGetDesiredSpeed_Bps (b, dir);
3093        return got >= want;
3094    }
3095}
3096
3097static void
3098rechokeUploads (Torrent * t, const uint64_t now)
3099{
3100    int i, size, unchokedInterested;
3101    const int peerCount = tr_ptrArraySize (&t->peers);
3102    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase (&t->peers);
3103    struct ChokeData * choke = tr_new0 (struct ChokeData, peerCount);
3104    const tr_session * session = t->manager->session;
3105    const int chokeAll = !tr_torrentIsPieceTransferAllowed (t->tor, TR_CLIENT_TO_PEER);
3106    const bool isMaxedOut = isBandwidthMaxedOut (&t->tor->bandwidth, now, TR_UP);
3107
3108    assert (torrentIsLocked (t));
3109
3110    /* an optimistic unchoke peer's "optimistic"
3111     * state lasts for N calls to rechokeUploads (). */
3112    if (t->optimisticUnchokeTimeScaler > 0)
3113        t->optimisticUnchokeTimeScaler--;
3114    else
3115        t->optimistic = NULL;
3116
3117    /* sort the peers by preference and rate */
3118    for (i = 0, size = 0; i < peerCount; ++i)
3119    {
3120        tr_peer * peer = peers[i];
3121        struct peer_atom * atom = peer->atom;
3122
3123        if (peerIsSeed (peer)) /* choke seeds and partial seeds */
3124        {
3125            tr_peerMsgsSetChoke (peer->msgs, true);
3126        }
3127        else if (chokeAll) /* choke everyone if we're not uploading */
3128        {
3129            tr_peerMsgsSetChoke (peer->msgs, true);
3130        }
3131        else if (peer != t->optimistic)
3132        {
3133            struct ChokeData * n = &choke[size++];
3134            n->peer         = peer;
3135            n->isInterested = peer->peerIsInterested;
3136            n->wasChoked    = peer->peerIsChoked;
3137            n->rate         = getRate (t->tor, atom, now);
3138            n->salt         = tr_cryptoWeakRandInt (INT_MAX);
3139            n->isChoked     = true;
3140        }
3141    }
3142
3143    qsort (choke, size, sizeof (struct ChokeData), compareChoke);
3144
3145    /**
3146     * Reciprocation and number of uploads capping is managed by unchoking
3147     * the N peers which have the best upload rate and are interested.
3148     * This maximizes the client's download rate. These N peers are
3149     * referred to as downloaders, because they are interested in downloading
3150     * from the client.
3151     *
3152     * Peers which have a better upload rate (as compared to the downloaders)
3153     * but aren't interested get unchoked. If they become interested, the
3154     * downloader with the worst upload rate gets choked. If a client has
3155     * a complete file, it uses its upload rate rather than its download
3156     * rate to decide which peers to unchoke.
3157     *
3158     * If our bandwidth is maxed out, don't unchoke any more peers.
3159     */
3160    unchokedInterested = 0;
3161    for (i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i) {
3162        choke[i].isChoked = isMaxedOut ? choke[i].wasChoked : false;
3163        if (choke[i].isInterested)
3164            ++unchokedInterested;
3165    }
3166
3167    /* optimistic unchoke */
3168    if (!t->optimistic && !isMaxedOut && (i<size))
3169    {
3170        int n;
3171        struct ChokeData * c;
3172        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
3173
3174        for (; i<size; ++i)
3175        {
3176            if (choke[i].isInterested)
3177            {
3178                const tr_peer * peer = choke[i].peer;
3179                int x = 1, y;
3180                if (isNew (peer)) x *= 3;
3181                for (y=0; y<x; ++y)
3182                    tr_ptrArrayAppend (&randPool, &choke[i]);
3183            }
3184        }
3185
3186        if ((n = tr_ptrArraySize (&randPool)))
3187        {
3188            c = tr_ptrArrayNth (&randPool, tr_cryptoWeakRandInt (n));
3189            c->isChoked = false;
3190            t->optimistic = c->peer;
3191            t->optimisticUnchokeTimeScaler = OPTIMISTIC_UNCHOKE_MULTIPLIER;
3192        }
3193
3194        tr_ptrArrayDestruct (&randPool, NULL);
3195    }
3196
3197    for (i=0; i<size; ++i)
3198        tr_peerMsgsSetChoke (choke[i].peer->msgs, choke[i].isChoked);
3199
3200    /* cleanup */
3201    tr_free (choke);
3202}
3203
3204static void
3205rechokePulse (int foo UNUSED, short bar UNUSED, void * vmgr)
3206{
3207    tr_torrent * tor = NULL;
3208    tr_peerMgr * mgr = vmgr;
3209    const uint64_t now = tr_time_msec ();
3210
3211    managerLock (mgr);
3212
3213    while ((tor = tr_torrentNext (mgr->session, tor))) {
3214        if (tor->isRunning) {
3215            Torrent * t = tor->torrentPeers;
3216            if (!tr_ptrArrayEmpty (&t->peers)) {
3217                rechokeUploads (t, now);
3218                rechokeDownloads (t);
3219            }
3220        }
3221    }
3222
3223    tr_timerAddMsec (mgr->rechokeTimer, RECHOKE_PERIOD_MSEC);
3224    managerUnlock (mgr);
3225}
3226
3227/***
3228****
3229****  Life and Death
3230****
3231***/
3232
3233static bool
3234shouldPeerBeClosed (const Torrent    * t,
3235                    const tr_peer    * peer,
3236                    int                peerCount,
3237                    const time_t       now)
3238{
3239    const tr_torrent *       tor = t->tor;
3240    const struct peer_atom * atom = peer->atom;
3241
3242    /* if it's marked for purging, close it */
3243    if (peer->doPurge)
3244    {
3245        tordbg (t, "purging peer %s because its doPurge flag is set",
3246                tr_atomAddrStr (atom));
3247        return true;
3248    }
3249
3250    /* disconnect if we're both seeds and enough time has passed for PEX */
3251    if (tr_torrentIsSeed (tor) && peerIsSeed (peer))
3252        return !tr_torrentAllowsPex (tor) || (now-atom->time>=30);
3253
3254    /* disconnect if it's been too long since piece data has been transferred.
3255     * this is on a sliding scale based on number of available peers... */
3256    {
3257        const int relaxStrictnessIfFewerThanN = (int)((getMaxPeerCount (tor) * 0.9) + 0.5);
3258        /* if we have >= relaxIfFewerThan, strictness is 100%.
3259         * if we have zero connections, strictness is 0% */
3260        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
3261                               ? 1.0
3262                               : peerCount / (float)relaxStrictnessIfFewerThanN;
3263        const int lo = MIN_UPLOAD_IDLE_SECS;
3264        const int hi = MAX_UPLOAD_IDLE_SECS;
3265        const int limit = hi - ((hi - lo) * strictness);
3266        const int idleTime = now - MAX (atom->time, atom->piece_data_time);
3267/*fprintf (stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit);*/
3268        if (idleTime > limit) {
3269            tordbg (t, "purging peer %s because it's been %d secs since we shared anything",
3270                       tr_atomAddrStr (atom), idleTime);
3271            return true;
3272        }
3273    }
3274
3275    return false;
3276}
3277
3278static tr_peer **
3279getPeersToClose (Torrent * t, const time_t now_sec, int * setmeSize)
3280{
3281    int i, peerCount, outsize;
3282    struct tr_peer ** ret = NULL;
3283    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek (&t->peers, &peerCount);
3284
3285    assert (torrentIsLocked (t));
3286
3287    for (i = outsize = 0; i < peerCount; ++i) {
3288        if (shouldPeerBeClosed (t, peers[i], peerCount, now_sec)) {
3289            if (ret == NULL)
3290                ret = tr_new (tr_peer *, peerCount);
3291            ret[outsize++] = peers[i];
3292        }
3293    }
3294
3295    *setmeSize = outsize;
3296    return ret;
3297}
3298
3299static int
3300getReconnectIntervalSecs (const struct peer_atom * atom, const time_t now)
3301{
3302    int sec;
3303
3304    /* if we were recently connected to this peer and transferring piece
3305     * data, try to reconnect to them sooner rather that later -- we don't
3306     * want network troubles to get in the way of a good peer. */
3307    if ((now - atom->piece_data_time) <= (MINIMUM_RECONNECT_INTERVAL_SECS * 2))
3308        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
3309
3310    /* don't allow reconnects more often than our minimum */
3311    else if ((now - atom->time) < MINIMUM_RECONNECT_INTERVAL_SECS)
3312        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
3313
3314    /* otherwise, the interval depends on how many times we've tried
3315     * and failed to connect to the peer */
3316    else switch (atom->numFails) {
3317        case 0: sec = 0; break;
3318        case 1: sec = 5; break;
3319        case 2: sec = 2 * 60; break;
3320        case 3: sec = 15 * 60; break;
3321        case 4: sec = 30 * 60; break;
3322        case 5: sec = 60 * 60; break;
3323        default: sec = 120 * 60; break;
3324    }
3325
3326    /* penalize peers that were unreachable the last time we tried */
3327    if (atom->flags2 & MYFLAG_UNREACHABLE)
3328        sec += sec;
3329
3330    dbgmsg ("reconnect interval for %s is %d seconds", tr_atomAddrStr (atom), sec);
3331    return sec;
3332}
3333
3334static void
3335removePeer (Torrent * t, tr_peer * peer)
3336{
3337    tr_peer * removed;
3338    struct peer_atom * atom = peer->atom;
3339
3340    assert (torrentIsLocked (t));
3341    assert (atom);
3342
3343    atom->time = tr_time ();
3344
3345    removed = tr_ptrArrayRemoveSorted (&t->peers, peer, peerCompare);
3346
3347    if (replicationExists (t))
3348        tr_decrReplicationFromBitfield (t, &peer->have);
3349
3350    assert (removed == peer);
3351    peerDelete (t, removed);
3352}
3353
3354static void
3355closePeer (Torrent * t, tr_peer * peer)
3356{
3357    struct peer_atom * atom;
3358
3359    assert (t != NULL);
3360    assert (peer != NULL);
3361
3362    atom = peer->atom;
3363
3364    /* if we transferred piece data, then they might be good peers,
3365       so reset their `numFails' weight to zero. otherwise we connected
3366       to them fruitlessly, so mark it as another fail */
3367    if (atom->piece_data_time) {
3368        tordbg (t, "resetting atom %s numFails to 0", tr_atomAddrStr (atom));
3369        atom->numFails = 0;
3370    } else {
3371        ++atom->numFails;
3372        tordbg (t, "incremented atom %s numFails to %d", tr_atomAddrStr (atom), (int)atom->numFails);
3373    }
3374
3375    tordbg (t, "removing bad peer %s", tr_peerIoGetAddrStr (peer->io));
3376    removePeer (t, peer);
3377}
3378
3379static void
3380removeAllPeers (Torrent * t)
3381{
3382    while (!tr_ptrArrayEmpty (&t->peers))
3383        removePeer (t, tr_ptrArrayNth (&t->peers, 0));
3384}
3385
3386static void
3387closeBadPeers (Torrent * t, const time_t now_sec)
3388{
3389    if (!tr_ptrArrayEmpty (&t->peers))
3390    {
3391        int i;
3392        int peerCount;
3393        struct tr_peer ** peers = getPeersToClose (t, now_sec, &peerCount);
3394        for (i=0; i<peerCount; ++i)
3395            closePeer (t, peers[i]);
3396        tr_free (peers);
3397    }
3398}
3399
3400struct peer_liveliness
3401{
3402    tr_peer * peer;
3403    void * clientData;
3404    time_t pieceDataTime;
3405    time_t time;
3406    int speed;
3407    bool doPurge;
3408};
3409
3410static int
3411comparePeerLiveliness (const void * va, const void * vb)
3412{
3413    const struct peer_liveliness * a = va;
3414    const struct peer_liveliness * b = vb;
3415
3416    if (a->doPurge != b->doPurge)
3417        return a->doPurge ? 1 : -1;
3418
3419    if (a->speed != b->speed) /* faster goes first */
3420        return a->speed > b->speed ? -1 : 1;
3421
3422    /* the one to give us data more recently goes first */
3423    if (a->pieceDataTime != b->pieceDataTime)
3424        return a->pieceDataTime > b->pieceDataTime ? -1 : 1;
3425
3426    /* the one we connected to most recently goes first */
3427    if (a->time != b->time)
3428        return a->time > b->time ? -1 : 1;
3429
3430    return 0;
3431}
3432
3433static void
3434sortPeersByLivelinessImpl (tr_peer  ** peers,
3435                           void     ** clientData,
3436                           int         n,
3437                           uint64_t    now,
3438                           int (*compare)(const void *va, const void *vb))
3439{
3440    int i;
3441    struct peer_liveliness *lives, *l;
3442
3443    /* build a sortable array of peer + extra info */
3444    lives = l = tr_new0 (struct peer_liveliness, n);
3445    for (i=0; i<n; ++i, ++l)
3446    {
3447        tr_peer * p = peers[i];
3448        l->peer = p;
3449        l->doPurge = p->doPurge;
3450        l->pieceDataTime = p->atom->piece_data_time;
3451        l->time = p->atom->time;
3452        l->speed = tr_peerGetPieceSpeed_Bps (p, now, TR_UP)
3453                 + tr_peerGetPieceSpeed_Bps (p, now, TR_DOWN);
3454        if (clientData)
3455            l->clientData = clientData[i];
3456    }
3457
3458    /* sort 'em */
3459    assert (n == (l - lives));
3460    qsort (lives, n, sizeof (struct peer_liveliness), compare);
3461
3462    /* build the peer array */
3463    for (i=0, l=lives; i<n; ++i, ++l) {
3464        peers[i] = l->peer;
3465        if (clientData)
3466            clientData[i] = l->clientData;
3467    }
3468    assert (n == (l - lives));
3469
3470    /* cleanup */
3471    tr_free (lives);
3472}
3473
3474static void
3475sortPeersByLiveliness (tr_peer ** peers, void ** clientData, int n, uint64_t now)
3476{
3477    sortPeersByLivelinessImpl (peers, clientData, n, now, comparePeerLiveliness);
3478}
3479
3480
3481static void
3482enforceTorrentPeerLimit (Torrent * t, uint64_t now)
3483{
3484    int n = tr_ptrArraySize (&t->peers);
3485    const int max = tr_torrentGetPeerLimit (t->tor);
3486    if (n > max)
3487    {
3488        void * base = tr_ptrArrayBase (&t->peers);
3489        tr_peer ** peers = tr_memdup (base, n*sizeof (tr_peer*));
3490        sortPeersByLiveliness (peers, NULL, n, now);
3491        while (n > max)
3492            closePeer (t, peers[--n]);
3493        tr_free (peers);
3494    }
3495}
3496
3497static void
3498enforceSessionPeerLimit (tr_session * session, uint64_t now)
3499{
3500    int n = 0;
3501    tr_torrent * tor = NULL;
3502    const int max = tr_sessionGetPeerLimit (session);
3503
3504    /* count the total number of peers */
3505    while ((tor = tr_torrentNext (session, tor)))
3506        n += tr_ptrArraySize (&tor->torrentPeers->peers);
3507
3508    /* if there are too many, prune out the worst */
3509    if (n > max)
3510    {
3511        tr_peer ** peers = tr_new (tr_peer*, n);
3512        Torrent ** torrents = tr_new (Torrent*, n);
3513
3514        /* populate the peer array */
3515        n = 0;
3516        tor = NULL;
3517        while ((tor = tr_torrentNext (session, tor))) {
3518            int i;
3519            Torrent * t = tor->torrentPeers;
3520            const int tn = tr_ptrArraySize (&t->peers);
3521            for (i=0; i<tn; ++i, ++n) {
3522                peers[n] = tr_ptrArrayNth (&t->peers, i);
3523                torrents[n] = t;
3524            }
3525        }
3526
3527        /* sort 'em */
3528        sortPeersByLiveliness (peers, (void**)torrents, n, now);
3529
3530        /* cull out the crappiest */
3531        while (n-- > max)
3532            closePeer (torrents[n], peers[n]);
3533
3534        /* cleanup */
3535        tr_free (torrents);
3536        tr_free (peers);
3537    }
3538}
3539
3540static void makeNewPeerConnections (tr_peerMgr * mgr, const int max);
3541
3542static void
3543reconnectPulse (int foo UNUSED, short bar UNUSED, void * vmgr)
3544{
3545    tr_torrent * tor;
3546    tr_peerMgr * mgr = vmgr;
3547    const time_t now_sec = tr_time ();
3548    const uint64_t now_msec = tr_time_msec ();
3549
3550    /**
3551    ***  enforce the per-session and per-torrent peer limits
3552    **/
3553
3554    /* if we're over the per-torrent peer limits, cull some peers */
3555    tor = NULL;
3556    while ((tor = tr_torrentNext (mgr->session, tor)))
3557        if (tor->isRunning)
3558            enforceTorrentPeerLimit (tor->torrentPeers, now_msec);
3559
3560    /* if we're over the per-session peer limits, cull some peers */
3561    enforceSessionPeerLimit (mgr->session, now_msec);
3562
3563    /* remove crappy peers */
3564    tor = NULL;
3565    while ((tor = tr_torrentNext (mgr->session, tor)))
3566        if (!tor->torrentPeers->isRunning)
3567            removeAllPeers (tor->torrentPeers);
3568        else
3569            closeBadPeers (tor->torrentPeers, now_sec);
3570
3571    /* try to make new peer connections */
3572    makeNewPeerConnections (mgr, MAX_CONNECTIONS_PER_PULSE);
3573}
3574
3575/****
3576*****
3577*****  BANDWIDTH ALLOCATION
3578*****
3579****/
3580
3581static void
3582pumpAllPeers (tr_peerMgr * mgr)
3583{
3584    tr_torrent * tor = NULL;
3585
3586    while ((tor = tr_torrentNext (mgr->session, tor)))
3587    {
3588        int j;
3589        Torrent * t = tor->torrentPeers;
3590
3591        for (j=0; j<tr_ptrArraySize (&t->peers); ++j)
3592        {
3593            tr_peer * peer = tr_ptrArrayNth (&t->peers, j);
3594            tr_peerMsgsPulse (peer->msgs);
3595        }
3596    }
3597}
3598
3599static void
3600queuePulse (tr_session * session, tr_direction dir)
3601{
3602    assert (tr_isSession (session));
3603    assert (tr_isDirection (dir));
3604
3605    if (tr_sessionGetQueueEnabled (session, dir))
3606    {
3607        int i;
3608        const int n = tr_sessionCountQueueFreeSlots (session, dir);
3609        for (i=0; i<n; i++) {
3610            tr_torrent * tor = tr_sessionGetNextQueuedTorrent (session, dir);
3611            if (tor != NULL) {
3612                tr_torrentStartNow (tor);
3613                if (tor->queue_started_callback != NULL)
3614                  (*tor->queue_started_callback)(tor, tor->queue_started_user_data);
3615            }
3616        }
3617    }
3618}
3619
3620static void
3621bandwidthPulse (int foo UNUSED, short bar UNUSED, void * vmgr)
3622{
3623    tr_torrent * tor;
3624    tr_peerMgr * mgr = vmgr;
3625    tr_session * session = mgr->session;
3626    managerLock (mgr);
3627
3628    /* FIXME: this next line probably isn't necessary... */
3629    pumpAllPeers (mgr);
3630
3631    /* allocate bandwidth to the peers */
3632    tr_bandwidthAllocate (&session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC);
3633    tr_bandwidthAllocate (&session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC);
3634
3635    /* torrent upkeep */
3636    tor = NULL;
3637    while ((tor = tr_torrentNext (session, tor)))
3638    {
3639        /* possibly stop torrents that have seeded enough */
3640        tr_torrentCheckSeedLimit (tor);
3641
3642        /* run the completeness check for any torrents that need it */
3643        if (tor->torrentPeers->needsCompletenessCheck) {
3644            tor->torrentPeers->needsCompletenessCheck  = false;
3645            tr_torrentRecheckCompleteness (tor);
3646        }
3647
3648        /* stop torrents that are ready to stop, but couldn't be stopped
3649           earlier during the peer-io callback call chain */
3650        if (tor->isStopping)
3651            tr_torrentStop (tor);
3652    }
3653
3654    /* pump the queues */
3655    queuePulse (session, TR_UP);
3656    queuePulse (session, TR_DOWN);
3657
3658    reconnectPulse (0, 0, mgr);
3659
3660    tr_timerAddMsec (mgr->bandwidthTimer, BANDWIDTH_PERIOD_MSEC);
3661    managerUnlock (mgr);
3662}
3663
3664/***
3665****
3666***/
3667
3668static int
3669compareAtomPtrsByAddress (const void * va, const void *vb)
3670{
3671    const struct peer_atom * a = * (const struct peer_atom**) va;
3672    const struct peer_atom * b = * (const struct peer_atom**) vb;
3673
3674    assert (tr_isAtom (a));
3675    assert (tr_isAtom (b));
3676
3677    return tr_address_compare (&a->addr, &b->addr);
3678}
3679
3680/* best come first, worst go last */
3681static int
3682compareAtomPtrsByShelfDate (const void * va, const void *vb)
3683{
3684    time_t atime;
3685    time_t btime;
3686    const struct peer_atom * a = * (const struct peer_atom**) va;
3687    const struct peer_atom * b = * (const struct peer_atom**) vb;
3688    const int data_time_cutoff_secs = 60 * 60;
3689    const time_t tr_now = tr_time ();
3690
3691    assert (tr_isAtom (a));
3692    assert (tr_isAtom (b));
3693
3694    /* primary key: the last piece data time *if* it was within the last hour */
3695    atime = a->piece_data_time; if (atime + data_time_cutoff_secs < tr_now) atime = 0;
3696    btime = b->piece_data_time; if (btime + data_time_cutoff_secs < tr_now) btime = 0;
3697    if (atime != btime)
3698        return atime > btime ? -1 : 1;
3699
3700    /* secondary key: shelf date. */
3701    if (a->shelf_date != b->shelf_date)
3702        return a->shelf_date > b->shelf_date ? -1 : 1;
3703
3704    return 0;
3705}
3706
3707static int
3708getMaxAtomCount (const tr_torrent * tor)
3709{
3710    const int n = tor->maxConnectedPeers;
3711    /* approximate fit of the old jump discontinuous function */
3712    if (n >= 55) return     n + 150;
3713    if (n >= 20) return 2 * n + 95;
3714    return               4 * n + 55;
3715}
3716
3717static void
3718atomPulse (int foo UNUSED, short bar UNUSED, void * vmgr)
3719{
3720    tr_torrent * tor = NULL;
3721    tr_peerMgr * mgr = vmgr;
3722    managerLock (mgr);
3723
3724    while ((tor = tr_torrentNext (mgr->session, tor)))
3725    {
3726        int atomCount;
3727        Torrent * t = tor->torrentPeers;
3728        const int maxAtomCount = getMaxAtomCount (tor);
3729        struct peer_atom ** atoms = (struct peer_atom**) tr_ptrArrayPeek (&t->pool, &atomCount);
3730
3731        if (atomCount > maxAtomCount) /* we've got too many atoms... time to prune */
3732        {
3733            int i;
3734            int keepCount = 0;
3735            int testCount = 0;
3736            struct peer_atom ** keep = tr_new (struct peer_atom*, atomCount);
3737            struct peer_atom ** test = tr_new (struct peer_atom*, atomCount);
3738
3739            /* keep the ones that are in use */
3740            for (i=0; i<atomCount; ++i) {
3741                struct peer_atom * atom = atoms[i];
3742                if (peerIsInUse (t, atom))
3743                    keep[keepCount++] = atom;
3744                else
3745                    test[testCount++] = atom;
3746            }
3747
3748            /* if there's room, keep the best of what's left */
3749            i = 0;
3750            if (keepCount < maxAtomCount) {
3751                qsort (test, testCount, sizeof (struct peer_atom *), compareAtomPtrsByShelfDate);
3752                while (i<testCount && keepCount<maxAtomCount)
3753                    keep[keepCount++] = test[i++];
3754            }
3755
3756            /* free the culled atoms */
3757            while (i<testCount)
3758                tr_free (test[i++]);
3759
3760            /* rebuild Torrent.pool with what's left */
3761            tr_ptrArrayDestruct (&t->pool, NULL);
3762            t->pool = TR_PTR_ARRAY_INIT;
3763            qsort (keep, keepCount, sizeof (struct peer_atom *), compareAtomPtrsByAddress);
3764            for (i=0; i<keepCount; ++i)
3765                tr_ptrArrayAppend (&t->pool, keep[i]);
3766
3767            tordbg (t, "max atom count is %d... pruned from %d to %d\n", maxAtomCount, atomCount, keepCount);
3768
3769            /* cleanup */
3770            tr_free (test);
3771            tr_free (keep);
3772        }
3773    }
3774
3775    tr_timerAddMsec (mgr->atomTimer, ATOM_PERIOD_MSEC);
3776    managerUnlock (mgr);
3777}
3778
3779/***
3780****
3781****
3782****
3783***/
3784
3785/* is this atom someone that we'd want to initiate a connection to? */
3786static bool
3787isPeerCandidate (const tr_torrent * tor, struct peer_atom * atom, const time_t now)
3788{
3789    /* not if we're both seeds */
3790    if (tr_torrentIsSeed (tor) && atomIsSeed (atom))
3791        return false;
3792
3793    /* not if we've already got a connection to them... */
3794    if (peerIsInUse (tor->torrentPeers, atom))
3795        return false;
3796
3797    /* not if we just tried them already */
3798    if ((now - atom->time) < getReconnectIntervalSecs (atom, now))
3799        return false;
3800
3801    /* not if they're blocklisted */
3802    if (isAtomBlocklisted (tor->session, atom))
3803        return false;
3804
3805    /* not if they're banned... */
3806    if (atom->flags2 & MYFLAG_BANNED)
3807        return false;
3808
3809    return true;
3810}
3811
3812struct peer_candidate
3813{
3814    uint64_t score;
3815    tr_torrent * tor;
3816    struct peer_atom * atom;
3817};
3818
3819static bool
3820torrentWasRecentlyStarted (const tr_torrent * tor)
3821{
3822    return difftime (tr_time (), tor->startDate) < 120;
3823}
3824
3825static inline uint64_t
3826addValToKey (uint64_t value, int width, uint64_t addme)
3827{
3828    value = (value << (uint64_t)width);
3829    value |= addme;
3830    return value;
3831}
3832
3833/* smaller value is better */
3834static uint64_t
3835getPeerCandidateScore (const tr_torrent * tor, const struct peer_atom * atom, uint8_t salt)
3836{
3837    uint64_t i;
3838    uint64_t score = 0;
3839    const bool failed = atom->lastConnectionAt < atom->lastConnectionAttemptAt;
3840
3841    /* prefer peers we've connected to, or never tried, over peers we failed to connect to. */
3842    i = failed ? 1 : 0;
3843    score = addValToKey (score, 1, i);
3844
3845    /* prefer the one we attempted least recently (to cycle through all peers) */
3846    i = atom->lastConnectionAttemptAt;
3847    score = addValToKey (score, 32, i);
3848
3849    /* prefer peers belonging to a torrent of a higher priority */
3850    switch (tr_torrentGetPriority (tor)) {
3851        case TR_PRI_HIGH:    i = 0; break;
3852        case TR_PRI_NORMAL:  i = 1; break;
3853        case TR_PRI_LOW:     i = 2; break;
3854    }
3855    score = addValToKey (score, 4, i);
3856
3857    /* prefer recently-started torrents */
3858    i = torrentWasRecentlyStarted (tor) ? 0 : 1;
3859    score = addValToKey (score, 1, i);
3860
3861    /* prefer torrents we're downloading with */
3862    i = tr_torrentIsSeed (tor) ? 1 : 0;
3863    score = addValToKey (score, 1, i);
3864
3865    /* prefer peers that are known to be connectible */
3866    i = (atom->flags & ADDED_F_CONNECTABLE) ? 0 : 1;
3867    score = addValToKey (score, 1, i);
3868
3869    /* prefer peers that we might have a chance of uploading to...
3870       so lower seed probability is better */
3871    if (atom->seedProbability == 100) i = 101;
3872    else if (atom->seedProbability == -1) i = 100;
3873    else i = atom->seedProbability;
3874    score = addValToKey (score, 8, i);
3875
3876    /* Prefer peers that we got from more trusted sources.
3877     * lower `fromBest' values indicate more trusted sources */
3878    score = addValToKey (score, 4, atom->fromBest);
3879
3880    /* salt */
3881    score = addValToKey (score, 8, salt);
3882
3883    return score;
3884}
3885
3886#ifndef NDEBUG
3887static int
3888checkPartition (const struct peer_candidate * candidates, int left, int right, uint64_t pivotScore, int storeIndex)
3889{
3890    int i;
3891
3892    assert (storeIndex >= left);
3893    assert (storeIndex <= right);
3894    assert (candidates[storeIndex].score == pivotScore);
3895
3896    for (i=left; i<storeIndex; ++i)
3897        assert (candidates[i].score < pivotScore);
3898    for (i=storeIndex+1; i<=right; ++i)
3899        assert (candidates[i].score >= pivotScore);
3900
3901    return true;
3902}
3903#endif
3904
3905/* Helper to selectBestCandidates ().
3906 * Adapted from http://en.wikipedia.org/wiki/Selection_algorithm */
3907static int
3908partitionPeerCandidates (struct peer_candidate * candidates, int left, int right, int pivotIndex)
3909{
3910    int i;
3911    int storeIndex;
3912    struct peer_candidate tmp;
3913    const struct peer_candidate pivotValue = candidates[pivotIndex];
3914
3915    /* move pivot to end */
3916    tmp = candidates[right];
3917    candidates[right] = pivotValue;
3918    candidates[pivotIndex] = tmp;
3919
3920    storeIndex = left;
3921    for (i=left; i<=right; ++i)
3922    {
3923        if (candidates[i].score < pivotValue.score)
3924        {
3925            tmp = candidates[storeIndex];
3926            candidates[storeIndex] = candidates[i];
3927            candidates[i] = tmp;
3928            storeIndex++;
3929        }
3930    }
3931
3932    /* move pivot to its final place */
3933    tmp = candidates[right];
3934    candidates[right] = candidates[storeIndex];
3935    candidates[storeIndex] = tmp;
3936
3937    /* sanity check */
3938    assert (checkPartition (candidates, left, right, pivotValue.score, storeIndex));
3939
3940    return storeIndex;
3941}
3942
3943/* Adapted from http://en.wikipedia.org/wiki/Selection_algorithm */
3944static void
3945selectPeerCandidates (struct peer_candidate * candidates, int left, int right, int k)
3946{
3947    if (right > left)
3948    {
3949        const int pivotIndex = left + (right-left)/2;
3950
3951        int pivotNewIndex = partitionPeerCandidates (candidates, left, right, pivotIndex);
3952
3953        if (pivotNewIndex > left + k) /* new condition */
3954            selectPeerCandidates (candidates, left, pivotNewIndex-1, k);
3955        else if (pivotNewIndex < left + k)
3956            selectPeerCandidates (candidates, pivotNewIndex+1, right, k+left-pivotNewIndex-1);
3957    }
3958}
3959
3960#ifndef NDEBUG
3961static bool
3962checkBestScoresComeFirst (const struct peer_candidate * candidates, int n, int k)
3963{
3964    int i;
3965    uint64_t worstFirstScore = 0;
3966    const int x = MIN (n, k) - 1;
3967
3968    for (i=0; i<x; i++)
3969        if (worstFirstScore < candidates[i].score)
3970            worstFirstScore = candidates[i].score;
3971
3972    for (i=0; i<x; i++)
3973        assert (candidates[i].score <= worstFirstScore);
3974
3975    for (i=x+1; i<n; i++)
3976        assert (candidates[i].score >= worstFirstScore);
3977
3978    return true;
3979}
3980#endif /* NDEBUG */
3981
3982/** @return an array of all the atoms we might want to connect to */
3983static struct peer_candidate*
3984getPeerCandidates (tr_session * session, int * candidateCount, int max)
3985{
3986    int atomCount;
3987    int peerCount;
3988    tr_torrent * tor;
3989    struct peer_candidate * candidates;
3990    struct peer_candidate * walk;
3991    const time_t now = tr_time ();
3992    const uint64_t now_msec = tr_time_msec ();
3993    /* leave 5% of connection slots for incoming connections -- ticket #2609 */
3994    const int maxCandidates = tr_sessionGetPeerLimit (session) * 0.95;
3995
3996    /* count how many peers and atoms we've got */
3997    tor= NULL;
3998    atomCount = 0;
3999    peerCount = 0;
4000    while ((tor = tr_torrentNext (session, tor))) {
4001        atomCount += tr_ptrArraySize (&tor->torrentPeers->pool);
4002        peerCount += tr_ptrArraySize (&tor->torrentPeers->peers);
4003    }
4004
4005    /* don't start any new handshakes if we're full up */
4006    if (maxCandidates <= peerCount) {
4007        *candidateCount = 0;
4008        return NULL;
4009    }
4010
4011    /* allocate an array of candidates */
4012    walk = candidates = tr_new (struct peer_candidate, atomCount);
4013
4014    /* populate the candidate array */
4015    tor = NULL;
4016    while ((tor = tr_torrentNext (session, tor)))
4017    {
4018        int i, nAtoms;
4019        struct peer_atom ** atoms;
4020
4021        if (!tor->torrentPeers->isRunning)
4022            continue;
4023
4024        /* if we've already got enough peers in this torrent... */
4025        if (tr_torrentGetPeerLimit (tor) <= tr_ptrArraySize (&tor->torrentPeers->peers))
4026            continue;
4027
4028        /* if we've already got enough speed in this torrent... */
4029        if (tr_torrentIsSeed (tor) && isBandwidthMaxedOut (&tor->bandwidth, now_msec, TR_UP))
4030            continue;
4031
4032        atoms = (struct peer_atom**) tr_ptrArrayPeek (&tor->torrentPeers->pool, &nAtoms);
4033        for (i=0; i<nAtoms; ++i)
4034        {
4035            struct peer_atom * atom = atoms[i];
4036
4037            if (isPeerCandidate (tor, atom, now))
4038            {
4039                const uint8_t salt = tr_cryptoWeakRandInt (1024);
4040                walk->tor = tor;
4041                walk->atom = atom;
4042                walk->score = getPeerCandidateScore (tor, atom, salt);
4043                ++walk;
4044            }
4045        }
4046    }
4047
4048    *candidateCount = walk - candidates;
4049    if (walk != candidates)
4050        selectPeerCandidates (candidates, 0, (walk-candidates)-1, max);
4051
4052    assert (checkBestScoresComeFirst (candidates, *candidateCount, max));
4053
4054    return candidates;
4055}
4056
4057static void
4058initiateConnection (tr_peerMgr * mgr, Torrent * t, struct peer_atom * atom)
4059{
4060    tr_peerIo * io;
4061    const time_t now = tr_time ();
4062    bool utp = tr_sessionIsUTPEnabled (mgr->session) && !atom->utp_failed;
4063
4064    if (atom->fromFirst == TR_PEER_FROM_PEX)
4065        /* PEX has explicit signalling for uTP support.  If an atom
4066           originally came from PEX and doesn't have the uTP flag, skip the
4067           uTP connection attempt.  Are we being optimistic here? */
4068        utp = utp && (atom->flags & ADDED_F_UTP_FLAGS);
4069
4070    tordbg (t, "Starting an OUTGOING%s connection with %s",
4071            utp ? " µTP" : "",
4072            tr_atomAddrStr (atom));
4073
4074    io = tr_peerIoNewOutgoing (mgr->session,
4075                               &mgr->session->bandwidth,
4076                               &atom->addr,
4077                               atom->port,
4078                               t->tor->info.hash,
4079                               t->tor->completeness == TR_SEED,
4080                               utp);
4081
4082    if (io == NULL)
4083    {
4084        tordbg (t, "peerIo not created; marking peer %s as unreachable",
4085                tr_atomAddrStr (atom));
4086        atom->flags2 |= MYFLAG_UNREACHABLE;
4087        atom->numFails++;
4088    }
4089    else
4090    {
4091        tr_handshake * handshake = tr_handshakeNew (io,
4092                                                    mgr->session->encryptionMode,
4093                                                    myHandshakeDoneCB,
4094                                                    mgr);
4095
4096        assert (tr_peerIoGetTorrentHash (io));
4097
4098        tr_peerIoUnref (io); /* balanced by the initial ref
4099                                 in tr_peerIoNewOutgoing () */
4100
4101        tr_ptrArrayInsertSorted (&t->outgoingHandshakes, handshake,
4102                                 handshakeCompare);
4103    }
4104
4105    atom->lastConnectionAttemptAt = now;
4106    atom->time = now;
4107}
4108
4109static void
4110initiateCandidateConnection (tr_peerMgr * mgr, struct peer_candidate * c)
4111{
4112#if 0
4113    fprintf (stderr, "Starting an OUTGOING connection with %s - [%s] seedProbability==%d; %s, %s\n",
4114             tr_atomAddrStr (c->atom),
4115             tr_torrentName (c->tor),
4116           (int)c->atom->seedProbability,
4117             tr_torrentIsPrivate (c->tor) ? "private" : "public",
4118             tr_torrentIsSeed (c->tor) ? "seed" : "downloader");
4119#endif
4120
4121    initiateConnection (mgr, c->tor->torrentPeers, c->atom);
4122}
4123
4124static void
4125makeNewPeerConnections (struct tr_peerMgr * mgr, const int max)
4126{
4127    int i, n;
4128    struct peer_candidate * candidates;
4129
4130    candidates = getPeerCandidates (mgr->session, &n, max);
4131
4132    for (i=0; i<n && i<max; ++i)
4133        initiateCandidateConnection (mgr, &candidates[i]);
4134
4135    tr_free (candidates);
4136}
Note: See TracBrowser for help on using the repository browser.