source: trunk/libtransmission/peer-mgr.c @ 13625

Last change on this file since 13625 was 13625, checked in by jordan, 9 years ago

Follow more common whitespace style conventions in the C code (libtransmission, daemon, utils, cli, gtk).

  • Property svn:keywords set to Date Rev Author Id
File size: 117.5 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2 (b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 13625 2012-12-05 17:29:46Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h> /* error codes ERANGE, ... */
15#include <limits.h> /* INT_MAX */
16#include <string.h> /* memcpy, memcmp, strstr */
17#include <stdlib.h> /* qsort */
18
19#include <event2/event.h>
20
21#include <libutp/utp.h>
22
23#include "transmission.h"
24#include "announcer.h"
25#include "bandwidth.h"
26#include "blocklist.h"
27#include "cache.h"
28#include "clients.h"
29#include "completion.h"
30#include "crypto.h"
31#include "handshake.h"
32#include "net.h"
33#include "peer-io.h"
34#include "peer-mgr.h"
35#include "peer-msgs.h"
36#include "ptrarray.h"
37#include "session.h"
38#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
39#include "torrent.h"
40#include "tr-utp.h"
41#include "utils.h"
42#include "webseed.h"
43
/**
 * Tuning constants for the peer manager: timer periods, connection
 * throttles, ban thresholds and the bit values stored in peer_atom.flags2.
 */
enum
{
    /* how frequently to cull old atoms */
    ATOM_PERIOD_MSEC = (60 * 1000),

    /* how frequently to change which peers are choked */
    RECHOKE_PERIOD_MSEC = (10 * 1000),

    /* an optimistically unchoked peer is immune from rechoking
       for this many calls to rechokeUploads (). */
    OPTIMISTIC_UNCHOKE_MULTIPLIER = 4,

    /* how frequently to reallocate bandwidth */
    BANDWIDTH_PERIOD_MSEC = 500,

    /* how frequently to age out old piece request lists */
    REFILL_UPKEEP_PERIOD_MSEC = (10 * 1000),

    /* how frequently to decide which peers live and die */
    RECONNECT_PERIOD_MSEC = 500,

    /* when many peers are available, keep idle ones this long */
    MIN_UPLOAD_IDLE_SECS = (60),

    /* when few peers are available, keep idle ones this long */
    MAX_UPLOAD_IDLE_SECS = (60 * 5),

    /* max number of peers to ask for per second overall.
     * this throttle is to avoid overloading the router */
    MAX_CONNECTIONS_PER_SECOND = 12,

    /* the per-second throttle scaled down to a single reconnect pulse.
     * Computed with integer arithmetic only: an expression containing a
     * floating-point operation is not an integer constant expression in
     * ISO C and is accepted for enumerators only as a compiler extension. */
    MAX_CONNECTIONS_PER_PULSE = ((MAX_CONNECTIONS_PER_SECOND * RECONNECT_PERIOD_MSEC) / 1000),

    /* number of bad pieces a peer is allowed to send before we ban them */
    MAX_BAD_PIECES_PER_PEER = 5,

    /* amount of time to keep a list of request pieces lying around
       before it's considered too old and needs to be rebuilt */
    PIECE_LIST_SHELF_LIFE_SECS = 60,

    /* use for bitwise operations w/peer_atom.flags2 */
    MYFLAG_BANNED = 1,

    /* use for bitwise operations w/peer_atom.flags2 */
    /* unreachable for now... but not banned.
     * if they try to connect to us it's okay */
    MYFLAG_UNREACHABLE = 2,

    /* the minimum we'll wait before attempting to reconnect to a peer */
    MINIMUM_RECONNECT_INTERVAL_SECS = 5,

    /** how long we'll let requests we've made linger before we cancel them */
    REQUEST_TTL_SECS = 120,

    /* NOTE(review): the users of the two CANCEL_HISTORY constants are not
     * visible in this part of the file; presumably both are in seconds */
    NO_BLOCKS_CANCEL_HISTORY = 120,

    CANCEL_HISTORY_SEC = 60
};
102
/* A tr_peer_event whose members are all 0 / NULL / false.
 * Presumably meant to be copied as the starting value of a new event
 * before its individual fields are filled in -- confirm against callers. */
const tr_peer_event TR_PEER_EVENT_INIT = { 0, 0, NULL, 0, 0, 0, false, 0 };
104
105/**
106***
107**/
108
109/**
110 * Peer information that should be kept even before we've connected and
111 * after we've disconnected. These are kept in a pool of peer_atoms to decide
112 * which ones would make good candidates for connecting to, and to watch out
113 * for banned peers.
114 *
115 * @see tr_peer
116 * @see tr_peermsgs
117 */
struct peer_atom
{
    uint8_t     fromFirst;          /* where the peer was first found */
    uint8_t     fromBest;           /* the "best" value of where the peer has been found */
    uint8_t     flags;              /* these match the added_f flags */
    uint8_t     flags2;             /* flags that aren't defined in added_f (MYFLAG_* bits) */
    int8_t      seedProbability;    /* how likely is this to be a seed... [0..100] or -1 for unknown */
    int8_t      blocklisted;        /* -1 for unknown, true for blocklisted, false for not blocklisted */

    tr_port     port;               /* the port that goes with addr */
    bool        utp_failed;         /* We recently failed to connect over uTP */
    uint16_t    numFails;           /* NOTE(review): presumably a count of failed connection attempts -- confirm */
    time_t      time;               /* when the peer's connection status last changed */
    time_t      piece_data_time;    /* NOTE(review): presumably when piece data was last exchanged -- confirm */

    time_t      lastConnectionAttemptAt;
    time_t      lastConnectionAt;

    /* similar to a TTL field, but less rigid --
     * if the swarm is small, the atom will be kept past this date. */
    time_t      shelf_date;
    tr_peer   * peer;               /* will be NULL if not connected */
    tr_address  addr;               /* the peer's address; a Torrent's atom pool is searched by this key */
};
142
143#ifdef NDEBUG
144#define tr_isAtom(a) (TRUE)
145#else
146static bool
147tr_isAtom (const struct peer_atom * atom)
148{
149    return (atom != NULL)
150        && (atom->fromFirst < TR_PEER_FROM__MAX)
151        && (atom->fromBest < TR_PEER_FROM__MAX)
152        && (tr_address_is_valid (&atom->addr));
153}
154#endif
155
156static const char*
157tr_atomAddrStr (const struct peer_atom * atom)
158{
159    return atom ? tr_peerIoAddrStr (&atom->addr, atom->port) : "[no atom]";
160}
161
/* One outstanding request for a block. Torrent::requests holds an array
 * of these, kept sorted by (block, peer) -- see compareReqByBlock (). */
struct block_request
{
    tr_block_index_t block; /* index of the requested block */
    tr_peer * peer;         /* the peer the request is associated with */
    time_t sentAt;          /* tr_time () value recorded when the request was added */
};
168
/* One entry of Torrent::pieces, the list of pieces we still want. */
struct weighted_piece
{
    tr_piece_index_t index; /* which piece this entry describes */
    int16_t salt;           /* random tie-breaker used when sorting by weight */
    int16_t requestCount;   /* pending-request count, used in the weight calculation */
};
175
/* Records the order Torrent::pieces is currently sorted in, if any. */
enum piece_sort_state
{
    PIECES_UNSORTED,         /* ordering has been invalidated */
    PIECES_SORTED_BY_INDEX,  /* ascending piece index (comparePieceByIndex) */
    PIECES_SORTED_BY_WEIGHT  /* request order (comparePieceByWeight) */
};
182
/** @brief Opaque, per-torrent data structure for peer connection information */
typedef struct tr_torrent_peers
{
    tr_ptrArray                outgoingHandshakes; /* tr_handshake */
    tr_ptrArray                pool; /* struct peer_atom */
    tr_ptrArray                peers; /* tr_peer */
    tr_ptrArray                webseeds; /* tr_webseed */

    tr_torrent               * tor;     /* the torrent this peer information belongs to */
    struct tr_peerMgr        * manager; /* the peer manager; its session provides the lock */

    tr_peer                  * optimistic; /* the optimistic peer, or NULL if none */
    int                        optimisticUnchokeTimeScaler;

    bool                       isRunning;
    bool                       needsCompletenessCheck;

    /* array of outstanding block requests, sorted by (block, peer) */
    struct block_request     * requests;
    int                        requestCount; /* number of entries in use */
    int                        requestAlloc; /* number of entries allocated */

    /* the pieces we still want, and how that list is currently ordered */
    struct weighted_piece    * pieces;
    int                        pieceCount;
    enum piece_sort_state      pieceSortState;

    /* An array of pieceCount items stating how many peers have each piece.
       This is used to help us for downloading pieces "rarest first."
       This may be NULL if we don't have metainfo yet, or if we're not
       downloading and don't care about rarity */
    uint16_t                 * pieceReplication;
    size_t                     pieceReplicationSize;

    int                        interestedCount;
    int                        maxPeers;
    time_t                     lastCancel;

    /* Before the endgame this should be 0. In endgame, it contains the average
     * number of pending requests per peer. Only peers which have more pending
     * requests are considered 'fast' and are allowed to request a block that's
     * already been requested from another (slower?) peer. */
    int                        endgame;
}
Torrent;
226
/* Session-level peer manager state shared by all torrents. */
struct tr_peerMgr
{
    tr_session    * session;            /* owning session; also supplies the lock */
    tr_ptrArray     incomingHandshakes; /* tr_handshake */
    struct event  * bandwidthTimer;     /* the four timers are released by deleteTimers () */
    struct event  * rechokeTimer;
    struct event  * refillUpkeepTimer;
    struct event  * atomTimer;
};
236
/**
 * Deep-log a printf-style message tagged with the name of the torrent
 * that owns @a t (a Torrent pointer). Nothing is formatted unless
 * tr_deepLoggingIsActive () reports that deep logging is on.
 *
 * The argument is parenthesized so that any pointer-valued expression,
 * not just a plain identifier, can be passed safely.
 */
#define tordbg(t, ...) \
  do \
    { \
      if (tr_deepLoggingIsActive ()) \
        tr_deepLog (__FILE__, __LINE__, tr_torrentName ((t)->tor), __VA_ARGS__); \
    } \
  while (0)
244
/* Deep-log a printf-style message that is not tied to any torrent (the
 * name argument passed to tr_deepLog () is NULL). Nothing is formatted
 * unless tr_deepLoggingIsActive () reports that deep logging is on. */
#define dbgmsg(...) \
  do \
    { \
      if (tr_deepLoggingIsActive ()) \
        tr_deepLog (__FILE__, __LINE__, NULL, __VA_ARGS__); \
    } \
  while (0)
252
253/**
254***
255**/
256
257static inline void
258managerLock (const struct tr_peerMgr * manager)
259{
260    tr_sessionLock (manager->session);
261}
262
263static inline void
264managerUnlock (const struct tr_peerMgr * manager)
265{
266    tr_sessionUnlock (manager->session);
267}
268
269static inline void
270torrentLock (Torrent * torrent)
271{
272    managerLock (torrent->manager);
273}
274
275static inline void
276torrentUnlock (Torrent * torrent)
277{
278    managerUnlock (torrent->manager);
279}
280
281static inline int
282torrentIsLocked (const Torrent * t)
283{
284    return tr_sessionIsLocked (t->manager->session);
285}
286
287/**
288***
289**/
290
291static int
292handshakeCompareToAddr (const void * va, const void * vb)
293{
294    const tr_handshake * a = va;
295
296    return tr_address_compare (tr_handshakeGetAddr (a, NULL), vb);
297}
298
/* Ordering callback: compares two handshakes by their addresses. */
static int
handshakeCompare (const void * a, const void * b)
{
    const void * b_addr = tr_handshakeGetAddr (b, NULL);

    return handshakeCompareToAddr (a, b_addr);
}
304
305static inline tr_handshake*
306getExistingHandshake (tr_ptrArray * handshakes, const tr_address * addr)
307{
308    if (tr_ptrArrayEmpty (handshakes))
309        return NULL;
310
311    return tr_ptrArrayFindSorted (handshakes, addr, handshakeCompareToAddr);
312}
313
314static int
315comparePeerAtomToAddress (const void * va, const void * vb)
316{
317    const struct peer_atom * a = va;
318
319    return tr_address_compare (&a->addr, vb);
320}
321
322static int
323compareAtomsByAddress (const void * va, const void * vb)
324{
325    const struct peer_atom * b = vb;
326
327    assert (tr_isAtom (b));
328
329    return comparePeerAtomToAddress (va, &b->addr);
330}
331
332/**
333***
334**/
335
336const tr_address *
337tr_peerAddress (const tr_peer * peer)
338{
339    return &peer->atom->addr;
340}
341
342static Torrent*
343getExistingTorrent (tr_peerMgr *    manager,
344                    const uint8_t * hash)
345{
346    tr_torrent * tor = tr_torrentFindFromHash (manager->session, hash);
347
348    return tor == NULL ? NULL : tor->torrentPeers;
349}
350
351static int
352peerCompare (const void * a, const void * b)
353{
354    return tr_address_compare (tr_peerAddress (a), tr_peerAddress (b));
355}
356
357static struct peer_atom*
358getExistingAtom (const Torrent    * t,
359                 const tr_address * addr)
360{
361    Torrent * tt = (Torrent*)t;
362    assert (torrentIsLocked (t));
363    return tr_ptrArrayFindSorted (&tt->pool, addr, comparePeerAtomToAddress);
364}
365
366static bool
367peerIsInUse (const Torrent * ct, const struct peer_atom * atom)
368{
369    Torrent * t = (Torrent*) ct;
370
371    assert (torrentIsLocked (t));
372
373    return (atom->peer != NULL)
374        || getExistingHandshake (&t->outgoingHandshakes, &atom->addr)
375        || getExistingHandshake (&t->manager->incomingHandshakes, &atom->addr);
376}
377
378void
379tr_peerConstruct (tr_peer * peer)
380{
381    memset (peer, 0, sizeof (tr_peer));
382
383    peer->have = TR_BITFIELD_INIT;
384}
385
386static tr_peer*
387peerNew (struct peer_atom * atom)
388{
389    tr_peer * peer = tr_new (tr_peer, 1);
390    tr_peerConstruct (peer);
391
392    peer->atom = atom;
393    atom->peer = peer;
394
395    return peer;
396}
397
398static tr_peer*
399getPeer (Torrent * torrent, struct peer_atom * atom)
400{
401    tr_peer * peer;
402
403    assert (torrentIsLocked (torrent));
404
405    peer = atom->peer;
406
407    if (peer == NULL)
408    {
409        peer = peerNew (atom);
410        tr_bitfieldConstruct (&peer->have, torrent->tor->info.pieceCount);
411        tr_bitfieldConstruct (&peer->blame, torrent->tor->blockCount);
412        tr_ptrArrayInsertSorted (&torrent->peers, peer, peerCompare);
413    }
414
415    return peer;
416}
417
418static void peerDeclinedAllRequests (Torrent *, const tr_peer *);
419
420void
421tr_peerDestruct (tr_torrent * tor, tr_peer * peer)
422{
423    assert (peer != NULL);
424
425    peerDeclinedAllRequests (tor->torrentPeers, peer);
426
427    if (peer->msgs != NULL)
428        tr_peerMsgsFree (peer->msgs);
429
430    if (peer->io) {
431        tr_peerIoClear (peer->io);
432        tr_peerIoUnref (peer->io); /* balanced by the ref in handshakeDoneCB () */
433    }
434
435    tr_bitfieldDestruct (&peer->have);
436    tr_bitfieldDestruct (&peer->blame);
437    tr_free (peer->client);
438
439    if (peer->atom)
440        peer->atom->peer = NULL;
441}
442
443static void
444peerDelete (Torrent * t, tr_peer * peer)
445{
446    tr_peerDestruct (t->tor, peer);
447    tr_free (peer);
448}
449
450static bool
451replicationExists (const Torrent * t)
452{
453    return t->pieceReplication != NULL;
454}
455
456static void
457replicationFree (Torrent * t)
458{
459    tr_free (t->pieceReplication);
460    t->pieceReplication = NULL;
461    t->pieceReplicationSize = 0;
462}
463
464static void
465replicationNew (Torrent * t)
466{
467    tr_piece_index_t piece_i;
468    const tr_piece_index_t piece_count = t->tor->info.pieceCount;
469    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase (&t->peers);
470    const int peer_count = tr_ptrArraySize (&t->peers);
471
472    assert (!replicationExists (t));
473
474    t->pieceReplicationSize = piece_count;
475    t->pieceReplication = tr_new0 (uint16_t, piece_count);
476
477    for (piece_i=0; piece_i<piece_count; ++piece_i)
478    {
479        int peer_i;
480        uint16_t r = 0;
481
482        for (peer_i=0; peer_i<peer_count; ++peer_i)
483            if (tr_bitfieldHas (&peers[peer_i]->have, piece_i))
484                ++r;
485
486        t->pieceReplication[piece_i] = r;
487    }
488}
489
490static void
491torrentFree (void * vt)
492{
493    Torrent * t = vt;
494
495    assert (t);
496    assert (!t->isRunning);
497    assert (torrentIsLocked (t));
498    assert (tr_ptrArrayEmpty (&t->outgoingHandshakes));
499    assert (tr_ptrArrayEmpty (&t->peers));
500
501    tr_ptrArrayDestruct (&t->webseeds, (PtrArrayForeachFunc)tr_webseedFree);
502    tr_ptrArrayDestruct (&t->pool, (PtrArrayForeachFunc)tr_free);
503    tr_ptrArrayDestruct (&t->outgoingHandshakes, NULL);
504    tr_ptrArrayDestruct (&t->peers, NULL);
505
506    replicationFree (t);
507
508    tr_free (t->requests);
509    tr_free (t->pieces);
510    tr_free (t);
511}
512
513static void peerCallbackFunc (tr_peer *, const tr_peer_event *, void *);
514
515static void
516rebuildWebseedArray (Torrent * t, tr_torrent * tor)
517{
518    int i;
519    const tr_info * inf = &tor->info;
520
521    /* clear the array */
522    tr_ptrArrayDestruct (&t->webseeds, (PtrArrayForeachFunc)tr_webseedFree);
523    t->webseeds = TR_PTR_ARRAY_INIT;
524
525    /* repopulate it */
526    for (i = 0; i < inf->webseedCount; ++i)
527    {
528        tr_webseed * w = tr_webseedNew (tor, inf->webseeds[i], peerCallbackFunc, t);
529        tr_ptrArrayAppend (&t->webseeds, w);
530    }
531}
532
533static Torrent*
534torrentNew (tr_peerMgr * manager, tr_torrent * tor)
535{
536    Torrent * t;
537
538    t = tr_new0 (Torrent, 1);
539    t->manager = manager;
540    t->tor = tor;
541    t->pool = TR_PTR_ARRAY_INIT;
542    t->peers = TR_PTR_ARRAY_INIT;
543    t->webseeds = TR_PTR_ARRAY_INIT;
544    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
545
546    rebuildWebseedArray (t, tor);
547
548    return t;
549}
550
551static void ensureMgrTimersExist (struct tr_peerMgr * m);
552
553tr_peerMgr*
554tr_peerMgrNew (tr_session * session)
555{
556    tr_peerMgr * m = tr_new0 (tr_peerMgr, 1);
557    m->session = session;
558    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
559    ensureMgrTimersExist (m);
560    return m;
561}
562
/* Frees the event that *t points to, if any, and resets *t to NULL. */
static void
deleteTimer (struct event ** t)
{
    if (*t == NULL)
        return;

    event_free (*t);
    *t = NULL;
}
572
573static void
574deleteTimers (struct tr_peerMgr * m)
575{
576    deleteTimer (&m->atomTimer);
577    deleteTimer (&m->bandwidthTimer);
578    deleteTimer (&m->rechokeTimer);
579    deleteTimer (&m->refillUpkeepTimer);
580}
581
582void
583tr_peerMgrFree (tr_peerMgr * manager)
584{
585    managerLock (manager);
586
587    deleteTimers (manager);
588
589    /* free the handshakes. Abort invokes handshakeDoneCB (), which removes
590     * the item from manager->handshakes, so this is a little roundabout... */
591    while (!tr_ptrArrayEmpty (&manager->incomingHandshakes))
592        tr_handshakeAbort (tr_ptrArrayNth (&manager->incomingHandshakes, 0));
593
594    tr_ptrArrayDestruct (&manager->incomingHandshakes, NULL);
595
596    managerUnlock (manager);
597    tr_free (manager);
598}
599
600static int
601clientIsDownloadingFrom (const tr_torrent * tor, const tr_peer * peer)
602{
603    if (!tr_torrentHasMetadata (tor))
604        return true;
605
606    return peer->clientIsInterested && !peer->clientIsChoked;
607}
608
609static int
610clientIsUploadingTo (const tr_peer * peer)
611{
612    return peer->peerIsInterested && !peer->peerIsChoked;
613}
614
615/***
616****
617***/
618
619void
620tr_peerMgrOnBlocklistChanged (tr_peerMgr * mgr)
621{
622    tr_torrent * tor = NULL;
623    tr_session * session = mgr->session;
624
625    /* we cache whether or not a peer is blocklisted...
626       since the blocklist has changed, erase that cached value */
627    while ((tor = tr_torrentNext (session, tor)))
628    {
629        int i;
630        Torrent * t = tor->torrentPeers;
631        const int n = tr_ptrArraySize (&t->pool);
632        for (i=0; i<n; ++i) {
633            struct peer_atom * atom = tr_ptrArrayNth (&t->pool, i);
634            atom->blocklisted = -1;
635        }
636    }
637}
638
639static bool
640isAtomBlocklisted (tr_session * session, struct peer_atom * atom)
641{
642    if (atom->blocklisted < 0)
643        atom->blocklisted = tr_sessionIsAddressBlocked (session, &atom->addr);
644
645    assert (tr_isBool (atom->blocklisted));
646    return atom->blocklisted;
647}
648
649
650/***
651****
652***/
653
654static void
655atomSetSeedProbability (struct peer_atom * atom, int seedProbability)
656{
657    assert (atom != NULL);
658    assert (-1<=seedProbability && seedProbability<=100);
659
660    atom->seedProbability = seedProbability;
661
662    if (seedProbability == 100)
663        atom->flags |= ADDED_F_SEED_FLAG;
664    else if (seedProbability != -1)
665        atom->flags &= ~ADDED_F_SEED_FLAG;
666}
667
668static inline bool
669atomIsSeed (const struct peer_atom * atom)
670{
671    return atom->seedProbability == 100;
672}
673
674static void
675atomSetSeed (const Torrent * t, struct peer_atom * atom)
676{
677    if (!atomIsSeed (atom))
678    {
679        tordbg (t, "marking peer %s as a seed", tr_atomAddrStr (atom));
680
681        atomSetSeedProbability (atom, 100);
682    }
683}
684
685
686bool
687tr_peerMgrPeerIsSeed (const tr_torrent  * tor,
688                      const tr_address  * addr)
689{
690    bool isSeed = false;
691    const Torrent * t = tor->torrentPeers;
692    const struct peer_atom * atom = getExistingAtom (t, addr);
693
694    if (atom)
695        isSeed = atomIsSeed (atom);
696
697    return isSeed;
698}
699
700void
701tr_peerMgrSetUtpSupported (tr_torrent * tor, const tr_address * addr)
702{
703    struct peer_atom * atom = getExistingAtom (tor->torrentPeers, addr);
704
705    if (atom)
706        atom->flags |= ADDED_F_UTP_FLAGS;
707}
708
709void
710tr_peerMgrSetUtpFailed (tr_torrent *tor, const tr_address *addr, bool failed)
711{
712    struct peer_atom * atom = getExistingAtom (tor->torrentPeers, addr);
713
714    if (atom)
715        atom->utp_failed = failed;
716}
717
718
/**
***  REQUESTS
***
*** There are two data structures associated with managing block requests:
***
*** 1. Torrent::requests, an array of "struct block_request" which keeps
***    track of which blocks have been requested, and when, and by which peers.
***    This list is used for (a) cancelling requests that have been pending
***    for too long and (b) avoiding duplicate requests before endgame.
***
*** 2. Torrent::pieces, an array of "struct weighted_piece" which lists the
***    pieces that we want to request. It's used to decide which blocks to
***    return next when tr_peerMgrGetBlockRequests () is called.
**/
733
734/**
735*** struct block_request
736**/
737
738static int
739compareReqByBlock (const void * va, const void * vb)
740{
741    const struct block_request * a = va;
742    const struct block_request * b = vb;
743
744    /* primary key: block */
745    if (a->block < b->block) return -1;
746    if (a->block > b->block) return 1;
747
748    /* secondary key: peer */
749    if (a->peer < b->peer) return -1;
750    if (a->peer > b->peer) return 1;
751
752    return 0;
753}
754
755static void
756requestListAdd (Torrent * t, tr_block_index_t block, tr_peer * peer)
757{
758    struct block_request key;
759
760    /* ensure enough room is available... */
761    if (t->requestCount + 1 >= t->requestAlloc)
762    {
763        const int CHUNK_SIZE = 128;
764        t->requestAlloc += CHUNK_SIZE;
765        t->requests = tr_renew (struct block_request,
766                                t->requests, t->requestAlloc);
767    }
768
769    /* populate the record we're inserting */
770    key.block = block;
771    key.peer = peer;
772    key.sentAt = tr_time ();
773
774    /* insert the request to our array... */
775    {
776        bool exact;
777        const int pos = tr_lowerBound (&key, t->requests, t->requestCount,
778                                       sizeof (struct block_request),
779                                       compareReqByBlock, &exact);
780        assert (!exact);
781        memmove (t->requests + pos + 1,
782                 t->requests + pos,
783                 sizeof (struct block_request) * (t->requestCount++ - pos));
784        t->requests[pos] = key;
785    }
786
787    if (peer != NULL)
788    {
789        ++peer->pendingReqsToPeer;
790        assert (peer->pendingReqsToPeer >= 0);
791    }
792
793    /*fprintf (stderr, "added request of block %lu from peer %s... "
794                       "there are now %d block\n",
795                     (unsigned long)block, tr_atomAddrStr (peer->atom), t->requestCount);*/
796}
797
798static struct block_request *
799requestListLookup (Torrent * t, tr_block_index_t block, const tr_peer * peer)
800{
801    struct block_request key;
802    key.block = block;
803    key.peer = (tr_peer*) peer;
804
805    return bsearch (&key, t->requests, t->requestCount,
806                    sizeof (struct block_request),
807                    compareReqByBlock);
808}
809
810/**
811 * Find the peers are we currently requesting the block
812 * with index @a block from and append them to @a peerArr.
813 */
814static void
815getBlockRequestPeers (Torrent * t, tr_block_index_t block,
816                      tr_ptrArray * peerArr)
817{
818    bool exact;
819    int i, pos;
820    struct block_request key;
821
822    key.block = block;
823    key.peer = NULL;
824    pos = tr_lowerBound (&key, t->requests, t->requestCount,
825                         sizeof (struct block_request),
826                         compareReqByBlock, &exact);
827
828    assert (!exact); /* shouldn't have a request with .peer == NULL */
829
830    for (i = pos; i < t->requestCount; ++i)
831    {
832        if (t->requests[i].block != block)
833            break;
834        tr_ptrArrayAppend (peerArr, t->requests[i].peer);
835    }
836}
837
838static void
839decrementPendingReqCount (const struct block_request * b)
840{
841    if (b->peer != NULL)
842        if (b->peer->pendingReqsToPeer > 0)
843            --b->peer->pendingReqsToPeer;
844}
845
846static void
847requestListRemove (Torrent * t, tr_block_index_t block, const tr_peer * peer)
848{
849    const struct block_request * b = requestListLookup (t, block, peer);
850    if (b != NULL)
851    {
852        const int pos = b - t->requests;
853        assert (pos < t->requestCount);
854
855        decrementPendingReqCount (b);
856
857        tr_removeElementFromArray (t->requests,
858                                   pos,
859                                   sizeof (struct block_request),
860                                   t->requestCount--);
861
862        /*fprintf (stderr, "removing request of block %lu from peer %s... "
863                           "there are now %d block requests left\n",
864                         (unsigned long)block, tr_atomAddrStr (peer->atom), t->requestCount);*/
865    }
866}
867
868static int
869countActiveWebseeds (const Torrent * t)
870{
871    int activeCount = 0;
872    const tr_webseed ** w = (const tr_webseed **) tr_ptrArrayBase (&t->webseeds);
873    const tr_webseed ** const wend = w + tr_ptrArraySize (&t->webseeds);
874
875    for (; w!=wend; ++w)
876        if (tr_webseedIsActive (*w))
877            ++activeCount;
878
879    return activeCount;
880}
881
882static bool
883testForEndgame (const Torrent * t)
884{
885    /* we consider ourselves to be in endgame if the number of bytes
886       we've got requested is >= the number of bytes left to download */
887    return (t->requestCount * t->tor->blockSize)
888               >= tr_cpLeftUntilDone (&t->tor->completion);
889}
890
891static void
892updateEndgame (Torrent * t)
893{
894    assert (t->requestCount >= 0);
895
896    if (!testForEndgame (t))
897    {
898        /* not in endgame */
899        t->endgame = 0;
900    }
901    else if (!t->endgame) /* only recalculate when endgame first begins */
902    {
903        int numDownloading = 0;
904        const tr_peer ** p = (const tr_peer **) tr_ptrArrayBase (&t->peers);
905        const tr_peer ** const pend = p + tr_ptrArraySize (&t->peers);
906
907        /* add the active bittorrent peers... */
908        for (; p!=pend; ++p)
909            if ((*p)->pendingReqsToPeer > 0)
910                ++numDownloading;
911
912        /* add the active webseeds... */
913        numDownloading += countActiveWebseeds (t);
914
915        /* average number of pending requests per downloading peer */
916        t->endgame = t->requestCount / MAX (numDownloading, 1);
917    }
918}
919
920
921/****
922*****
923*****  Piece List Manipulation / Accessors
924*****
925****/
926
927static inline void
928invalidatePieceSorting (Torrent * t)
929{
930    t->pieceSortState = PIECES_UNSORTED;
931}
932
933static const tr_torrent * weightTorrent;
934
935static const uint16_t * weightReplication;
936
937static void
938setComparePieceByWeightTorrent (Torrent * t)
939{
940    if (!replicationExists (t))
941        replicationNew (t);
942
943    weightTorrent = t->tor;
944    weightReplication = t->pieceReplication;
945}
946
947/* we try to create a "weight" s.t. high-priority pieces come before others,
948 * and that partially-complete pieces come before empty ones. */
949static int
950comparePieceByWeight (const void * va, const void * vb)
951{
952    const struct weighted_piece * a = va;
953    const struct weighted_piece * b = vb;
954    int ia, ib, missing, pending;
955    const tr_torrent * tor = weightTorrent;
956    const uint16_t * rep = weightReplication;
957
958    /* primary key: weight */
959    missing = tr_cpMissingBlocksInPiece (&tor->completion, a->index);
960    pending = a->requestCount;
961    ia = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
962    missing = tr_cpMissingBlocksInPiece (&tor->completion, b->index);
963    pending = b->requestCount;
964    ib = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
965    if (ia < ib) return -1;
966    if (ia > ib) return 1;
967
968    /* secondary key: higher priorities go first */
969    ia = tor->info.pieces[a->index].priority;
970    ib = tor->info.pieces[b->index].priority;
971    if (ia > ib) return -1;
972    if (ia < ib) return 1;
973
974    /* tertiary key: rarest first. */
975    ia = rep[a->index];
976    ib = rep[b->index];
977    if (ia < ib) return -1;
978    if (ia > ib) return 1;
979
980    /* quaternary key: random */
981    if (a->salt < b->salt) return -1;
982    if (a->salt > b->salt) return 1;
983
984    /* okay, they're equal */
985    return 0;
986}
987
988static int
989comparePieceByIndex (const void * va, const void * vb)
990{
991    const struct weighted_piece * a = va;
992    const struct weighted_piece * b = vb;
993    if (a->index < b->index) return -1;
994    if (a->index > b->index) return 1;
995    return 0;
996}
997
998static void
999pieceListSort (Torrent * t, enum piece_sort_state state)
1000{
1001    assert (state==PIECES_SORTED_BY_INDEX
1002         || state==PIECES_SORTED_BY_WEIGHT);
1003
1004
1005    if (state == PIECES_SORTED_BY_WEIGHT)
1006    {
1007        setComparePieceByWeightTorrent (t);
1008        qsort (t->pieces, t->pieceCount, sizeof (struct weighted_piece), comparePieceByWeight);
1009    }
1010    else
1011        qsort (t->pieces, t->pieceCount, sizeof (struct weighted_piece), comparePieceByIndex);
1012
1013    t->pieceSortState = state;
1014}
1015
/**
 * These functions are useful for testing, but too expensive for nightly builds.
 * Let's leave them disabled but keep an easy hook to compile them back in:
 * change the "#if 1" below to "#if 0".
 */
#if 1
#define assertWeightedPiecesAreSorted(t)
#define assertReplicationCountIsExact(t)
#else
/* Asserts that, outside of endgame, every adjacent pair in Torrent::pieces
   is in comparePieceByWeight () order. */
static void
assertWeightedPiecesAreSorted (Torrent * t)
{
    if (!t->endgame)
    {
        int i;
        setComparePieceByWeightTorrent (t);
        for (i=0; i<t->pieceCount-1; ++i)
            assert (comparePieceByWeight (&t->pieces[i], &t->pieces[i+1]) <= 0);
    }
}

/* Asserts that every entry of the piece-replication array equals the
   number of connected peers whose `have' bitfield contains that piece. */
static void
assertReplicationCountIsExact (Torrent * t)
{
    /* This assert might fail due to implementation errors in other
     * clients. It happens when receiving duplicate bitfields/HaveAll/HaveNone
     * from a client. If such behavior is noticed,
     * a bug report should be filed with the faulty client. */

    size_t piece_i;
    const uint16_t * rep = t->pieceReplication;
    const size_t piece_count = t->pieceReplicationSize;
    const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase (&t->peers);
    const int peer_count = tr_ptrArraySize (&t->peers);

    assert (piece_count == t->tor->info.pieceCount);

    for (piece_i=0; piece_i<piece_count; ++piece_i)
    {
        int peer_i;
        uint16_t r = 0;

        /* same bitfield query that replicationNew () uses to build the array */
        for (peer_i=0; peer_i<peer_count; ++peer_i)
            if (tr_bitfieldHas (&peers[peer_i]->have, piece_i))
                ++r;

        assert (rep[piece_i] == r);
    }
}
#endif
1064
1065static struct weighted_piece *
1066pieceListLookup (Torrent * t, tr_piece_index_t index)
1067{
1068    int i;
1069
1070    for (i=0; i<t->pieceCount; ++i)
1071        if (t->pieces[i].index == index)
1072            return &t->pieces[i];
1073
1074    return NULL;
1075}
1076
/* Rebuilds Torrent::pieces, the list of pieces we still want.
 *
 * The new list holds every piece that is neither flagged do-not-download
 * (dnd) nor already complete. Entries carried over from the previous list
 * keep their old contents (including requestCount and salt); new entries
 * start with no pending requests and a fresh random salt. The result is
 * left sorted by weight.
 *
 * Does nothing when tr_torrentIsSeed () reports the torrent is a seed. */
static void
pieceListRebuild (Torrent * t)
{

    if (!tr_torrentIsSeed (t->tor))
    {
        tr_piece_index_t i;
        tr_piece_index_t * pool;
        tr_piece_index_t poolCount = 0;
        const tr_torrent * tor = t->tor;
        const tr_info * inf = tr_torrentInfo (tor);
        struct weighted_piece * pieces;
        int pieceCount;

        /* build the new list */
        /* pool collects the wanted piece indices, in ascending order */
        pool = tr_new (tr_piece_index_t, inf->pieceCount);
        for (i=0; i<inf->pieceCount; ++i)
            if (!inf->pieces[i].dnd)
                if (!tr_cpPieceIsComplete (&tor->completion, i))
                    pool[poolCount++] = i;
        pieceCount = poolCount;
        pieces = tr_new0 (struct weighted_piece, pieceCount);
        for (i=0; i<poolCount; ++i) {
            struct weighted_piece * piece = pieces + i;
            piece->index = pool[i];
            piece->requestCount = 0;
            piece->salt = tr_cryptoWeakRandInt (4096);
        }

        /* if we already had a list of pieces, merge it into
         * the new list so we don't lose its requestCounts */
        if (t->pieces != NULL)
        {
            struct weighted_piece * o = t->pieces;
            struct weighted_piece * oend = o + t->pieceCount;
            struct weighted_piece * n = pieces;
            struct weighted_piece * nend = n + pieceCount;

            /* t->pieces still points at the OLD list here; sort it by index
             * so both lists can be walked in step (the new list is already
             * in ascending index order by construction) */
            pieceListSort (t, PIECES_SORTED_BY_INDEX);

            /* two-pointer merge: where both lists contain the same piece,
             * the whole old entry overwrites the new one */
            while (o!=oend && n!=nend) {
                if (o->index < n->index)
                    ++o;
                else if (o->index > n->index)
                    ++n;
                else
                    *n++ = *o++;
            }

            tr_free (t->pieces);
        }

        t->pieces = pieces;
        t->pieceCount = pieceCount;

        pieceListSort (t, PIECES_SORTED_BY_WEIGHT);

        /* cleanup */
        tr_free (pool);
    }
}
1138
1139static void
1140pieceListRemovePiece (Torrent * t, tr_piece_index_t piece)
1141{
1142    struct weighted_piece * p;
1143
1144    if ((p = pieceListLookup (t, piece)))
1145    {
1146        const int pos = p - t->pieces;
1147
1148        tr_removeElementFromArray (t->pieces,
1149                                   pos,
1150                                   sizeof (struct weighted_piece),
1151                                   t->pieceCount--);
1152
1153        if (t->pieceCount == 0)
1154        {
1155            tr_free (t->pieces);
1156            t->pieces = NULL;
1157        }
1158    }
1159}
1160
1161static void
1162pieceListResortPiece (Torrent * t, struct weighted_piece * p)
1163{
1164    int pos;
1165    bool isSorted = true;
1166
1167    if (p == NULL)
1168        return;
1169
1170    /* is the torrent already sorted? */
1171    pos = p - t->pieces;
1172    setComparePieceByWeightTorrent (t);
1173    if (isSorted && (pos > 0) && (comparePieceByWeight (p-1, p) > 0))
1174        isSorted = false;
1175    if (isSorted && (pos < t->pieceCount - 1) && (comparePieceByWeight (p, p+1) > 0))
1176        isSorted = false;
1177
1178    if (t->pieceSortState != PIECES_SORTED_BY_WEIGHT)
1179    {
1180       pieceListSort (t, PIECES_SORTED_BY_WEIGHT);
1181       isSorted = true;
1182    }
1183
1184    /* if it's not sorted, move it around */
1185    if (!isSorted)
1186    {
1187        bool exact;
1188        const struct weighted_piece tmp = *p;
1189
1190        tr_removeElementFromArray (t->pieces,
1191                                   pos,
1192                                   sizeof (struct weighted_piece),
1193                                   t->pieceCount--);
1194
1195        pos = tr_lowerBound (&tmp, t->pieces, t->pieceCount,
1196                             sizeof (struct weighted_piece),
1197                             comparePieceByWeight, &exact);
1198
1199        memmove (&t->pieces[pos + 1],
1200                 &t->pieces[pos],
1201                 sizeof (struct weighted_piece) * (t->pieceCount++ - pos));
1202
1203        t->pieces[pos] = tmp;
1204    }
1205
1206    assertWeightedPiecesAreSorted (t);
1207}
1208
1209static void
1210pieceListRemoveRequest (Torrent * t, tr_block_index_t block)
1211{
1212    struct weighted_piece * p;
1213    const tr_piece_index_t index = tr_torBlockPiece (t->tor, block);
1214
1215    if (((p = pieceListLookup (t, index))) && (p->requestCount > 0))
1216    {
1217        --p->requestCount;
1218        pieceListResortPiece (t, p);
1219    }
1220}
1221
1222
1223/****
1224*****
1225*****  Replication count (for rarest first policy)
1226*****
1227****/
1228
1229/**
1230 * Increase the replication count of this piece and sort it if the
1231 * piece list is already sorted
1232 */
1233static void
1234tr_incrReplicationOfPiece (Torrent * t, const size_t index)
1235{
1236    assert (replicationExists (t));
1237    assert (t->pieceReplicationSize == t->tor->info.pieceCount);
1238
1239    /* One more replication of this piece is present in the swarm */
1240    ++t->pieceReplication[index];
1241
1242    /* we only resort the piece if the list is already sorted */
1243    if (t->pieceSortState == PIECES_SORTED_BY_WEIGHT)
1244        pieceListResortPiece (t, pieceListLookup (t, index));
1245}
1246
1247/**
1248 * Increases the replication count of pieces present in the bitfield
1249 */
1250static void
1251tr_incrReplicationFromBitfield (Torrent * t, const tr_bitfield * b)
1252{
1253    size_t i;
1254    uint16_t * rep = t->pieceReplication;
1255    const size_t n = t->tor->info.pieceCount;
1256
1257    assert (replicationExists (t));
1258
1259    for (i=0; i<n; ++i)
1260        if (tr_bitfieldHas (b, i))
1261            ++rep[i];
1262
1263    if (t->pieceSortState == PIECES_SORTED_BY_WEIGHT)
1264        invalidatePieceSorting (t);
1265}
1266
1267/**
1268 * Increase the replication count of every piece
1269 */
1270static void
1271tr_incrReplication (Torrent * t)
1272{
1273    int i;
1274    const int n = t->pieceReplicationSize;
1275
1276    assert (replicationExists (t));
1277    assert (t->pieceReplicationSize == t->tor->info.pieceCount);
1278
1279    for (i=0; i<n; ++i)
1280        ++t->pieceReplication[i];
1281}
1282
1283/**
1284 * Decrease the replication count of pieces present in the bitset.
1285 */
1286static void
1287tr_decrReplicationFromBitfield (Torrent * t, const tr_bitfield * b)
1288{
1289    int i;
1290    const int n = t->pieceReplicationSize;
1291
1292    assert (replicationExists (t));
1293    assert (t->pieceReplicationSize == t->tor->info.pieceCount);
1294
1295    if (tr_bitfieldHasAll (b))
1296    {
1297        for (i=0; i<n; ++i)
1298            --t->pieceReplication[i];
1299    }
1300    else if (!tr_bitfieldHasNone (b))
1301    {
1302        for (i=0; i<n; ++i)
1303            if (tr_bitfieldHas (b, i))
1304                --t->pieceReplication[i];
1305
1306        if (t->pieceSortState == PIECES_SORTED_BY_WEIGHT)
1307            invalidatePieceSorting (t);
1308    }
1309}
1310
1311/**
1312***
1313**/
1314
1315void
1316tr_peerMgrRebuildRequests (tr_torrent * tor)
1317{
1318    assert (tr_isTorrent (tor));
1319
1320    pieceListRebuild (tor->torrentPeers);
1321}
1322
1323void
1324tr_peerMgrGetNextRequests (tr_torrent           * tor,
1325                           tr_peer              * peer,
1326                           int                    numwant,
1327                           tr_block_index_t     * setme,
1328                           int                  * numgot,
1329                           bool                   get_intervals)
1330{
1331    int i;
1332    int got;
1333    Torrent * t;
1334    struct weighted_piece * pieces;
1335    const tr_bitfield * const have = &peer->have;
1336
1337    /* sanity clause */
1338    assert (tr_isTorrent (tor));
1339    assert (peer->clientIsInterested);
1340    assert (!peer->clientIsChoked);
1341    assert (numwant > 0);
1342
1343    /* walk through the pieces and find blocks that should be requested */
1344    got = 0;
1345    t = tor->torrentPeers;
1346
1347    /* prep the pieces list */
1348    if (t->pieces == NULL)
1349        pieceListRebuild (t);
1350
1351    if (t->pieceSortState != PIECES_SORTED_BY_WEIGHT)
1352        pieceListSort (t, PIECES_SORTED_BY_WEIGHT);
1353
1354    assertReplicationCountIsExact (t);
1355    assertWeightedPiecesAreSorted (t);
1356
1357    updateEndgame (t);
1358    pieces = t->pieces;
1359    for (i=0; i<t->pieceCount && got<numwant; ++i)
1360    {
1361        struct weighted_piece * p = pieces + i;
1362
1363        /* if the peer has this piece that we want... */
1364        if (tr_bitfieldHas (have, p->index))
1365        {
1366            tr_block_index_t b;
1367            tr_block_index_t first;
1368            tr_block_index_t last;
1369            tr_ptrArray peerArr = TR_PTR_ARRAY_INIT;
1370
1371            tr_torGetPieceBlockRange (tor, p->index, &first, &last);
1372
1373            for (b=first; b<=last && (got<numwant || (get_intervals && setme[2*got-1] == b-1)); ++b)
1374            {
1375                int peerCount;
1376                tr_peer ** peers;
1377
1378                /* don't request blocks we've already got */
1379                if (tr_cpBlockIsComplete (&tor->completion, b))
1380                    continue;
1381
1382                /* always add peer if this block has no peers yet */
1383                tr_ptrArrayClear (&peerArr);
1384                getBlockRequestPeers (t, b, &peerArr);
1385                peers = (tr_peer **) tr_ptrArrayPeek (&peerArr, &peerCount);
1386                if (peerCount != 0)
1387                {
1388                    /* don't make a second block request until the endgame */
1389                    if (!t->endgame)
1390                        continue;
1391
1392                    /* don't have more than two peers requesting this block */
1393                    if (peerCount > 1)
1394                        continue;
1395
1396                    /* don't send the same request to the same peer twice */
1397                    if (peer == peers[0])
1398                        continue;
1399
1400                    /* in the endgame allow an additional peer to download a
1401                       block but only if the peer seems to be handling requests
1402                       relatively fast */
1403                    if (peer->pendingReqsToPeer + numwant - got < t->endgame)
1404                        continue;
1405                }
1406
1407                /* update the caller's table */
1408                if (!get_intervals) {
1409                    setme[got++] = b;
1410                }
1411                /* if intervals are requested two array entries are necessarry:
1412                   one for the interval's starting block and one for its end block */
1413                else if (got && setme[2 * got - 1] == b - 1 && b != first) {
1414                    /* expand the last interval */
1415                    ++setme[2 * got - 1];
1416                }
1417                else {
1418                    /* begin a new interval */
1419                    setme[2 * got] = setme[2 * got + 1] = b;
1420                    ++got;
1421                }
1422
1423                /* update our own tables */
1424                requestListAdd (t, b, peer);
1425                ++p->requestCount;
1426            }
1427
1428            tr_ptrArrayDestruct (&peerArr, NULL);
1429        }
1430    }
1431
1432    /* In most cases we've just changed the weights of a small number of pieces.
1433     * So rather than qsort ()ing the entire array, it's faster to apply an
1434     * adaptive insertion sort algorithm. */
1435    if (got > 0)
1436    {
1437        /* not enough requests || last piece modified */
1438        if (i == t->pieceCount) --i;
1439
1440        setComparePieceByWeightTorrent (t);
1441        while (--i >= 0)
1442        {
1443            bool exact;
1444
1445            /* relative position! */
1446            const int newpos = tr_lowerBound (&t->pieces[i], &t->pieces[i + 1],
1447                                              t->pieceCount - (i + 1),
1448                                              sizeof (struct weighted_piece),
1449                                              comparePieceByWeight, &exact);
1450            if (newpos > 0)
1451            {
1452                const struct weighted_piece piece = t->pieces[i];
1453                memmove (&t->pieces[i],
1454                         &t->pieces[i + 1],
1455                         sizeof (struct weighted_piece) * (newpos));
1456                t->pieces[i + newpos] = piece;
1457            }
1458        }
1459    }
1460
1461    assertWeightedPiecesAreSorted (t);
1462    *numgot = got;
1463}
1464
1465bool
1466tr_peerMgrDidPeerRequest (const tr_torrent  * tor,
1467                          const tr_peer     * peer,
1468                          tr_block_index_t    block)
1469{
1470    const Torrent * t = tor->torrentPeers;
1471    return requestListLookup ((Torrent*)t, block, peer) != NULL;
1472}
1473
1474/* cancel requests that are too old */
1475static void
1476refillUpkeep (int foo UNUSED, short bar UNUSED, void * vmgr)
1477{
1478    time_t now;
1479    time_t too_old;
1480    tr_torrent * tor;
1481    int cancel_buflen = 0;
1482    struct block_request * cancel = NULL;
1483    tr_peerMgr * mgr = vmgr;
1484    managerLock (mgr);
1485
1486    now = tr_time ();
1487    too_old = now - REQUEST_TTL_SECS;
1488
1489    /* alloc the temporary "cancel" buffer */
1490    tor = NULL;
1491    while ((tor = tr_torrentNext (mgr->session, tor)))
1492        cancel_buflen = MAX (cancel_buflen, tor->torrentPeers->requestCount);
1493    if (cancel_buflen > 0)
1494        cancel = tr_new (struct block_request, cancel_buflen);
1495
1496    /* prune requests that are too old */
1497    tor = NULL;
1498    while ((tor = tr_torrentNext (mgr->session, tor)))
1499    {
1500        Torrent * t = tor->torrentPeers;
1501        const int n = t->requestCount;
1502        if (n > 0)
1503        {
1504            int keepCount = 0;
1505            int cancelCount = 0;
1506            const struct block_request * it;
1507            const struct block_request * end;
1508
1509            for (it=t->requests, end=it+n; it!=end; ++it)
1510            {
1511                if ((it->sentAt <= too_old) && it->peer->msgs && !tr_peerMsgsIsReadingBlock (it->peer->msgs, it->block))
1512                    cancel[cancelCount++] = *it;
1513                else
1514                {
1515                    if (it != &t->requests[keepCount])
1516                        t->requests[keepCount] = *it;
1517                    keepCount++;
1518                }
1519            }
1520
1521            /* prune out the ones we aren't keeping */
1522            t->requestCount = keepCount;
1523
1524            /* send cancel messages for all the "cancel" ones */
1525            for (it=cancel, end=it+cancelCount; it!=end; ++it) {
1526                if ((it->peer != NULL) && (it->peer->msgs != NULL)) {
1527                    tr_historyAdd (&it->peer->cancelsSentToPeer, now, 1);
1528                    tr_peerMsgsCancel (it->peer->msgs, it->block);
1529                    decrementPendingReqCount (it);
1530                }
1531            }
1532
1533            /* decrement the pending request counts for the timed-out blocks */
1534            for (it=cancel, end=it+cancelCount; it!=end; ++it)
1535                pieceListRemoveRequest (t, it->block);
1536        }
1537    }
1538
1539    tr_free (cancel);
1540    tr_timerAddMsec (mgr->refillUpkeepTimer, REFILL_UPKEEP_PERIOD_MSEC);
1541    managerUnlock (mgr);
1542}
1543
1544static void
1545addStrike (Torrent * t, tr_peer * peer)
1546{
1547    tordbg (t, "increasing peer %s strike count to %d",
1548            tr_atomAddrStr (peer->atom), peer->strikes + 1);
1549
1550    if (++peer->strikes >= MAX_BAD_PIECES_PER_PEER)
1551    {
1552        struct peer_atom * atom = peer->atom;
1553        atom->flags2 |= MYFLAG_BANNED;
1554        peer->doPurge = 1;
1555        tordbg (t, "banning peer %s", tr_atomAddrStr (atom));
1556    }
1557}
1558
1559static void
1560gotBadPiece (Torrent * t, tr_piece_index_t pieceIndex)
1561{
1562    tr_torrent *   tor = t->tor;
1563    const uint32_t byteCount = tr_torPieceCountBytes (tor, pieceIndex);
1564
1565    tor->corruptCur += byteCount;
1566    tor->downloadedCur -= MIN (tor->downloadedCur, byteCount);
1567
1568    tr_announcerAddBytes (tor, TR_ANN_CORRUPT, byteCount);
1569}
1570
1571static void
1572peerSuggestedPiece (Torrent            * t UNUSED,
1573                    tr_peer            * peer UNUSED,
1574                    tr_piece_index_t     pieceIndex UNUSED,
1575                    int                  isFastAllowed UNUSED)
1576{
1577#if 0
1578    assert (t);
1579    assert (peer);
1580    assert (peer->msgs);
1581
1582    /* is this a valid piece? */
1583    if (pieceIndex >= t->tor->info.pieceCount)
1584        return;
1585
1586    /* don't ask for it if we've already got it */
1587    if (tr_cpPieceIsComplete (t->tor->completion, pieceIndex))
1588        return;
1589
1590    /* don't ask for it if they don't have it */
1591    if (!tr_bitfieldHas (peer->have, pieceIndex))
1592        return;
1593
1594    /* don't ask for it if we're choked and it's not fast */
1595    if (!isFastAllowed && peer->clientIsChoked)
1596        return;
1597
1598    /* request the blocks that we don't have in this piece */
1599    {
1600        tr_block_index_t b;
1601        tr_block_index_t first;
1602        tr_block_index_t last;
1603        const tr_torrent * tor = t->tor;
1604
1605        tr_torGetPieceBlockRange (t->tor, pieceIndex, &first, &last);
1606
1607        for (b=first; b<=last; ++b)
1608        {
1609            if (!tr_cpBlockIsComplete (tor->completion, b))
1610            {
1611                const uint32_t offset = getBlockOffsetInPiece (tor, b);
1612                const uint32_t length = tr_torBlockCountBytes (tor, b);
1613                tr_peerMsgsAddRequest (peer->msgs, pieceIndex, offset, length);
1614                incrementPieceRequests (t, pieceIndex);
1615            }
1616        }
1617    }
1618#endif
1619}
1620
1621static void
1622removeRequestFromTables (Torrent * t, tr_block_index_t block, const tr_peer * peer)
1623{
1624    requestListRemove (t, block, peer);
1625    pieceListRemoveRequest (t, block);
1626}
1627
1628/* peer choked us, or maybe it disconnected.
1629   either way we need to remove all its requests */
1630static void
1631peerDeclinedAllRequests (Torrent * t, const tr_peer * peer)
1632{
1633    int i, n;
1634    tr_block_index_t * blocks = tr_new (tr_block_index_t, t->requestCount);
1635
1636    for (i=n=0; i<t->requestCount; ++i)
1637        if (peer == t->requests[i].peer)
1638            blocks[n++] = t->requests[i].block;
1639
1640    for (i=0; i<n; ++i)
1641        removeRequestFromTables (t, blocks[i], peer);
1642
1643    tr_free (blocks);
1644}
1645
1646static void tr_peerMgrSetBlame (tr_torrent *, tr_piece_index_t, int);
1647
1648static void
1649peerCallbackFunc (tr_peer * peer, const tr_peer_event * e, void * vt)
1650{
1651    Torrent * t = vt;
1652
1653    torrentLock (t);
1654
1655    assert (peer != NULL);
1656
1657    switch (e->eventType)
1658    {
1659        case TR_PEER_PEER_GOT_DATA:
1660        {
1661            const time_t now = tr_time ();
1662            tr_torrent * tor = t->tor;
1663
1664            if (e->wasPieceData)
1665            {
1666                tor->uploadedCur += e->length;
1667                tr_announcerAddBytes (tor, TR_ANN_UP, e->length);
1668                tr_torrentSetActivityDate (tor, now);
1669                tr_torrentSetDirty (tor);
1670            }
1671
1672            /* update the stats */
1673            if (e->wasPieceData)
1674                tr_statsAddUploaded (tor->session, e->length);
1675
1676            /* update our atom */
1677            if (peer->atom && e->wasPieceData)
1678                peer->atom->piece_data_time = now;
1679
1680            break;
1681        }
1682
1683        case TR_PEER_CLIENT_GOT_HAVE:
1684            if (replicationExists (t)) {
1685                tr_incrReplicationOfPiece (t, e->pieceIndex);
1686                assertReplicationCountIsExact (t);
1687            }
1688            break;
1689
1690        case TR_PEER_CLIENT_GOT_HAVE_ALL:
1691            if (replicationExists (t)) {
1692                tr_incrReplication (t);
1693                assertReplicationCountIsExact (t);
1694            }
1695            break;
1696
1697        case TR_PEER_CLIENT_GOT_HAVE_NONE:
1698            /* noop */
1699            break;
1700
1701        case TR_PEER_CLIENT_GOT_BITFIELD:
1702            assert (e->bitfield != NULL);
1703            if (replicationExists (t)) {
1704                tr_incrReplicationFromBitfield (t, e->bitfield);
1705                assertReplicationCountIsExact (t);
1706            }
1707            break;
1708
1709        case TR_PEER_CLIENT_GOT_REJ: {
1710            tr_block_index_t b = _tr_block (t->tor, e->pieceIndex, e->offset);
1711            if (b < t->tor->blockCount)
1712                removeRequestFromTables (t, b, peer);
1713            else
1714                tordbg (t, "Peer %s sent an out-of-range reject message",
1715                           tr_atomAddrStr (peer->atom));
1716            break;
1717        }
1718
1719        case TR_PEER_CLIENT_GOT_CHOKE:
1720            peerDeclinedAllRequests (t, peer);
1721            break;
1722
1723        case TR_PEER_CLIENT_GOT_PORT:
1724            if (peer->atom)
1725                peer->atom->port = e->port;
1726            break;
1727
1728        case TR_PEER_CLIENT_GOT_SUGGEST:
1729            peerSuggestedPiece (t, peer, e->pieceIndex, false);
1730            break;
1731
1732        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1733            peerSuggestedPiece (t, peer, e->pieceIndex, true);
1734            break;
1735
1736        case TR_PEER_CLIENT_GOT_DATA:
1737        {
1738            const time_t now = tr_time ();
1739            tr_torrent * tor = t->tor;
1740
1741            if (e->wasPieceData)
1742            {
1743                tor->downloadedCur += e->length;
1744                tr_torrentSetActivityDate (tor, now);
1745                tr_torrentSetDirty (tor);
1746            }
1747
1748            /* update the stats */
1749            if (e->wasPieceData)
1750                tr_statsAddDownloaded (tor->session, e->length);
1751
1752            /* update our atom */
1753            if (peer->atom && e->wasPieceData)
1754                peer->atom->piece_data_time = now;
1755
1756            break;
1757        }
1758
1759        case TR_PEER_CLIENT_GOT_BLOCK:
1760        {
1761            tr_torrent * tor = t->tor;
1762            tr_block_index_t block = _tr_block (tor, e->pieceIndex, e->offset);
1763            int i, peerCount;
1764            tr_peer ** peers;
1765            tr_ptrArray peerArr = TR_PTR_ARRAY_INIT;
1766
1767            removeRequestFromTables (t, block, peer);
1768            getBlockRequestPeers (t, block, &peerArr);
1769            peers = (tr_peer **) tr_ptrArrayPeek (&peerArr, &peerCount);
1770
1771            /* remove additional block requests and send cancel to peers */
1772            for (i=0; i<peerCount; i++) {
1773                tr_peer * p = peers[i];
1774                assert (p != peer);
1775                if (p->msgs) {
1776                    tr_historyAdd (&p->cancelsSentToPeer, tr_time (), 1);
1777                    tr_peerMsgsCancel (p->msgs, block);
1778                }
1779                removeRequestFromTables (t, block, p);
1780            }
1781
1782            tr_ptrArrayDestruct (&peerArr, false);
1783
1784            tr_historyAdd (&peer->blocksSentToClient, tr_time (), 1);
1785
1786            if (tr_cpBlockIsComplete (&tor->completion, block))
1787            {
1788                /* we already have this block... */
1789                const uint32_t n = tr_torBlockCountBytes (tor, block);
1790                tor->downloadedCur -= MIN (tor->downloadedCur, n);
1791                tordbg (t, "we have this block already...");
1792            }
1793            else
1794            {
1795                tr_cpBlockAdd (&tor->completion, block);
1796                pieceListResortPiece (t, pieceListLookup (t, e->pieceIndex));
1797                tr_torrentSetDirty (tor);
1798
1799                if (tr_cpPieceIsComplete (&tor->completion, e->pieceIndex))
1800                {
1801                    const tr_piece_index_t p = e->pieceIndex;
1802                    const bool ok = tr_torrentCheckPiece (tor, p);
1803
1804                    tordbg (t, "[LAZY] checked just-completed piece %zu", (size_t)p);
1805
1806                    if (!ok)
1807                    {
1808                        tr_torerr (tor, _ ("Piece %lu, which was just downloaded, failed its checksum test"),
1809                                 (unsigned long)p);
1810                    }
1811
1812                    tr_peerMgrSetBlame (tor, p, ok);
1813
1814                    if (!ok)
1815                    {
1816                        gotBadPiece (t, p);
1817                    }
1818                    else
1819                    {
1820                        int i;
1821                        int peerCount;
1822                        tr_peer ** peers;
1823                        tr_file_index_t fileIndex;
1824
1825                        /* only add this to downloadedCur if we got it from a peer --
1826                         * webseeds shouldn't count against our ratio. As one tracker
1827                         * admin put it, "Those pieces are downloaded directly from the
1828                         * content distributor, not the peers, it is the tracker's job
1829                         * to manage the swarms, not the web server and does not fit
1830                         * into the jurisdiction of the tracker." */
1831                        if (peer->msgs != NULL) {
1832                            const uint32_t n = tr_torPieceCountBytes (tor, p);
1833                            tr_announcerAddBytes (tor, TR_ANN_DOWN, n);
1834                        }
1835
1836                        peerCount = tr_ptrArraySize (&t->peers);
1837                        peers = (tr_peer**) tr_ptrArrayBase (&t->peers);
1838                        for (i=0; i<peerCount; ++i)
1839                            tr_peerMsgsHave (peers[i]->msgs, p);
1840
1841                        for (fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex) {
1842                            const tr_file * file = &tor->info.files[fileIndex];
1843                            if ((file->firstPiece <= p) && (p <= file->lastPiece)) {
1844                                if (tr_cpFileIsComplete (&tor->completion, fileIndex)) {
1845                                    tr_cacheFlushFile (tor->session->cache, tor, fileIndex);
1846                                    tr_torrentFileCompleted (tor, fileIndex);
1847                                }
1848                            }
1849                        }
1850
1851                        pieceListRemovePiece (t, p);
1852                    }
1853                }
1854
1855                t->needsCompletenessCheck = true;
1856            }
1857            break;
1858        }
1859
1860        case TR_PEER_ERROR:
1861            if ((e->err == ERANGE) || (e->err == EMSGSIZE) || (e->err == ENOTCONN))
1862            {
1863                /* some protocol error from the peer */
1864                peer->doPurge = 1;
1865                tordbg (t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error",
1866                        tr_atomAddrStr (peer->atom));
1867            }
1868            else
1869            {
1870                tordbg (t, "unhandled error: %s", tr_strerror (e->err));
1871            }
1872            break;
1873
1874        default:
1875            assert (0);
1876    }
1877
1878    torrentUnlock (t);
1879}
1880
1881static int
1882getDefaultShelfLife (uint8_t from)
1883{
1884    /* in general, peers obtained from firsthand contact
1885     * are better than those from secondhand, etc etc */
1886    switch (from)
1887    {
1888        case TR_PEER_FROM_INCOMING : return 60 * 60 * 6;
1889        case TR_PEER_FROM_LTEP     : return 60 * 60 * 6;
1890        case TR_PEER_FROM_TRACKER  : return 60 * 60 * 3;
1891        case TR_PEER_FROM_DHT      : return 60 * 60 * 3;
1892        case TR_PEER_FROM_PEX      : return 60 * 60 * 2;
1893        case TR_PEER_FROM_RESUME   : return 60 * 60;
1894        case TR_PEER_FROM_LPD      : return 10 * 60;
1895        default                    : return 60 * 60;
1896    }
1897}
1898
1899static void
1900ensureAtomExists (Torrent           * t,
1901                  const tr_address  * addr,
1902                  const tr_port       port,
1903                  const uint8_t       flags,
1904                  const int8_t        seedProbability,
1905                  const uint8_t       from)
1906{
1907    struct peer_atom * a;
1908
1909    assert (tr_address_is_valid (addr));
1910    assert (from < TR_PEER_FROM__MAX);
1911
1912    a = getExistingAtom (t, addr);
1913
1914    if (a == NULL)
1915    {
1916        const int jitter = tr_cryptoWeakRandInt (60*10);
1917        a = tr_new0 (struct peer_atom, 1);
1918        a->addr = *addr;
1919        a->port = port;
1920        a->flags = flags;
1921        a->fromFirst = from;
1922        a->fromBest = from;
1923        a->shelf_date = tr_time () + getDefaultShelfLife (from) + jitter;
1924        a->blocklisted = -1;
1925        atomSetSeedProbability (a, seedProbability);
1926        tr_ptrArrayInsertSorted (&t->pool, a, compareAtomsByAddress);
1927
1928        tordbg (t, "got a new atom: %s", tr_atomAddrStr (a));
1929    }
1930    else
1931    {
1932        if (from < a->fromBest)
1933            a->fromBest = from;
1934
1935        if (a->seedProbability == -1)
1936            atomSetSeedProbability (a, seedProbability);
1937
1938        a->flags |= flags;
1939    }
1940}
1941
1942static int
1943getMaxPeerCount (const tr_torrent * tor)
1944{
1945    return tor->maxConnectedPeers;
1946}
1947
1948static int
1949getPeerCount (const Torrent * t)
1950{
1951    return tr_ptrArraySize (&t->peers);/* + tr_ptrArraySize (&t->outgoingHandshakes); */
1952}
1953
1954/* FIXME: this is kind of a mess. */
1955static bool
1956myHandshakeDoneCB (tr_handshake  * handshake,
1957                   tr_peerIo     * io,
1958                   bool            readAnythingFromPeer,
1959                   bool            isConnected,
1960                   const uint8_t * peer_id,
1961                   void          * vmanager)
1962{
1963    bool               ok = isConnected;
1964    bool               success = false;
1965    tr_port            port;
1966    const tr_address * addr;
1967    tr_peerMgr       * manager = vmanager;
1968    Torrent          * t;
1969    tr_handshake     * ours;
1970
1971    assert (io);
1972    assert (tr_isBool (ok));
1973
1974    t = tr_peerIoHasTorrentHash (io)
1975        ? getExistingTorrent (manager, tr_peerIoGetTorrentHash (io))
1976        : NULL;
1977
1978    if (tr_peerIoIsIncoming (io))
1979        ours = tr_ptrArrayRemoveSorted (&manager->incomingHandshakes,
1980                                        handshake, handshakeCompare);
1981    else if (t)
1982        ours = tr_ptrArrayRemoveSorted (&t->outgoingHandshakes,
1983                                        handshake, handshakeCompare);
1984    else
1985        ours = handshake;
1986
1987    assert (ours);
1988    assert (ours == handshake);
1989
1990    if (t)
1991        torrentLock (t);
1992
1993    addr = tr_peerIoGetAddress (io, &port);
1994
1995    if (!ok || !t || !t->isRunning)
1996    {
1997        if (t)
1998        {
1999            struct peer_atom * atom = getExistingAtom (t, addr);
2000            if (atom)
2001            {
2002                ++atom->numFails;
2003
2004                if (!readAnythingFromPeer)
2005                {
2006                    tordbg (t, "marking peer %s as unreachable... numFails is %d", tr_atomAddrStr (atom), (int)atom->numFails);
2007                    atom->flags2 |= MYFLAG_UNREACHABLE;
2008                }
2009            }
2010        }
2011    }
2012    else /* looking good */
2013    {
2014        struct peer_atom * atom;
2015
2016        ensureAtomExists (t, addr, port, 0, -1, TR_PEER_FROM_INCOMING);
2017        atom = getExistingAtom (t, addr);
2018        atom->time = tr_time ();
2019        atom->piece_data_time = 0;
2020        atom->lastConnectionAt = tr_time ();
2021
2022        if (!tr_peerIoIsIncoming (io))
2023        {
2024            atom->flags |= ADDED_F_CONNECTABLE;
2025            atom->flags2 &= ~MYFLAG_UNREACHABLE;
2026        }
2027
2028        /* In principle, this flag specifies whether the peer groks uTP,
2029           not whether it's currently connected over uTP. */
2030        if (io->utp_socket)
2031            atom->flags |= ADDED_F_UTP_FLAGS;
2032
2033        if (atom->flags2 & MYFLAG_BANNED)
2034        {
2035            tordbg (t, "banned peer %s tried to reconnect",
2036                    tr_atomAddrStr (atom));
2037        }
2038        else if (tr_peerIoIsIncoming (io)
2039               && (getPeerCount (t) >= getMaxPeerCount (t->tor)))
2040
2041        {
2042        }
2043        else
2044        {
2045            tr_peer * peer = atom->peer;
2046
2047            if (peer) /* we already have this peer */
2048            {
2049            }
2050            else
2051            {
2052                peer = getPeer (t, atom);
2053                tr_free (peer->client);
2054
2055                if (!peer_id)
2056                    peer->client = NULL;
2057                else {
2058                    char client[128];
2059                    tr_clientForId (client, sizeof (client), peer_id);
2060                    peer->client = tr_strdup (client);
2061                }
2062
2063                peer->io = tr_handshakeStealIO (handshake); /* this steals its refcount too, which is
2064                                                                balanced by our unref in peerDelete ()  */
2065                tr_peerIoSetParent (peer->io, &t->tor->bandwidth);
2066                tr_peerMsgsNew (t->tor, peer, peerCallbackFunc, t);
2067
2068                success = true;
2069            }
2070        }
2071    }
2072
2073    if (t)
2074        torrentUnlock (t);
2075
2076    return success;
2077}
2078
2079void
2080tr_peerMgrAddIncoming (tr_peerMgr * manager,
2081                       tr_address * addr,
2082                       tr_port      port,
2083                       int          socket,
2084                       struct UTPSocket * utp_socket)
2085{
2086    tr_session * session;
2087
2088    managerLock (manager);
2089
2090    assert (tr_isSession (manager->session));
2091    session = manager->session;
2092
2093    if (tr_sessionIsAddressBlocked (session, addr))
2094    {
2095        tr_dbg ("Banned IP address \"%s\" tried to connect to us", tr_address_to_string (addr));
2096        if (socket >= 0)
2097            tr_netClose (session, socket);
2098        else
2099            UTP_Close (utp_socket);
2100    }
2101    else if (getExistingHandshake (&manager->incomingHandshakes, addr))
2102    {
2103        if (socket >= 0)
2104            tr_netClose (session, socket);
2105        else
2106            UTP_Close (utp_socket);
2107    }
2108    else /* we don't have a connection to them yet... */
2109    {
2110        tr_peerIo *    io;
2111        tr_handshake * handshake;
2112
2113        io = tr_peerIoNewIncoming (session, &session->bandwidth, addr, port, socket, utp_socket);
2114
2115        handshake = tr_handshakeNew (io,
2116                                     session->encryptionMode,
2117                                     myHandshakeDoneCB,
2118                                     manager);
2119
2120        tr_peerIoUnref (io); /* balanced by the implicit ref in tr_peerIoNewIncoming () */
2121
2122        tr_ptrArrayInsertSorted (&manager->incomingHandshakes, handshake,
2123                                 handshakeCompare);
2124    }
2125
2126    managerUnlock (manager);
2127}
2128
2129void
2130tr_peerMgrAddPex (tr_torrent * tor, uint8_t from,
2131                  const tr_pex * pex, int8_t seedProbability)
2132{
2133    if (tr_isPex (pex)) /* safeguard against corrupt data */
2134    {
2135        Torrent * t = tor->torrentPeers;
2136        managerLock (t->manager);
2137
2138        if (!tr_sessionIsAddressBlocked (t->manager->session, &pex->addr))
2139            if (tr_address_is_valid_for_peers (&pex->addr, pex->port))
2140                ensureAtomExists (t, &pex->addr, pex->port, pex->flags, seedProbability, from);
2141
2142        managerUnlock (t->manager);
2143    }
2144}
2145
2146void
2147tr_peerMgrMarkAllAsSeeds (tr_torrent * tor)
2148{
2149    Torrent * t = tor->torrentPeers;
2150    const int n = tr_ptrArraySize (&t->pool);
2151    struct peer_atom ** it = (struct peer_atom**) tr_ptrArrayBase (&t->pool);
2152    struct peer_atom ** end = it + n;
2153
2154    while (it != end)
2155        atomSetSeed (t, *it++);
2156}
2157
2158tr_pex *
2159tr_peerMgrCompactToPex (const void *    compact,
2160                        size_t          compactLen,
2161                        const uint8_t * added_f,
2162                        size_t          added_f_len,
2163                        size_t *        pexCount)
2164{
2165    size_t          i;
2166    size_t          n = compactLen / 6;
2167    const uint8_t * walk = compact;
2168    tr_pex *        pex = tr_new0 (tr_pex, n);
2169
2170    for (i = 0; i < n; ++i)
2171    {
2172        pex[i].addr.type = TR_AF_INET;
2173        memcpy (&pex[i].addr.addr, walk, 4); walk += 4;
2174        memcpy (&pex[i].port, walk, 2); walk += 2;
2175        if (added_f && (n == added_f_len))
2176            pex[i].flags = added_f[i];
2177    }
2178
2179    *pexCount = n;
2180    return pex;
2181}
2182
2183tr_pex *
2184tr_peerMgrCompact6ToPex (const void    * compact,
2185                         size_t          compactLen,
2186                         const uint8_t * added_f,
2187                         size_t          added_f_len,
2188                         size_t        * pexCount)
2189{
2190    size_t          i;
2191    size_t          n = compactLen / 18;
2192    const uint8_t * walk = compact;
2193    tr_pex *        pex = tr_new0 (tr_pex, n);
2194
2195    for (i = 0; i < n; ++i)
2196    {
2197        pex[i].addr.type = TR_AF_INET6;
2198        memcpy (&pex[i].addr.addr.addr6.s6_addr, walk, 16); walk += 16;
2199        memcpy (&pex[i].port, walk, 2); walk += 2;
2200        if (added_f && (n == added_f_len))
2201            pex[i].flags = added_f[i];
2202    }
2203
2204    *pexCount = n;
2205    return pex;
2206}
2207
2208tr_pex *
2209tr_peerMgrArrayToPex (const void * array,
2210                      size_t       arrayLen,
2211                      size_t      * pexCount)
2212{
2213    size_t          i;
2214    size_t          n = arrayLen / (sizeof (tr_address) + 2);
2215    /*size_t          n = arrayLen / sizeof (tr_peerArrayElement);*/
2216    const uint8_t * walk = array;
2217    tr_pex        * pex = tr_new0 (tr_pex, n);
2218
2219    for (i = 0 ; i < n ; i++) {
2220        memcpy (&pex[i].addr, walk, sizeof (tr_address));
2221        memcpy (&pex[i].port, walk + sizeof (tr_address), 2);
2222        pex[i].flags = 0x00;
2223        walk += sizeof (tr_address) + 2;
2224    }
2225
2226    *pexCount = n;
2227    return pex;
2228}
2229
2230/**
2231***
2232**/
2233
2234static void
2235tr_peerMgrSetBlame (tr_torrent     * tor,
2236                    tr_piece_index_t pieceIndex,
2237                    int              success)
2238{
2239    if (!success)
2240    {
2241        int        peerCount, i;
2242        Torrent *  t = tor->torrentPeers;
2243        tr_peer ** peers;
2244
2245        assert (torrentIsLocked (t));
2246
2247        peers = (tr_peer **) tr_ptrArrayPeek (&t->peers, &peerCount);
2248        for (i = 0; i < peerCount; ++i)
2249        {
2250            tr_peer * peer = peers[i];
2251            if (tr_bitfieldHas (&peer->blame, pieceIndex))
2252            {
2253                tordbg (t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
2254                        tr_atomAddrStr (peer->atom),
2255                        pieceIndex, (int)peer->strikes + 1);
2256                addStrike (t, peer);
2257            }
2258        }
2259    }
2260}
2261
2262int
2263tr_pexCompare (const void * va, const void * vb)
2264{
2265    const tr_pex * a = va;
2266    const tr_pex * b = vb;
2267    int i;
2268
2269    assert (tr_isPex (a));
2270    assert (tr_isPex (b));
2271
2272    if ((i = tr_address_compare (&a->addr, &b->addr)))
2273        return i;
2274
2275    if (a->port != b->port)
2276        return a->port < b->port ? -1 : 1;
2277
2278    return 0;
2279}
2280
2281/* better goes first */
2282static int
2283compareAtomsByUsefulness (const void * va, const void *vb)
2284{
2285    const struct peer_atom * a = * (const struct peer_atom**) va;
2286    const struct peer_atom * b = * (const struct peer_atom**) vb;
2287
2288    assert (tr_isAtom (a));
2289    assert (tr_isAtom (b));
2290
2291    if (a->piece_data_time != b->piece_data_time)
2292        return a->piece_data_time > b->piece_data_time ? -1 : 1;
2293    if (a->fromBest != b->fromBest)
2294        return a->fromBest < b->fromBest ? -1 : 1;
2295    if (a->numFails != b->numFails)
2296        return a->numFails < b->numFails ? -1 : 1;
2297
2298    return 0;
2299}
2300
2301static bool
2302isAtomInteresting (const tr_torrent * tor, struct peer_atom * atom)
2303{
2304    if (tr_torrentIsSeed (tor) && atomIsSeed (atom))
2305        return false;
2306
2307    if (peerIsInUse (tor->torrentPeers, atom))
2308        return true;
2309
2310    if (isAtomBlocklisted (tor->session, atom))
2311        return false;
2312
2313    if (atom->flags2 & MYFLAG_BANNED)
2314        return false;
2315
2316    return true;
2317}
2318
2319int
2320tr_peerMgrGetPeers (tr_torrent   * tor,
2321                    tr_pex      ** setme_pex,
2322                    uint8_t        af,
2323                    uint8_t        list_mode,
2324                    int            maxCount)
2325{
2326    int i;
2327    int n;
2328    int count = 0;
2329    int atomCount = 0;
2330    const Torrent * t = tor->torrentPeers;
2331    struct peer_atom ** atoms = NULL;
2332    tr_pex * pex;
2333    tr_pex * walk;
2334
2335    assert (tr_isTorrent (tor));
2336    assert (setme_pex != NULL);
2337    assert (af==TR_AF_INET || af==TR_AF_INET6);
2338    assert (list_mode==TR_PEERS_CONNECTED || list_mode==TR_PEERS_INTERESTING);
2339
2340    managerLock (t->manager);
2341
2342    /**
2343    ***  build a list of atoms
2344    **/
2345
2346    if (list_mode == TR_PEERS_CONNECTED) /* connected peers only */
2347    {
2348        int i;
2349        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayBase (&t->peers);
2350        atomCount = tr_ptrArraySize (&t->peers);
2351        atoms = tr_new (struct peer_atom *, atomCount);
2352        for (i=0; i<atomCount; ++i)
2353            atoms[i] = peers[i]->atom;
2354    }
2355    else /* TR_PEERS_INTERESTING */
2356    {
2357        int i;
2358        struct peer_atom ** atomBase = (struct peer_atom**) tr_ptrArrayBase (&t->pool);
2359        n = tr_ptrArraySize (&t->pool);
2360        atoms = tr_new (struct peer_atom *, n);
2361        for (i=0; i<n; ++i)
2362            if (isAtomInteresting (tor, atomBase[i]))
2363                atoms[atomCount++] = atomBase[i];
2364    }
2365
2366    qsort (atoms, atomCount, sizeof (struct peer_atom *), compareAtomsByUsefulness);
2367
2368    /**
2369    ***  add the first N of them into our return list
2370    **/
2371
2372    n = MIN (atomCount, maxCount);
2373    pex = walk = tr_new0 (tr_pex, n);
2374
2375    for (i=0; i<atomCount && count<n; ++i)
2376    {
2377        const struct peer_atom * atom = atoms[i];
2378        if (atom->addr.type == af)
2379        {
2380            assert (tr_address_is_valid (&atom->addr));
2381            walk->addr = atom->addr;
2382            walk->port = atom->port;
2383            walk->flags = atom->flags;
2384            ++count;
2385            ++walk;
2386        }
2387    }
2388
2389    qsort (pex, count, sizeof (tr_pex), tr_pexCompare);
2390
2391    assert ((walk - pex) == count);
2392    *setme_pex = pex;
2393
2394    /* cleanup */
2395    tr_free (atoms);
2396    managerUnlock (t->manager);
2397    return count;
2398}
2399
/* Forward declarations. atomPulse, bandwidthPulse and rechokePulse are
   installed as timer callbacks by ensureMgrTimersExist () below, and
   rechokePulse is also called directly by tr_peerMgrStartTorrent (). */
static void atomPulse    (int, short, void *);
static void bandwidthPulse (int, short, void *);
static void rechokePulse (int, short, void *);
static void reconnectPulse (int, short, void *);
2404
2405static struct event *
2406createTimer (tr_session * session, int msec, void (*callback)(int, short, void *), void * cbdata)
2407{
2408    struct event * timer = evtimer_new (session->event_base, callback, cbdata);
2409    tr_timerAddMsec (timer, msec);
2410    return timer;
2411}
2412
2413static void
2414ensureMgrTimersExist (struct tr_peerMgr * m)
2415{
2416    if (m->atomTimer == NULL)
2417        m->atomTimer = createTimer (m->session, ATOM_PERIOD_MSEC, atomPulse, m);
2418
2419    if (m->bandwidthTimer == NULL)
2420        m->bandwidthTimer = createTimer (m->session, BANDWIDTH_PERIOD_MSEC, bandwidthPulse, m);
2421
2422    if (m->rechokeTimer == NULL)
2423        m->rechokeTimer = createTimer (m->session, RECHOKE_PERIOD_MSEC, rechokePulse, m);
2424
2425    if (m->refillUpkeepTimer == NULL)
2426        m->refillUpkeepTimer = createTimer (m->session, REFILL_UPKEEP_PERIOD_MSEC, refillUpkeep, m);
2427}
2428
2429void
2430tr_peerMgrStartTorrent (tr_torrent * tor)
2431{
2432    Torrent * t = tor->torrentPeers;
2433
2434    assert (tr_isTorrent (tor));
2435    assert (tr_torrentIsLocked (tor));
2436
2437    ensureMgrTimersExist (t->manager);
2438
2439    t->isRunning = true;
2440    t->maxPeers = t->tor->maxConnectedPeers;
2441    t->pieceSortState = PIECES_UNSORTED;
2442
2443    rechokePulse (0, 0, t->manager);
2444}
2445
2446static void
2447stopTorrent (Torrent * t)
2448{
2449    tr_peer * peer;
2450
2451    t->isRunning = false;
2452
2453    replicationFree (t);
2454    invalidatePieceSorting (t);
2455
2456    /* disconnect the peers. */
2457    while ((peer = tr_ptrArrayPop (&t->peers)))
2458        peerDelete (t, peer);
2459
2460    /* disconnect the handshakes. handshakeAbort calls handshakeDoneCB (),
2461     * which removes the handshake from t->outgoingHandshakes... */
2462    while (!tr_ptrArrayEmpty (&t->outgoingHandshakes))
2463        tr_handshakeAbort (tr_ptrArrayNth (&t->outgoingHandshakes, 0));
2464}
2465
2466void
2467tr_peerMgrStopTorrent (tr_torrent * tor)
2468{
2469    assert (tr_isTorrent (tor));
2470    assert (tr_torrentIsLocked (tor));
2471
2472    stopTorrent (tor->torrentPeers);
2473}
2474
2475void
2476tr_peerMgrAddTorrent (tr_peerMgr * manager, tr_torrent * tor)
2477{
2478    assert (tr_isTorrent (tor));
2479    assert (tr_torrentIsLocked (tor));
2480    assert (tor->torrentPeers == NULL);
2481
2482    tor->torrentPeers = torrentNew (manager, tor);
2483}
2484
2485void
2486tr_peerMgrRemoveTorrent (tr_torrent * tor)
2487{
2488    assert (tr_isTorrent (tor));
2489    assert (tr_torrentIsLocked (tor));
2490
2491    stopTorrent (tor->torrentPeers);
2492    torrentFree (tor->torrentPeers);
2493}
2494
2495void
2496tr_peerUpdateProgress (tr_torrent * tor, tr_peer * peer)
2497{
2498    const tr_bitfield * have = &peer->have;
2499
2500    if (tr_bitfieldHasAll (have))
2501    {
2502        peer->progress = 1.0;
2503    }
2504    else if (tr_bitfieldHasNone (have))
2505    {
2506        peer->progress = 0.0;
2507    }
2508    else
2509    {
2510        const float true_count = tr_bitfieldCountTrueBits (have);
2511
2512        if (tr_torrentHasMetadata (tor))
2513        {
2514            peer->progress = true_count / tor->info.pieceCount;
2515        }
2516        else /* without pieceCount, this result is only a best guess... */
2517        {
2518            peer->progress = true_count / (have->bit_count + 1);
2519        }
2520    }
2521
2522    /* clamp the progress range */
2523    if (peer->progress < 0.0)
2524        peer->progress = 0.0;
2525    if (peer->progress > 1.0)
2526        peer->progress = 1.0;
2527
2528    if (peer->atom && (peer->progress >= 1.0))
2529        atomSetSeed (tor->torrentPeers, peer->atom);
2530}
2531
2532void
2533tr_peerMgrOnTorrentGotMetainfo (tr_torrent * tor)
2534{
2535    int i;
2536    int peerCount;
2537    tr_peer ** peers;
2538
2539    /* the webseed list may have changed... */
2540    rebuildWebseedArray (tor->torrentPeers, tor);
2541
2542    /* some peer_msgs' progress fields may not be accurate if we
2543       didn't have the metadata before now... so refresh them all... */
2544    peerCount = tr_ptrArraySize (&tor->torrentPeers->peers);
2545    peers = (tr_peer**) tr_ptrArrayBase (&tor->torrentPeers->peers);
2546    for (i=0; i<peerCount; ++i)
2547        tr_peerUpdateProgress (tor, peers[i]);
2548
2549}
2550
2551void
2552tr_peerMgrTorrentAvailability (const tr_torrent * tor, int8_t * tab, unsigned int tabCount)
2553{
2554    assert (tr_isTorrent (tor));
2555    assert (torrentIsLocked (tor->torrentPeers));
2556    assert (tab != NULL);
2557    assert (tabCount > 0);
2558
2559    memset (tab, 0, tabCount);
2560
2561    if (tr_torrentHasMetadata (tor))
2562    {
2563        tr_piece_index_t i;
2564        const int peerCount = tr_ptrArraySize (&tor->torrentPeers->peers);
2565        const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase (&tor->torrentPeers->peers);
2566        const float interval = tor->info.pieceCount / (float)tabCount;
2567        const bool isSeed = tr_cpGetStatus (&tor->completion) == TR_SEED;
2568
2569        for (i=0; i<tabCount; ++i)
2570        {
2571            const int piece = i * interval;
2572
2573            if (isSeed || tr_cpPieceIsComplete (&tor->completion, piece))
2574                tab[i] = -1;
2575            else if (peerCount) {
2576                int j;
2577                for (j=0; j<peerCount; ++j)
2578                    if (tr_bitfieldHas (&peers[j]->have, piece))
2579                        ++tab[i];
2580            }
2581        }
2582    }
2583}
2584
2585static bool
2586peerIsSeed (const tr_peer * peer)
2587{
2588    if (peer->progress >= 1.0)
2589        return true;
2590
2591    if (peer->atom && atomIsSeed (peer->atom))
2592        return true;
2593
2594    return false;
2595}
2596
2597/* count how many bytes we want that connected peers have */
2598uint64_t
2599tr_peerMgrGetDesiredAvailable (const tr_torrent * tor)
2600{
2601    size_t i;
2602    size_t n;
2603    uint64_t desiredAvailable;
2604    const Torrent * t = tor->torrentPeers;
2605
2606    /* common shortcuts... */
2607
2608    if (tr_torrentIsSeed (t->tor))
2609        return 0;
2610
2611    if (!tr_torrentHasMetadata (tor))
2612        return 0;
2613
2614    n = tr_ptrArraySize (&t->peers);
2615    if (n == 0)
2616        return 0;
2617    else {
2618        const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase (&t->peers);
2619        for (i=0; i<n; ++i)
2620            if (peers[i]->atom && atomIsSeed (peers[i]->atom))
2621                return tr_cpLeftUntilDone (&tor->completion);
2622    }
2623
2624    if (!t->pieceReplication || !t->pieceReplicationSize)
2625        return 0;
2626
2627    /* do it the hard way */
2628
2629    desiredAvailable = 0;
2630    for (i=0, n=MIN (tor->info.pieceCount, t->pieceReplicationSize); i<n; ++i)
2631        if (!tor->info.pieces[i].dnd && (t->pieceReplication[i] > 0))
2632            desiredAvailable += tr_cpMissingBytesInPiece (&t->tor->completion, i);
2633
2634    assert (desiredAvailable <= tor->info.totalSize);
2635    return desiredAvailable;
2636}
2637
2638void
2639tr_peerMgrTorrentStats (tr_torrent  * tor,
2640                        int         * setmePeersConnected,
2641                        int         * setmeWebseedsSendingToUs,
2642                        int         * setmePeersSendingToUs,
2643                        int         * setmePeersGettingFromUs,
2644                        int         * setmePeersFrom)
2645{
2646    int i, size;
2647    const Torrent * t = tor->torrentPeers;
2648    const tr_peer ** peers;
2649
2650    assert (tr_torrentIsLocked (tor));
2651
2652    peers = (const tr_peer **) tr_ptrArrayBase (&t->peers);
2653    size = tr_ptrArraySize (&t->peers);
2654
2655    *setmePeersConnected       = 0;
2656    *setmePeersGettingFromUs   = 0;
2657    *setmePeersSendingToUs     = 0;
2658    *setmeWebseedsSendingToUs  = 0;
2659
2660    for (i=0; i<TR_PEER_FROM__MAX; ++i)
2661        setmePeersFrom[i] = 0;
2662
2663    for (i=0; i<size; ++i)
2664    {
2665        const tr_peer * peer = peers[i];
2666        const struct peer_atom * atom = peer->atom;
2667
2668        if (peer->io == NULL) /* not connected */
2669            continue;
2670
2671        ++*setmePeersConnected;
2672
2673        ++setmePeersFrom[atom->fromFirst];
2674
2675        if (clientIsDownloadingFrom (tor, peer))
2676            ++*setmePeersSendingToUs;
2677
2678        if (clientIsUploadingTo (peer))
2679            ++*setmePeersGettingFromUs;
2680    }
2681
2682    *setmeWebseedsSendingToUs = countActiveWebseeds (t);
2683}
2684
2685double*
2686tr_peerMgrWebSpeeds_KBps (const tr_torrent * tor)
2687{
2688    int i;
2689    const Torrent * t = tor->torrentPeers;
2690    const int webseedCount = tr_ptrArraySize (&t->webseeds);
2691    const tr_webseed ** webseeds = (const tr_webseed**) tr_ptrArrayBase (&t->webseeds);
2692    const uint64_t now = tr_time_msec ();
2693    double * ret = tr_new0 (double, webseedCount);
2694
2695    assert (tr_isTorrent (tor));
2696    assert (tr_torrentIsLocked (tor));
2697    assert (t->manager != NULL);
2698    assert (webseedCount == tor->info.webseedCount);
2699
2700    for (i=0; i<webseedCount; ++i) {
2701        unsigned int Bps;
2702        if (tr_webseedGetSpeed_Bps (webseeds[i], now, &Bps))
2703            ret[i] = Bps / (double)tr_speed_K;
2704        else
2705            ret[i] = -1.0;
2706    }
2707
2708    return ret;
2709}
2710
2711unsigned int
2712tr_peerGetPieceSpeed_Bps (const tr_peer * peer, uint64_t now, tr_direction direction)
2713{
2714    return peer->io ? tr_peerIoGetPieceSpeed_Bps (peer->io, now, direction) : 0.0;
2715}
2716
2717struct tr_peer_stat *
2718tr_peerMgrPeerStats (const tr_torrent * tor, int * setmeCount)
2719{
2720    int i;
2721    const Torrent * t = tor->torrentPeers;
2722    const int size = tr_ptrArraySize (&t->peers);
2723    const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase (&t->peers);
2724    const uint64_t now_msec = tr_time_msec ();
2725    const time_t now = tr_time ();
2726    tr_peer_stat * ret = tr_new0 (tr_peer_stat, size);
2727
2728    assert (tr_isTorrent (tor));
2729    assert (tr_torrentIsLocked (tor));
2730    assert (t->manager);
2731
2732    for (i=0; i<size; ++i)
2733    {
2734        char *                   pch;
2735        const tr_peer *          peer = peers[i];
2736        const struct peer_atom * atom = peer->atom;
2737        tr_peer_stat *           stat = ret + i;
2738
2739        tr_address_to_string_with_buf (&atom->addr, stat->addr, sizeof (stat->addr));
2740        tr_strlcpy (stat->client, (peer->client ? peer->client : ""),
2741                   sizeof (stat->client));
2742        stat->port                = ntohs (peer->atom->port);
2743        stat->from                = atom->fromFirst;
2744        stat->progress            = peer->progress;
2745        stat->isUTP               = peer->io->utp_socket != NULL;
2746        stat->isEncrypted         = tr_peerIoIsEncrypted (peer->io) ? 1 : 0;
2747        stat->rateToPeer_KBps     = toSpeedKBps (tr_peerGetPieceSpeed_Bps (peer, now_msec, TR_CLIENT_TO_PEER));
2748        stat->rateToClient_KBps   = toSpeedKBps (tr_peerGetPieceSpeed_Bps (peer, now_msec, TR_PEER_TO_CLIENT));
2749        stat->peerIsChoked        = peer->peerIsChoked;
2750        stat->peerIsInterested    = peer->peerIsInterested;
2751        stat->clientIsChoked      = peer->clientIsChoked;
2752        stat->clientIsInterested  = peer->clientIsInterested;
2753        stat->isIncoming          = tr_peerIoIsIncoming (peer->io);
2754        stat->isDownloadingFrom   = clientIsDownloadingFrom (tor, peer);
2755        stat->isUploadingTo       = clientIsUploadingTo (peer);
2756        stat->isSeed              = peerIsSeed (peer);
2757
2758        stat->blocksToPeer        = tr_historyGet (&peer->blocksSentToPeer,    now, CANCEL_HISTORY_SEC);
2759        stat->blocksToClient      = tr_historyGet (&peer->blocksSentToClient,  now, CANCEL_HISTORY_SEC);
2760        stat->cancelsToPeer       = tr_historyGet (&peer->cancelsSentToPeer,   now, CANCEL_HISTORY_SEC);
2761        stat->cancelsToClient     = tr_historyGet (&peer->cancelsSentToClient, now, CANCEL_HISTORY_SEC);
2762
2763        stat->pendingReqsToPeer   = peer->pendingReqsToPeer;
2764        stat->pendingReqsToClient = peer->pendingReqsToClient;
2765
2766        pch = stat->flagStr;
2767        if (stat->isUTP) *pch++ = 'T';
2768        if (t->optimistic == peer) *pch++ = 'O';
2769        if (stat->isDownloadingFrom) *pch++ = 'D';
2770        else if (stat->clientIsInterested) *pch++ = 'd';
2771        if (stat->isUploadingTo) *pch++ = 'U';
2772        else if (stat->peerIsInterested) *pch++ = 'u';
2773        if (!stat->clientIsChoked && !stat->clientIsInterested) *pch++ = 'K';
2774        if (!stat->peerIsChoked && !stat->peerIsInterested) *pch++ = '?';
2775        if (stat->isEncrypted) *pch++ = 'E';
2776        if (stat->from == TR_PEER_FROM_DHT) *pch++ = 'H';
2777        else if (stat->from == TR_PEER_FROM_PEX) *pch++ = 'X';
2778        if (stat->isIncoming) *pch++ = 'I';
2779        *pch = '\0';
2780    }
2781
2782    *setmeCount = size;
2783
2784    return ret;
2785}
2786
2787/***
2788****
2789****
2790***/
2791
2792void
2793tr_peerMgrClearInterest (tr_torrent * tor)
2794{
2795    int i;
2796    Torrent * t = tor->torrentPeers;
2797    const int peerCount = tr_ptrArraySize (&t->peers);
2798
2799    assert (tr_isTorrent (tor));
2800    assert (tr_torrentIsLocked (tor));
2801
2802    for (i=0; i<peerCount; ++i)
2803    {
2804        const tr_peer * peer = tr_ptrArrayNth (&t->peers, i);
2805        tr_peerMsgsSetInterested (peer->msgs, false);
2806    }
2807}
2808
2809/* does this peer have any pieces that we want? */
2810static bool
2811isPeerInteresting (const tr_torrent  * const tor,
2812                   const bool        * const piece_is_interesting,
2813                   const tr_peer     * const peer)
2814{
2815    tr_piece_index_t i, n;
2816
2817    /* these cases should have already been handled by the calling code... */
2818    assert (!tr_torrentIsSeed (tor));
2819    assert (tr_torrentIsPieceTransferAllowed (tor, TR_PEER_TO_CLIENT));
2820
2821    if (peerIsSeed (peer))
2822        return true;
2823
2824    for (i=0, n=tor->info.pieceCount; i<n; ++i)
2825        if (piece_is_interesting[i] && tr_bitfieldHas (&peer->have, i))
2826            return true;
2827
2828    return false;
2829}
2830
/* How a peer has been behaving, as judged by rechokeDownloads ().
   The numeric order matters: compare_rechoke_info () sorts on it,
   so lower values are placed first. */
typedef enum
{
    RECHOKE_STATE_GOOD     = 0,
    RECHOKE_STATE_UNTESTED = 1,
    RECHOKE_STATE_BAD      = 2
}
tr_rechoke_state;
2838
/* One candidate peer in rechokeDownloads (). An array of these
   is sorted with compare_rechoke_info (). */
struct tr_rechoke_info
{
    tr_peer * peer;    /* the peer being considered */
    int salt;          /* random value; breaks ties between peers in the same state */
    int rechoke_state; /* holds a tr_rechoke_state value */
};
2845
2846static int
2847compare_rechoke_info (const void * va, const void * vb)
2848{
2849    const struct tr_rechoke_info * a = va;
2850    const struct tr_rechoke_info * b = vb;
2851
2852    if (a->rechoke_state != b->rechoke_state)
2853        return a->rechoke_state - b->rechoke_state;
2854
2855    return a->salt - b->salt;
2856}
2857
2858/* determines who we send "interested" messages to */
2859static void
2860rechokeDownloads (Torrent * t)
2861{
2862    int i;
2863    int maxPeers = 0;
2864    int rechoke_count = 0;
2865    struct tr_rechoke_info * rechoke = NULL;
2866    const int MIN_INTERESTING_PEERS = 5;
2867    const int peerCount = tr_ptrArraySize (&t->peers);
2868    const time_t now = tr_time ();
2869
2870    /* some cases where this function isn't necessary */
2871    if (tr_torrentIsSeed (t->tor))
2872        return;
2873    if (!tr_torrentIsPieceTransferAllowed (t->tor, TR_PEER_TO_CLIENT))
2874        return;
2875
2876    /* decide HOW MANY peers to be interested in */
2877    {
2878        int blocks = 0;
2879        int cancels = 0;
2880        time_t timeSinceCancel;
2881
2882        /* Count up how many blocks & cancels each peer has.
2883         *
2884         * There are two situations where we send out cancels --
2885         *
2886         * 1. We've got unresponsive peers, which is handled by deciding
2887         *    -which- peers to be interested in.
2888         *
2889         * 2. We've hit our bandwidth cap, which is handled by deciding
2890         *    -how many- peers to be interested in.
2891         *
2892         * We're working on 2. here, so we need to ignore unresponsive
2893         * peers in our calculations lest they confuse Transmission into
2894         * thinking it's hit its bandwidth cap.
2895         */
2896        for (i=0; i<peerCount; ++i)
2897        {
2898            const tr_peer * peer = tr_ptrArrayNth (&t->peers, i);
2899            const int b = tr_historyGet (&peer->blocksSentToClient, now, CANCEL_HISTORY_SEC);
2900            const int c = tr_historyGet (&peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC);
2901
2902            if (b == 0) /* ignore unresponsive peers, as described above */
2903                continue;
2904
2905            blocks += b;
2906            cancels += c;
2907        }
2908
2909        if (cancels > 0)
2910        {
2911            /* cancelRate: of the block requests we've recently made, the percentage we cancelled.
2912             * higher values indicate more congestion. */
2913            const double cancelRate = cancels / (double)(cancels + blocks);
2914            const double mult = 1 - MIN (cancelRate, 0.5);
2915            maxPeers = t->interestedCount * mult;
2916            tordbg (t, "cancel rate is %.3f -- reducing the "
2917                       "number of peers we're interested in by %.0f percent",
2918                       cancelRate, mult * 100);
2919            t->lastCancel = now;
2920        }
2921
2922        timeSinceCancel = now - t->lastCancel;
2923        if (timeSinceCancel)
2924        {
2925            const int maxIncrease = 15;
2926            const time_t maxHistory = 2 * CANCEL_HISTORY_SEC;
2927            const double mult = MIN (timeSinceCancel, maxHistory) / (double) maxHistory;
2928            const int inc = maxIncrease * mult;
2929            maxPeers = t->maxPeers + inc;
2930            tordbg (t, "time since last cancel is %li -- increasing the "
2931                       "number of peers we're interested in by %d",
2932                       timeSinceCancel, inc);
2933        }
2934    }
2935
2936    /* don't let the previous section's number tweaking go too far... */
2937    if (maxPeers < MIN_INTERESTING_PEERS)
2938        maxPeers = MIN_INTERESTING_PEERS;
2939    if (maxPeers > t->tor->maxConnectedPeers)
2940        maxPeers = t->tor->maxConnectedPeers;
2941
2942    t->maxPeers = maxPeers;
2943
2944    if (peerCount > 0)
2945    {
2946        bool * piece_is_interesting;
2947        const tr_torrent * const tor = t->tor;
2948        const int n = tor->info.pieceCount;
2949
2950        /* build a bitfield of interesting pieces... */
2951        piece_is_interesting = tr_new (bool, n);
2952        for (i=0; i<n; i++)
2953            piece_is_interesting[i] = !tor->info.pieces[i].dnd && !tr_cpPieceIsComplete (&tor->completion, i);
2954
2955        /* decide WHICH peers to be interested in (based on their cancel-to-block ratio) */
2956        for (i=0; i<peerCount; ++i)
2957        {
2958            tr_peer * peer = tr_ptrArrayNth (&t->peers, i);
2959
2960            if (!isPeerInteresting (t->tor, piece_is_interesting, peer))
2961            {
2962                tr_peerMsgsSetInterested (peer->msgs, false);
2963            }
2964            else
2965            {
2966                tr_rechoke_state rechoke_state;
2967                const int blocks = tr_historyGet (&peer->blocksSentToClient, now, CANCEL_HISTORY_SEC);
2968                const int cancels = tr_historyGet (&peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC);
2969
2970                if (!blocks && !cancels)
2971                    rechoke_state = RECHOKE_STATE_UNTESTED;
2972                else if (!cancels)
2973                    rechoke_state = RECHOKE_STATE_GOOD;
2974                else if (!blocks)
2975                    rechoke_state = RECHOKE_STATE_BAD;
2976                else if ((cancels * 10) < blocks)
2977                    rechoke_state = RECHOKE_STATE_GOOD;
2978                else
2979                    rechoke_state = RECHOKE_STATE_BAD;
2980
2981                if (rechoke == NULL)
2982                    rechoke = tr_new (struct tr_rechoke_info, peerCount);
2983
2984                 rechoke[rechoke_count].peer = peer;
2985                 rechoke[rechoke_count].rechoke_state = rechoke_state;
2986                 rechoke[rechoke_count].salt = tr_cryptoWeakRandInt (INT_MAX);
2987                 rechoke_count++;
2988            }
2989
2990        }
2991
2992        tr_free (piece_is_interesting);
2993    }
2994
2995    /* now that we know which & how many peers to be interested in... update the peer interest */
2996    qsort (rechoke, rechoke_count, sizeof (struct tr_rechoke_info), compare_rechoke_info);
2997    t->interestedCount = MIN (maxPeers, rechoke_count);
2998    for (i=0; i<rechoke_count; ++i)
2999        tr_peerMsgsSetInterested (rechoke[i].peer->msgs, i<t->interestedCount);
3000
3001    /* cleanup */
3002    tr_free (rechoke);
3003}
3004
3005/**
3006***
3007**/
3008
3009struct ChokeData
3010{
3011    bool            isInterested;
3012    bool            wasChoked;
3013    bool            isChoked;
3014    int             rate;
3015    int             salt;
3016    tr_peer *       peer;
3017};
3018
3019static int
3020compareChoke (const void * va, const void * vb)
3021{
3022    const struct ChokeData * a = va;
3023    const struct ChokeData * b = vb;
3024
3025    if (a->rate != b->rate) /* prefer higher overall speeds */
3026        return a->rate > b->rate ? -1 : 1;
3027
3028    if (a->wasChoked != b->wasChoked) /* prefer unchoked */
3029        return a->wasChoked ? 1 : -1;
3030
3031    if (a->salt != b->salt) /* random order */
3032        return a->salt - b->salt;
3033
3034    return 0;
3035}
3036
3037/* is this a new connection? */
3038static int
3039isNew (const tr_peer * peer)
3040{
3041    return peer && peer->io && tr_peerIoGetAge (peer->io) < 45;
3042}
3043
3044/* get a rate for deciding which peers to choke and unchoke. */
3045static int
3046getRate (const tr_torrent * tor, struct peer_atom * atom, uint64_t now)
3047{
3048    unsigned int Bps;
3049
3050    if (tr_torrentIsSeed (tor))
3051        Bps = tr_peerGetPieceSpeed_Bps (atom->peer, now, TR_CLIENT_TO_PEER);
3052
3053    /* downloading a private torrent... take upload speed into account
3054     * because there may only be a small window of opportunity to share */
3055    else if (tr_torrentIsPrivate (tor))
3056        Bps = tr_peerGetPieceSpeed_Bps (atom->peer, now, TR_PEER_TO_CLIENT)
3057            + tr_peerGetPieceSpeed_Bps (atom->peer, now, TR_CLIENT_TO_PEER);
3058
3059    /* downloading a public torrent */
3060    else
3061        Bps = tr_peerGetPieceSpeed_Bps (atom->peer, now, TR_PEER_TO_CLIENT);
3062
3063    /* convert it to bytes per second */
3064    return Bps;
3065}
3066
3067static inline bool
3068isBandwidthMaxedOut (const tr_bandwidth * b,
3069                     const uint64_t now_msec, tr_direction dir)
3070{
3071    if (!tr_bandwidthIsLimited (b, dir))
3072        return false;
3073    else {
3074        const unsigned int got = tr_bandwidthGetPieceSpeed_Bps (b, now_msec, dir);
3075        const unsigned int want = tr_bandwidthGetDesiredSpeed_Bps (b, dir);
3076        return got >= want;
3077    }
3078}
3079
3080static void
3081rechokeUploads (Torrent * t, const uint64_t now)
3082{
3083    int i, size, unchokedInterested;
3084    const int peerCount = tr_ptrArraySize (&t->peers);
3085    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase (&t->peers);
3086    struct ChokeData * choke = tr_new0 (struct ChokeData, peerCount);
3087    const tr_session * session = t->manager->session;
3088    const int chokeAll = !tr_torrentIsPieceTransferAllowed (t->tor, TR_CLIENT_TO_PEER);
3089    const bool isMaxedOut = isBandwidthMaxedOut (&t->tor->bandwidth, now, TR_UP);
3090
3091    assert (torrentIsLocked (t));
3092
3093    /* an optimistic unchoke peer's "optimistic"
3094     * state lasts for N calls to rechokeUploads (). */
3095    if (t->optimisticUnchokeTimeScaler > 0)
3096        t->optimisticUnchokeTimeScaler--;
3097    else
3098        t->optimistic = NULL;
3099
3100    /* sort the peers by preference and rate */
3101    for (i = 0, size = 0; i < peerCount; ++i)
3102    {
3103        tr_peer * peer = peers[i];
3104        struct peer_atom * atom = peer->atom;
3105
3106        if (peerIsSeed (peer)) /* choke seeds and partial seeds */
3107        {
3108            tr_peerMsgsSetChoke (peer->msgs, true);
3109        }
3110        else if (chokeAll) /* choke everyone if we're not uploading */
3111        {
3112            tr_peerMsgsSetChoke (peer->msgs, true);
3113        }
3114        else if (peer != t->optimistic)
3115        {
3116            struct ChokeData * n = &choke[size++];
3117            n->peer         = peer;
3118            n->isInterested = peer->peerIsInterested;
3119            n->wasChoked    = peer->peerIsChoked;
3120            n->rate         = getRate (t->tor, atom, now);
3121            n->salt         = tr_cryptoWeakRandInt (INT_MAX);
3122            n->isChoked     = true;
3123        }
3124    }
3125
3126    qsort (choke, size, sizeof (struct ChokeData), compareChoke);
3127
3128    /**
3129     * Reciprocation and number of uploads capping is managed by unchoking
3130     * the N peers which have the best upload rate and are interested.
3131     * This maximizes the client's download rate. These N peers are
3132     * referred to as downloaders, because they are interested in downloading
3133     * from the client.
3134     *
3135     * Peers which have a better upload rate (as compared to the downloaders)
3136     * but aren't interested get unchoked. If they become interested, the
3137     * downloader with the worst upload rate gets choked. If a client has
3138     * a complete file, it uses its upload rate rather than its download
3139     * rate to decide which peers to unchoke.
3140     *
3141     * If our bandwidth is maxed out, don't unchoke any more peers.
3142     */
3143    unchokedInterested = 0;
3144    for (i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i) {
3145        choke[i].isChoked = isMaxedOut ? choke[i].wasChoked : false;
3146        if (choke[i].isInterested)
3147            ++unchokedInterested;
3148    }
3149
3150    /* optimistic unchoke */
3151    if (!t->optimistic && !isMaxedOut && (i<size))
3152    {
3153        int n;
3154        struct ChokeData * c;
3155        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
3156
3157        for (; i<size; ++i)
3158        {
3159            if (choke[i].isInterested)
3160            {
3161                const tr_peer * peer = choke[i].peer;
3162                int x = 1, y;
3163                if (isNew (peer)) x *= 3;
3164                for (y=0; y<x; ++y)
3165                    tr_ptrArrayAppend (&randPool, &choke[i]);
3166            }
3167        }
3168
3169        if ((n = tr_ptrArraySize (&randPool)))
3170        {
3171            c = tr_ptrArrayNth (&randPool, tr_cryptoWeakRandInt (n));
3172            c->isChoked = false;
3173            t->optimistic = c->peer;
3174            t->optimisticUnchokeTimeScaler = OPTIMISTIC_UNCHOKE_MULTIPLIER;
3175        }
3176
3177        tr_ptrArrayDestruct (&randPool, NULL);
3178    }
3179
3180    for (i=0; i<size; ++i)
3181        tr_peerMsgsSetChoke (choke[i].peer->msgs, choke[i].isChoked);
3182
3183    /* cleanup */
3184    tr_free (choke);
3185}
3186
3187static void
3188rechokePulse (int foo UNUSED, short bar UNUSED, void * vmgr)
3189{
3190    tr_torrent * tor = NULL;
3191    tr_peerMgr * mgr = vmgr;
3192    const uint64_t now = tr_time_msec ();
3193
3194    managerLock (mgr);
3195
3196    while ((tor = tr_torrentNext (mgr->session, tor))) {
3197        if (tor->isRunning) {
3198            Torrent * t = tor->torrentPeers;
3199            if (!tr_ptrArrayEmpty (&t->peers)) {
3200                rechokeUploads (t, now);
3201                rechokeDownloads (t);
3202            }
3203        }
3204    }
3205
3206    tr_timerAddMsec (mgr->rechokeTimer, RECHOKE_PERIOD_MSEC);
3207    managerUnlock (mgr);
3208}
3209
3210/***
3211****
3212****  Life and Death
3213****
3214***/
3215
3216static bool
3217shouldPeerBeClosed (const Torrent    * t,
3218                    const tr_peer    * peer,
3219                    int                peerCount,
3220                    const time_t       now)
3221{
3222    const tr_torrent *       tor = t->tor;
3223    const struct peer_atom * atom = peer->atom;
3224
3225    /* if it's marked for purging, close it */
3226    if (peer->doPurge)
3227    {
3228        tordbg (t, "purging peer %s because its doPurge flag is set",
3229                tr_atomAddrStr (atom));
3230        return true;
3231    }
3232
3233    /* disconnect if we're both seeds and enough time has passed for PEX */
3234    if (tr_torrentIsSeed (tor) && peerIsSeed (peer))
3235        return !tr_torrentAllowsPex (tor) || (now-atom->time>=30);
3236
3237    /* disconnect if it's been too long since piece data has been transferred.
3238     * this is on a sliding scale based on number of available peers... */
3239    {
3240        const int relaxStrictnessIfFewerThanN = (int)((getMaxPeerCount (tor) * 0.9) + 0.5);
3241        /* if we have >= relaxIfFewerThan, strictness is 100%.
3242         * if we have zero connections, strictness is 0% */
3243        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
3244                               ? 1.0
3245                               : peerCount / (float)relaxStrictnessIfFewerThanN;
3246        const int lo = MIN_UPLOAD_IDLE_SECS;
3247        const int hi = MAX_UPLOAD_IDLE_SECS;
3248        const int limit = hi - ((hi - lo) * strictness);
3249        const int idleTime = now - MAX (atom->time, atom->piece_data_time);
3250/*fprintf (stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit);*/
3251        if (idleTime > limit) {
3252            tordbg (t, "purging peer %s because it's been %d secs since we shared anything",
3253                       tr_atomAddrStr (atom), idleTime);
3254            return true;
3255        }
3256    }
3257
3258    return false;
3259}
3260
3261static tr_peer **
3262getPeersToClose (Torrent * t, const time_t now_sec, int * setmeSize)
3263{
3264    int i, peerCount, outsize;
3265    struct tr_peer ** ret = NULL;
3266    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek (&t->peers, &peerCount);
3267
3268    assert (torrentIsLocked (t));
3269
3270    for (i = outsize = 0; i < peerCount; ++i) {
3271        if (shouldPeerBeClosed (t, peers[i], peerCount, now_sec)) {
3272            if (ret == NULL)
3273                ret = tr_new (tr_peer *, peerCount);
3274            ret[outsize++] = peers[i];
3275        }
3276    }
3277
3278    *setmeSize = outsize;
3279    return ret;
3280}
3281
3282static int
3283getReconnectIntervalSecs (const struct peer_atom * atom, const time_t now)
3284{
3285    int sec;
3286
3287    /* if we were recently connected to this peer and transferring piece
3288     * data, try to reconnect to them sooner rather that later -- we don't
3289     * want network troubles to get in the way of a good peer. */
3290    if ((now - atom->piece_data_time) <= (MINIMUM_RECONNECT_INTERVAL_SECS * 2))
3291        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
3292
3293    /* don't allow reconnects more often than our minimum */
3294    else if ((now - atom->time) < MINIMUM_RECONNECT_INTERVAL_SECS)
3295        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
3296
3297    /* otherwise, the interval depends on how many times we've tried
3298     * and failed to connect to the peer */
3299    else switch (atom->numFails) {
3300        case 0: sec = 0; break;
3301        case 1: sec = 5; break;
3302        case 2: sec = 2 * 60; break;
3303        case 3: sec = 15 * 60; break;
3304        case 4: sec = 30 * 60; break;
3305        case 5: sec = 60 * 60; break;
3306        default: sec = 120 * 60; break;
3307    }
3308
3309    /* penalize peers that were unreachable the last time we tried */
3310    if (atom->flags2 & MYFLAG_UNREACHABLE)
3311        sec += sec;
3312
3313    dbgmsg ("reconnect interval for %s is %d seconds", tr_atomAddrStr (atom), sec);
3314    return sec;
3315}
3316
3317static void
3318removePeer (Torrent * t, tr_peer * peer)
3319{
3320    tr_peer * removed;
3321    struct peer_atom * atom = peer->atom;
3322
3323    assert (torrentIsLocked (t));
3324    assert (atom);
3325
3326    atom->time = tr_time ();
3327
3328    removed = tr_ptrArrayRemoveSorted (&t->peers, peer, peerCompare);
3329
3330    if (replicationExists (t))
3331        tr_decrReplicationFromBitfield (t, &peer->have);
3332
3333    assert (removed == peer);
3334    peerDelete (t, removed);
3335}
3336
3337static void
3338closePeer (Torrent * t, tr_peer * peer)
3339{
3340    struct peer_atom * atom;
3341
3342    assert (t != NULL);
3343    assert (peer != NULL);
3344
3345    atom = peer->atom;
3346
3347    /* if we transferred piece data, then they might be good peers,
3348       so reset their `numFails' weight to zero. otherwise we connected
3349       to them fruitlessly, so mark it as another fail */
3350    if (atom->piece_data_time) {
3351        tordbg (t, "resetting atom %s numFails to 0", tr_atomAddrStr (atom));
3352        atom->numFails = 0;
3353    } else {
3354        ++atom->numFails;
3355        tordbg (t, "incremented atom %s numFails to %d", tr_atomAddrStr (atom), (int)atom->numFails);
3356    }
3357
3358    tordbg (t, "removing bad peer %s", tr_peerIoGetAddrStr (peer->io));
3359    removePeer (t, peer);
3360}
3361
3362static void
3363removeAllPeers (Torrent * t)
3364{
3365    while (!tr_ptrArrayEmpty (&t->peers))
3366        removePeer (t, tr_ptrArrayNth (&t->peers, 0));
3367}
3368
3369static void
3370closeBadPeers (Torrent * t, const time_t now_sec)
3371{
3372    if (!tr_ptrArrayEmpty (&t->peers))
3373    {
3374        int i;
3375        int peerCount;
3376        struct tr_peer ** peers = getPeersToClose (t, now_sec, &peerCount);
3377        for (i=0; i<peerCount; ++i)
3378            closePeer (t, peers[i]);
3379        tr_free (peers);
3380    }
3381}
3382
3383struct peer_liveliness
3384{
3385    tr_peer * peer;
3386    void * clientData;
3387    time_t pieceDataTime;
3388    time_t time;
3389    int speed;
3390    bool doPurge;
3391};
3392
3393static int
3394comparePeerLiveliness (const void * va, const void * vb)
3395{
3396    const struct peer_liveliness * a = va;
3397    const struct peer_liveliness * b = vb;
3398
3399    if (a->doPurge != b->doPurge)
3400        return a->doPurge ? 1 : -1;
3401
3402    if (a->speed != b->speed) /* faster goes first */
3403        return a->speed > b->speed ? -1 : 1;
3404
3405    /* the one to give us data more recently goes first */
3406    if (a->pieceDataTime != b->pieceDataTime)
3407        return a->pieceDataTime > b->pieceDataTime ? -1 : 1;
3408
3409    /* the one we connected to most recently goes first */
3410    if (a->time != b->time)
3411        return a->time > b->time ? -1 : 1;
3412
3413    return 0;
3414}
3415
3416static void
3417sortPeersByLivelinessImpl (tr_peer  ** peers,
3418                           void     ** clientData,
3419                           int         n,
3420                           uint64_t    now,
3421                           int (*compare)(const void *va, const void *vb))
3422{
3423    int i;
3424    struct peer_liveliness *lives, *l;
3425
3426    /* build a sortable array of peer + extra info */
3427    lives = l = tr_new0 (struct peer_liveliness, n);
3428    for (i=0; i<n; ++i, ++l)
3429    {
3430        tr_peer * p = peers[i];
3431        l->peer = p;
3432        l->doPurge = p->doPurge;
3433        l->pieceDataTime = p->atom->piece_data_time;
3434        l->time = p->atom->time;
3435        l->speed = tr_peerGetPieceSpeed_Bps (p, now, TR_UP)
3436                 + tr_peerGetPieceSpeed_Bps (p, now, TR_DOWN);
3437        if (clientData)
3438            l->clientData = clientData[i];
3439    }
3440
3441    /* sort 'em */
3442    assert (n == (l - lives));
3443    qsort (lives, n, sizeof (struct peer_liveliness), compare);
3444
3445    /* build the peer array */
3446    for (i=0, l=lives; i<n; ++i, ++l) {
3447        peers[i] = l->peer;
3448        if (clientData)
3449            clientData[i] = l->clientData;
3450    }
3451    assert (n == (l - lives));
3452
3453    /* cleanup */
3454    tr_free (lives);
3455}
3456
3457static void
3458sortPeersByLiveliness (tr_peer ** peers, void ** clientData, int n, uint64_t now)
3459{
3460    sortPeersByLivelinessImpl (peers, clientData, n, now, comparePeerLiveliness);
3461}
3462
3463
3464static void
3465enforceTorrentPeerLimit (Torrent * t, uint64_t now)
3466{
3467    int n = tr_ptrArraySize (&t->peers);
3468    const int max = tr_torrentGetPeerLimit (t->tor);
3469    if (n > max)
3470    {
3471        void * base = tr_ptrArrayBase (&t->peers);
3472        tr_peer ** peers = tr_memdup (base, n*sizeof (tr_peer*));
3473        sortPeersByLiveliness (peers, NULL, n, now);
3474        while (n > max)
3475            closePeer (t, peers[--n]);
3476        tr_free (peers);
3477    }
3478}
3479
3480static void
3481enforceSessionPeerLimit (tr_session * session, uint64_t now)
3482{
3483    int n = 0;
3484    tr_torrent * tor = NULL;
3485    const int max = tr_sessionGetPeerLimit (session);
3486
3487    /* count the total number of peers */
3488    while ((tor = tr_torrentNext (session, tor)))
3489        n += tr_ptrArraySize (&tor->torrentPeers->peers);
3490
3491    /* if there are too many, prune out the worst */
3492    if (n > max)
3493    {
3494        tr_peer ** peers = tr_new (tr_peer*, n);
3495        Torrent ** torrents = tr_new (Torrent*, n);
3496
3497        /* populate the peer array */
3498        n = 0;
3499        tor = NULL;
3500        while ((tor = tr_torrentNext (session, tor))) {
3501            int i;
3502            Torrent * t = tor->torrentPeers;
3503            const int tn = tr_ptrArraySize (&t->peers);
3504            for (i=0; i<tn; ++i, ++n) {
3505                peers[n] = tr_ptrArrayNth (&t->peers, i);
3506                torrents[n] = t;
3507            }
3508        }
3509
3510        /* sort 'em */
3511        sortPeersByLiveliness (peers, (void**)torrents, n, now);
3512
3513        /* cull out the crappiest */
3514        while (n-- > max)
3515            closePeer (torrents[n], peers[n]);
3516
3517        /* cleanup */
3518        tr_free (torrents);
3519        tr_free (peers);
3520    }
3521}
3522
3523static void makeNewPeerConnections (tr_peerMgr * mgr, const int max);
3524
3525static void
3526reconnectPulse (int foo UNUSED, short bar UNUSED, void * vmgr)
3527{
3528    tr_torrent * tor;
3529    tr_peerMgr * mgr = vmgr;
3530    const time_t now_sec = tr_time ();
3531    const uint64_t now_msec = tr_time_msec ();
3532
3533    /**
3534    ***  enforce the per-session and per-torrent peer limits
3535    **/
3536
3537    /* if we're over the per-torrent peer limits, cull some peers */
3538    tor = NULL;
3539    while ((tor = tr_torrentNext (mgr->session, tor)))
3540        if (tor->isRunning)
3541            enforceTorrentPeerLimit (tor->torrentPeers, now_msec);
3542
3543    /* if we're over the per-session peer limits, cull some peers */
3544    enforceSessionPeerLimit (mgr->session, now_msec);
3545
3546    /* remove crappy peers */
3547    tor = NULL;
3548    while ((tor = tr_torrentNext (mgr->session, tor)))
3549        if (!tor->torrentPeers->isRunning)
3550            removeAllPeers (tor->torrentPeers);
3551        else
3552            closeBadPeers (tor->torrentPeers, now_sec);
3553
3554    /* try to make new peer connections */
3555    makeNewPeerConnections (mgr, MAX_CONNECTIONS_PER_PULSE);
3556}
3557
3558/****
3559*****
3560*****  BANDWIDTH ALLOCATION
3561*****
3562****/
3563
3564static void
3565pumpAllPeers (tr_peerMgr * mgr)
3566{
3567    tr_torrent * tor = NULL;
3568
3569    while ((tor = tr_torrentNext (mgr->session, tor)))
3570    {
3571        int j;
3572        Torrent * t = tor->torrentPeers;
3573
3574        for (j=0; j<tr_ptrArraySize (&t->peers); ++j)
3575        {
3576            tr_peer * peer = tr_ptrArrayNth (&t->peers, j);
3577            tr_peerMsgsPulse (peer->msgs);
3578        }
3579    }
3580}
3581
3582static void
3583queuePulse (tr_session * session, tr_direction dir)
3584{
3585    assert (tr_isSession (session));
3586    assert (tr_isDirection (dir));
3587
3588    if (tr_sessionGetQueueEnabled (session, dir))
3589    {
3590        int i;
3591        const int n = tr_sessionCountQueueFreeSlots (session, dir);
3592        for (i=0; i<n; i++) {
3593            tr_torrent * tor = tr_sessionGetNextQueuedTorrent (session, dir);
3594            if (tor != NULL) {
3595                tr_torrentStartNow (tor);
3596                if (tor->queue_started_callback != NULL)
3597                  (*tor->queue_started_callback)(tor, tor->queue_started_user_data);
3598            }
3599        }
3600    }
3601}
3602
3603static void
3604bandwidthPulse (int foo UNUSED, short bar UNUSED, void * vmgr)
3605{
3606    tr_torrent * tor;
3607    tr_peerMgr * mgr = vmgr;
3608    tr_session * session = mgr->session;
3609    managerLock (mgr);
3610
3611    /* FIXME: this next line probably isn't necessary... */
3612    pumpAllPeers (mgr);
3613
3614    /* allocate bandwidth to the peers */
3615    tr_bandwidthAllocate (&session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC);
3616    tr_bandwidthAllocate (&session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC);
3617
3618    /* torrent upkeep */
3619    tor = NULL;
3620    while ((tor = tr_torrentNext (session, tor)))
3621    {
3622        /* possibly stop torrents that have seeded enough */
3623        tr_torrentCheckSeedLimit (tor);
3624
3625        /* run the completeness check for any torrents that need it */
3626        if (tor->torrentPeers->needsCompletenessCheck) {
3627            tor->torrentPeers->needsCompletenessCheck  = false;
3628            tr_torrentRecheckCompleteness (tor);
3629        }
3630
3631        /* stop torrents that are ready to stop, but couldn't be stopped
3632           earlier during the peer-io callback call chain */
3633        if (tor->isStopping)
3634            tr_torrentStop (tor);
3635    }
3636
3637    /* pump the queues */
3638    queuePulse (session, TR_UP);
3639    queuePulse (session, TR_DOWN);
3640
3641    reconnectPulse (0, 0, mgr);
3642
3643    tr_timerAddMsec (mgr->bandwidthTimer, BANDWIDTH_PERIOD_MSEC);
3644    managerUnlock (mgr);
3645}
3646
3647/***
3648****
3649***/
3650
3651static int
3652compareAtomPtrsByAddress (const void * va, const void *vb)
3653{
3654    const struct peer_atom * a = * (const struct peer_atom**) va;
3655    const struct peer_atom * b = * (const struct peer_atom**) vb;
3656
3657    assert (tr_isAtom (a));
3658    assert (tr_isAtom (b));
3659
3660    return tr_address_compare (&a->addr, &b->addr);
3661}
3662
3663/* best come first, worst go last */
3664static int
3665compareAtomPtrsByShelfDate (const void * va, const void *vb)
3666{
3667    time_t atime;
3668    time_t btime;
3669    const struct peer_atom * a = * (const struct peer_atom**) va;
3670    const struct peer_atom * b = * (const struct peer_atom**) vb;
3671    const int data_time_cutoff_secs = 60 * 60;
3672    const time_t tr_now = tr_time ();
3673
3674    assert (tr_isAtom (a));
3675    assert (tr_isAtom (b));
3676
3677    /* primary key: the last piece data time *if* it was within the last hour */
3678    atime = a->piece_data_time; if (atime + data_time_cutoff_secs < tr_now) atime = 0;
3679    btime = b->piece_data_time; if (btime + data_time_cutoff_secs < tr_now) btime = 0;
3680    if (atime != btime)
3681        return atime > btime ? -1 : 1;
3682
3683    /* secondary key: shelf date. */
3684    if (a->shelf_date != b->shelf_date)
3685        return a->shelf_date > b->shelf_date ? -1 : 1;
3686
3687    return 0;
3688}
3689
3690static int
3691getMaxAtomCount (const tr_torrent * tor)
3692{
3693    const int n = tor->maxConnectedPeers;
3694    /* approximate fit of the old jump discontinuous function */
3695    if (n >= 55) return     n + 150;
3696    if (n >= 20) return 2 * n + 95;
3697    return               4 * n + 55;
3698}
3699
3700static void
3701atomPulse (int foo UNUSED, short bar UNUSED, void * vmgr)
3702{
3703    tr_torrent * tor = NULL;
3704    tr_peerMgr * mgr = vmgr;
3705    managerLock (mgr);
3706
3707    while ((tor = tr_torrentNext (mgr->session, tor)))
3708    {
3709        int atomCount;
3710        Torrent * t = tor->torrentPeers;
3711        const int maxAtomCount = getMaxAtomCount (tor);
3712        struct peer_atom ** atoms = (struct peer_atom**) tr_ptrArrayPeek (&t->pool, &atomCount);
3713
3714        if (atomCount > maxAtomCount) /* we've got too many atoms... time to prune */
3715        {
3716            int i;
3717            int keepCount = 0;
3718            int testCount = 0;
3719            struct peer_atom ** keep = tr_new (struct peer_atom*, atomCount);
3720            struct peer_atom ** test = tr_new (struct peer_atom*, atomCount);
3721
3722            /* keep the ones that are in use */
3723            for (i=0; i<atomCount; ++i) {
3724                struct peer_atom * atom = atoms[i];
3725                if (peerIsInUse (t, atom))
3726                    keep[keepCount++] = atom;
3727                else
3728                    test[testCount++] = atom;
3729            }
3730
3731            /* if there's room, keep the best of what's left */
3732            i = 0;
3733            if (keepCount < maxAtomCount) {
3734                qsort (test, testCount, sizeof (struct peer_atom *), compareAtomPtrsByShelfDate);
3735                while (i<testCount && keepCount<maxAtomCount)
3736                    keep[keepCount++] = test[i++];
3737            }
3738
3739            /* free the culled atoms */
3740            while (i<testCount)
3741                tr_free (test[i++]);
3742
3743            /* rebuild Torrent.pool with what's left */
3744            tr_ptrArrayDestruct (&t->pool, NULL);
3745            t->pool = TR_PTR_ARRAY_INIT;
3746            qsort (keep, keepCount, sizeof (struct peer_atom *), compareAtomPtrsByAddress);
3747            for (i=0; i<keepCount; ++i)
3748                tr_ptrArrayAppend (&t->pool, keep[i]);
3749
3750            tordbg (t, "max atom count is %d... pruned from %d to %d\n", maxAtomCount, atomCount, keepCount);
3751
3752            /* cleanup */
3753            tr_free (test);
3754            tr_free (keep);
3755        }
3756    }
3757
3758    tr_timerAddMsec (mgr->atomTimer, ATOM_PERIOD_MSEC);
3759    managerUnlock (mgr);
3760}
3761
3762/***
3763****
3764****
3765****
3766***/
3767
3768/* is this atom someone that we'd want to initiate a connection to? */
3769static bool
3770isPeerCandidate (const tr_torrent * tor, struct peer_atom * atom, const time_t now)
3771{
3772    /* not if we're both seeds */
3773    if (tr_torrentIsSeed (tor) && atomIsSeed (atom))
3774        return false;
3775
3776    /* not if we've already got a connection to them... */
3777    if (peerIsInUse (tor->torrentPeers, atom))
3778        return false;
3779
3780    /* not if we just tried them already */
3781    if ((now - atom->time) < getReconnectIntervalSecs (atom, now))
3782        return false;
3783
3784    /* not if they're blocklisted */
3785    if (isAtomBlocklisted (tor->session, atom))
3786        return false;
3787
3788    /* not if they're banned... */
3789    if (atom->flags2 & MYFLAG_BANNED)
3790        return false;
3791
3792    return true;
3793}
3794
3795struct peer_candidate
3796{
3797    uint64_t score;
3798    tr_torrent * tor;
3799    struct peer_atom * atom;
3800};
3801
3802static bool
3803torrentWasRecentlyStarted (const tr_torrent * tor)
3804{
3805    return difftime (tr_time (), tor->startDate) < 120;
3806}
3807
/**
 * Append a field to a packed sort key.
 *
 * @param value the key built so far
 * @param width how many bits to shift `value' left to make room
 * @param addme the field to OR into the vacated low bits. It is not
 *              masked, so bits of `addme' at position `width' or
 *              higher end up in the shifted part of the key.
 * @return (value << width) | addme
 */
static inline uint64_t
addValToKey (uint64_t value, int width, uint64_t addme)
{
    const uint64_t shifted = value << (uint64_t)width;

    return shifted | addme;
}
3815
3816/* smaller value is better */
3817static uint64_t
3818getPeerCandidateScore (const tr_torrent * tor, const struct peer_atom * atom, uint8_t salt)
3819{
3820    uint64_t i;
3821    uint64_t score = 0;
3822    const bool failed = atom->lastConnectionAt < atom->lastConnectionAttemptAt;
3823
3824    /* prefer peers we've connected to, or never tried, over peers we failed to connect to. */
3825    i = failed ? 1 : 0;
3826    score = addValToKey (score, 1, i);
3827
3828    /* prefer the one we attempted least recently (to cycle through all peers) */
3829    i = atom->lastConnectionAttemptAt;
3830    score = addValToKey (score, 32, i);
3831
3832    /* prefer peers belonging to a torrent of a higher priority */
3833    switch (tr_torrentGetPriority (tor)) {
3834        case TR_PRI_HIGH:    i = 0; break;
3835        case TR_PRI_NORMAL:  i = 1; break;
3836        case TR_PRI_LOW:     i = 2; break;
3837    }
3838    score = addValToKey (score, 4, i);
3839
3840    /* prefer recently-started torrents */
3841    i = torrentWasRecentlyStarted (tor) ? 0 : 1;
3842    score = addValToKey (score, 1, i);
3843
3844    /* prefer torrents we're downloading with */
3845    i = tr_torrentIsSeed (tor) ? 1 : 0;
3846    score = addValToKey (score, 1, i);
3847
3848    /* prefer peers that are known to be connectible */
3849    i = (atom->flags & ADDED_F_CONNECTABLE) ? 0 : 1;
3850    score = addValToKey (score, 1, i);
3851
3852    /* prefer peers that we might have a chance of uploading to...
3853       so lower seed probability is better */
3854    if (atom->seedProbability == 100) i = 101;
3855    else if (atom->seedProbability == -1) i = 100;
3856    else i = atom->seedProbability;
3857    score = addValToKey (score, 8, i);
3858
3859    /* Prefer peers that we got from more trusted sources.
3860     * lower `fromBest' values indicate more trusted sources */
3861    score = addValToKey (score, 4, atom->fromBest);
3862
3863    /* salt */
3864    score = addValToKey (score, 8, salt);
3865
3866    return score;
3867}
3868
3869#ifndef NDEBUG
3870static int
3871checkPartition (const struct peer_candidate * candidates, int left, int right, uint64_t pivotScore, int storeIndex)
3872{
3873    int i;
3874
3875    assert (storeIndex >= left);
3876    assert (storeIndex <= right);
3877    assert (candidates[storeIndex].score == pivotScore);
3878
3879    for (i=left; i<storeIndex; ++i)
3880        assert (candidates[i].score < pivotScore);
3881    for (i=storeIndex+1; i<=right; ++i)
3882        assert (candidates[i].score >= pivotScore);
3883
3884    return true;
3885}
3886#endif
3887
3888/* Helper to selectBestCandidates ().
3889 * Adapted from http://en.wikipedia.org/wiki/Selection_algorithm */
3890static int
3891partitionPeerCandidates (struct peer_candidate * candidates, int left, int right, int pivotIndex)
3892{
3893    int i;
3894    int storeIndex;
3895    struct peer_candidate tmp;
3896    const struct peer_candidate pivotValue = candidates[pivotIndex];
3897
3898    /* move pivot to end */
3899    tmp = candidates[right];
3900    candidates[right] = pivotValue;
3901    candidates[pivotIndex] = tmp;
3902
3903    storeIndex = left;
3904    for (i=left; i<=right; ++i)
3905    {
3906        if (candidates[i].score < pivotValue.score)
3907        {
3908            tmp = candidates[storeIndex];
3909            candidates[storeIndex] = candidates[i];
3910            candidates[i] = tmp;
3911            storeIndex++;
3912        }
3913    }
3914
3915    /* move pivot to its final place */
3916    tmp = candidates[right];
3917    candidates[right] = candidates[storeIndex];
3918    candidates[storeIndex] = tmp;
3919
3920    /* sanity check */
3921    assert (checkPartition (candidates, left, right, pivotValue.score, storeIndex));
3922
3923    return storeIndex;
3924}
3925
/* Selection (partial ordering) of candidates[left..right] by score.
 * Adapted from http://en.wikipedia.org/wiki/Selection_algorithm
 *
 * `k' is an offset relative to `left'. The range is repeatedly partitioned
 * around its middle element and narrowed to the side that contains index
 * left+k, until a pivot lands exactly on that index or the range has
 * shrunk to fewer than two entries. */
static void
selectPeerCandidates (struct peer_candidate * candidates, int left, int right, int k)
{
    while (left < right)
    {
        const int middle = left + (right - left) / 2;
        const int pivot = partitionPeerCandidates (candidates, left, right, middle);
        const int target = left + k;

        if (pivot == target)
            break;

        if (pivot > target)
        {
            /* the target lies in front of the pivot; `k' stays relative to `left' */
            right = pivot - 1;
        }
        else
        {
            /* the target lies behind the pivot; rebase `k' onto the new left edge */
            k = target - pivot - 1;
            left = pivot + 1;
        }
    }
}
3942
3943#ifndef NDEBUG
3944static bool
3945checkBestScoresComeFirst (const struct peer_candidate * candidates, int n, int k)
3946{
3947    int i;
3948    uint64_t worstFirstScore = 0;
3949    const int x = MIN (n, k) - 1;
3950
3951    for (i=0; i<x; i++)
3952        if (worstFirstScore < candidates[i].score)
3953            worstFirstScore = candidates[i].score;
3954
3955    for (i=0; i<x; i++)
3956        assert (candidates[i].score <= worstFirstScore);
3957
3958    for (i=x+1; i<n; i++)
3959        assert (candidates[i].score >= worstFirstScore);
3960
3961    return true;
3962}
3963#endif /* NDEBUG */
3964
3965/** @return an array of all the atoms we might want to connect to */
3966static struct peer_candidate*
3967getPeerCandidates (tr_session * session, int * candidateCount, int max)
3968{
3969    int atomCount;
3970    int peerCount;
3971    tr_torrent * tor;
3972    struct peer_candidate * candidates;
3973    struct peer_candidate * walk;
3974    const time_t now = tr_time ();
3975    const uint64_t now_msec = tr_time_msec ();
3976    /* leave 5% of connection slots for incoming connections -- ticket #2609 */
3977    const int maxCandidates = tr_sessionGetPeerLimit (session) * 0.95;
3978
3979    /* count how many peers and atoms we've got */
3980    tor= NULL;
3981    atomCount = 0;
3982    peerCount = 0;
3983    while ((tor = tr_torrentNext (session, tor))) {
3984        atomCount += tr_ptrArraySize (&tor->torrentPeers->pool);
3985        peerCount += tr_ptrArraySize (&tor->torrentPeers->peers);
3986    }
3987
3988    /* don't start any new handshakes if we're full up */
3989    if (maxCandidates <= peerCount) {
3990        *candidateCount = 0;
3991        return NULL;
3992    }
3993
3994    /* allocate an array of candidates */
3995    walk = candidates = tr_new (struct peer_candidate, atomCount);
3996
3997    /* populate the candidate array */
3998    tor = NULL;
3999    while ((tor = tr_torrentNext (session, tor)))
4000    {
4001        int i, nAtoms;
4002        struct peer_atom ** atoms;
4003
4004        if (!tor->torrentPeers->isRunning)
4005            continue;
4006
4007        /* if we've already got enough peers in this torrent... */
4008        if (tr_torrentGetPeerLimit (tor) <= tr_ptrArraySize (&tor->torrentPeers->peers))
4009            continue;
4010
4011        /* if we've already got enough speed in this torrent... */
4012        if (tr_torrentIsSeed (tor) && isBandwidthMaxedOut (&tor->bandwidth, now_msec, TR_UP))
4013            continue;
4014
4015        atoms = (struct peer_atom**) tr_ptrArrayPeek (&tor->torrentPeers->pool, &nAtoms);
4016        for (i=0; i<nAtoms; ++i)
4017        {
4018            struct peer_atom * atom = atoms[i];
4019
4020            if (isPeerCandidate (tor, atom, now))
4021            {
4022                const uint8_t salt = tr_cryptoWeakRandInt (1024);
4023                walk->tor = tor;
4024                walk->atom = atom;
4025                walk->score = getPeerCandidateScore (tor, atom, salt);
4026                ++walk;
4027            }
4028        }
4029    }
4030
4031    *candidateCount = walk - candidates;
4032    if (walk != candidates)
4033        selectPeerCandidates (candidates, 0, (walk-candidates)-1, max);
4034
4035    assert (checkBestScoresComeFirst (candidates, *candidateCount, max));
4036
4037    return candidates;
4038}
4039
4040static void
4041initiateConnection (tr_peerMgr * mgr, Torrent * t, struct peer_atom * atom)
4042{
4043    tr_peerIo * io;
4044    const time_t now = tr_time ();
4045    bool utp = tr_sessionIsUTPEnabled (mgr->session) && !atom->utp_failed;
4046
4047    if (atom->fromFirst == TR_PEER_FROM_PEX)
4048        /* PEX has explicit signalling for uTP support.  If an atom
4049           originally came from PEX and doesn't have the uTP flag, skip the
4050           uTP connection attempt.  Are we being optimistic here? */
4051        utp = utp && (atom->flags & ADDED_F_UTP_FLAGS);
4052
4053    tordbg (t, "Starting an OUTGOING%s connection with %s",
4054            utp ? " µTP" : "",
4055            tr_atomAddrStr (atom));
4056
4057    io = tr_peerIoNewOutgoing (mgr->session,
4058                               &mgr->session->bandwidth,
4059                               &atom->addr,
4060                               atom->port,
4061                               t->tor->info.hash,
4062                               t->tor->completeness == TR_SEED,
4063                               utp);
4064
4065    if (io == NULL)
4066    {
4067        tordbg (t, "peerIo not created; marking peer %s as unreachable",
4068                tr_atomAddrStr (atom));
4069        atom->flags2 |= MYFLAG_UNREACHABLE;
4070        atom->numFails++;
4071    }
4072    else
4073    {
4074        tr_handshake * handshake = tr_handshakeNew (io,
4075                                                    mgr->session->encryptionMode,
4076                                                    myHandshakeDoneCB,
4077                                                    mgr);
4078
4079        assert (tr_peerIoGetTorrentHash (io));
4080
4081        tr_peerIoUnref (io); /* balanced by the initial ref
4082                                 in tr_peerIoNewOutgoing () */
4083
4084        tr_ptrArrayInsertSorted (&t->outgoingHandshakes, handshake,
4085                                 handshakeCompare);
4086    }
4087
4088    atom->lastConnectionAttemptAt = now;
4089    atom->time = now;
4090}
4091
4092static void
4093initiateCandidateConnection (tr_peerMgr * mgr, struct peer_candidate * c)
4094{
4095#if 0
4096    fprintf (stderr, "Starting an OUTGOING connection with %s - [%s] seedProbability==%d; %s, %s\n",
4097             tr_atomAddrStr (c->atom),
4098             tr_torrentName (c->tor),
4099           (int)c->atom->seedProbability,
4100             tr_torrentIsPrivate (c->tor) ? "private" : "public",
4101             tr_torrentIsSeed (c->tor) ? "seed" : "downloader");
4102#endif
4103
4104    initiateConnection (mgr, c->tor->torrentPeers, c->atom);
4105}
4106
4107static void
4108makeNewPeerConnections (struct tr_peerMgr * mgr, const int max)
4109{
4110    int i, n;
4111    struct peer_candidate * candidates;
4112
4113    candidates = getPeerCandidates (mgr->session, &n, max);
4114
4115    for (i=0; i<n && i<max; ++i)
4116        initiateCandidateConnection (mgr, &candidates[i]);
4117
4118    tr_free (candidates);
4119}
Note: See TracBrowser for help on using the repository browser.