source: trunk/libtransmission/peer-mgr.c @ 14167

Last change on this file since 14167 was 14167, checked in by jordan, 8 years ago

silence a small handful of minor gcc compiler warnings in libtransmission

  • Property svn:keywords set to Date Rev Author Id
File size: 108.6 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2 (b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 14167 2013-08-17 17:20:31Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h> /* error codes ERANGE, ... */
15#include <limits.h> /* INT_MAX */
16#include <string.h> /* memcpy, memcmp, strstr */
17#include <stdlib.h> /* qsort */
18
19#include <event2/event.h>
20
21#include <libutp/utp.h>
22
23#include "transmission.h"
24#include "announcer.h"
25#include "bandwidth.h"
26#include "blocklist.h"
27#include "cache.h"
28#include "clients.h"
29#include "completion.h"
30#include "crypto.h"
31#include "handshake.h"
32#include "log.h"
33#include "net.h"
34#include "peer-io.h"
35#include "peer-mgr.h"
36#include "peer-msgs.h"
37#include "ptrarray.h"
38#include "session.h"
39#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
40#include "torrent.h"
41#include "tr-utp.h"
42#include "utils.h"
43#include "webseed.h"
44
/* Tunable intervals, limits, and flag bits used by the peer manager.
 * The suffix of each name (_MSEC, _SECS, ...) gives its unit. */
enum
{
  /* how frequently to cull old atoms */
  ATOM_PERIOD_MSEC = (60 * 1000),

  /* how frequently to change which peers are choked */
  RECHOKE_PERIOD_MSEC = (10 * 1000),

  /* an optimistically unchoked peer is immune from rechoking
     for this many calls to rechokeUploads (). */
  OPTIMISTIC_UNCHOKE_MULTIPLIER = 4,

  /* how frequently to reallocate bandwidth */
  BANDWIDTH_PERIOD_MSEC = 500,

  /* how frequently to age out old piece request lists */
  REFILL_UPKEEP_PERIOD_MSEC = (10 * 1000),

  /* how frequently to decide which peers live and die */
  RECONNECT_PERIOD_MSEC = 500,

  /* when many peers are available, keep idle ones this long */
  MIN_UPLOAD_IDLE_SECS = (60),

  /* when few peers are available, keep idle ones this long */
  MAX_UPLOAD_IDLE_SECS = (60 * 5),

  /* max number of peers to ask for per second overall.
   * this throttle is to avoid overloading the router */
  MAX_CONNECTIONS_PER_SECOND = 12,

  /* the per-second cap above scaled to one reconnect pulse:
   * 12 * (500 / 1000.0) == 6 */
  MAX_CONNECTIONS_PER_PULSE = (int)(MAX_CONNECTIONS_PER_SECOND * (RECONNECT_PERIOD_MSEC/1000.0)),

  /* number of bad pieces a peer is allowed to send before we ban them */
  MAX_BAD_PIECES_PER_PEER = 5,

  /* amount of time to keep a list of request pieces lying around
     before it's considered too old and needs to be rebuilt */
  PIECE_LIST_SHELF_LIFE_SECS = 60,

  /* use for bitwise operations w/peer_atom.flags2 */
  MYFLAG_BANNED = 1,

  /* use for bitwise operations w/peer_atom.flags2 */
  /* unreachable for now... but not banned.
   * if they try to connect to us it's okay */
  MYFLAG_UNREACHABLE = 2,

  /* the minimum we'll wait before attempting to reconnect to a peer */
  MINIMUM_RECONNECT_INTERVAL_SECS = 5,

  /** how long we'll let requests we've made linger before we cancel them */
  REQUEST_TTL_SECS = 90,

  /* NOTE(review): not referenced in the visible part of this file;
   * presumably a window in seconds, like CANCEL_HISTORY_SEC -- confirm */
  NO_BLOCKS_CANCEL_HISTORY = 120,

  /* NOTE(review): not referenced in the visible part of this file;
   * presumably the length in seconds of the cancel-history window -- confirm */
  CANCEL_HISTORY_SEC = 60
};
103
/* initializer that sets every listed member of a tr_peer_event to 0 / NULL */
const tr_peer_event TR_PEER_EVENT_INIT = { 0, 0, NULL, 0, 0, 0, 0 };

/* initializer that sets every listed member of a tr_swarm_stats to 0;
 * swarmFree () assigns it to tr_swarm::stats before releasing the swarm */
const tr_swarm_stats TR_SWARM_STATS_INIT = { { 0, 0 }, 0, 0, { 0, 0, 0, 0, 0, 0, 0 } };
107
108/**
109***
110**/
111
/**
 * Peer information that should be kept even before we've connected and
 * after we've disconnected. These are kept in a pool of peer_atoms to decide
 * which ones would make good candidates for connecting to, and to watch out
 * for banned peers.
 *
 * Atoms live in tr_swarm::pool and are looked up by address
 * (see getExistingAtom ()).
 *
 * @see tr_peer
 * @see tr_peerMsgs
 */
struct peer_atom
{
  uint8_t     fromFirst;          /* where the peer was first found */
  uint8_t     fromBest;           /* the "best" value of where the peer has been found */
  uint8_t     flags;              /* these match the added_f flags */
  uint8_t     flags2;             /* flags that aren't defined in added_f (MYFLAG_BANNED, MYFLAG_UNREACHABLE) */
  int8_t      seedProbability;    /* how likely is this to be a seed... [0..100] or -1 for unknown */
  int8_t      blocklisted;        /* -1 for unknown, true for blocklisted, false for not blocklisted.
                                     cached by isAtomBlocklisted (); reset to -1 by tr_peerMgrOnBlocklistChanged () */

  tr_port     port;
  bool        utp_failed;         /* We recently failed to connect over uTP */
  uint16_t    numFails;
  time_t      time;               /* when the peer's connection status last changed */
  time_t      piece_data_time;

  time_t      lastConnectionAttemptAt;
  time_t      lastConnectionAt;

  /* similar to a TTL field, but less rigid --
   * if the swarm is small, the atom will be kept past this date. */
  time_t      shelf_date;
  tr_peer   * peer;               /* will be NULL if not connected; cleared by tr_peerDestruct () */
  tr_address  addr;               /* the key that the pool is searched by */
};
145
146#ifdef NDEBUG
147#define tr_isAtom(a) (TRUE)
148#else
149static bool
150tr_isAtom (const struct peer_atom * atom)
151{
152  return (atom != NULL)
153      && (atom->fromFirst < TR_PEER_FROM__MAX)
154      && (atom->fromBest < TR_PEER_FROM__MAX)
155      && (tr_address_is_valid (&atom->addr));
156}
157#endif
158
159static const char*
160tr_atomAddrStr (const struct peer_atom * atom)
161{
162  return atom ? tr_peerIoAddrStr (&atom->addr, atom->port) : "[no atom]";
163}
164
/* One outstanding block request. tr_swarm::requests holds these sorted
 * by (block, peer) -- see compareReqByBlock (). */
struct block_request
{
  tr_block_index_t block; /* the block that was requested */
  tr_peer * peer;         /* the peer it was requested from */
  time_t sentAt;          /* tr_time () when the request was recorded by requestListAdd () */
};
171
/* One entry of tr_swarm::pieces, the list of pieces we still want. */
struct weighted_piece
{
  tr_piece_index_t index; /* which piece this entry describes */
  int16_t salt;           /* random tie-breaker assigned in pieceListRebuild () */
  int16_t requestCount;   /* read as the "pending" count by comparePieceByWeight () */
};
178
/* Records which ordering, if any, tr_swarm::pieces currently has. */
enum piece_sort_state
{
  PIECES_UNSORTED,          /* set by invalidatePieceSorting () */
  PIECES_SORTED_BY_INDEX,   /* ordered by comparePieceByIndex () */
  PIECES_SORTED_BY_WEIGHT   /* ordered by comparePieceByWeight () */
};
185
/** @brief Opaque, per-torrent data structure for peer connection information */
typedef struct tr_swarm
{
  tr_swarm_stats             stats;

  tr_ptrArray                outgoingHandshakes; /* tr_handshake */
  tr_ptrArray                pool; /* struct peer_atom */
  tr_ptrArray                peers; /* tr_peerMsgs */
  tr_ptrArray                webseeds; /* tr_webseed */

  tr_torrent               * tor;     /* the torrent this swarm belongs to */
  struct tr_peerMgr        * manager; /* the manager whose session lock guards this swarm */

  tr_peerMsgs              * optimistic; /* the optimistic peer, or NULL if none */
  int                        optimisticUnchokeTimeScaler;

  bool                       isRunning;
  bool                       needsCompletenessCheck;

  /* outstanding block requests, kept sorted by compareReqByBlock () */
  struct block_request     * requests;
  int                        requestCount; /* number of entries in use */
  int                        requestAlloc; /* number of entries allocated */

  /* the pieces we still want; see pieceListRebuild () */
  struct weighted_piece    * pieces;
  int                        pieceCount;
  enum piece_sort_state      pieceSortState;

  /* An array of pieceCount items stating how many peers have each piece.
     This is used to help us for downloading pieces "rarest first."
     This may be NULL if we don't have metainfo yet, or if we're not
     downloading and don't care about rarity */
  uint16_t                 * pieceReplication;
  size_t                     pieceReplicationSize;

  int                        interestedCount;
  int                        maxPeers;
  time_t                     lastCancel;

  /* Before the endgame this should be 0. In endgame, it contains the average
   * number of pending requests per peer. Only peers which have more pending
   * requests than that are considered 'fast' and are allowed to request a
   * block that's already been requested from another (slower?) peer. */
  int                        endgame;
}
tr_swarm;
231
/* Session-wide peer manager state: created by tr_peerMgrNew (),
 * released by tr_peerMgrFree (). */
struct tr_peerMgr
{
  tr_session    * session;            /* owner; its lock is what managerLock () takes */
  tr_ptrArray     incomingHandshakes; /* tr_handshake */
  struct event  * bandwidthTimer;     /* the four timers are released by deleteTimers () */
  struct event  * rechokeTimer;
  struct event  * refillUpkeepTimer;
  struct event  * atomTimer;
};
241
/* Deep-log a message tagged with the name of the torrent that owns swarm `t`.
 * Does nothing unless tr_logGetDeepEnabled () is true.
 * `t` is parenthesized so that any pointer-valued expression may be passed,
 * not just a plain identifier. */
#define tordbg(t, ...) \
  do \
    { \
      if (tr_logGetDeepEnabled ()) \
        tr_logAddDeep (__FILE__, __LINE__, tr_torrentName ((t)->tor), __VA_ARGS__); \
    } \
  while (0)
249
/* Deep-log a message with no torrent name attached (NULL is passed in the
 * name slot). Does nothing unless tr_logGetDeepEnabled () is true.
 * The arguments are forwarded verbatim to tr_logAddDeep (). */
#define dbgmsg(...) \
  do \
    { \
      if (tr_logGetDeepEnabled ()) \
        tr_logAddDeep (__FILE__, __LINE__, NULL, __VA_ARGS__); \
    } \
  while (0)
257
258/**
259*** tr_peer virtual functions
260**/
261
262static bool
263tr_peerIsTransferringPieces (const tr_peer * peer,
264                             uint64_t        now,
265                             tr_direction    direction,
266                             unsigned int  * Bps)
267{
268  assert (peer != NULL);
269  assert (peer->funcs != NULL);
270
271  return (*peer->funcs->is_transferring_pieces)(peer, now, direction, Bps);
272}
273
274unsigned int
275tr_peerGetPieceSpeed_Bps (const tr_peer    * peer,
276                          uint64_t           now,
277                          tr_direction       direction)
278{
279  unsigned int Bps = 0;
280  tr_peerIsTransferringPieces (peer, now, direction, &Bps);
281  return Bps;
282}
283
284static void
285tr_peerFree (tr_peer * peer)
286{
287  assert (peer != NULL);
288  assert (peer->funcs != NULL);
289
290  (*peer->funcs->destruct)(peer);
291
292  tr_free (peer);
293}
294
295void
296tr_peerConstruct (tr_peer * peer, const tr_torrent * tor)
297{
298  assert (peer != NULL);
299  assert (tr_isTorrent (tor));
300
301  memset (peer, 0, sizeof (tr_peer));
302
303  peer->client = TR_KEY_NONE;
304  peer->swarm = tor->swarm;
305  tr_bitfieldConstruct (&peer->have, tor->info.pieceCount);
306  tr_bitfieldConstruct (&peer->blame, tor->blockCount);
307}
308
309static void peerDeclinedAllRequests (tr_swarm *, const tr_peer *);
310
311void
312tr_peerDestruct (tr_peer * peer)
313{
314  assert (peer != NULL);
315
316  if (peer->swarm != NULL)
317    peerDeclinedAllRequests (peer->swarm, peer);
318
319  tr_bitfieldDestruct (&peer->have);
320  tr_bitfieldDestruct (&peer->blame);
321
322  if (peer->atom)
323    peer->atom->peer = NULL;
324}
325
326/**
327***
328**/
329
330static inline void
331managerLock (const struct tr_peerMgr * manager)
332{
333  tr_sessionLock (manager->session);
334}
335
336static inline void
337managerUnlock (const struct tr_peerMgr * manager)
338{
339  tr_sessionUnlock (manager->session);
340}
341
342static inline void
343swarmLock (tr_swarm * swarm)
344{
345  managerLock (swarm->manager);
346}
347
348static inline void
349swarmUnlock (tr_swarm * swarm)
350{
351  managerUnlock (swarm->manager);
352}
353
354static inline int
355swarmIsLocked (const tr_swarm * swarm)
356{
357  return tr_sessionIsLocked (swarm->manager->session);
358}
359
360/**
361***
362**/
363
364static int
365handshakeCompareToAddr (const void * va, const void * vb)
366{
367  const tr_handshake * a = va;
368
369  return tr_address_compare (tr_handshakeGetAddr (a, NULL), vb);
370}
371
372static int
373handshakeCompare (const void * a, const void * b)
374{
375  return handshakeCompareToAddr (a, tr_handshakeGetAddr (b, NULL));
376}
377
378static inline tr_handshake*
379getExistingHandshake (tr_ptrArray * handshakes, const tr_address * addr)
380{
381  if (tr_ptrArrayEmpty (handshakes))
382    return NULL;
383
384  return tr_ptrArrayFindSorted (handshakes, addr, handshakeCompareToAddr);
385}
386
387static int
388comparePeerAtomToAddress (const void * va, const void * vb)
389{
390  const struct peer_atom * a = va;
391
392  return tr_address_compare (&a->addr, vb);
393}
394
395static int
396compareAtomsByAddress (const void * va, const void * vb)
397{
398  const struct peer_atom * b = vb;
399
400  assert (tr_isAtom (b));
401
402  return comparePeerAtomToAddress (va, &b->addr);
403}
404
405/**
406***
407**/
408
409const tr_address *
410tr_peerAddress (const tr_peer * peer)
411{
412  return &peer->atom->addr;
413}
414
415static tr_swarm *
416getExistingSwarm (tr_peerMgr *    manager,
417                  const uint8_t * hash)
418{
419  tr_torrent * tor = tr_torrentFindFromHash (manager->session, hash);
420
421  return tor == NULL ? NULL : tor->swarm;
422}
423
424static int
425peerCompare (const void * a, const void * b)
426{
427  return tr_address_compare (tr_peerAddress (a), tr_peerAddress (b));
428}
429
430static struct peer_atom*
431getExistingAtom (const tr_swarm   * cswarm,
432                 const tr_address * addr)
433{
434  tr_swarm * swarm = (tr_swarm*) cswarm;
435  return tr_ptrArrayFindSorted (&swarm->pool, addr, comparePeerAtomToAddress);
436}
437
438static bool
439peerIsInUse (const tr_swarm * cs, const struct peer_atom * atom)
440{
441  tr_swarm * s = (tr_swarm*) cs;
442
443  assert (swarmIsLocked (s));
444
445  return (atom->peer != NULL)
446      || getExistingHandshake (&s->outgoingHandshakes, &atom->addr)
447      || getExistingHandshake (&s->manager->incomingHandshakes, &atom->addr);
448}
449
450static inline bool
451replicationExists (const tr_swarm * s)
452{
453  return s->pieceReplication != NULL;
454}
455
456static void
457replicationFree (tr_swarm * s)
458{
459  tr_free (s->pieceReplication);
460  s->pieceReplication = NULL;
461  s->pieceReplicationSize = 0;
462}
463
464static void
465replicationNew (tr_swarm * s)
466{
467  tr_piece_index_t piece_i;
468  const tr_piece_index_t piece_count = s->tor->info.pieceCount;
469  const int n = tr_ptrArraySize (&s->peers);
470
471  assert (!replicationExists (s));
472
473  s->pieceReplicationSize = piece_count;
474  s->pieceReplication = tr_new0 (uint16_t, piece_count);
475
476  for (piece_i=0; piece_i<piece_count; ++piece_i)
477    {
478      int peer_i;
479      uint16_t r = 0;
480
481      for (peer_i=0; peer_i<n; ++peer_i)
482        {
483          tr_peer * peer = tr_ptrArrayNth (&s->peers, peer_i);
484          if (tr_bitfieldHas (&peer->have, piece_i))
485            ++r;
486        }
487
488      s->pieceReplication[piece_i] = r;
489    }
490}
491
492static void
493swarmFree (void * vs)
494{
495  tr_swarm * s = vs;
496
497  assert (s);
498  assert (!s->isRunning);
499  assert (swarmIsLocked (s));
500  assert (tr_ptrArrayEmpty (&s->outgoingHandshakes));
501  assert (tr_ptrArrayEmpty (&s->peers));
502
503  tr_ptrArrayDestruct (&s->webseeds, (PtrArrayForeachFunc)tr_peerFree);
504  tr_ptrArrayDestruct (&s->pool, (PtrArrayForeachFunc)tr_free);
505  tr_ptrArrayDestruct (&s->outgoingHandshakes, NULL);
506  tr_ptrArrayDestruct (&s->peers, NULL);
507  s->stats = TR_SWARM_STATS_INIT;
508
509  replicationFree (s);
510
511  tr_free (s->requests);
512  tr_free (s->pieces);
513  tr_free (s);
514}
515
516static void peerCallbackFunc (tr_peer *, const tr_peer_event *, void *);
517
518static void
519rebuildWebseedArray (tr_swarm * s, tr_torrent * tor)
520{
521  unsigned int i;
522  const tr_info * inf = &tor->info;
523
524  /* clear the array */
525  tr_ptrArrayDestruct (&s->webseeds, (PtrArrayForeachFunc)tr_peerFree);
526  s->webseeds = TR_PTR_ARRAY_INIT;
527  s->stats.activeWebseedCount = 0;
528
529  /* repopulate it */
530  for (i=0; i<inf->webseedCount; ++i)
531    {
532      tr_webseed * w = tr_webseedNew (tor, inf->webseeds[i], peerCallbackFunc, s);
533      tr_ptrArrayAppend (&s->webseeds, w);
534    }
535}
536
537static tr_swarm *
538swarmNew (tr_peerMgr * manager, tr_torrent * tor)
539{
540  tr_swarm * s;
541
542  s = tr_new0 (tr_swarm, 1);
543  s->manager = manager;
544  s->tor = tor;
545  s->pool = TR_PTR_ARRAY_INIT;
546  s->peers = TR_PTR_ARRAY_INIT;
547  s->webseeds = TR_PTR_ARRAY_INIT;
548  s->outgoingHandshakes = TR_PTR_ARRAY_INIT;
549
550  rebuildWebseedArray (s, tor);
551
552  return s;
553}
554
555static void ensureMgrTimersExist (struct tr_peerMgr * m);
556
557tr_peerMgr*
558tr_peerMgrNew (tr_session * session)
559{
560  tr_peerMgr * m = tr_new0 (tr_peerMgr, 1);
561  m->session = session;
562  m->incomingHandshakes = TR_PTR_ARRAY_INIT;
563  ensureMgrTimersExist (m);
564  return m;
565}
566
/**
 * Frees the event that @a t points at, if there is one, and stores NULL in
 * its place so that a repeated call is harmless.
 */
static void
deleteTimer (struct event ** t)
{
  if (*t == NULL)
    return;

  event_free (*t);
  *t = NULL;
}
576
577static void
578deleteTimers (struct tr_peerMgr * m)
579{
580  deleteTimer (&m->atomTimer);
581  deleteTimer (&m->bandwidthTimer);
582  deleteTimer (&m->rechokeTimer);
583  deleteTimer (&m->refillUpkeepTimer);
584}
585
586void
587tr_peerMgrFree (tr_peerMgr * manager)
588{
589  managerLock (manager);
590
591  deleteTimers (manager);
592
593  /* free the handshakes. Abort invokes handshakeDoneCB (), which removes
594   * the item from manager->handshakes, so this is a little roundabout... */
595  while (!tr_ptrArrayEmpty (&manager->incomingHandshakes))
596    tr_handshakeAbort (tr_ptrArrayNth (&manager->incomingHandshakes, 0));
597
598  tr_ptrArrayDestruct (&manager->incomingHandshakes, NULL);
599
600  managerUnlock (manager);
601  tr_free (manager);
602}
603
604/***
605****
606***/
607
608void
609tr_peerMgrOnBlocklistChanged (tr_peerMgr * mgr)
610{
611  tr_torrent * tor = NULL;
612  tr_session * session = mgr->session;
613
614  /* we cache whether or not a peer is blocklisted...
615     since the blocklist has changed, erase that cached value */
616  while ((tor = tr_torrentNext (session, tor)))
617    {
618      int i;
619      tr_swarm * s = tor->swarm;
620      const int n = tr_ptrArraySize (&s->pool);
621      for (i=0; i<n; ++i)
622        {
623          struct peer_atom * atom = tr_ptrArrayNth (&s->pool, i);
624          atom->blocklisted = -1;
625        }
626    }
627}
628
629static bool
630isAtomBlocklisted (tr_session * session, struct peer_atom * atom)
631{
632  if (atom->blocklisted < 0)
633    atom->blocklisted = tr_sessionIsAddressBlocked (session, &atom->addr);
634
635  assert (tr_isBool (atom->blocklisted));
636  return atom->blocklisted;
637}
638
639
640/***
641****
642***/
643
644static void
645atomSetSeedProbability (struct peer_atom * atom, int seedProbability)
646{
647  assert (atom != NULL);
648  assert (-1<=seedProbability && seedProbability<=100);
649
650  atom->seedProbability = seedProbability;
651
652  if (seedProbability == 100)
653    atom->flags |= ADDED_F_SEED_FLAG;
654  else if (seedProbability != -1)
655    atom->flags &= ~ADDED_F_SEED_FLAG;
656}
657
658static inline bool
659atomIsSeed (const struct peer_atom * atom)
660{
661  return atom->seedProbability == 100;
662}
663
664static void
665atomSetSeed (const tr_swarm * s, struct peer_atom * atom)
666{
667  if (!atomIsSeed (atom))
668    {
669      tordbg (s, "marking peer %s as a seed", tr_atomAddrStr (atom));
670
671      atomSetSeedProbability (atom, 100);
672    }
673}
674
675
676bool
677tr_peerMgrPeerIsSeed (const tr_torrent  * tor,
678                      const tr_address  * addr)
679{
680  bool isSeed = false;
681  const tr_swarm * s = tor->swarm;
682  const struct peer_atom * atom = getExistingAtom (s, addr);
683
684  if (atom)
685    isSeed = atomIsSeed (atom);
686
687  return isSeed;
688}
689
690void
691tr_peerMgrSetUtpSupported (tr_torrent * tor, const tr_address * addr)
692{
693  struct peer_atom * atom = getExistingAtom (tor->swarm, addr);
694
695  if (atom)
696    atom->flags |= ADDED_F_UTP_FLAGS;
697}
698
699void
700tr_peerMgrSetUtpFailed (tr_torrent *tor, const tr_address *addr, bool failed)
701{
702  struct peer_atom * atom = getExistingAtom (tor->swarm, addr);
703
704  if (atom)
705    atom->utp_failed = failed;
706}
707
708
709/**
710***  REQUESTS
711***
712*** There are two data structures associated with managing block requests:
713***
714*** 1. tr_swarm::requests, an array of "struct block_request" which keeps
715***    track of which blocks have been requested, and when, and by which peers.
***    This list is used for (a) cancelling requests that have been pending
717***    for too long and (b) avoiding duplicate requests before endgame.
718***
719*** 2. tr_swarm::pieces, an array of "struct weighted_piece" which lists the
720***    pieces that we want to request. It's used to decide which blocks to
721***    return next when tr_peerMgrGetBlockRequests () is called.
722**/
723
724/**
725*** struct block_request
726**/
727
728static int
729compareReqByBlock (const void * va, const void * vb)
730{
731  const struct block_request * a = va;
732  const struct block_request * b = vb;
733
734  /* primary key: block */
735  if (a->block < b->block) return -1;
736  if (a->block > b->block) return 1;
737
738  /* secondary key: peer */
739  if (a->peer < b->peer) return -1;
740  if (a->peer > b->peer) return 1;
741
742  return 0;
743}
744
745static void
746requestListAdd (tr_swarm * s, tr_block_index_t block, tr_peer * peer)
747{
748  struct block_request key;
749
750  /* ensure enough room is available... */
751  if (s->requestCount + 1 >= s->requestAlloc)
752    {
753      const int CHUNK_SIZE = 128;
754      s->requestAlloc += CHUNK_SIZE;
755      s->requests = tr_renew (struct block_request,
756                              s->requests, s->requestAlloc);
757    }
758
759  /* populate the record we're inserting */
760  key.block = block;
761  key.peer = peer;
762  key.sentAt = tr_time ();
763
764  /* insert the request to our array... */
765  {
766    bool exact;
767    const int pos = tr_lowerBound (&key, s->requests, s->requestCount,
768                                   sizeof (struct block_request),
769                                   compareReqByBlock, &exact);
770    assert (!exact);
771    memmove (s->requests + pos + 1,
772             s->requests + pos,
773             sizeof (struct block_request) * (s->requestCount++ - pos));
774    s->requests[pos] = key;
775  }
776
777  if (peer != NULL)
778    {
779      ++peer->pendingReqsToPeer;
780      assert (peer->pendingReqsToPeer >= 0);
781    }
782
783  /*fprintf (stderr, "added request of block %lu from peer %s... "
784                     "there are now %d block\n",
785                     (unsigned long)block, tr_atomAddrStr (peer->atom), s->requestCount);*/
786}
787
788static struct block_request *
789requestListLookup (tr_swarm * s, tr_block_index_t block, const tr_peer * peer)
790{
791  struct block_request key;
792  key.block = block;
793  key.peer = (tr_peer*) peer;
794
795  return bsearch (&key, s->requests, s->requestCount,
796                  sizeof (struct block_request),
797                  compareReqByBlock);
798}
799
800/**
801 * Find the peers are we currently requesting the block
802 * with index @a block from and append them to @a peerArr.
803 */
804static void
805getBlockRequestPeers (tr_swarm * s, tr_block_index_t block,
806                      tr_ptrArray * peerArr)
807{
808  bool exact;
809  int i, pos;
810  struct block_request key;
811
812  key.block = block;
813  key.peer = NULL;
814  pos = tr_lowerBound (&key, s->requests, s->requestCount,
815                       sizeof (struct block_request),
816                       compareReqByBlock, &exact);
817
818  assert (!exact); /* shouldn't have a request with .peer == NULL */
819
820  for (i=pos; i<s->requestCount; ++i)
821    {
822      if (s->requests[i].block != block)
823        break;
824      tr_ptrArrayAppend (peerArr, s->requests[i].peer);
825    }
826}
827
828static void
829decrementPendingReqCount (const struct block_request * b)
830{
831  if (b->peer != NULL)
832    if (b->peer->pendingReqsToPeer > 0)
833      --b->peer->pendingReqsToPeer;
834}
835
836static void
837requestListRemove (tr_swarm * s, tr_block_index_t block, const tr_peer * peer)
838{
839  const struct block_request * b = requestListLookup (s, block, peer);
840
841  if (b != NULL)
842    {
843      const int pos = b - s->requests;
844      assert (pos < s->requestCount);
845
846      decrementPendingReqCount (b);
847
848      tr_removeElementFromArray (s->requests,
849                                 pos,
850                                 sizeof (struct block_request),
851                                 s->requestCount--);
852
853      /*fprintf (stderr, "removing request of block %lu from peer %s... "
854                         "there are now %d block requests left\n",
855                         (unsigned long)block, tr_atomAddrStr (peer->atom), t->requestCount);*/
856    }
857}
858
859static int
860countActiveWebseeds (tr_swarm * s)
861{
862  int activeCount = 0;
863
864  if (s->tor->isRunning && !tr_torrentIsSeed (s->tor))
865    {
866      int i;
867      const int n = tr_ptrArraySize (&s->webseeds);
868      const uint64_t now = tr_time_msec ();
869
870      for (i=0; i<n; ++i)
871        if (tr_peerIsTransferringPieces (tr_ptrArrayNth(&s->webseeds,i), now, TR_DOWN, NULL))
872          ++activeCount;
873    }
874
875  return activeCount;
876}
877
878static bool
879testForEndgame (const tr_swarm * s)
880{
881  /* we consider ourselves to be in endgame if the number of bytes
882     we've got requested is >= the number of bytes left to download */
883  return (s->requestCount * s->tor->blockSize)
884               >= tr_cpLeftUntilDone (&s->tor->completion);
885}
886
887static void
888updateEndgame (tr_swarm * s)
889{
890  assert (s->requestCount >= 0);
891
892  if (!testForEndgame (s))
893    {
894      /* not in endgame */
895      s->endgame = 0;
896    }
897  else if (!s->endgame) /* only recalculate when endgame first begins */
898    {
899      int i;
900      int numDownloading = 0;
901      const int n = tr_ptrArraySize (&s->peers);
902
903      /* add the active bittorrent peers... */
904      for (i=0; i<n; ++i)
905        {
906          const tr_peer * p = tr_ptrArrayNth (&s->peers, i);
907          if (p->pendingReqsToPeer > 0)
908            ++numDownloading;
909        }
910
911      /* add the active webseeds... */
912      numDownloading += countActiveWebseeds (s);
913
914      /* average number of pending requests per downloading peer */
915      s->endgame = s->requestCount / MAX (numDownloading, 1);
916    }
917}
918
919
920/****
921*****
922*****  Piece List Manipulation / Accessors
923*****
924****/
925
926static inline void
927invalidatePieceSorting (tr_swarm * s)
928{
929  s->pieceSortState = PIECES_UNSORTED;
930}
931
/* Context for comparePieceByWeight (): a qsort () comparator receives only
 * the two elements, so the torrent and replication table it needs are
 * stashed in these file-scope variables by setComparePieceByWeightTorrent ()
 * before each use. */
static const tr_torrent * weightTorrent;

static const uint16_t * weightReplication;
935
936static void
937setComparePieceByWeightTorrent (tr_swarm * s)
938{
939  if (!replicationExists (s))
940    replicationNew (s);
941
942  weightTorrent = s->tor;
943  weightReplication = s->pieceReplication;
944}
945
946/* we try to create a "weight" s.t. high-priority pieces come before others,
947 * and that partially-complete pieces come before empty ones. */
948static int
949comparePieceByWeight (const void * va, const void * vb)
950{
951  const struct weighted_piece * a = va;
952  const struct weighted_piece * b = vb;
953  int ia, ib, missing, pending;
954  const tr_torrent * tor = weightTorrent;
955  const uint16_t * rep = weightReplication;
956
957  /* primary key: weight */
958  missing = tr_cpMissingBlocksInPiece (&tor->completion, a->index);
959  pending = a->requestCount;
960  ia = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
961  missing = tr_cpMissingBlocksInPiece (&tor->completion, b->index);
962  pending = b->requestCount;
963  ib = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
964  if (ia < ib) return -1;
965  if (ia > ib) return 1;
966
967  /* secondary key: higher priorities go first */
968  ia = tor->info.pieces[a->index].priority;
969  ib = tor->info.pieces[b->index].priority;
970  if (ia > ib) return -1;
971  if (ia < ib) return 1;
972
973  /* tertiary key: rarest first. */
974  ia = rep[a->index];
975  ib = rep[b->index];
976  if (ia < ib) return -1;
977  if (ia > ib) return 1;
978
979  /* quaternary key: random */
980  if (a->salt < b->salt) return -1;
981  if (a->salt > b->salt) return 1;
982
983  /* okay, they're equal */
984  return 0;
985}
986
987static int
988comparePieceByIndex (const void * va, const void * vb)
989{
990  const struct weighted_piece * a = va;
991  const struct weighted_piece * b = vb;
992  if (a->index < b->index) return -1;
993  if (a->index > b->index) return 1;
994  return 0;
995}
996
997static void
998pieceListSort (tr_swarm * s, enum piece_sort_state state)
999{
1000  assert (state==PIECES_SORTED_BY_INDEX
1001       || state==PIECES_SORTED_BY_WEIGHT);
1002
1003  if (state == PIECES_SORTED_BY_WEIGHT)
1004    {
1005      setComparePieceByWeightTorrent (s);
1006      qsort (s->pieces, s->pieceCount, sizeof (struct weighted_piece), comparePieceByWeight);
1007    }
1008  else
1009    {
1010      qsort (s->pieces, s->pieceCount, sizeof (struct weighted_piece), comparePieceByIndex);
1011    }
1012
1013  s->pieceSortState = state;
1014}
1015
/**
 * These functions are useful for testing, but too expensive for nightly builds.
 * Let's leave them disabled but keep an easy hook to compile them back in:
 * change the "#if 1" below to "#if 0".
 *
 * The disabled implementations take a tr_swarm and use the tr_bitfield API,
 * matching the rest of this file, so that flipping the switch compiles.
 */
#if 1
#define assertWeightedPiecesAreSorted(t)
#define assertReplicationCountIsExact(t)
#else
/* Asserts that, outside of endgame, every adjacent pair in s->pieces is in
 * comparePieceByWeight () order. */
static void
assertWeightedPiecesAreSorted (tr_swarm * s)
{
  if (!s->endgame)
    {
      int i;
      setComparePieceByWeightTorrent (s);
      for (i=0; i<s->pieceCount-1; ++i)
        assert (comparePieceByWeight (&s->pieces[i], &s->pieces[i+1]) <= 0);
    }
}

/* Asserts that each entry of s->pieceReplication equals the number of
 * peers in s->peers whose `have` bitfield contains that piece. */
static void
assertReplicationCountIsExact (tr_swarm * s)
{
  /* This assert might fail due to errors of implementations in other
   * clients. It happens when receiving duplicate bitfields/HaveAll/HaveNone
   * from a client. If such a behavior is noticed,
   * a bug report should be filed with the faulty client. */

  size_t piece_i;
  const uint16_t * rep = s->pieceReplication;
  const size_t piece_count = s->pieceReplicationSize;
  const int peer_count = tr_ptrArraySize (&s->peers);

  assert (piece_count == s->tor->info.pieceCount);

  for (piece_i=0; piece_i<piece_count; ++piece_i)
    {
      int peer_i;
      uint16_t r = 0;

      for (peer_i=0; peer_i<peer_count; ++peer_i)
        {
          const tr_peer * peer = tr_ptrArrayNth (&s->peers, peer_i);
          if (tr_bitfieldHas (&peer->have, piece_i))
            ++r;
        }

      assert (rep[piece_i] == r);
    }
}
#endif
1064
1065static struct weighted_piece *
1066pieceListLookup (tr_swarm * s, tr_piece_index_t index)
1067{
1068  int i;
1069
1070  for (i=0; i<s->pieceCount; ++i)
1071    if (s->pieces[i].index == index)
1072      return &s->pieces[i];
1073
1074  return NULL;
1075}
1076
1077static void
1078pieceListRebuild (tr_swarm * s)
1079{
1080  if (!tr_torrentIsSeed (s->tor))
1081    {
1082      tr_piece_index_t i;
1083      tr_piece_index_t * pool;
1084      tr_piece_index_t poolCount = 0;
1085      const tr_torrent * tor = s->tor;
1086      const tr_info * inf = tr_torrentInfo (tor);
1087      struct weighted_piece * pieces;
1088      int pieceCount;
1089
1090      /* build the new list */
1091      pool = tr_new (tr_piece_index_t, inf->pieceCount);
1092      for (i=0; i<inf->pieceCount; ++i)
1093        if (!inf->pieces[i].dnd)
1094          if (!tr_cpPieceIsComplete (&tor->completion, i))
1095            pool[poolCount++] = i;
1096      pieceCount = poolCount;
1097      pieces = tr_new0 (struct weighted_piece, pieceCount);
1098      for (i=0; i<poolCount; ++i)
1099        {
1100          struct weighted_piece * piece = pieces + i;
1101          piece->index = pool[i];
1102          piece->requestCount = 0;
1103          piece->salt = tr_cryptoWeakRandInt (4096);
1104        }
1105
1106      /* if we already had a list of pieces, merge it into
1107       * the new list so we don't lose its requestCounts */
1108      if (s->pieces != NULL)
1109        {
1110          struct weighted_piece * o = s->pieces;
1111          struct weighted_piece * oend = o + s->pieceCount;
1112          struct weighted_piece * n = pieces;
1113          struct weighted_piece * nend = n + pieceCount;
1114
1115          pieceListSort (s, PIECES_SORTED_BY_INDEX);
1116
1117          while (o!=oend && n!=nend)
1118            {
1119              if (o->index < n->index)
1120                ++o;
1121              else if (o->index > n->index)
1122                ++n;
1123              else
1124                *n++ = *o++;
1125            }
1126
1127          tr_free (s->pieces);
1128        }
1129
1130      s->pieces = pieces;
1131      s->pieceCount = pieceCount;
1132
1133      pieceListSort (s, PIECES_SORTED_BY_WEIGHT);
1134
1135      /* cleanup */
1136      tr_free (pool);
1137    }
1138}
1139
1140static void
1141pieceListRemovePiece (tr_swarm * s, tr_piece_index_t piece)
1142{
1143  struct weighted_piece * p;
1144
1145  if ((p = pieceListLookup (s, piece)))
1146    {
1147      const int pos = p - s->pieces;
1148
1149      tr_removeElementFromArray (s->pieces,
1150                                 pos,
1151                                 sizeof (struct weighted_piece),
1152                                 s->pieceCount--);
1153
1154      if (s->pieceCount == 0)
1155        {
1156          tr_free (s->pieces);
1157          s->pieces = NULL;
1158        }
1159    }
1160}
1161
1162static void
1163pieceListResortPiece (tr_swarm * s, struct weighted_piece * p)
1164{
1165  int pos;
1166  bool isSorted = true;
1167
1168  if (p == NULL)
1169    return;
1170
1171  /* is the torrent already sorted? */
1172  pos = p - s->pieces;
1173  setComparePieceByWeightTorrent (s);
1174  if (isSorted && (pos > 0) && (comparePieceByWeight (p-1, p) > 0))
1175    isSorted = false;
1176  if (isSorted && (pos < s->pieceCount - 1) && (comparePieceByWeight (p, p+1) > 0))
1177    isSorted = false;
1178
1179  if (s->pieceSortState != PIECES_SORTED_BY_WEIGHT)
1180    {
1181      pieceListSort (s, PIECES_SORTED_BY_WEIGHT);
1182      isSorted = true;
1183    }
1184
1185  /* if it's not sorted, move it around */
1186  if (!isSorted)
1187    {
1188      bool exact;
1189      const struct weighted_piece tmp = *p;
1190
1191      tr_removeElementFromArray (s->pieces,
1192                                 pos,
1193                                 sizeof (struct weighted_piece),
1194                                 s->pieceCount--);
1195
1196      pos = tr_lowerBound (&tmp, s->pieces, s->pieceCount,
1197                           sizeof (struct weighted_piece),
1198                           comparePieceByWeight, &exact);
1199
1200      memmove (&s->pieces[pos + 1],
1201               &s->pieces[pos],
1202               sizeof (struct weighted_piece) * (s->pieceCount++ - pos));
1203
1204      s->pieces[pos] = tmp;
1205    }
1206
1207  assertWeightedPiecesAreSorted (s);
1208}
1209
1210static void
1211pieceListRemoveRequest (tr_swarm * s, tr_block_index_t block)
1212{
1213  struct weighted_piece * p;
1214  const tr_piece_index_t index = tr_torBlockPiece (s->tor, block);
1215
1216  if (((p = pieceListLookup (s, index))) && (p->requestCount > 0))
1217    {
1218      --p->requestCount;
1219      pieceListResortPiece (s, p);
1220    }
1221}
1222
1223
1224/****
1225*****
1226*****  Replication count (for rarest first policy)
1227*****
1228****/
1229
1230/**
1231 * Increase the replication count of this piece and sort it if the
1232 * piece list is already sorted
1233 */
1234static void
1235tr_incrReplicationOfPiece (tr_swarm * s, const size_t index)
1236{
1237  assert (replicationExists (s));
1238  assert (s->pieceReplicationSize == s->tor->info.pieceCount);
1239
1240  /* One more replication of this piece is present in the swarm */
1241  ++s->pieceReplication[index];
1242
1243  /* we only resort the piece if the list is already sorted */
1244  if (s->pieceSortState == PIECES_SORTED_BY_WEIGHT)
1245    pieceListResortPiece (s, pieceListLookup (s, index));
1246}
1247
1248/**
1249 * Increases the replication count of pieces present in the bitfield
1250 */
1251static void
1252tr_incrReplicationFromBitfield (tr_swarm * s, const tr_bitfield * b)
1253{
1254  size_t i;
1255  uint16_t * rep = s->pieceReplication;
1256  const size_t n = s->tor->info.pieceCount;
1257
1258  assert (replicationExists (s));
1259
1260  for (i=0; i<n; ++i)
1261    if (tr_bitfieldHas (b, i))
1262      ++rep[i];
1263
1264  if (s->pieceSortState == PIECES_SORTED_BY_WEIGHT)
1265    invalidatePieceSorting (s);
1266}
1267
1268/**
1269 * Increase the replication count of every piece
1270 */
1271static void
1272tr_incrReplication (tr_swarm * s)
1273{
1274  int i;
1275  const int n = s->pieceReplicationSize;
1276
1277  assert (replicationExists (s));
1278  assert (s->pieceReplicationSize == s->tor->info.pieceCount);
1279
1280  for (i=0; i<n; ++i)
1281    ++s->pieceReplication[i];
1282}
1283
1284/**
1285 * Decrease the replication count of pieces present in the bitset.
1286 */
1287static void
1288tr_decrReplicationFromBitfield (tr_swarm * s, const tr_bitfield * b)
1289{
1290  int i;
1291  const int n = s->pieceReplicationSize;
1292
1293  assert (replicationExists (s));
1294  assert (s->pieceReplicationSize == s->tor->info.pieceCount);
1295
1296  if (tr_bitfieldHasAll (b))
1297    {
1298      for (i=0; i<n; ++i)
1299        --s->pieceReplication[i];
1300    }
1301  else if (!tr_bitfieldHasNone (b))
1302    {
1303      for (i=0; i<n; ++i)
1304        if (tr_bitfieldHas (b, i))
1305          --s->pieceReplication[i];
1306
1307      if (s->pieceSortState == PIECES_SORTED_BY_WEIGHT)
1308        invalidatePieceSorting (s);
1309    }
1310}
1311
1312/**
1313***
1314**/
1315
1316void
1317tr_peerMgrRebuildRequests (tr_torrent * tor)
1318{
1319  assert (tr_isTorrent (tor));
1320
1321  pieceListRebuild (tor->swarm);
1322}
1323
1324void
1325tr_peerMgrGetNextRequests (tr_torrent           * tor,
1326                           tr_peer              * peer,
1327                           int                    numwant,
1328                           tr_block_index_t     * setme,
1329                           int                  * numgot,
1330                           bool                   get_intervals)
1331{
1332  int i;
1333  int got;
1334  tr_swarm * s;
1335  struct weighted_piece * pieces;
1336  const tr_bitfield * const have = &peer->have;
1337
1338  /* sanity clause */
1339  assert (tr_isTorrent (tor));
1340  assert (numwant > 0);
1341
1342  /* walk through the pieces and find blocks that should be requested */
1343  got = 0;
1344  s = tor->swarm;
1345
1346  /* prep the pieces list */
1347  if (s->pieces == NULL)
1348    pieceListRebuild (s);
1349
1350  if (s->pieceSortState != PIECES_SORTED_BY_WEIGHT)
1351    pieceListSort (s, PIECES_SORTED_BY_WEIGHT);
1352
1353  assertReplicationCountIsExact (s);
1354  assertWeightedPiecesAreSorted (s);
1355
1356  updateEndgame (s);
1357  pieces = s->pieces;
1358  for (i=0; i<s->pieceCount && got<numwant; ++i)
1359    {
1360      struct weighted_piece * p = pieces + i;
1361
1362      /* if the peer has this piece that we want... */
1363      if (tr_bitfieldHas (have, p->index))
1364        {
1365          tr_block_index_t b;
1366          tr_block_index_t first;
1367          tr_block_index_t last;
1368          tr_ptrArray peerArr = TR_PTR_ARRAY_INIT;
1369
1370          tr_torGetPieceBlockRange (tor, p->index, &first, &last);
1371
1372          for (b=first; b<=last && (got<numwant || (get_intervals && setme[2*got-1] == b-1)); ++b)
1373            {
1374              int peerCount;
1375              tr_peer ** peers;
1376
1377              /* don't request blocks we've already got */
1378              if (tr_cpBlockIsComplete (&tor->completion, b))
1379                continue;
1380
1381              /* always add peer if this block has no peers yet */
1382              tr_ptrArrayClear (&peerArr);
1383              getBlockRequestPeers (s, b, &peerArr);
1384              peers = (tr_peer **) tr_ptrArrayPeek (&peerArr, &peerCount);
1385              if (peerCount != 0)
1386                {
1387                  /* don't make a second block request until the endgame */
1388                  if (!s->endgame)
1389                    continue;
1390
1391                  /* don't have more than two peers requesting this block */
1392                  if (peerCount > 1)
1393                    continue;
1394
1395                  /* don't send the same request to the same peer twice */
1396                  if (peer == peers[0])
1397                    continue;
1398
1399                  /* in the endgame allow an additional peer to download a
1400                     block but only if the peer seems to be handling requests
1401                     relatively fast */
1402                  if (peer->pendingReqsToPeer + numwant - got < s->endgame)
1403                    continue;
1404                }
1405
1406              /* update the caller's table */
1407              if (!get_intervals)
1408                {
1409                  setme[got++] = b;
1410                }
1411              /* if intervals are requested two array entries are necessarry:
1412                 one for the interval's starting block and one for its end block */
1413              else if (got && setme[2 * got - 1] == b - 1 && b != first)
1414                {
1415                  /* expand the last interval */
1416                  ++setme[2 * got - 1];
1417                }
1418              else
1419                {
1420                  /* begin a new interval */
1421                  setme[2 * got] = setme[2 * got + 1] = b;
1422                  ++got;
1423                }
1424
1425              /* update our own tables */
1426              requestListAdd (s, b, peer);
1427              ++p->requestCount;
1428            }
1429
1430          tr_ptrArrayDestruct (&peerArr, NULL);
1431        }
1432    }
1433
1434  /* In most cases we've just changed the weights of a small number of pieces.
1435   * So rather than qsort ()ing the entire array, it's faster to apply an
1436   * adaptive insertion sort algorithm. */
1437  if (got > 0)
1438    {
1439      /* not enough requests || last piece modified */
1440      if (i == s->pieceCount)
1441        --i;
1442
1443      setComparePieceByWeightTorrent (s);
1444      while (--i >= 0)
1445        {
1446          bool exact;
1447
1448          /* relative position! */
1449          const int newpos = tr_lowerBound (&s->pieces[i], &s->pieces[i + 1],
1450                                            s->pieceCount - (i + 1),
1451                                            sizeof (struct weighted_piece),
1452                                            comparePieceByWeight, &exact);
1453          if (newpos > 0)
1454            {
1455              const struct weighted_piece piece = s->pieces[i];
1456              memmove (&s->pieces[i],
1457                       &s->pieces[i + 1],
1458                       sizeof (struct weighted_piece) * (newpos));
1459              s->pieces[i + newpos] = piece;
1460            }
1461        }
1462    }
1463
1464  assertWeightedPiecesAreSorted (t);
1465  *numgot = got;
1466}
1467
1468bool
1469tr_peerMgrDidPeerRequest (const tr_torrent  * tor,
1470                          const tr_peer     * peer,
1471                          tr_block_index_t    block)
1472{
1473  return requestListLookup ((tr_swarm*)tor->swarm, block, peer) != NULL;
1474}
1475
1476/* cancel requests that are too old */
1477static void
1478refillUpkeep (int foo UNUSED, short bar UNUSED, void * vmgr)
1479{
1480    time_t now;
1481    time_t too_old;
1482    tr_torrent * tor;
1483    int cancel_buflen = 0;
1484    struct block_request * cancel = NULL;
1485    tr_peerMgr * mgr = vmgr;
1486    managerLock (mgr);
1487
1488    now = tr_time ();
1489    too_old = now - REQUEST_TTL_SECS;
1490
1491    /* alloc the temporary "cancel" buffer */
1492    tor = NULL;
1493    while ((tor = tr_torrentNext (mgr->session, tor)))
1494        cancel_buflen = MAX (cancel_buflen, tor->swarm->requestCount);
1495    if (cancel_buflen > 0)
1496        cancel = tr_new (struct block_request, cancel_buflen);
1497
1498    /* prune requests that are too old */
1499    tor = NULL;
1500    while ((tor = tr_torrentNext (mgr->session, tor)))
1501    {
1502        tr_swarm * s = tor->swarm;
1503        const int n = s->requestCount;
1504        if (n > 0)
1505        {
1506            int keepCount = 0;
1507            int cancelCount = 0;
1508            const struct block_request * it;
1509            const struct block_request * end;
1510
1511            for (it=s->requests, end=it+n; it!=end; ++it)
1512            {
1513                tr_peerMsgs * msgs = PEER_MSGS(it->peer);
1514
1515                if ((msgs !=NULL) && (it->sentAt <= too_old) && !tr_peerMsgsIsReadingBlock (msgs, it->block))
1516                    cancel[cancelCount++] = *it;
1517                else
1518                {
1519                    if (it != &s->requests[keepCount])
1520                        s->requests[keepCount] = *it;
1521                    keepCount++;
1522                }
1523            }
1524
1525            /* prune out the ones we aren't keeping */
1526            s->requestCount = keepCount;
1527
1528            /* send cancel messages for all the "cancel" ones */
1529            for (it=cancel, end=it+cancelCount; it!=end; ++it)
1530            {
1531              tr_peerMsgs * msgs = PEER_MSGS(it->peer);
1532
1533              if (msgs != NULL)
1534                {
1535                  tr_historyAdd (&it->peer->cancelsSentToPeer, now, 1);
1536                  tr_peerMsgsCancel (msgs, it->block);
1537                  decrementPendingReqCount (it);
1538                }
1539            }
1540
1541            /* decrement the pending request counts for the timed-out blocks */
1542            for (it=cancel, end=it+cancelCount; it!=end; ++it)
1543                pieceListRemoveRequest (s, it->block);
1544        }
1545    }
1546
1547    tr_free (cancel);
1548    tr_timerAddMsec (mgr->refillUpkeepTimer, REFILL_UPKEEP_PERIOD_MSEC);
1549    managerUnlock (mgr);
1550}
1551
1552static void
1553addStrike (tr_swarm * s, tr_peer * peer)
1554{
1555  tordbg (s, "increasing peer %s strike count to %d",
1556          tr_atomAddrStr (peer->atom), peer->strikes + 1);
1557
1558  if (++peer->strikes >= MAX_BAD_PIECES_PER_PEER)
1559    {
1560      struct peer_atom * atom = peer->atom;
1561      atom->flags2 |= MYFLAG_BANNED;
1562      peer->doPurge = 1;
1563      tordbg (s, "banning peer %s", tr_atomAddrStr (atom));
1564    }
1565}
1566
/**
 * Handler invoked for a piece that a peer suggested or marked allowed-fast.
 *
 * This is currently a no-op: every parameter is marked UNUSED and the
 * whole body is compiled out with `#if 0`.
 *
 * NOTE(review): the disabled code refers to `t`, which is not a parameter
 * of this function (the swarm parameter is named `s`), so it would need
 * updating before it could be re-enabled.
 */
static void
peerSuggestedPiece (tr_swarm           * s UNUSED,
                    tr_peer            * peer UNUSED,
                    tr_piece_index_t     pieceIndex UNUSED,
                    int                  isFastAllowed UNUSED)
{
#if 0
    assert (t);
    assert (peer);
    assert (peer->msgs);

    /* is this a valid piece? */
    if (pieceIndex >= t->tor->info.pieceCount)
        return;

    /* don't ask for it if we've already got it */
    if (tr_cpPieceIsComplete (t->tor->completion, pieceIndex))
        return;

    /* don't ask for it if they don't have it */
    if (!tr_bitfieldHas (peer->have, pieceIndex))
        return;

    /* don't ask for it if we're choked and it's not fast */
    if (!isFastAllowed && peer->clientIsChoked)
        return;

    /* request the blocks that we don't have in this piece */
    {
        tr_block_index_t b;
        tr_block_index_t first;
        tr_block_index_t last;
        const tr_torrent * tor = t->tor;

        tr_torGetPieceBlockRange (t->tor, pieceIndex, &first, &last);

        for (b=first; b<=last; ++b)
        {
            if (!tr_cpBlockIsComplete (tor->completion, b))
            {
                const uint32_t offset = getBlockOffsetInPiece (tor, b);
                const uint32_t length = tr_torBlockCountBytes (tor, b);
                tr_peerMsgsAddRequest (peer->msgs, pieceIndex, offset, length);
                incrementPieceRequests (t, pieceIndex);
            }
        }
    }
#endif
}
1616
1617static void
1618removeRequestFromTables (tr_swarm * s, tr_block_index_t block, const tr_peer * peer)
1619{
1620  requestListRemove (s, block, peer);
1621  pieceListRemoveRequest (s, block);
1622}
1623
1624/* peer choked us, or maybe it disconnected.
1625   either way we need to remove all its requests */
1626static void
1627peerDeclinedAllRequests (tr_swarm * s, const tr_peer * peer)
1628{
1629  int i, n;
1630  tr_block_index_t * blocks = tr_new (tr_block_index_t, s->requestCount);
1631
1632  for (i=n=0; i<s->requestCount; ++i)
1633    if (peer == s->requests[i].peer)
1634      blocks[n++] = s->requests[i].block;
1635
1636  for (i=0; i<n; ++i)
1637    removeRequestFromTables (s, blocks[i], peer);
1638
1639  tr_free (blocks);
1640}
1641
1642static void
1643cancelAllRequestsForBlock (tr_swarm          * s,
1644                           tr_block_index_t    block,
1645                           tr_peer           * no_notify)
1646{
1647  int i;
1648  int peerCount;
1649  tr_peer ** peers;
1650  tr_ptrArray peerArr;
1651
1652  peerArr = TR_PTR_ARRAY_INIT;
1653  getBlockRequestPeers (s, block, &peerArr);
1654  peers = (tr_peer **) tr_ptrArrayPeek (&peerArr, &peerCount);
1655  for (i=0; i<peerCount; ++i)
1656    {
1657      tr_peer * p = peers[i];
1658
1659      if ((p != no_notify) && tr_isPeerMsgs (p))
1660        {
1661          tr_historyAdd (&p->cancelsSentToPeer, tr_time (), 1);
1662          tr_peerMsgsCancel (PEER_MSGS(p), block);
1663        }
1664
1665      removeRequestFromTables (s, block, p);
1666    }
1667
1668  tr_ptrArrayDestruct (&peerArr, NULL);
1669}
1670
1671void
1672tr_peerMgrPieceCompleted (tr_torrent * tor, tr_piece_index_t p)
1673{
1674  int i;
1675  bool pieceCameFromPeers = false;
1676  tr_swarm * const s = tor->swarm;
1677  const int n = tr_ptrArraySize (&s->peers);
1678
1679  /* walk through our peers */
1680  for (i=0; i<n; ++i)
1681    {
1682      tr_peer * peer = tr_ptrArrayNth (&s->peers, i);
1683
1684      /* notify the peer that we now have this piece */
1685      tr_peerMsgsHave (PEER_MSGS(peer), p);
1686
1687      if (!pieceCameFromPeers)
1688        pieceCameFromPeers = tr_bitfieldHas (&peer->blame, p);
1689    }
1690
1691  if (pieceCameFromPeers) /* webseed downloads don't belong in announce totals */
1692    tr_announcerAddBytes (tor, TR_ANN_DOWN, tr_torPieceCountBytes (tor, p));
1693
1694  /* bookkeeping */
1695  pieceListRemovePiece (s, p);
1696  s->needsCompletenessCheck = true;
1697}
1698
1699static void
1700peerCallbackFunc (tr_peer * peer, const tr_peer_event * e, void * vs)
1701{
1702  tr_swarm * s = vs;
1703
1704  swarmLock (s);
1705
1706  assert (peer != NULL);
1707
1708  switch (e->eventType)
1709    {
1710      case TR_PEER_PEER_GOT_PIECE_DATA:
1711        {
1712          const time_t now = tr_time ();
1713          tr_torrent * tor = s->tor;
1714
1715          tor->uploadedCur += e->length;
1716          tr_announcerAddBytes (tor, TR_ANN_UP, e->length);
1717          tr_torrentSetActivityDate (tor, now);
1718          tr_torrentSetDirty (tor);
1719          tr_statsAddUploaded (tor->session, e->length);
1720
1721          if (peer->atom != NULL)
1722            peer->atom->piece_data_time = now;
1723
1724          break;
1725        }
1726
1727      case TR_PEER_CLIENT_GOT_PIECE_DATA:
1728        {
1729          const time_t now = tr_time ();
1730          tr_torrent * tor = s->tor;
1731
1732          tor->downloadedCur += e->length;
1733          tr_torrentSetActivityDate (tor, now);
1734          tr_torrentSetDirty (tor);
1735
1736          tr_statsAddDownloaded (tor->session, e->length);
1737
1738          if (peer->atom != NULL)
1739            peer->atom->piece_data_time = now;
1740
1741          break;
1742        }
1743
1744      case TR_PEER_CLIENT_GOT_HAVE:
1745        if (replicationExists (s))
1746          {
1747            tr_incrReplicationOfPiece (s, e->pieceIndex);
1748            assertReplicationCountIsExact (s);
1749          }
1750        break;
1751
1752      case TR_PEER_CLIENT_GOT_HAVE_ALL:
1753        if (replicationExists (s))
1754          {
1755            tr_incrReplication (s);
1756            assertReplicationCountIsExact (s);
1757          }
1758        break;
1759
1760      case TR_PEER_CLIENT_GOT_HAVE_NONE:
1761        /* noop */
1762        break;
1763
1764      case TR_PEER_CLIENT_GOT_BITFIELD:
1765        assert (e->bitfield != NULL);
1766        if (replicationExists (s))
1767          {
1768            tr_incrReplicationFromBitfield (s, e->bitfield);
1769            assertReplicationCountIsExact (s);
1770          }
1771        break;
1772
1773      case TR_PEER_CLIENT_GOT_REJ:
1774        {
1775          tr_block_index_t b = _tr_block (s->tor, e->pieceIndex, e->offset);
1776          if (b < s->tor->blockCount)
1777            removeRequestFromTables (s, b, peer);
1778          else
1779            tordbg (s, "Peer %s sent an out-of-range reject message",
1780                    tr_atomAddrStr (peer->atom));
1781          break;
1782        }
1783
1784      case TR_PEER_CLIENT_GOT_CHOKE:
1785        peerDeclinedAllRequests (s, peer);
1786        break;
1787
1788      case TR_PEER_CLIENT_GOT_PORT:
1789        if (peer->atom)
1790          peer->atom->port = e->port;
1791        break;
1792
1793      case TR_PEER_CLIENT_GOT_SUGGEST:
1794        peerSuggestedPiece (s, peer, e->pieceIndex, false);
1795        break;
1796
1797      case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1798        peerSuggestedPiece (s, peer, e->pieceIndex, true);
1799        break;
1800
1801      case TR_PEER_CLIENT_GOT_BLOCK:
1802        {
1803          tr_torrent * tor = s->tor;
1804          const tr_piece_index_t p = e->pieceIndex;
1805          const tr_block_index_t block = _tr_block (tor, p, e->offset);
1806          cancelAllRequestsForBlock (s, block, peer);
1807          tr_historyAdd (&peer->blocksSentToClient, tr_time(), 1);
1808          pieceListResortPiece (s, pieceListLookup (s, p));
1809          tr_torrentGotBlock (tor, block);
1810          break;
1811        }
1812
1813      case TR_PEER_ERROR:
1814        if ((e->err == ERANGE) || (e->err == EMSGSIZE) || (e->err == ENOTCONN))
1815          {
1816            /* some protocol error from the peer */
1817            peer->doPurge = 1;
1818            tordbg (s, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error",
1819                    tr_atomAddrStr (peer->atom));
1820          }
1821        else
1822          {
1823            tordbg (s, "unhandled error: %s", tr_strerror (e->err));
1824          }
1825        break;
1826
1827      default:
1828        assert (0);
1829    }
1830
1831  swarmUnlock (s);
1832}
1833
1834static int
1835getDefaultShelfLife (uint8_t from)
1836{
1837    /* in general, peers obtained from firsthand contact
1838     * are better than those from secondhand, etc etc */
1839    switch (from)
1840    {
1841        case TR_PEER_FROM_INCOMING : return 60 * 60 * 6;
1842        case TR_PEER_FROM_LTEP     : return 60 * 60 * 6;
1843        case TR_PEER_FROM_TRACKER  : return 60 * 60 * 3;
1844        case TR_PEER_FROM_DHT      : return 60 * 60 * 3;
1845        case TR_PEER_FROM_PEX      : return 60 * 60 * 2;
1846        case TR_PEER_FROM_RESUME   : return 60 * 60;
1847        case TR_PEER_FROM_LPD      : return 10 * 60;
1848        default                    : return 60 * 60;
1849    }
1850}
1851
1852static void
1853ensureAtomExists (tr_swarm          * s,
1854                  const tr_address  * addr,
1855                  const tr_port       port,
1856                  const uint8_t       flags,
1857                  const int8_t        seedProbability,
1858                  const uint8_t       from)
1859{
1860  struct peer_atom * a;
1861
1862  assert (tr_address_is_valid (addr));
1863  assert (from < TR_PEER_FROM__MAX);
1864
1865  a = getExistingAtom (s, addr);
1866
1867  if (a == NULL)
1868    {
1869      const int jitter = tr_cryptoWeakRandInt (60*10);
1870      a = tr_new0 (struct peer_atom, 1);
1871      a->addr = *addr;
1872      a->port = port;
1873      a->flags = flags;
1874      a->fromFirst = from;
1875      a->fromBest = from;
1876      a->shelf_date = tr_time () + getDefaultShelfLife (from) + jitter;
1877      a->blocklisted = -1;
1878      atomSetSeedProbability (a, seedProbability);
1879      tr_ptrArrayInsertSorted (&s->pool, a, compareAtomsByAddress);
1880
1881      tordbg (s, "got a new atom: %s", tr_atomAddrStr (a));
1882    }
1883  else
1884    {
1885      if (from < a->fromBest)
1886        a->fromBest = from;
1887
1888      if (a->seedProbability == -1)
1889        atomSetSeedProbability (a, seedProbability);
1890
1891      a->flags |= flags;
1892    }
1893}
1894
1895static int
1896getMaxPeerCount (const tr_torrent * tor)
1897{
1898  return tor->maxConnectedPeers;
1899}
1900
1901static int
1902getPeerCount (const tr_swarm * s)
1903{
1904  return tr_ptrArraySize (&s->peers);/* + tr_ptrArraySize (&t->outgoingHandshakes); */
1905}
1906
1907
1908static void
1909createBitTorrentPeer (tr_torrent       * tor,
1910                      struct tr_peerIo * io,
1911                      struct peer_atom * atom,
1912                      tr_quark           client)
1913{
1914  tr_peer * peer;
1915  tr_peerMsgs * msgs;
1916  tr_swarm * swarm;
1917
1918  assert (atom != NULL);
1919  assert (tr_isTorrent (tor));
1920  assert (tor->swarm != NULL);
1921
1922  swarm = tor->swarm;
1923
1924  peer = (tr_peer*) tr_peerMsgsNew (tor, io, peerCallbackFunc, swarm);
1925  peer->atom = atom;
1926  peer->client = client;
1927  atom->peer = peer;
1928
1929  tr_ptrArrayInsertSorted (&swarm->peers, peer, peerCompare);
1930  ++swarm->stats.peerCount;
1931  ++swarm->stats.peerFromCount[atom->fromFirst];
1932
1933  assert (swarm->stats.peerCount == tr_ptrArraySize (&swarm->peers));
1934  assert (swarm->stats.peerFromCount[atom->fromFirst] <= swarm->stats.peerCount);
1935
1936  msgs = PEER_MSGS (peer);
1937  tr_peerMsgsUpdateActive (msgs, TR_UP);
1938  tr_peerMsgsUpdateActive (msgs, TR_DOWN);
1939}
1940
1941
1942/* FIXME: this is kind of a mess. */
1943static bool
1944myHandshakeDoneCB (tr_handshake  * handshake,
1945                   tr_peerIo     * io,
1946                   bool            readAnythingFromPeer,
1947                   bool            isConnected,
1948                   const uint8_t * peer_id,
1949                   void          * vmanager)
1950{
1951  bool ok = isConnected;
1952  bool success = false;
1953  tr_port port;
1954  const tr_address * addr;
1955  tr_peerMgr * manager = vmanager;
1956  tr_swarm  * s;
1957
1958  assert (io);
1959  assert (tr_isBool (ok));
1960
1961  s = tr_peerIoHasTorrentHash (io)
1962    ? getExistingSwarm (manager, tr_peerIoGetTorrentHash (io))
1963    : NULL;
1964
1965  if (tr_peerIoIsIncoming (io))
1966    tr_ptrArrayRemoveSortedPointer (&manager->incomingHandshakes,
1967                                    handshake, handshakeCompare);
1968  else if (s)
1969    tr_ptrArrayRemoveSortedPointer (&s->outgoingHandshakes,
1970                                    handshake, handshakeCompare);
1971
1972  if (s)
1973    swarmLock (s);
1974
1975  addr = tr_peerIoGetAddress (io, &port);
1976
1977  if (!ok || !s || !s->isRunning)
1978    {
1979      if (s)
1980        {
1981          struct peer_atom * atom = getExistingAtom (s, addr);
1982          if (atom)
1983            {
1984              ++atom->numFails;
1985
1986              if (!readAnythingFromPeer)
1987                {
1988                  tordbg (s, "marking peer %s as unreachable... numFails is %d", tr_atomAddrStr (atom), (int)atom->numFails);
1989                  atom->flags2 |= MYFLAG_UNREACHABLE;
1990                }
1991            }
1992        }
1993    }
1994  else /* looking good */
1995    {
1996      struct peer_atom * atom;
1997
1998      ensureAtomExists (s, addr, port, 0, -1, TR_PEER_FROM_INCOMING);
1999      atom = getExistingAtom (s, addr);
2000      atom->time = tr_time ();
2001      atom->piece_data_time = 0;
2002      atom->lastConnectionAt = tr_time ();
2003
2004      if (!tr_peerIoIsIncoming (io))
2005        {
2006          atom->flags |= ADDED_F_CONNECTABLE;
2007          atom->flags2 &= ~MYFLAG_UNREACHABLE;
2008        }
2009
2010      /* In principle, this flag specifies whether the peer groks uTP,
2011         not whether it's currently connected over uTP. */
2012      if (io->utp_socket)
2013        atom->flags |= ADDED_F_UTP_FLAGS;
2014
2015      if (atom->flags2 & MYFLAG_BANNED)
2016        {
2017          tordbg (s, "banned peer %s tried to reconnect",
2018                  tr_atomAddrStr (atom));
2019        }
2020      else if (tr_peerIoIsIncoming (io) && (getPeerCount (s) >= getMaxPeerCount (s->tor)))
2021        {
2022        }
2023      else
2024        {
2025          tr_peer * peer = atom->peer;
2026
2027          if (peer) /* we already have this peer */
2028            {
2029            }
2030          else
2031            {
2032              tr_quark client;
2033              tr_peerIo * io;
2034              char buf[128];
2035
2036              if (peer_id != NULL)
2037                client = tr_quark_new (tr_clientForId (buf, sizeof (buf), peer_id), -1);
2038              else
2039                client = TR_KEY_NONE;
2040
2041              io = tr_handshakeStealIO (handshake); /* this steals its refcount too, which is
2042                                                       balanced by our unref in peerDelete ()  */
2043              tr_peerIoSetParent (io, &s->tor->bandwidth);
2044              createBitTorrentPeer (s->tor, io, atom, client);
2045
2046              success = true;
2047            }
2048        }
2049    }
2050
2051  if (s != NULL)
2052    swarmUnlock (s);
2053
2054  return success;
2055}
2056
2057void
2058tr_peerMgrAddIncoming (tr_peerMgr       * manager,
2059                       tr_address       * addr,
2060                       tr_port            port,
2061                       int                socket,
2062                       struct UTPSocket * utp_socket)
2063{
2064  tr_session * session;
2065
2066  managerLock (manager);
2067
2068  assert (tr_isSession (manager->session));
2069  session = manager->session;
2070
2071  if (tr_sessionIsAddressBlocked (session, addr))
2072    {
2073      tr_logAddDebug ("Banned IP address \"%s\" tried to connect to us", tr_address_to_string (addr));
2074      if (socket >= 0)
2075        tr_netClose (session, socket);
2076      else
2077        UTP_Close (utp_socket);
2078    }
2079  else if (getExistingHandshake (&manager->incomingHandshakes, addr))
2080    {
2081      if (socket >= 0)
2082        tr_netClose (session, socket);
2083      else
2084        UTP_Close (utp_socket);
2085    }
2086  else /* we don't have a connection to them yet... */
2087    {
2088      tr_peerIo *    io;
2089      tr_handshake * handshake;
2090
2091      io = tr_peerIoNewIncoming (session, &session->bandwidth, addr, port, socket, utp_socket);
2092
2093      handshake = tr_handshakeNew (io,
2094                                   session->encryptionMode,
2095                                   myHandshakeDoneCB,
2096                                   manager);
2097
2098      tr_peerIoUnref (io); /* balanced by the implicit ref in tr_peerIoNewIncoming () */
2099
2100      tr_ptrArrayInsertSorted (&manager->incomingHandshakes, handshake,
2101                               handshakeCompare);
2102    }
2103
2104  managerUnlock (manager);
2105}
2106
2107void
2108tr_peerMgrAddPex (tr_torrent * tor, uint8_t from,
2109                  const tr_pex * pex, int8_t seedProbability)
2110{
2111  if (tr_isPex (pex)) /* safeguard against corrupt data */
2112    {
2113      tr_swarm * s = tor->swarm;
2114      managerLock (s->manager);
2115
2116      if (!tr_sessionIsAddressBlocked (s->manager->session, &pex->addr))
2117        if (tr_address_is_valid_for_peers (&pex->addr, pex->port))
2118          ensureAtomExists (s, &pex->addr, pex->port, pex->flags, seedProbability, from);
2119
2120      managerUnlock (s->manager);
2121    }
2122}
2123
2124void
2125tr_peerMgrMarkAllAsSeeds (tr_torrent * tor)
2126{
2127  tr_swarm * s = tor->swarm;
2128  const int n = tr_ptrArraySize (&s->pool);
2129  struct peer_atom ** it = (struct peer_atom**) tr_ptrArrayBase (&s->pool);
2130  struct peer_atom ** end = it + n;
2131
2132  while (it != end)
2133    atomSetSeed (s, *it++);
2134}
2135
2136tr_pex *
2137tr_peerMgrCompactToPex (const void    * compact,
2138                        size_t          compactLen,
2139                        const uint8_t * added_f,
2140                        size_t          added_f_len,
2141                        size_t        * pexCount)
2142{
2143  size_t i;
2144  size_t n = compactLen / 6;
2145  const uint8_t * walk = compact;
2146  tr_pex * pex = tr_new0 (tr_pex, n);
2147
2148  for (i=0; i<n; ++i)
2149    {
2150      pex[i].addr.type = TR_AF_INET;
2151      memcpy (&pex[i].addr.addr, walk, 4); walk += 4;
2152      memcpy (&pex[i].port, walk, 2); walk += 2;
2153      if (added_f && (n == added_f_len))
2154        pex[i].flags = added_f[i];
2155    }
2156
2157  *pexCount = n;
2158  return pex;
2159}
2160
2161tr_pex *
2162tr_peerMgrCompact6ToPex (const void    * compact,
2163                         size_t          compactLen,
2164                         const uint8_t * added_f,
2165                         size_t          added_f_len,
2166                         size_t        * pexCount)
2167{
2168  size_t i;
2169  size_t n = compactLen / 18;
2170  const uint8_t * walk = compact;
2171  tr_pex * pex = tr_new0 (tr_pex, n);
2172
2173  for (i=0; i<n; ++i)
2174    {
2175      pex[i].addr.type = TR_AF_INET6;
2176      memcpy (&pex[i].addr.addr.addr6.s6_addr, walk, 16); walk += 16;
2177      memcpy (&pex[i].port, walk, 2); walk += 2;
2178      if (added_f && (n == added_f_len))
2179        pex[i].flags = added_f[i];
2180    }
2181
2182  *pexCount = n;
2183  return pex;
2184}
2185
2186tr_pex *
2187tr_peerMgrArrayToPex (const void  * array,
2188                      size_t        arrayLen,
2189                      size_t      * pexCount)
2190{
2191  size_t i;
2192  size_t n = arrayLen / (sizeof (tr_address) + 2);
2193  const uint8_t * walk = array;
2194  tr_pex * pex = tr_new0 (tr_pex, n);
2195
2196  for (i=0 ; i<n ; ++i)
2197    {
2198      memcpy (&pex[i].addr, walk, sizeof (tr_address));
2199      memcpy (&pex[i].port, walk + sizeof (tr_address), 2);
2200      pex[i].flags = 0x00;
2201      walk += sizeof (tr_address) + 2;
2202    }
2203
2204  *pexCount = n;
2205  return pex;
2206}
2207
2208/**
2209***
2210**/
2211
2212void
2213tr_peerMgrGotBadPiece (tr_torrent * tor, tr_piece_index_t pieceIndex)
2214{
2215  int i;
2216  int n;
2217  tr_swarm * s = tor->swarm;
2218  const uint32_t byteCount = tr_torPieceCountBytes (tor, pieceIndex);
2219
2220  for (i=0, n=tr_ptrArraySize(&s->peers); i!=n; ++i)
2221    {
2222      tr_peer * peer = tr_ptrArrayNth (&s->peers, i);
2223
2224      if (tr_bitfieldHas (&peer->blame, pieceIndex))
2225        {
2226          tordbg (s, "peer %s contributed to corrupt piece (%d); now has %d strikes",
2227                  tr_atomAddrStr(peer->atom), pieceIndex, (int)peer->strikes + 1);
2228          addStrike (s, peer);
2229        }
2230    }
2231
2232
2233  tr_announcerAddBytes (tor, TR_ANN_CORRUPT, byteCount);
2234}
2235
2236int
2237tr_pexCompare (const void * va, const void * vb)
2238{
2239  int i;
2240  const tr_pex * a = va;
2241  const tr_pex * b = vb;
2242
2243  assert (tr_isPex (a));
2244  assert (tr_isPex (b));
2245
2246  if ((i = tr_address_compare (&a->addr, &b->addr)))
2247    return i;
2248
2249  if (a->port != b->port)
2250    return a->port < b->port ? -1 : 1;
2251
2252  return 0;
2253}
2254
2255/* better goes first */
2256static int
2257compareAtomsByUsefulness (const void * va, const void *vb)
2258{
2259  const struct peer_atom * a = * (const struct peer_atom**) va;
2260  const struct peer_atom * b = * (const struct peer_atom**) vb;
2261
2262  assert (tr_isAtom (a));
2263  assert (tr_isAtom (b));
2264
2265  if (a->piece_data_time != b->piece_data_time)
2266    return a->piece_data_time > b->piece_data_time ? -1 : 1;
2267  if (a->fromBest != b->fromBest)
2268    return a->fromBest < b->fromBest ? -1 : 1;
2269  if (a->numFails != b->numFails)
2270    return a->numFails < b->numFails ? -1 : 1;
2271
2272  return 0;
2273}
2274
2275static bool
2276isAtomInteresting (const tr_torrent * tor, struct peer_atom * atom)
2277{
2278  if (tr_torrentIsSeed (tor) && atomIsSeed (atom))
2279    return false;
2280
2281  if (peerIsInUse (tor->swarm, atom))
2282    return true;
2283
2284  if (isAtomBlocklisted (tor->session, atom))
2285    return false;
2286
2287  if (atom->flags2 & MYFLAG_BANNED)
2288    return false;
2289
2290  return true;
2291}
2292
2293int
2294tr_peerMgrGetPeers (tr_torrent   * tor,
2295                    tr_pex      ** setme_pex,
2296                    uint8_t        af,
2297                    uint8_t        list_mode,
2298                    int            maxCount)
2299{
2300  int i;
2301  int n;
2302  int count = 0;
2303  int atomCount = 0;
2304  const tr_swarm * s = tor->swarm;
2305  struct peer_atom ** atoms = NULL;
2306  tr_pex * pex;
2307  tr_pex * walk;
2308
2309  assert (tr_isTorrent (tor));
2310  assert (setme_pex != NULL);
2311  assert (af==TR_AF_INET || af==TR_AF_INET6);
2312  assert (list_mode==TR_PEERS_CONNECTED || list_mode==TR_PEERS_INTERESTING);
2313
2314  managerLock (s->manager);
2315
2316  /**
2317  ***  build a list of atoms
2318  **/
2319
2320  if (list_mode == TR_PEERS_CONNECTED) /* connected peers only */
2321    {
2322      int i;
2323      const tr_peer ** peers = (const tr_peer **) tr_ptrArrayBase (&s->peers);
2324      atomCount = tr_ptrArraySize (&s->peers);
2325      atoms = tr_new (struct peer_atom *, atomCount);
2326      for (i=0; i<atomCount; ++i)
2327        atoms[i] = peers[i]->atom;
2328    }
2329  else /* TR_PEERS_INTERESTING */
2330    {
2331      int i;
2332      struct peer_atom ** atomBase = (struct peer_atom**) tr_ptrArrayBase (&s->pool);
2333      n = tr_ptrArraySize (&s->pool);
2334      atoms = tr_new (struct peer_atom *, n);
2335      for (i=0; i<n; ++i)
2336        if (isAtomInteresting (tor, atomBase[i]))
2337          atoms[atomCount++] = atomBase[i];
2338    }
2339
2340  qsort (atoms, atomCount, sizeof (struct peer_atom *), compareAtomsByUsefulness);
2341
2342  /**
2343  ***  add the first N of them into our return list
2344  **/
2345
2346  n = MIN (atomCount, maxCount);
2347  pex = walk = tr_new0 (tr_pex, n);
2348
2349  for (i=0; i<atomCount && count<n; ++i)
2350    {
2351      const struct peer_atom * atom = atoms[i];
2352      if (atom->addr.type == af)
2353        {
2354          assert (tr_address_is_valid (&atom->addr));
2355          walk->addr = atom->addr;
2356          walk->port = atom->port;
2357          walk->flags = atom->flags;
2358          ++count;
2359          ++walk;
2360        }
2361    }
2362
2363  qsort (pex, count, sizeof (tr_pex), tr_pexCompare);
2364
2365  assert ((walk - pex) == count);
2366  *setme_pex = pex;
2367
2368  /* cleanup */
2369  tr_free (atoms);
2370  managerUnlock (s->manager);
2371  return count;
2372}
2373
2374static void atomPulse    (int, short, void *);
2375static void bandwidthPulse (int, short, void *);
2376static void rechokePulse (int, short, void *);
2377static void reconnectPulse (int, short, void *);
2378
2379static struct event *
2380createTimer (tr_session * session, int msec, void (*callback)(int, short, void *), void * cbdata)
2381{
2382  struct event * timer = evtimer_new (session->event_base, callback, cbdata);
2383  tr_timerAddMsec (timer, msec);
2384  return timer;
2385}
2386
2387static void
2388ensureMgrTimersExist (struct tr_peerMgr * m)
2389{
2390  if (m->atomTimer == NULL)
2391    m->atomTimer = createTimer (m->session, ATOM_PERIOD_MSEC, atomPulse, m);
2392
2393  if (m->bandwidthTimer == NULL)
2394    m->bandwidthTimer = createTimer (m->session, BANDWIDTH_PERIOD_MSEC, bandwidthPulse, m);
2395
2396  if (m->rechokeTimer == NULL)
2397    m->rechokeTimer = createTimer (m->session, RECHOKE_PERIOD_MSEC, rechokePulse, m);
2398
2399  if (m->refillUpkeepTimer == NULL)
2400    m->refillUpkeepTimer = createTimer (m->session, REFILL_UPKEEP_PERIOD_MSEC, refillUpkeep, m);
2401}
2402
2403void
2404tr_peerMgrStartTorrent (tr_torrent * tor)
2405{
2406  tr_swarm * s;
2407
2408  assert (tr_isTorrent (tor));
2409  assert (tr_torrentIsLocked (tor));
2410
2411  s = tor->swarm;
2412  ensureMgrTimersExist (s->manager);
2413
2414  s->isRunning = true;
2415  s->maxPeers = tor->maxConnectedPeers;
2416  s->pieceSortState = PIECES_UNSORTED;
2417
2418  rechokePulse (0, 0, s->manager);
2419}
2420
2421static void removeAllPeers (tr_swarm *);
2422
2423static void
2424stopSwarm (tr_swarm * swarm)
2425{
2426  swarm->isRunning = false;
2427
2428  replicationFree (swarm);
2429  invalidatePieceSorting (swarm);
2430
2431  removeAllPeers (swarm);
2432
2433  /* disconnect the handshakes. handshakeAbort calls handshakeDoneCB (),
2434   * which removes the handshake from t->outgoingHandshakes... */
2435  while (!tr_ptrArrayEmpty (&swarm->outgoingHandshakes))
2436    tr_handshakeAbort (tr_ptrArrayNth (&swarm->outgoingHandshakes, 0));
2437}
2438
2439void
2440tr_peerMgrStopTorrent (tr_torrent * tor)
2441{
2442  assert (tr_isTorrent (tor));
2443  assert (tr_torrentIsLocked (tor));
2444
2445  stopSwarm (tor->swarm);
2446}
2447
2448void
2449tr_peerMgrAddTorrent (tr_peerMgr * manager, tr_torrent * tor)
2450{
2451  assert (tr_isTorrent (tor));
2452  assert (tr_torrentIsLocked (tor));
2453  assert (tor->swarm == NULL);
2454
2455  tor->swarm = swarmNew (manager, tor);
2456}
2457
2458void
2459tr_peerMgrRemoveTorrent (tr_torrent * tor)
2460{
2461  assert (tr_isTorrent (tor));
2462  assert (tr_torrentIsLocked (tor));
2463
2464  stopSwarm (tor->swarm);
2465  swarmFree (tor->swarm);
2466}
2467
2468void
2469tr_peerUpdateProgress (tr_torrent * tor, tr_peer * peer)
2470{
2471  const tr_bitfield * have = &peer->have;
2472
2473  if (tr_bitfieldHasAll (have))
2474    {
2475      peer->progress = 1.0;
2476    }
2477  else if (tr_bitfieldHasNone (have))
2478    {
2479      peer->progress = 0.0;
2480    }
2481  else
2482    {
2483      const float true_count = tr_bitfieldCountTrueBits (have);
2484
2485      if (tr_torrentHasMetadata (tor))
2486        {
2487          peer->progress = true_count / tor->info.pieceCount;
2488        }
2489      else /* without pieceCount, this result is only a best guess... */
2490        {
2491          peer->progress = true_count / (have->bit_count + 1);
2492        }
2493    }
2494
2495  /* clamp the progress range */
2496  if (peer->progress < 0.0)
2497    peer->progress = 0.0;
2498  if (peer->progress > 1.0)
2499    peer->progress = 1.0;
2500
2501  if (peer->atom && (peer->progress >= 1.0))
2502    atomSetSeed (tor->swarm, peer->atom);
2503}
2504
2505void
2506tr_peerMgrOnTorrentGotMetainfo (tr_torrent * tor)
2507{
2508  int i;
2509  int peerCount;
2510  tr_peer ** peers;
2511
2512  /* the webseed list may have changed... */
2513  rebuildWebseedArray (tor->swarm, tor);
2514
2515  /* some peer_msgs' progress fields may not be accurate if we
2516     didn't have the metadata before now... so refresh them all... */
2517  peerCount = tr_ptrArraySize (&tor->swarm->peers);
2518  peers = (tr_peer**) tr_ptrArrayBase (&tor->swarm->peers);
2519  for (i=0; i<peerCount; ++i)
2520    tr_peerUpdateProgress (tor, peers[i]);
2521
2522  /* update the bittorrent peers' willingnes... */
2523  for (i=0; i<peerCount; ++i)
2524    {
2525      tr_peerMsgsUpdateActive (tr_peerMsgsCast(peers[i]), TR_UP);
2526      tr_peerMsgsUpdateActive (tr_peerMsgsCast(peers[i]), TR_DOWN);
2527    }
2528}
2529
2530void
2531tr_peerMgrTorrentAvailability (const tr_torrent  * tor,
2532                               int8_t            * tab,
2533                               unsigned int        tabCount)
2534{
2535  assert (tr_isTorrent (tor));
2536  assert (tab != NULL);
2537  assert (tabCount > 0);
2538
2539  memset (tab, 0, tabCount);
2540
2541  if (tr_torrentHasMetadata (tor))
2542    {
2543      tr_piece_index_t i;
2544      const int peerCount = tr_ptrArraySize (&tor->swarm->peers);
2545      const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase (&tor->swarm->peers);
2546      const float interval = tor->info.pieceCount / (float)tabCount;
2547      const bool isSeed = tr_cpGetStatus (&tor->completion) == TR_SEED;
2548
2549      for (i=0; i<tabCount; ++i)
2550        {
2551          const int piece = i * interval;
2552
2553          if (isSeed || tr_cpPieceIsComplete (&tor->completion, piece))
2554            {
2555              tab[i] = -1;
2556            }
2557          else if (peerCount)
2558            {
2559              int j;
2560              for (j=0; j<peerCount; ++j)
2561                if (tr_bitfieldHas (&peers[j]->have, piece))
2562                  ++tab[i];
2563            }
2564        }
2565    }
2566}
2567
2568void
2569tr_swarmGetStats (const tr_swarm * swarm, tr_swarm_stats * setme)
2570{
2571  assert (swarm != NULL);
2572  assert (setme != NULL);
2573
2574  *setme = swarm->stats;
2575}
2576
2577void
2578tr_swarmIncrementActivePeers (tr_swarm * swarm, tr_direction direction, bool is_active)
2579{
2580  int n = swarm->stats.activePeerCount[direction];
2581
2582  if (is_active)
2583    ++n;
2584  else
2585    --n;
2586
2587  assert (0 <= n);
2588  assert (n <= swarm->stats.peerCount);
2589
2590  swarm->stats.activePeerCount[direction] = n;
2591}
2592
2593bool
2594tr_peerIsSeed (const tr_peer * peer)
2595{
2596  if (peer->progress >= 1.0)
2597    return true;
2598
2599  if (peer->atom && atomIsSeed (peer->atom))
2600    return true;
2601
2602  return false;
2603}
2604
2605/* count how many bytes we want that connected peers have */
2606uint64_t
2607tr_peerMgrGetDesiredAvailable (const tr_torrent * tor)
2608{
2609  size_t i;
2610  size_t n;
2611  uint64_t desiredAvailable;
2612  const tr_swarm * s;
2613
2614  assert (tr_isTorrent (tor));
2615
2616  /* common shortcuts... */
2617
2618  if (tr_torrentIsSeed (tor))
2619    return 0;
2620
2621  if (!tr_torrentHasMetadata (tor))
2622    return 0;
2623
2624  s = tor->swarm;
2625  if (s == NULL)
2626    return 0;
2627
2628  n = tr_ptrArraySize (&s->peers);
2629  if (n == 0)
2630    {
2631      return 0;
2632    }
2633  else
2634    {
2635      const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase (&s->peers);
2636      for (i=0; i<n; ++i)
2637        if (peers[i]->atom && atomIsSeed (peers[i]->atom))
2638          return tr_cpLeftUntilDone (&tor->completion);
2639    }
2640
2641  if (!s->pieceReplication || !s->pieceReplicationSize)
2642    return 0;
2643
2644  /* do it the hard way */
2645
2646  desiredAvailable = 0;
2647  for (i=0, n=MIN (tor->info.pieceCount, s->pieceReplicationSize); i<n; ++i)
2648    if (!tor->info.pieces[i].dnd && (s->pieceReplication[i] > 0))
2649      desiredAvailable += tr_cpMissingBytesInPiece (&tor->completion, i);
2650
2651  assert (desiredAvailable <= tor->info.totalSize);
2652  return desiredAvailable;
2653}
2654
2655double*
2656tr_peerMgrWebSpeeds_KBps (const tr_torrent * tor)
2657{
2658  unsigned int i;
2659  tr_swarm * s;
2660  unsigned int n;
2661  double * ret = NULL;
2662  const uint64_t now = tr_time_msec ();
2663
2664  assert (tr_isTorrent (tor));
2665
2666  s = tor->swarm;
2667  n = tr_ptrArraySize (&s->webseeds);
2668  ret = tr_new0 (double, n);
2669
2670  assert (s->manager != NULL);
2671  assert (n == tor->info.webseedCount);
2672
2673  for (i=0; i<n; ++i)
2674    {
2675      unsigned int Bps = 0;
2676      if (tr_peerIsTransferringPieces (tr_ptrArrayNth(&s->webseeds,i), now, TR_DOWN, &Bps))
2677        ret[i] = Bps / (double)tr_speed_K;
2678      else
2679        ret[i] = -1.0;
2680    }
2681
2682  return ret;
2683}
2684
2685struct tr_peer_stat *
2686tr_peerMgrPeerStats (const tr_torrent * tor, int * setmeCount)
2687{
2688  int i;
2689  int size = 0;
2690  tr_peer_stat * ret;
2691  const tr_swarm * s;
2692  tr_peer ** peers;
2693  const time_t now = tr_time ();
2694  const uint64_t now_msec = tr_time_msec ();
2695
2696  assert (tr_isTorrent (tor));
2697  assert (tor->swarm->manager != NULL);
2698
2699  s = tor->swarm;
2700  peers = (tr_peer**) tr_ptrArrayBase (&s->peers);
2701  size = tr_ptrArraySize (&s->peers);
2702  ret = tr_new0 (tr_peer_stat, size);
2703
2704  for (i=0; i<size; ++i)
2705    {
2706      char *                   pch;
2707      tr_peer *                peer = peers[i];
2708      tr_peerMsgs *            msgs = PEER_MSGS (peer);
2709      const struct peer_atom * atom = peer->atom;
2710      tr_peer_stat *           stat = ret + i;
2711
2712      tr_address_to_string_with_buf (&atom->addr, stat->addr, sizeof (stat->addr));
2713      tr_strlcpy (stat->client, tr_quark_get_string(peer->client,NULL), sizeof (stat->client));
2714      stat->port                = ntohs (peer->atom->port);
2715      stat->from                = atom->fromFirst;
2716      stat->progress            = peer->progress;
2717      stat->isUTP               = tr_peerMsgsIsUtpConnection (msgs);
2718      stat->isEncrypted         = tr_peerMsgsIsEncrypted (msgs);
2719      stat->rateToPeer_KBps     = toSpeedKBps (tr_peerGetPieceSpeed_Bps (peer, now_msec, TR_CLIENT_TO_PEER));
2720      stat->rateToClient_KBps   = toSpeedKBps (tr_peerGetPieceSpeed_Bps (peer, now_msec, TR_PEER_TO_CLIENT));
2721      stat->peerIsChoked        = tr_peerMsgsIsPeerChoked (msgs);
2722      stat->peerIsInterested    = tr_peerMsgsIsPeerInterested (msgs);
2723      stat->clientIsChoked      = tr_peerMsgsIsClientChoked (msgs);
2724      stat->clientIsInterested  = tr_peerMsgsIsClientInterested (msgs);
2725      stat->isIncoming          = tr_peerMsgsIsIncomingConnection (msgs);
2726      stat->isDownloadingFrom   = tr_peerMsgsIsActive (msgs, TR_PEER_TO_CLIENT);
2727      stat->isUploadingTo       = tr_peerMsgsIsActive (msgs, TR_CLIENT_TO_PEER);
2728      stat->isSeed              = tr_peerIsSeed (peer);
2729
2730      stat->blocksToPeer        = tr_historyGet (&peer->blocksSentToPeer,    now, CANCEL_HISTORY_SEC);
2731      stat->blocksToClient      = tr_historyGet (&peer->blocksSentToClient,  now, CANCEL_HISTORY_SEC);
2732      stat->cancelsToPeer       = tr_historyGet (&peer->cancelsSentToPeer,   now, CANCEL_HISTORY_SEC);
2733      stat->cancelsToClient     = tr_historyGet (&peer->cancelsSentToClient, now, CANCEL_HISTORY_SEC);
2734
2735      stat->pendingReqsToPeer   = peer->pendingReqsToPeer;
2736      stat->pendingReqsToClient = peer->pendingReqsToClient;
2737
2738      pch = stat->flagStr;
2739      if (stat->isUTP) *pch++ = 'T';
2740      if (s->optimistic == msgs) *pch++ = 'O';
2741      if (stat->isDownloadingFrom) *pch++ = 'D';
2742      else if (stat->clientIsInterested) *pch++ = 'd';
2743      if (stat->isUploadingTo) *pch++ = 'U';
2744      else if (stat->peerIsInterested) *pch++ = 'u';
2745      if (!stat->clientIsChoked && !stat->clientIsInterested) *pch++ = 'K';
2746      if (!stat->peerIsChoked && !stat->peerIsInterested) *pch++ = '?';
2747      if (stat->isEncrypted) *pch++ = 'E';
2748      if (stat->from == TR_PEER_FROM_DHT) *pch++ = 'H';
2749      else if (stat->from == TR_PEER_FROM_PEX) *pch++ = 'X';
2750      if (stat->isIncoming) *pch++ = 'I';
2751      *pch = '\0';
2752    }
2753
2754  *setmeCount = size;
2755  return ret;
2756}
2757
2758/***
2759****
2760****
2761***/
2762
2763void
2764tr_peerMgrClearInterest (tr_torrent * tor)
2765{
2766  int i;
2767  tr_swarm * s = tor->swarm;
2768  const int peerCount = tr_ptrArraySize (&s->peers);
2769
2770  assert (tr_isTorrent (tor));
2771  assert (tr_torrentIsLocked (tor));
2772
2773  for (i=0; i<peerCount; ++i)
2774    tr_peerMsgsSetInterested (tr_ptrArrayNth (&s->peers, i), false);
2775}
2776
2777/* does this peer have any pieces that we want? */
2778static bool
2779isPeerInteresting (tr_torrent     * const tor,
2780                   const bool     * const piece_is_interesting,
2781                   const tr_peer  * const peer)
2782{
2783  tr_piece_index_t i, n;
2784
2785  /* these cases should have already been handled by the calling code... */
2786  assert (!tr_torrentIsSeed (tor));
2787  assert (tr_torrentIsPieceTransferAllowed (tor, TR_PEER_TO_CLIENT));
2788
2789  if (tr_peerIsSeed (peer))
2790    return true;
2791
2792  for (i=0, n=tor->info.pieceCount; i<n; ++i)
2793    if (piece_is_interesting[i] && tr_bitfieldHas (&peer->have, i))
2794      return true;
2795
2796  return false;
2797}
2798
2799typedef enum
2800{
2801  RECHOKE_STATE_GOOD,
2802  RECHOKE_STATE_UNTESTED,
2803  RECHOKE_STATE_BAD
2804}
2805tr_rechoke_state;
2806
2807struct tr_rechoke_info
2808{
2809  tr_peer * peer;
2810  int salt;
2811  int rechoke_state;
2812};
2813
2814static int
2815compare_rechoke_info (const void * va, const void * vb)
2816{
2817  const struct tr_rechoke_info * a = va;
2818  const struct tr_rechoke_info * b = vb;
2819
2820  if (a->rechoke_state != b->rechoke_state)
2821    return a->rechoke_state - b->rechoke_state;
2822
2823  return a->salt - b->salt;
2824}
2825
2826/* determines who we send "interested" messages to */
2827static void
2828rechokeDownloads (tr_swarm * s)
2829{
2830  int i;
2831  int maxPeers = 0;
2832  int rechoke_count = 0;
2833  struct tr_rechoke_info * rechoke = NULL;
2834  const int MIN_INTERESTING_PEERS = 5;
2835  const int peerCount = tr_ptrArraySize (&s->peers);
2836  const time_t now = tr_time ();
2837
2838  /* some cases where this function isn't necessary */
2839  if (tr_torrentIsSeed (s->tor))
2840    return;
2841  if (!tr_torrentIsPieceTransferAllowed (s->tor, TR_PEER_TO_CLIENT))
2842    return;
2843
2844  /* decide HOW MANY peers to be interested in */
2845  {
2846    int blocks = 0;
2847    int cancels = 0;
2848    time_t timeSinceCancel;
2849
2850    /* Count up how many blocks & cancels each peer has.
2851     *
2852     * There are two situations where we send out cancels --
2853     *
2854     * 1. We've got unresponsive peers, which is handled by deciding
2855     *    -which- peers to be interested in.
2856     *
2857     * 2. We've hit our bandwidth cap, which is handled by deciding
2858     *    -how many- peers to be interested in.
2859     *
2860     * We're working on 2. here, so we need to ignore unresponsive
2861     * peers in our calculations lest they confuse Transmission into
2862     * thinking it's hit its bandwidth cap.
2863     */
2864    for (i=0; i<peerCount; ++i)
2865      {
2866        const tr_peer * peer = tr_ptrArrayNth (&s->peers, i);
2867        const int b = tr_historyGet (&peer->blocksSentToClient, now, CANCEL_HISTORY_SEC);
2868        const int c = tr_historyGet (&peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC);
2869
2870        if (b == 0) /* ignore unresponsive peers, as described above */
2871          continue;
2872
2873        blocks += b;
2874        cancels += c;
2875      }
2876
2877    if (cancels > 0)
2878      {
2879        /* cancelRate: of the block requests we've recently made, the percentage we cancelled.
2880         * higher values indicate more congestion. */
2881        const double cancelRate = cancels / (double)(cancels + blocks);
2882        const double mult = 1 - MIN (cancelRate, 0.5);
2883        maxPeers = s->interestedCount * mult;
2884        tordbg (s, "cancel rate is %.3f -- reducing the "
2885                   "number of peers we're interested in by %.0f percent",
2886                   cancelRate, mult * 100);
2887        s->lastCancel = now;
2888      }
2889
2890    timeSinceCancel = now - s->lastCancel;
2891    if (timeSinceCancel)
2892      {
2893        const int maxIncrease = 15;
2894        const time_t maxHistory = 2 * CANCEL_HISTORY_SEC;
2895        const double mult = MIN (timeSinceCancel, maxHistory) / (double) maxHistory;
2896        const int inc = maxIncrease * mult;
2897        maxPeers = s->maxPeers + inc;
2898        tordbg (s, "time since last cancel is %li -- increasing the "
2899                   "number of peers we're interested in by %d",
2900                   timeSinceCancel, inc);
2901      }
2902  }
2903
2904  /* don't let the previous section's number tweaking go too far... */
2905  if (maxPeers < MIN_INTERESTING_PEERS)
2906    maxPeers = MIN_INTERESTING_PEERS;
2907  if (maxPeers > s->tor->maxConnectedPeers)
2908    maxPeers = s->tor->maxConnectedPeers;
2909
2910  s->maxPeers = maxPeers;
2911
2912  if (peerCount > 0)
2913    {
2914      bool * piece_is_interesting;
2915      const tr_torrent * const tor = s->tor;
2916      const int n = tor->info.pieceCount;
2917
2918      /* build a bitfield of interesting pieces... */
2919      piece_is_interesting = tr_new (bool, n);
2920      for (i=0; i<n; i++)
2921        piece_is_interesting[i] = !tor->info.pieces[i].dnd && !tr_cpPieceIsComplete (&tor->completion, i);
2922
2923      /* decide WHICH peers to be interested in (based on their cancel-to-block ratio) */
2924      for (i=0; i<peerCount; ++i)
2925        {
2926          tr_peer * peer = tr_ptrArrayNth (&s->peers, i);
2927
2928          if (!isPeerInteresting (s->tor, piece_is_interesting, peer))
2929            {
2930              tr_peerMsgsSetInterested (PEER_MSGS(peer), false);
2931            }
2932          else
2933            {
2934              tr_rechoke_state rechoke_state;
2935              const int blocks = tr_historyGet (&peer->blocksSentToClient, now, CANCEL_HISTORY_SEC);
2936              const int cancels = tr_historyGet (&peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC);
2937
2938              if (!blocks && !cancels)
2939                rechoke_state = RECHOKE_STATE_UNTESTED;
2940              else if (!cancels)
2941                rechoke_state = RECHOKE_STATE_GOOD;
2942              else if (!blocks)
2943                rechoke_state = RECHOKE_STATE_BAD;
2944              else if ((cancels * 10) < blocks)
2945                rechoke_state = RECHOKE_STATE_GOOD;
2946              else
2947                rechoke_state = RECHOKE_STATE_BAD;
2948
2949              if (rechoke == NULL)
2950                rechoke = tr_new (struct tr_rechoke_info, peerCount);
2951
2952              rechoke[rechoke_count].peer = peer;
2953              rechoke[rechoke_count].rechoke_state = rechoke_state;
2954              rechoke[rechoke_count].salt = tr_cryptoWeakRandInt (INT_MAX);
2955              rechoke_count++;
2956            }
2957
2958        }
2959
2960      tr_free (piece_is_interesting);
2961    }
2962
2963  /* now that we know which & how many peers to be interested in... update the peer interest */
2964  qsort (rechoke, rechoke_count, sizeof (struct tr_rechoke_info), compare_rechoke_info);
2965  s->interestedCount = MIN (maxPeers, rechoke_count);
2966  for (i=0; i<rechoke_count; ++i)
2967    tr_peerMsgsSetInterested (PEER_MSGS(rechoke[i].peer), i<s->interestedCount);
2968
2969  /* cleanup */
2970  tr_free (rechoke);
2971}
2972
2973/**
2974***
2975**/
2976
2977struct ChokeData
2978{
2979  bool          isInterested;
2980  bool          wasChoked;
2981  bool          isChoked;
2982  int           rate;
2983  int           salt;
2984  tr_peerMsgs * msgs;
2985};
2986
2987static int
2988compareChoke (const void * va, const void * vb)
2989{
2990  const struct ChokeData * a = va;
2991  const struct ChokeData * b = vb;
2992
2993  if (a->rate != b->rate) /* prefer higher overall speeds */
2994    return a->rate > b->rate ? -1 : 1;
2995
2996  if (a->wasChoked != b->wasChoked) /* prefer unchoked */
2997    return a->wasChoked ? 1 : -1;
2998
2999  if (a->salt != b->salt) /* random order */
3000    return a->salt - b->salt;
3001
3002  return 0;
3003}
3004
3005/* is this a new connection? */
3006static bool
3007isNew (const tr_peerMsgs * msgs)
3008{
3009  return (msgs != NULL) && (tr_peerMsgsGetConnectionAge (msgs) < 45);
3010}
3011
3012/* get a rate for deciding which peers to choke and unchoke. */
3013static int
3014getRate (const tr_torrent * tor, struct peer_atom * atom, uint64_t now)
3015{
3016  unsigned int Bps;
3017
3018  if (tr_torrentIsSeed (tor))
3019    Bps = tr_peerGetPieceSpeed_Bps (atom->peer, now, TR_CLIENT_TO_PEER);
3020
3021  /* downloading a private torrent... take upload speed into account
3022   * because there may only be a small window of opportunity to share */
3023  else if (tr_torrentIsPrivate (tor))
3024    Bps = tr_peerGetPieceSpeed_Bps (atom->peer, now, TR_PEER_TO_CLIENT)
3025        + tr_peerGetPieceSpeed_Bps (atom->peer, now, TR_CLIENT_TO_PEER);
3026
3027  /* downloading a public torrent */
3028  else
3029    Bps = tr_peerGetPieceSpeed_Bps (atom->peer, now, TR_PEER_TO_CLIENT);
3030
3031  /* convert it to bytes per second */
3032  return Bps;
3033}
3034
3035static inline bool
3036isBandwidthMaxedOut (const tr_bandwidth * b,
3037                     const uint64_t now_msec, tr_direction dir)
3038{
3039  if (!tr_bandwidthIsLimited (b, dir))
3040    {
3041      return false;
3042    }
3043  else
3044    {
3045      const unsigned int got = tr_bandwidthGetPieceSpeed_Bps (b, now_msec, dir);
3046      const unsigned int want = tr_bandwidthGetDesiredSpeed_Bps (b, dir);
3047      return got >= want;
3048    }
3049}
3050
3051static void
3052rechokeUploads (tr_swarm * s, const uint64_t now)
3053{
3054  int i, size, unchokedInterested;
3055  const int peerCount = tr_ptrArraySize (&s->peers);
3056  tr_peer ** peers = (tr_peer**) tr_ptrArrayBase (&s->peers);
3057  struct ChokeData * choke = tr_new0 (struct ChokeData, peerCount);
3058  const tr_session * session = s->manager->session;
3059  const int chokeAll = !tr_torrentIsPieceTransferAllowed (s->tor, TR_CLIENT_TO_PEER);
3060  const bool isMaxedOut = isBandwidthMaxedOut (&s->tor->bandwidth, now, TR_UP);
3061
3062  assert (swarmIsLocked (s));
3063
3064  /* an optimistic unchoke peer's "optimistic"
3065   * state lasts for N calls to rechokeUploads (). */
3066  if (s->optimisticUnchokeTimeScaler > 0)
3067    s->optimisticUnchokeTimeScaler--;
3068  else
3069    s->optimistic = NULL;
3070
3071  /* sort the peers by preference and rate */
3072  for (i=0, size=0; i<peerCount; ++i)
3073    {
3074      tr_peer * peer = peers[i];
3075      tr_peerMsgs * msgs = PEER_MSGS (peer);
3076
3077      struct peer_atom * atom = peer->atom;
3078
3079      if (tr_peerIsSeed (peer)) /* choke seeds and partial seeds */
3080        {
3081          tr_peerMsgsSetChoke (PEER_MSGS(peer), true);
3082        }
3083      else if (chokeAll) /* choke everyone if we're not uploading */
3084        {
3085          tr_peerMsgsSetChoke (PEER_MSGS(peer), true);
3086        }
3087      else if (msgs != s->optimistic)
3088        {
3089          struct ChokeData * n = &choke[size++];
3090          n->msgs         = msgs;
3091          n->isInterested = tr_peerMsgsIsPeerInterested (msgs);
3092          n->wasChoked    = tr_peerMsgsIsPeerChoked (msgs);
3093          n->rate         = getRate (s->tor, atom, now);
3094          n->salt         = tr_cryptoWeakRandInt (INT_MAX);
3095          n->isChoked     = true;
3096        }
3097    }
3098
3099  qsort (choke, size, sizeof (struct ChokeData), compareChoke);
3100
3101  /**
3102   * Reciprocation and number of uploads capping is managed by unchoking
3103   * the N peers which have the best upload rate and are interested.
3104   * This maximizes the client's download rate. These N peers are
3105   * referred to as downloaders, because they are interested in downloading
3106   * from the client.
3107   *
3108   * Peers which have a better upload rate (as compared to the downloaders)
3109   * but aren't interested get unchoked. If they become interested, the
3110   * downloader with the worst upload rate gets choked. If a client has
3111   * a complete file, it uses its upload rate rather than its download
3112   * rate to decide which peers to unchoke.
3113   *
3114   * If our bandwidth is maxed out, don't unchoke any more peers.
3115   */
3116  unchokedInterested = 0;
3117  for (i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i)
3118    {
3119      choke[i].isChoked = isMaxedOut ? choke[i].wasChoked : false;
3120      if (choke[i].isInterested)
3121        ++unchokedInterested;
3122    }
3123
3124  /* optimistic unchoke */
3125  if (!s->optimistic && !isMaxedOut && (i<size))
3126    {
3127      int n;
3128      struct ChokeData * c;
3129      tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
3130
3131      for (; i<size; ++i)
3132        {
3133          if (choke[i].isInterested)
3134            {
3135              const tr_peerMsgs * msgs = choke[i].msgs;
3136              int x = 1, y;
3137              if (isNew (msgs)) x *= 3;
3138              for (y=0; y<x; ++y)
3139                tr_ptrArrayAppend (&randPool, &choke[i]);
3140            }
3141        }
3142
3143      if ((n = tr_ptrArraySize (&randPool)))
3144        {
3145          c = tr_ptrArrayNth (&randPool, tr_cryptoWeakRandInt (n));
3146          c->isChoked = false;
3147          s->optimistic = c->msgs;
3148          s->optimisticUnchokeTimeScaler = OPTIMISTIC_UNCHOKE_MULTIPLIER;
3149        }
3150
3151      tr_ptrArrayDestruct (&randPool, NULL);
3152    }
3153
3154  for (i=0; i<size; ++i)
3155    tr_peerMsgsSetChoke (choke[i].msgs, choke[i].isChoked);
3156
3157  /* cleanup */
3158  tr_free (choke);
3159}
3160
3161static void
3162rechokePulse (int foo UNUSED, short bar UNUSED, void * vmgr)
3163{
3164  tr_torrent * tor = NULL;
3165  tr_peerMgr * mgr = vmgr;
3166  const uint64_t now = tr_time_msec ();
3167
3168  managerLock (mgr);
3169
3170  while ((tor = tr_torrentNext (mgr->session, tor)))
3171    {
3172      if (tor->isRunning)
3173        {
3174          tr_swarm * s = tor->swarm;
3175
3176          if (s->stats.peerCount > 0)
3177            {
3178              rechokeUploads (s, now);
3179              rechokeDownloads (s);
3180            }
3181        }
3182    }
3183
3184  tr_timerAddMsec (mgr->rechokeTimer, RECHOKE_PERIOD_MSEC);
3185  managerUnlock (mgr);
3186}
3187
3188/***
3189****
3190****  Life and Death
3191****
3192***/
3193
3194static bool
3195shouldPeerBeClosed (const tr_swarm   * s,
3196                    const tr_peer    * peer,
3197                    int                peerCount,
3198                    const time_t       now)
3199{
3200  const tr_torrent * tor = s->tor;
3201  const struct peer_atom * atom = peer->atom;
3202
3203  /* if it's marked for purging, close it */
3204  if (peer->doPurge)
3205    {
3206      tordbg (s, "purging peer %s because its doPurge flag is set",
3207              tr_atomAddrStr (atom));
3208      return true;
3209    }
3210
3211  /* disconnect if we're both seeds and enough time has passed for PEX */
3212  if (tr_torrentIsSeed (tor) && tr_peerIsSeed (peer))
3213    return !tr_torrentAllowsPex (tor) || (now-atom->time>=30);
3214
3215  /* disconnect if it's been too long since piece data has been transferred.
3216   * this is on a sliding scale based on number of available peers... */
3217  {
3218    const int relaxStrictnessIfFewerThanN = (int)((getMaxPeerCount (tor) * 0.9) + 0.5);
3219    /* if we have >= relaxIfFewerThan, strictness is 100%.
3220     * if we have zero connections, strictness is 0% */
3221    const float strictness = peerCount >= relaxStrictnessIfFewerThanN
3222                           ? 1.0
3223                           : peerCount / (float)relaxStrictnessIfFewerThanN;
3224    const int lo = MIN_UPLOAD_IDLE_SECS;
3225    const int hi = MAX_UPLOAD_IDLE_SECS;
3226    const int limit = hi - ((hi - lo) * strictness);
3227    const int idleTime = now - MAX (atom->time, atom->piece_data_time);
3228/*fprintf (stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit);*/
3229    if (idleTime > limit)
3230      {
3231        tordbg (s, "purging peer %s because it's been %d secs since we shared anything",
3232                tr_atomAddrStr (atom), idleTime);
3233        return true;
3234      }
3235  }
3236
3237  return false;
3238}
3239
3240static tr_peer **
3241getPeersToClose (tr_swarm * s, const time_t now_sec, int * setmeSize)
3242{
3243  int i, peerCount, outsize;
3244  struct tr_peer ** ret = NULL;
3245  tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek (&s->peers, &peerCount);
3246
3247  assert (swarmIsLocked (s));
3248
3249  for (i=outsize=0; i<peerCount; ++i)
3250    {
3251      if (shouldPeerBeClosed (s, peers[i], peerCount, now_sec))
3252        {
3253          if (ret == NULL)
3254            ret = tr_new (tr_peer *, peerCount);
3255          ret[outsize++] = peers[i];
3256        }
3257    }
3258
3259  *setmeSize = outsize;
3260  return ret;
3261}
3262
3263static int
3264getReconnectIntervalSecs (const struct peer_atom * atom, const time_t now)
3265{
3266  int sec;
3267  const bool unreachable = (atom->flags2 & MYFLAG_UNREACHABLE) != 0;
3268
3269  /* if we were recently connected to this peer and transferring piece
3270   * data, try to reconnect to them sooner rather that later -- we don't
3271   * want network troubles to get in the way of a good peer. */
3272  if (!unreachable && ((now - atom->piece_data_time) <= (MINIMUM_RECONNECT_INTERVAL_SECS * 2)))
3273    sec = MINIMUM_RECONNECT_INTERVAL_SECS;
3274
3275  /* otherwise, the interval depends on how many times we've tried
3276   * and failed to connect to the peer */
3277  else
3278    {
3279      int step = atom->numFails;
3280
3281      /* penalize peers that were unreachable the last time we tried */
3282      if (unreachable)
3283        step += 2;
3284 
3285      switch (step)
3286        {
3287          case 0: sec = 0; break;
3288          case 1: sec = 10; break;
3289          case 2: sec = 60 * 2; break;
3290          case 3: sec = 60 * 15; break;
3291          case 4: sec = 60 * 30; break;
3292          case 5: sec = 60 * 60; break;
3293          default: sec = 60 * 120; break;
3294        }
3295    }
3296
3297  dbgmsg ("reconnect interval for %s is %d seconds", tr_atomAddrStr (atom), sec);
3298  return sec;
3299}
3300
3301static void
3302removePeer (tr_swarm * s, tr_peer * peer)
3303{
3304  struct peer_atom * atom = peer->atom;
3305
3306  assert (swarmIsLocked (s));
3307  assert (atom);
3308
3309  atom->time = tr_time ();
3310
3311  tr_ptrArrayRemoveSortedPointer (&s->peers, peer, peerCompare);
3312  --s->stats.peerCount;
3313  --s->stats.peerFromCount[atom->fromFirst];
3314
3315  if (replicationExists (s))
3316    tr_decrReplicationFromBitfield (s, &peer->have);
3317
3318  assert (s->stats.peerCount == tr_ptrArraySize (&s->peers));
3319  assert (s->stats.peerFromCount[atom->fromFirst] >= 0);
3320
3321  tr_peerFree (peer);
3322}
3323
3324static void
3325closePeer (tr_swarm * s, tr_peer * peer)
3326{
3327  struct peer_atom * atom;
3328
3329  assert (s != NULL);
3330  assert (peer != NULL);
3331
3332  atom = peer->atom;
3333
3334  /* if we transferred piece data, then they might be good peers,
3335     so reset their `numFails' weight to zero. otherwise we connected
3336     to them fruitlessly, so mark it as another fail */
3337  if (atom->piece_data_time)
3338    {
3339      tordbg (s, "resetting atom %s numFails to 0", tr_atomAddrStr (atom));
3340      atom->numFails = 0;
3341    }
3342  else
3343    {
3344      ++atom->numFails;
3345      tordbg (s, "incremented atom %s numFails to %d", tr_atomAddrStr (atom), (int)atom->numFails);
3346    }
3347
3348  tordbg (s, "removing bad peer %s", tr_atomAddrStr (peer->atom));
3349  removePeer (s, peer);
3350}
3351
3352static void
3353removeAllPeers (tr_swarm * s)
3354{
3355  while (!tr_ptrArrayEmpty (&s->peers))
3356    removePeer (s, tr_ptrArrayNth (&s->peers, 0));
3357
3358  assert (!s->stats.peerCount);
3359}
3360
3361static void
3362closeBadPeers (tr_swarm * s, const time_t now_sec)
3363{
3364  if (!tr_ptrArrayEmpty (&s->peers))
3365    {
3366      int i;
3367      int peerCount;
3368      struct tr_peer ** peers;
3369
3370      peers = getPeersToClose (s, now_sec, &peerCount);
3371      for (i=0; i<peerCount; ++i)
3372        closePeer (s, peers[i]);
3373      tr_free (peers);
3374    }
3375}
3376
3377struct peer_liveliness
3378{
3379  tr_peer * peer;
3380  void * clientData;
3381  time_t pieceDataTime;
3382  time_t time;
3383  unsigned int speed;
3384  bool doPurge;
3385};
3386
3387static int
3388comparePeerLiveliness (const void * va, const void * vb)
3389{
3390  const struct peer_liveliness * a = va;
3391  const struct peer_liveliness * b = vb;
3392
3393  if (a->doPurge != b->doPurge)
3394    return a->doPurge ? 1 : -1;
3395
3396  if (a->speed != b->speed) /* faster goes first */
3397    return a->speed > b->speed ? -1 : 1;
3398
3399  /* the one to give us data more recently goes first */
3400  if (a->pieceDataTime != b->pieceDataTime)
3401    return a->pieceDataTime > b->pieceDataTime ? -1 : 1;
3402
3403  /* the one we connected to most recently goes first */
3404  if (a->time != b->time)
3405    return a->time > b->time ? -1 : 1;
3406
3407  return 0;
3408}
3409
3410static void
3411sortPeersByLivelinessImpl (tr_peer  ** peers,
3412                           void     ** clientData,
3413                           int         n,
3414                           uint64_t    now,
3415                           int (*compare)(const void *va, const void *vb))
3416{
3417  int i;
3418  struct peer_liveliness *lives, *l;
3419
3420  /* build a sortable array of peer + extra info */
3421  lives = l = tr_new0 (struct peer_liveliness, n);
3422  for (i=0; i<n; ++i, ++l)
3423    {
3424      tr_peer * p = peers[i];
3425      l->peer = p;
3426      l->doPurge = p->doPurge;
3427      l->pieceDataTime = p->atom->piece_data_time;
3428      l->time = p->atom->time;
3429      l->speed = tr_peerGetPieceSpeed_Bps (p, now, TR_UP)
3430               + tr_peerGetPieceSpeed_Bps (p, now, TR_DOWN);
3431      if (clientData)
3432        l->clientData = clientData[i];
3433    }
3434
3435  /* sort 'em */
3436  assert (n == (l - lives));
3437  qsort (lives, n, sizeof (struct peer_liveliness), compare);
3438
3439  /* build the peer array */
3440  for (i=0, l=lives; i<n; ++i, ++l)
3441    {
3442      peers[i] = l->peer;
3443      if (clientData)
3444        clientData[i] = l->clientData;
3445    }
3446  assert (n == (l - lives));
3447
3448  /* cleanup */
3449  tr_free (lives);
3450}
3451
3452static void
3453sortPeersByLiveliness (tr_peer ** peers, void ** clientData, int n, uint64_t now)
3454{
3455  sortPeersByLivelinessImpl (peers, clientData, n, now, comparePeerLiveliness);
3456}
3457
3458
3459static void
3460enforceTorrentPeerLimit (tr_swarm * s, uint64_t now)
3461{
3462  int n = tr_ptrArraySize (&s->peers);
3463  const int max = tr_torrentGetPeerLimit (s->tor);
3464  if (n > max)
3465    {
3466      void * base = tr_ptrArrayBase (&s->peers);
3467      tr_peer ** peers = tr_memdup (base, n*sizeof (tr_peer*));
3468      sortPeersByLiveliness (peers, NULL, n, now);
3469      while (n > max)
3470        closePeer (s, peers[--n]);
3471      tr_free (peers);
3472    }
3473}
3474
3475static void
3476enforceSessionPeerLimit (tr_session * session, uint64_t now)
3477{
3478  int n = 0;
3479  tr_torrent * tor = NULL;
3480  const int max = tr_sessionGetPeerLimit (session);
3481
3482  /* count the total number of peers */
3483  while ((tor = tr_torrentNext (session, tor)))
3484    n += tr_ptrArraySize (&tor->swarm->peers);
3485
3486  /* if there are too many, prune out the worst */
3487  if (n > max)
3488    {
3489      tr_peer ** peers = tr_new (tr_peer*, n);
3490      tr_swarm ** swarms = tr_new (tr_swarm*, n);
3491
3492      /* populate the peer array */
3493      n = 0;
3494      tor = NULL;
3495      while ((tor = tr_torrentNext (session, tor)))
3496        {
3497          int i;
3498          tr_swarm * s = tor->swarm;
3499          const int tn = tr_ptrArraySize (&s->peers);
3500          for (i=0; i<tn; ++i, ++n)
3501            {
3502              peers[n] = tr_ptrArrayNth (&s->peers, i);
3503              swarms[n] = s;
3504            }
3505        }
3506
3507      /* sort 'em */
3508      sortPeersByLiveliness (peers, (void**)swarms, n, now);
3509
3510      /* cull out the crappiest */
3511      while (n-- > max)
3512        closePeer (swarms[n], peers[n]);
3513
3514      /* cleanup */
3515      tr_free (swarms);
3516      tr_free (peers);
3517    }
3518}
3519
3520static void makeNewPeerConnections (tr_peerMgr * mgr, const int max);
3521
3522static void
3523reconnectPulse (int foo UNUSED, short bar UNUSED, void * vmgr)
3524{
3525  tr_torrent * tor;
3526  tr_peerMgr * mgr = vmgr;
3527  const time_t now_sec = tr_time ();
3528  const uint64_t now_msec = tr_time_msec ();
3529
3530  /**
3531  ***  enforce the per-session and per-torrent peer limits
3532  **/
3533
3534  /* if we're over the per-torrent peer limits, cull some peers */
3535  tor = NULL;
3536  while ((tor = tr_torrentNext (mgr->session, tor)))
3537    if (tor->isRunning)
3538      enforceTorrentPeerLimit (tor->swarm, now_msec);
3539
3540  /* if we're over the per-session peer limits, cull some peers */
3541  enforceSessionPeerLimit (mgr->session, now_msec);
3542
3543  /* remove crappy peers */
3544  tor = NULL;
3545  while ((tor = tr_torrentNext (mgr->session, tor)))
3546    if (!tor->swarm->isRunning)
3547      removeAllPeers (tor->swarm);
3548    else
3549      closeBadPeers (tor->swarm, now_sec);
3550
3551  /* try to make new peer connections */
3552  makeNewPeerConnections (mgr, MAX_CONNECTIONS_PER_PULSE);
3553}
3554
3555/****
3556*****
3557*****  BANDWIDTH ALLOCATION
3558*****
3559****/
3560
3561static void
3562pumpAllPeers (tr_peerMgr * mgr)
3563{
3564  tr_torrent * tor = NULL;
3565
3566  while ((tor = tr_torrentNext (mgr->session, tor)))
3567    {
3568      int j;
3569      tr_swarm * s = tor->swarm;
3570
3571      for (j=0; j<tr_ptrArraySize (&s->peers); ++j)
3572        tr_peerMsgsPulse (tr_ptrArrayNth (&s->peers, j));
3573    }
3574}
3575
3576
3577static void
3578queuePulseForeach (void * vtor)
3579{
3580  tr_torrent * tor = vtor;
3581
3582  tr_torrentStartNow (tor);
3583
3584  if (tor->queue_started_callback != NULL)
3585    (*tor->queue_started_callback)(tor, tor->queue_started_user_data);
3586}
3587
3588static void
3589queuePulse (tr_session * session, tr_direction dir)
3590{
3591  assert (tr_isSession (session));
3592  assert (tr_isDirection (dir));
3593
3594  if (tr_sessionGetQueueEnabled (session, dir))
3595    {
3596      tr_ptrArray torrents = TR_PTR_ARRAY_INIT;
3597
3598      tr_sessionGetNextQueuedTorrents (session,
3599                                       dir,
3600                                       tr_sessionCountQueueFreeSlots (session, dir),
3601                                       &torrents);
3602
3603      tr_ptrArrayForeach (&torrents, queuePulseForeach);
3604
3605      tr_ptrArrayDestruct (&torrents, NULL);
3606    }
3607}
3608
3609static void
3610bandwidthPulse (int foo UNUSED, short bar UNUSED, void * vmgr)
3611{
3612  tr_torrent * tor;
3613  tr_peerMgr * mgr = vmgr;
3614  tr_session * session = mgr->session;
3615  managerLock (mgr);
3616
3617  /* FIXME: this next line probably isn't necessary... */
3618  pumpAllPeers (mgr);
3619
3620  /* allocate bandwidth to the peers */
3621  tr_bandwidthAllocate (&session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC);
3622  tr_bandwidthAllocate (&session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC);
3623
3624  /* torrent upkeep */
3625  tor = NULL;
3626  while ((tor = tr_torrentNext (session, tor)))
3627    {
3628      /* possibly stop torrents that have seeded enough */
3629      tr_torrentCheckSeedLimit (tor);
3630
3631      /* run the completeness check for any torrents that need it */
3632      if (tor->swarm->needsCompletenessCheck)
3633        {
3634          tor->swarm->needsCompletenessCheck  = false;
3635          tr_torrentRecheckCompleteness (tor);
3636        }
3637
3638      /* stop torrents that are ready to stop, but couldn't be stopped
3639         earlier during the peer-io callback call chain */
3640      if (tor->isStopping)
3641        tr_torrentStop (tor);
3642
3643      /* update the torrent's stats */
3644      tor->swarm->stats.activeWebseedCount = countActiveWebseeds (tor->swarm);
3645    }
3646
3647  /* pump the queues */
3648  queuePulse (session, TR_UP);
3649  queuePulse (session, TR_DOWN);
3650
3651  reconnectPulse (0, 0, mgr);
3652
3653  tr_timerAddMsec (mgr->bandwidthTimer, BANDWIDTH_PERIOD_MSEC);
3654  managerUnlock (mgr);
3655}
3656
3657/***
3658****
3659***/
3660
3661static int
3662compareAtomPtrsByAddress (const void * va, const void *vb)
3663{
3664  const struct peer_atom * a = * (const struct peer_atom**) va;
3665  const struct peer_atom * b = * (const struct peer_atom**) vb;
3666
3667  assert (tr_isAtom (a));
3668  assert (tr_isAtom (b));
3669
3670  return tr_address_compare (&a->addr, &b->addr);
3671}
3672
3673/* best come first, worst go last */
3674static int
3675compareAtomPtrsByShelfDate (const void * va, const void *vb)
3676{
3677  time_t atime;
3678  time_t btime;
3679  const struct peer_atom * a = * (const struct peer_atom**) va;
3680  const struct peer_atom * b = * (const struct peer_atom**) vb;
3681  const int data_time_cutoff_secs = 60 * 60;
3682  const time_t tr_now = tr_time ();
3683
3684  assert (tr_isAtom (a));
3685  assert (tr_isAtom (b));
3686
3687  /* primary key: the last piece data time *if* it was within the last hour */
3688  atime = a->piece_data_time; if (atime + data_time_cutoff_secs < tr_now) atime = 0;
3689  btime = b->piece_data_time; if (btime + data_time_cutoff_secs < tr_now) btime = 0;
3690  if (atime != btime)
3691    return atime > btime ? -1 : 1;
3692
3693  /* secondary key: shelf date. */
3694  if (a->shelf_date != b->shelf_date)
3695    return a->shelf_date > b->shelf_date ? -1 : 1;
3696
3697  return 0;
3698}
3699
3700static int
3701getMaxAtomCount (const tr_torrent * tor)
3702{
3703  return MIN (50, tor->maxConnectedPeers * 3);
3704}
3705
3706static void
3707atomPulse (int foo UNUSED, short bar UNUSED, void * vmgr)
3708{
3709  tr_torrent * tor = NULL;
3710  tr_peerMgr * mgr = vmgr;
3711  managerLock (mgr);
3712
3713  while ((tor = tr_torrentNext (mgr->session, tor)))
3714    {
3715      int atomCount;
3716      tr_swarm * s = tor->swarm;
3717      const int maxAtomCount = getMaxAtomCount (tor);
3718      struct peer_atom ** atoms = (struct peer_atom**) tr_ptrArrayPeek (&s->pool, &atomCount);
3719
3720      if (atomCount > maxAtomCount) /* we've got too many atoms... time to prune */
3721        {
3722          int i;
3723          int keepCount = 0;
3724          int testCount = 0;
3725          struct peer_atom ** keep = tr_new (struct peer_atom*, atomCount);
3726          struct peer_atom ** test = tr_new (struct peer_atom*, atomCount);
3727
3728          /* keep the ones that are in use */
3729          for (i=0; i<atomCount; ++i)
3730            {
3731              struct peer_atom * atom = atoms[i];
3732              if (peerIsInUse (s, atom))
3733                keep[keepCount++] = atom;
3734              else
3735                test[testCount++] = atom;
3736            }
3737
3738          /* if there's room, keep the best of what's left */
3739          i = 0;
3740          if (keepCount < maxAtomCount)
3741            {
3742              qsort (test, testCount, sizeof (struct peer_atom *), compareAtomPtrsByShelfDate);
3743              while (i<testCount && keepCount<maxAtomCount)
3744                keep[keepCount++] = test[i++];
3745            }
3746
3747          /* free the culled atoms */
3748          while (i<testCount)
3749            tr_free (test[i++]);
3750
3751          /* rebuild Torrent.pool with what's left */
3752          tr_ptrArrayDestruct (&s->pool, NULL);
3753          s->pool = TR_PTR_ARRAY_INIT;
3754          qsort (keep, keepCount, sizeof (struct peer_atom *), compareAtomPtrsByAddress);
3755          for (i=0; i<keepCount; ++i)
3756            tr_ptrArrayAppend (&s->pool, keep[i]);
3757
3758          tordbg (s, "max atom count is %d... pruned from %d to %d\n", maxAtomCount, atomCount, keepCount);
3759
3760          /* cleanup */
3761          tr_free (test);
3762          tr_free (keep);
3763        }
3764    }
3765
3766  tr_timerAddMsec (mgr->atomTimer, ATOM_PERIOD_MSEC);
3767  managerUnlock (mgr);
3768}
3769
3770/***
3771****
3772****
3773****
3774***/
3775
3776/* is this atom someone that we'd want to initiate a connection to? */
3777static bool
3778isPeerCandidate (const tr_torrent * tor, struct peer_atom * atom, const time_t now)
3779{
3780  /* not if we're both seeds */
3781  if (tr_torrentIsSeed (tor) && atomIsSeed (atom))
3782    return false;
3783
3784  /* not if we've already got a connection to them... */
3785  if (peerIsInUse (tor->swarm, atom))
3786    return false;
3787
3788  /* not if we just tried them already */
3789  if ((now - atom->time) < getReconnectIntervalSecs (atom, now))
3790    return false;
3791
3792  /* not if they're blocklisted */
3793  if (isAtomBlocklisted (tor->session, atom))
3794    return false;
3795
3796  /* not if they're banned... */
3797  if (atom->flags2 & MYFLAG_BANNED)
3798    return false;
3799
3800  return true;
3801}
3802
3803struct peer_candidate
3804{
3805  uint64_t score;
3806  tr_torrent * tor;
3807  struct peer_atom * atom;
3808};
3809
3810static bool
3811torrentWasRecentlyStarted (const tr_torrent * tor)
3812{
3813  return difftime (tr_time (), tor->startDate) < 120;
3814}
3815
/**
 * Append a bit field to the low end of a packed sort key.
 *
 * The existing key is shifted left by `width` bits and `addme` is placed
 * in the vacated low bits. Fields added earlier therefore end up more
 * significant than fields added later.
 *
 * `addme` is clamped to the largest value that fits in `width` bits, so an
 * out-of-range value sorts as the worst possible value for its own field
 * instead of spilling into, and corrupting, the fields already in the key.
 *
 * @param value the key built so far
 * @param width the size of the new field, in bits. A width of zero or less
 *              adds nothing; a width of 64 or more replaces the whole key.
 * @param addme the new field's value
 * @return the extended key
 */
static inline uint64_t
addValToKey (uint64_t value, int width, uint64_t addme)
{
  uint64_t fieldMax;

  /* a field with no bits can't hold anything
     (and shifting by a negative count is undefined) */
  if (width <= 0)
    return value;

  /* shifting a 64-bit value by 64 or more is undefined;
     the new field would push out everything that was there anyway */
  if (width >= 64)
    return addme;

  fieldMax = ((uint64_t)1 << width) - 1;
  if (addme > fieldMax)
    addme = fieldMax;

  return (value << width) | addme;
}
3823
3824/* smaller value is better */
3825static uint64_t
3826getPeerCandidateScore (const tr_torrent * tor, const struct peer_atom * atom, uint8_t salt)
3827{
3828  uint64_t i;
3829  uint64_t score = 0;
3830  const bool failed = atom->lastConnectionAt < atom->lastConnectionAttemptAt;
3831
3832  /* prefer peers we've connected to, or never tried, over peers we failed to connect to. */
3833  i = failed ? 1 : 0;
3834  score = addValToKey (score, 1, i);
3835
3836  /* prefer the one we attempted least recently (to cycle through all peers) */
3837  i = atom->lastConnectionAttemptAt;
3838  score = addValToKey (score, 32, i);
3839
3840  /* prefer peers belonging to a torrent of a higher priority */
3841  switch (tr_torrentGetPriority (tor))
3842    {
3843      case TR_PRI_HIGH:    i = 0; break;
3844      case TR_PRI_NORMAL:  i = 1; break;
3845      case TR_PRI_LOW:     i = 2; break;
3846    }
3847  score = addValToKey (score, 4, i);
3848
3849  /* prefer recently-started torrents */
3850  i = torrentWasRecentlyStarted (tor) ? 0 : 1;
3851  score = addValToKey (score, 1, i);
3852
3853  /* prefer torrents we're downloading with */
3854  i = tr_torrentIsSeed (tor) ? 1 : 0;
3855  score = addValToKey (score, 1, i);
3856
3857  /* prefer peers that are known to be connectible */
3858  i = (atom->flags & ADDED_F_CONNECTABLE) ? 0 : 1;
3859  score = addValToKey (score, 1, i);
3860
3861  /* prefer peers that we might have a chance of uploading to...
3862  so lower seed probability is better */
3863  if (atom->seedProbability == 100) i = 101;
3864  else if (atom->seedProbability == -1) i = 100;
3865  else i = atom->seedProbability;
3866  score = addValToKey (score, 8, i);
3867
3868  /* Prefer peers that we got from more trusted sources.
3869   * lower `fromBest' values indicate more trusted sources */
3870  score = addValToKey (score, 4, atom->fromBest);
3871
3872  /* salt */
3873  score = addValToKey (score, 8, salt);
3874
3875  return score;
3876}
3877
3878static int
3879comparePeerCandidates (const void * va, const void * vb)
3880{
3881  int ret;
3882  const struct peer_candidate * a = va;
3883  const struct peer_candidate * b = vb;
3884
3885  if (a->score < b->score)
3886    ret = -1;
3887  else if (a->score > b->score)
3888    ret = 1;
3889  else
3890    ret = 0;
3891
3892  return ret;
3893}
3894
3895/* Partial sorting -- selecting the k best candidates
3896   Adapted from http://en.wikipedia.org/wiki/Selection_algorithm */
3897static void
3898selectPeerCandidates (struct peer_candidate * candidates, int candidate_count, int select_count)
3899{
3900  tr_quickfindFirstK (candidates,
3901                      candidate_count,
3902                      sizeof(struct peer_candidate),
3903                      comparePeerCandidates,
3904                      select_count);
3905}
3906
3907#ifndef NDEBUG
3908static bool
3909checkBestScoresComeFirst (const struct peer_candidate * candidates, int n, int k)
3910{
3911  int i;
3912  uint64_t worstFirstScore = 0;
3913  const int x = MIN (n, k) - 1;
3914
3915  for (i=0; i<x; i++)
3916    if (worstFirstScore < candidates[i].score)
3917      worstFirstScore = candidates[i].score;
3918
3919  for (i=0; i<x; i++)
3920    assert (candidates[i].score <= worstFirstScore);
3921
3922  for (i=x+1; i<n; i++)
3923    assert (candidates[i].score >= worstFirstScore);
3924
3925  return true;
3926}
3927#endif /* NDEBUG */
3928
3929/** @return an array of all the atoms we might want to connect to */
3930static struct peer_candidate*
3931getPeerCandidates (tr_session * session, int * candidateCount, int max)
3932{
3933  int atomCount;
3934  int peerCount;
3935  tr_torrent * tor;
3936  struct peer_candidate * candidates;
3937  struct peer_candidate * walk;
3938  const time_t now = tr_time ();
3939  const uint64_t now_msec = tr_time_msec ();
3940  /* leave 5% of connection slots for incoming connections -- ticket #2609 */
3941  const int maxCandidates = tr_sessionGetPeerLimit (session) * 0.95;
3942
3943  /* count how many peers and atoms we've got */
3944  tor= NULL;
3945  atomCount = 0;
3946  peerCount = 0;
3947  while ((tor = tr_torrentNext (session, tor)))
3948    {
3949      atomCount += tr_ptrArraySize (&tor->swarm->pool);
3950      peerCount += tr_ptrArraySize (&tor->swarm->peers);
3951    }
3952
3953  /* don't start any new handshakes if we're full up */
3954  if (maxCandidates <= peerCount)
3955    {
3956      *candidateCount = 0;
3957      return NULL;
3958    }
3959
3960  /* allocate an array of candidates */
3961  walk = candidates = tr_new (struct peer_candidate, atomCount);
3962
3963  /* populate the candidate array */
3964  tor = NULL;
3965  while ((tor = tr_torrentNext (session, tor)))
3966    {
3967      int i, nAtoms;
3968      struct peer_atom ** atoms;
3969
3970      if (!tor->swarm->isRunning)
3971        continue;
3972
3973      /* if we've already got enough peers in this torrent... */
3974      if (tr_torrentGetPeerLimit (tor) <= tr_ptrArraySize (&tor->swarm->peers))
3975        continue;
3976
3977      /* if we've already got enough speed in this torrent... */
3978      if (tr_torrentIsSeed (tor) && isBandwidthMaxedOut (&tor->bandwidth, now_msec, TR_UP))
3979        continue;
3980
3981      atoms = (struct peer_atom**) tr_ptrArrayPeek (&tor->swarm->pool, &nAtoms);
3982      for (i=0; i<nAtoms; ++i)
3983        {
3984          struct peer_atom * atom = atoms[i];
3985
3986          if (isPeerCandidate (tor, atom, now))
3987            {
3988              const uint8_t salt = tr_cryptoWeakRandInt (1024);
3989              walk->tor = tor;
3990              walk->atom = atom;
3991              walk->score = getPeerCandidateScore (tor, atom, salt);
3992              ++walk;
3993            }
3994        }
3995    }
3996
3997  *candidateCount = walk - candidates;
3998  if (walk != candidates)
3999    selectPeerCandidates (candidates, walk-candidates, max);
4000
4001  assert (checkBestScoresComeFirst (candidates, *candidateCount, max));
4002  return candidates;
4003}
4004
4005static void
4006initiateConnection (tr_peerMgr * mgr, tr_swarm * s, struct peer_atom * atom)
4007{
4008  tr_peerIo * io;
4009  const time_t now = tr_time ();
4010  bool utp = tr_sessionIsUTPEnabled (mgr->session) && !atom->utp_failed;
4011
4012  if (atom->fromFirst == TR_PEER_FROM_PEX)
4013    /* PEX has explicit signalling for uTP support.  If an atom
4014       originally came from PEX and doesn't have the uTP flag, skip the
4015       uTP connection attempt.  Are we being optimistic here? */
4016    utp = utp && (atom->flags & ADDED_F_UTP_FLAGS);
4017
4018  tordbg (s, "Starting an OUTGOING%s connection with %s",
4019          utp ? " µTP" : "", tr_atomAddrStr (atom));
4020
4021  io = tr_peerIoNewOutgoing (mgr->session,
4022                             &mgr->session->bandwidth,
4023                             &atom->addr,
4024                             atom->port,
4025                             s->tor->info.hash,
4026                             s->tor->completeness == TR_SEED,
4027                             utp);
4028
4029  if (io == NULL)
4030    {
4031      tordbg (s, "peerIo not created; marking peer %s as unreachable", tr_atomAddrStr (atom));
4032      atom->flags2 |= MYFLAG_UNREACHABLE;
4033      atom->numFails++;
4034    }
4035  else
4036    {
4037      tr_handshake * handshake = tr_handshakeNew (io,
4038                                                  mgr->session->encryptionMode,
4039                                                  myHandshakeDoneCB,
4040                                                  mgr);
4041
4042      assert (tr_peerIoGetTorrentHash (io));
4043
4044      tr_peerIoUnref (io); /* balanced by the initial ref
4045                              in tr_peerIoNewOutgoing () */
4046
4047      tr_ptrArrayInsertSorted (&s->outgoingHandshakes, handshake,
4048                               handshakeCompare);
4049    }
4050
4051  atom->lastConnectionAttemptAt = now;
4052  atom->time = now;
4053}
4054
4055static void
4056initiateCandidateConnection (tr_peerMgr * mgr, struct peer_candidate * c)
4057{
4058#if 0
4059  fprintf (stderr, "Starting an OUTGOING connection with %s - [%s] seedProbability==%d; %s, %s\n",
4060           tr_atomAddrStr (c->atom),
4061           tr_torrentName (c->tor),
4062           (int)c->atom->seedProbability,
4063           tr_torrentIsPrivate (c->tor) ? "private" : "public",
4064           tr_torrentIsSeed (c->tor) ? "seed" : "downloader");
4065#endif
4066
4067  initiateConnection (mgr, c->tor->swarm, c->atom);
4068}
4069
4070static void
4071makeNewPeerConnections (struct tr_peerMgr * mgr, const int max)
4072{
4073  int i, n;
4074  struct peer_candidate * candidates;
4075
4076  candidates = getPeerCandidates (mgr->session, &n, max);
4077
4078  for (i=0; i<n && i<max; ++i)
4079    initiateCandidateConnection (mgr, &candidates[i]);
4080
4081  tr_free (candidates);
4082}
Note: See TracBrowser for help on using the repository browser.