source: trunk/libtransmission/peer-mgr.c @ 13651

Last change on this file since 13651 was 13651, checked in by jordan, 9 years ago

(trunk) #5168 'make libtransmission's public funcs nonblocking when possible' -- first attempt.

  • Property svn:keywords set to Date Rev Author Id
File size: 117.7 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2 (b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 13651 2012-12-12 20:22:57Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h> /* error codes ERANGE, ... */
15#include <limits.h> /* INT_MAX */
16#include <string.h> /* memcpy, memcmp, strstr */
17#include <stdlib.h> /* qsort */
18
19#include <event2/event.h>
20
21#include <libutp/utp.h>
22
23#include "transmission.h"
24#include "announcer.h"
25#include "bandwidth.h"
26#include "blocklist.h"
27#include "cache.h"
28#include "clients.h"
29#include "completion.h"
30#include "crypto.h"
31#include "handshake.h"
32#include "net.h"
33#include "peer-io.h"
34#include "peer-mgr.h"
35#include "peer-msgs.h"
36#include "ptrarray.h"
37#include "session.h"
38#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
39#include "torrent.h"
40#include "tr-utp.h"
41#include "utils.h"
42#include "webseed.h"
43
44enum
45{
46    /* how frequently to cull old atoms */
47    ATOM_PERIOD_MSEC = (60 * 1000),
48
49    /* how frequently to change which peers are choked */
50    RECHOKE_PERIOD_MSEC = (10 * 1000),
51
52    /* an optimistically unchoked peer is immune from rechoking
53       for this many calls to rechokeUploads (). */
54    OPTIMISTIC_UNCHOKE_MULTIPLIER = 4,
55
56    /* how frequently to reallocate bandwidth */
57    BANDWIDTH_PERIOD_MSEC = 500,
58
59    /* how frequently to age out old piece request lists */
60    REFILL_UPKEEP_PERIOD_MSEC = (10 * 1000),
61
62    /* how frequently to decide which peers live and die */
63    RECONNECT_PERIOD_MSEC = 500,
64
65    /* when many peers are available, keep idle ones this long */
66    MIN_UPLOAD_IDLE_SECS = (60),
67
68    /* when few peers are available, keep idle ones this long */
69    MAX_UPLOAD_IDLE_SECS = (60 * 5),
70
71    /* max number of peers to ask for per second overall.
72     * this throttle is to avoid overloading the router */
73    MAX_CONNECTIONS_PER_SECOND = 12,
74
75    MAX_CONNECTIONS_PER_PULSE = (int)(MAX_CONNECTIONS_PER_SECOND * (RECONNECT_PERIOD_MSEC/1000.0)),
76
77    /* number of bad pieces a peer is allowed to send before we ban them */
78    MAX_BAD_PIECES_PER_PEER = 5,
79
80    /* amount of time to keep a list of request pieces lying around
81       before it's considered too old and needs to be rebuilt */
82    PIECE_LIST_SHELF_LIFE_SECS = 60,
83
84    /* use for bitwise operations w/peer_atom.flags2 */
85    MYFLAG_BANNED = 1,
86
87    /* use for bitwise operations w/peer_atom.flags2 */
88    /* unreachable for now... but not banned.
89     * if they try to connect to us it's okay */
90    MYFLAG_UNREACHABLE = 2,
91
92    /* the minimum we'll wait before attempting to reconnect to a peer */
93    MINIMUM_RECONNECT_INTERVAL_SECS = 5,
94
95    /** how long we'll let requests we've made linger before we cancel them */
96    REQUEST_TTL_SECS = 120,
97
98    NO_BLOCKS_CANCEL_HISTORY = 120,
99
100    CANCEL_HISTORY_SEC = 60
101};
102
103const tr_peer_event TR_PEER_EVENT_INIT = { 0, 0, NULL, 0, 0, 0, false, 0 };
104
105/**
106***
107**/
108
109/**
110 * Peer information that should be kept even before we've connected and
111 * after we've disconnected. These are kept in a pool of peer_atoms to decide
112 * which ones would make good candidates for connecting to, and to watch out
113 * for banned peers.
114 *
115 * @see tr_peer
116 * @see tr_peermsgs
117 */
118struct peer_atom
119{
120    uint8_t     fromFirst;          /* where the peer was first found */
121    uint8_t     fromBest;           /* the "best" value of where the peer has been found */
122    uint8_t     flags;              /* these match the added_f flags */
123    uint8_t     flags2;             /* flags that aren't defined in added_f */
124    int8_t      seedProbability;    /* how likely is this to be a seed... [0..100] or -1 for unknown */
125    int8_t      blocklisted;        /* -1 for unknown, true for blocklisted, false for not blocklisted */
126
127    tr_port     port;
128    bool        utp_failed;         /* We recently failed to connect over uTP */
129    uint16_t    numFails;
130    time_t      time;               /* when the peer's connection status last changed */
131    time_t      piece_data_time;
132
133    time_t      lastConnectionAttemptAt;
134    time_t      lastConnectionAt;
135
136    /* similar to a TTL field, but less rigid --
137     * if the swarm is small, the atom will be kept past this date. */
138    time_t      shelf_date;
139    tr_peer   * peer;               /* will be NULL if not connected */
140    tr_address  addr;
141};
142
143#ifdef NDEBUG
144#define tr_isAtom(a) (TRUE)
145#else
146static bool
147tr_isAtom (const struct peer_atom * atom)
148{
149    return (atom != NULL)
150        && (atom->fromFirst < TR_PEER_FROM__MAX)
151        && (atom->fromBest < TR_PEER_FROM__MAX)
152        && (tr_address_is_valid (&atom->addr));
153}
154#endif
155
156static const char*
157tr_atomAddrStr (const struct peer_atom * atom)
158{
159    return atom ? tr_peerIoAddrStr (&atom->addr, atom->port) : "[no atom]";
160}
161
162struct block_request
163{
164    tr_block_index_t block;
165    tr_peer * peer;
166    time_t sentAt;
167};
168
169struct weighted_piece
170{
171    tr_piece_index_t index;
172    int16_t salt;
173    int16_t requestCount;
174};
175
176enum piece_sort_state
177{
178    PIECES_UNSORTED,
179    PIECES_SORTED_BY_INDEX,
180    PIECES_SORTED_BY_WEIGHT
181};
182
183/** @brief Opaque, per-torrent data structure for peer connection information */
184typedef struct tr_torrent_peers
185{
186    tr_ptrArray                outgoingHandshakes; /* tr_handshake */
187    tr_ptrArray                pool; /* struct peer_atom */
188    tr_ptrArray                peers; /* tr_peer */
189    tr_ptrArray                webseeds; /* tr_webseed */
190
191    tr_torrent               * tor;
192    struct tr_peerMgr        * manager;
193
194    tr_peer                  * optimistic; /* the optimistic peer, or NULL if none */
195    int                        optimisticUnchokeTimeScaler;
196
197    bool                       isRunning;
198    bool                       needsCompletenessCheck;
199
200    struct block_request     * requests;
201    int                        requestCount;
202    int                        requestAlloc;
203
204    struct weighted_piece    * pieces;
205    int                        pieceCount;
206    enum piece_sort_state      pieceSortState;
207
208    /* An array of pieceCount items stating how many peers have each piece.
209       This is used to help us for downloading pieces "rarest first."
210       This may be NULL if we don't have metainfo yet, or if we're not
211       downloading and don't care about rarity */
212    uint16_t                 * pieceReplication;
213    size_t                     pieceReplicationSize;
214
215    int                        interestedCount;
216    int                        maxPeers;
217    time_t                     lastCancel;
218
219    /* Before the endgame this should be 0. In endgame, is contains the average
220     * number of pending requests per peer. Only peers which have more pending
221     * requests are considered 'fast' are allowed to request a block that's
222     * already been requested from another (slower?) peer. */
223    int                        endgame;
224}
225Torrent;
226
227struct tr_peerMgr
228{
229    tr_session    * session;
230    tr_ptrArray     incomingHandshakes; /* tr_handshake */
231    struct event  * bandwidthTimer;
232    struct event  * rechokeTimer;
233    struct event  * refillUpkeepTimer;
234    struct event  * atomTimer;
235};
236
237#define tordbg(t, ...) \
238  do \
239    { \
240      if (tr_deepLoggingIsActive ()) \
241        tr_deepLog (__FILE__, __LINE__, tr_torrentName (t->tor), __VA_ARGS__); \
242    } \
243  while (0)
244
245#define dbgmsg(...) \
246  do \
247    { \
248      if (tr_deepLoggingIsActive ()) \
249        tr_deepLog (__FILE__, __LINE__, NULL, __VA_ARGS__); \
250    } \
251  while (0)
252
253/**
254***
255**/
256
257static inline void
258managerLock (const struct tr_peerMgr * manager)
259{
260    tr_sessionLock (manager->session);
261}
262
263static inline void
264managerUnlock (const struct tr_peerMgr * manager)
265{
266    tr_sessionUnlock (manager->session);
267}
268
269static inline void
270torrentLock (Torrent * torrent)
271{
272    managerLock (torrent->manager);
273}
274
275static inline void
276torrentUnlock (Torrent * torrent)
277{
278    managerUnlock (torrent->manager);
279}
280
281static inline int
282torrentIsLocked (const Torrent * t)
283{
284    return tr_sessionIsLocked (t->manager->session);
285}
286
287/**
288***
289**/
290
291static int
292handshakeCompareToAddr (const void * va, const void * vb)
293{
294    const tr_handshake * a = va;
295
296    return tr_address_compare (tr_handshakeGetAddr (a, NULL), vb);
297}
298
299static int
300handshakeCompare (const void * a, const void * b)
301{
302    return handshakeCompareToAddr (a, tr_handshakeGetAddr (b, NULL));
303}
304
305static inline tr_handshake*
306getExistingHandshake (tr_ptrArray * handshakes, const tr_address * addr)
307{
308    if (tr_ptrArrayEmpty (handshakes))
309        return NULL;
310
311    return tr_ptrArrayFindSorted (handshakes, addr, handshakeCompareToAddr);
312}
313
314static int
315comparePeerAtomToAddress (const void * va, const void * vb)
316{
317    const struct peer_atom * a = va;
318
319    return tr_address_compare (&a->addr, vb);
320}
321
322static int
323compareAtomsByAddress (const void * va, const void * vb)
324{
325    const struct peer_atom * b = vb;
326
327    assert (tr_isAtom (b));
328
329    return comparePeerAtomToAddress (va, &b->addr);
330}
331
332/**
333***
334**/
335
336const tr_address *
337tr_peerAddress (const tr_peer * peer)
338{
339    return &peer->atom->addr;
340}
341
342static Torrent*
343getExistingTorrent (tr_peerMgr *    manager,
344                    const uint8_t * hash)
345{
346    tr_torrent * tor = tr_torrentFindFromHash (manager->session, hash);
347
348    return tor == NULL ? NULL : tor->torrentPeers;
349}
350
351static int
352peerCompare (const void * a, const void * b)
353{
354    return tr_address_compare (tr_peerAddress (a), tr_peerAddress (b));
355}
356
357static struct peer_atom*
358getExistingAtom (const Torrent    * t,
359                 const tr_address * addr)
360{
361    Torrent * tt = (Torrent*)t;
362    assert (torrentIsLocked (t));
363    return tr_ptrArrayFindSorted (&tt->pool, addr, comparePeerAtomToAddress);
364}
365
366static bool
367peerIsInUse (const Torrent * ct, const struct peer_atom * atom)
368{
369    Torrent * t = (Torrent*) ct;
370
371    assert (torrentIsLocked (t));
372
373    return (atom->peer != NULL)
374        || getExistingHandshake (&t->outgoingHandshakes, &atom->addr)
375        || getExistingHandshake (&t->manager->incomingHandshakes, &atom->addr);
376}
377
378void
379tr_peerConstruct (tr_peer * peer)
380{
381    memset (peer, 0, sizeof (tr_peer));
382
383    peer->have = TR_BITFIELD_INIT;
384}
385
386static tr_peer*
387peerNew (struct peer_atom * atom)
388{
389    tr_peer * peer = tr_new (tr_peer, 1);
390    tr_peerConstruct (peer);
391
392    peer->atom = atom;
393    atom->peer = peer;
394
395    return peer;
396}
397
398static tr_peer*
399getPeer (Torrent * torrent, struct peer_atom * atom)
400{
401    tr_peer * peer;
402
403    assert (torrentIsLocked (torrent));
404
405    peer = atom->peer;
406
407    if (peer == NULL)
408    {
409        peer = peerNew (atom);
410        tr_bitfieldConstruct (&peer->have, torrent->tor->info.pieceCount);
411        tr_bitfieldConstruct (&peer->blame, torrent->tor->blockCount);
412        tr_ptrArrayInsertSorted (&torrent->peers, peer, peerCompare);
413    }
414
415    return peer;
416}
417
418static void peerDeclinedAllRequests (Torrent *, const tr_peer *);
419
420void
421tr_peerDestruct (tr_torrent * tor, tr_peer * peer)
422{
423    assert (peer != NULL);
424
425    peerDeclinedAllRequests (tor->torrentPeers, peer);
426
427    if (peer->msgs != NULL)
428        tr_peerMsgsFree (peer->msgs);
429
430    if (peer->io) {
431        tr_peerIoClear (peer->io);
432        tr_peerIoUnref (peer->io); /* balanced by the ref in handshakeDoneCB () */
433    }
434
435    tr_bitfieldDestruct (&peer->have);
436    tr_bitfieldDestruct (&peer->blame);
437    tr_free (peer->client);
438
439    if (peer->atom)
440        peer->atom->peer = NULL;
441}
442
443static void
444peerDelete (Torrent * t, tr_peer * peer)
445{
446    tr_peerDestruct (t->tor, peer);
447    tr_free (peer);
448}
449
450static bool
451replicationExists (const Torrent * t)
452{
453    return t->pieceReplication != NULL;
454}
455
456static void
457replicationFree (Torrent * t)
458{
459    tr_free (t->pieceReplication);
460    t->pieceReplication = NULL;
461    t->pieceReplicationSize = 0;
462}
463
464static void
465replicationNew (Torrent * t)
466{
467    tr_piece_index_t piece_i;
468    const tr_piece_index_t piece_count = t->tor->info.pieceCount;
469    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase (&t->peers);
470    const int peer_count = tr_ptrArraySize (&t->peers);
471
472    assert (!replicationExists (t));
473
474    t->pieceReplicationSize = piece_count;
475    t->pieceReplication = tr_new0 (uint16_t, piece_count);
476
477    for (piece_i=0; piece_i<piece_count; ++piece_i)
478    {
479        int peer_i;
480        uint16_t r = 0;
481
482        for (peer_i=0; peer_i<peer_count; ++peer_i)
483            if (tr_bitfieldHas (&peers[peer_i]->have, piece_i))
484                ++r;
485
486        t->pieceReplication[piece_i] = r;
487    }
488}
489
490static void
491torrentFree (void * vt)
492{
493    Torrent * t = vt;
494
495    assert (t);
496    assert (!t->isRunning);
497    assert (torrentIsLocked (t));
498    assert (tr_ptrArrayEmpty (&t->outgoingHandshakes));
499    assert (tr_ptrArrayEmpty (&t->peers));
500
501    tr_ptrArrayDestruct (&t->webseeds, (PtrArrayForeachFunc)tr_webseedFree);
502    tr_ptrArrayDestruct (&t->pool, (PtrArrayForeachFunc)tr_free);
503    tr_ptrArrayDestruct (&t->outgoingHandshakes, NULL);
504    tr_ptrArrayDestruct (&t->peers, NULL);
505
506    replicationFree (t);
507
508    tr_free (t->requests);
509    tr_free (t->pieces);
510    tr_free (t);
511}
512
513static void peerCallbackFunc (tr_peer *, const tr_peer_event *, void *);
514
515static void
516rebuildWebseedArray (Torrent * t, tr_torrent * tor)
517{
518    int i;
519    const tr_info * inf = &tor->info;
520
521    /* clear the array */
522    tr_ptrArrayDestruct (&t->webseeds, (PtrArrayForeachFunc)tr_webseedFree);
523    t->webseeds = TR_PTR_ARRAY_INIT;
524
525    /* repopulate it */
526    for (i = 0; i < inf->webseedCount; ++i)
527    {
528        tr_webseed * w = tr_webseedNew (tor, inf->webseeds[i], peerCallbackFunc, t);
529        tr_ptrArrayAppend (&t->webseeds, w);
530    }
531}
532
533static Torrent*
534torrentNew (tr_peerMgr * manager, tr_torrent * tor)
535{
536    Torrent * t;
537
538    t = tr_new0 (Torrent, 1);
539    t->manager = manager;
540    t->tor = tor;
541    t->pool = TR_PTR_ARRAY_INIT;
542    t->peers = TR_PTR_ARRAY_INIT;
543    t->webseeds = TR_PTR_ARRAY_INIT;
544    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
545
546    rebuildWebseedArray (t, tor);
547
548    return t;
549}
550
551static void ensureMgrTimersExist (struct tr_peerMgr * m);
552
553tr_peerMgr*
554tr_peerMgrNew (tr_session * session)
555{
556    tr_peerMgr * m = tr_new0 (tr_peerMgr, 1);
557    m->session = session;
558    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
559    ensureMgrTimersExist (m);
560    return m;
561}
562
563static void
564deleteTimer (struct event ** t)
565{
566    if (*t != NULL)
567    {
568        event_free (*t);
569        *t = NULL;
570    }
571}
572
573static void
574deleteTimers (struct tr_peerMgr * m)
575{
576    deleteTimer (&m->atomTimer);
577    deleteTimer (&m->bandwidthTimer);
578    deleteTimer (&m->rechokeTimer);
579    deleteTimer (&m->refillUpkeepTimer);
580}
581
582void
583tr_peerMgrFree (tr_peerMgr * manager)
584{
585    managerLock (manager);
586
587    deleteTimers (manager);
588
589    /* free the handshakes. Abort invokes handshakeDoneCB (), which removes
590     * the item from manager->handshakes, so this is a little roundabout... */
591    while (!tr_ptrArrayEmpty (&manager->incomingHandshakes))
592        tr_handshakeAbort (tr_ptrArrayNth (&manager->incomingHandshakes, 0));
593
594    tr_ptrArrayDestruct (&manager->incomingHandshakes, NULL);
595
596    managerUnlock (manager);
597    tr_free (manager);
598}
599
600static int
601clientIsDownloadingFrom (const tr_torrent * tor, const tr_peer * peer)
602{
603    if (!tr_torrentHasMetadata (tor))
604        return true;
605
606    return peer->clientIsInterested && !peer->clientIsChoked;
607}
608
609static int
610clientIsUploadingTo (const tr_peer * peer)
611{
612    return peer->peerIsInterested && !peer->peerIsChoked;
613}
614
615/***
616****
617***/
618
619void
620tr_peerMgrOnBlocklistChanged (tr_peerMgr * mgr)
621{
622    tr_torrent * tor = NULL;
623    tr_session * session = mgr->session;
624
625    /* we cache whether or not a peer is blocklisted...
626       since the blocklist has changed, erase that cached value */
627    while ((tor = tr_torrentNext (session, tor)))
628    {
629        int i;
630        Torrent * t = tor->torrentPeers;
631        const int n = tr_ptrArraySize (&t->pool);
632        for (i=0; i<n; ++i) {
633            struct peer_atom * atom = tr_ptrArrayNth (&t->pool, i);
634            atom->blocklisted = -1;
635        }
636    }
637}
638
639static bool
640isAtomBlocklisted (tr_session * session, struct peer_atom * atom)
641{
642    if (atom->blocklisted < 0)
643        atom->blocklisted = tr_sessionIsAddressBlocked (session, &atom->addr);
644
645    assert (tr_isBool (atom->blocklisted));
646    return atom->blocklisted;
647}
648
649
650/***
651****
652***/
653
654static void
655atomSetSeedProbability (struct peer_atom * atom, int seedProbability)
656{
657    assert (atom != NULL);
658    assert (-1<=seedProbability && seedProbability<=100);
659
660    atom->seedProbability = seedProbability;
661
662    if (seedProbability == 100)
663        atom->flags |= ADDED_F_SEED_FLAG;
664    else if (seedProbability != -1)
665        atom->flags &= ~ADDED_F_SEED_FLAG;
666}
667
668static inline bool
669atomIsSeed (const struct peer_atom * atom)
670{
671    return atom->seedProbability == 100;
672}
673
674static void
675atomSetSeed (const Torrent * t, struct peer_atom * atom)
676{
677    if (!atomIsSeed (atom))
678    {
679        tordbg (t, "marking peer %s as a seed", tr_atomAddrStr (atom));
680
681        atomSetSeedProbability (atom, 100);
682    }
683}
684
685
686bool
687tr_peerMgrPeerIsSeed (const tr_torrent  * tor,
688                      const tr_address  * addr)
689{
690    bool isSeed = false;
691    const Torrent * t = tor->torrentPeers;
692    const struct peer_atom * atom = getExistingAtom (t, addr);
693
694    if (atom)
695        isSeed = atomIsSeed (atom);
696
697    return isSeed;
698}
699
700void
701tr_peerMgrSetUtpSupported (tr_torrent * tor, const tr_address * addr)
702{
703    struct peer_atom * atom = getExistingAtom (tor->torrentPeers, addr);
704
705    if (atom)
706        atom->flags |= ADDED_F_UTP_FLAGS;
707}
708
709void
710tr_peerMgrSetUtpFailed (tr_torrent *tor, const tr_address *addr, bool failed)
711{
712    struct peer_atom * atom = getExistingAtom (tor->torrentPeers, addr);
713
714    if (atom)
715        atom->utp_failed = failed;
716}
717
718
719/**
720***  REQUESTS
721***
722*** There are two data structures associated with managing block requests:
723***
724*** 1. Torrent::requests, an array of "struct block_request" which keeps
725***    track of which blocks have been requested, and when, and by which peers.
726***    This is list is used for (a) cancelling requests that have been pending
727***    for too long and (b) avoiding duplicate requests before endgame.
728***
729*** 2. Torrent::pieces, an array of "struct weighted_piece" which lists the
730***    pieces that we want to request. It's used to decide which blocks to
731***    return next when tr_peerMgrGetBlockRequests () is called.
732**/
733
734/**
735*** struct block_request
736**/
737
738static int
739compareReqByBlock (const void * va, const void * vb)
740{
741    const struct block_request * a = va;
742    const struct block_request * b = vb;
743
744    /* primary key: block */
745    if (a->block < b->block) return -1;
746    if (a->block > b->block) return 1;
747
748    /* secondary key: peer */
749    if (a->peer < b->peer) return -1;
750    if (a->peer > b->peer) return 1;
751
752    return 0;
753}
754
755static void
756requestListAdd (Torrent * t, tr_block_index_t block, tr_peer * peer)
757{
758    struct block_request key;
759
760    /* ensure enough room is available... */
761    if (t->requestCount + 1 >= t->requestAlloc)
762    {
763        const int CHUNK_SIZE = 128;
764        t->requestAlloc += CHUNK_SIZE;
765        t->requests = tr_renew (struct block_request,
766                                t->requests, t->requestAlloc);
767    }
768
769    /* populate the record we're inserting */
770    key.block = block;
771    key.peer = peer;
772    key.sentAt = tr_time ();
773
774    /* insert the request to our array... */
775    {
776        bool exact;
777        const int pos = tr_lowerBound (&key, t->requests, t->requestCount,
778                                       sizeof (struct block_request),
779                                       compareReqByBlock, &exact);
780        assert (!exact);
781        memmove (t->requests + pos + 1,
782                 t->requests + pos,
783                 sizeof (struct block_request) * (t->requestCount++ - pos));
784        t->requests[pos] = key;
785    }
786
787    if (peer != NULL)
788    {
789        ++peer->pendingReqsToPeer;
790        assert (peer->pendingReqsToPeer >= 0);
791    }
792
793    /*fprintf (stderr, "added request of block %lu from peer %s... "
794                       "there are now %d block\n",
795                     (unsigned long)block, tr_atomAddrStr (peer->atom), t->requestCount);*/
796}
797
798static struct block_request *
799requestListLookup (Torrent * t, tr_block_index_t block, const tr_peer * peer)
800{
801    struct block_request key;
802    key.block = block;
803    key.peer = (tr_peer*) peer;
804
805    return bsearch (&key, t->requests, t->requestCount,
806                    sizeof (struct block_request),
807                    compareReqByBlock);
808}
809
810/**
811 * Find the peers are we currently requesting the block
812 * with index @a block from and append them to @a peerArr.
813 */
814static void
815getBlockRequestPeers (Torrent * t, tr_block_index_t block,
816                      tr_ptrArray * peerArr)
817{
818    bool exact;
819    int i, pos;
820    struct block_request key;
821
822    key.block = block;
823    key.peer = NULL;
824    pos = tr_lowerBound (&key, t->requests, t->requestCount,
825                         sizeof (struct block_request),
826                         compareReqByBlock, &exact);
827
828    assert (!exact); /* shouldn't have a request with .peer == NULL */
829
830    for (i = pos; i < t->requestCount; ++i)
831    {
832        if (t->requests[i].block != block)
833            break;
834        tr_ptrArrayAppend (peerArr, t->requests[i].peer);
835    }
836}
837
838static void
839decrementPendingReqCount (const struct block_request * b)
840{
841    if (b->peer != NULL)
842        if (b->peer->pendingReqsToPeer > 0)
843            --b->peer->pendingReqsToPeer;
844}
845
846static void
847requestListRemove (Torrent * t, tr_block_index_t block, const tr_peer * peer)
848{
849    const struct block_request * b = requestListLookup (t, block, peer);
850    if (b != NULL)
851    {
852        const int pos = b - t->requests;
853        assert (pos < t->requestCount);
854
855        decrementPendingReqCount (b);
856
857        tr_removeElementFromArray (t->requests,
858                                   pos,
859                                   sizeof (struct block_request),
860                                   t->requestCount--);
861
862        /*fprintf (stderr, "removing request of block %lu from peer %s... "
863                           "there are now %d block requests left\n",
864                         (unsigned long)block, tr_atomAddrStr (peer->atom), t->requestCount);*/
865    }
866}
867
868static int
869countActiveWebseeds (const Torrent * t)
870{
871    int activeCount = 0;
872    const tr_webseed ** w = (const tr_webseed **) tr_ptrArrayBase (&t->webseeds);
873    const tr_webseed ** const wend = w + tr_ptrArraySize (&t->webseeds);
874
875    for (; w!=wend; ++w)
876        if (tr_webseedIsActive (*w))
877            ++activeCount;
878
879    return activeCount;
880}
881
882static bool
883testForEndgame (const Torrent * t)
884{
885    /* we consider ourselves to be in endgame if the number of bytes
886       we've got requested is >= the number of bytes left to download */
887    return (t->requestCount * t->tor->blockSize)
888               >= tr_cpLeftUntilDone (&t->tor->completion);
889}
890
891static void
892updateEndgame (Torrent * t)
893{
894    assert (t->requestCount >= 0);
895
896    if (!testForEndgame (t))
897    {
898        /* not in endgame */
899        t->endgame = 0;
900    }
901    else if (!t->endgame) /* only recalculate when endgame first begins */
902    {
903        int numDownloading = 0;
904        const tr_peer ** p = (const tr_peer **) tr_ptrArrayBase (&t->peers);
905        const tr_peer ** const pend = p + tr_ptrArraySize (&t->peers);
906
907        /* add the active bittorrent peers... */
908        for (; p!=pend; ++p)
909            if ((*p)->pendingReqsToPeer > 0)
910                ++numDownloading;
911
912        /* add the active webseeds... */
913        numDownloading += countActiveWebseeds (t);
914
915        /* average number of pending requests per downloading peer */
916        t->endgame = t->requestCount / MAX (numDownloading, 1);
917    }
918}
919
920
921/****
922*****
923*****  Piece List Manipulation / Accessors
924*****
925****/
926
927static inline void
928invalidatePieceSorting (Torrent * t)
929{
930    t->pieceSortState = PIECES_UNSORTED;
931}
932
933static const tr_torrent * weightTorrent;
934
935static const uint16_t * weightReplication;
936
937static void
938setComparePieceByWeightTorrent (Torrent * t)
939{
940    if (!replicationExists (t))
941        replicationNew (t);
942
943    weightTorrent = t->tor;
944    weightReplication = t->pieceReplication;
945}
946
947/* we try to create a "weight" s.t. high-priority pieces come before others,
948 * and that partially-complete pieces come before empty ones. */
949static int
950comparePieceByWeight (const void * va, const void * vb)
951{
952    const struct weighted_piece * a = va;
953    const struct weighted_piece * b = vb;
954    int ia, ib, missing, pending;
955    const tr_torrent * tor = weightTorrent;
956    const uint16_t * rep = weightReplication;
957
958    /* primary key: weight */
959    missing = tr_cpMissingBlocksInPiece (&tor->completion, a->index);
960    pending = a->requestCount;
961    ia = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
962    missing = tr_cpMissingBlocksInPiece (&tor->completion, b->index);
963    pending = b->requestCount;
964    ib = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
965    if (ia < ib) return -1;
966    if (ia > ib) return 1;
967
968    /* secondary key: higher priorities go first */
969    ia = tor->info.pieces[a->index].priority;
970    ib = tor->info.pieces[b->index].priority;
971    if (ia > ib) return -1;
972    if (ia < ib) return 1;
973
974    /* tertiary key: rarest first. */
975    ia = rep[a->index];
976    ib = rep[b->index];
977    if (ia < ib) return -1;
978    if (ia > ib) return 1;
979
980    /* quaternary key: random */
981    if (a->salt < b->salt) return -1;
982    if (a->salt > b->salt) return 1;
983
984    /* okay, they're equal */
985    return 0;
986}
987
988static int
989comparePieceByIndex (const void * va, const void * vb)
990{
991    const struct weighted_piece * a = va;
992    const struct weighted_piece * b = vb;
993    if (a->index < b->index) return -1;
994    if (a->index > b->index) return 1;
995    return 0;
996}
997
998static void
999pieceListSort (Torrent * t, enum piece_sort_state state)
1000{
1001    assert (state==PIECES_SORTED_BY_INDEX
1002         || state==PIECES_SORTED_BY_WEIGHT);
1003
1004
1005    if (state == PIECES_SORTED_BY_WEIGHT)
1006    {
1007        setComparePieceByWeightTorrent (t);
1008        qsort (t->pieces, t->pieceCount, sizeof (struct weighted_piece), comparePieceByWeight);
1009    }
1010    else
1011        qsort (t->pieces, t->pieceCount, sizeof (struct weighted_piece), comparePieceByIndex);
1012
1013    t->pieceSortState = state;
1014}
1015
1016/**
1017 * These functions are useful for testing, but too expensive for nightly builds.
1018 * let's leave it disabled but add an easy hook to compile it back in
1019 */
1020#if 1
1021#define assertWeightedPiecesAreSorted(t)
1022#define assertReplicationCountIsExact(t)
1023#else
1024static void
1025assertWeightedPiecesAreSorted (Torrent * t)
1026{
1027    if (!t->endgame)
1028    {
1029        int i;
1030        setComparePieceByWeightTorrent (t);
1031        for (i=0; i<t->pieceCount-1; ++i)
1032            assert (comparePieceByWeight (&t->pieces[i], &t->pieces[i+1]) <= 0);
1033    }
1034}
1035static void
1036assertReplicationCountIsExact (Torrent * t)
1037{
1038    /* This assert might fail due to errors of implementations in other
1039     * clients. It happens when receiving duplicate bitfields/HaveAll/HaveNone
1040     * from a client. If a such a behavior is noticed,
1041     * a bug report should be filled to the faulty client. */
1042
1043    size_t piece_i;
1044    const uint16_t * rep = t->pieceReplication;
1045    const size_t piece_count = t->pieceReplicationSize;
1046    const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase (&t->peers);
1047    const int peer_count = tr_ptrArraySize (&t->peers);
1048
1049    assert (piece_count == t->tor->info.pieceCount);
1050
1051    for (piece_i=0; piece_i<piece_count; ++piece_i)
1052    {
1053        int peer_i;
1054        uint16_t r = 0;
1055
1056        for (peer_i=0; peer_i<peer_count; ++peer_i)
1057            if (tr_bitsetHas (&peers[peer_i]->have, piece_i))
1058                ++r;
1059
1060        assert (rep[piece_i] == r);
1061    }
1062}
1063#endif
1064
1065static struct weighted_piece *
1066pieceListLookup (Torrent * t, tr_piece_index_t index)
1067{
1068    int i;
1069
1070    for (i=0; i<t->pieceCount; ++i)
1071        if (t->pieces[i].index == index)
1072            return &t->pieces[i];
1073
1074    return NULL;
1075}
1076
1077static void
1078pieceListRebuild (Torrent * t)
1079{
1080
1081    if (!tr_torrentIsSeed (t->tor))
1082    {
1083        tr_piece_index_t i;
1084        tr_piece_index_t * pool;
1085        tr_piece_index_t poolCount = 0;
1086        const tr_torrent * tor = t->tor;
1087        const tr_info * inf = tr_torrentInfo (tor);
1088        struct weighted_piece * pieces;
1089        int pieceCount;
1090
1091        /* build the new list */
1092        pool = tr_new (tr_piece_index_t, inf->pieceCount);
1093        for (i=0; i<inf->pieceCount; ++i)
1094            if (!inf->pieces[i].dnd)
1095                if (!tr_cpPieceIsComplete (&tor->completion, i))
1096                    pool[poolCount++] = i;
1097        pieceCount = poolCount;
1098        pieces = tr_new0 (struct weighted_piece, pieceCount);
1099        for (i=0; i<poolCount; ++i) {
1100            struct weighted_piece * piece = pieces + i;
1101            piece->index = pool[i];
1102            piece->requestCount = 0;
1103            piece->salt = tr_cryptoWeakRandInt (4096);
1104        }
1105
1106        /* if we already had a list of pieces, merge it into
1107         * the new list so we don't lose its requestCounts */
1108        if (t->pieces != NULL)
1109        {
1110            struct weighted_piece * o = t->pieces;
1111            struct weighted_piece * oend = o + t->pieceCount;
1112            struct weighted_piece * n = pieces;
1113            struct weighted_piece * nend = n + pieceCount;
1114
1115            pieceListSort (t, PIECES_SORTED_BY_INDEX);
1116
1117            while (o!=oend && n!=nend) {
1118                if (o->index < n->index)
1119                    ++o;
1120                else if (o->index > n->index)
1121                    ++n;
1122                else
1123                    *n++ = *o++;
1124            }
1125
1126            tr_free (t->pieces);
1127        }
1128
1129        t->pieces = pieces;
1130        t->pieceCount = pieceCount;
1131
1132        pieceListSort (t, PIECES_SORTED_BY_WEIGHT);
1133
1134        /* cleanup */
1135        tr_free (pool);
1136    }
1137}
1138
1139static void
1140pieceListRemovePiece (Torrent * t, tr_piece_index_t piece)
1141{
1142    struct weighted_piece * p;
1143
1144    if ((p = pieceListLookup (t, piece)))
1145    {
1146        const int pos = p - t->pieces;
1147
1148        tr_removeElementFromArray (t->pieces,
1149                                   pos,
1150                                   sizeof (struct weighted_piece),
1151                                   t->pieceCount--);
1152
1153        if (t->pieceCount == 0)
1154        {
1155            tr_free (t->pieces);
1156            t->pieces = NULL;
1157        }
1158    }
1159}
1160
1161static void
1162pieceListResortPiece (Torrent * t, struct weighted_piece * p)
1163{
1164    int pos;
1165    bool isSorted = true;
1166
1167    if (p == NULL)
1168        return;
1169
1170    /* is the torrent already sorted? */
1171    pos = p - t->pieces;
1172    setComparePieceByWeightTorrent (t);
1173    if (isSorted && (pos > 0) && (comparePieceByWeight (p-1, p) > 0))
1174        isSorted = false;
1175    if (isSorted && (pos < t->pieceCount - 1) && (comparePieceByWeight (p, p+1) > 0))
1176        isSorted = false;
1177
1178    if (t->pieceSortState != PIECES_SORTED_BY_WEIGHT)
1179    {
1180       pieceListSort (t, PIECES_SORTED_BY_WEIGHT);
1181       isSorted = true;
1182    }
1183
1184    /* if it's not sorted, move it around */
1185    if (!isSorted)
1186    {
1187        bool exact;
1188        const struct weighted_piece tmp = *p;
1189
1190        tr_removeElementFromArray (t->pieces,
1191                                   pos,
1192                                   sizeof (struct weighted_piece),
1193                                   t->pieceCount--);
1194
1195        pos = tr_lowerBound (&tmp, t->pieces, t->pieceCount,
1196                             sizeof (struct weighted_piece),
1197                             comparePieceByWeight, &exact);
1198
1199        memmove (&t->pieces[pos + 1],
1200                 &t->pieces[pos],
1201                 sizeof (struct weighted_piece) * (t->pieceCount++ - pos));
1202
1203        t->pieces[pos] = tmp;
1204    }
1205
1206    assertWeightedPiecesAreSorted (t);
1207}
1208
1209static void
1210pieceListRemoveRequest (Torrent * t, tr_block_index_t block)
1211{
1212    struct weighted_piece * p;
1213    const tr_piece_index_t index = tr_torBlockPiece (t->tor, block);
1214
1215    if (((p = pieceListLookup (t, index))) && (p->requestCount > 0))
1216    {
1217        --p->requestCount;
1218        pieceListResortPiece (t, p);
1219    }
1220}
1221
1222
1223/****
1224*****
1225*****  Replication count (for rarest first policy)
1226*****
1227****/
1228
1229/**
1230 * Increase the replication count of this piece and sort it if the
1231 * piece list is already sorted
1232 */
1233static void
1234tr_incrReplicationOfPiece (Torrent * t, const size_t index)
1235{
1236    assert (replicationExists (t));
1237    assert (t->pieceReplicationSize == t->tor->info.pieceCount);
1238
1239    /* One more replication of this piece is present in the swarm */
1240    ++t->pieceReplication[index];
1241
1242    /* we only resort the piece if the list is already sorted */
1243    if (t->pieceSortState == PIECES_SORTED_BY_WEIGHT)
1244        pieceListResortPiece (t, pieceListLookup (t, index));
1245}
1246
1247/**
1248 * Increases the replication count of pieces present in the bitfield
1249 */
1250static void
1251tr_incrReplicationFromBitfield (Torrent * t, const tr_bitfield * b)
1252{
1253    size_t i;
1254    uint16_t * rep = t->pieceReplication;
1255    const size_t n = t->tor->info.pieceCount;
1256
1257    assert (replicationExists (t));
1258
1259    for (i=0; i<n; ++i)
1260        if (tr_bitfieldHas (b, i))
1261            ++rep[i];
1262
1263    if (t->pieceSortState == PIECES_SORTED_BY_WEIGHT)
1264        invalidatePieceSorting (t);
1265}
1266
1267/**
1268 * Increase the replication count of every piece
1269 */
1270static void
1271tr_incrReplication (Torrent * t)
1272{
1273    int i;
1274    const int n = t->pieceReplicationSize;
1275
1276    assert (replicationExists (t));
1277    assert (t->pieceReplicationSize == t->tor->info.pieceCount);
1278
1279    for (i=0; i<n; ++i)
1280        ++t->pieceReplication[i];
1281}
1282
1283/**
1284 * Decrease the replication count of pieces present in the bitset.
1285 */
1286static void
1287tr_decrReplicationFromBitfield (Torrent * t, const tr_bitfield * b)
1288{
1289    int i;
1290    const int n = t->pieceReplicationSize;
1291
1292    assert (replicationExists (t));
1293    assert (t->pieceReplicationSize == t->tor->info.pieceCount);
1294
1295    if (tr_bitfieldHasAll (b))
1296    {
1297        for (i=0; i<n; ++i)
1298            --t->pieceReplication[i];
1299    }
1300    else if (!tr_bitfieldHasNone (b))
1301    {
1302        for (i=0; i<n; ++i)
1303            if (tr_bitfieldHas (b, i))
1304                --t->pieceReplication[i];
1305
1306        if (t->pieceSortState == PIECES_SORTED_BY_WEIGHT)
1307            invalidatePieceSorting (t);
1308    }
1309}
1310
1311/**
1312***
1313**/
1314
1315void
1316tr_peerMgrRebuildRequests (tr_torrent * tor)
1317{
1318    assert (tr_isTorrent (tor));
1319
1320    pieceListRebuild (tor->torrentPeers);
1321}
1322
1323void
1324tr_peerMgrGetNextRequests (tr_torrent           * tor,
1325                           tr_peer              * peer,
1326                           int                    numwant,
1327                           tr_block_index_t     * setme,
1328                           int                  * numgot,
1329                           bool                   get_intervals)
1330{
1331    int i;
1332    int got;
1333    Torrent * t;
1334    struct weighted_piece * pieces;
1335    const tr_bitfield * const have = &peer->have;
1336
1337    /* sanity clause */
1338    assert (tr_isTorrent (tor));
1339    assert (peer->clientIsInterested);
1340    assert (!peer->clientIsChoked);
1341    assert (numwant > 0);
1342
1343    /* walk through the pieces and find blocks that should be requested */
1344    got = 0;
1345    t = tor->torrentPeers;
1346
1347    /* prep the pieces list */
1348    if (t->pieces == NULL)
1349        pieceListRebuild (t);
1350
1351    if (t->pieceSortState != PIECES_SORTED_BY_WEIGHT)
1352        pieceListSort (t, PIECES_SORTED_BY_WEIGHT);
1353
1354    assertReplicationCountIsExact (t);
1355    assertWeightedPiecesAreSorted (t);
1356
1357    updateEndgame (t);
1358    pieces = t->pieces;
1359    for (i=0; i<t->pieceCount && got<numwant; ++i)
1360    {
1361        struct weighted_piece * p = pieces + i;
1362
1363        /* if the peer has this piece that we want... */
1364        if (tr_bitfieldHas (have, p->index))
1365        {
1366            tr_block_index_t b;
1367            tr_block_index_t first;
1368            tr_block_index_t last;
1369            tr_ptrArray peerArr = TR_PTR_ARRAY_INIT;
1370
1371            tr_torGetPieceBlockRange (tor, p->index, &first, &last);
1372
1373            for (b=first; b<=last && (got<numwant || (get_intervals && setme[2*got-1] == b-1)); ++b)
1374            {
1375                int peerCount;
1376                tr_peer ** peers;
1377
1378                /* don't request blocks we've already got */
1379                if (tr_cpBlockIsComplete (&tor->completion, b))
1380                    continue;
1381
1382                /* always add peer if this block has no peers yet */
1383                tr_ptrArrayClear (&peerArr);
1384                getBlockRequestPeers (t, b, &peerArr);
1385                peers = (tr_peer **) tr_ptrArrayPeek (&peerArr, &peerCount);
1386                if (peerCount != 0)
1387                {
1388                    /* don't make a second block request until the endgame */
1389                    if (!t->endgame)
1390                        continue;
1391
1392                    /* don't have more than two peers requesting this block */
1393                    if (peerCount > 1)
1394                        continue;
1395
1396                    /* don't send the same request to the same peer twice */
1397                    if (peer == peers[0])
1398                        continue;
1399
1400                    /* in the endgame allow an additional peer to download a
1401                       block but only if the peer seems to be handling requests
1402                       relatively fast */
1403                    if (peer->pendingReqsToPeer + numwant - got < t->endgame)
1404                        continue;
1405                }
1406
1407                /* update the caller's table */
1408                if (!get_intervals) {
1409                    setme[got++] = b;
1410                }
1411                /* if intervals are requested two array entries are necessarry:
1412                   one for the interval's starting block and one for its end block */
1413                else if (got && setme[2 * got - 1] == b - 1 && b != first) {
1414                    /* expand the last interval */
1415                    ++setme[2 * got - 1];
1416                }
1417                else {
1418                    /* begin a new interval */
1419                    setme[2 * got] = setme[2 * got + 1] = b;
1420                    ++got;
1421                }
1422
1423                /* update our own tables */
1424                requestListAdd (t, b, peer);
1425                ++p->requestCount;
1426            }
1427
1428            tr_ptrArrayDestruct (&peerArr, NULL);
1429        }
1430    }
1431
1432    /* In most cases we've just changed the weights of a small number of pieces.
1433     * So rather than qsort ()ing the entire array, it's faster to apply an
1434     * adaptive insertion sort algorithm. */
1435    if (got > 0)
1436    {
1437        /* not enough requests || last piece modified */
1438        if (i == t->pieceCount) --i;
1439
1440        setComparePieceByWeightTorrent (t);
1441        while (--i >= 0)
1442        {
1443            bool exact;
1444
1445            /* relative position! */
1446            const int newpos = tr_lowerBound (&t->pieces[i], &t->pieces[i + 1],
1447                                              t->pieceCount - (i + 1),
1448                                              sizeof (struct weighted_piece),
1449                                              comparePieceByWeight, &exact);
1450            if (newpos > 0)
1451            {
1452                const struct weighted_piece piece = t->pieces[i];
1453                memmove (&t->pieces[i],
1454                         &t->pieces[i + 1],
1455                         sizeof (struct weighted_piece) * (newpos));
1456                t->pieces[i + newpos] = piece;
1457            }
1458        }
1459    }
1460
1461    assertWeightedPiecesAreSorted (t);
1462    *numgot = got;
1463}
1464
1465bool
1466tr_peerMgrDidPeerRequest (const tr_torrent  * tor,
1467                          const tr_peer     * peer,
1468                          tr_block_index_t    block)
1469{
1470    const Torrent * t = tor->torrentPeers;
1471    return requestListLookup ((Torrent*)t, block, peer) != NULL;
1472}
1473
1474/* cancel requests that are too old */
1475static void
1476refillUpkeep (int foo UNUSED, short bar UNUSED, void * vmgr)
1477{
1478    time_t now;
1479    time_t too_old;
1480    tr_torrent * tor;
1481    int cancel_buflen = 0;
1482    struct block_request * cancel = NULL;
1483    tr_peerMgr * mgr = vmgr;
1484    managerLock (mgr);
1485
1486    now = tr_time ();
1487    too_old = now - REQUEST_TTL_SECS;
1488
1489    /* alloc the temporary "cancel" buffer */
1490    tor = NULL;
1491    while ((tor = tr_torrentNext (mgr->session, tor)))
1492        cancel_buflen = MAX (cancel_buflen, tor->torrentPeers->requestCount);
1493    if (cancel_buflen > 0)
1494        cancel = tr_new (struct block_request, cancel_buflen);
1495
1496    /* prune requests that are too old */
1497    tor = NULL;
1498    while ((tor = tr_torrentNext (mgr->session, tor)))
1499    {
1500        Torrent * t = tor->torrentPeers;
1501        const int n = t->requestCount;
1502        if (n > 0)
1503        {
1504            int keepCount = 0;
1505            int cancelCount = 0;
1506            const struct block_request * it;
1507            const struct block_request * end;
1508
1509            for (it=t->requests, end=it+n; it!=end; ++it)
1510            {
1511                if ((it->sentAt <= too_old) && it->peer->msgs && !tr_peerMsgsIsReadingBlock (it->peer->msgs, it->block))
1512                    cancel[cancelCount++] = *it;
1513                else
1514                {
1515                    if (it != &t->requests[keepCount])
1516                        t->requests[keepCount] = *it;
1517                    keepCount++;
1518                }
1519            }
1520
1521            /* prune out the ones we aren't keeping */
1522            t->requestCount = keepCount;
1523
1524            /* send cancel messages for all the "cancel" ones */
1525            for (it=cancel, end=it+cancelCount; it!=end; ++it) {
1526                if ((it->peer != NULL) && (it->peer->msgs != NULL)) {
1527                    tr_historyAdd (&it->peer->cancelsSentToPeer, now, 1);
1528                    tr_peerMsgsCancel (it->peer->msgs, it->block);
1529                    decrementPendingReqCount (it);
1530                }
1531            }
1532
1533            /* decrement the pending request counts for the timed-out blocks */
1534            for (it=cancel, end=it+cancelCount; it!=end; ++it)
1535                pieceListRemoveRequest (t, it->block);
1536        }
1537    }
1538
1539    tr_free (cancel);
1540    tr_timerAddMsec (mgr->refillUpkeepTimer, REFILL_UPKEEP_PERIOD_MSEC);
1541    managerUnlock (mgr);
1542}
1543
1544static void
1545addStrike (Torrent * t, tr_peer * peer)
1546{
1547    tordbg (t, "increasing peer %s strike count to %d",
1548            tr_atomAddrStr (peer->atom), peer->strikes + 1);
1549
1550    if (++peer->strikes >= MAX_BAD_PIECES_PER_PEER)
1551    {
1552        struct peer_atom * atom = peer->atom;
1553        atom->flags2 |= MYFLAG_BANNED;
1554        peer->doPurge = 1;
1555        tordbg (t, "banning peer %s", tr_atomAddrStr (atom));
1556    }
1557}
1558
1559static void
1560gotBadPiece (Torrent * t, tr_piece_index_t pieceIndex)
1561{
1562    tr_torrent *   tor = t->tor;
1563    const uint32_t byteCount = tr_torPieceCountBytes (tor, pieceIndex);
1564
1565    tor->corruptCur += byteCount;
1566    tor->downloadedCur -= MIN (tor->downloadedCur, byteCount);
1567
1568    tr_announcerAddBytes (tor, TR_ANN_CORRUPT, byteCount);
1569}
1570
1571static void
1572peerSuggestedPiece (Torrent            * t UNUSED,
1573                    tr_peer            * peer UNUSED,
1574                    tr_piece_index_t     pieceIndex UNUSED,
1575                    int                  isFastAllowed UNUSED)
1576{
1577#if 0
1578    assert (t);
1579    assert (peer);
1580    assert (peer->msgs);
1581
1582    /* is this a valid piece? */
1583    if (pieceIndex >= t->tor->info.pieceCount)
1584        return;
1585
1586    /* don't ask for it if we've already got it */
1587    if (tr_cpPieceIsComplete (t->tor->completion, pieceIndex))
1588        return;
1589
1590    /* don't ask for it if they don't have it */
1591    if (!tr_bitfieldHas (peer->have, pieceIndex))
1592        return;
1593
1594    /* don't ask for it if we're choked and it's not fast */
1595    if (!isFastAllowed && peer->clientIsChoked)
1596        return;
1597
1598    /* request the blocks that we don't have in this piece */
1599    {
1600        tr_block_index_t b;
1601        tr_block_index_t first;
1602        tr_block_index_t last;
1603        const tr_torrent * tor = t->tor;
1604
1605        tr_torGetPieceBlockRange (t->tor, pieceIndex, &first, &last);
1606
1607        for (b=first; b<=last; ++b)
1608        {
1609            if (!tr_cpBlockIsComplete (tor->completion, b))
1610            {
1611                const uint32_t offset = getBlockOffsetInPiece (tor, b);
1612                const uint32_t length = tr_torBlockCountBytes (tor, b);
1613                tr_peerMsgsAddRequest (peer->msgs, pieceIndex, offset, length);
1614                incrementPieceRequests (t, pieceIndex);
1615            }
1616        }
1617    }
1618#endif
1619}
1620
1621static void
1622removeRequestFromTables (Torrent * t, tr_block_index_t block, const tr_peer * peer)
1623{
1624    requestListRemove (t, block, peer);
1625    pieceListRemoveRequest (t, block);
1626}
1627
1628/* peer choked us, or maybe it disconnected.
1629   either way we need to remove all its requests */
1630static void
1631peerDeclinedAllRequests (Torrent * t, const tr_peer * peer)
1632{
1633    int i, n;
1634    tr_block_index_t * blocks = tr_new (tr_block_index_t, t->requestCount);
1635
1636    for (i=n=0; i<t->requestCount; ++i)
1637        if (peer == t->requests[i].peer)
1638            blocks[n++] = t->requests[i].block;
1639
1640    for (i=0; i<n; ++i)
1641        removeRequestFromTables (t, blocks[i], peer);
1642
1643    tr_free (blocks);
1644}
1645
1646static void tr_peerMgrSetBlame (tr_torrent *, tr_piece_index_t, int);
1647
1648static void
1649peerCallbackFunc (tr_peer * peer, const tr_peer_event * e, void * vt)
1650{
1651    Torrent * t = vt;
1652
1653    torrentLock (t);
1654
1655    assert (peer != NULL);
1656
1657    switch (e->eventType)
1658    {
1659        case TR_PEER_PEER_GOT_DATA:
1660        {
1661            const time_t now = tr_time ();
1662            tr_torrent * tor = t->tor;
1663
1664            if (e->wasPieceData)
1665            {
1666                tor->uploadedCur += e->length;
1667                tr_announcerAddBytes (tor, TR_ANN_UP, e->length);
1668                tr_torrentSetActivityDate (tor, now);
1669                tr_torrentSetDirty (tor);
1670            }
1671
1672            /* update the stats */
1673            if (e->wasPieceData)
1674                tr_statsAddUploaded (tor->session, e->length);
1675
1676            /* update our atom */
1677            if (peer->atom && e->wasPieceData)
1678                peer->atom->piece_data_time = now;
1679
1680            break;
1681        }
1682
1683        case TR_PEER_CLIENT_GOT_HAVE:
1684            if (replicationExists (t)) {
1685                tr_incrReplicationOfPiece (t, e->pieceIndex);
1686                assertReplicationCountIsExact (t);
1687            }
1688            break;
1689
1690        case TR_PEER_CLIENT_GOT_HAVE_ALL:
1691            if (replicationExists (t)) {
1692                tr_incrReplication (t);
1693                assertReplicationCountIsExact (t);
1694            }
1695            break;
1696
1697        case TR_PEER_CLIENT_GOT_HAVE_NONE:
1698            /* noop */
1699            break;
1700
1701        case TR_PEER_CLIENT_GOT_BITFIELD:
1702            assert (e->bitfield != NULL);
1703            if (replicationExists (t)) {
1704                tr_incrReplicationFromBitfield (t, e->bitfield);
1705                assertReplicationCountIsExact (t);
1706            }
1707            break;
1708
1709        case TR_PEER_CLIENT_GOT_REJ: {
1710            tr_block_index_t b = _tr_block (t->tor, e->pieceIndex, e->offset);
1711            if (b < t->tor->blockCount)
1712                removeRequestFromTables (t, b, peer);
1713            else
1714                tordbg (t, "Peer %s sent an out-of-range reject message",
1715                           tr_atomAddrStr (peer->atom));
1716            break;
1717        }
1718
1719        case TR_PEER_CLIENT_GOT_CHOKE:
1720            peerDeclinedAllRequests (t, peer);
1721            break;
1722
1723        case TR_PEER_CLIENT_GOT_PORT:
1724            if (peer->atom)
1725                peer->atom->port = e->port;
1726            break;
1727
1728        case TR_PEER_CLIENT_GOT_SUGGEST:
1729            peerSuggestedPiece (t, peer, e->pieceIndex, false);
1730            break;
1731
1732        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1733            peerSuggestedPiece (t, peer, e->pieceIndex, true);
1734            break;
1735
1736        case TR_PEER_CLIENT_GOT_DATA:
1737        {
1738            const time_t now = tr_time ();
1739            tr_torrent * tor = t->tor;
1740
1741            if (e->wasPieceData)
1742            {
1743                tor->downloadedCur += e->length;
1744                tr_torrentSetActivityDate (tor, now);
1745                tr_torrentSetDirty (tor);
1746            }
1747
1748            /* update the stats */
1749            if (e->wasPieceData)
1750                tr_statsAddDownloaded (tor->session, e->length);
1751
1752            /* update our atom */
1753            if (peer->atom && e->wasPieceData)
1754                peer->atom->piece_data_time = now;
1755
1756            break;
1757        }
1758
1759        case TR_PEER_CLIENT_GOT_BLOCK:
1760        {
1761            tr_torrent * tor = t->tor;
1762            tr_block_index_t block = _tr_block (tor, e->pieceIndex, e->offset);
1763            int i, peerCount;
1764            tr_peer ** peers;
1765            tr_ptrArray peerArr = TR_PTR_ARRAY_INIT;
1766
1767            removeRequestFromTables (t, block, peer);
1768            getBlockRequestPeers (t, block, &peerArr);
1769            peers = (tr_peer **) tr_ptrArrayPeek (&peerArr, &peerCount);
1770
1771            /* remove additional block requests and send cancel to peers */
1772            for (i=0; i<peerCount; i++) {
1773                tr_peer * p = peers[i];
1774                assert (p != peer);
1775                if (p->msgs) {
1776                    tr_historyAdd (&p->cancelsSentToPeer, tr_time (), 1);
1777                    tr_peerMsgsCancel (p->msgs, block);
1778                }
1779                removeRequestFromTables (t, block, p);
1780            }
1781
1782            tr_ptrArrayDestruct (&peerArr, false);
1783
1784            tr_historyAdd (&peer->blocksSentToClient, tr_time (), 1);
1785
1786            if (tr_cpBlockIsComplete (&tor->completion, block))
1787            {
1788                /* we already have this block... */
1789                const uint32_t n = tr_torBlockCountBytes (tor, block);
1790                tor->downloadedCur -= MIN (tor->downloadedCur, n);
1791                tordbg (t, "we have this block already...");
1792            }
1793            else
1794            {
1795                tr_cpBlockAdd (&tor->completion, block);
1796                pieceListResortPiece (t, pieceListLookup (t, e->pieceIndex));
1797                tr_torrentSetDirty (tor);
1798
1799                if (tr_cpPieceIsComplete (&tor->completion, e->pieceIndex))
1800                {
1801                    const tr_piece_index_t p = e->pieceIndex;
1802                    const bool ok = tr_torrentCheckPiece (tor, p);
1803
1804                    tordbg (t, "[LAZY] checked just-completed piece %zu", (size_t)p);
1805
1806                    if (!ok)
1807                    {
1808                        tr_torerr (tor, _("Piece %lu, which was just downloaded, failed its checksum test"),
1809                                 (unsigned long)p);
1810                    }
1811
1812                    tr_peerMgrSetBlame (tor, p, ok);
1813
1814                    if (!ok)
1815                    {
1816                        gotBadPiece (t, p);
1817                    }
1818                    else
1819                    {
1820                        int i;
1821                        int peerCount;
1822                        tr_peer ** peers;
1823                        tr_file_index_t fileIndex;
1824
1825                        /* only add this to downloadedCur if we got it from a peer --
1826                         * webseeds shouldn't count against our ratio. As one tracker
1827                         * admin put it, "Those pieces are downloaded directly from the
1828                         * content distributor, not the peers, it is the tracker's job
1829                         * to manage the swarms, not the web server and does not fit
1830                         * into the jurisdiction of the tracker." */
1831                        if (peer->msgs != NULL) {
1832                            const uint32_t n = tr_torPieceCountBytes (tor, p);
1833                            tr_announcerAddBytes (tor, TR_ANN_DOWN, n);
1834                        }
1835
1836                        peerCount = tr_ptrArraySize (&t->peers);
1837                        peers = (tr_peer**) tr_ptrArrayBase (&t->peers);
1838                        for (i=0; i<peerCount; ++i)
1839                            tr_peerMsgsHave (peers[i]->msgs, p);
1840
1841                        for (fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex) {
1842                            const tr_file * file = &tor->info.files[fileIndex];
1843                            if ((file->firstPiece <= p) && (p <= file->lastPiece)) {
1844                                if (tr_cpFileIsComplete (&tor->completion, fileIndex)) {
1845                                    tr_cacheFlushFile (tor->session->cache, tor, fileIndex);
1846                                    tr_torrentFileCompleted (tor, fileIndex);
1847                                }
1848                            }
1849                        }
1850
1851                        pieceListRemovePiece (t, p);
1852                    }
1853                }
1854
1855                t->needsCompletenessCheck = true;
1856            }
1857            break;
1858        }
1859
1860        case TR_PEER_ERROR:
1861            if ((e->err == ERANGE) || (e->err == EMSGSIZE) || (e->err == ENOTCONN))
1862            {
1863                /* some protocol error from the peer */
1864                peer->doPurge = 1;
1865                tordbg (t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error",
1866                        tr_atomAddrStr (peer->atom));
1867            }
1868            else
1869            {
1870                tordbg (t, "unhandled error: %s", tr_strerror (e->err));
1871            }
1872            break;
1873
1874        default:
1875            assert (0);
1876    }
1877
1878    torrentUnlock (t);
1879}
1880
1881static int
1882getDefaultShelfLife (uint8_t from)
1883{
1884    /* in general, peers obtained from firsthand contact
1885     * are better than those from secondhand, etc etc */
1886    switch (from)
1887    {
1888        case TR_PEER_FROM_INCOMING : return 60 * 60 * 6;
1889        case TR_PEER_FROM_LTEP     : return 60 * 60 * 6;
1890        case TR_PEER_FROM_TRACKER  : return 60 * 60 * 3;
1891        case TR_PEER_FROM_DHT      : return 60 * 60 * 3;
1892        case TR_PEER_FROM_PEX      : return 60 * 60 * 2;
1893        case TR_PEER_FROM_RESUME   : return 60 * 60;
1894        case TR_PEER_FROM_LPD      : return 10 * 60;
1895        default                    : return 60 * 60;
1896    }
1897}
1898
1899static void
1900ensureAtomExists (Torrent           * t,
1901                  const tr_address  * addr,
1902                  const tr_port       port,
1903                  const uint8_t       flags,
1904                  const int8_t        seedProbability,
1905                  const uint8_t       from)
1906{
1907    struct peer_atom * a;
1908
1909    assert (tr_address_is_valid (addr));
1910    assert (from < TR_PEER_FROM__MAX);
1911
1912    a = getExistingAtom (t, addr);
1913
1914    if (a == NULL)
1915    {
1916        const int jitter = tr_cryptoWeakRandInt (60*10);
1917        a = tr_new0 (struct peer_atom, 1);
1918        a->addr = *addr;
1919        a->port = port;
1920        a->flags = flags;
1921        a->fromFirst = from;
1922        a->fromBest = from;
1923        a->shelf_date = tr_time () + getDefaultShelfLife (from) + jitter;
1924        a->blocklisted = -1;
1925        atomSetSeedProbability (a, seedProbability);
1926        tr_ptrArrayInsertSorted (&t->pool, a, compareAtomsByAddress);
1927
1928        tordbg (t, "got a new atom: %s", tr_atomAddrStr (a));
1929    }
1930    else
1931    {
1932        if (from < a->fromBest)
1933            a->fromBest = from;
1934
1935        if (a->seedProbability == -1)
1936            atomSetSeedProbability (a, seedProbability);
1937
1938        a->flags |= flags;
1939    }
1940}
1941
1942static int
1943getMaxPeerCount (const tr_torrent * tor)
1944{
1945    return tor->maxConnectedPeers;
1946}
1947
1948static int
1949getPeerCount (const Torrent * t)
1950{
1951    return tr_ptrArraySize (&t->peers);/* + tr_ptrArraySize (&t->outgoingHandshakes); */
1952}
1953
1954/* FIXME: this is kind of a mess. */
1955static bool
1956myHandshakeDoneCB (tr_handshake  * handshake,
1957                   tr_peerIo     * io,
1958                   bool            readAnythingFromPeer,
1959                   bool            isConnected,
1960                   const uint8_t * peer_id,
1961                   void          * vmanager)
1962{
1963    bool               ok = isConnected;
1964    bool               success = false;
1965    tr_port            port;
1966    const tr_address * addr;
1967    tr_peerMgr       * manager = vmanager;
1968    Torrent          * t;
1969    tr_handshake     * ours;
1970
1971    assert (io);
1972    assert (tr_isBool (ok));
1973
1974    t = tr_peerIoHasTorrentHash (io)
1975        ? getExistingTorrent (manager, tr_peerIoGetTorrentHash (io))
1976        : NULL;
1977
1978    if (tr_peerIoIsIncoming (io))
1979        ours = tr_ptrArrayRemoveSorted (&manager->incomingHandshakes,
1980                                        handshake, handshakeCompare);
1981    else if (t)
1982        ours = tr_ptrArrayRemoveSorted (&t->outgoingHandshakes,
1983                                        handshake, handshakeCompare);
1984    else
1985        ours = handshake;
1986
1987    assert (ours);
1988    assert (ours == handshake);
1989
1990    if (t)
1991        torrentLock (t);
1992
1993    addr = tr_peerIoGetAddress (io, &port);
1994
1995    if (!ok || !t || !t->isRunning)
1996    {
1997        if (t)
1998        {
1999            struct peer_atom * atom = getExistingAtom (t, addr);
2000            if (atom)
2001            {
2002                ++atom->numFails;
2003
2004                if (!readAnythingFromPeer)
2005                {
2006                    tordbg (t, "marking peer %s as unreachable... numFails is %d", tr_atomAddrStr (atom), (int)atom->numFails);
2007                    atom->flags2 |= MYFLAG_UNREACHABLE;
2008                }
2009            }
2010        }
2011    }
2012    else /* looking good */
2013    {
2014        struct peer_atom * atom;
2015
2016        ensureAtomExists (t, addr, port, 0, -1, TR_PEER_FROM_INCOMING);
2017        atom = getExistingAtom (t, addr);
2018        atom->time = tr_time ();
2019        atom->piece_data_time = 0;
2020        atom->lastConnectionAt = tr_time ();
2021
2022        if (!tr_peerIoIsIncoming (io))
2023        {
2024            atom->flags |= ADDED_F_CONNECTABLE;
2025            atom->flags2 &= ~MYFLAG_UNREACHABLE;
2026        }
2027
2028        /* In principle, this flag specifies whether the peer groks uTP,
2029           not whether it's currently connected over uTP. */
2030        if (io->utp_socket)
2031            atom->flags |= ADDED_F_UTP_FLAGS;
2032
2033        if (atom->flags2 & MYFLAG_BANNED)
2034        {
2035            tordbg (t, "banned peer %s tried to reconnect",
2036                    tr_atomAddrStr (atom));
2037        }
2038        else if (tr_peerIoIsIncoming (io)
2039               && (getPeerCount (t) >= getMaxPeerCount (t->tor)))
2040
2041        {
2042        }
2043        else
2044        {
2045            tr_peer * peer = atom->peer;
2046
2047            if (peer) /* we already have this peer */
2048            {
2049            }
2050            else
2051            {
2052                peer = getPeer (t, atom);
2053                tr_free (peer->client);
2054
2055                if (!peer_id)
2056                    peer->client = NULL;
2057                else {
2058                    char client[128];
2059                    tr_clientForId (client, sizeof (client), peer_id);
2060                    peer->client = tr_strdup (client);
2061                }
2062
2063                peer->io = tr_handshakeStealIO (handshake); /* this steals its refcount too, which is
2064                                                                balanced by our unref in peerDelete ()  */
2065                tr_peerIoSetParent (peer->io, &t->tor->bandwidth);
2066                tr_peerMsgsNew (t->tor, peer, peerCallbackFunc, t);
2067
2068                success = true;
2069            }
2070        }
2071    }
2072
2073    if (t)
2074        torrentUnlock (t);
2075
2076    return success;
2077}
2078
2079void
2080tr_peerMgrAddIncoming (tr_peerMgr * manager,
2081                       tr_address * addr,
2082                       tr_port      port,
2083                       int          socket,
2084                       struct UTPSocket * utp_socket)
2085{
2086    tr_session * session;
2087
2088    managerLock (manager);
2089
2090    assert (tr_isSession (manager->session));
2091    session = manager->session;
2092
2093    if (tr_sessionIsAddressBlocked (session, addr))
2094    {
2095        tr_dbg ("Banned IP address \"%s\" tried to connect to us", tr_address_to_string (addr));
2096        if (socket >= 0)
2097            tr_netClose (session, socket);
2098        else
2099            UTP_Close (utp_socket);
2100    }
2101    else if (getExistingHandshake (&manager->incomingHandshakes, addr))
2102    {
2103        if (socket >= 0)
2104            tr_netClose (session, socket);
2105        else
2106            UTP_Close (utp_socket);
2107    }
2108    else /* we don't have a connection to them yet... */
2109    {
2110        tr_peerIo *    io;
2111        tr_handshake * handshake;
2112
2113        io = tr_peerIoNewIncoming (session, &session->bandwidth, addr, port, socket, utp_socket);
2114
2115        handshake = tr_handshakeNew (io,
2116                                     session->encryptionMode,
2117                                     myHandshakeDoneCB,
2118                                     manager);
2119
2120        tr_peerIoUnref (io); /* balanced by the implicit ref in tr_peerIoNewIncoming () */
2121
2122        tr_ptrArrayInsertSorted (&manager->incomingHandshakes, handshake,
2123                                 handshakeCompare);
2124    }
2125
2126    managerUnlock (manager);
2127}
2128
2129void
2130tr_peerMgrAddPex (tr_torrent * tor, uint8_t from,
2131                  const tr_pex * pex, int8_t seedProbability)
2132{
2133    if (tr_isPex (pex)) /* safeguard against corrupt data */
2134    {
2135        Torrent * t = tor->torrentPeers;
2136        managerLock (t->manager);
2137
2138        if (!tr_sessionIsAddressBlocked (t->manager->session, &pex->addr))
2139            if (tr_address_is_valid_for_peers (&pex->addr, pex->port))
2140                ensureAtomExists (t, &pex->addr, pex->port, pex->flags, seedProbability, from);
2141
2142        managerUnlock (t->manager);
2143    }
2144}
2145
2146void
2147tr_peerMgrMarkAllAsSeeds (tr_torrent * tor)
2148{
2149    Torrent * t = tor->torrentPeers;
2150    const int n = tr_ptrArraySize (&t->pool);
2151    struct peer_atom ** it = (struct peer_atom**) tr_ptrArrayBase (&t->pool);
2152    struct peer_atom ** end = it + n;
2153
2154    while (it != end)
2155        atomSetSeed (t, *it++);
2156}
2157
2158tr_pex *
2159tr_peerMgrCompactToPex (const void *    compact,
2160                        size_t          compactLen,
2161                        const uint8_t * added_f,
2162                        size_t          added_f_len,
2163                        size_t *        pexCount)
2164{
2165    size_t          i;
2166    size_t          n = compactLen / 6;
2167    const uint8_t * walk = compact;
2168    tr_pex *        pex = tr_new0 (tr_pex, n);
2169
2170    for (i = 0; i < n; ++i)
2171    {
2172        pex[i].addr.type = TR_AF_INET;
2173        memcpy (&pex[i].addr.addr, walk, 4); walk += 4;
2174        memcpy (&pex[i].port, walk, 2); walk += 2;
2175        if (added_f && (n == added_f_len))
2176            pex[i].flags = added_f[i];
2177    }
2178
2179    *pexCount = n;
2180    return pex;
2181}
2182
2183tr_pex *
2184tr_peerMgrCompact6ToPex (const void    * compact,
2185                         size_t          compactLen,
2186                         const uint8_t * added_f,
2187                         size_t          added_f_len,
2188                         size_t        * pexCount)
2189{
2190    size_t          i;
2191    size_t          n = compactLen / 18;
2192    const uint8_t * walk = compact;
2193    tr_pex *        pex = tr_new0 (tr_pex, n);
2194
2195    for (i = 0; i < n; ++i)
2196    {
2197        pex[i].addr.type = TR_AF_INET6;
2198        memcpy (&pex[i].addr.addr.addr6.s6_addr, walk, 16); walk += 16;
2199        memcpy (&pex[i].port, walk, 2); walk += 2;
2200        if (added_f && (n == added_f_len))
2201            pex[i].flags = added_f[i];
2202    }
2203
2204    *pexCount = n;
2205    return pex;
2206}
2207
2208tr_pex *
2209tr_peerMgrArrayToPex (const void * array,
2210                      size_t       arrayLen,
2211                      size_t      * pexCount)
2212{
2213    size_t          i;
2214    size_t          n = arrayLen / (sizeof (tr_address) + 2);
2215    /*size_t          n = arrayLen / sizeof (tr_peerArrayElement);*/
2216    const uint8_t * walk = array;
2217    tr_pex        * pex = tr_new0 (tr_pex, n);
2218
2219    for (i = 0 ; i < n ; i++) {
2220        memcpy (&pex[i].addr, walk, sizeof (tr_address));
2221        memcpy (&pex[i].port, walk + sizeof (tr_address), 2);
2222        pex[i].flags = 0x00;
2223        walk += sizeof (tr_address) + 2;
2224    }
2225
2226    *pexCount = n;
2227    return pex;
2228}
2229
2230/**
2231***
2232**/
2233
2234static void
2235tr_peerMgrSetBlame (tr_torrent     * tor,
2236                    tr_piece_index_t pieceIndex,
2237                    int              success)
2238{
2239    if (!success)
2240    {
2241        int        peerCount, i;
2242        Torrent *  t = tor->torrentPeers;
2243        tr_peer ** peers;
2244
2245        assert (torrentIsLocked (t));
2246
2247        peers = (tr_peer **) tr_ptrArrayPeek (&t->peers, &peerCount);
2248        for (i = 0; i < peerCount; ++i)
2249        {
2250            tr_peer * peer = peers[i];
2251            if (tr_bitfieldHas (&peer->blame, pieceIndex))
2252            {
2253                tordbg (t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
2254                        tr_atomAddrStr (peer->atom),
2255                        pieceIndex, (int)peer->strikes + 1);
2256                addStrike (t, peer);
2257            }
2258        }
2259    }
2260}
2261
2262int
2263tr_pexCompare (const void * va, const void * vb)
2264{
2265    const tr_pex * a = va;
2266    const tr_pex * b = vb;
2267    int i;
2268
2269    assert (tr_isPex (a));
2270    assert (tr_isPex (b));
2271
2272    if ((i = tr_address_compare (&a->addr, &b->addr)))
2273        return i;
2274
2275    if (a->port != b->port)
2276        return a->port < b->port ? -1 : 1;
2277
2278    return 0;
2279}
2280
2281/* better goes first */
2282static int
2283compareAtomsByUsefulness (const void * va, const void *vb)
2284{
2285    const struct peer_atom * a = * (const struct peer_atom**) va;
2286    const struct peer_atom * b = * (const struct peer_atom**) vb;
2287
2288    assert (tr_isAtom (a));
2289    assert (tr_isAtom (b));
2290
2291    if (a->piece_data_time != b->piece_data_time)
2292        return a->piece_data_time > b->piece_data_time ? -1 : 1;
2293    if (a->fromBest != b->fromBest)
2294        return a->fromBest < b->fromBest ? -1 : 1;
2295    if (a->numFails != b->numFails)
2296        return a->numFails < b->numFails ? -1 : 1;
2297
2298    return 0;
2299}
2300
2301static bool
2302isAtomInteresting (const tr_torrent * tor, struct peer_atom * atom)
2303{
2304    if (tr_torrentIsSeed (tor) && atomIsSeed (atom))
2305        return false;
2306
2307    if (peerIsInUse (tor->torrentPeers, atom))
2308        return true;
2309
2310    if (isAtomBlocklisted (tor->session, atom))
2311        return false;
2312
2313    if (atom->flags2 & MYFLAG_BANNED)
2314        return false;
2315
2316    return true;
2317}
2318
2319int
2320tr_peerMgrGetPeers (tr_torrent   * tor,
2321                    tr_pex      ** setme_pex,
2322                    uint8_t        af,
2323                    uint8_t        list_mode,
2324                    int            maxCount)
2325{
2326    int i;
2327    int n;
2328    int count = 0;
2329    int atomCount = 0;
2330    const Torrent * t = tor->torrentPeers;
2331    struct peer_atom ** atoms = NULL;
2332    tr_pex * pex;
2333    tr_pex * walk;
2334
2335    assert (tr_isTorrent (tor));
2336    assert (setme_pex != NULL);
2337    assert (af==TR_AF_INET || af==TR_AF_INET6);
2338    assert (list_mode==TR_PEERS_CONNECTED || list_mode==TR_PEERS_INTERESTING);
2339
2340    managerLock (t->manager);
2341
2342    /**
2343    ***  build a list of atoms
2344    **/
2345
2346    if (list_mode == TR_PEERS_CONNECTED) /* connected peers only */
2347    {
2348        int i;
2349        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayBase (&t->peers);
2350        atomCount = tr_ptrArraySize (&t->peers);
2351        atoms = tr_new (struct peer_atom *, atomCount);
2352        for (i=0; i<atomCount; ++i)
2353            atoms[i] = peers[i]->atom;
2354    }
2355    else /* TR_PEERS_INTERESTING */
2356    {
2357        int i;
2358        struct peer_atom ** atomBase = (struct peer_atom**) tr_ptrArrayBase (&t->pool);
2359        n = tr_ptrArraySize (&t->pool);
2360        atoms = tr_new (struct peer_atom *, n);
2361        for (i=0; i<n; ++i)
2362            if (isAtomInteresting (tor, atomBase[i]))
2363                atoms[atomCount++] = atomBase[i];
2364    }
2365
2366    qsort (atoms, atomCount, sizeof (struct peer_atom *), compareAtomsByUsefulness);
2367
2368    /**
2369    ***  add the first N of them into our return list
2370    **/
2371
2372    n = MIN (atomCount, maxCount);
2373    pex = walk = tr_new0 (tr_pex, n);
2374
2375    for (i=0; i<atomCount && count<n; ++i)
2376    {
2377        const struct peer_atom * atom = atoms[i];
2378        if (atom->addr.type == af)
2379        {
2380            assert (tr_address_is_valid (&atom->addr));
2381            walk->addr = atom->addr;
2382            walk->port = atom->port;
2383            walk->flags = atom->flags;
2384            ++count;
2385            ++walk;
2386        }
2387    }
2388
2389    qsort (pex, count, sizeof (tr_pex), tr_pexCompare);
2390
2391    assert ((walk - pex) == count);
2392    *setme_pex = pex;
2393
2394    /* cleanup */
2395    tr_free (atoms);
2396    managerUnlock (t->manager);
2397    return count;
2398}
2399
2400static void atomPulse    (int, short, void *);
2401static void bandwidthPulse (int, short, void *);
2402static void rechokePulse (int, short, void *);
2403static void reconnectPulse (int, short, void *);
2404
2405static struct event *
2406createTimer (tr_session * session, int msec, void (*callback)(int, short, void *), void * cbdata)
2407{
2408    struct event * timer = evtimer_new (session->event_base, callback, cbdata);
2409    tr_timerAddMsec (timer, msec);
2410    return timer;
2411}
2412
2413static void
2414ensureMgrTimersExist (struct tr_peerMgr * m)
2415{
2416    if (m->atomTimer == NULL)
2417        m->atomTimer = createTimer (m->session, ATOM_PERIOD_MSEC, atomPulse, m);
2418
2419    if (m->bandwidthTimer == NULL)
2420        m->bandwidthTimer = createTimer (m->session, BANDWIDTH_PERIOD_MSEC, bandwidthPulse, m);
2421
2422    if (m->rechokeTimer == NULL)
2423        m->rechokeTimer = createTimer (m->session, RECHOKE_PERIOD_MSEC, rechokePulse, m);
2424
2425    if (m->refillUpkeepTimer == NULL)
2426        m->refillUpkeepTimer = createTimer (m->session, REFILL_UPKEEP_PERIOD_MSEC, refillUpkeep, m);
2427}
2428
2429void
2430tr_peerMgrStartTorrent (tr_torrent * tor)
2431{
2432    Torrent * t = tor->torrentPeers;
2433
2434    assert (tr_isTorrent (tor));
2435    assert (tr_torrentIsLocked (tor));
2436
2437    ensureMgrTimersExist (t->manager);
2438
2439    t->isRunning = true;
2440    t->maxPeers = t->tor->maxConnectedPeers;
2441    t->pieceSortState = PIECES_UNSORTED;
2442
2443    rechokePulse (0, 0, t->manager);
2444}
2445
2446static void
2447stopTorrent (Torrent * t)
2448{
2449    tr_peer * peer;
2450
2451    t->isRunning = false;
2452
2453    replicationFree (t);
2454    invalidatePieceSorting (t);
2455
2456    /* disconnect the peers. */
2457    while ((peer = tr_ptrArrayPop (&t->peers)))
2458        peerDelete (t, peer);
2459
2460    /* disconnect the handshakes. handshakeAbort calls handshakeDoneCB (),
2461     * which removes the handshake from t->outgoingHandshakes... */
2462    while (!tr_ptrArrayEmpty (&t->outgoingHandshakes))
2463        tr_handshakeAbort (tr_ptrArrayNth (&t->outgoingHandshakes, 0));
2464}
2465
2466void
2467tr_peerMgrStopTorrent (tr_torrent * tor)
2468{
2469    assert (tr_isTorrent (tor));
2470    assert (tr_torrentIsLocked (tor));
2471
2472    stopTorrent (tor->torrentPeers);
2473}
2474
2475void
2476tr_peerMgrAddTorrent (tr_peerMgr * manager, tr_torrent * tor)
2477{
2478    assert (tr_isTorrent (tor));
2479    assert (tr_torrentIsLocked (tor));
2480    assert (tor->torrentPeers == NULL);
2481
2482    tor->torrentPeers = torrentNew (manager, tor);
2483}
2484
2485void
2486tr_peerMgrRemoveTorrent (tr_torrent * tor)
2487{
2488    assert (tr_isTorrent (tor));
2489    assert (tr_torrentIsLocked (tor));
2490
2491    stopTorrent (tor->torrentPeers);
2492    torrentFree (tor->torrentPeers);
2493}
2494
2495void
2496tr_peerUpdateProgress (tr_torrent * tor, tr_peer * peer)
2497{
2498    const tr_bitfield * have = &peer->have;
2499
2500    if (tr_bitfieldHasAll (have))
2501    {
2502        peer->progress = 1.0;
2503    }
2504    else if (tr_bitfieldHasNone (have))
2505    {
2506        peer->progress = 0.0;
2507    }
2508    else
2509    {
2510        const float true_count = tr_bitfieldCountTrueBits (have);
2511
2512        if (tr_torrentHasMetadata (tor))
2513        {
2514            peer->progress = true_count / tor->info.pieceCount;
2515        }
2516        else /* without pieceCount, this result is only a best guess... */
2517        {
2518            peer->progress = true_count / (have->bit_count + 1);
2519        }
2520    }
2521
2522    /* clamp the progress range */
2523    if (peer->progress < 0.0)
2524        peer->progress = 0.0;
2525    if (peer->progress > 1.0)
2526        peer->progress = 1.0;
2527
2528    if (peer->atom && (peer->progress >= 1.0))
2529        atomSetSeed (tor->torrentPeers, peer->atom);
2530}
2531
2532void
2533tr_peerMgrOnTorrentGotMetainfo (tr_torrent * tor)
2534{
2535    int i;
2536    int peerCount;
2537    tr_peer ** peers;
2538
2539    /* the webseed list may have changed... */
2540    rebuildWebseedArray (tor->torrentPeers, tor);
2541
2542    /* some peer_msgs' progress fields may not be accurate if we
2543       didn't have the metadata before now... so refresh them all... */
2544    peerCount = tr_ptrArraySize (&tor->torrentPeers->peers);
2545    peers = (tr_peer**) tr_ptrArrayBase (&tor->torrentPeers->peers);
2546    for (i=0; i<peerCount; ++i)
2547        tr_peerUpdateProgress (tor, peers[i]);
2548
2549}
2550
2551void
2552tr_peerMgrTorrentAvailability (tr_torrent * tor, int8_t * tab, unsigned int tabCount)
2553{
2554  assert (tab != NULL);
2555  assert (tabCount > 0);
2556
2557  if (tr_torrentRef (tor))
2558    {
2559      memset (tab, 0, tabCount);
2560
2561      if (tr_torrentHasMetadata (tor))
2562        {
2563          tr_piece_index_t i;
2564          const int peerCount = tr_ptrArraySize (&tor->torrentPeers->peers);
2565          const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase (&tor->torrentPeers->peers);
2566          const float interval = tor->info.pieceCount / (float)tabCount;
2567          const bool isSeed = tr_cpGetStatus (&tor->completion) == TR_SEED;
2568
2569          for (i=0; i<tabCount; ++i)
2570            {
2571              const int piece = i * interval;
2572
2573              if (isSeed || tr_cpPieceIsComplete (&tor->completion, piece))
2574                {
2575                  tab[i] = -1;
2576                }
2577              else if (peerCount)
2578                {
2579                  int j;
2580                  for (j=0; j<peerCount; ++j)
2581                    if (tr_bitfieldHas (&peers[j]->have, piece))
2582                      ++tab[i];
2583                }
2584            }
2585        }
2586
2587      tr_torrentUnref (tor);
2588    }
2589}
2590
2591static bool
2592peerIsSeed (const tr_peer * peer)
2593{
2594    if (peer->progress >= 1.0)
2595        return true;
2596
2597    if (peer->atom && atomIsSeed (peer->atom))
2598        return true;
2599
2600    return false;
2601}
2602
2603/* count how many bytes we want that connected peers have */
2604uint64_t
2605tr_peerMgrGetDesiredAvailable (const tr_torrent * tor)
2606{
2607    size_t i;
2608    size_t n;
2609    uint64_t desiredAvailable;
2610    const Torrent * t = tor->torrentPeers;
2611
2612    /* common shortcuts... */
2613
2614    if (tr_torrentIsSeed (t->tor))
2615        return 0;
2616
2617    if (!tr_torrentHasMetadata (tor))
2618        return 0;
2619
2620    n = tr_ptrArraySize (&t->peers);
2621    if (n == 0)
2622        return 0;
2623    else {
2624        const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase (&t->peers);
2625        for (i=0; i<n; ++i)
2626            if (peers[i]->atom && atomIsSeed (peers[i]->atom))
2627                return tr_cpLeftUntilDone (&tor->completion);
2628    }
2629
2630    if (!t->pieceReplication || !t->pieceReplicationSize)
2631        return 0;
2632
2633    /* do it the hard way */
2634
2635    desiredAvailable = 0;
2636    for (i=0, n=MIN (tor->info.pieceCount, t->pieceReplicationSize); i<n; ++i)
2637        if (!tor->info.pieces[i].dnd && (t->pieceReplication[i] > 0))
2638            desiredAvailable += tr_cpMissingBytesInPiece (&t->tor->completion, i);
2639
2640    assert (desiredAvailable <= tor->info.totalSize);
2641    return desiredAvailable;
2642}
2643
2644void
2645tr_peerMgrTorrentStats (tr_torrent  * tor,
2646                        int         * setmePeersConnected,
2647                        int         * setmeWebseedsSendingToUs,
2648                        int         * setmePeersSendingToUs,
2649                        int         * setmePeersGettingFromUs,
2650                        int         * setmePeersFrom)
2651{
2652  *setmePeersConnected       = 0;
2653  *setmePeersGettingFromUs   = 0;
2654  *setmePeersSendingToUs     = 0;
2655  *setmeWebseedsSendingToUs  = 0;
2656
2657  if (tr_torrentRef (tor))
2658    {
2659      int i;
2660      const Torrent * t = tor->torrentPeers;
2661      const int size = tr_ptrArraySize (&t->peers);
2662      const tr_peer ** peers = (const tr_peer **) tr_ptrArrayBase (&t->peers);
2663
2664      for (i=0; i<TR_PEER_FROM__MAX; ++i)
2665        setmePeersFrom[i] = 0;
2666
2667      for (i=0; i<size; ++i)
2668        {
2669          const tr_peer * peer = peers[i];
2670          const struct peer_atom * atom = peer->atom;
2671
2672          if (peer->io == NULL) /* not connected */
2673            continue;
2674
2675          ++*setmePeersConnected;
2676
2677          ++setmePeersFrom[atom->fromFirst];
2678
2679          if (clientIsDownloadingFrom (tor, peer))
2680            ++*setmePeersSendingToUs;
2681
2682          if (clientIsUploadingTo (peer))
2683            ++*setmePeersGettingFromUs;
2684        }
2685
2686      *setmeWebseedsSendingToUs = countActiveWebseeds (t);
2687      tr_torrentUnref (tor);
2688    }
2689}
2690
2691double*
2692tr_peerMgrWebSpeeds_KBps (tr_torrent * tor)
2693{
2694  double * ret = NULL;
2695
2696  if (tr_torrentRef (tor))
2697    {
2698      int i;
2699      const Torrent * t = tor->torrentPeers;
2700      const int webseedCount = tr_ptrArraySize (&t->webseeds);
2701      const tr_webseed ** webseeds = (const tr_webseed**) tr_ptrArrayBase (&t->webseeds);
2702      const uint64_t now = tr_time_msec ();
2703
2704      ret = tr_new0 (double, webseedCount);
2705
2706      assert (t->manager != NULL);
2707      assert (webseedCount == tor->info.webseedCount);
2708
2709      for (i=0; i<webseedCount; ++i)
2710        {
2711          unsigned int Bps;
2712          if (tr_webseedGetSpeed_Bps (webseeds[i], now, &Bps))
2713            ret[i] = Bps / (double)tr_speed_K;
2714          else
2715            ret[i] = -1.0;
2716        }
2717
2718      tr_torrentUnref (tor);
2719    }
2720
2721  return ret;
2722}
2723
2724unsigned int
2725tr_peerGetPieceSpeed_Bps (const tr_peer * peer, uint64_t now, tr_direction direction)
2726{
2727    return peer->io ? tr_peerIoGetPieceSpeed_Bps (peer->io, now, direction) : 0.0;
2728}
2729
2730struct tr_peer_stat *
2731tr_peerMgrPeerStats (tr_torrent * tor, int * setmeCount)
2732{
2733  int size = 0;
2734  tr_peer_stat * ret = NULL;
2735
2736  if (tr_torrentRef (tor))
2737    {
2738      int i;
2739      const Torrent * t = tor->torrentPeers;
2740      const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase (&t->peers);
2741      const uint64_t now_msec = tr_time_msec ();
2742      const time_t now = tr_time ();
2743
2744      assert (t->manager != NULL);
2745
2746      size = tr_ptrArraySize (&t->peers);
2747      ret = tr_new0 (tr_peer_stat, size);
2748
2749      for (i=0; i<size; ++i)
2750        {
2751          char *                   pch;
2752          const tr_peer *          peer = peers[i];
2753          const struct peer_atom * atom = peer->atom;
2754          tr_peer_stat *           stat = ret + i;
2755
2756          tr_address_to_string_with_buf (&atom->addr, stat->addr, sizeof (stat->addr));
2757          tr_strlcpy (stat->client, (peer->client ? peer->client : ""), sizeof (stat->client));
2758          stat->port                = ntohs (peer->atom->port);
2759          stat->from                = atom->fromFirst;
2760          stat->progress            = peer->progress;
2761          stat->isUTP               = peer->io->utp_socket != NULL;
2762          stat->isEncrypted         = tr_peerIoIsEncrypted (peer->io) ? 1 : 0;
2763          stat->rateToPeer_KBps     = toSpeedKBps (tr_peerGetPieceSpeed_Bps (peer, now_msec, TR_CLIENT_TO_PEER));
2764          stat->rateToClient_KBps   = toSpeedKBps (tr_peerGetPieceSpeed_Bps (peer, now_msec, TR_PEER_TO_CLIENT));
2765          stat->peerIsChoked        = peer->peerIsChoked;
2766          stat->peerIsInterested    = peer->peerIsInterested;
2767          stat->clientIsChoked      = peer->clientIsChoked;
2768          stat->clientIsInterested  = peer->clientIsInterested;
2769          stat->isIncoming          = tr_peerIoIsIncoming (peer->io);
2770          stat->isDownloadingFrom   = clientIsDownloadingFrom (tor, peer);
2771          stat->isUploadingTo       = clientIsUploadingTo (peer);
2772          stat->isSeed              = peerIsSeed (peer);
2773
2774          stat->blocksToPeer        = tr_historyGet (&peer->blocksSentToPeer,    now, CANCEL_HISTORY_SEC);
2775          stat->blocksToClient      = tr_historyGet (&peer->blocksSentToClient,  now, CANCEL_HISTORY_SEC);
2776          stat->cancelsToPeer       = tr_historyGet (&peer->cancelsSentToPeer,   now, CANCEL_HISTORY_SEC);
2777          stat->cancelsToClient     = tr_historyGet (&peer->cancelsSentToClient, now, CANCEL_HISTORY_SEC);
2778
2779          stat->pendingReqsToPeer   = peer->pendingReqsToPeer;
2780          stat->pendingReqsToClient = peer->pendingReqsToClient;
2781
2782          pch = stat->flagStr;
2783          if (stat->isUTP) *pch++ = 'T';
2784          if (t->optimistic == peer) *pch++ = 'O';
2785          if (stat->isDownloadingFrom) *pch++ = 'D';
2786          else if (stat->clientIsInterested) *pch++ = 'd';
2787          if (stat->isUploadingTo) *pch++ = 'U';
2788          else if (stat->peerIsInterested) *pch++ = 'u';
2789          if (!stat->clientIsChoked && !stat->clientIsInterested) *pch++ = 'K';
2790          if (!stat->peerIsChoked && !stat->peerIsInterested) *pch++ = '?';
2791          if (stat->isEncrypted) *pch++ = 'E';
2792          if (stat->from == TR_PEER_FROM_DHT) *pch++ = 'H';
2793          else if (stat->from == TR_PEER_FROM_PEX) *pch++ = 'X';
2794          if (stat->isIncoming) *pch++ = 'I';
2795          *pch = '\0';
2796        }
2797
2798      tr_torrentUnref (tor);
2799    }
2800
2801  *setmeCount = size;
2802  return ret;
2803}
2804
2805/***
2806****
2807****
2808***/
2809
2810void
2811tr_peerMgrClearInterest (tr_torrent * tor)
2812{
2813    int i;
2814    Torrent * t = tor->torrentPeers;
2815    const int peerCount = tr_ptrArraySize (&t->peers);
2816
2817    assert (tr_isTorrent (tor));
2818    assert (tr_torrentIsLocked (tor));
2819
2820    for (i=0; i<peerCount; ++i)
2821    {
2822        const tr_peer * peer = tr_ptrArrayNth (&t->peers, i);
2823        tr_peerMsgsSetInterested (peer->msgs, false);
2824    }
2825}
2826
2827/* does this peer have any pieces that we want? */
2828static bool
2829isPeerInteresting (tr_torrent     * const tor,
2830                   const bool     * const piece_is_interesting,
2831                   const tr_peer  * const peer)
2832{
2833  tr_piece_index_t i, n;
2834
2835  /* these cases should have already been handled by the calling code... */
2836  assert (!tr_torrentIsSeed (tor));
2837  assert (tr_torrentIsPieceTransferAllowed (tor, TR_PEER_TO_CLIENT));
2838
2839  if (peerIsSeed (peer))
2840    return true;
2841
2842  for (i=0, n=tor->info.pieceCount; i<n; ++i)
2843    if (piece_is_interesting[i] && tr_bitfieldHas (&peer->have, i))
2844      return true;
2845
2846  return false;
2847}
2848
2849typedef enum
2850{
2851    RECHOKE_STATE_GOOD,
2852    RECHOKE_STATE_UNTESTED,
2853    RECHOKE_STATE_BAD
2854}
2855tr_rechoke_state;
2856
2857struct tr_rechoke_info
2858{
2859    tr_peer * peer;
2860    int salt;
2861    int rechoke_state;
2862};
2863
2864static int
2865compare_rechoke_info (const void * va, const void * vb)
2866{
2867    const struct tr_rechoke_info * a = va;
2868    const struct tr_rechoke_info * b = vb;
2869
2870    if (a->rechoke_state != b->rechoke_state)
2871        return a->rechoke_state - b->rechoke_state;
2872
2873    return a->salt - b->salt;
2874}
2875
2876/* determines who we send "interested" messages to */
2877static void
2878rechokeDownloads (Torrent * t)
2879{
2880    int i;
2881    int maxPeers = 0;
2882    int rechoke_count = 0;
2883    struct tr_rechoke_info * rechoke = NULL;
2884    const int MIN_INTERESTING_PEERS = 5;
2885    const int peerCount = tr_ptrArraySize (&t->peers);
2886    const time_t now = tr_time ();
2887
2888    /* some cases where this function isn't necessary */
2889    if (tr_torrentIsSeed (t->tor))
2890        return;
2891    if (!tr_torrentIsPieceTransferAllowed (t->tor, TR_PEER_TO_CLIENT))
2892        return;
2893
2894    /* decide HOW MANY peers to be interested in */
2895    {
2896        int blocks = 0;
2897        int cancels = 0;
2898        time_t timeSinceCancel;
2899
2900        /* Count up how many blocks & cancels each peer has.
2901         *
2902         * There are two situations where we send out cancels --
2903         *
2904         * 1. We've got unresponsive peers, which is handled by deciding
2905         *    -which- peers to be interested in.
2906         *
2907         * 2. We've hit our bandwidth cap, which is handled by deciding
2908         *    -how many- peers to be interested in.
2909         *
2910         * We're working on 2. here, so we need to ignore unresponsive
2911         * peers in our calculations lest they confuse Transmission into
2912         * thinking it's hit its bandwidth cap.
2913         */
2914        for (i=0; i<peerCount; ++i)
2915        {
2916            const tr_peer * peer = tr_ptrArrayNth (&t->peers, i);
2917            const int b = tr_historyGet (&peer->blocksSentToClient, now, CANCEL_HISTORY_SEC);
2918            const int c = tr_historyGet (&peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC);
2919
2920            if (b == 0) /* ignore unresponsive peers, as described above */
2921                continue;
2922
2923            blocks += b;
2924            cancels += c;
2925        }
2926
2927        if (cancels > 0)
2928        {
2929            /* cancelRate: of the block requests we've recently made, the percentage we cancelled.
2930             * higher values indicate more congestion. */
2931            const double cancelRate = cancels / (double)(cancels + blocks);
2932            const double mult = 1 - MIN (cancelRate, 0.5);
2933            maxPeers = t->interestedCount * mult;
2934            tordbg (t, "cancel rate is %.3f -- reducing the "
2935                       "number of peers we're interested in by %.0f percent",
2936                       cancelRate, mult * 100);
2937            t->lastCancel = now;
2938        }
2939
2940        timeSinceCancel = now - t->lastCancel;
2941        if (timeSinceCancel)
2942        {
2943            const int maxIncrease = 15;
2944            const time_t maxHistory = 2 * CANCEL_HISTORY_SEC;
2945            const double mult = MIN (timeSinceCancel, maxHistory) / (double) maxHistory;
2946            const int inc = maxIncrease * mult;
2947            maxPeers = t->maxPeers + inc;
2948            tordbg (t, "time since last cancel is %li -- increasing the "
2949                       "number of peers we're interested in by %d",
2950                       timeSinceCancel, inc);
2951        }
2952    }
2953
2954    /* don't let the previous section's number tweaking go too far... */
2955    if (maxPeers < MIN_INTERESTING_PEERS)
2956        maxPeers = MIN_INTERESTING_PEERS;
2957    if (maxPeers > t->tor->maxConnectedPeers)
2958        maxPeers = t->tor->maxConnectedPeers;
2959
2960    t->maxPeers = maxPeers;
2961
2962    if (peerCount > 0)
2963    {
2964        bool * piece_is_interesting;
2965        const tr_torrent * const tor = t->tor;
2966        const int n = tor->info.pieceCount;
2967
2968        /* build a bitfield of interesting pieces... */
2969        piece_is_interesting = tr_new (bool, n);
2970        for (i=0; i<n; i++)
2971            piece_is_interesting[i] = !tor->info.pieces[i].dnd && !tr_cpPieceIsComplete (&tor->completion, i);
2972
2973        /* decide WHICH peers to be interested in (based on their cancel-to-block ratio) */
2974        for (i=0; i<peerCount; ++i)
2975        {
2976            tr_peer * peer = tr_ptrArrayNth (&t->peers, i);
2977
2978            if (!isPeerInteresting (t->tor, piece_is_interesting, peer))
2979            {
2980                tr_peerMsgsSetInterested (peer->msgs, false);
2981            }
2982            else
2983            {
2984                tr_rechoke_state rechoke_state;
2985                const int blocks = tr_historyGet (&peer->blocksSentToClient, now, CANCEL_HISTORY_SEC);
2986                const int cancels = tr_historyGet (&peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC);
2987
2988                if (!blocks && !cancels)
2989                    rechoke_state = RECHOKE_STATE_UNTESTED;
2990                else if (!cancels)
2991                    rechoke_state = RECHOKE_STATE_GOOD;
2992                else if (!blocks)
2993                    rechoke_state = RECHOKE_STATE_BAD;
2994                else if ((cancels * 10) < blocks)
2995                    rechoke_state = RECHOKE_STATE_GOOD;
2996                else
2997                    rechoke_state = RECHOKE_STATE_BAD;
2998
2999                if (rechoke == NULL)
3000                    rechoke = tr_new (struct tr_rechoke_info, peerCount);
3001
3002                 rechoke[rechoke_count].peer = peer;
3003                 rechoke[rechoke_count].rechoke_state = rechoke_state;
3004                 rechoke[rechoke_count].salt = tr_cryptoWeakRandInt (INT_MAX);
3005                 rechoke_count++;
3006            }
3007
3008        }
3009
3010        tr_free (piece_is_interesting);
3011    }
3012
3013    /* now that we know which & how many peers to be interested in... update the peer interest */
3014    qsort (rechoke, rechoke_count, sizeof (struct tr_rechoke_info), compare_rechoke_info);
3015    t->interestedCount = MIN (maxPeers, rechoke_count);
3016    for (i=0; i<rechoke_count; ++i)
3017        tr_peerMsgsSetInterested (rechoke[i].peer->msgs, i<t->interestedCount);
3018
3019    /* cleanup */
3020    tr_free (rechoke);
3021}
3022
3023/**
3024***
3025**/
3026
3027struct ChokeData
3028{
3029    bool            isInterested;
3030    bool            wasChoked;
3031    bool            isChoked;
3032    int             rate;
3033    int             salt;
3034    tr_peer *       peer;
3035};
3036
3037static int
3038compareChoke (const void * va, const void * vb)
3039{
3040    const struct ChokeData * a = va;
3041    const struct ChokeData * b = vb;
3042
3043    if (a->rate != b->rate) /* prefer higher overall speeds */
3044        return a->rate > b->rate ? -1 : 1;
3045
3046    if (a->wasChoked != b->wasChoked) /* prefer unchoked */
3047        return a->wasChoked ? 1 : -1;
3048
3049    if (a->salt != b->salt) /* random order */
3050        return a->salt - b->salt;
3051
3052    return 0;
3053}
3054
3055/* is this a new connection? */
3056static int
3057isNew (const tr_peer * peer)
3058{
3059    return peer && peer->io && tr_peerIoGetAge (peer->io) < 45;
3060}
3061
3062/* get a rate for deciding which peers to choke and unchoke. */
3063static int
3064getRate (const tr_torrent * tor, struct peer_atom * atom, uint64_t now)
3065{
3066    unsigned int Bps;
3067
3068    if (tr_torrentIsSeed (tor))
3069        Bps = tr_peerGetPieceSpeed_Bps (atom->peer, now, TR_CLIENT_TO_PEER);
3070
3071    /* downloading a private torrent... take upload speed into account
3072     * because there may only be a small window of opportunity to share */
3073    else if (tr_torrentIsPrivate (tor))
3074        Bps = tr_peerGetPieceSpeed_Bps (atom->peer, now, TR_PEER_TO_CLIENT)
3075            + tr_peerGetPieceSpeed_Bps (atom->peer, now, TR_CLIENT_TO_PEER);
3076
3077    /* downloading a public torrent */
3078    else
3079        Bps = tr_peerGetPieceSpeed_Bps (atom->peer, now, TR_PEER_TO_CLIENT);
3080
3081    /* convert it to bytes per second */
3082    return Bps;
3083}
3084
3085static inline bool
3086isBandwidthMaxedOut (const tr_bandwidth * b,
3087                     const uint64_t now_msec, tr_direction dir)
3088{
3089    if (!tr_bandwidthIsLimited (b, dir))
3090        return false;
3091    else {
3092        const unsigned int got = tr_bandwidthGetPieceSpeed_Bps (b, now_msec, dir);
3093        const unsigned int want = tr_bandwidthGetDesiredSpeed_Bps (b, dir);
3094        return got >= want;
3095    }
3096}
3097
3098static void
3099rechokeUploads (Torrent * t, const uint64_t now)
3100{
3101    int i, size, unchokedInterested;
3102    const int peerCount = tr_ptrArraySize (&t->peers);
3103    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase (&t->peers);
3104    struct ChokeData * choke = tr_new0 (struct ChokeData, peerCount);
3105    const tr_session * session = t->manager->session;
3106    const int chokeAll = !tr_torrentIsPieceTransferAllowed (t->tor, TR_CLIENT_TO_PEER);
3107    const bool isMaxedOut = isBandwidthMaxedOut (&t->tor->bandwidth, now, TR_UP);
3108
3109    assert (torrentIsLocked (t));
3110
3111    /* an optimistic unchoke peer's "optimistic"
3112     * state lasts for N calls to rechokeUploads (). */
3113    if (t->optimisticUnchokeTimeScaler > 0)
3114        t->optimisticUnchokeTimeScaler--;
3115    else
3116        t->optimistic = NULL;
3117
3118    /* sort the peers by preference and rate */
3119    for (i = 0, size = 0; i < peerCount; ++i)
3120    {
3121        tr_peer * peer = peers[i];
3122        struct peer_atom * atom = peer->atom;
3123
3124        if (peerIsSeed (peer)) /* choke seeds and partial seeds */
3125        {
3126            tr_peerMsgsSetChoke (peer->msgs, true);
3127        }
3128        else if (chokeAll) /* choke everyone if we're not uploading */
3129        {
3130            tr_peerMsgsSetChoke (peer->msgs, true);
3131        }
3132        else if (peer != t->optimistic)
3133        {
3134            struct ChokeData * n = &choke[size++];
3135            n->peer         = peer;
3136            n->isInterested = peer->peerIsInterested;
3137            n->wasChoked    = peer->peerIsChoked;
3138            n->rate         = getRate (t->tor, atom, now);
3139            n->salt         = tr_cryptoWeakRandInt (INT_MAX);
3140            n->isChoked     = true;
3141        }
3142    }
3143
3144    qsort (choke, size, sizeof (struct ChokeData), compareChoke);
3145
3146    /**
3147     * Reciprocation and number of uploads capping is managed by unchoking
3148     * the N peers which have the best upload rate and are interested.
3149     * This maximizes the client's download rate. These N peers are
3150     * referred to as downloaders, because they are interested in downloading
3151     * from the client.
3152     *
3153     * Peers which have a better upload rate (as compared to the downloaders)
3154     * but aren't interested get unchoked. If they become interested, the
3155     * downloader with the worst upload rate gets choked. If a client has
3156     * a complete file, it uses its upload rate rather than its download
3157     * rate to decide which peers to unchoke.
3158     *
3159     * If our bandwidth is maxed out, don't unchoke any more peers.
3160     */
3161    unchokedInterested = 0;
3162    for (i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i) {
3163        choke[i].isChoked = isMaxedOut ? choke[i].wasChoked : false;
3164        if (choke[i].isInterested)
3165            ++unchokedInterested;
3166    }
3167
3168    /* optimistic unchoke */
3169    if (!t->optimistic && !isMaxedOut && (i<size))
3170    {
3171        int n;
3172        struct ChokeData * c;
3173        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
3174
3175        for (; i<size; ++i)
3176        {
3177            if (choke[i].isInterested)
3178            {
3179                const tr_peer * peer = choke[i].peer;
3180                int x = 1, y;
3181                if (isNew (peer)) x *= 3;
3182                for (y=0; y<x; ++y)
3183                    tr_ptrArrayAppend (&randPool, &choke[i]);
3184            }
3185        }
3186
3187        if ((n = tr_ptrArraySize (&randPool)))
3188        {
3189            c = tr_ptrArrayNth (&randPool, tr_cryptoWeakRandInt (n));
3190            c->isChoked = false;
3191            t->optimistic = c->peer;
3192            t->optimisticUnchokeTimeScaler = OPTIMISTIC_UNCHOKE_MULTIPLIER;
3193        }
3194
3195        tr_ptrArrayDestruct (&randPool, NULL);
3196    }
3197
3198    for (i=0; i<size; ++i)
3199        tr_peerMsgsSetChoke (choke[i].peer->msgs, choke[i].isChoked);
3200
3201    /* cleanup */
3202    tr_free (choke);
3203}
3204
3205static void
3206rechokePulse (int foo UNUSED, short bar UNUSED, void * vmgr)
3207{
3208    tr_torrent * tor = NULL;
3209    tr_peerMgr * mgr = vmgr;
3210    const uint64_t now = tr_time_msec ();
3211
3212    managerLock (mgr);
3213
3214    while ((tor = tr_torrentNext (mgr->session, tor))) {
3215        if (tor->isRunning) {
3216            Torrent * t = tor->torrentPeers;
3217            if (!tr_ptrArrayEmpty (&t->peers)) {
3218                rechokeUploads (t, now);
3219                rechokeDownloads (t);
3220            }
3221        }
3222    }
3223
3224    tr_timerAddMsec (mgr->rechokeTimer, RECHOKE_PERIOD_MSEC);
3225    managerUnlock (mgr);
3226}
3227
3228/***
3229****
3230****  Life and Death
3231****
3232***/
3233
3234static bool
3235shouldPeerBeClosed (const Torrent    * t,
3236                    const tr_peer    * peer,
3237                    int                peerCount,
3238                    const time_t       now)
3239{
3240    const tr_torrent *       tor = t->tor;
3241    const struct peer_atom * atom = peer->atom;
3242
3243    /* if it's marked for purging, close it */
3244    if (peer->doPurge)
3245    {
3246        tordbg (t, "purging peer %s because its doPurge flag is set",
3247                tr_atomAddrStr (atom));
3248        return true;
3249    }
3250
3251    /* disconnect if we're both seeds and enough time has passed for PEX */
3252    if (tr_torrentIsSeed (tor) && peerIsSeed (peer))
3253        return !tr_torrentAllowsPex (tor) || (now-atom->time>=30);
3254
3255    /* disconnect if it's been too long since piece data has been transferred.
3256     * this is on a sliding scale based on number of available peers... */
3257    {
3258        const int relaxStrictnessIfFewerThanN = (int)((getMaxPeerCount (tor) * 0.9) + 0.5);
3259        /* if we have >= relaxIfFewerThan, strictness is 100%.
3260         * if we have zero connections, strictness is 0% */
3261        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
3262                               ? 1.0
3263                               : peerCount / (float)relaxStrictnessIfFewerThanN;
3264        const int lo = MIN_UPLOAD_IDLE_SECS;
3265        const int hi = MAX_UPLOAD_IDLE_SECS;
3266        const int limit = hi - ((hi - lo) * strictness);
3267        const int idleTime = now - MAX (atom->time, atom->piece_data_time);
3268/*fprintf (stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit);*/
3269        if (idleTime > limit) {
3270            tordbg (t, "purging peer %s because it's been %d secs since we shared anything",
3271                       tr_atomAddrStr (atom), idleTime);
3272            return true;
3273        }
3274    }
3275
3276    return false;
3277}
3278
3279static tr_peer **
3280getPeersToClose (Torrent * t, const time_t now_sec, int * setmeSize)
3281{
3282    int i, peerCount, outsize;
3283    struct tr_peer ** ret = NULL;
3284    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek (&t->peers, &peerCount);
3285
3286    assert (torrentIsLocked (t));
3287
3288    for (i = outsize = 0; i < peerCount; ++i) {
3289        if (shouldPeerBeClosed (t, peers[i], peerCount, now_sec)) {
3290            if (ret == NULL)
3291                ret = tr_new (tr_peer *, peerCount);
3292            ret[outsize++] = peers[i];
3293        }
3294    }
3295
3296    *setmeSize = outsize;
3297    return ret;
3298}
3299
3300static int
3301getReconnectIntervalSecs (const struct peer_atom * atom, const time_t now)
3302{
3303    int sec;
3304
3305    /* if we were recently connected to this peer and transferring piece
3306     * data, try to reconnect to them sooner rather that later -- we don't
3307     * want network troubles to get in the way of a good peer. */
3308    if ((now - atom->piece_data_time) <= (MINIMUM_RECONNECT_INTERVAL_SECS * 2))
3309        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
3310
3311    /* don't allow reconnects more often than our minimum */
3312    else if ((now - atom->time) < MINIMUM_RECONNECT_INTERVAL_SECS)
3313        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
3314
3315    /* otherwise, the interval depends on how many times we've tried
3316     * and failed to connect to the peer */
3317    else switch (atom->numFails) {
3318        case 0: sec = 0; break;
3319        case 1: sec = 5; break;
3320        case 2: sec = 2 * 60; break;
3321        case 3: sec = 15 * 60; break;
3322        case 4: sec = 30 * 60; break;
3323        case 5: sec = 60 * 60; break;
3324        default: sec = 120 * 60; break;
3325    }
3326
3327    /* penalize peers that were unreachable the last time we tried */
3328    if (atom->flags2 & MYFLAG_UNREACHABLE)
3329        sec += sec;
3330
3331    dbgmsg ("reconnect interval for %s is %d seconds", tr_atomAddrStr (atom), sec);
3332    return sec;
3333}
3334
3335static void
3336removePeer (Torrent * t, tr_peer * peer)
3337{
3338    tr_peer * removed;
3339    struct peer_atom * atom = peer->atom;
3340
3341    assert (torrentIsLocked (t));
3342    assert (atom);
3343
3344    atom->time = tr_time ();
3345
3346    removed = tr_ptrArrayRemoveSorted (&t->peers, peer, peerCompare);
3347
3348    if (replicationExists (t))
3349        tr_decrReplicationFromBitfield (t, &peer->have);
3350
3351    assert (removed == peer);
3352    peerDelete (t, removed);
3353}
3354
3355static void
3356closePeer (Torrent * t, tr_peer * peer)
3357{
3358    struct peer_atom * atom;
3359
3360    assert (t != NULL);
3361    assert (peer != NULL);
3362
3363    atom = peer->atom;
3364
3365    /* if we transferred piece data, then they might be good peers,
3366       so reset their `numFails' weight to zero. otherwise we connected
3367       to them fruitlessly, so mark it as another fail */
3368    if (atom->piece_data_time) {
3369        tordbg (t, "resetting atom %s numFails to 0", tr_atomAddrStr (atom));
3370        atom->numFails = 0;
3371    } else {
3372        ++atom->numFails;
3373        tordbg (t, "incremented atom %s numFails to %d", tr_atomAddrStr (atom), (int)atom->numFails);
3374    }
3375
3376    tordbg (t, "removing bad peer %s", tr_peerIoGetAddrStr (peer->io));
3377    removePeer (t, peer);
3378}
3379
3380static void
3381removeAllPeers (Torrent * t)
3382{
3383    while (!tr_ptrArrayEmpty (&t->peers))
3384        removePeer (t, tr_ptrArrayNth (&t->peers, 0));
3385}
3386
3387static void
3388closeBadPeers (Torrent * t, const time_t now_sec)
3389{
3390    if (!tr_ptrArrayEmpty (&t->peers))
3391    {
3392        int i;
3393        int peerCount;
3394        struct tr_peer ** peers = getPeersToClose (t, now_sec, &peerCount);
3395        for (i=0; i<peerCount; ++i)
3396            closePeer (t, peers[i]);
3397        tr_free (peers);
3398    }
3399}
3400
3401struct peer_liveliness
3402{
3403    tr_peer * peer;
3404    void * clientData;
3405    time_t pieceDataTime;
3406    time_t time;
3407    int speed;
3408    bool doPurge;
3409};
3410
3411static int
3412comparePeerLiveliness (const void * va, const void * vb)
3413{
3414    const struct peer_liveliness * a = va;
3415    const struct peer_liveliness * b = vb;
3416
3417    if (a->doPurge != b->doPurge)
3418        return a->doPurge ? 1 : -1;
3419
3420    if (a->speed != b->speed) /* faster goes first */
3421        return a->speed > b->speed ? -1 : 1;
3422
3423    /* the one to give us data more recently goes first */
3424    if (a->pieceDataTime != b->pieceDataTime)
3425        return a->pieceDataTime > b->pieceDataTime ? -1 : 1;
3426
3427    /* the one we connected to most recently goes first */
3428    if (a->time != b->time)
3429        return a->time > b->time ? -1 : 1;
3430
3431    return 0;
3432}
3433
3434static void
3435sortPeersByLivelinessImpl (tr_peer  ** peers,
3436                           void     ** clientData,
3437                           int         n,
3438                           uint64_t    now,
3439                           int (*compare)(const void *va, const void *vb))
3440{
3441    int i;
3442    struct peer_liveliness *lives, *l;
3443
3444    /* build a sortable array of peer + extra info */
3445    lives = l = tr_new0 (struct peer_liveliness, n);
3446    for (i=0; i<n; ++i, ++l)
3447    {
3448        tr_peer * p = peers[i];
3449        l->peer = p;
3450        l->doPurge = p->doPurge;
3451        l->pieceDataTime = p->atom->piece_data_time;
3452        l->time = p->atom->time;
3453        l->speed = tr_peerGetPieceSpeed_Bps (p, now, TR_UP)
3454                 + tr_peerGetPieceSpeed_Bps (p, now, TR_DOWN);
3455        if (clientData)
3456            l->clientData = clientData[i];
3457    }
3458
3459    /* sort 'em */
3460    assert (n == (l - lives));
3461    qsort (lives, n, sizeof (struct peer_liveliness), compare);
3462
3463    /* build the peer array */
3464    for (i=0, l=lives; i<n; ++i, ++l) {
3465        peers[i] = l->peer;
3466        if (clientData)
3467            clientData[i] = l->clientData;
3468    }
3469    assert (n == (l - lives));
3470
3471    /* cleanup */
3472    tr_free (lives);
3473}
3474
3475static void
3476sortPeersByLiveliness (tr_peer ** peers, void ** clientData, int n, uint64_t now)
3477{
3478    sortPeersByLivelinessImpl (peers, clientData, n, now, comparePeerLiveliness);
3479}
3480
3481
3482static void
3483enforceTorrentPeerLimit (Torrent * t, uint64_t now)
3484{
3485    int n = tr_ptrArraySize (&t->peers);
3486    const int max = tr_torrentGetPeerLimit (t->tor);
3487    if (n > max)
3488    {
3489        void * base = tr_ptrArrayBase (&t->peers);
3490        tr_peer ** peers = tr_memdup (base, n*sizeof (tr_peer*));
3491        sortPeersByLiveliness (peers, NULL, n, now);
3492        while (n > max)
3493            closePeer (t, peers[--n]);
3494        tr_free (peers);
3495    }
3496}
3497
3498static void
3499enforceSessionPeerLimit (tr_session * session, uint64_t now)
3500{
3501    int n = 0;
3502    tr_torrent * tor = NULL;
3503    const int max = tr_sessionGetPeerLimit (session);
3504
3505    /* count the total number of peers */
3506    while ((tor = tr_torrentNext (session, tor)))
3507        n += tr_ptrArraySize (&tor->torrentPeers->peers);
3508
3509    /* if there are too many, prune out the worst */
3510    if (n > max)
3511    {
3512        tr_peer ** peers = tr_new (tr_peer*, n);
3513        Torrent ** torrents = tr_new (Torrent*, n);
3514
3515        /* populate the peer array */
3516        n = 0;
3517        tor = NULL;
3518        while ((tor = tr_torrentNext (session, tor))) {
3519            int i;
3520            Torrent * t = tor->torrentPeers;
3521            const int tn = tr_ptrArraySize (&t->peers);
3522            for (i=0; i<tn; ++i, ++n) {
3523                peers[n] = tr_ptrArrayNth (&t->peers, i);
3524                torrents[n] = t;
3525            }
3526        }
3527
3528        /* sort 'em */
3529        sortPeersByLiveliness (peers, (void**)torrents, n, now);
3530
3531        /* cull out the crappiest */
3532        while (n-- > max)
3533            closePeer (torrents[n], peers[n]);
3534
3535        /* cleanup */
3536        tr_free (torrents);
3537        tr_free (peers);
3538    }
3539}
3540
3541static void makeNewPeerConnections (tr_peerMgr * mgr, const int max);
3542
3543static void
3544reconnectPulse (int foo UNUSED, short bar UNUSED, void * vmgr)
3545{
3546    tr_torrent * tor;
3547    tr_peerMgr * mgr = vmgr;
3548    const time_t now_sec = tr_time ();
3549    const uint64_t now_msec = tr_time_msec ();
3550
3551    /**
3552    ***  enforce the per-session and per-torrent peer limits
3553    **/
3554
3555    /* if we're over the per-torrent peer limits, cull some peers */
3556    tor = NULL;
3557    while ((tor = tr_torrentNext (mgr->session, tor)))
3558        if (tor->isRunning)
3559            enforceTorrentPeerLimit (tor->torrentPeers, now_msec);
3560
3561    /* if we're over the per-session peer limits, cull some peers */
3562    enforceSessionPeerLimit (mgr->session, now_msec);
3563
3564    /* remove crappy peers */
3565    tor = NULL;
3566    while ((tor = tr_torrentNext (mgr->session, tor)))
3567        if (!tor->torrentPeers->isRunning)
3568            removeAllPeers (tor->torrentPeers);
3569        else
3570            closeBadPeers (tor->torrentPeers, now_sec);
3571
3572    /* try to make new peer connections */
3573    makeNewPeerConnections (mgr, MAX_CONNECTIONS_PER_PULSE);
3574}
3575
3576/****
3577*****
3578*****  BANDWIDTH ALLOCATION
3579*****
3580****/
3581
3582static void
3583pumpAllPeers (tr_peerMgr * mgr)
3584{
3585    tr_torrent * tor = NULL;
3586
3587    while ((tor = tr_torrentNext (mgr->session, tor)))
3588    {
3589        int j;
3590        Torrent * t = tor->torrentPeers;
3591
3592        for (j=0; j<tr_ptrArraySize (&t->peers); ++j)
3593        {
3594            tr_peer * peer = tr_ptrArrayNth (&t->peers, j);
3595            tr_peerMsgsPulse (peer->msgs);
3596        }
3597    }
3598}
3599
3600static void
3601queuePulse (tr_session * session, tr_direction dir)
3602{
3603    assert (tr_isSession (session));
3604    assert (tr_isDirection (dir));
3605
3606    if (tr_sessionGetQueueEnabled (session, dir))
3607    {
3608        int i;
3609        const int n = tr_sessionCountQueueFreeSlots (session, dir);
3610        for (i=0; i<n; i++) {
3611            tr_torrent * tor = tr_sessionGetNextQueuedTorrent (session, dir);
3612            if (tor != NULL) {
3613                tr_torrentStartNow (tor);
3614                if (tor->queue_started_callback != NULL)
3615                  (*tor->queue_started_callback)(tor, tor->queue_started_user_data);
3616            }
3617        }
3618    }
3619}
3620
3621static void
3622bandwidthPulse (int foo UNUSED, short bar UNUSED, void * vmgr)
3623{
3624    tr_torrent * tor;
3625    tr_peerMgr * mgr = vmgr;
3626    tr_session * session = mgr->session;
3627    managerLock (mgr);
3628
3629    /* FIXME: this next line probably isn't necessary... */
3630    pumpAllPeers (mgr);
3631
3632    /* allocate bandwidth to the peers */
3633    tr_bandwidthAllocate (&session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC);
3634    tr_bandwidthAllocate (&session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC);
3635
3636    /* torrent upkeep */
3637    tor = NULL;
3638    while ((tor = tr_torrentNext (session, tor)))
3639    {
3640        /* possibly stop torrents that have seeded enough */
3641        tr_torrentCheckSeedLimit (tor);
3642
3643        /* run the completeness check for any torrents that need it */
3644        if (tor->torrentPeers->needsCompletenessCheck) {
3645            tor->torrentPeers->needsCompletenessCheck  = false;
3646            tr_torrentRecheckCompleteness (tor);
3647        }
3648
3649        /* stop torrents that are ready to stop, but couldn't be stopped
3650           earlier during the peer-io callback call chain */
3651        if (tor->isStopping)
3652            tr_torrentStop (tor);
3653    }
3654
3655    /* pump the queues */
3656    queuePulse (session, TR_UP);
3657    queuePulse (session, TR_DOWN);
3658
3659    reconnectPulse (0, 0, mgr);
3660
3661    tr_timerAddMsec (mgr->bandwidthTimer, BANDWIDTH_PERIOD_MSEC);
3662    managerUnlock (mgr);
3663}
3664
3665/***
3666****
3667***/
3668
3669static int
3670compareAtomPtrsByAddress (const void * va, const void *vb)
3671{
3672    const struct peer_atom * a = * (const struct peer_atom**) va;
3673    const struct peer_atom * b = * (const struct peer_atom**) vb;
3674
3675    assert (tr_isAtom (a));
3676    assert (tr_isAtom (b));
3677
3678    return tr_address_compare (&a->addr, &b->addr);
3679}
3680
3681/* best come first, worst go last */
3682static int
3683compareAtomPtrsByShelfDate (const void * va, const void *vb)
3684{
3685    time_t atime;
3686    time_t btime;
3687    const struct peer_atom * a = * (const struct peer_atom**) va;
3688    const struct peer_atom * b = * (const struct peer_atom**) vb;
3689    const int data_time_cutoff_secs = 60 * 60;
3690    const time_t tr_now = tr_time ();
3691
3692    assert (tr_isAtom (a));
3693    assert (tr_isAtom (b));
3694
3695    /* primary key: the last piece data time *if* it was within the last hour */
3696    atime = a->piece_data_time; if (atime + data_time_cutoff_secs < tr_now) atime = 0;
3697    btime = b->piece_data_time; if (btime + data_time_cutoff_secs < tr_now) btime = 0;
3698    if (atime != btime)
3699        return atime > btime ? -1 : 1;
3700
3701    /* secondary key: shelf date. */
3702    if (a->shelf_date != b->shelf_date)
3703        return a->shelf_date > b->shelf_date ? -1 : 1;
3704
3705    return 0;
3706}
3707
3708static int
3709getMaxAtomCount (const tr_torrent * tor)
3710{
3711    const int n = tor->maxConnectedPeers;
3712    /* approximate fit of the old jump discontinuous function */
3713    if (n >= 55) return     n + 150;
3714    if (n >= 20) return 2 * n + 95;
3715    return               4 * n + 55;
3716}
3717
3718static void
3719atomPulse (int foo UNUSED, short bar UNUSED, void * vmgr)
3720{
3721    tr_torrent * tor = NULL;
3722    tr_peerMgr * mgr = vmgr;
3723    managerLock (mgr);
3724
3725    while ((tor = tr_torrentNext (mgr->session, tor)))
3726    {
3727        int atomCount;
3728        Torrent * t = tor->torrentPeers;
3729        const int maxAtomCount = getMaxAtomCount (tor);
3730        struct peer_atom ** atoms = (struct peer_atom**) tr_ptrArrayPeek (&t->pool, &atomCount);
3731
3732        if (atomCount > maxAtomCount) /* we've got too many atoms... time to prune */
3733        {
3734            int i;
3735            int keepCount = 0;
3736            int testCount = 0;
3737            struct peer_atom ** keep = tr_new (struct peer_atom*, atomCount);
3738            struct peer_atom ** test = tr_new (struct peer_atom*, atomCount);
3739
3740            /* keep the ones that are in use */
3741            for (i=0; i<atomCount; ++i) {
3742                struct peer_atom * atom = atoms[i];
3743                if (peerIsInUse (t, atom))
3744                    keep[keepCount++] = atom;
3745                else
3746                    test[testCount++] = atom;
3747            }
3748
3749            /* if there's room, keep the best of what's left */
3750            i = 0;
3751            if (keepCount < maxAtomCount) {
3752                qsort (test, testCount, sizeof (struct peer_atom *), compareAtomPtrsByShelfDate);
3753                while (i<testCount && keepCount<maxAtomCount)
3754                    keep[keepCount++] = test[i++];
3755            }
3756
3757            /* free the culled atoms */
3758            while (i<testCount)
3759                tr_free (test[i++]);
3760
3761            /* rebuild Torrent.pool with what's left */
3762            tr_ptrArrayDestruct (&t->pool, NULL);
3763            t->pool = TR_PTR_ARRAY_INIT;
3764            qsort (keep, keepCount, sizeof (struct peer_atom *), compareAtomPtrsByAddress);
3765            for (i=0; i<keepCount; ++i)
3766                tr_ptrArrayAppend (&t->pool, keep[i]);
3767
3768            tordbg (t, "max atom count is %d... pruned from %d to %d\n", maxAtomCount, atomCount, keepCount);
3769
3770            /* cleanup */
3771            tr_free (test);
3772            tr_free (keep);
3773        }
3774    }
3775
3776    tr_timerAddMsec (mgr->atomTimer, ATOM_PERIOD_MSEC);
3777    managerUnlock (mgr);
3778}
3779
3780/***
3781****
3782****
3783****
3784***/
3785
3786/* is this atom someone that we'd want to initiate a connection to? */
3787static bool
3788isPeerCandidate (const tr_torrent * tor, struct peer_atom * atom, const time_t now)
3789{
3790    /* not if we're both seeds */
3791    if (tr_torrentIsSeed (tor) && atomIsSeed (atom))
3792        return false;
3793
3794    /* not if we've already got a connection to them... */
3795    if (peerIsInUse (tor->torrentPeers, atom))
3796        return false;
3797
3798    /* not if we just tried them already */
3799    if ((now - atom->time) < getReconnectIntervalSecs (atom, now))
3800        return false;
3801
3802    /* not if they're blocklisted */
3803    if (isAtomBlocklisted (tor->session, atom))
3804        return false;
3805
3806    /* not if they're banned... */
3807    if (atom->flags2 & MYFLAG_BANNED)
3808        return false;
3809
3810    return true;
3811}
3812
3813struct peer_candidate
3814{
3815    uint64_t score;
3816    tr_torrent * tor;
3817    struct peer_atom * atom;
3818};
3819
3820static bool
3821torrentWasRecentlyStarted (const tr_torrent * tor)
3822{
3823    return difftime (tr_time (), tor->startDate) < 120;
3824}
3825
3826static inline uint64_t
3827addValToKey (uint64_t value, int width, uint64_t addme)
3828{
3829    value = (value << (uint64_t)width);
3830    value |= addme;
3831    return value;
3832}
3833
3834/* smaller value is better */
3835static uint64_t
3836getPeerCandidateScore (const tr_torrent * tor, const struct peer_atom * atom, uint8_t salt)
3837{
3838    uint64_t i;
3839    uint64_t score = 0;
3840    const bool failed = atom->lastConnectionAt < atom->lastConnectionAttemptAt;
3841
3842    /* prefer peers we've connected to, or never tried, over peers we failed to connect to. */
3843    i = failed ? 1 : 0;
3844    score = addValToKey (score, 1, i);
3845
3846    /* prefer the one we attempted least recently (to cycle through all peers) */
3847    i = atom->lastConnectionAttemptAt;
3848    score = addValToKey (score, 32, i);
3849
3850    /* prefer peers belonging to a torrent of a higher priority */
3851    switch (tr_torrentGetPriority (tor)) {
3852        case TR_PRI_HIGH:    i = 0; break;
3853        case TR_PRI_NORMAL:  i = 1; break;
3854        case TR_PRI_LOW:     i = 2; break;
3855    }
3856    score = addValToKey (score, 4, i);
3857
3858    /* prefer recently-started torrents */
3859    i = torrentWasRecentlyStarted (tor) ? 0 : 1;
3860    score = addValToKey (score, 1, i);
3861
3862    /* prefer torrents we're downloading with */
3863    i = tr_torrentIsSeed (tor) ? 1 : 0;
3864    score = addValToKey (score, 1, i);
3865
3866    /* prefer peers that are known to be connectible */
3867    i = (atom->flags & ADDED_F_CONNECTABLE) ? 0 : 1;
3868    score = addValToKey (score, 1, i);
3869
3870    /* prefer peers that we might have a chance of uploading to...
3871       so lower seed probability is better */
3872    if (atom->seedProbability == 100) i = 101;
3873    else if (atom->seedProbability == -1) i = 100;
3874    else i = atom->seedProbability;
3875    score = addValToKey (score, 8, i);
3876
3877    /* Prefer peers that we got from more trusted sources.
3878     * lower `fromBest' values indicate more trusted sources */
3879    score = addValToKey (score, 4, atom->fromBest);
3880
3881    /* salt */
3882    score = addValToKey (score, 8, salt);
3883
3884    return score;
3885}
3886
3887#ifndef NDEBUG
3888static int
3889checkPartition (const struct peer_candidate * candidates, int left, int right, uint64_t pivotScore, int storeIndex)
3890{
3891    int i;
3892
3893    assert (storeIndex >= left);
3894    assert (storeIndex <= right);
3895    assert (candidates[storeIndex].score == pivotScore);
3896
3897    for (i=left; i<storeIndex; ++i)
3898        assert (candidates[i].score < pivotScore);
3899    for (i=storeIndex+1; i<=right; ++i)
3900        assert (candidates[i].score >= pivotScore);
3901
3902    return true;
3903}
3904#endif
3905
3906/* Helper to selectBestCandidates ().
3907 * Adapted from http://en.wikipedia.org/wiki/Selection_algorithm */
3908static int
3909partitionPeerCandidates (struct peer_candidate * candidates, int left, int right, int pivotIndex)
3910{
3911    int i;
3912    int storeIndex;
3913    struct peer_candidate tmp;
3914    const struct peer_candidate pivotValue = candidates[pivotIndex];
3915
3916    /* move pivot to end */
3917    tmp = candidates[right];
3918    candidates[right] = pivotValue;
3919    candidates[pivotIndex] = tmp;
3920
3921    storeIndex = left;
3922    for (i=left; i<=right; ++i)
3923    {
3924        if (candidates[i].score < pivotValue.score)
3925        {
3926            tmp = candidates[storeIndex];
3927            candidates[storeIndex] = candidates[i];
3928            candidates[i] = tmp;
3929            storeIndex++;
3930        }
3931    }
3932
3933    /* move pivot to its final place */
3934    tmp = candidates[right];
3935    candidates[right] = candidates[storeIndex];
3936    candidates[storeIndex] = tmp;
3937
3938    /* sanity check */
3939    assert (checkPartition (candidates, left, right, pivotValue.score, storeIndex));
3940
3941    return storeIndex;
3942}
3943
3944/* Adapted from http://en.wikipedia.org/wiki/Selection_algorithm */
3945static void
3946selectPeerCandidates (struct peer_candidate * candidates, int left, int right, int k)
3947{
3948    if (right > left)
3949    {
3950        const int pivotIndex = left + (right-left)/2;
3951
3952        int pivotNewIndex = partitionPeerCandidates (candidates, left, right, pivotIndex);
3953
3954        if (pivotNewIndex > left + k) /* new condition */
3955            selectPeerCandidates (candidates, left, pivotNewIndex-1, k);
3956        else if (pivotNewIndex < left + k)
3957            selectPeerCandidates (candidates, pivotNewIndex+1, right, k+left-pivotNewIndex-1);
3958    }
3959}
3960
3961#ifndef NDEBUG
3962static bool
3963checkBestScoresComeFirst (const struct peer_candidate * candidates, int n, int k)
3964{
3965    int i;
3966    uint64_t worstFirstScore = 0;
3967    const int x = MIN (n, k) - 1;
3968
3969    for (i=0; i<x; i++)
3970        if (worstFirstScore < candidates[i].score)
3971            worstFirstScore = candidates[i].score;
3972
3973    for (i=0; i<x; i++)
3974        assert (candidates[i].score <= worstFirstScore);
3975
3976    for (i=x+1; i<n; i++)
3977        assert (candidates[i].score >= worstFirstScore);
3978
3979    return true;
3980}
3981#endif /* NDEBUG */
3982
3983/** @return an array of all the atoms we might want to connect to */
3984static struct peer_candidate*
3985getPeerCandidates (tr_session * session, int * candidateCount, int max)
3986{
3987    int atomCount;
3988    int peerCount;
3989    tr_torrent * tor;
3990    struct peer_candidate * candidates;
3991    struct peer_candidate * walk;
3992    const time_t now = tr_time ();
3993    const uint64_t now_msec = tr_time_msec ();
3994    /* leave 5% of connection slots for incoming connections -- ticket #2609 */
3995    const int maxCandidates = tr_sessionGetPeerLimit (session) * 0.95;
3996
3997    /* count how many peers and atoms we've got */
3998    tor= NULL;
3999    atomCount = 0;
4000    peerCount = 0;
4001    while ((tor = tr_torrentNext (session, tor))) {
4002        atomCount += tr_ptrArraySize (&tor->torrentPeers->pool);
4003        peerCount += tr_ptrArraySize (&tor->torrentPeers->peers);
4004    }
4005
4006    /* don't start any new handshakes if we're full up */
4007    if (maxCandidates <= peerCount) {
4008        *candidateCount = 0;
4009        return NULL;
4010    }
4011
4012    /* allocate an array of candidates */
4013    walk = candidates = tr_new (struct peer_candidate, atomCount);
4014
4015    /* populate the candidate array */
4016    tor = NULL;
4017    while ((tor = tr_torrentNext (session, tor)))
4018    {
4019        int i, nAtoms;
4020        struct peer_atom ** atoms;
4021
4022        if (!tor->torrentPeers->isRunning)
4023            continue;
4024
4025        /* if we've already got enough peers in this torrent... */
4026        if (tr_torrentGetPeerLimit (tor) <= tr_ptrArraySize (&tor->torrentPeers->peers))
4027            continue;
4028
4029        /* if we've already got enough speed in this torrent... */
4030        if (tr_torrentIsSeed (tor) && isBandwidthMaxedOut (&tor->bandwidth, now_msec, TR_UP))
4031            continue;
4032
4033        atoms = (struct peer_atom**) tr_ptrArrayPeek (&tor->torrentPeers->pool, &nAtoms);
4034        for (i=0; i<nAtoms; ++i)
4035        {
4036            struct peer_atom * atom = atoms[i];
4037
4038            if (isPeerCandidate (tor, atom, now))
4039            {
4040                const uint8_t salt = tr_cryptoWeakRandInt (1024);
4041                walk->tor = tor;
4042                walk->atom = atom;
4043                walk->score = getPeerCandidateScore (tor, atom, salt);
4044                ++walk;
4045            }
4046        }
4047    }
4048
4049    *candidateCount = walk - candidates;
4050    if (walk != candidates)
4051        selectPeerCandidates (candidates, 0, (walk-candidates)-1, max);
4052
4053    assert (checkBestScoresComeFirst (candidates, *candidateCount, max));
4054
4055    return candidates;
4056}
4057
4058static void
4059initiateConnection (tr_peerMgr * mgr, Torrent * t, struct peer_atom * atom)
4060{
4061    tr_peerIo * io;
4062    const time_t now = tr_time ();
4063    bool utp = tr_sessionIsUTPEnabled (mgr->session) && !atom->utp_failed;
4064
4065    if (atom->fromFirst == TR_PEER_FROM_PEX)
4066        /* PEX has explicit signalling for uTP support.  If an atom
4067           originally came from PEX and doesn't have the uTP flag, skip the
4068           uTP connection attempt.  Are we being optimistic here? */
4069        utp = utp && (atom->flags & ADDED_F_UTP_FLAGS);
4070
4071    tordbg (t, "Starting an OUTGOING%s connection with %s",
4072            utp ? " µTP" : "",
4073            tr_atomAddrStr (atom));
4074
4075    io = tr_peerIoNewOutgoing (mgr->session,
4076                               &mgr->session->bandwidth,
4077                               &atom->addr,
4078                               atom->port,
4079                               t->tor->info.hash,
4080                               t->tor->completeness == TR_SEED,
4081                               utp);
4082
4083    if (io == NULL)
4084    {
4085        tordbg (t, "peerIo not created; marking peer %s as unreachable",
4086                tr_atomAddrStr (atom));
4087        atom->flags2 |= MYFLAG_UNREACHABLE;
4088        atom->numFails++;
4089    }
4090    else
4091    {
4092        tr_handshake * handshake = tr_handshakeNew (io,
4093                                                    mgr->session->encryptionMode,
4094                                                    myHandshakeDoneCB,
4095                                                    mgr);
4096
4097        assert (tr_peerIoGetTorrentHash (io));
4098
4099        tr_peerIoUnref (io); /* balanced by the initial ref
4100                                 in tr_peerIoNewOutgoing () */
4101
4102        tr_ptrArrayInsertSorted (&t->outgoingHandshakes, handshake,
4103                                 handshakeCompare);
4104    }
4105
4106    atom->lastConnectionAttemptAt = now;
4107    atom->time = now;
4108}
4109
4110static void
4111initiateCandidateConnection (tr_peerMgr * mgr, struct peer_candidate * c)
4112{
4113#if 0
4114    fprintf (stderr, "Starting an OUTGOING connection with %s - [%s] seedProbability==%d; %s, %s\n",
4115             tr_atomAddrStr (c->atom),
4116             tr_torrentName (c->tor),
4117           (int)c->atom->seedProbability,
4118             tr_torrentIsPrivate (c->tor) ? "private" : "public",
4119             tr_torrentIsSeed (c->tor) ? "seed" : "downloader");
4120#endif
4121
4122    initiateConnection (mgr, c->tor->torrentPeers, c->atom);
4123}
4124
4125static void
4126makeNewPeerConnections (struct tr_peerMgr * mgr, const int max)
4127{
4128    int i, n;
4129    struct peer_candidate * candidates;
4130
4131    candidates = getPeerCandidates (mgr->session, &n, max);
4132
4133    for (i=0; i<n && i<max; ++i)
4134        initiateCandidateConnection (mgr, &candidates[i]);
4135
4136    tr_free (candidates);
4137}
Note: See TracBrowser for help on using the repository browser.