source: trunk/libtransmission/peer-mgr.c @ 14083

Last change on this file since 14083 was 14083, checked in by jordan, 9 years ago

(trunk, libT) fix tr_torrentStat() regression in the nightlies reported in #5294 by mw3demo

  • Property svn:keywords set to Date Rev Author Id
File size: 108.1 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2 (b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 14083 2013-05-27 21:04:48Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h> /* error codes ERANGE, ... */
15#include <limits.h> /* INT_MAX */
16#include <string.h> /* memcpy, memcmp, strstr */
17#include <stdlib.h> /* qsort */
18
19#include <event2/event.h>
20
21#include <libutp/utp.h>
22
23#include "transmission.h"
24#include "announcer.h"
25#include "bandwidth.h"
26#include "blocklist.h"
27#include "cache.h"
28#include "clients.h"
29#include "completion.h"
30#include "crypto.h"
31#include "handshake.h"
32#include "log.h"
33#include "net.h"
34#include "peer-io.h"
35#include "peer-mgr.h"
36#include "peer-msgs.h"
37#include "ptrarray.h"
38#include "session.h"
39#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
40#include "torrent.h"
41#include "tr-utp.h"
42#include "utils.h"
43#include "webseed.h"
44
45enum
46{
47  /* how frequently to cull old atoms */
48  ATOM_PERIOD_MSEC = (60 * 1000),
49
50  /* how frequently to change which peers are choked */
51  RECHOKE_PERIOD_MSEC = (10 * 1000),
52
53  /* an optimistically unchoked peer is immune from rechoking
54     for this many calls to rechokeUploads (). */
55  OPTIMISTIC_UNCHOKE_MULTIPLIER = 4,
56
57  /* how frequently to reallocate bandwidth */
58  BANDWIDTH_PERIOD_MSEC = 500,
59
60  /* how frequently to age out old piece request lists */
61  REFILL_UPKEEP_PERIOD_MSEC = (10 * 1000),
62
63  /* how frequently to decide which peers live and die */
64  RECONNECT_PERIOD_MSEC = 500,
65
66  /* when many peers are available, keep idle ones this long */
67  MIN_UPLOAD_IDLE_SECS = (60),
68
69  /* when few peers are available, keep idle ones this long */
70  MAX_UPLOAD_IDLE_SECS = (60 * 5),
71
72  /* max number of peers to ask for per second overall.
73   * this throttle is to avoid overloading the router */
74  MAX_CONNECTIONS_PER_SECOND = 12,
75
76  MAX_CONNECTIONS_PER_PULSE = (int)(MAX_CONNECTIONS_PER_SECOND * (RECONNECT_PERIOD_MSEC/1000.0)),
77
78  /* number of bad pieces a peer is allowed to send before we ban them */
79  MAX_BAD_PIECES_PER_PEER = 5,
80
81  /* amount of time to keep a list of request pieces lying around
82     before it's considered too old and needs to be rebuilt */
83  PIECE_LIST_SHELF_LIFE_SECS = 60,
84
85  /* use for bitwise operations w/peer_atom.flags2 */
86  MYFLAG_BANNED = 1,
87
88  /* use for bitwise operations w/peer_atom.flags2 */
89  /* unreachable for now... but not banned.
90   * if they try to connect to us it's okay */
91  MYFLAG_UNREACHABLE = 2,
92
93  /* the minimum we'll wait before attempting to reconnect to a peer */
94  MINIMUM_RECONNECT_INTERVAL_SECS = 5,
95
96  /** how long we'll let requests we've made linger before we cancel them */
97  REQUEST_TTL_SECS = 90,
98
99  NO_BLOCKS_CANCEL_HISTORY = 120,
100
101  CANCEL_HISTORY_SEC = 60
102};
103
104const tr_peer_event TR_PEER_EVENT_INIT = { 0, 0, NULL, 0, 0, 0, 0 };
105
106/**
107***
108**/
109
110/**
111 * Peer information that should be kept even before we've connected and
112 * after we've disconnected. These are kept in a pool of peer_atoms to decide
113 * which ones would make good candidates for connecting to, and to watch out
114 * for banned peers.
115 *
116 * @see tr_peer
117 * @see tr_peerMsgs
118 */
119struct peer_atom
120{
121  uint8_t     fromFirst;          /* where the peer was first found */
122  uint8_t     fromBest;           /* the "best" value of where the peer has been found */
123  uint8_t     flags;              /* these match the added_f flags */
124  uint8_t     flags2;             /* flags that aren't defined in added_f */
125  int8_t      seedProbability;    /* how likely is this to be a seed... [0..100] or -1 for unknown */
126  int8_t      blocklisted;        /* -1 for unknown, true for blocklisted, false for not blocklisted */
127
128  tr_port     port;
129  bool        utp_failed;         /* We recently failed to connect over uTP */
130  uint16_t    numFails;
131  time_t      time;               /* when the peer's connection status last changed */
132  time_t      piece_data_time;
133
134  time_t      lastConnectionAttemptAt;
135  time_t      lastConnectionAt;
136
137  /* similar to a TTL field, but less rigid --
138   * if the swarm is small, the atom will be kept past this date. */
139  time_t      shelf_date;
140  tr_peer   * peer;               /* will be NULL if not connected */
141  tr_address  addr;
142};
143
144#ifdef NDEBUG
145#define tr_isAtom(a) (TRUE)
146#else
147static bool
148tr_isAtom (const struct peer_atom * atom)
149{
150  return (atom != NULL)
151      && (atom->fromFirst < TR_PEER_FROM__MAX)
152      && (atom->fromBest < TR_PEER_FROM__MAX)
153      && (tr_address_is_valid (&atom->addr));
154}
155#endif
156
157static const char*
158tr_atomAddrStr (const struct peer_atom * atom)
159{
160  return atom ? tr_peerIoAddrStr (&atom->addr, atom->port) : "[no atom]";
161}
162
163struct block_request
164{
165  tr_block_index_t block;
166  tr_peer * peer;
167  time_t sentAt;
168};
169
170struct weighted_piece
171{
172  tr_piece_index_t index;
173  int16_t salt;
174  int16_t requestCount;
175};
176
177enum piece_sort_state
178{
179  PIECES_UNSORTED,
180  PIECES_SORTED_BY_INDEX,
181  PIECES_SORTED_BY_WEIGHT
182};
183
184/** @brief Opaque, per-torrent data structure for peer connection information */
185typedef struct tr_swarm
186{
187  tr_ptrArray                outgoingHandshakes; /* tr_handshake */
188  tr_ptrArray                pool; /* struct peer_atom */
189  tr_ptrArray                peers; /* tr_peerMsgs */
190  tr_ptrArray                webseeds; /* tr_webseed */
191
192  tr_torrent               * tor;
193  struct tr_peerMgr        * manager;
194
195  tr_peerMsgs              * optimistic; /* the optimistic peer, or NULL if none */
196  int                        optimisticUnchokeTimeScaler;
197
198  bool                       isRunning;
199  bool                       needsCompletenessCheck;
200
201  struct block_request     * requests;
202  int                        requestCount;
203  int                        requestAlloc;
204
205  struct weighted_piece    * pieces;
206  int                        pieceCount;
207  enum piece_sort_state      pieceSortState;
208
209  /* An array of pieceCount items stating how many peers have each piece.
210     This is used to help us for downloading pieces "rarest first."
211     This may be NULL if we don't have metainfo yet, or if we're not
212     downloading and don't care about rarity */
213  uint16_t                 * pieceReplication;
214  size_t                     pieceReplicationSize;
215
216  int                        interestedCount;
217  int                        maxPeers;
218  time_t                     lastCancel;
219
220  /* Before the endgame this should be 0. In endgame, is contains the average
221   * number of pending requests per peer. Only peers which have more pending
222   * requests are considered 'fast' are allowed to request a block that's
223   * already been requested from another (slower?) peer. */
224  int                        endgame;
225}
226tr_swarm;
227
228struct tr_peerMgr
229{
230  tr_session    * session;
231  tr_ptrArray     incomingHandshakes; /* tr_handshake */
232  struct event  * bandwidthTimer;
233  struct event  * rechokeTimer;
234  struct event  * refillUpkeepTimer;
235  struct event  * atomTimer;
236};
237
238#define tordbg(t, ...) \
239  do \
240    { \
241      if (tr_logGetDeepEnabled ()) \
242        tr_logAddDeep (__FILE__, __LINE__, tr_torrentName (t->tor), __VA_ARGS__); \
243    } \
244  while (0)
245
246#define dbgmsg(...) \
247  do \
248    { \
249      if (tr_logGetDeepEnabled ()) \
250        tr_logAddDeep (__FILE__, __LINE__, NULL, __VA_ARGS__); \
251    } \
252  while (0)
253
254/**
255*** tr_peer virtual functions
256**/
257
258static bool
259tr_peerIsTransferringPieces (const tr_peer * peer,
260                             uint64_t        now,
261                             tr_direction    direction,
262                             unsigned int  * Bps)
263{
264  assert (peer != NULL);
265  assert (peer->funcs != NULL);
266
267  return (*peer->funcs->is_transferring_pieces)(peer, now, direction, Bps);
268}
269
270unsigned int
271tr_peerGetPieceSpeed_Bps (const tr_peer    * peer,
272                          uint64_t           now,
273                          tr_direction       direction)
274{
275  unsigned int Bps = 0;
276  tr_peerIsTransferringPieces (peer, now, direction, &Bps);
277  return Bps;
278}
279
280static void
281tr_peerFree (tr_peer * peer)
282{
283  assert (peer != NULL);
284  assert (peer->funcs != NULL);
285
286  (*peer->funcs->destruct)(peer);
287
288  tr_free (peer);
289}
290
291void
292tr_peerConstruct (tr_peer * peer, const tr_torrent * tor)
293{
294  assert (peer != NULL);
295  assert (tr_isTorrent (tor));
296
297  memset (peer, 0, sizeof (tr_peer));
298
299  peer->client = TR_KEY_NONE;
300  peer->swarm = tor->swarm;
301  tr_bitfieldConstruct (&peer->have, tor->info.pieceCount);
302  tr_bitfieldConstruct (&peer->blame, tor->blockCount);
303}
304
305static void peerDeclinedAllRequests (tr_swarm *, const tr_peer *);
306
307void
308tr_peerDestruct (tr_peer * peer)
309{
310  assert (peer != NULL);
311
312  if (peer->swarm != NULL)
313    peerDeclinedAllRequests (peer->swarm, peer);
314
315  tr_bitfieldDestruct (&peer->have);
316  tr_bitfieldDestruct (&peer->blame);
317
318  if (peer->atom)
319    peer->atom->peer = NULL;
320}
321
322/**
323***
324**/
325
326static inline void
327managerLock (const struct tr_peerMgr * manager)
328{
329  tr_sessionLock (manager->session);
330}
331
332static inline void
333managerUnlock (const struct tr_peerMgr * manager)
334{
335  tr_sessionUnlock (manager->session);
336}
337
338static inline void
339swarmLock (tr_swarm * swarm)
340{
341  managerLock (swarm->manager);
342}
343
344static inline void
345swarmUnlock (tr_swarm * swarm)
346{
347  managerUnlock (swarm->manager);
348}
349
350static inline int
351swarmIsLocked (const tr_swarm * swarm)
352{
353  return tr_sessionIsLocked (swarm->manager->session);
354}
355
356/**
357***
358**/
359
360static int
361handshakeCompareToAddr (const void * va, const void * vb)
362{
363  const tr_handshake * a = va;
364
365  return tr_address_compare (tr_handshakeGetAddr (a, NULL), vb);
366}
367
368static int
369handshakeCompare (const void * a, const void * b)
370{
371  return handshakeCompareToAddr (a, tr_handshakeGetAddr (b, NULL));
372}
373
374static inline tr_handshake*
375getExistingHandshake (tr_ptrArray * handshakes, const tr_address * addr)
376{
377  if (tr_ptrArrayEmpty (handshakes))
378    return NULL;
379
380  return tr_ptrArrayFindSorted (handshakes, addr, handshakeCompareToAddr);
381}
382
383static int
384comparePeerAtomToAddress (const void * va, const void * vb)
385{
386  const struct peer_atom * a = va;
387
388  return tr_address_compare (&a->addr, vb);
389}
390
391static int
392compareAtomsByAddress (const void * va, const void * vb)
393{
394  const struct peer_atom * b = vb;
395
396  assert (tr_isAtom (b));
397
398  return comparePeerAtomToAddress (va, &b->addr);
399}
400
401/**
402***
403**/
404
405const tr_address *
406tr_peerAddress (const tr_peer * peer)
407{
408  return &peer->atom->addr;
409}
410
411static tr_swarm *
412getExistingSwarm (tr_peerMgr *    manager,
413                  const uint8_t * hash)
414{
415  tr_torrent * tor = tr_torrentFindFromHash (manager->session, hash);
416
417  return tor == NULL ? NULL : tor->swarm;
418}
419
420static int
421peerCompare (const void * a, const void * b)
422{
423  return tr_address_compare (tr_peerAddress (a), tr_peerAddress (b));
424}
425
426static struct peer_atom*
427getExistingAtom (const tr_swarm   * cswarm,
428                 const tr_address * addr)
429{
430  tr_swarm * swarm = (tr_swarm*) cswarm;
431  return tr_ptrArrayFindSorted (&swarm->pool, addr, comparePeerAtomToAddress);
432}
433
434static bool
435peerIsInUse (const tr_swarm * cs, const struct peer_atom * atom)
436{
437  tr_swarm * s = (tr_swarm*) cs;
438
439  assert (swarmIsLocked (s));
440
441  return (atom->peer != NULL)
442      || getExistingHandshake (&s->outgoingHandshakes, &atom->addr)
443      || getExistingHandshake (&s->manager->incomingHandshakes, &atom->addr);
444}
445
446static inline bool
447replicationExists (const tr_swarm * s)
448{
449  return s->pieceReplication != NULL;
450}
451
452static void
453replicationFree (tr_swarm * s)
454{
455  tr_free (s->pieceReplication);
456  s->pieceReplication = NULL;
457  s->pieceReplicationSize = 0;
458}
459
460static void
461replicationNew (tr_swarm * s)
462{
463  tr_piece_index_t piece_i;
464  const tr_piece_index_t piece_count = s->tor->info.pieceCount;
465  const int n = tr_ptrArraySize (&s->peers);
466
467  assert (!replicationExists (s));
468
469  s->pieceReplicationSize = piece_count;
470  s->pieceReplication = tr_new0 (uint16_t, piece_count);
471
472  for (piece_i=0; piece_i<piece_count; ++piece_i)
473    {
474      int peer_i;
475      uint16_t r = 0;
476
477      for (peer_i=0; peer_i<n; ++peer_i)
478        {
479          tr_peer * peer = tr_ptrArrayNth (&s->peers, peer_i);
480          if (tr_bitfieldHas (&peer->have, piece_i))
481            ++r;
482        }
483
484      s->pieceReplication[piece_i] = r;
485    }
486}
487
488static void
489resetTorrentStats (tr_torrent * tor)
490{
491  int i;
492
493  tor->peerCount = 0;
494  tor->activePeerCount[TR_UP] = 0;
495  tor->activePeerCount[TR_DOWN] = 0;
496  tor->activeWebseedCount = 0;
497  for (i=0; i<TR_PEER_FROM__MAX; i++)
498    tor->peerFromCount[i] = 0;
499}
500
501static void
502swarmFree (void * vs)
503{
504  tr_swarm * s = vs;
505
506  assert (s);
507  assert (!s->isRunning);
508  assert (swarmIsLocked (s));
509  assert (tr_ptrArrayEmpty (&s->outgoingHandshakes));
510  assert (tr_ptrArrayEmpty (&s->peers));
511
512  tr_ptrArrayDestruct (&s->webseeds, (PtrArrayForeachFunc)tr_peerFree);
513  tr_ptrArrayDestruct (&s->pool, (PtrArrayForeachFunc)tr_free);
514  tr_ptrArrayDestruct (&s->outgoingHandshakes, NULL);
515  tr_ptrArrayDestruct (&s->peers, NULL);
516  resetTorrentStats (s->tor);
517
518  replicationFree (s);
519
520  tr_free (s->requests);
521  tr_free (s->pieces);
522  tr_free (s);
523}
524
525static void peerCallbackFunc (tr_peer *, const tr_peer_event *, void *);
526
527static void
528rebuildWebseedArray (tr_swarm * s, tr_torrent * tor)
529{
530  unsigned int i;
531  const tr_info * inf = &tor->info;
532
533  /* clear the array */
534  tr_ptrArrayDestruct (&s->webseeds, (PtrArrayForeachFunc)tr_peerFree);
535  s->webseeds = TR_PTR_ARRAY_INIT;
536  s->tor->activeWebseedCount = 0;
537
538  /* repopulate it */
539  for (i=0; i<inf->webseedCount; ++i)
540    {
541      tr_webseed * w = tr_webseedNew (tor, inf->webseeds[i], peerCallbackFunc, s);
542      tr_ptrArrayAppend (&s->webseeds, w);
543    }
544}
545
546static tr_swarm *
547swarmNew (tr_peerMgr * manager, tr_torrent * tor)
548{
549  tr_swarm * s;
550
551  s = tr_new0 (tr_swarm, 1);
552  s->manager = manager;
553  s->tor = tor;
554  s->pool = TR_PTR_ARRAY_INIT;
555  s->peers = TR_PTR_ARRAY_INIT;
556  s->webseeds = TR_PTR_ARRAY_INIT;
557  s->outgoingHandshakes = TR_PTR_ARRAY_INIT;
558
559  rebuildWebseedArray (s, tor);
560
561  return s;
562}
563
564static void ensureMgrTimersExist (struct tr_peerMgr * m);
565
566tr_peerMgr*
567tr_peerMgrNew (tr_session * session)
568{
569  tr_peerMgr * m = tr_new0 (tr_peerMgr, 1);
570  m->session = session;
571  m->incomingHandshakes = TR_PTR_ARRAY_INIT;
572  ensureMgrTimersExist (m);
573  return m;
574}
575
576static void
577deleteTimer (struct event ** t)
578{
579  if (*t != NULL)
580    {
581      event_free (*t);
582      *t = NULL;
583    }
584}
585
586static void
587deleteTimers (struct tr_peerMgr * m)
588{
589  deleteTimer (&m->atomTimer);
590  deleteTimer (&m->bandwidthTimer);
591  deleteTimer (&m->rechokeTimer);
592  deleteTimer (&m->refillUpkeepTimer);
593}
594
595void
596tr_peerMgrFree (tr_peerMgr * manager)
597{
598  managerLock (manager);
599
600  deleteTimers (manager);
601
602  /* free the handshakes. Abort invokes handshakeDoneCB (), which removes
603   * the item from manager->handshakes, so this is a little roundabout... */
604  while (!tr_ptrArrayEmpty (&manager->incomingHandshakes))
605    tr_handshakeAbort (tr_ptrArrayNth (&manager->incomingHandshakes, 0));
606
607  tr_ptrArrayDestruct (&manager->incomingHandshakes, NULL);
608
609  managerUnlock (manager);
610  tr_free (manager);
611}
612
613/***
614****
615***/
616
617void
618tr_peerMgrOnBlocklistChanged (tr_peerMgr * mgr)
619{
620  tr_torrent * tor = NULL;
621  tr_session * session = mgr->session;
622
623  /* we cache whether or not a peer is blocklisted...
624     since the blocklist has changed, erase that cached value */
625  while ((tor = tr_torrentNext (session, tor)))
626    {
627      int i;
628      tr_swarm * s = tor->swarm;
629      const int n = tr_ptrArraySize (&s->pool);
630      for (i=0; i<n; ++i)
631        {
632          struct peer_atom * atom = tr_ptrArrayNth (&s->pool, i);
633          atom->blocklisted = -1;
634        }
635    }
636}
637
638static bool
639isAtomBlocklisted (tr_session * session, struct peer_atom * atom)
640{
641  if (atom->blocklisted < 0)
642    atom->blocklisted = tr_sessionIsAddressBlocked (session, &atom->addr);
643
644  assert (tr_isBool (atom->blocklisted));
645  return atom->blocklisted;
646}
647
648
649/***
650****
651***/
652
653static void
654atomSetSeedProbability (struct peer_atom * atom, int seedProbability)
655{
656  assert (atom != NULL);
657  assert (-1<=seedProbability && seedProbability<=100);
658
659  atom->seedProbability = seedProbability;
660
661  if (seedProbability == 100)
662    atom->flags |= ADDED_F_SEED_FLAG;
663  else if (seedProbability != -1)
664    atom->flags &= ~ADDED_F_SEED_FLAG;
665}
666
667static inline bool
668atomIsSeed (const struct peer_atom * atom)
669{
670  return atom->seedProbability == 100;
671}
672
673static void
674atomSetSeed (const tr_swarm * s, struct peer_atom * atom)
675{
676  if (!atomIsSeed (atom))
677    {
678      tordbg (s, "marking peer %s as a seed", tr_atomAddrStr (atom));
679
680      atomSetSeedProbability (atom, 100);
681    }
682}
683
684
685bool
686tr_peerMgrPeerIsSeed (const tr_torrent  * tor,
687                      const tr_address  * addr)
688{
689  bool isSeed = false;
690  const tr_swarm * s = tor->swarm;
691  const struct peer_atom * atom = getExistingAtom (s, addr);
692
693  if (atom)
694    isSeed = atomIsSeed (atom);
695
696  return isSeed;
697}
698
699void
700tr_peerMgrSetUtpSupported (tr_torrent * tor, const tr_address * addr)
701{
702  struct peer_atom * atom = getExistingAtom (tor->swarm, addr);
703
704  if (atom)
705    atom->flags |= ADDED_F_UTP_FLAGS;
706}
707
708void
709tr_peerMgrSetUtpFailed (tr_torrent *tor, const tr_address *addr, bool failed)
710{
711  struct peer_atom * atom = getExistingAtom (tor->swarm, addr);
712
713  if (atom)
714    atom->utp_failed = failed;
715}
716
717
718/**
719***  REQUESTS
720***
721*** There are two data structures associated with managing block requests:
722***
723*** 1. tr_swarm::requests, an array of "struct block_request" which keeps
724***    track of which blocks have been requested, and when, and by which peers.
725***    This is list is used for (a) cancelling requests that have been pending
726***    for too long and (b) avoiding duplicate requests before endgame.
727***
728*** 2. tr_swarm::pieces, an array of "struct weighted_piece" which lists the
729***    pieces that we want to request. It's used to decide which blocks to
730***    return next when tr_peerMgrGetBlockRequests () is called.
731**/
732
733/**
734*** struct block_request
735**/
736
737static int
738compareReqByBlock (const void * va, const void * vb)
739{
740  const struct block_request * a = va;
741  const struct block_request * b = vb;
742
743  /* primary key: block */
744  if (a->block < b->block) return -1;
745  if (a->block > b->block) return 1;
746
747  /* secondary key: peer */
748  if (a->peer < b->peer) return -1;
749  if (a->peer > b->peer) return 1;
750
751  return 0;
752}
753
754static void
755requestListAdd (tr_swarm * s, tr_block_index_t block, tr_peer * peer)
756{
757  struct block_request key;
758
759  /* ensure enough room is available... */
760  if (s->requestCount + 1 >= s->requestAlloc)
761    {
762      const int CHUNK_SIZE = 128;
763      s->requestAlloc += CHUNK_SIZE;
764      s->requests = tr_renew (struct block_request,
765                              s->requests, s->requestAlloc);
766    }
767
768  /* populate the record we're inserting */
769  key.block = block;
770  key.peer = peer;
771  key.sentAt = tr_time ();
772
773  /* insert the request to our array... */
774  {
775    bool exact;
776    const int pos = tr_lowerBound (&key, s->requests, s->requestCount,
777                                   sizeof (struct block_request),
778                                   compareReqByBlock, &exact);
779    assert (!exact);
780    memmove (s->requests + pos + 1,
781             s->requests + pos,
782             sizeof (struct block_request) * (s->requestCount++ - pos));
783    s->requests[pos] = key;
784  }
785
786  if (peer != NULL)
787    {
788      ++peer->pendingReqsToPeer;
789      assert (peer->pendingReqsToPeer >= 0);
790    }
791
792  /*fprintf (stderr, "added request of block %lu from peer %s... "
793                     "there are now %d block\n",
794                     (unsigned long)block, tr_atomAddrStr (peer->atom), s->requestCount);*/
795}
796
797static struct block_request *
798requestListLookup (tr_swarm * s, tr_block_index_t block, const tr_peer * peer)
799{
800  struct block_request key;
801  key.block = block;
802  key.peer = (tr_peer*) peer;
803
804  return bsearch (&key, s->requests, s->requestCount,
805                  sizeof (struct block_request),
806                  compareReqByBlock);
807}
808
809/**
810 * Find the peers are we currently requesting the block
811 * with index @a block from and append them to @a peerArr.
812 */
813static void
814getBlockRequestPeers (tr_swarm * s, tr_block_index_t block,
815                      tr_ptrArray * peerArr)
816{
817  bool exact;
818  int i, pos;
819  struct block_request key;
820
821  key.block = block;
822  key.peer = NULL;
823  pos = tr_lowerBound (&key, s->requests, s->requestCount,
824                       sizeof (struct block_request),
825                       compareReqByBlock, &exact);
826
827  assert (!exact); /* shouldn't have a request with .peer == NULL */
828
829  for (i=pos; i<s->requestCount; ++i)
830    {
831      if (s->requests[i].block != block)
832        break;
833      tr_ptrArrayAppend (peerArr, s->requests[i].peer);
834    }
835}
836
837static void
838decrementPendingReqCount (const struct block_request * b)
839{
840  if (b->peer != NULL)
841    if (b->peer->pendingReqsToPeer > 0)
842      --b->peer->pendingReqsToPeer;
843}
844
845static void
846requestListRemove (tr_swarm * s, tr_block_index_t block, const tr_peer * peer)
847{
848  const struct block_request * b = requestListLookup (s, block, peer);
849
850  if (b != NULL)
851    {
852      const int pos = b - s->requests;
853      assert (pos < s->requestCount);
854
855      decrementPendingReqCount (b);
856
857      tr_removeElementFromArray (s->requests,
858                                 pos,
859                                 sizeof (struct block_request),
860                                 s->requestCount--);
861
862      /*fprintf (stderr, "removing request of block %lu from peer %s... "
863                         "there are now %d block requests left\n",
864                         (unsigned long)block, tr_atomAddrStr (peer->atom), t->requestCount);*/
865    }
866}
867
868static int
869countActiveWebseeds (tr_swarm * s)
870{
871  int activeCount;
872
873  if (s->tor->isRunning && !tr_torrentIsSeed (s->tor))
874    {
875      int i;
876      const int n = tr_ptrArraySize (&s->webseeds);
877      const uint64_t now = tr_time_msec ();
878
879      for (i=0; i<n; ++i)
880        if (tr_peerIsTransferringPieces (tr_ptrArrayNth(&s->webseeds,i), now, TR_DOWN, NULL))
881          ++activeCount;
882    }
883  else
884    {
885      activeCount = 0;
886    }
887
888  return activeCount;
889}
890
891static bool
892testForEndgame (const tr_swarm * s)
893{
894  /* we consider ourselves to be in endgame if the number of bytes
895     we've got requested is >= the number of bytes left to download */
896  return (s->requestCount * s->tor->blockSize)
897               >= tr_cpLeftUntilDone (&s->tor->completion);
898}
899
900static void
901updateEndgame (tr_swarm * s)
902{
903  assert (s->requestCount >= 0);
904
905  if (!testForEndgame (s))
906    {
907      /* not in endgame */
908      s->endgame = 0;
909    }
910  else if (!s->endgame) /* only recalculate when endgame first begins */
911    {
912      int i;
913      int numDownloading = 0;
914      const int n = tr_ptrArraySize (&s->peers);
915
916      /* add the active bittorrent peers... */
917      for (i=0; i<n; ++i)
918        {
919          const tr_peer * p = tr_ptrArrayNth (&s->peers, i);
920          if (p->pendingReqsToPeer > 0)
921            ++numDownloading;
922        }
923
924      /* add the active webseeds... */
925      numDownloading += countActiveWebseeds (s);
926
927      /* average number of pending requests per downloading peer */
928      s->endgame = s->requestCount / MAX (numDownloading, 1);
929    }
930}
931
932
933/****
934*****
935*****  Piece List Manipulation / Accessors
936*****
937****/
938
939static inline void
940invalidatePieceSorting (tr_swarm * s)
941{
942  s->pieceSortState = PIECES_UNSORTED;
943}
944
945static const tr_torrent * weightTorrent;
946
947static const uint16_t * weightReplication;
948
949static void
950setComparePieceByWeightTorrent (tr_swarm * s)
951{
952  if (!replicationExists (s))
953    replicationNew (s);
954
955  weightTorrent = s->tor;
956  weightReplication = s->pieceReplication;
957}
958
959/* we try to create a "weight" s.t. high-priority pieces come before others,
960 * and that partially-complete pieces come before empty ones. */
961static int
962comparePieceByWeight (const void * va, const void * vb)
963{
964  const struct weighted_piece * a = va;
965  const struct weighted_piece * b = vb;
966  int ia, ib, missing, pending;
967  const tr_torrent * tor = weightTorrent;
968  const uint16_t * rep = weightReplication;
969
970  /* primary key: weight */
971  missing = tr_cpMissingBlocksInPiece (&tor->completion, a->index);
972  pending = a->requestCount;
973  ia = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
974  missing = tr_cpMissingBlocksInPiece (&tor->completion, b->index);
975  pending = b->requestCount;
976  ib = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
977  if (ia < ib) return -1;
978  if (ia > ib) return 1;
979
980  /* secondary key: higher priorities go first */
981  ia = tor->info.pieces[a->index].priority;
982  ib = tor->info.pieces[b->index].priority;
983  if (ia > ib) return -1;
984  if (ia < ib) return 1;
985
986  /* tertiary key: rarest first. */
987  ia = rep[a->index];
988  ib = rep[b->index];
989  if (ia < ib) return -1;
990  if (ia > ib) return 1;
991
992  /* quaternary key: random */
993  if (a->salt < b->salt) return -1;
994  if (a->salt > b->salt) return 1;
995
996  /* okay, they're equal */
997  return 0;
998}
999
1000static int
1001comparePieceByIndex (const void * va, const void * vb)
1002{
1003  const struct weighted_piece * a = va;
1004  const struct weighted_piece * b = vb;
1005  if (a->index < b->index) return -1;
1006  if (a->index > b->index) return 1;
1007  return 0;
1008}
1009
1010static void
1011pieceListSort (tr_swarm * s, enum piece_sort_state state)
1012{
1013  assert (state==PIECES_SORTED_BY_INDEX
1014       || state==PIECES_SORTED_BY_WEIGHT);
1015
1016  if (state == PIECES_SORTED_BY_WEIGHT)
1017    {
1018      setComparePieceByWeightTorrent (s);
1019      qsort (s->pieces, s->pieceCount, sizeof (struct weighted_piece), comparePieceByWeight);
1020    }
1021  else
1022    {
1023      qsort (s->pieces, s->pieceCount, sizeof (struct weighted_piece), comparePieceByIndex);
1024    }
1025
1026  s->pieceSortState = state;
1027}
1028
1029/**
1030 * These functions are useful for testing, but too expensive for nightly builds.
1031 * let's leave it disabled but add an easy hook to compile it back in
1032 */
1033#if 1
1034#define assertWeightedPiecesAreSorted(t)
1035#define assertReplicationCountIsExact(t)
1036#else
1037static void
1038assertWeightedPiecesAreSorted (Torrent * t)
1039{
1040    if (!t->endgame)
1041    {
1042        int i;
1043        setComparePieceByWeightTorrent (t);
1044        for (i=0; i<t->pieceCount-1; ++i)
1045            assert (comparePieceByWeight (&t->pieces[i], &t->pieces[i+1]) <= 0);
1046    }
1047}
1048static void
1049assertReplicationCountIsExact (Torrent * t)
1050{
1051    /* This assert might fail due to errors of implementations in other
1052     * clients. It happens when receiving duplicate bitfields/HaveAll/HaveNone
1053     * from a client. If a such a behavior is noticed,
1054     * a bug report should be filled to the faulty client. */
1055
1056    size_t piece_i;
1057    const uint16_t * rep = t->pieceReplication;
1058    const size_t piece_count = t->pieceReplicationSize;
1059    const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase (&t->peers);
1060    const int peer_count = tr_ptrArraySize (&t->peers);
1061
1062    assert (piece_count == t->tor->info.pieceCount);
1063
1064    for (piece_i=0; piece_i<piece_count; ++piece_i)
1065    {
1066        int peer_i;
1067        uint16_t r = 0;
1068
1069        for (peer_i=0; peer_i<peer_count; ++peer_i)
1070            if (tr_bitsetHas (&peers[peer_i]->have, piece_i))
1071                ++r;
1072
1073        assert (rep[piece_i] == r);
1074    }
1075}
1076#endif
1077
1078static struct weighted_piece *
1079pieceListLookup (tr_swarm * s, tr_piece_index_t index)
1080{
1081  int i;
1082
1083  for (i=0; i<s->pieceCount; ++i)
1084    if (s->pieces[i].index == index)
1085      return &s->pieces[i];
1086
1087  return NULL;
1088}
1089
1090static void
1091pieceListRebuild (tr_swarm * s)
1092{
1093  if (!tr_torrentIsSeed (s->tor))
1094    {
1095      tr_piece_index_t i;
1096      tr_piece_index_t * pool;
1097      tr_piece_index_t poolCount = 0;
1098      const tr_torrent * tor = s->tor;
1099      const tr_info * inf = tr_torrentInfo (tor);
1100      struct weighted_piece * pieces;
1101      int pieceCount;
1102
1103      /* build the new list */
1104      pool = tr_new (tr_piece_index_t, inf->pieceCount);
1105      for (i=0; i<inf->pieceCount; ++i)
1106        if (!inf->pieces[i].dnd)
1107          if (!tr_cpPieceIsComplete (&tor->completion, i))
1108            pool[poolCount++] = i;
1109      pieceCount = poolCount;
1110      pieces = tr_new0 (struct weighted_piece, pieceCount);
1111      for (i=0; i<poolCount; ++i)
1112        {
1113          struct weighted_piece * piece = pieces + i;
1114          piece->index = pool[i];
1115          piece->requestCount = 0;
1116          piece->salt = tr_cryptoWeakRandInt (4096);
1117        }
1118
1119      /* if we already had a list of pieces, merge it into
1120       * the new list so we don't lose its requestCounts */
1121      if (s->pieces != NULL)
1122        {
1123          struct weighted_piece * o = s->pieces;
1124          struct weighted_piece * oend = o + s->pieceCount;
1125          struct weighted_piece * n = pieces;
1126          struct weighted_piece * nend = n + pieceCount;
1127
1128          pieceListSort (s, PIECES_SORTED_BY_INDEX);
1129
1130          while (o!=oend && n!=nend)
1131            {
1132              if (o->index < n->index)
1133                ++o;
1134              else if (o->index > n->index)
1135                ++n;
1136              else
1137                *n++ = *o++;
1138            }
1139
1140          tr_free (s->pieces);
1141        }
1142
1143      s->pieces = pieces;
1144      s->pieceCount = pieceCount;
1145
1146      pieceListSort (s, PIECES_SORTED_BY_WEIGHT);
1147
1148      /* cleanup */
1149      tr_free (pool);
1150    }
1151}
1152
1153static void
1154pieceListRemovePiece (tr_swarm * s, tr_piece_index_t piece)
1155{
1156  struct weighted_piece * p;
1157
1158  if ((p = pieceListLookup (s, piece)))
1159    {
1160      const int pos = p - s->pieces;
1161
1162      tr_removeElementFromArray (s->pieces,
1163                                 pos,
1164                                 sizeof (struct weighted_piece),
1165                                 s->pieceCount--);
1166
1167      if (s->pieceCount == 0)
1168        {
1169          tr_free (s->pieces);
1170          s->pieces = NULL;
1171        }
1172    }
1173}
1174
1175static void
1176pieceListResortPiece (tr_swarm * s, struct weighted_piece * p)
1177{
1178  int pos;
1179  bool isSorted = true;
1180
1181  if (p == NULL)
1182    return;
1183
1184  /* is the torrent already sorted? */
1185  pos = p - s->pieces;
1186  setComparePieceByWeightTorrent (s);
1187  if (isSorted && (pos > 0) && (comparePieceByWeight (p-1, p) > 0))
1188    isSorted = false;
1189  if (isSorted && (pos < s->pieceCount - 1) && (comparePieceByWeight (p, p+1) > 0))
1190    isSorted = false;
1191
1192  if (s->pieceSortState != PIECES_SORTED_BY_WEIGHT)
1193    {
1194      pieceListSort (s, PIECES_SORTED_BY_WEIGHT);
1195      isSorted = true;
1196    }
1197
1198  /* if it's not sorted, move it around */
1199  if (!isSorted)
1200    {
1201      bool exact;
1202      const struct weighted_piece tmp = *p;
1203
1204      tr_removeElementFromArray (s->pieces,
1205                                 pos,
1206                                 sizeof (struct weighted_piece),
1207                                 s->pieceCount--);
1208
1209      pos = tr_lowerBound (&tmp, s->pieces, s->pieceCount,
1210                           sizeof (struct weighted_piece),
1211                           comparePieceByWeight, &exact);
1212
1213      memmove (&s->pieces[pos + 1],
1214               &s->pieces[pos],
1215               sizeof (struct weighted_piece) * (s->pieceCount++ - pos));
1216
1217      s->pieces[pos] = tmp;
1218    }
1219
1220  assertWeightedPiecesAreSorted (s);
1221}
1222
1223static void
1224pieceListRemoveRequest (tr_swarm * s, tr_block_index_t block)
1225{
1226  struct weighted_piece * p;
1227  const tr_piece_index_t index = tr_torBlockPiece (s->tor, block);
1228
1229  if (((p = pieceListLookup (s, index))) && (p->requestCount > 0))
1230    {
1231      --p->requestCount;
1232      pieceListResortPiece (s, p);
1233    }
1234}
1235
1236
1237/****
1238*****
1239*****  Replication count (for rarest first policy)
1240*****
1241****/
1242
1243/**
1244 * Increase the replication count of this piece and sort it if the
1245 * piece list is already sorted
1246 */
1247static void
1248tr_incrReplicationOfPiece (tr_swarm * s, const size_t index)
1249{
1250  assert (replicationExists (s));
1251  assert (s->pieceReplicationSize == s->tor->info.pieceCount);
1252
1253  /* One more replication of this piece is present in the swarm */
1254  ++s->pieceReplication[index];
1255
1256  /* we only resort the piece if the list is already sorted */
1257  if (s->pieceSortState == PIECES_SORTED_BY_WEIGHT)
1258    pieceListResortPiece (s, pieceListLookup (s, index));
1259}
1260
1261/**
1262 * Increases the replication count of pieces present in the bitfield
1263 */
1264static void
1265tr_incrReplicationFromBitfield (tr_swarm * s, const tr_bitfield * b)
1266{
1267  size_t i;
1268  uint16_t * rep = s->pieceReplication;
1269  const size_t n = s->tor->info.pieceCount;
1270
1271  assert (replicationExists (s));
1272
1273  for (i=0; i<n; ++i)
1274    if (tr_bitfieldHas (b, i))
1275      ++rep[i];
1276
1277  if (s->pieceSortState == PIECES_SORTED_BY_WEIGHT)
1278    invalidatePieceSorting (s);
1279}
1280
1281/**
1282 * Increase the replication count of every piece
1283 */
1284static void
1285tr_incrReplication (tr_swarm * s)
1286{
1287  int i;
1288  const int n = s->pieceReplicationSize;
1289
1290  assert (replicationExists (s));
1291  assert (s->pieceReplicationSize == s->tor->info.pieceCount);
1292
1293  for (i=0; i<n; ++i)
1294    ++s->pieceReplication[i];
1295}
1296
1297/**
1298 * Decrease the replication count of pieces present in the bitset.
1299 */
1300static void
1301tr_decrReplicationFromBitfield (tr_swarm * s, const tr_bitfield * b)
1302{
1303  int i;
1304  const int n = s->pieceReplicationSize;
1305
1306  assert (replicationExists (s));
1307  assert (s->pieceReplicationSize == s->tor->info.pieceCount);
1308
1309  if (tr_bitfieldHasAll (b))
1310    {
1311      for (i=0; i<n; ++i)
1312        --s->pieceReplication[i];
1313    }
1314  else if (!tr_bitfieldHasNone (b))
1315    {
1316      for (i=0; i<n; ++i)
1317        if (tr_bitfieldHas (b, i))
1318          --s->pieceReplication[i];
1319
1320      if (s->pieceSortState == PIECES_SORTED_BY_WEIGHT)
1321        invalidatePieceSorting (s);
1322    }
1323}
1324
1325/**
1326***
1327**/
1328
1329void
1330tr_peerMgrRebuildRequests (tr_torrent * tor)
1331{
1332  assert (tr_isTorrent (tor));
1333
1334  pieceListRebuild (tor->swarm);
1335}
1336
1337void
1338tr_peerMgrGetNextRequests (tr_torrent           * tor,
1339                           tr_peer              * peer,
1340                           int                    numwant,
1341                           tr_block_index_t     * setme,
1342                           int                  * numgot,
1343                           bool                   get_intervals)
1344{
1345  int i;
1346  int got;
1347  tr_swarm * s;
1348  struct weighted_piece * pieces;
1349  const tr_bitfield * const have = &peer->have;
1350
1351  /* sanity clause */
1352  assert (tr_isTorrent (tor));
1353  assert (numwant > 0);
1354
1355  /* walk through the pieces and find blocks that should be requested */
1356  got = 0;
1357  s = tor->swarm;
1358
1359  /* prep the pieces list */
1360  if (s->pieces == NULL)
1361    pieceListRebuild (s);
1362
1363  if (s->pieceSortState != PIECES_SORTED_BY_WEIGHT)
1364    pieceListSort (s, PIECES_SORTED_BY_WEIGHT);
1365
1366  assertReplicationCountIsExact (s);
1367  assertWeightedPiecesAreSorted (s);
1368
1369  updateEndgame (s);
1370  pieces = s->pieces;
1371  for (i=0; i<s->pieceCount && got<numwant; ++i)
1372    {
1373      struct weighted_piece * p = pieces + i;
1374
1375      /* if the peer has this piece that we want... */
1376      if (tr_bitfieldHas (have, p->index))
1377        {
1378          tr_block_index_t b;
1379          tr_block_index_t first;
1380          tr_block_index_t last;
1381          tr_ptrArray peerArr = TR_PTR_ARRAY_INIT;
1382
1383          tr_torGetPieceBlockRange (tor, p->index, &first, &last);
1384
1385          for (b=first; b<=last && (got<numwant || (get_intervals && setme[2*got-1] == b-1)); ++b)
1386            {
1387              int peerCount;
1388              tr_peer ** peers;
1389
1390              /* don't request blocks we've already got */
1391              if (tr_cpBlockIsComplete (&tor->completion, b))
1392                continue;
1393
1394              /* always add peer if this block has no peers yet */
1395              tr_ptrArrayClear (&peerArr);
1396              getBlockRequestPeers (s, b, &peerArr);
1397              peers = (tr_peer **) tr_ptrArrayPeek (&peerArr, &peerCount);
1398              if (peerCount != 0)
1399                {
1400                  /* don't make a second block request until the endgame */
1401                  if (!s->endgame)
1402                    continue;
1403
1404                  /* don't have more than two peers requesting this block */
1405                  if (peerCount > 1)
1406                    continue;
1407
1408                  /* don't send the same request to the same peer twice */
1409                  if (peer == peers[0])
1410                    continue;
1411
1412                  /* in the endgame allow an additional peer to download a
1413                     block but only if the peer seems to be handling requests
1414                     relatively fast */
1415                  if (peer->pendingReqsToPeer + numwant - got < s->endgame)
1416                    continue;
1417                }
1418
1419              /* update the caller's table */
1420              if (!get_intervals)
1421                {
1422                  setme[got++] = b;
1423                }
1424              /* if intervals are requested two array entries are necessarry:
1425                 one for the interval's starting block and one for its end block */
1426              else if (got && setme[2 * got - 1] == b - 1 && b != first)
1427                {
1428                  /* expand the last interval */
1429                  ++setme[2 * got - 1];
1430                }
1431              else
1432                {
1433                  /* begin a new interval */
1434                  setme[2 * got] = setme[2 * got + 1] = b;
1435                  ++got;
1436                }
1437
1438              /* update our own tables */
1439              requestListAdd (s, b, peer);
1440              ++p->requestCount;
1441            }
1442
1443          tr_ptrArrayDestruct (&peerArr, NULL);
1444        }
1445    }
1446
1447  /* In most cases we've just changed the weights of a small number of pieces.
1448   * So rather than qsort ()ing the entire array, it's faster to apply an
1449   * adaptive insertion sort algorithm. */
1450  if (got > 0)
1451    {
1452      /* not enough requests || last piece modified */
1453      if (i == s->pieceCount)
1454        --i;
1455
1456      setComparePieceByWeightTorrent (s);
1457      while (--i >= 0)
1458        {
1459          bool exact;
1460
1461          /* relative position! */
1462          const int newpos = tr_lowerBound (&s->pieces[i], &s->pieces[i + 1],
1463                                            s->pieceCount - (i + 1),
1464                                            sizeof (struct weighted_piece),
1465                                            comparePieceByWeight, &exact);
1466          if (newpos > 0)
1467            {
1468              const struct weighted_piece piece = s->pieces[i];
1469              memmove (&s->pieces[i],
1470                       &s->pieces[i + 1],
1471                       sizeof (struct weighted_piece) * (newpos));
1472              s->pieces[i + newpos] = piece;
1473            }
1474        }
1475    }
1476
1477  assertWeightedPiecesAreSorted (t);
1478  *numgot = got;
1479}
1480
1481bool
1482tr_peerMgrDidPeerRequest (const tr_torrent  * tor,
1483                          const tr_peer     * peer,
1484                          tr_block_index_t    block)
1485{
1486  return requestListLookup ((tr_swarm*)tor->swarm, block, peer) != NULL;
1487}
1488
1489/* cancel requests that are too old */
1490static void
1491refillUpkeep (int foo UNUSED, short bar UNUSED, void * vmgr)
1492{
1493    time_t now;
1494    time_t too_old;
1495    tr_torrent * tor;
1496    int cancel_buflen = 0;
1497    struct block_request * cancel = NULL;
1498    tr_peerMgr * mgr = vmgr;
1499    managerLock (mgr);
1500
1501    now = tr_time ();
1502    too_old = now - REQUEST_TTL_SECS;
1503
1504    /* alloc the temporary "cancel" buffer */
1505    tor = NULL;
1506    while ((tor = tr_torrentNext (mgr->session, tor)))
1507        cancel_buflen = MAX (cancel_buflen, tor->swarm->requestCount);
1508    if (cancel_buflen > 0)
1509        cancel = tr_new (struct block_request, cancel_buflen);
1510
1511    /* prune requests that are too old */
1512    tor = NULL;
1513    while ((tor = tr_torrentNext (mgr->session, tor)))
1514    {
1515        tr_swarm * s = tor->swarm;
1516        const int n = s->requestCount;
1517        if (n > 0)
1518        {
1519            int keepCount = 0;
1520            int cancelCount = 0;
1521            const struct block_request * it;
1522            const struct block_request * end;
1523
1524            for (it=s->requests, end=it+n; it!=end; ++it)
1525            {
1526                tr_peerMsgs * msgs = PEER_MSGS(it->peer);
1527
1528                if ((msgs !=NULL) && (it->sentAt <= too_old) && !tr_peerMsgsIsReadingBlock (msgs, it->block))
1529                    cancel[cancelCount++] = *it;
1530                else
1531                {
1532                    if (it != &s->requests[keepCount])
1533                        s->requests[keepCount] = *it;
1534                    keepCount++;
1535                }
1536            }
1537
1538            /* prune out the ones we aren't keeping */
1539            s->requestCount = keepCount;
1540
1541            /* send cancel messages for all the "cancel" ones */
1542            for (it=cancel, end=it+cancelCount; it!=end; ++it)
1543            {
1544              tr_peerMsgs * msgs = PEER_MSGS(it->peer);
1545
1546              if (msgs != NULL)
1547                {
1548                  tr_historyAdd (&it->peer->cancelsSentToPeer, now, 1);
1549                  tr_peerMsgsCancel (msgs, it->block);
1550                  decrementPendingReqCount (it);
1551                }
1552            }
1553
1554            /* decrement the pending request counts for the timed-out blocks */
1555            for (it=cancel, end=it+cancelCount; it!=end; ++it)
1556                pieceListRemoveRequest (s, it->block);
1557        }
1558    }
1559
1560    tr_free (cancel);
1561    tr_timerAddMsec (mgr->refillUpkeepTimer, REFILL_UPKEEP_PERIOD_MSEC);
1562    managerUnlock (mgr);
1563}
1564
1565static void
1566addStrike (tr_swarm * s, tr_peer * peer)
1567{
1568  tordbg (s, "increasing peer %s strike count to %d",
1569          tr_atomAddrStr (peer->atom), peer->strikes + 1);
1570
1571  if (++peer->strikes >= MAX_BAD_PIECES_PER_PEER)
1572    {
1573      struct peer_atom * atom = peer->atom;
1574      atom->flags2 |= MYFLAG_BANNED;
1575      peer->doPurge = 1;
1576      tordbg (s, "banning peer %s", tr_atomAddrStr (atom));
1577    }
1578}
1579
1580static void
1581peerSuggestedPiece (tr_swarm           * s UNUSED,
1582                    tr_peer            * peer UNUSED,
1583                    tr_piece_index_t     pieceIndex UNUSED,
1584                    int                  isFastAllowed UNUSED)
1585{
1586#if 0
1587    assert (t);
1588    assert (peer);
1589    assert (peer->msgs);
1590
1591    /* is this a valid piece? */
1592    if (pieceIndex >= t->tor->info.pieceCount)
1593        return;
1594
1595    /* don't ask for it if we've already got it */
1596    if (tr_cpPieceIsComplete (t->tor->completion, pieceIndex))
1597        return;
1598
1599    /* don't ask for it if they don't have it */
1600    if (!tr_bitfieldHas (peer->have, pieceIndex))
1601        return;
1602
1603    /* don't ask for it if we're choked and it's not fast */
1604    if (!isFastAllowed && peer->clientIsChoked)
1605        return;
1606
1607    /* request the blocks that we don't have in this piece */
1608    {
1609        tr_block_index_t b;
1610        tr_block_index_t first;
1611        tr_block_index_t last;
1612        const tr_torrent * tor = t->tor;
1613
1614        tr_torGetPieceBlockRange (t->tor, pieceIndex, &first, &last);
1615
1616        for (b=first; b<=last; ++b)
1617        {
1618            if (!tr_cpBlockIsComplete (tor->completion, b))
1619            {
1620                const uint32_t offset = getBlockOffsetInPiece (tor, b);
1621                const uint32_t length = tr_torBlockCountBytes (tor, b);
1622                tr_peerMsgsAddRequest (peer->msgs, pieceIndex, offset, length);
1623                incrementPieceRequests (t, pieceIndex);
1624            }
1625        }
1626    }
1627#endif
1628}
1629
1630static void
1631removeRequestFromTables (tr_swarm * s, tr_block_index_t block, const tr_peer * peer)
1632{
1633  requestListRemove (s, block, peer);
1634  pieceListRemoveRequest (s, block);
1635}
1636
1637/* peer choked us, or maybe it disconnected.
1638   either way we need to remove all its requests */
1639static void
1640peerDeclinedAllRequests (tr_swarm * s, const tr_peer * peer)
1641{
1642  int i, n;
1643  tr_block_index_t * blocks = tr_new (tr_block_index_t, s->requestCount);
1644
1645  for (i=n=0; i<s->requestCount; ++i)
1646    if (peer == s->requests[i].peer)
1647      blocks[n++] = s->requests[i].block;
1648
1649  for (i=0; i<n; ++i)
1650    removeRequestFromTables (s, blocks[i], peer);
1651
1652  tr_free (blocks);
1653}
1654
1655static void
1656cancelAllRequestsForBlock (tr_swarm          * s,
1657                           tr_block_index_t    block,
1658                           tr_peer           * no_notify)
1659{
1660  int i;
1661  int peerCount;
1662  tr_peer ** peers;
1663  tr_ptrArray peerArr;
1664
1665  peerArr = TR_PTR_ARRAY_INIT;
1666  getBlockRequestPeers (s, block, &peerArr);
1667  peers = (tr_peer **) tr_ptrArrayPeek (&peerArr, &peerCount);
1668  for (i=0; i<peerCount; ++i)
1669    {
1670      tr_peer * p = peers[i];
1671
1672      if ((p != no_notify) && (p != NULL))
1673        {
1674          tr_historyAdd (&p->cancelsSentToPeer, tr_time (), 1);
1675          tr_peerMsgsCancel (PEER_MSGS(p), block);
1676        }
1677
1678      removeRequestFromTables (s, block, p);
1679    }
1680
1681  tr_ptrArrayDestruct (&peerArr, NULL);
1682}
1683
1684void
1685tr_peerMgrPieceCompleted (tr_torrent * tor, tr_piece_index_t p)
1686{
1687  int i;
1688  bool pieceCameFromPeers = false;
1689  tr_swarm * const s = tor->swarm;
1690  const int n = tr_ptrArraySize (&s->peers);
1691
1692  /* walk through our peers */
1693  for (i=0; i<n; ++i)
1694    {
1695      tr_peer * peer = tr_ptrArrayNth (&s->peers, i);
1696
1697      /* notify the peer that we now have this piece */
1698      tr_peerMsgsHave (PEER_MSGS(peer), p);
1699
1700      if (!pieceCameFromPeers)
1701        pieceCameFromPeers = tr_bitfieldHas (&peer->blame, p);
1702    }
1703
1704  if (pieceCameFromPeers) /* webseed downloads don't belong in announce totals */
1705    tr_announcerAddBytes (tor, TR_ANN_DOWN, tr_torPieceCountBytes (tor, p));
1706
1707  /* bookkeeping */
1708  pieceListRemovePiece (s, p);
1709  s->needsCompletenessCheck = true;
1710}
1711
1712static void
1713peerCallbackFunc (tr_peer * peer, const tr_peer_event * e, void * vs)
1714{
1715  tr_swarm * s = vs;
1716
1717  swarmLock (s);
1718
1719  assert (peer != NULL);
1720
1721  switch (e->eventType)
1722    {
1723      case TR_PEER_PEER_GOT_PIECE_DATA:
1724        {
1725          const time_t now = tr_time ();
1726          tr_torrent * tor = s->tor;
1727
1728          tor->uploadedCur += e->length;
1729          tr_announcerAddBytes (tor, TR_ANN_UP, e->length);
1730          tr_torrentSetActivityDate (tor, now);
1731          tr_torrentSetDirty (tor);
1732          tr_statsAddUploaded (tor->session, e->length);
1733
1734          if (peer->atom != NULL)
1735            peer->atom->piece_data_time = now;
1736
1737          break;
1738        }
1739
1740      case TR_PEER_CLIENT_GOT_PIECE_DATA:
1741        {
1742          const time_t now = tr_time ();
1743          tr_torrent * tor = s->tor;
1744
1745          tor->downloadedCur += e->length;
1746          tr_torrentSetActivityDate (tor, now);
1747          tr_torrentSetDirty (tor);
1748
1749          tr_statsAddDownloaded (tor->session, e->length);
1750
1751          if (peer->atom != NULL)
1752            peer->atom->piece_data_time = now;
1753
1754          break;
1755        }
1756
1757      case TR_PEER_CLIENT_GOT_HAVE:
1758        if (replicationExists (s))
1759          {
1760            tr_incrReplicationOfPiece (s, e->pieceIndex);
1761            assertReplicationCountIsExact (s);
1762          }
1763        break;
1764
1765      case TR_PEER_CLIENT_GOT_HAVE_ALL:
1766        if (replicationExists (s))
1767          {
1768            tr_incrReplication (s);
1769            assertReplicationCountIsExact (s);
1770          }
1771        break;
1772
1773      case TR_PEER_CLIENT_GOT_HAVE_NONE:
1774        /* noop */
1775        break;
1776
1777      case TR_PEER_CLIENT_GOT_BITFIELD:
1778        assert (e->bitfield != NULL);
1779        if (replicationExists (s))
1780          {
1781            tr_incrReplicationFromBitfield (s, e->bitfield);
1782            assertReplicationCountIsExact (s);
1783          }
1784        break;
1785
1786      case TR_PEER_CLIENT_GOT_REJ:
1787        {
1788          tr_block_index_t b = _tr_block (s->tor, e->pieceIndex, e->offset);
1789          if (b < s->tor->blockCount)
1790            removeRequestFromTables (s, b, peer);
1791          else
1792            tordbg (s, "Peer %s sent an out-of-range reject message",
1793                    tr_atomAddrStr (peer->atom));
1794          break;
1795        }
1796
1797      case TR_PEER_CLIENT_GOT_CHOKE:
1798        peerDeclinedAllRequests (s, peer);
1799        break;
1800
1801      case TR_PEER_CLIENT_GOT_PORT:
1802        if (peer->atom)
1803          peer->atom->port = e->port;
1804        break;
1805
1806      case TR_PEER_CLIENT_GOT_SUGGEST:
1807        peerSuggestedPiece (s, peer, e->pieceIndex, false);
1808        break;
1809
1810      case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1811        peerSuggestedPiece (s, peer, e->pieceIndex, true);
1812        break;
1813
1814      case TR_PEER_CLIENT_GOT_BLOCK:
1815        {
1816          tr_torrent * tor = s->tor;
1817          const tr_piece_index_t p = e->pieceIndex;
1818          const tr_block_index_t block = _tr_block (tor, p, e->offset);
1819          cancelAllRequestsForBlock (s, block, peer);
1820          tr_historyAdd (&peer->blocksSentToClient, tr_time(), 1);
1821          pieceListResortPiece (s, pieceListLookup (s, p));
1822          tr_torrentGotBlock (tor, block);
1823          break;
1824        }
1825
1826      case TR_PEER_ERROR:
1827        if ((e->err == ERANGE) || (e->err == EMSGSIZE) || (e->err == ENOTCONN))
1828          {
1829            /* some protocol error from the peer */
1830            peer->doPurge = 1;
1831            tordbg (s, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error",
1832                    tr_atomAddrStr (peer->atom));
1833          }
1834        else
1835          {
1836            tordbg (s, "unhandled error: %s", tr_strerror (e->err));
1837          }
1838        break;
1839
1840      default:
1841        assert (0);
1842    }
1843
1844  swarmUnlock (s);
1845}
1846
1847static int
1848getDefaultShelfLife (uint8_t from)
1849{
1850    /* in general, peers obtained from firsthand contact
1851     * are better than those from secondhand, etc etc */
1852    switch (from)
1853    {
1854        case TR_PEER_FROM_INCOMING : return 60 * 60 * 6;
1855        case TR_PEER_FROM_LTEP     : return 60 * 60 * 6;
1856        case TR_PEER_FROM_TRACKER  : return 60 * 60 * 3;
1857        case TR_PEER_FROM_DHT      : return 60 * 60 * 3;
1858        case TR_PEER_FROM_PEX      : return 60 * 60 * 2;
1859        case TR_PEER_FROM_RESUME   : return 60 * 60;
1860        case TR_PEER_FROM_LPD      : return 10 * 60;
1861        default                    : return 60 * 60;
1862    }
1863}
1864
1865static void
1866ensureAtomExists (tr_swarm          * s,
1867                  const tr_address  * addr,
1868                  const tr_port       port,
1869                  const uint8_t       flags,
1870                  const int8_t        seedProbability,
1871                  const uint8_t       from)
1872{
1873  struct peer_atom * a;
1874
1875  assert (tr_address_is_valid (addr));
1876  assert (from < TR_PEER_FROM__MAX);
1877
1878  a = getExistingAtom (s, addr);
1879
1880  if (a == NULL)
1881    {
1882      const int jitter = tr_cryptoWeakRandInt (60*10);
1883      a = tr_new0 (struct peer_atom, 1);
1884      a->addr = *addr;
1885      a->port = port;
1886      a->flags = flags;
1887      a->fromFirst = from;
1888      a->fromBest = from;
1889      a->shelf_date = tr_time () + getDefaultShelfLife (from) + jitter;
1890      a->blocklisted = -1;
1891      atomSetSeedProbability (a, seedProbability);
1892      tr_ptrArrayInsertSorted (&s->pool, a, compareAtomsByAddress);
1893
1894      tordbg (s, "got a new atom: %s", tr_atomAddrStr (a));
1895    }
1896  else
1897    {
1898      if (from < a->fromBest)
1899        a->fromBest = from;
1900
1901      if (a->seedProbability == -1)
1902        atomSetSeedProbability (a, seedProbability);
1903
1904      a->flags |= flags;
1905    }
1906}
1907
1908static int
1909getMaxPeerCount (const tr_torrent * tor)
1910{
1911  return tor->maxConnectedPeers;
1912}
1913
1914static int
1915getPeerCount (const tr_swarm * s)
1916{
1917  return tr_ptrArraySize (&s->peers);/* + tr_ptrArraySize (&t->outgoingHandshakes); */
1918}
1919
1920
1921static void
1922createBitTorrentPeer (tr_torrent       * tor,
1923                      struct tr_peerIo * io,
1924                      struct peer_atom * atom,
1925                      tr_quark           client)
1926{
1927  tr_peer * peer;
1928  tr_swarm * swarm;
1929
1930  assert (atom != NULL);
1931  assert (tr_isTorrent (tor));
1932  assert (tor->swarm != NULL);
1933
1934  swarm = tor->swarm;
1935
1936  peer = (tr_peer*) tr_peerMsgsNew (tor, io, peerCallbackFunc, swarm);
1937  peer->atom = atom;
1938  peer->client = client;
1939  atom->peer = peer;
1940
1941  tr_ptrArrayInsertSorted (&swarm->peers, peer, peerCompare);
1942  ++tor->peerCount;
1943  ++tor->peerFromCount[atom->fromFirst];
1944
1945  assert (tor->peerCount == tr_ptrArraySize (&swarm->peers));
1946  assert (tor->peerFromCount[atom->fromFirst] <= tor->peerCount);
1947}
1948
1949
1950/* FIXME: this is kind of a mess. */
1951static bool
1952myHandshakeDoneCB (tr_handshake  * handshake,
1953                   tr_peerIo     * io,
1954                   bool            readAnythingFromPeer,
1955                   bool            isConnected,
1956                   const uint8_t * peer_id,
1957                   void          * vmanager)
1958{
1959  bool ok = isConnected;
1960  bool success = false;
1961  tr_port port;
1962  const tr_address * addr;
1963  tr_peerMgr * manager = vmanager;
1964  tr_swarm  * s;
1965  tr_handshake * ours;
1966
1967  assert (io);
1968  assert (tr_isBool (ok));
1969
1970  s = tr_peerIoHasTorrentHash (io)
1971    ? getExistingSwarm (manager, tr_peerIoGetTorrentHash (io))
1972    : NULL;
1973
1974  if (tr_peerIoIsIncoming (io))
1975    ours = tr_ptrArrayRemoveSorted (&manager->incomingHandshakes,
1976                                    handshake, handshakeCompare);
1977  else if (s)
1978    ours = tr_ptrArrayRemoveSorted (&s->outgoingHandshakes,
1979                                    handshake, handshakeCompare);
1980  else
1981    ours = handshake;
1982
1983  assert (ours);
1984  assert (ours == handshake);
1985
1986  if (s)
1987    swarmLock (s);
1988
1989  addr = tr_peerIoGetAddress (io, &port);
1990
1991  if (!ok || !s || !s->isRunning)
1992    {
1993      if (s)
1994        {
1995          struct peer_atom * atom = getExistingAtom (s, addr);
1996          if (atom)
1997            {
1998              ++atom->numFails;
1999
2000              if (!readAnythingFromPeer)
2001                {
2002                  tordbg (s, "marking peer %s as unreachable... numFails is %d", tr_atomAddrStr (atom), (int)atom->numFails);
2003                  atom->flags2 |= MYFLAG_UNREACHABLE;
2004                }
2005            }
2006        }
2007    }
2008  else /* looking good */
2009    {
2010      struct peer_atom * atom;
2011
2012      ensureAtomExists (s, addr, port, 0, -1, TR_PEER_FROM_INCOMING);
2013      atom = getExistingAtom (s, addr);
2014      atom->time = tr_time ();
2015      atom->piece_data_time = 0;
2016      atom->lastConnectionAt = tr_time ();
2017
2018      if (!tr_peerIoIsIncoming (io))
2019        {
2020          atom->flags |= ADDED_F_CONNECTABLE;
2021          atom->flags2 &= ~MYFLAG_UNREACHABLE;
2022        }
2023
2024      /* In principle, this flag specifies whether the peer groks uTP,
2025         not whether it's currently connected over uTP. */
2026      if (io->utp_socket)
2027        atom->flags |= ADDED_F_UTP_FLAGS;
2028
2029      if (atom->flags2 & MYFLAG_BANNED)
2030        {
2031          tordbg (s, "banned peer %s tried to reconnect",
2032                  tr_atomAddrStr (atom));
2033        }
2034      else if (tr_peerIoIsIncoming (io) && (getPeerCount (s) >= getMaxPeerCount (s->tor)))
2035        {
2036        }
2037      else
2038        {
2039          tr_peer * peer = atom->peer;
2040
2041          if (peer) /* we already have this peer */
2042            {
2043            }
2044          else
2045            {
2046              tr_quark client;
2047              tr_peerIo * io;
2048              char buf[128];
2049
2050              if (peer_id != NULL)
2051                client = tr_quark_new (tr_clientForId (buf, sizeof (buf), peer_id), -1);
2052              else
2053                client = TR_KEY_NONE;
2054
2055              io = tr_handshakeStealIO (handshake); /* this steals its refcount too, which is
2056                                                       balanced by our unref in peerDelete ()  */
2057              tr_peerIoSetParent (io, &s->tor->bandwidth);
2058              createBitTorrentPeer (s->tor, io, atom, client);
2059
2060              success = true;
2061            }
2062        }
2063    }
2064
2065  if (s != NULL)
2066    swarmUnlock (s);
2067
2068  return success;
2069}
2070
2071void
2072tr_peerMgrAddIncoming (tr_peerMgr       * manager,
2073                       tr_address       * addr,
2074                       tr_port            port,
2075                       int                socket,
2076                       struct UTPSocket * utp_socket)
2077{
2078  tr_session * session;
2079
2080  managerLock (manager);
2081
2082  assert (tr_isSession (manager->session));
2083  session = manager->session;
2084
2085  if (tr_sessionIsAddressBlocked (session, addr))
2086    {
2087      tr_logAddDebug ("Banned IP address \"%s\" tried to connect to us", tr_address_to_string (addr));
2088      if (socket >= 0)
2089        tr_netClose (session, socket);
2090      else
2091        UTP_Close (utp_socket);
2092    }
2093  else if (getExistingHandshake (&manager->incomingHandshakes, addr))
2094    {
2095      if (socket >= 0)
2096        tr_netClose (session, socket);
2097      else
2098        UTP_Close (utp_socket);
2099    }
2100  else /* we don't have a connection to them yet... */
2101    {
2102      tr_peerIo *    io;
2103      tr_handshake * handshake;
2104
2105      io = tr_peerIoNewIncoming (session, &session->bandwidth, addr, port, socket, utp_socket);
2106
2107      handshake = tr_handshakeNew (io,
2108                                   session->encryptionMode,
2109                                   myHandshakeDoneCB,
2110                                   manager);
2111
2112      tr_peerIoUnref (io); /* balanced by the implicit ref in tr_peerIoNewIncoming () */
2113
2114      tr_ptrArrayInsertSorted (&manager->incomingHandshakes, handshake,
2115                               handshakeCompare);
2116    }
2117
2118  managerUnlock (manager);
2119}
2120
2121void
2122tr_peerMgrAddPex (tr_torrent * tor, uint8_t from,
2123                  const tr_pex * pex, int8_t seedProbability)
2124{
2125  if (tr_isPex (pex)) /* safeguard against corrupt data */
2126    {
2127      tr_swarm * s = tor->swarm;
2128      managerLock (s->manager);
2129
2130      if (!tr_sessionIsAddressBlocked (s->manager->session, &pex->addr))
2131        if (tr_address_is_valid_for_peers (&pex->addr, pex->port))
2132          ensureAtomExists (s, &pex->addr, pex->port, pex->flags, seedProbability, from);
2133
2134      managerUnlock (s->manager);
2135    }
2136}
2137
2138void
2139tr_peerMgrMarkAllAsSeeds (tr_torrent * tor)
2140{
2141  tr_swarm * s = tor->swarm;
2142  const int n = tr_ptrArraySize (&s->pool);
2143  struct peer_atom ** it = (struct peer_atom**) tr_ptrArrayBase (&s->pool);
2144  struct peer_atom ** end = it + n;
2145
2146  while (it != end)
2147    atomSetSeed (s, *it++);
2148}
2149
2150tr_pex *
2151tr_peerMgrCompactToPex (const void    * compact,
2152                        size_t          compactLen,
2153                        const uint8_t * added_f,
2154                        size_t          added_f_len,
2155                        size_t        * pexCount)
2156{
2157  size_t i;
2158  size_t n = compactLen / 6;
2159  const uint8_t * walk = compact;
2160  tr_pex * pex = tr_new0 (tr_pex, n);
2161
2162  for (i=0; i<n; ++i)
2163    {
2164      pex[i].addr.type = TR_AF_INET;
2165      memcpy (&pex[i].addr.addr, walk, 4); walk += 4;
2166      memcpy (&pex[i].port, walk, 2); walk += 2;
2167      if (added_f && (n == added_f_len))
2168        pex[i].flags = added_f[i];
2169    }
2170
2171  *pexCount = n;
2172  return pex;
2173}
2174
2175tr_pex *
2176tr_peerMgrCompact6ToPex (const void    * compact,
2177                         size_t          compactLen,
2178                         const uint8_t * added_f,
2179                         size_t          added_f_len,
2180                         size_t        * pexCount)
2181{
2182  size_t i;
2183  size_t n = compactLen / 18;
2184  const uint8_t * walk = compact;
2185  tr_pex * pex = tr_new0 (tr_pex, n);
2186
2187  for (i=0; i<n; ++i)
2188    {
2189      pex[i].addr.type = TR_AF_INET6;
2190      memcpy (&pex[i].addr.addr.addr6.s6_addr, walk, 16); walk += 16;
2191      memcpy (&pex[i].port, walk, 2); walk += 2;
2192      if (added_f && (n == added_f_len))
2193        pex[i].flags = added_f[i];
2194    }
2195
2196  *pexCount = n;
2197  return pex;
2198}
2199
2200tr_pex *
2201tr_peerMgrArrayToPex (const void  * array,
2202                      size_t        arrayLen,
2203                      size_t      * pexCount)
2204{
2205  size_t i;
2206  size_t n = arrayLen / (sizeof (tr_address) + 2);
2207  const uint8_t * walk = array;
2208  tr_pex * pex = tr_new0 (tr_pex, n);
2209
2210  for (i=0 ; i<n ; ++i)
2211    {
2212      memcpy (&pex[i].addr, walk, sizeof (tr_address));
2213      memcpy (&pex[i].port, walk + sizeof (tr_address), 2);
2214      pex[i].flags = 0x00;
2215      walk += sizeof (tr_address) + 2;
2216    }
2217
2218  *pexCount = n;
2219  return pex;
2220}
2221
2222/**
2223***
2224**/
2225
2226void
2227tr_peerMgrGotBadPiece (tr_torrent * tor, tr_piece_index_t pieceIndex)
2228{
2229  int i;
2230  int n;
2231  tr_swarm * s = tor->swarm;
2232  const uint32_t byteCount = tr_torPieceCountBytes (tor, pieceIndex);
2233
2234  for (i=0, n=tr_ptrArraySize(&s->peers); i!=n; ++i)
2235    {
2236      tr_peer * peer = tr_ptrArrayNth (&s->peers, i);
2237
2238      if (tr_bitfieldHas (&peer->blame, pieceIndex))
2239        {
2240          tordbg (s, "peer %s contributed to corrupt piece (%d); now has %d strikes",
2241                  tr_atomAddrStr(peer->atom), pieceIndex, (int)peer->strikes + 1);
2242          addStrike (s, peer);
2243        }
2244    }
2245
2246
2247  tr_announcerAddBytes (tor, TR_ANN_CORRUPT, byteCount);
2248}
2249
2250int
2251tr_pexCompare (const void * va, const void * vb)
2252{
2253  int i;
2254  const tr_pex * a = va;
2255  const tr_pex * b = vb;
2256
2257  assert (tr_isPex (a));
2258  assert (tr_isPex (b));
2259
2260  if ((i = tr_address_compare (&a->addr, &b->addr)))
2261    return i;
2262
2263  if (a->port != b->port)
2264    return a->port < b->port ? -1 : 1;
2265
2266  return 0;
2267}
2268
2269/* better goes first */
2270static int
2271compareAtomsByUsefulness (const void * va, const void *vb)
2272{
2273  const struct peer_atom * a = * (const struct peer_atom**) va;
2274  const struct peer_atom * b = * (const struct peer_atom**) vb;
2275
2276  assert (tr_isAtom (a));
2277  assert (tr_isAtom (b));
2278
2279  if (a->piece_data_time != b->piece_data_time)
2280    return a->piece_data_time > b->piece_data_time ? -1 : 1;
2281  if (a->fromBest != b->fromBest)
2282    return a->fromBest < b->fromBest ? -1 : 1;
2283  if (a->numFails != b->numFails)
2284    return a->numFails < b->numFails ? -1 : 1;
2285
2286  return 0;
2287}
2288
2289static bool
2290isAtomInteresting (const tr_torrent * tor, struct peer_atom * atom)
2291{
2292  if (tr_torrentIsSeed (tor) && atomIsSeed (atom))
2293    return false;
2294
2295  if (peerIsInUse (tor->swarm, atom))
2296    return true;
2297
2298  if (isAtomBlocklisted (tor->session, atom))
2299    return false;
2300
2301  if (atom->flags2 & MYFLAG_BANNED)
2302    return false;
2303
2304  return true;
2305}
2306
2307int
2308tr_peerMgrGetPeers (tr_torrent   * tor,
2309                    tr_pex      ** setme_pex,
2310                    uint8_t        af,
2311                    uint8_t        list_mode,
2312                    int            maxCount)
2313{
2314  int i;
2315  int n;
2316  int count = 0;
2317  int atomCount = 0;
2318  const tr_swarm * s = tor->swarm;
2319  struct peer_atom ** atoms = NULL;
2320  tr_pex * pex;
2321  tr_pex * walk;
2322
2323  assert (tr_isTorrent (tor));
2324  assert (setme_pex != NULL);
2325  assert (af==TR_AF_INET || af==TR_AF_INET6);
2326  assert (list_mode==TR_PEERS_CONNECTED || list_mode==TR_PEERS_INTERESTING);
2327
2328  managerLock (s->manager);
2329
2330  /**
2331  ***  build a list of atoms
2332  **/
2333
2334  if (list_mode == TR_PEERS_CONNECTED) /* connected peers only */
2335    {
2336      int i;
2337      const tr_peer ** peers = (const tr_peer **) tr_ptrArrayBase (&s->peers);
2338      atomCount = tr_ptrArraySize (&s->peers);
2339      atoms = tr_new (struct peer_atom *, atomCount);
2340      for (i=0; i<atomCount; ++i)
2341        atoms[i] = peers[i]->atom;
2342    }
2343  else /* TR_PEERS_INTERESTING */
2344    {
2345      int i;
2346      struct peer_atom ** atomBase = (struct peer_atom**) tr_ptrArrayBase (&s->pool);
2347      n = tr_ptrArraySize (&s->pool);
2348      atoms = tr_new (struct peer_atom *, n);
2349      for (i=0; i<n; ++i)
2350        if (isAtomInteresting (tor, atomBase[i]))
2351          atoms[atomCount++] = atomBase[i];
2352    }
2353
2354  qsort (atoms, atomCount, sizeof (struct peer_atom *), compareAtomsByUsefulness);
2355
2356  /**
2357  ***  add the first N of them into our return list
2358  **/
2359
2360  n = MIN (atomCount, maxCount);
2361  pex = walk = tr_new0 (tr_pex, n);
2362
2363  for (i=0; i<atomCount && count<n; ++i)
2364    {
2365      const struct peer_atom * atom = atoms[i];
2366      if (atom->addr.type == af)
2367        {
2368          assert (tr_address_is_valid (&atom->addr));
2369          walk->addr = atom->addr;
2370          walk->port = atom->port;
2371          walk->flags = atom->flags;
2372          ++count;
2373          ++walk;
2374        }
2375    }
2376
2377  qsort (pex, count, sizeof (tr_pex), tr_pexCompare);
2378
2379  assert ((walk - pex) == count);
2380  *setme_pex = pex;
2381
2382  /* cleanup */
2383  tr_free (atoms);
2384  managerUnlock (s->manager);
2385  return count;
2386}
2387
2388static void atomPulse    (int, short, void *);
2389static void bandwidthPulse (int, short, void *);
2390static void rechokePulse (int, short, void *);
2391static void reconnectPulse (int, short, void *);
2392
2393static struct event *
2394createTimer (tr_session * session, int msec, void (*callback)(int, short, void *), void * cbdata)
2395{
2396  struct event * timer = evtimer_new (session->event_base, callback, cbdata);
2397  tr_timerAddMsec (timer, msec);
2398  return timer;
2399}
2400
2401static void
2402ensureMgrTimersExist (struct tr_peerMgr * m)
2403{
2404  if (m->atomTimer == NULL)
2405    m->atomTimer = createTimer (m->session, ATOM_PERIOD_MSEC, atomPulse, m);
2406
2407  if (m->bandwidthTimer == NULL)
2408    m->bandwidthTimer = createTimer (m->session, BANDWIDTH_PERIOD_MSEC, bandwidthPulse, m);
2409
2410  if (m->rechokeTimer == NULL)
2411    m->rechokeTimer = createTimer (m->session, RECHOKE_PERIOD_MSEC, rechokePulse, m);
2412
2413  if (m->refillUpkeepTimer == NULL)
2414    m->refillUpkeepTimer = createTimer (m->session, REFILL_UPKEEP_PERIOD_MSEC, refillUpkeep, m);
2415}
2416
2417void
2418tr_peerMgrStartTorrent (tr_torrent * tor)
2419{
2420  tr_swarm * s;
2421
2422  assert (tr_isTorrent (tor));
2423  assert (tr_torrentIsLocked (tor));
2424
2425  s = tor->swarm;
2426  ensureMgrTimersExist (s->manager);
2427
2428  s->isRunning = true;
2429  s->maxPeers = tor->maxConnectedPeers;
2430  s->pieceSortState = PIECES_UNSORTED;
2431
2432  rechokePulse (0, 0, s->manager);
2433}
2434
2435static void removeAllPeers (tr_swarm *);
2436
2437static void
2438stopSwarm (tr_swarm * swarm)
2439{
2440  swarm->isRunning = false;
2441
2442  replicationFree (swarm);
2443  invalidatePieceSorting (swarm);
2444
2445  removeAllPeers (swarm);
2446
2447  /* disconnect the handshakes. handshakeAbort calls handshakeDoneCB (),
2448   * which removes the handshake from t->outgoingHandshakes... */
2449  while (!tr_ptrArrayEmpty (&swarm->outgoingHandshakes))
2450    tr_handshakeAbort (tr_ptrArrayNth (&swarm->outgoingHandshakes, 0));
2451}
2452
2453void
2454tr_peerMgrStopTorrent (tr_torrent * tor)
2455{
2456  assert (tr_isTorrent (tor));
2457  assert (tr_torrentIsLocked (tor));
2458
2459  stopSwarm (tor->swarm);
2460}
2461
2462void
2463tr_peerMgrAddTorrent (tr_peerMgr * manager, tr_torrent * tor)
2464{
2465  assert (tr_isTorrent (tor));
2466  assert (tr_torrentIsLocked (tor));
2467  assert (tor->swarm == NULL);
2468
2469  tor->swarm = swarmNew (manager, tor);
2470}
2471
2472void
2473tr_peerMgrRemoveTorrent (tr_torrent * tor)
2474{
2475  assert (tr_isTorrent (tor));
2476  assert (tr_torrentIsLocked (tor));
2477
2478  stopSwarm (tor->swarm);
2479  swarmFree (tor->swarm);
2480}
2481
2482void
2483tr_peerUpdateProgress (tr_torrent * tor, tr_peer * peer)
2484{
2485  const tr_bitfield * have = &peer->have;
2486
2487  if (tr_bitfieldHasAll (have))
2488    {
2489      peer->progress = 1.0;
2490    }
2491  else if (tr_bitfieldHasNone (have))
2492    {
2493      peer->progress = 0.0;
2494    }
2495  else
2496    {
2497      const float true_count = tr_bitfieldCountTrueBits (have);
2498
2499      if (tr_torrentHasMetadata (tor))
2500        {
2501          peer->progress = true_count / tor->info.pieceCount;
2502        }
2503      else /* without pieceCount, this result is only a best guess... */
2504        {
2505          peer->progress = true_count / (have->bit_count + 1);
2506        }
2507    }
2508
2509  /* clamp the progress range */
2510  if (peer->progress < 0.0)
2511    peer->progress = 0.0;
2512  if (peer->progress > 1.0)
2513    peer->progress = 1.0;
2514
2515  if (peer->atom && (peer->progress >= 1.0))
2516    atomSetSeed (tor->swarm, peer->atom);
2517}
2518
2519void
2520tr_peerMgrOnTorrentGotMetainfo (tr_torrent * tor)
2521{
2522  int i;
2523  int peerCount;
2524  tr_peer ** peers;
2525
2526  /* the webseed list may have changed... */
2527  rebuildWebseedArray (tor->swarm, tor);
2528
2529  /* some peer_msgs' progress fields may not be accurate if we
2530     didn't have the metadata before now... so refresh them all... */
2531  peerCount = tr_ptrArraySize (&tor->swarm->peers);
2532  peers = (tr_peer**) tr_ptrArrayBase (&tor->swarm->peers);
2533  for (i=0; i<peerCount; ++i)
2534    tr_peerUpdateProgress (tor, peers[i]);
2535
2536  /* update the bittorrent peers' willingnes... */
2537  for (i=0; i<peerCount; ++i)
2538    {
2539      tr_peerMsgsUpdateActive (tr_peerMsgsCast(peers[i]), TR_UP);
2540      tr_peerMsgsUpdateActive (tr_peerMsgsCast(peers[i]), TR_DOWN);
2541    }
2542}
2543
2544void
2545tr_peerMgrTorrentAvailability (const tr_torrent  * tor,
2546                               int8_t            * tab,
2547                               unsigned int        tabCount)
2548{
2549  assert (tr_isTorrent (tor));
2550  assert (tab != NULL);
2551  assert (tabCount > 0);
2552
2553  memset (tab, 0, tabCount);
2554
2555  if (tr_torrentHasMetadata (tor))
2556    {
2557      tr_piece_index_t i;
2558      const int peerCount = tr_ptrArraySize (&tor->swarm->peers);
2559      const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase (&tor->swarm->peers);
2560      const float interval = tor->info.pieceCount / (float)tabCount;
2561      const bool isSeed = tr_cpGetStatus (&tor->completion) == TR_SEED;
2562
2563      for (i=0; i<tabCount; ++i)
2564        {
2565          const int piece = i * interval;
2566
2567          if (isSeed || tr_cpPieceIsComplete (&tor->completion, piece))
2568            {
2569              tab[i] = -1;
2570            }
2571          else if (peerCount)
2572            {
2573              int j;
2574              for (j=0; j<peerCount; ++j)
2575                if (tr_bitfieldHas (&peers[j]->have, piece))
2576                  ++tab[i];
2577            }
2578        }
2579    }
2580}
2581
2582static bool
2583peerIsSeed (const tr_peer * peer)
2584{
2585  if (peer->progress >= 1.0)
2586    return true;
2587
2588  if (peer->atom && atomIsSeed (peer->atom))
2589    return true;
2590
2591  return false;
2592}
2593
2594/* count how many bytes we want that connected peers have */
2595uint64_t
2596tr_peerMgrGetDesiredAvailable (const tr_torrent * tor)
2597{
2598  size_t i;
2599  size_t n;
2600  uint64_t desiredAvailable;
2601  const tr_swarm * s = tor->swarm;
2602
2603  /* common shortcuts... */
2604
2605  if (tr_torrentIsSeed (s->tor))
2606    return 0;
2607
2608  if (!tr_torrentHasMetadata (tor))
2609    return 0;
2610
2611  n = tr_ptrArraySize (&s->peers);
2612  if (n == 0)
2613    {
2614      return 0;
2615    }
2616  else
2617    {
2618      const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase (&s->peers);
2619      for (i=0; i<n; ++i)
2620        if (peers[i]->atom && atomIsSeed (peers[i]->atom))
2621          return tr_cpLeftUntilDone (&tor->completion);
2622    }
2623
2624  if (!s->pieceReplication || !s->pieceReplicationSize)
2625    return 0;
2626
2627  /* do it the hard way */
2628
2629  desiredAvailable = 0;
2630  for (i=0, n=MIN (tor->info.pieceCount, s->pieceReplicationSize); i<n; ++i)
2631    if (!tor->info.pieces[i].dnd && (s->pieceReplication[i] > 0))
2632      desiredAvailable += tr_cpMissingBytesInPiece (&s->tor->completion, i);
2633
2634  assert (desiredAvailable <= tor->info.totalSize);
2635  return desiredAvailable;
2636}
2637
2638double*
2639tr_peerMgrWebSpeeds_KBps (const tr_torrent * tor)
2640{
2641  unsigned int i;
2642  tr_swarm * s;
2643  unsigned int n;
2644  double * ret = NULL;
2645  const uint64_t now = tr_time_msec ();
2646
2647  assert (tr_isTorrent (tor));
2648
2649  s = tor->swarm;
2650  n = tr_ptrArraySize (&s->webseeds);
2651  ret = tr_new0 (double, n);
2652
2653  assert (s->manager != NULL);
2654  assert (n == tor->info.webseedCount);
2655
2656  for (i=0; i<n; ++i)
2657    {
2658      unsigned int Bps = 0;
2659      if (tr_peerIsTransferringPieces (tr_ptrArrayNth(&s->webseeds,i), now, TR_DOWN, &Bps))
2660        ret[i] = Bps / (double)tr_speed_K;
2661      else
2662        ret[i] = -1.0;
2663    }
2664
2665  return ret;
2666}
2667
2668struct tr_peer_stat *
2669tr_peerMgrPeerStats (const tr_torrent * tor, int * setmeCount)
2670{
2671  int i;
2672  int size = 0;
2673  tr_peer_stat * ret;
2674  const tr_swarm * s;
2675  tr_peer ** peers;
2676  const time_t now = tr_time ();
2677  const uint64_t now_msec = tr_time_msec ();
2678
2679  assert (tr_isTorrent (tor));
2680  assert (tor->swarm->manager != NULL);
2681
2682  s = tor->swarm;
2683  peers = (tr_peer**) tr_ptrArrayBase (&s->peers);
2684  size = tr_ptrArraySize (&s->peers);
2685  ret = tr_new0 (tr_peer_stat, size);
2686
2687  for (i=0; i<size; ++i)
2688    {
2689      char *                   pch;
2690      tr_peer *                peer = peers[i];
2691      tr_peerMsgs *            msgs = PEER_MSGS (peer);
2692      const struct peer_atom * atom = peer->atom;
2693      tr_peer_stat *           stat = ret + i;
2694
2695      tr_address_to_string_with_buf (&atom->addr, stat->addr, sizeof (stat->addr));
2696      tr_strlcpy (stat->client, tr_quark_get_string(peer->client,NULL), sizeof (stat->client));
2697      stat->port                = ntohs (peer->atom->port);
2698      stat->from                = atom->fromFirst;
2699      stat->progress            = peer->progress;
2700      stat->isUTP               = tr_peerMsgsIsUtpConnection (msgs);
2701      stat->isEncrypted         = tr_peerMsgsIsEncrypted (msgs);
2702      stat->rateToPeer_KBps     = toSpeedKBps (tr_peerGetPieceSpeed_Bps (peer, now_msec, TR_CLIENT_TO_PEER));
2703      stat->rateToClient_KBps   = toSpeedKBps (tr_peerGetPieceSpeed_Bps (peer, now_msec, TR_PEER_TO_CLIENT));
2704      stat->peerIsChoked        = tr_peerMsgsIsPeerChoked (msgs);
2705      stat->peerIsInterested    = tr_peerMsgsIsPeerInterested (msgs);
2706      stat->clientIsChoked      = tr_peerMsgsIsClientChoked (msgs);
2707      stat->clientIsInterested  = tr_peerMsgsIsClientInterested (msgs);
2708      stat->isIncoming          = tr_peerMsgsIsIncomingConnection (msgs);
2709      stat->isDownloadingFrom   = tr_peerMsgsIsActive (msgs, TR_PEER_TO_CLIENT);
2710      stat->isUploadingTo       = tr_peerMsgsIsActive (msgs, TR_CLIENT_TO_PEER);
2711      stat->isSeed              = peerIsSeed (peer);
2712
2713      stat->blocksToPeer        = tr_historyGet (&peer->blocksSentToPeer,    now, CANCEL_HISTORY_SEC);
2714      stat->blocksToClient      = tr_historyGet (&peer->blocksSentToClient,  now, CANCEL_HISTORY_SEC);
2715      stat->cancelsToPeer       = tr_historyGet (&peer->cancelsSentToPeer,   now, CANCEL_HISTORY_SEC);
2716      stat->cancelsToClient     = tr_historyGet (&peer->cancelsSentToClient, now, CANCEL_HISTORY_SEC);
2717
2718      stat->pendingReqsToPeer   = peer->pendingReqsToPeer;
2719      stat->pendingReqsToClient = peer->pendingReqsToClient;
2720
2721      pch = stat->flagStr;
2722      if (stat->isUTP) *pch++ = 'T';
2723      if (s->optimistic == msgs) *pch++ = 'O';
2724      if (stat->isDownloadingFrom) *pch++ = 'D';
2725      else if (stat->clientIsInterested) *pch++ = 'd';
2726      if (stat->isUploadingTo) *pch++ = 'U';
2727      else if (stat->peerIsInterested) *pch++ = 'u';
2728      if (!stat->clientIsChoked && !stat->clientIsInterested) *pch++ = 'K';
2729      if (!stat->peerIsChoked && !stat->peerIsInterested) *pch++ = '?';
2730      if (stat->isEncrypted) *pch++ = 'E';
2731      if (stat->from == TR_PEER_FROM_DHT) *pch++ = 'H';
2732      else if (stat->from == TR_PEER_FROM_PEX) *pch++ = 'X';
2733      if (stat->isIncoming) *pch++ = 'I';
2734      *pch = '\0';
2735    }
2736
2737  *setmeCount = size;
2738  return ret;
2739}
2740
2741/***
2742****
2743****
2744***/
2745
2746void
2747tr_peerMgrClearInterest (tr_torrent * tor)
2748{
2749  int i;
2750  tr_swarm * s = tor->swarm;
2751  const int peerCount = tr_ptrArraySize (&s->peers);
2752
2753  assert (tr_isTorrent (tor));
2754  assert (tr_torrentIsLocked (tor));
2755
2756  for (i=0; i<peerCount; ++i)
2757    tr_peerMsgsSetInterested (tr_ptrArrayNth (&s->peers, i), false);
2758}
2759
2760/* does this peer have any pieces that we want? */
2761static bool
2762isPeerInteresting (tr_torrent     * const tor,
2763                   const bool     * const piece_is_interesting,
2764                   const tr_peer  * const peer)
2765{
2766  tr_piece_index_t i, n;
2767
2768  /* these cases should have already been handled by the calling code... */
2769  assert (!tr_torrentIsSeed (tor));
2770  assert (tr_torrentIsPieceTransferAllowed (tor, TR_PEER_TO_CLIENT));
2771
2772  if (peerIsSeed (peer))
2773    return true;
2774
2775  for (i=0, n=tor->info.pieceCount; i<n; ++i)
2776    if (piece_is_interesting[i] && tr_bitfieldHas (&peer->have, i))
2777      return true;
2778
2779  return false;
2780}
2781
2782typedef enum
2783{
2784  RECHOKE_STATE_GOOD,
2785  RECHOKE_STATE_UNTESTED,
2786  RECHOKE_STATE_BAD
2787}
2788tr_rechoke_state;
2789
2790struct tr_rechoke_info
2791{
2792  tr_peer * peer;
2793  int salt;
2794  int rechoke_state;
2795};
2796
2797static int
2798compare_rechoke_info (const void * va, const void * vb)
2799{
2800  const struct tr_rechoke_info * a = va;
2801  const struct tr_rechoke_info * b = vb;
2802
2803  if (a->rechoke_state != b->rechoke_state)
2804    return a->rechoke_state - b->rechoke_state;
2805
2806  return a->salt - b->salt;
2807}
2808
2809/* determines who we send "interested" messages to */
2810static void
2811rechokeDownloads (tr_swarm * s)
2812{
2813  int i;
2814  int maxPeers = 0;
2815  int rechoke_count = 0;
2816  struct tr_rechoke_info * rechoke = NULL;
2817  const int MIN_INTERESTING_PEERS = 5;
2818  const int peerCount = tr_ptrArraySize (&s->peers);
2819  const time_t now = tr_time ();
2820
2821  /* some cases where this function isn't necessary */
2822  if (tr_torrentIsSeed (s->tor))
2823    return;
2824  if (!tr_torrentIsPieceTransferAllowed (s->tor, TR_PEER_TO_CLIENT))
2825    return;
2826
2827  /* decide HOW MANY peers to be interested in */
2828  {
2829    int blocks = 0;
2830    int cancels = 0;
2831    time_t timeSinceCancel;
2832
2833    /* Count up how many blocks & cancels each peer has.
2834     *
2835     * There are two situations where we send out cancels --
2836     *
2837     * 1. We've got unresponsive peers, which is handled by deciding
2838     *    -which- peers to be interested in.
2839     *
2840     * 2. We've hit our bandwidth cap, which is handled by deciding
2841     *    -how many- peers to be interested in.
2842     *
2843     * We're working on 2. here, so we need to ignore unresponsive
2844     * peers in our calculations lest they confuse Transmission into
2845     * thinking it's hit its bandwidth cap.
2846     */
2847    for (i=0; i<peerCount; ++i)
2848      {
2849        const tr_peer * peer = tr_ptrArrayNth (&s->peers, i);
2850        const int b = tr_historyGet (&peer->blocksSentToClient, now, CANCEL_HISTORY_SEC);
2851        const int c = tr_historyGet (&peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC);
2852
2853        if (b == 0) /* ignore unresponsive peers, as described above */
2854          continue;
2855
2856        blocks += b;
2857        cancels += c;
2858      }
2859
2860    if (cancels > 0)
2861      {
2862        /* cancelRate: of the block requests we've recently made, the percentage we cancelled.
2863         * higher values indicate more congestion. */
2864        const double cancelRate = cancels / (double)(cancels + blocks);
2865        const double mult = 1 - MIN (cancelRate, 0.5);
2866        maxPeers = s->interestedCount * mult;
2867        tordbg (s, "cancel rate is %.3f -- reducing the "
2868                   "number of peers we're interested in by %.0f percent",
2869                   cancelRate, mult * 100);
2870        s->lastCancel = now;
2871      }
2872
2873    timeSinceCancel = now - s->lastCancel;
2874    if (timeSinceCancel)
2875      {
2876        const int maxIncrease = 15;
2877        const time_t maxHistory = 2 * CANCEL_HISTORY_SEC;
2878        const double mult = MIN (timeSinceCancel, maxHistory) / (double) maxHistory;
2879        const int inc = maxIncrease * mult;
2880        maxPeers = s->maxPeers + inc;
2881        tordbg (s, "time since last cancel is %li -- increasing the "
2882                   "number of peers we're interested in by %d",
2883                   timeSinceCancel, inc);
2884      }
2885  }
2886
2887  /* don't let the previous section's number tweaking go too far... */
2888  if (maxPeers < MIN_INTERESTING_PEERS)
2889    maxPeers = MIN_INTERESTING_PEERS;
2890  if (maxPeers > s->tor->maxConnectedPeers)
2891    maxPeers = s->tor->maxConnectedPeers;
2892
2893  s->maxPeers = maxPeers;
2894
2895  if (peerCount > 0)
2896    {
2897      bool * piece_is_interesting;
2898      const tr_torrent * const tor = s->tor;
2899      const int n = tor->info.pieceCount;
2900
2901      /* build a bitfield of interesting pieces... */
2902      piece_is_interesting = tr_new (bool, n);
2903      for (i=0; i<n; i++)
2904        piece_is_interesting[i] = !tor->info.pieces[i].dnd && !tr_cpPieceIsComplete (&tor->completion, i);
2905
2906      /* decide WHICH peers to be interested in (based on their cancel-to-block ratio) */
2907      for (i=0; i<peerCount; ++i)
2908        {
2909          tr_peer * peer = tr_ptrArrayNth (&s->peers, i);
2910
2911          if (!isPeerInteresting (s->tor, piece_is_interesting, peer))
2912            {
2913              tr_peerMsgsSetInterested (PEER_MSGS(peer), false);
2914            }
2915          else
2916            {
2917              tr_rechoke_state rechoke_state;
2918              const int blocks = tr_historyGet (&peer->blocksSentToClient, now, CANCEL_HISTORY_SEC);
2919              const int cancels = tr_historyGet (&peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC);
2920
2921              if (!blocks && !cancels)
2922                rechoke_state = RECHOKE_STATE_UNTESTED;
2923              else if (!cancels)
2924                rechoke_state = RECHOKE_STATE_GOOD;
2925              else if (!blocks)
2926                rechoke_state = RECHOKE_STATE_BAD;
2927              else if ((cancels * 10) < blocks)
2928                rechoke_state = RECHOKE_STATE_GOOD;
2929              else
2930                rechoke_state = RECHOKE_STATE_BAD;
2931
2932              if (rechoke == NULL)
2933                rechoke = tr_new (struct tr_rechoke_info, peerCount);
2934
2935              rechoke[rechoke_count].peer = peer;
2936              rechoke[rechoke_count].rechoke_state = rechoke_state;
2937              rechoke[rechoke_count].salt = tr_cryptoWeakRandInt (INT_MAX);
2938              rechoke_count++;
2939            }
2940
2941        }
2942
2943      tr_free (piece_is_interesting);
2944    }
2945
2946  /* now that we know which & how many peers to be interested in... update the peer interest */
2947  qsort (rechoke, rechoke_count, sizeof (struct tr_rechoke_info), compare_rechoke_info);
2948  s->interestedCount = MIN (maxPeers, rechoke_count);
2949  for (i=0; i<rechoke_count; ++i)
2950    tr_peerMsgsSetInterested (PEER_MSGS(rechoke[i].peer), i<s->interestedCount);
2951
2952  /* cleanup */
2953  tr_free (rechoke);
2954}
2955
2956/**
2957***
2958**/
2959
2960struct ChokeData
2961{
2962  bool          isInterested;
2963  bool          wasChoked;
2964  bool          isChoked;
2965  int           rate;
2966  int           salt;
2967  tr_peerMsgs * msgs;
2968};
2969
2970static int
2971compareChoke (const void * va, const void * vb)
2972{
2973  const struct ChokeData * a = va;
2974  const struct ChokeData * b = vb;
2975
2976  if (a->rate != b->rate) /* prefer higher overall speeds */
2977    return a->rate > b->rate ? -1 : 1;
2978
2979  if (a->wasChoked != b->wasChoked) /* prefer unchoked */
2980    return a->wasChoked ? 1 : -1;
2981
2982  if (a->salt != b->salt) /* random order */
2983    return a->salt - b->salt;
2984
2985  return 0;
2986}
2987
2988/* is this a new connection? */
2989static bool
2990isNew (const tr_peerMsgs * msgs)
2991{
2992  return (msgs != NULL) && (tr_peerMsgsGetConnectionAge (msgs) < 45);
2993}
2994
2995/* get a rate for deciding which peers to choke and unchoke. */
2996static int
2997getRate (const tr_torrent * tor, struct peer_atom * atom, uint64_t now)
2998{
2999  unsigned int Bps;
3000
3001  if (tr_torrentIsSeed (tor))
3002    Bps = tr_peerGetPieceSpeed_Bps (atom->peer, now, TR_CLIENT_TO_PEER);
3003
3004  /* downloading a private torrent... take upload speed into account
3005   * because there may only be a small window of opportunity to share */
3006  else if (tr_torrentIsPrivate (tor))
3007    Bps = tr_peerGetPieceSpeed_Bps (atom->peer, now, TR_PEER_TO_CLIENT)
3008        + tr_peerGetPieceSpeed_Bps (atom->peer, now, TR_CLIENT_TO_PEER);
3009
3010  /* downloading a public torrent */
3011  else
3012    Bps = tr_peerGetPieceSpeed_Bps (atom->peer, now, TR_PEER_TO_CLIENT);
3013
3014  /* convert it to bytes per second */
3015  return Bps;
3016}
3017
3018static inline bool
3019isBandwidthMaxedOut (const tr_bandwidth * b,
3020                     const uint64_t now_msec, tr_direction dir)
3021{
3022  if (!tr_bandwidthIsLimited (b, dir))
3023    {
3024      return false;
3025    }
3026  else
3027    {
3028      const unsigned int got = tr_bandwidthGetPieceSpeed_Bps (b, now_msec, dir);
3029      const unsigned int want = tr_bandwidthGetDesiredSpeed_Bps (b, dir);
3030      return got >= want;
3031    }
3032}
3033
3034static void
3035rechokeUploads (tr_swarm * s, const uint64_t now)
3036{
3037  int i, size, unchokedInterested;
3038  const int peerCount = tr_ptrArraySize (&s->peers);
3039  tr_peer ** peers = (tr_peer**) tr_ptrArrayBase (&s->peers);
3040  struct ChokeData * choke = tr_new0 (struct ChokeData, peerCount);
3041  const tr_session * session = s->manager->session;
3042  const int chokeAll = !tr_torrentIsPieceTransferAllowed (s->tor, TR_CLIENT_TO_PEER);
3043  const bool isMaxedOut = isBandwidthMaxedOut (&s->tor->bandwidth, now, TR_UP);
3044
3045  assert (swarmIsLocked (s));
3046
3047  /* an optimistic unchoke peer's "optimistic"
3048   * state lasts for N calls to rechokeUploads (). */
3049  if (s->optimisticUnchokeTimeScaler > 0)
3050    s->optimisticUnchokeTimeScaler--;
3051  else
3052    s->optimistic = NULL;
3053
3054  /* sort the peers by preference and rate */
3055  for (i=0, size=0; i<peerCount; ++i)
3056    {
3057      tr_peer * peer = peers[i];
3058      tr_peerMsgs * msgs = PEER_MSGS (peer);
3059
3060      struct peer_atom * atom = peer->atom;
3061
3062      if (peerIsSeed (peer)) /* choke seeds and partial seeds */
3063        {
3064          tr_peerMsgsSetChoke (PEER_MSGS(peer), true);
3065        }
3066      else if (chokeAll) /* choke everyone if we're not uploading */
3067        {
3068          tr_peerMsgsSetChoke (PEER_MSGS(peer), true);
3069        }
3070      else if (msgs != s->optimistic)
3071        {
3072          struct ChokeData * n = &choke[size++];
3073          n->msgs         = msgs;
3074          n->isInterested = tr_peerMsgsIsPeerInterested (msgs);
3075          n->wasChoked    = tr_peerMsgsIsPeerChoked (msgs);
3076          n->rate         = getRate (s->tor, atom, now);
3077          n->salt         = tr_cryptoWeakRandInt (INT_MAX);
3078          n->isChoked     = true;
3079        }
3080    }
3081
3082  qsort (choke, size, sizeof (struct ChokeData), compareChoke);
3083
3084  /**
3085   * Reciprocation and number of uploads capping is managed by unchoking
3086   * the N peers which have the best upload rate and are interested.
3087   * This maximizes the client's download rate. These N peers are
3088   * referred to as downloaders, because they are interested in downloading
3089   * from the client.
3090   *
3091   * Peers which have a better upload rate (as compared to the downloaders)
3092   * but aren't interested get unchoked. If they become interested, the
3093   * downloader with the worst upload rate gets choked. If a client has
3094   * a complete file, it uses its upload rate rather than its download
3095   * rate to decide which peers to unchoke.
3096   *
3097   * If our bandwidth is maxed out, don't unchoke any more peers.
3098   */
3099  unchokedInterested = 0;
3100  for (i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i)
3101    {
3102      choke[i].isChoked = isMaxedOut ? choke[i].wasChoked : false;
3103      if (choke[i].isInterested)
3104        ++unchokedInterested;
3105    }
3106
3107  /* optimistic unchoke */
3108  if (!s->optimistic && !isMaxedOut && (i<size))
3109    {
3110      int n;
3111      struct ChokeData * c;
3112      tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
3113
3114      for (; i<size; ++i)
3115        {
3116          if (choke[i].isInterested)
3117            {
3118              const tr_peerMsgs * msgs = choke[i].msgs;
3119              int x = 1, y;
3120              if (isNew (msgs)) x *= 3;
3121              for (y=0; y<x; ++y)
3122                tr_ptrArrayAppend (&randPool, &choke[i]);
3123            }
3124        }
3125
3126      if ((n = tr_ptrArraySize (&randPool)))
3127        {
3128          c = tr_ptrArrayNth (&randPool, tr_cryptoWeakRandInt (n));
3129          c->isChoked = false;
3130          s->optimistic = c->msgs;
3131          s->optimisticUnchokeTimeScaler = OPTIMISTIC_UNCHOKE_MULTIPLIER;
3132        }
3133
3134      tr_ptrArrayDestruct (&randPool, NULL);
3135    }
3136
3137  for (i=0; i<size; ++i)
3138    tr_peerMsgsSetChoke (choke[i].msgs, choke[i].isChoked);
3139
3140  /* cleanup */
3141  tr_free (choke);
3142}
3143
3144static void
3145rechokePulse (int foo UNUSED, short bar UNUSED, void * vmgr)
3146{
3147  tr_torrent * tor = NULL;
3148  tr_peerMgr * mgr = vmgr;
3149  const uint64_t now = tr_time_msec ();
3150
3151  managerLock (mgr);
3152
3153  while ((tor = tr_torrentNext (mgr->session, tor)))
3154    {
3155      if (tor->isRunning && tor->peerCount)
3156        {
3157          tr_swarm * s = tor->swarm;
3158          rechokeUploads (s, now);
3159          rechokeDownloads (s);
3160        }
3161    }
3162
3163  tr_timerAddMsec (mgr->rechokeTimer, RECHOKE_PERIOD_MSEC);
3164  managerUnlock (mgr);
3165}
3166
3167/***
3168****
3169****  Life and Death
3170****
3171***/
3172
3173static bool
3174shouldPeerBeClosed (const tr_swarm   * s,
3175                    const tr_peer    * peer,
3176                    int                peerCount,
3177                    const time_t       now)
3178{
3179  const tr_torrent * tor = s->tor;
3180  const struct peer_atom * atom = peer->atom;
3181
3182  /* if it's marked for purging, close it */
3183  if (peer->doPurge)
3184    {
3185      tordbg (s, "purging peer %s because its doPurge flag is set",
3186              tr_atomAddrStr (atom));
3187      return true;
3188    }
3189
3190  /* disconnect if we're both seeds and enough time has passed for PEX */
3191  if (tr_torrentIsSeed (tor) && peerIsSeed (peer))
3192    return !tr_torrentAllowsPex (tor) || (now-atom->time>=30);
3193
3194  /* disconnect if it's been too long since piece data has been transferred.
3195   * this is on a sliding scale based on number of available peers... */
3196  {
3197    const int relaxStrictnessIfFewerThanN = (int)((getMaxPeerCount (tor) * 0.9) + 0.5);
3198    /* if we have >= relaxIfFewerThan, strictness is 100%.
3199     * if we have zero connections, strictness is 0% */
3200    const float strictness = peerCount >= relaxStrictnessIfFewerThanN
3201                           ? 1.0
3202                           : peerCount / (float)relaxStrictnessIfFewerThanN;
3203    const int lo = MIN_UPLOAD_IDLE_SECS;
3204    const int hi = MAX_UPLOAD_IDLE_SECS;
3205    const int limit = hi - ((hi - lo) * strictness);
3206    const int idleTime = now - MAX (atom->time, atom->piece_data_time);
3207/*fprintf (stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit);*/
3208    if (idleTime > limit)
3209      {
3210        tordbg (s, "purging peer %s because it's been %d secs since we shared anything",
3211                tr_atomAddrStr (atom), idleTime);
3212        return true;
3213      }
3214  }
3215
3216  return false;
3217}
3218
3219static tr_peer **
3220getPeersToClose (tr_swarm * s, const time_t now_sec, int * setmeSize)
3221{
3222  int i, peerCount, outsize;
3223  struct tr_peer ** ret = NULL;
3224  tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek (&s->peers, &peerCount);
3225
3226  assert (swarmIsLocked (s));
3227
3228  for (i=outsize=0; i<peerCount; ++i)
3229    {
3230      if (shouldPeerBeClosed (s, peers[i], peerCount, now_sec))
3231        {
3232          if (ret == NULL)
3233            ret = tr_new (tr_peer *, peerCount);
3234          ret[outsize++] = peers[i];
3235        }
3236    }
3237
3238  *setmeSize = outsize;
3239  return ret;
3240}
3241
3242static int
3243getReconnectIntervalSecs (const struct peer_atom * atom, const time_t now)
3244{
3245  int sec;
3246  const bool unreachable = (atom->flags2 & MYFLAG_UNREACHABLE) != 0;
3247
3248  /* if we were recently connected to this peer and transferring piece
3249   * data, try to reconnect to them sooner rather that later -- we don't
3250   * want network troubles to get in the way of a good peer. */
3251  if (!unreachable && ((now - atom->piece_data_time) <= (MINIMUM_RECONNECT_INTERVAL_SECS * 2)))
3252    sec = MINIMUM_RECONNECT_INTERVAL_SECS;
3253
3254  /* otherwise, the interval depends on how many times we've tried
3255   * and failed to connect to the peer */
3256  else
3257    {
3258      int step = atom->numFails;
3259
3260      /* penalize peers that were unreachable the last time we tried */
3261      if (unreachable)
3262        step += 2;
3263 
3264      switch (step)
3265        {
3266          case 0: sec = 0; break;
3267          case 1: sec = 10; break;
3268          case 2: sec = 60 * 2; break;
3269          case 3: sec = 60 * 15; break;
3270          case 4: sec = 60 * 30; break;
3271          case 5: sec = 60 * 60; break;
3272          default: sec = 60 * 120; break;
3273        }
3274    }
3275
3276  dbgmsg ("reconnect interval for %s is %d seconds", tr_atomAddrStr (atom), sec);
3277  return sec;
3278}
3279
3280static void
3281removePeer (tr_swarm * s, tr_peer * peer)
3282{
3283  tr_peer * removed;
3284  struct peer_atom * atom = peer->atom;
3285
3286  assert (swarmIsLocked (s));
3287  assert (atom);
3288
3289  atom->time = tr_time ();
3290
3291  removed = tr_ptrArrayRemoveSorted (&s->peers, peer, peerCompare);
3292  --s->tor->peerCount;
3293  --s->tor->peerFromCount[atom->fromFirst];
3294
3295  if (replicationExists (s))
3296    tr_decrReplicationFromBitfield (s, &peer->have);
3297
3298  assert (removed == peer);
3299  assert (s->tor->peerCount == tr_ptrArraySize (&s->peers));
3300  assert (s->tor->peerFromCount[atom->fromFirst] >= 0);
3301
3302  tr_peerFree (removed);
3303}
3304
3305static void
3306closePeer (tr_swarm * s, tr_peer * peer)
3307{
3308  struct peer_atom * atom;
3309
3310  assert (s != NULL);
3311  assert (peer != NULL);
3312
3313  atom = peer->atom;
3314
3315  /* if we transferred piece data, then they might be good peers,
3316     so reset their `numFails' weight to zero. otherwise we connected
3317     to them fruitlessly, so mark it as another fail */
3318  if (atom->piece_data_time)
3319    {
3320      tordbg (s, "resetting atom %s numFails to 0", tr_atomAddrStr (atom));
3321      atom->numFails = 0;
3322    }
3323  else
3324    {
3325      ++atom->numFails;
3326      tordbg (s, "incremented atom %s numFails to %d", tr_atomAddrStr (atom), (int)atom->numFails);
3327    }
3328
3329  tordbg (s, "removing bad peer %s", tr_atomAddrStr (peer->atom));
3330  removePeer (s, peer);
3331}
3332
3333static void
3334removeAllPeers (tr_swarm * s)
3335{
3336  while (!tr_ptrArrayEmpty (&s->peers))
3337    removePeer (s, tr_ptrArrayNth (&s->peers, 0));
3338
3339  assert (!s->tor->peerCount);
3340}
3341
3342static void
3343closeBadPeers (tr_swarm * s, const time_t now_sec)
3344{
3345  if (!tr_ptrArrayEmpty (&s->peers))
3346    {
3347      int i;
3348      int peerCount;
3349      struct tr_peer ** peers;
3350
3351      peers = getPeersToClose (s, now_sec, &peerCount);
3352      for (i=0; i<peerCount; ++i)
3353        closePeer (s, peers[i]);
3354      tr_free (peers);
3355    }
3356}
3357
3358struct peer_liveliness
3359{
3360  tr_peer * peer;
3361  void * clientData;
3362  time_t pieceDataTime;
3363  time_t time;
3364  int speed;
3365  bool doPurge;
3366};
3367
3368static int
3369comparePeerLiveliness (const void * va, const void * vb)
3370{
3371  const struct peer_liveliness * a = va;
3372  const struct peer_liveliness * b = vb;
3373
3374  if (a->doPurge != b->doPurge)
3375    return a->doPurge ? 1 : -1;
3376
3377  if (a->speed != b->speed) /* faster goes first */
3378    return a->speed > b->speed ? -1 : 1;
3379
3380  /* the one to give us data more recently goes first */
3381  if (a->pieceDataTime != b->pieceDataTime)
3382    return a->pieceDataTime > b->pieceDataTime ? -1 : 1;
3383
3384  /* the one we connected to most recently goes first */
3385  if (a->time != b->time)
3386    return a->time > b->time ? -1 : 1;
3387
3388  return 0;
3389}
3390
3391static void
3392sortPeersByLivelinessImpl (tr_peer  ** peers,
3393                           void     ** clientData,
3394                           int         n,
3395                           uint64_t    now,
3396                           int (*compare)(const void *va, const void *vb))
3397{
3398  int i;
3399  struct peer_liveliness *lives, *l;
3400
3401  /* build a sortable array of peer + extra info */
3402  lives = l = tr_new0 (struct peer_liveliness, n);
3403  for (i=0; i<n; ++i, ++l)
3404    {
3405      tr_peer * p = peers[i];
3406      l->peer = p;
3407      l->doPurge = p->doPurge;
3408      l->pieceDataTime = p->atom->piece_data_time;
3409      l->time = p->atom->time;
3410      l->speed = tr_peerGetPieceSpeed_Bps (p, now, TR_UP)
3411               + tr_peerGetPieceSpeed_Bps (p, now, TR_DOWN);
3412      if (clientData)
3413        l->clientData = clientData[i];
3414    }
3415
3416  /* sort 'em */
3417  assert (n == (l - lives));
3418  qsort (lives, n, sizeof (struct peer_liveliness), compare);
3419
3420  /* build the peer array */
3421  for (i=0, l=lives; i<n; ++i, ++l)
3422    {
3423      peers[i] = l->peer;
3424      if (clientData)
3425        clientData[i] = l->clientData;
3426    }
3427  assert (n == (l - lives));
3428
3429  /* cleanup */
3430  tr_free (lives);
3431}
3432
3433static void
3434sortPeersByLiveliness (tr_peer ** peers, void ** clientData, int n, uint64_t now)
3435{
3436  sortPeersByLivelinessImpl (peers, clientData, n, now, comparePeerLiveliness);
3437}
3438
3439
3440static void
3441enforceTorrentPeerLimit (tr_swarm * s, uint64_t now)
3442{
3443  int n = tr_ptrArraySize (&s->peers);
3444  const int max = tr_torrentGetPeerLimit (s->tor);
3445  if (n > max)
3446    {
3447      void * base = tr_ptrArrayBase (&s->peers);
3448      tr_peer ** peers = tr_memdup (base, n*sizeof (tr_peer*));
3449      sortPeersByLiveliness (peers, NULL, n, now);
3450      while (n > max)
3451        closePeer (s, peers[--n]);
3452      tr_free (peers);
3453    }
3454}
3455
3456static void
3457enforceSessionPeerLimit (tr_session * session, uint64_t now)
3458{
3459  int n = 0;
3460  tr_torrent * tor = NULL;
3461  const int max = tr_sessionGetPeerLimit (session);
3462
3463  /* count the total number of peers */
3464  while ((tor = tr_torrentNext (session, tor)))
3465    n += tr_ptrArraySize (&tor->swarm->peers);
3466
3467  /* if there are too many, prune out the worst */
3468  if (n > max)
3469    {
3470      tr_peer ** peers = tr_new (tr_peer*, n);
3471      tr_swarm ** swarms = tr_new (tr_swarm*, n);
3472
3473      /* populate the peer array */
3474      n = 0;
3475      tor = NULL;
3476      while ((tor = tr_torrentNext (session, tor)))
3477        {
3478          int i;
3479          tr_swarm * s = tor->swarm;
3480          const int tn = tr_ptrArraySize (&s->peers);
3481          for (i=0; i<tn; ++i, ++n)
3482            {
3483              peers[n] = tr_ptrArrayNth (&s->peers, i);
3484              swarms[n] = s;
3485            }
3486        }
3487
3488      /* sort 'em */
3489      sortPeersByLiveliness (peers, (void**)swarms, n, now);
3490
3491      /* cull out the crappiest */
3492      while (n-- > max)
3493        closePeer (swarms[n], peers[n]);
3494
3495      /* cleanup */
3496      tr_free (swarms);
3497      tr_free (peers);
3498    }
3499}
3500
3501static void makeNewPeerConnections (tr_peerMgr * mgr, const int max);
3502
3503static void
3504reconnectPulse (int foo UNUSED, short bar UNUSED, void * vmgr)
3505{
3506  tr_torrent * tor;
3507  tr_peerMgr * mgr = vmgr;
3508  const time_t now_sec = tr_time ();
3509  const uint64_t now_msec = tr_time_msec ();
3510
3511  /**
3512  ***  enforce the per-session and per-torrent peer limits
3513  **/
3514
3515  /* if we're over the per-torrent peer limits, cull some peers */
3516  tor = NULL;
3517  while ((tor = tr_torrentNext (mgr->session, tor)))
3518    if (tor->isRunning)
3519      enforceTorrentPeerLimit (tor->swarm, now_msec);
3520
3521  /* if we're over the per-session peer limits, cull some peers */
3522  enforceSessionPeerLimit (mgr->session, now_msec);
3523
3524  /* remove crappy peers */
3525  tor = NULL;
3526  while ((tor = tr_torrentNext (mgr->session, tor)))
3527    if (!tor->swarm->isRunning)
3528      removeAllPeers (tor->swarm);
3529    else
3530      closeBadPeers (tor->swarm, now_sec);
3531
3532  /* try to make new peer connections */
3533  makeNewPeerConnections (mgr, MAX_CONNECTIONS_PER_PULSE);
3534}
3535
3536/****
3537*****
3538*****  BANDWIDTH ALLOCATION
3539*****
3540****/
3541
3542static void
3543pumpAllPeers (tr_peerMgr * mgr)
3544{
3545  tr_torrent * tor = NULL;
3546
3547  while ((tor = tr_torrentNext (mgr->session, tor)))
3548    {
3549      int j;
3550      tr_swarm * s = tor->swarm;
3551
3552      for (j=0; j<tr_ptrArraySize (&s->peers); ++j)
3553        tr_peerMsgsPulse (tr_ptrArrayNth (&s->peers, j));
3554    }
3555}
3556
3557
3558static void
3559queuePulseForeach (void * vtor)
3560{
3561  tr_torrent * tor = vtor;
3562
3563  tr_torrentStartNow (tor);
3564
3565  if (tor->queue_started_callback != NULL)
3566    (*tor->queue_started_callback)(tor, tor->queue_started_user_data);
3567}
3568
3569static void
3570queuePulse (tr_session * session, tr_direction dir)
3571{
3572  assert (tr_isSession (session));
3573  assert (tr_isDirection (dir));
3574
3575  if (tr_sessionGetQueueEnabled (session, dir))
3576    {
3577      tr_ptrArray torrents = TR_PTR_ARRAY_INIT;
3578
3579      tr_sessionGetNextQueuedTorrents (session,
3580                                       dir,
3581                                       tr_sessionCountQueueFreeSlots (session, dir),
3582                                       &torrents);
3583
3584      tr_ptrArrayForeach (&torrents, queuePulseForeach);
3585
3586      tr_ptrArrayDestruct (&torrents, NULL);
3587    }
3588}
3589
3590static void
3591bandwidthPulse (int foo UNUSED, short bar UNUSED, void * vmgr)
3592{
3593  tr_torrent * tor;
3594  tr_peerMgr * mgr = vmgr;
3595  tr_session * session = mgr->session;
3596  managerLock (mgr);
3597
3598  /* FIXME: this next line probably isn't necessary... */
3599  pumpAllPeers (mgr);
3600
3601  /* allocate bandwidth to the peers */
3602  tr_bandwidthAllocate (&session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC);
3603  tr_bandwidthAllocate (&session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC);
3604
3605  /* torrent upkeep */
3606  tor = NULL;
3607  while ((tor = tr_torrentNext (session, tor)))
3608    {
3609      /* possibly stop torrents that have seeded enough */
3610      tr_torrentCheckSeedLimit (tor);
3611
3612      /* run the completeness check for any torrents that need it */
3613      if (tor->swarm->needsCompletenessCheck)
3614        {
3615          tor->swarm->needsCompletenessCheck  = false;
3616          tr_torrentRecheckCompleteness (tor);
3617        }
3618
3619      /* stop torrents that are ready to stop, but couldn't be stopped
3620         earlier during the peer-io callback call chain */
3621      if (tor->isStopping)
3622        tr_torrentStop (tor);
3623
3624      /* update the torrent's stats */
3625      tor->activeWebseedCount = countActiveWebseeds (tor->swarm);
3626    }
3627
3628  /* pump the queues */
3629  queuePulse (session, TR_UP);
3630  queuePulse (session, TR_DOWN);
3631
3632  reconnectPulse (0, 0, mgr);
3633
3634  tr_timerAddMsec (mgr->bandwidthTimer, BANDWIDTH_PERIOD_MSEC);
3635  managerUnlock (mgr);
3636}
3637
3638/***
3639****
3640***/
3641
3642static int
3643compareAtomPtrsByAddress (const void * va, const void *vb)
3644{
3645  const struct peer_atom * a = * (const struct peer_atom**) va;
3646  const struct peer_atom * b = * (const struct peer_atom**) vb;
3647
3648  assert (tr_isAtom (a));
3649  assert (tr_isAtom (b));
3650
3651  return tr_address_compare (&a->addr, &b->addr);
3652}
3653
3654/* best come first, worst go last */
3655static int
3656compareAtomPtrsByShelfDate (const void * va, const void *vb)
3657{
3658  time_t atime;
3659  time_t btime;
3660  const struct peer_atom * a = * (const struct peer_atom**) va;
3661  const struct peer_atom * b = * (const struct peer_atom**) vb;
3662  const int data_time_cutoff_secs = 60 * 60;
3663  const time_t tr_now = tr_time ();
3664
3665  assert (tr_isAtom (a));
3666  assert (tr_isAtom (b));
3667
3668  /* primary key: the last piece data time *if* it was within the last hour */
3669  atime = a->piece_data_time; if (atime + data_time_cutoff_secs < tr_now) atime = 0;
3670  btime = b->piece_data_time; if (btime + data_time_cutoff_secs < tr_now) btime = 0;
3671  if (atime != btime)
3672    return atime > btime ? -1 : 1;
3673
3674  /* secondary key: shelf date. */
3675  if (a->shelf_date != b->shelf_date)
3676    return a->shelf_date > b->shelf_date ? -1 : 1;
3677
3678  return 0;
3679}
3680
3681static int
3682getMaxAtomCount (const tr_torrent * tor)
3683{
3684  return MIN (50, tor->maxConnectedPeers * 3);
3685}
3686
3687static void
3688atomPulse (int foo UNUSED, short bar UNUSED, void * vmgr)
3689{
3690  tr_torrent * tor = NULL;
3691  tr_peerMgr * mgr = vmgr;
3692  managerLock (mgr);
3693
3694  while ((tor = tr_torrentNext (mgr->session, tor)))
3695    {
3696      int atomCount;
3697      tr_swarm * s = tor->swarm;
3698      const int maxAtomCount = getMaxAtomCount (tor);
3699      struct peer_atom ** atoms = (struct peer_atom**) tr_ptrArrayPeek (&s->pool, &atomCount);
3700
3701      if (atomCount > maxAtomCount) /* we've got too many atoms... time to prune */
3702        {
3703          int i;
3704          int keepCount = 0;
3705          int testCount = 0;
3706          struct peer_atom ** keep = tr_new (struct peer_atom*, atomCount);
3707          struct peer_atom ** test = tr_new (struct peer_atom*, atomCount);
3708
3709          /* keep the ones that are in use */
3710          for (i=0; i<atomCount; ++i)
3711            {
3712              struct peer_atom * atom = atoms[i];
3713              if (peerIsInUse (s, atom))
3714                keep[keepCount++] = atom;
3715              else
3716                test[testCount++] = atom;
3717            }
3718
3719          /* if there's room, keep the best of what's left */
3720          i = 0;
3721          if (keepCount < maxAtomCount)
3722            {
3723              qsort (test, testCount, sizeof (struct peer_atom *), compareAtomPtrsByShelfDate);
3724              while (i<testCount && keepCount<maxAtomCount)
3725                keep[keepCount++] = test[i++];
3726            }
3727
3728          /* free the culled atoms */
3729          while (i<testCount)
3730            tr_free (test[i++]);
3731
3732          /* rebuild Torrent.pool with what's left */
3733          tr_ptrArrayDestruct (&s->pool, NULL);
3734          s->pool = TR_PTR_ARRAY_INIT;
3735          qsort (keep, keepCount, sizeof (struct peer_atom *), compareAtomPtrsByAddress);
3736          for (i=0; i<keepCount; ++i)
3737            tr_ptrArrayAppend (&s->pool, keep[i]);
3738
3739          tordbg (s, "max atom count is %d... pruned from %d to %d\n", maxAtomCount, atomCount, keepCount);
3740
3741          /* cleanup */
3742          tr_free (test);
3743          tr_free (keep);
3744        }
3745    }
3746
3747  tr_timerAddMsec (mgr->atomTimer, ATOM_PERIOD_MSEC);
3748  managerUnlock (mgr);
3749}
3750
3751/***
3752****
3753****
3754****
3755***/
3756
3757/* is this atom someone that we'd want to initiate a connection to? */
3758static bool
3759isPeerCandidate (const tr_torrent * tor, struct peer_atom * atom, const time_t now)
3760{
3761  /* not if we're both seeds */
3762  if (tr_torrentIsSeed (tor) && atomIsSeed (atom))
3763    return false;
3764
3765  /* not if we've already got a connection to them... */
3766  if (peerIsInUse (tor->swarm, atom))
3767    return false;
3768
3769  /* not if we just tried them already */
3770  if ((now - atom->time) < getReconnectIntervalSecs (atom, now))
3771    return false;
3772
3773  /* not if they're blocklisted */
3774  if (isAtomBlocklisted (tor->session, atom))
3775    return false;
3776
3777  /* not if they're banned... */
3778  if (atom->flags2 & MYFLAG_BANNED)
3779    return false;
3780
3781  return true;
3782}
3783
3784struct peer_candidate
3785{
3786  uint64_t score;
3787  tr_torrent * tor;
3788  struct peer_atom * atom;
3789};
3790
3791static bool
3792torrentWasRecentlyStarted (const tr_torrent * tor)
3793{
3794  return difftime (tr_time (), tor->startDate) < 120;
3795}
3796
3797static inline uint64_t
3798addValToKey (uint64_t value, int width, uint64_t addme)
3799{
3800  value = (value << (uint64_t)width);
3801  value |= addme;
3802  return value;
3803}
3804
3805/* smaller value is better */
3806static uint64_t
3807getPeerCandidateScore (const tr_torrent * tor, const struct peer_atom * atom, uint8_t salt)
3808{
3809  uint64_t i;
3810  uint64_t score = 0;
3811  const bool failed = atom->lastConnectionAt < atom->lastConnectionAttemptAt;
3812
3813  /* prefer peers we've connected to, or never tried, over peers we failed to connect to. */
3814  i = failed ? 1 : 0;
3815  score = addValToKey (score, 1, i);
3816
3817  /* prefer the one we attempted least recently (to cycle through all peers) */
3818  i = atom->lastConnectionAttemptAt;
3819  score = addValToKey (score, 32, i);
3820
3821  /* prefer peers belonging to a torrent of a higher priority */
3822  switch (tr_torrentGetPriority (tor))
3823    {
3824      case TR_PRI_HIGH:    i = 0; break;
3825      case TR_PRI_NORMAL:  i = 1; break;
3826      case TR_PRI_LOW:     i = 2; break;
3827    }
3828  score = addValToKey (score, 4, i);
3829
3830  /* prefer recently-started torrents */
3831  i = torrentWasRecentlyStarted (tor) ? 0 : 1;
3832  score = addValToKey (score, 1, i);
3833
3834  /* prefer torrents we're downloading with */
3835  i = tr_torrentIsSeed (tor) ? 1 : 0;
3836  score = addValToKey (score, 1, i);
3837
3838  /* prefer peers that are known to be connectible */
3839  i = (atom->flags & ADDED_F_CONNECTABLE) ? 0 : 1;
3840  score = addValToKey (score, 1, i);
3841
3842  /* prefer peers that we might have a chance of uploading to...
3843  so lower seed probability is better */
3844  if (atom->seedProbability == 100) i = 101;
3845  else if (atom->seedProbability == -1) i = 100;
3846  else i = atom->seedProbability;
3847  score = addValToKey (score, 8, i);
3848
3849  /* Prefer peers that we got from more trusted sources.
3850   * lower `fromBest' values indicate more trusted sources */
3851  score = addValToKey (score, 4, atom->fromBest);
3852
3853  /* salt */
3854  score = addValToKey (score, 8, salt);
3855
3856  return score;
3857}
3858
3859static int
3860comparePeerCandidates (const void * va, const void * vb)
3861{
3862  int ret;
3863  const struct peer_candidate * a = va;
3864  const struct peer_candidate * b = vb;
3865
3866  if (a->score < b->score)
3867    ret = -1;
3868  else if (a->score > b->score)
3869    ret = 1;
3870  else
3871    ret = 0;
3872
3873  return ret;
3874}
3875
3876/* Partial sorting -- selecting the k best candidates
3877   Adapted from http://en.wikipedia.org/wiki/Selection_algorithm */
3878static void
3879selectPeerCandidates (struct peer_candidate * candidates, int candidate_count, int select_count)
3880{
3881  tr_quickfindFirstK (candidates,
3882                      candidate_count,
3883                      sizeof(struct peer_candidate),
3884                      comparePeerCandidates,
3885                      select_count);
3886}
3887
3888#ifndef NDEBUG
3889static bool
3890checkBestScoresComeFirst (const struct peer_candidate * candidates, int n, int k)
3891{
3892  int i;
3893  uint64_t worstFirstScore = 0;
3894  const int x = MIN (n, k) - 1;
3895
3896  for (i=0; i<x; i++)
3897    if (worstFirstScore < candidates[i].score)
3898      worstFirstScore = candidates[i].score;
3899
3900  for (i=0; i<x; i++)
3901    assert (candidates[i].score <= worstFirstScore);
3902
3903  for (i=x+1; i<n; i++)
3904    assert (candidates[i].score >= worstFirstScore);
3905
3906  return true;
3907}
3908#endif /* NDEBUG */
3909
3910/** @return an array of all the atoms we might want to connect to */
3911static struct peer_candidate*
3912getPeerCandidates (tr_session * session, int * candidateCount, int max)
3913{
3914  int atomCount;
3915  int peerCount;
3916  tr_torrent * tor;
3917  struct peer_candidate * candidates;
3918  struct peer_candidate * walk;
3919  const time_t now = tr_time ();
3920  const uint64_t now_msec = tr_time_msec ();
3921  /* leave 5% of connection slots for incoming connections -- ticket #2609 */
3922  const int maxCandidates = tr_sessionGetPeerLimit (session) * 0.95;
3923
3924  /* count how many peers and atoms we've got */
3925  tor= NULL;
3926  atomCount = 0;
3927  peerCount = 0;
3928  while ((tor = tr_torrentNext (session, tor)))
3929    {
3930      atomCount += tr_ptrArraySize (&tor->swarm->pool);
3931      peerCount += tr_ptrArraySize (&tor->swarm->peers);
3932    }
3933
3934  /* don't start any new handshakes if we're full up */
3935  if (maxCandidates <= peerCount)
3936    {
3937      *candidateCount = 0;
3938      return NULL;
3939    }
3940
3941  /* allocate an array of candidates */
3942  walk = candidates = tr_new (struct peer_candidate, atomCount);
3943
3944  /* populate the candidate array */
3945  tor = NULL;
3946  while ((tor = tr_torrentNext (session, tor)))
3947    {
3948      int i, nAtoms;
3949      struct peer_atom ** atoms;
3950
3951      if (!tor->swarm->isRunning)
3952        continue;
3953
3954      /* if we've already got enough peers in this torrent... */
3955      if (tr_torrentGetPeerLimit (tor) <= tr_ptrArraySize (&tor->swarm->peers))
3956        continue;
3957
3958      /* if we've already got enough speed in this torrent... */
3959      if (tr_torrentIsSeed (tor) && isBandwidthMaxedOut (&tor->bandwidth, now_msec, TR_UP))
3960        continue;
3961
3962      atoms = (struct peer_atom**) tr_ptrArrayPeek (&tor->swarm->pool, &nAtoms);
3963      for (i=0; i<nAtoms; ++i)
3964        {
3965          struct peer_atom * atom = atoms[i];
3966
3967          if (isPeerCandidate (tor, atom, now))
3968            {
3969              const uint8_t salt = tr_cryptoWeakRandInt (1024);
3970              walk->tor = tor;
3971              walk->atom = atom;
3972              walk->score = getPeerCandidateScore (tor, atom, salt);
3973              ++walk;
3974            }
3975        }
3976    }
3977
3978  *candidateCount = walk - candidates;
3979  if (walk != candidates)
3980    selectPeerCandidates (candidates, walk-candidates, max);
3981
3982  assert (checkBestScoresComeFirst (candidates, *candidateCount, max));
3983  return candidates;
3984}
3985
3986static void
3987initiateConnection (tr_peerMgr * mgr, tr_swarm * s, struct peer_atom * atom)
3988{
3989  tr_peerIo * io;
3990  const time_t now = tr_time ();
3991  bool utp = tr_sessionIsUTPEnabled (mgr->session) && !atom->utp_failed;
3992
3993  if (atom->fromFirst == TR_PEER_FROM_PEX)
3994    /* PEX has explicit signalling for uTP support.  If an atom
3995       originally came from PEX and doesn't have the uTP flag, skip the
3996       uTP connection attempt.  Are we being optimistic here? */
3997    utp = utp && (atom->flags & ADDED_F_UTP_FLAGS);
3998
3999  tordbg (s, "Starting an OUTGOING%s connection with %s",
4000          utp ? " µTP" : "", tr_atomAddrStr (atom));
4001
4002  io = tr_peerIoNewOutgoing (mgr->session,
4003                             &mgr->session->bandwidth,
4004                             &atom->addr,
4005                             atom->port,
4006                             s->tor->info.hash,
4007                             s->tor->completeness == TR_SEED,
4008                             utp);
4009
4010  if (io == NULL)
4011    {
4012      tordbg (s, "peerIo not created; marking peer %s as unreachable", tr_atomAddrStr (atom));
4013      atom->flags2 |= MYFLAG_UNREACHABLE;
4014      atom->numFails++;
4015    }
4016  else
4017    {
4018      tr_handshake * handshake = tr_handshakeNew (io,
4019                                                  mgr->session->encryptionMode,
4020                                                  myHandshakeDoneCB,
4021                                                  mgr);
4022
4023      assert (tr_peerIoGetTorrentHash (io));
4024
4025      tr_peerIoUnref (io); /* balanced by the initial ref
4026                              in tr_peerIoNewOutgoing () */
4027
4028      tr_ptrArrayInsertSorted (&s->outgoingHandshakes, handshake,
4029                               handshakeCompare);
4030    }
4031
4032  atom->lastConnectionAttemptAt = now;
4033  atom->time = now;
4034}
4035
4036static void
4037initiateCandidateConnection (tr_peerMgr * mgr, struct peer_candidate * c)
4038{
4039#if 0
4040  fprintf (stderr, "Starting an OUTGOING connection with %s - [%s] seedProbability==%d; %s, %s\n",
4041           tr_atomAddrStr (c->atom),
4042           tr_torrentName (c->tor),
4043           (int)c->atom->seedProbability,
4044           tr_torrentIsPrivate (c->tor) ? "private" : "public",
4045           tr_torrentIsSeed (c->tor) ? "seed" : "downloader");
4046#endif
4047
4048  initiateConnection (mgr, c->tor->swarm, c->atom);
4049}
4050
4051static void
4052makeNewPeerConnections (struct tr_peerMgr * mgr, const int max)
4053{
4054  int i, n;
4055  struct peer_candidate * candidates;
4056
4057  candidates = getPeerCandidates (mgr->session, &n, max);
4058
4059  for (i=0; i<n && i<max; ++i)
4060    initiateCandidateConnection (mgr, &candidates[i]);
4061
4062  tr_free (candidates);
4063}
Note: See TracBrowser for help on using the repository browser.