source: trunk/libtransmission/peer-mgr.c @ 14483

Last change on this file since 14483 was 14483, checked in by mikedld, 6 years ago

#5407: Fix crash on Mac when pausing torrents (patch by x190)

  • Property svn:keywords set to Date Rev Author Id
File size: 108.5 KB
Line 
1/*
2 * This file Copyright (C) 2007-2014 Mnemosyne LLC
3 *
4 * It may be used under the GNU GPL versions 2 or 3
5 * or any future license endorsed by Mnemosyne LLC.
6 *
7 * $Id: peer-mgr.c 14483 2015-03-26 18:32:33Z mikedld $
8 */
9
10#include <assert.h>
11#include <errno.h> /* error codes ERANGE, ... */
12#include <limits.h> /* INT_MAX */
13#include <string.h> /* memcpy, memcmp, strstr */
14#include <stdlib.h> /* qsort */
15
16#include <event2/event.h>
17
18#include <libutp/utp.h>
19
20#include "transmission.h"
21#include "announcer.h"
22#include "bandwidth.h"
23#include "blocklist.h"
24#include "cache.h"
25#include "clients.h"
26#include "completion.h"
27#include "crypto-utils.h"
28#include "handshake.h"
29#include "log.h"
30#include "net.h"
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-msgs.h"
34#include "ptrarray.h"
35#include "session.h"
36#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
37#include "torrent.h"
38#include "tr-utp.h"
39#include "utils.h"
40#include "webseed.h"
41
42enum
43{
44  /* how frequently to cull old atoms */
45  ATOM_PERIOD_MSEC = (60 * 1000),
46
47  /* how frequently to change which peers are choked */
48  RECHOKE_PERIOD_MSEC = (10 * 1000),
49
50  /* an optimistically unchoked peer is immune from rechoking
51     for this many calls to rechokeUploads (). */
52  OPTIMISTIC_UNCHOKE_MULTIPLIER = 4,
53
54  /* how frequently to reallocate bandwidth */
55  BANDWIDTH_PERIOD_MSEC = 500,
56
57  /* how frequently to age out old piece request lists */
58  REFILL_UPKEEP_PERIOD_MSEC = (10 * 1000),
59
60  /* how frequently to decide which peers live and die */
61  RECONNECT_PERIOD_MSEC = 500,
62
63  /* when many peers are available, keep idle ones this long */
64  MIN_UPLOAD_IDLE_SECS = (60),
65
66  /* when few peers are available, keep idle ones this long */
67  MAX_UPLOAD_IDLE_SECS = (60 * 5),
68
69  /* max number of peers to ask for per second overall.
70   * this throttle is to avoid overloading the router */
71  MAX_CONNECTIONS_PER_SECOND = 12,
72
73  MAX_CONNECTIONS_PER_PULSE = (int)(MAX_CONNECTIONS_PER_SECOND * (RECONNECT_PERIOD_MSEC/1000.0)),
74
75  /* number of bad pieces a peer is allowed to send before we ban them */
76  MAX_BAD_PIECES_PER_PEER = 5,
77
78  /* amount of time to keep a list of request pieces lying around
79     before it's considered too old and needs to be rebuilt */
80  PIECE_LIST_SHELF_LIFE_SECS = 60,
81
82  /* use for bitwise operations w/peer_atom.flags2 */
83  MYFLAG_BANNED = 1,
84
85  /* use for bitwise operations w/peer_atom.flags2 */
86  /* unreachable for now... but not banned.
87   * if they try to connect to us it's okay */
88  MYFLAG_UNREACHABLE = 2,
89
90  /* the minimum we'll wait before attempting to reconnect to a peer */
91  MINIMUM_RECONNECT_INTERVAL_SECS = 5,
92
93  /** how long we'll let requests we've made linger before we cancel them */
94  REQUEST_TTL_SECS = 90,
95
96  NO_BLOCKS_CANCEL_HISTORY = 120,
97
98  CANCEL_HISTORY_SEC = 60
99};
100
101const tr_peer_event TR_PEER_EVENT_INIT = { 0, 0, NULL, 0, 0, 0, 0 };
102
103const tr_swarm_stats TR_SWARM_STATS_INIT = { { 0, 0 }, 0, 0, { 0, 0, 0, 0, 0, 0, 0 } };
104
105/**
106***
107**/
108
109/**
110 * Peer information that should be kept even before we've connected and
111 * after we've disconnected. These are kept in a pool of peer_atoms to decide
112 * which ones would make good candidates for connecting to, and to watch out
113 * for banned peers.
114 *
115 * @see tr_peer
116 * @see tr_peerMsgs
117 */
118struct peer_atom
119{
120  uint8_t     fromFirst;          /* where the peer was first found */
121  uint8_t     fromBest;           /* the "best" value of where the peer has been found */
122  uint8_t     flags;              /* these match the added_f flags */
123  uint8_t     flags2;             /* flags that aren't defined in added_f */
124  int8_t      seedProbability;    /* how likely is this to be a seed... [0..100] or -1 for unknown */
125  int8_t      blocklisted;        /* -1 for unknown, true for blocklisted, false for not blocklisted */
126
127  tr_port     port;
128  bool        utp_failed;         /* We recently failed to connect over uTP */
129  uint16_t    numFails;
130  time_t      time;               /* when the peer's connection status last changed */
131  time_t      piece_data_time;
132
133  time_t      lastConnectionAttemptAt;
134  time_t      lastConnectionAt;
135
136  /* similar to a TTL field, but less rigid --
137   * if the swarm is small, the atom will be kept past this date. */
138  time_t      shelf_date;
139  tr_peer   * peer;               /* will be NULL if not connected */
140  tr_address  addr;
141};
142
143#ifdef NDEBUG
144#define tr_isAtom(a) (TRUE)
145#else
146static bool
147tr_isAtom (const struct peer_atom * atom)
148{
149  return (atom != NULL)
150      && (atom->fromFirst < TR_PEER_FROM__MAX)
151      && (atom->fromBest < TR_PEER_FROM__MAX)
152      && (tr_address_is_valid (&atom->addr));
153}
154#endif
155
156static const char*
157tr_atomAddrStr (const struct peer_atom * atom)
158{
159  return atom ? tr_peerIoAddrStr (&atom->addr, atom->port) : "[no atom]";
160}
161
162struct block_request
163{
164  tr_block_index_t block;
165  tr_peer * peer;
166  time_t sentAt;
167};
168
169struct weighted_piece
170{
171  tr_piece_index_t index;
172  int16_t salt;
173  int16_t requestCount;
174};
175
176enum piece_sort_state
177{
178  PIECES_UNSORTED,
179  PIECES_SORTED_BY_INDEX,
180  PIECES_SORTED_BY_WEIGHT
181};
182
183/** @brief Opaque, per-torrent data structure for peer connection information */
184typedef struct tr_swarm
185{
186  tr_swarm_stats             stats;
187
188  tr_ptrArray                outgoingHandshakes; /* tr_handshake */
189  tr_ptrArray                pool; /* struct peer_atom */
190  tr_ptrArray                peers; /* tr_peerMsgs */
191  tr_ptrArray                webseeds; /* tr_webseed */
192
193  tr_torrent               * tor;
194  struct tr_peerMgr        * manager;
195
196  tr_peerMsgs              * optimistic; /* the optimistic peer, or NULL if none */
197  int                        optimisticUnchokeTimeScaler;
198
199  bool                       isRunning;
200  bool                       needsCompletenessCheck;
201
202  struct block_request     * requests;
203  int                        requestCount;
204  int                        requestAlloc;
205
206  struct weighted_piece    * pieces;
207  int                        pieceCount;
208  enum piece_sort_state      pieceSortState;
209
210  /* An array of pieceCount items stating how many peers have each piece.
211     This is used to help us for downloading pieces "rarest first."
212     This may be NULL if we don't have metainfo yet, or if we're not
213     downloading and don't care about rarity */
214  uint16_t                 * pieceReplication;
215  size_t                     pieceReplicationSize;
216
217  int                        interestedCount;
218  int                        maxPeers;
219  time_t                     lastCancel;
220
221  /* Before the endgame this should be 0. In endgame, is contains the average
222   * number of pending requests per peer. Only peers which have more pending
223   * requests are considered 'fast' are allowed to request a block that's
224   * already been requested from another (slower?) peer. */
225  int                        endgame;
226}
227tr_swarm;
228
229struct tr_peerMgr
230{
231  tr_session    * session;
232  tr_ptrArray     incomingHandshakes; /* tr_handshake */
233  struct event  * bandwidthTimer;
234  struct event  * rechokeTimer;
235  struct event  * refillUpkeepTimer;
236  struct event  * atomTimer;
237};
238
239#define tordbg(t, ...) \
240  do \
241    { \
242      if (tr_logGetDeepEnabled ()) \
243        tr_logAddDeep (__FILE__, __LINE__, tr_torrentName (t->tor), __VA_ARGS__); \
244    } \
245  while (0)
246
247#define dbgmsg(...) \
248  do \
249    { \
250      if (tr_logGetDeepEnabled ()) \
251        tr_logAddDeep (__FILE__, __LINE__, NULL, __VA_ARGS__); \
252    } \
253  while (0)
254
255/**
256*** tr_peer virtual functions
257**/
258
259static bool
260tr_peerIsTransferringPieces (const tr_peer * peer,
261                             uint64_t        now,
262                             tr_direction    direction,
263                             unsigned int  * Bps)
264{
265  assert (peer != NULL);
266  assert (peer->funcs != NULL);
267
268  return (*peer->funcs->is_transferring_pieces)(peer, now, direction, Bps);
269}
270
271unsigned int
272tr_peerGetPieceSpeed_Bps (const tr_peer    * peer,
273                          uint64_t           now,
274                          tr_direction       direction)
275{
276  unsigned int Bps = 0;
277  tr_peerIsTransferringPieces (peer, now, direction, &Bps);
278  return Bps;
279}
280
281static void
282tr_peerFree (tr_peer * peer)
283{
284  assert (peer != NULL);
285  assert (peer->funcs != NULL);
286
287  (*peer->funcs->destruct)(peer);
288
289  tr_free (peer);
290}
291
292void
293tr_peerConstruct (tr_peer * peer, const tr_torrent * tor)
294{
295  assert (peer != NULL);
296  assert (tr_isTorrent (tor));
297
298  memset (peer, 0, sizeof (tr_peer));
299
300  peer->client = TR_KEY_NONE;
301  peer->swarm = tor->swarm;
302  tr_bitfieldConstruct (&peer->have, tor->info.pieceCount);
303  tr_bitfieldConstruct (&peer->blame, tor->blockCount);
304}
305
306static void peerDeclinedAllRequests (tr_swarm *, const tr_peer *);
307
308void
309tr_peerDestruct (tr_peer * peer)
310{
311  assert (peer != NULL);
312
313  if (peer->swarm != NULL)
314    peerDeclinedAllRequests (peer->swarm, peer);
315
316  tr_bitfieldDestruct (&peer->have);
317  tr_bitfieldDestruct (&peer->blame);
318
319  if (peer->atom)
320    peer->atom->peer = NULL;
321}
322
323/**
324***
325**/
326
327static inline void
328managerLock (const struct tr_peerMgr * manager)
329{
330  tr_sessionLock (manager->session);
331}
332
333static inline void
334managerUnlock (const struct tr_peerMgr * manager)
335{
336  tr_sessionUnlock (manager->session);
337}
338
339static inline void
340swarmLock (tr_swarm * swarm)
341{
342  managerLock (swarm->manager);
343}
344
345static inline void
346swarmUnlock (tr_swarm * swarm)
347{
348  managerUnlock (swarm->manager);
349}
350
351static inline int
352swarmIsLocked (const tr_swarm * swarm)
353{
354  return tr_sessionIsLocked (swarm->manager->session);
355}
356
357/**
358***
359**/
360
361static int
362handshakeCompareToAddr (const void * va, const void * vb)
363{
364  const tr_handshake * a = va;
365
366  return tr_address_compare (tr_handshakeGetAddr (a, NULL), vb);
367}
368
369static int
370handshakeCompare (const void * a, const void * b)
371{
372  return handshakeCompareToAddr (a, tr_handshakeGetAddr (b, NULL));
373}
374
375static inline tr_handshake*
376getExistingHandshake (tr_ptrArray * handshakes, const tr_address * addr)
377{
378  if (tr_ptrArrayEmpty (handshakes))
379    return NULL;
380
381  return tr_ptrArrayFindSorted (handshakes, addr, handshakeCompareToAddr);
382}
383
384static int
385comparePeerAtomToAddress (const void * va, const void * vb)
386{
387  const struct peer_atom * a = va;
388
389  return tr_address_compare (&a->addr, vb);
390}
391
392static int
393compareAtomsByAddress (const void * va, const void * vb)
394{
395  const struct peer_atom * b = vb;
396
397  assert (tr_isAtom (b));
398
399  return comparePeerAtomToAddress (va, &b->addr);
400}
401
402/**
403***
404**/
405
406const tr_address *
407tr_peerAddress (const tr_peer * peer)
408{
409  return &peer->atom->addr;
410}
411
412static tr_swarm *
413getExistingSwarm (tr_peerMgr *    manager,
414                  const uint8_t * hash)
415{
416  tr_torrent * tor = tr_torrentFindFromHash (manager->session, hash);
417
418  return tor == NULL ? NULL : tor->swarm;
419}
420
421static int
422peerCompare (const void * a, const void * b)
423{
424  return tr_address_compare (tr_peerAddress (a), tr_peerAddress (b));
425}
426
427static struct peer_atom*
428getExistingAtom (const tr_swarm   * cswarm,
429                 const tr_address * addr)
430{
431  tr_swarm * swarm = (tr_swarm*) cswarm;
432  return tr_ptrArrayFindSorted (&swarm->pool, addr, comparePeerAtomToAddress);
433}
434
435static bool
436peerIsInUse (const tr_swarm * cs, const struct peer_atom * atom)
437{
438  tr_swarm * s = (tr_swarm*) cs;
439
440  assert (swarmIsLocked (s));
441
442  return (atom->peer != NULL)
443      || getExistingHandshake (&s->outgoingHandshakes, &atom->addr)
444      || getExistingHandshake (&s->manager->incomingHandshakes, &atom->addr);
445}
446
447static inline bool
448replicationExists (const tr_swarm * s)
449{
450  return s->pieceReplication != NULL;
451}
452
453static void
454replicationFree (tr_swarm * s)
455{
456  tr_free (s->pieceReplication);
457  s->pieceReplication = NULL;
458  s->pieceReplicationSize = 0;
459}
460
461static void
462replicationNew (tr_swarm * s)
463{
464  tr_piece_index_t piece_i;
465  const tr_piece_index_t piece_count = s->tor->info.pieceCount;
466  const int n = tr_ptrArraySize (&s->peers);
467
468  assert (!replicationExists (s));
469
470  s->pieceReplicationSize = piece_count;
471  s->pieceReplication = tr_new0 (uint16_t, piece_count);
472
473  for (piece_i=0; piece_i<piece_count; ++piece_i)
474    {
475      int peer_i;
476      uint16_t r = 0;
477
478      for (peer_i=0; peer_i<n; ++peer_i)
479        {
480          tr_peer * peer = tr_ptrArrayNth (&s->peers, peer_i);
481          if (tr_bitfieldHas (&peer->have, piece_i))
482            ++r;
483        }
484
485      s->pieceReplication[piece_i] = r;
486    }
487}
488
489static void
490swarmFree (void * vs)
491{
492  tr_swarm * s = vs;
493
494  assert (s);
495  assert (!s->isRunning);
496  assert (swarmIsLocked (s));
497  assert (tr_ptrArrayEmpty (&s->outgoingHandshakes));
498  assert (tr_ptrArrayEmpty (&s->peers));
499
500  tr_ptrArrayDestruct (&s->webseeds, (PtrArrayForeachFunc)tr_peerFree);
501  tr_ptrArrayDestruct (&s->pool, (PtrArrayForeachFunc)tr_free);
502  tr_ptrArrayDestruct (&s->outgoingHandshakes, NULL);
503  tr_ptrArrayDestruct (&s->peers, NULL);
504  s->stats = TR_SWARM_STATS_INIT;
505
506  replicationFree (s);
507
508  tr_free (s->requests);
509  tr_free (s->pieces);
510  tr_free (s);
511}
512
513static void peerCallbackFunc (tr_peer *, const tr_peer_event *, void *);
514
515static void
516rebuildWebseedArray (tr_swarm * s, tr_torrent * tor)
517{
518  unsigned int i;
519  const tr_info * inf = &tor->info;
520
521  /* clear the array */
522  tr_ptrArrayDestruct (&s->webseeds, (PtrArrayForeachFunc)tr_peerFree);
523  s->webseeds = TR_PTR_ARRAY_INIT;
524  s->stats.activeWebseedCount = 0;
525
526  /* repopulate it */
527  for (i=0; i<inf->webseedCount; ++i)
528    {
529      tr_webseed * w = tr_webseedNew (tor, inf->webseeds[i], peerCallbackFunc, s);
530      tr_ptrArrayAppend (&s->webseeds, w);
531    }
532}
533
534static tr_swarm *
535swarmNew (tr_peerMgr * manager, tr_torrent * tor)
536{
537  tr_swarm * s;
538
539  s = tr_new0 (tr_swarm, 1);
540  s->manager = manager;
541  s->tor = tor;
542  s->pool = TR_PTR_ARRAY_INIT;
543  s->peers = TR_PTR_ARRAY_INIT;
544  s->webseeds = TR_PTR_ARRAY_INIT;
545  s->outgoingHandshakes = TR_PTR_ARRAY_INIT;
546
547  rebuildWebseedArray (s, tor);
548
549  return s;
550}
551
552static void ensureMgrTimersExist (struct tr_peerMgr * m);
553
554tr_peerMgr*
555tr_peerMgrNew (tr_session * session)
556{
557  tr_peerMgr * m = tr_new0 (tr_peerMgr, 1);
558  m->session = session;
559  m->incomingHandshakes = TR_PTR_ARRAY_INIT;
560  ensureMgrTimersExist (m);
561  return m;
562}
563
564static void
565deleteTimer (struct event ** t)
566{
567  if (*t != NULL)
568    {
569      event_free (*t);
570      *t = NULL;
571    }
572}
573
574static void
575deleteTimers (struct tr_peerMgr * m)
576{
577  deleteTimer (&m->atomTimer);
578  deleteTimer (&m->bandwidthTimer);
579  deleteTimer (&m->rechokeTimer);
580  deleteTimer (&m->refillUpkeepTimer);
581}
582
583void
584tr_peerMgrFree (tr_peerMgr * manager)
585{
586  managerLock (manager);
587
588  deleteTimers (manager);
589
590  /* free the handshakes. Abort invokes handshakeDoneCB (), which removes
591   * the item from manager->handshakes, so this is a little roundabout... */
592  while (!tr_ptrArrayEmpty (&manager->incomingHandshakes))
593    tr_handshakeAbort (tr_ptrArrayNth (&manager->incomingHandshakes, 0));
594
595  tr_ptrArrayDestruct (&manager->incomingHandshakes, NULL);
596
597  managerUnlock (manager);
598  tr_free (manager);
599}
600
601/***
602****
603***/
604
605void
606tr_peerMgrOnBlocklistChanged (tr_peerMgr * mgr)
607{
608  tr_torrent * tor = NULL;
609  tr_session * session = mgr->session;
610
611  /* we cache whether or not a peer is blocklisted...
612     since the blocklist has changed, erase that cached value */
613  while ((tor = tr_torrentNext (session, tor)))
614    {
615      int i;
616      tr_swarm * s = tor->swarm;
617      const int n = tr_ptrArraySize (&s->pool);
618      for (i=0; i<n; ++i)
619        {
620          struct peer_atom * atom = tr_ptrArrayNth (&s->pool, i);
621          atom->blocklisted = -1;
622        }
623    }
624}
625
626static bool
627isAtomBlocklisted (tr_session * session, struct peer_atom * atom)
628{
629  if (atom->blocklisted < 0)
630    atom->blocklisted = tr_sessionIsAddressBlocked (session, &atom->addr);
631
632  assert (tr_isBool (atom->blocklisted));
633  return atom->blocklisted;
634}
635
636
637/***
638****
639***/
640
641static void
642atomSetSeedProbability (struct peer_atom * atom, int seedProbability)
643{
644  assert (atom != NULL);
645  assert (-1<=seedProbability && seedProbability<=100);
646
647  atom->seedProbability = seedProbability;
648
649  if (seedProbability == 100)
650    atom->flags |= ADDED_F_SEED_FLAG;
651  else if (seedProbability != -1)
652    atom->flags &= ~ADDED_F_SEED_FLAG;
653}
654
655static inline bool
656atomIsSeed (const struct peer_atom * atom)
657{
658  return atom->seedProbability == 100;
659}
660
661static void
662atomSetSeed (const tr_swarm * s, struct peer_atom * atom)
663{
664  if (!atomIsSeed (atom))
665    {
666      tordbg (s, "marking peer %s as a seed", tr_atomAddrStr (atom));
667
668      atomSetSeedProbability (atom, 100);
669    }
670}
671
672
673bool
674tr_peerMgrPeerIsSeed (const tr_torrent  * tor,
675                      const tr_address  * addr)
676{
677  bool isSeed = false;
678  const tr_swarm * s = tor->swarm;
679  const struct peer_atom * atom = getExistingAtom (s, addr);
680
681  if (atom)
682    isSeed = atomIsSeed (atom);
683
684  return isSeed;
685}
686
687void
688tr_peerMgrSetUtpSupported (tr_torrent * tor, const tr_address * addr)
689{
690  struct peer_atom * atom = getExistingAtom (tor->swarm, addr);
691
692  if (atom)
693    atom->flags |= ADDED_F_UTP_FLAGS;
694}
695
696void
697tr_peerMgrSetUtpFailed (tr_torrent *tor, const tr_address *addr, bool failed)
698{
699  struct peer_atom * atom = getExistingAtom (tor->swarm, addr);
700
701  if (atom)
702    atom->utp_failed = failed;
703}
704
705
706/**
707***  REQUESTS
708***
709*** There are two data structures associated with managing block requests:
710***
711*** 1. tr_swarm::requests, an array of "struct block_request" which keeps
712***    track of which blocks have been requested, and when, and by which peers.
713***    This is list is used for (a) cancelling requests that have been pending
714***    for too long and (b) avoiding duplicate requests before endgame.
715***
716*** 2. tr_swarm::pieces, an array of "struct weighted_piece" which lists the
717***    pieces that we want to request. It's used to decide which blocks to
718***    return next when tr_peerMgrGetBlockRequests () is called.
719**/
720
721/**
722*** struct block_request
723**/
724
725static int
726compareReqByBlock (const void * va, const void * vb)
727{
728  const struct block_request * a = va;
729  const struct block_request * b = vb;
730
731  /* primary key: block */
732  if (a->block < b->block) return -1;
733  if (a->block > b->block) return 1;
734
735  /* secondary key: peer */
736  if (a->peer < b->peer) return -1;
737  if (a->peer > b->peer) return 1;
738
739  return 0;
740}
741
742static void
743requestListAdd (tr_swarm * s, tr_block_index_t block, tr_peer * peer)
744{
745  struct block_request key;
746
747  /* ensure enough room is available... */
748  if (s->requestCount + 1 >= s->requestAlloc)
749    {
750      const int CHUNK_SIZE = 128;
751      s->requestAlloc += CHUNK_SIZE;
752      s->requests = tr_renew (struct block_request,
753                              s->requests, s->requestAlloc);
754    }
755
756  /* populate the record we're inserting */
757  key.block = block;
758  key.peer = peer;
759  key.sentAt = tr_time ();
760
761  /* insert the request to our array... */
762  {
763    bool exact;
764    const int pos = tr_lowerBound (&key, s->requests, s->requestCount,
765                                   sizeof (struct block_request),
766                                   compareReqByBlock, &exact);
767    assert (!exact);
768    memmove (s->requests + pos + 1,
769             s->requests + pos,
770             sizeof (struct block_request) * (s->requestCount++ - pos));
771    s->requests[pos] = key;
772  }
773
774  if (peer != NULL)
775    {
776      ++peer->pendingReqsToPeer;
777      assert (peer->pendingReqsToPeer >= 0);
778    }
779
780  /*fprintf (stderr, "added request of block %lu from peer %s... "
781                     "there are now %d block\n",
782                     (unsigned long)block, tr_atomAddrStr (peer->atom), s->requestCount);*/
783}
784
785static struct block_request *
786requestListLookup (tr_swarm * s, tr_block_index_t block, const tr_peer * peer)
787{
788  struct block_request key;
789  key.block = block;
790  key.peer = (tr_peer*) peer;
791
792  return bsearch (&key, s->requests, s->requestCount,
793                  sizeof (struct block_request),
794                  compareReqByBlock);
795}
796
797/**
798 * Find the peers are we currently requesting the block
799 * with index @a block from and append them to @a peerArr.
800 */
801static void
802getBlockRequestPeers (tr_swarm * s, tr_block_index_t block,
803                      tr_ptrArray * peerArr)
804{
805  bool exact;
806  int i, pos;
807  struct block_request key;
808
809  key.block = block;
810  key.peer = NULL;
811  pos = tr_lowerBound (&key, s->requests, s->requestCount,
812                       sizeof (struct block_request),
813                       compareReqByBlock, &exact);
814
815  assert (!exact); /* shouldn't have a request with .peer == NULL */
816
817  for (i=pos; i<s->requestCount; ++i)
818    {
819      if (s->requests[i].block != block)
820        break;
821      tr_ptrArrayAppend (peerArr, s->requests[i].peer);
822    }
823}
824
825static void
826decrementPendingReqCount (const struct block_request * b)
827{
828  if (b->peer != NULL)
829    if (b->peer->pendingReqsToPeer > 0)
830      --b->peer->pendingReqsToPeer;
831}
832
833static void
834requestListRemove (tr_swarm * s, tr_block_index_t block, const tr_peer * peer)
835{
836  const struct block_request * b = requestListLookup (s, block, peer);
837
838  if (b != NULL)
839    {
840      const int pos = b - s->requests;
841      assert (pos < s->requestCount);
842
843      decrementPendingReqCount (b);
844
845      tr_removeElementFromArray (s->requests,
846                                 pos,
847                                 sizeof (struct block_request),
848                                 s->requestCount--);
849
850      /*fprintf (stderr, "removing request of block %lu from peer %s... "
851                         "there are now %d block requests left\n",
852                         (unsigned long)block, tr_atomAddrStr (peer->atom), t->requestCount);*/
853    }
854}
855
856static int
857countActiveWebseeds (tr_swarm * s)
858{
859  int activeCount = 0;
860
861  if (s->tor->isRunning && !tr_torrentIsSeed (s->tor))
862    {
863      int i;
864      const int n = tr_ptrArraySize (&s->webseeds);
865      const uint64_t now = tr_time_msec ();
866
867      for (i=0; i<n; ++i)
868        if (tr_peerIsTransferringPieces (tr_ptrArrayNth(&s->webseeds,i), now, TR_DOWN, NULL))
869          ++activeCount;
870    }
871
872  return activeCount;
873}
874
875static bool
876testForEndgame (const tr_swarm * s)
877{
878  /* we consider ourselves to be in endgame if the number of bytes
879     we've got requested is >= the number of bytes left to download */
880  return (s->requestCount * s->tor->blockSize)
881               >= tr_torrentGetLeftUntilDone (s->tor);
882}
883
884static void
885updateEndgame (tr_swarm * s)
886{
887  assert (s->requestCount >= 0);
888
889  if (!testForEndgame (s))
890    {
891      /* not in endgame */
892      s->endgame = 0;
893    }
894  else if (!s->endgame) /* only recalculate when endgame first begins */
895    {
896      int i;
897      int numDownloading = 0;
898      const int n = tr_ptrArraySize (&s->peers);
899
900      /* add the active bittorrent peers... */
901      for (i=0; i<n; ++i)
902        {
903          const tr_peer * p = tr_ptrArrayNth (&s->peers, i);
904          if (p->pendingReqsToPeer > 0)
905            ++numDownloading;
906        }
907
908      /* add the active webseeds... */
909      numDownloading += countActiveWebseeds (s);
910
911      /* average number of pending requests per downloading peer */
912      s->endgame = s->requestCount / MAX (numDownloading, 1);
913    }
914}
915
916
917/****
918*****
919*****  Piece List Manipulation / Accessors
920*****
921****/
922
923static inline void
924invalidatePieceSorting (tr_swarm * s)
925{
926  s->pieceSortState = PIECES_UNSORTED;
927}
928
929static const tr_torrent * weightTorrent;
930
931static const uint16_t * weightReplication;
932
933static void
934setComparePieceByWeightTorrent (tr_swarm * s)
935{
936  if (!replicationExists (s))
937    replicationNew (s);
938
939  weightTorrent = s->tor;
940  weightReplication = s->pieceReplication;
941}
942
943/* we try to create a "weight" s.t. high-priority pieces come before others,
944 * and that partially-complete pieces come before empty ones. */
945static int
946comparePieceByWeight (const void * va, const void * vb)
947{
948  const struct weighted_piece * a = va;
949  const struct weighted_piece * b = vb;
950  int ia, ib, missing, pending;
951  const tr_torrent * tor = weightTorrent;
952  const uint16_t * rep = weightReplication;
953
954  /* primary key: weight */
955  missing = tr_torrentMissingBlocksInPiece (tor, a->index);
956  pending = a->requestCount;
957  ia = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
958  missing = tr_torrentMissingBlocksInPiece (tor, b->index);
959  pending = b->requestCount;
960  ib = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
961  if (ia < ib) return -1;
962  if (ia > ib) return 1;
963
964  /* secondary key: higher priorities go first */
965  ia = tor->info.pieces[a->index].priority;
966  ib = tor->info.pieces[b->index].priority;
967  if (ia > ib) return -1;
968  if (ia < ib) return 1;
969
970  /* tertiary key: rarest first. */
971  ia = rep[a->index];
972  ib = rep[b->index];
973  if (ia < ib) return -1;
974  if (ia > ib) return 1;
975
976  /* quaternary key: random */
977  if (a->salt < b->salt) return -1;
978  if (a->salt > b->salt) return 1;
979
980  /* okay, they're equal */
981  return 0;
982}
983
984static int
985comparePieceByIndex (const void * va, const void * vb)
986{
987  const struct weighted_piece * a = va;
988  const struct weighted_piece * b = vb;
989  if (a->index < b->index) return -1;
990  if (a->index > b->index) return 1;
991  return 0;
992}
993
994static void
995pieceListSort (tr_swarm * s, enum piece_sort_state state)
996{
997  assert (state==PIECES_SORTED_BY_INDEX
998       || state==PIECES_SORTED_BY_WEIGHT);
999
1000  if (state == PIECES_SORTED_BY_WEIGHT)
1001    {
1002      setComparePieceByWeightTorrent (s);
1003      qsort (s->pieces, s->pieceCount, sizeof (struct weighted_piece), comparePieceByWeight);
1004    }
1005  else
1006    {
1007      qsort (s->pieces, s->pieceCount, sizeof (struct weighted_piece), comparePieceByIndex);
1008    }
1009
1010  s->pieceSortState = state;
1011}
1012
1013/**
1014 * These functions are useful for testing, but too expensive for nightly builds.
1015 * let's leave it disabled but add an easy hook to compile it back in
1016 */
1017#if 1
1018#define assertWeightedPiecesAreSorted(t)
1019#define assertReplicationCountIsExact(t)
1020#else
1021static void
1022assertWeightedPiecesAreSorted (Torrent * t)
1023{
1024    if (!t->endgame)
1025    {
1026        int i;
1027        setComparePieceByWeightTorrent (t);
1028        for (i=0; i<t->pieceCount-1; ++i)
1029            assert (comparePieceByWeight (&t->pieces[i], &t->pieces[i+1]) <= 0);
1030    }
1031}
1032static void
1033assertReplicationCountIsExact (Torrent * t)
1034{
1035    /* This assert might fail due to errors of implementations in other
1036     * clients. It happens when receiving duplicate bitfields/HaveAll/HaveNone
1037     * from a client. If a such a behavior is noticed,
1038     * a bug report should be filled to the faulty client. */
1039
1040    size_t piece_i;
1041    const uint16_t * rep = t->pieceReplication;
1042    const size_t piece_count = t->pieceReplicationSize;
1043    const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase (&t->peers);
1044    const int peer_count = tr_ptrArraySize (&t->peers);
1045
1046    assert (piece_count == t->tor->info.pieceCount);
1047
1048    for (piece_i=0; piece_i<piece_count; ++piece_i)
1049    {
1050        int peer_i;
1051        uint16_t r = 0;
1052
1053        for (peer_i=0; peer_i<peer_count; ++peer_i)
1054            if (tr_bitsetHas (&peers[peer_i]->have, piece_i))
1055                ++r;
1056
1057        assert (rep[piece_i] == r);
1058    }
1059}
1060#endif
1061
1062static struct weighted_piece *
1063pieceListLookup (tr_swarm * s, tr_piece_index_t index)
1064{
1065  int i;
1066
1067  for (i=0; i<s->pieceCount; ++i)
1068    if (s->pieces[i].index == index)
1069      return &s->pieces[i];
1070
1071  return NULL;
1072}
1073
1074static void
1075pieceListRebuild (tr_swarm * s)
1076{
1077  if (!tr_torrentIsSeed (s->tor))
1078    {
1079      tr_piece_index_t i;
1080      tr_piece_index_t * pool;
1081      tr_piece_index_t poolCount = 0;
1082      const tr_torrent * tor = s->tor;
1083      const tr_info * inf = tr_torrentInfo (tor);
1084      struct weighted_piece * pieces;
1085      int pieceCount;
1086
1087      /* build the new list */
1088      pool = tr_new (tr_piece_index_t, inf->pieceCount);
1089      for (i=0; i<inf->pieceCount; ++i)
1090        if (!inf->pieces[i].dnd)
1091          if (!tr_torrentPieceIsComplete (tor, i))
1092            pool[poolCount++] = i;
1093      pieceCount = poolCount;
1094      pieces = tr_new0 (struct weighted_piece, pieceCount);
1095      for (i=0; i<poolCount; ++i)
1096        {
1097          struct weighted_piece * piece = pieces + i;
1098          piece->index = pool[i];
1099          piece->requestCount = 0;
1100          piece->salt = tr_rand_int_weak (4096);
1101        }
1102
1103      /* if we already had a list of pieces, merge it into
1104       * the new list so we don't lose its requestCounts */
1105      if (s->pieces != NULL)
1106        {
1107          struct weighted_piece * o = s->pieces;
1108          struct weighted_piece * oend = o + s->pieceCount;
1109          struct weighted_piece * n = pieces;
1110          struct weighted_piece * nend = n + pieceCount;
1111
1112          pieceListSort (s, PIECES_SORTED_BY_INDEX);
1113
1114          while (o!=oend && n!=nend)
1115            {
1116              if (o->index < n->index)
1117                ++o;
1118              else if (o->index > n->index)
1119                ++n;
1120              else
1121                *n++ = *o++;
1122            }
1123
1124          tr_free (s->pieces);
1125        }
1126
1127      s->pieces = pieces;
1128      s->pieceCount = pieceCount;
1129
1130      pieceListSort (s, PIECES_SORTED_BY_WEIGHT);
1131
1132      /* cleanup */
1133      tr_free (pool);
1134    }
1135}
1136
1137static void
1138pieceListRemovePiece (tr_swarm * s, tr_piece_index_t piece)
1139{
1140  struct weighted_piece * p;
1141
1142  if ((p = pieceListLookup (s, piece)))
1143    {
1144      const int pos = p - s->pieces;
1145
1146      tr_removeElementFromArray (s->pieces,
1147                                 pos,
1148                                 sizeof (struct weighted_piece),
1149                                 s->pieceCount--);
1150
1151      if (s->pieceCount == 0)
1152        {
1153          tr_free (s->pieces);
1154          s->pieces = NULL;
1155        }
1156    }
1157}
1158
1159static void
1160pieceListResortPiece (tr_swarm * s, struct weighted_piece * p)
1161{
1162  int pos;
1163  bool isSorted = true;
1164
1165  if (p == NULL)
1166    return;
1167
1168  /* is the torrent already sorted? */
1169  pos = p - s->pieces;
1170  setComparePieceByWeightTorrent (s);
1171  if (isSorted && (pos > 0) && (comparePieceByWeight (p-1, p) > 0))
1172    isSorted = false;
1173  if (isSorted && (pos < s->pieceCount - 1) && (comparePieceByWeight (p, p+1) > 0))
1174    isSorted = false;
1175
1176  if (s->pieceSortState != PIECES_SORTED_BY_WEIGHT)
1177    {
1178      pieceListSort (s, PIECES_SORTED_BY_WEIGHT);
1179      isSorted = true;
1180    }
1181
1182  /* if it's not sorted, move it around */
1183  if (!isSorted)
1184    {
1185      bool exact;
1186      const struct weighted_piece tmp = *p;
1187
1188      tr_removeElementFromArray (s->pieces,
1189                                 pos,
1190                                 sizeof (struct weighted_piece),
1191                                 s->pieceCount--);
1192
1193      pos = tr_lowerBound (&tmp, s->pieces, s->pieceCount,
1194                           sizeof (struct weighted_piece),
1195                           comparePieceByWeight, &exact);
1196
1197      memmove (&s->pieces[pos + 1],
1198               &s->pieces[pos],
1199               sizeof (struct weighted_piece) * (s->pieceCount++ - pos));
1200
1201      s->pieces[pos] = tmp;
1202    }
1203
1204  assertWeightedPiecesAreSorted (s);
1205}
1206
1207static void
1208pieceListRemoveRequest (tr_swarm * s, tr_block_index_t block)
1209{
1210  struct weighted_piece * p;
1211  const tr_piece_index_t index = tr_torBlockPiece (s->tor, block);
1212
1213  if (((p = pieceListLookup (s, index))) && (p->requestCount > 0))
1214    {
1215      --p->requestCount;
1216      pieceListResortPiece (s, p);
1217    }
1218}
1219
1220
1221/****
1222*****
1223*****  Replication count (for rarest first policy)
1224*****
1225****/
1226
1227/**
1228 * Increase the replication count of this piece and sort it if the
1229 * piece list is already sorted
1230 */
1231static void
1232tr_incrReplicationOfPiece (tr_swarm * s, const size_t index)
1233{
1234  assert (replicationExists (s));
1235  assert (s->pieceReplicationSize == s->tor->info.pieceCount);
1236
1237  /* One more replication of this piece is present in the swarm */
1238  ++s->pieceReplication[index];
1239
1240  /* we only resort the piece if the list is already sorted */
1241  if (s->pieceSortState == PIECES_SORTED_BY_WEIGHT)
1242    pieceListResortPiece (s, pieceListLookup (s, index));
1243}
1244
1245/**
1246 * Increases the replication count of pieces present in the bitfield
1247 */
1248static void
1249tr_incrReplicationFromBitfield (tr_swarm * s, const tr_bitfield * b)
1250{
1251  size_t i;
1252  uint16_t * rep = s->pieceReplication;
1253  const size_t n = s->tor->info.pieceCount;
1254
1255  assert (replicationExists (s));
1256
1257  for (i=0; i<n; ++i)
1258    if (tr_bitfieldHas (b, i))
1259      ++rep[i];
1260
1261  if (s->pieceSortState == PIECES_SORTED_BY_WEIGHT)
1262    invalidatePieceSorting (s);
1263}
1264
1265/**
1266 * Increase the replication count of every piece
1267 */
1268static void
1269tr_incrReplication (tr_swarm * s)
1270{
1271  int i;
1272  const int n = s->pieceReplicationSize;
1273
1274  assert (replicationExists (s));
1275  assert (s->pieceReplicationSize == s->tor->info.pieceCount);
1276
1277  for (i=0; i<n; ++i)
1278    ++s->pieceReplication[i];
1279}
1280
1281/**
1282 * Decrease the replication count of pieces present in the bitset.
1283 */
1284static void
1285tr_decrReplicationFromBitfield (tr_swarm * s, const tr_bitfield * b)
1286{
1287  int i;
1288  const int n = s->pieceReplicationSize;
1289
1290  assert (replicationExists (s));
1291  assert (s->pieceReplicationSize == s->tor->info.pieceCount);
1292
1293  if (tr_bitfieldHasAll (b))
1294    {
1295      for (i=0; i<n; ++i)
1296        --s->pieceReplication[i];
1297    }
1298  else if (!tr_bitfieldHasNone (b))
1299    {
1300      for (i=0; i<n; ++i)
1301        if (tr_bitfieldHas (b, i))
1302          --s->pieceReplication[i];
1303
1304      if (s->pieceSortState == PIECES_SORTED_BY_WEIGHT)
1305        invalidatePieceSorting (s);
1306    }
1307}
1308
1309/**
1310***
1311**/
1312
1313void
1314tr_peerMgrRebuildRequests (tr_torrent * tor)
1315{
1316  assert (tr_isTorrent (tor));
1317
1318  pieceListRebuild (tor->swarm);
1319}
1320
1321void
1322tr_peerMgrGetNextRequests (tr_torrent           * tor,
1323                           tr_peer              * peer,
1324                           int                    numwant,
1325                           tr_block_index_t     * setme,
1326                           int                  * numgot,
1327                           bool                   get_intervals)
1328{
1329  int i;
1330  int got;
1331  tr_swarm * s;
1332  struct weighted_piece * pieces;
1333  const tr_bitfield * const have = &peer->have;
1334
1335  /* sanity clause */
1336  assert (tr_isTorrent (tor));
1337  assert (numwant > 0);
1338
1339  /* walk through the pieces and find blocks that should be requested */
1340  got = 0;
1341  s = tor->swarm;
1342
1343  /* prep the pieces list */
1344  if (s->pieces == NULL)
1345    pieceListRebuild (s);
1346
1347  if (s->pieceSortState != PIECES_SORTED_BY_WEIGHT)
1348    pieceListSort (s, PIECES_SORTED_BY_WEIGHT);
1349
1350  assertReplicationCountIsExact (s);
1351  assertWeightedPiecesAreSorted (s);
1352
1353  updateEndgame (s);
1354  pieces = s->pieces;
1355  for (i=0; i<s->pieceCount && got<numwant; ++i)
1356    {
1357      struct weighted_piece * p = pieces + i;
1358
1359      /* if the peer has this piece that we want... */
1360      if (tr_bitfieldHas (have, p->index))
1361        {
1362          tr_block_index_t b;
1363          tr_block_index_t first;
1364          tr_block_index_t last;
1365          tr_ptrArray peerArr = TR_PTR_ARRAY_INIT;
1366
1367          tr_torGetPieceBlockRange (tor, p->index, &first, &last);
1368
1369          for (b=first; b<=last && (got<numwant || (get_intervals && setme[2*got-1] == b-1)); ++b)
1370            {
1371              int peerCount;
1372              tr_peer ** peers;
1373
1374              /* don't request blocks we've already got */
1375              if (tr_torrentBlockIsComplete (tor, b))
1376                continue;
1377
1378              /* always add peer if this block has no peers yet */
1379              tr_ptrArrayClear (&peerArr);
1380              getBlockRequestPeers (s, b, &peerArr);
1381              peers = (tr_peer **) tr_ptrArrayPeek (&peerArr, &peerCount);
1382              if (peerCount != 0)
1383                {
1384                  /* don't make a second block request until the endgame */
1385                  if (!s->endgame)
1386                    continue;
1387
1388                  /* don't have more than two peers requesting this block */
1389                  if (peerCount > 1)
1390                    continue;
1391
1392                  /* don't send the same request to the same peer twice */
1393                  if (peer == peers[0])
1394                    continue;
1395
1396                  /* in the endgame allow an additional peer to download a
1397                     block but only if the peer seems to be handling requests
1398                     relatively fast */
1399                  if (peer->pendingReqsToPeer + numwant - got < s->endgame)
1400                    continue;
1401                }
1402
1403              /* update the caller's table */
1404              if (!get_intervals)
1405                {
1406                  setme[got++] = b;
1407                }
1408              /* if intervals are requested two array entries are necessarry:
1409                 one for the interval's starting block and one for its end block */
1410              else if (got && setme[2 * got - 1] == b - 1 && b != first)
1411                {
1412                  /* expand the last interval */
1413                  ++setme[2 * got - 1];
1414                }
1415              else
1416                {
1417                  /* begin a new interval */
1418                  setme[2 * got] = setme[2 * got + 1] = b;
1419                  ++got;
1420                }
1421
1422              /* update our own tables */
1423              requestListAdd (s, b, peer);
1424              ++p->requestCount;
1425            }
1426
1427          tr_ptrArrayDestruct (&peerArr, NULL);
1428        }
1429    }
1430
1431  /* In most cases we've just changed the weights of a small number of pieces.
1432   * So rather than qsort ()ing the entire array, it's faster to apply an
1433   * adaptive insertion sort algorithm. */
1434  if (got > 0)
1435    {
1436      /* not enough requests || last piece modified */
1437      if (i == s->pieceCount)
1438        --i;
1439
1440      setComparePieceByWeightTorrent (s);
1441      while (--i >= 0)
1442        {
1443          bool exact;
1444
1445          /* relative position! */
1446          const int newpos = tr_lowerBound (&s->pieces[i], &s->pieces[i + 1],
1447                                            s->pieceCount - (i + 1),
1448                                            sizeof (struct weighted_piece),
1449                                            comparePieceByWeight, &exact);
1450          if (newpos > 0)
1451            {
1452              const struct weighted_piece piece = s->pieces[i];
1453              memmove (&s->pieces[i],
1454                       &s->pieces[i + 1],
1455                       sizeof (struct weighted_piece) * (newpos));
1456              s->pieces[i + newpos] = piece;
1457            }
1458        }
1459    }
1460
1461  assertWeightedPiecesAreSorted (t);
1462  *numgot = got;
1463}
1464
1465bool
1466tr_peerMgrDidPeerRequest (const tr_torrent  * tor,
1467                          const tr_peer     * peer,
1468                          tr_block_index_t    block)
1469{
1470  return requestListLookup ((tr_swarm*)tor->swarm, block, peer) != NULL;
1471}
1472
1473/* cancel requests that are too old */
1474static void
1475refillUpkeep (evutil_socket_t foo UNUSED, short bar UNUSED, void * vmgr)
1476{
1477    time_t now;
1478    time_t too_old;
1479    tr_torrent * tor;
1480    int cancel_buflen = 0;
1481    struct block_request * cancel = NULL;
1482    tr_peerMgr * mgr = vmgr;
1483    managerLock (mgr);
1484
1485    now = tr_time ();
1486    too_old = now - REQUEST_TTL_SECS;
1487
1488    /* alloc the temporary "cancel" buffer */
1489    tor = NULL;
1490    while ((tor = tr_torrentNext (mgr->session, tor)))
1491        cancel_buflen = MAX (cancel_buflen, tor->swarm->requestCount);
1492    if (cancel_buflen > 0)
1493        cancel = tr_new (struct block_request, cancel_buflen);
1494
1495    /* prune requests that are too old */
1496    tor = NULL;
1497    while ((tor = tr_torrentNext (mgr->session, tor)))
1498    {
1499        tr_swarm * s = tor->swarm;
1500        const int n = s->requestCount;
1501        if (n > 0)
1502        {
1503            int keepCount = 0;
1504            int cancelCount = 0;
1505            const struct block_request * it;
1506            const struct block_request * end;
1507
1508            for (it=s->requests, end=it+n; it!=end; ++it)
1509            {
1510                tr_peerMsgs * msgs = PEER_MSGS(it->peer);
1511
1512                if ((msgs !=NULL) && (it->sentAt <= too_old) && !tr_peerMsgsIsReadingBlock (msgs, it->block))
1513                    cancel[cancelCount++] = *it;
1514                else
1515                {
1516                    if (it != &s->requests[keepCount])
1517                        s->requests[keepCount] = *it;
1518                    keepCount++;
1519                }
1520            }
1521
1522            /* prune out the ones we aren't keeping */
1523            s->requestCount = keepCount;
1524
1525            /* send cancel messages for all the "cancel" ones */
1526            for (it=cancel, end=it+cancelCount; it!=end; ++it)
1527            {
1528              tr_peerMsgs * msgs = PEER_MSGS(it->peer);
1529
1530              if (msgs != NULL)
1531                {
1532                  tr_historyAdd (&it->peer->cancelsSentToPeer, now, 1);
1533                  tr_peerMsgsCancel (msgs, it->block);
1534                  decrementPendingReqCount (it);
1535                }
1536            }
1537
1538            /* decrement the pending request counts for the timed-out blocks */
1539            for (it=cancel, end=it+cancelCount; it!=end; ++it)
1540                pieceListRemoveRequest (s, it->block);
1541        }
1542    }
1543
1544    tr_free (cancel);
1545    tr_timerAddMsec (mgr->refillUpkeepTimer, REFILL_UPKEEP_PERIOD_MSEC);
1546    managerUnlock (mgr);
1547}
1548
1549static void
1550addStrike (tr_swarm * s, tr_peer * peer)
1551{
1552  tordbg (s, "increasing peer %s strike count to %d",
1553          tr_atomAddrStr (peer->atom), peer->strikes + 1);
1554
1555  if (++peer->strikes >= MAX_BAD_PIECES_PER_PEER)
1556    {
1557      struct peer_atom * atom = peer->atom;
1558      atom->flags2 |= MYFLAG_BANNED;
1559      peer->doPurge = true;
1560      tordbg (s, "banning peer %s", tr_atomAddrStr (atom));
1561    }
1562}
1563
1564static void
1565peerSuggestedPiece (tr_swarm           * s UNUSED,
1566                    tr_peer            * peer UNUSED,
1567                    tr_piece_index_t     pieceIndex UNUSED,
1568                    int                  isFastAllowed UNUSED)
1569{
1570#if 0
1571    assert (t);
1572    assert (peer);
1573    assert (peer->msgs);
1574
1575    /* is this a valid piece? */
1576    if (pieceIndex >= t->tor->info.pieceCount)
1577        return;
1578
1579    /* don't ask for it if we've already got it */
1580    if (tr_torrentPieceIsComplete (t->tor, pieceIndex))
1581        return;
1582
1583    /* don't ask for it if they don't have it */
1584    if (!tr_bitfieldHas (peer->have, pieceIndex))
1585        return;
1586
1587    /* don't ask for it if we're choked and it's not fast */
1588    if (!isFastAllowed && peer->clientIsChoked)
1589        return;
1590
1591    /* request the blocks that we don't have in this piece */
1592    {
1593        tr_block_index_t b;
1594        tr_block_index_t first;
1595        tr_block_index_t last;
1596        const tr_torrent * tor = t->tor;
1597
1598        tr_torGetPieceBlockRange (t->tor, pieceIndex, &first, &last);
1599
1600        for (b=first; b<=last; ++b)
1601        {
1602            if (tr_torrentBlockIsComplete (tor, b))
1603            {
1604                const uint32_t offset = getBlockOffsetInPiece (tor, b);
1605                const uint32_t length = tr_torBlockCountBytes (tor, b);
1606                tr_peerMsgsAddRequest (peer->msgs, pieceIndex, offset, length);
1607                incrementPieceRequests (t, pieceIndex);
1608            }
1609        }
1610    }
1611#endif
1612}
1613
1614static void
1615removeRequestFromTables (tr_swarm * s, tr_block_index_t block, const tr_peer * peer)
1616{
1617  requestListRemove (s, block, peer);
1618  pieceListRemoveRequest (s, block);
1619}
1620
1621/* peer choked us, or maybe it disconnected.
1622   either way we need to remove all its requests */
1623static void
1624peerDeclinedAllRequests (tr_swarm * s, const tr_peer * peer)
1625{
1626  int i, n;
1627  tr_block_index_t * blocks = tr_new (tr_block_index_t, s->requestCount);
1628
1629  for (i=n=0; i<s->requestCount; ++i)
1630    if (peer == s->requests[i].peer)
1631      blocks[n++] = s->requests[i].block;
1632
1633  for (i=0; i<n; ++i)
1634    removeRequestFromTables (s, blocks[i], peer);
1635
1636  tr_free (blocks);
1637}
1638
1639static void
1640cancelAllRequestsForBlock (tr_swarm          * s,
1641                           tr_block_index_t    block,
1642                           tr_peer           * no_notify)
1643{
1644  int i;
1645  int peerCount;
1646  tr_peer ** peers;
1647  tr_ptrArray peerArr;
1648
1649  peerArr = TR_PTR_ARRAY_INIT;
1650  getBlockRequestPeers (s, block, &peerArr);
1651  peers = (tr_peer **) tr_ptrArrayPeek (&peerArr, &peerCount);
1652  for (i=0; i<peerCount; ++i)
1653    {
1654      tr_peer * p = peers[i];
1655
1656      if ((p != no_notify) && tr_isPeerMsgs (p))
1657        {
1658          tr_historyAdd (&p->cancelsSentToPeer, tr_time (), 1);
1659          tr_peerMsgsCancel (PEER_MSGS(p), block);
1660        }
1661
1662      removeRequestFromTables (s, block, p);
1663    }
1664
1665  tr_ptrArrayDestruct (&peerArr, NULL);
1666}
1667
1668void
1669tr_peerMgrPieceCompleted (tr_torrent * tor, tr_piece_index_t p)
1670{
1671  int i;
1672  bool pieceCameFromPeers = false;
1673  tr_swarm * const s = tor->swarm;
1674  const int n = tr_ptrArraySize (&s->peers);
1675
1676  /* walk through our peers */
1677  for (i=0; i<n; ++i)
1678    {
1679      tr_peer * peer = tr_ptrArrayNth (&s->peers, i);
1680
1681      /* notify the peer that we now have this piece */
1682      tr_peerMsgsHave (PEER_MSGS(peer), p);
1683
1684      if (!pieceCameFromPeers)
1685        pieceCameFromPeers = tr_bitfieldHas (&peer->blame, p);
1686    }
1687
1688  if (pieceCameFromPeers) /* webseed downloads don't belong in announce totals */
1689    tr_announcerAddBytes (tor, TR_ANN_DOWN, tr_torPieceCountBytes (tor, p));
1690
1691  /* bookkeeping */
1692  pieceListRemovePiece (s, p);
1693  s->needsCompletenessCheck = true;
1694}
1695
1696static void
1697peerCallbackFunc (tr_peer * peer, const tr_peer_event * e, void * vs)
1698{
1699  tr_swarm * s = vs;
1700
1701  swarmLock (s);
1702
1703  assert (peer != NULL);
1704
1705  switch (e->eventType)
1706    {
1707      case TR_PEER_PEER_GOT_PIECE_DATA:
1708        {
1709          const time_t now = tr_time ();
1710          tr_torrent * tor = s->tor;
1711
1712          tor->uploadedCur += e->length;
1713          tr_announcerAddBytes (tor, TR_ANN_UP, e->length);
1714          tr_torrentSetActivityDate (tor, now);
1715          tr_torrentSetDirty (tor);
1716          tr_statsAddUploaded (tor->session, e->length);
1717
1718          if (peer->atom != NULL)
1719            peer->atom->piece_data_time = now;
1720
1721          break;
1722        }
1723
1724      case TR_PEER_CLIENT_GOT_PIECE_DATA:
1725        {
1726          const time_t now = tr_time ();
1727          tr_torrent * tor = s->tor;
1728
1729          tor->downloadedCur += e->length;
1730          tr_torrentSetActivityDate (tor, now);
1731          tr_torrentSetDirty (tor);
1732
1733          tr_statsAddDownloaded (tor->session, e->length);
1734
1735          if (peer->atom != NULL)
1736            peer->atom->piece_data_time = now;
1737
1738          break;
1739        }
1740
1741      case TR_PEER_CLIENT_GOT_HAVE:
1742        if (replicationExists (s))
1743          {
1744            tr_incrReplicationOfPiece (s, e->pieceIndex);
1745            assertReplicationCountIsExact (s);
1746          }
1747        break;
1748
1749      case TR_PEER_CLIENT_GOT_HAVE_ALL:
1750        if (replicationExists (s))
1751          {
1752            tr_incrReplication (s);
1753            assertReplicationCountIsExact (s);
1754          }
1755        break;
1756
1757      case TR_PEER_CLIENT_GOT_HAVE_NONE:
1758        /* noop */
1759        break;
1760
1761      case TR_PEER_CLIENT_GOT_BITFIELD:
1762        assert (e->bitfield != NULL);
1763        if (replicationExists (s))
1764          {
1765            tr_incrReplicationFromBitfield (s, e->bitfield);
1766            assertReplicationCountIsExact (s);
1767          }
1768        break;
1769
1770      case TR_PEER_CLIENT_GOT_REJ:
1771        {
1772          tr_block_index_t b = _tr_block (s->tor, e->pieceIndex, e->offset);
1773          if (b < s->tor->blockCount)
1774            removeRequestFromTables (s, b, peer);
1775          else
1776            tordbg (s, "Peer %s sent an out-of-range reject message",
1777                    tr_atomAddrStr (peer->atom));
1778          break;
1779        }
1780
1781      case TR_PEER_CLIENT_GOT_CHOKE:
1782        peerDeclinedAllRequests (s, peer);
1783        break;
1784
1785      case TR_PEER_CLIENT_GOT_PORT:
1786        if (peer->atom)
1787          peer->atom->port = e->port;
1788        break;
1789
1790      case TR_PEER_CLIENT_GOT_SUGGEST:
1791        peerSuggestedPiece (s, peer, e->pieceIndex, false);
1792        break;
1793
1794      case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1795        peerSuggestedPiece (s, peer, e->pieceIndex, true);
1796        break;
1797
1798      case TR_PEER_CLIENT_GOT_BLOCK:
1799        {
1800          tr_torrent * tor = s->tor;
1801          const tr_piece_index_t p = e->pieceIndex;
1802          const tr_block_index_t block = _tr_block (tor, p, e->offset);
1803          cancelAllRequestsForBlock (s, block, peer);
1804          tr_historyAdd (&peer->blocksSentToClient, tr_time(), 1);
1805          pieceListResortPiece (s, pieceListLookup (s, p));
1806          tr_torrentGotBlock (tor, block);
1807          break;
1808        }
1809
1810      case TR_PEER_ERROR:
1811        if ((e->err == ERANGE) || (e->err == EMSGSIZE) || (e->err == ENOTCONN))
1812          {
1813            /* some protocol error from the peer */
1814            peer->doPurge = true;
1815            tordbg (s, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error",
1816                    tr_atomAddrStr (peer->atom));
1817          }
1818        else
1819          {
1820            tordbg (s, "unhandled error: %s", tr_strerror (e->err));
1821          }
1822        break;
1823
1824      default:
1825        assert (0);
1826    }
1827
1828  swarmUnlock (s);
1829}
1830
1831static int
1832getDefaultShelfLife (uint8_t from)
1833{
1834    /* in general, peers obtained from firsthand contact
1835     * are better than those from secondhand, etc etc */
1836    switch (from)
1837    {
1838        case TR_PEER_FROM_INCOMING : return 60 * 60 * 6;
1839        case TR_PEER_FROM_LTEP     : return 60 * 60 * 6;
1840        case TR_PEER_FROM_TRACKER  : return 60 * 60 * 3;
1841        case TR_PEER_FROM_DHT      : return 60 * 60 * 3;
1842        case TR_PEER_FROM_PEX      : return 60 * 60 * 2;
1843        case TR_PEER_FROM_RESUME   : return 60 * 60;
1844        case TR_PEER_FROM_LPD      : return 10 * 60;
1845        default                    : return 60 * 60;
1846    }
1847}
1848
1849static void
1850ensureAtomExists (tr_swarm          * s,
1851                  const tr_address  * addr,
1852                  const tr_port       port,
1853                  const uint8_t       flags,
1854                  const int8_t        seedProbability,
1855                  const uint8_t       from)
1856{
1857  struct peer_atom * a;
1858
1859  assert (tr_address_is_valid (addr));
1860  assert (from < TR_PEER_FROM__MAX);
1861
1862  a = getExistingAtom (s, addr);
1863
1864  if (a == NULL)
1865    {
1866      const int jitter = tr_rand_int_weak (60*10);
1867      a = tr_new0 (struct peer_atom, 1);
1868      a->addr = *addr;
1869      a->port = port;
1870      a->flags = flags;
1871      a->fromFirst = from;
1872      a->fromBest = from;
1873      a->shelf_date = tr_time () + getDefaultShelfLife (from) + jitter;
1874      a->blocklisted = -1;
1875      atomSetSeedProbability (a, seedProbability);
1876      tr_ptrArrayInsertSorted (&s->pool, a, compareAtomsByAddress);
1877
1878      tordbg (s, "got a new atom: %s", tr_atomAddrStr (a));
1879    }
1880  else
1881    {
1882      if (from < a->fromBest)
1883        a->fromBest = from;
1884
1885      if (a->seedProbability == -1)
1886        atomSetSeedProbability (a, seedProbability);
1887
1888      a->flags |= flags;
1889    }
1890}
1891
1892static int
1893getMaxPeerCount (const tr_torrent * tor)
1894{
1895  return tor->maxConnectedPeers;
1896}
1897
1898static int
1899getPeerCount (const tr_swarm * s)
1900{
1901  return tr_ptrArraySize (&s->peers);/* + tr_ptrArraySize (&t->outgoingHandshakes); */
1902}
1903
1904
1905static void
1906createBitTorrentPeer (tr_torrent       * tor,
1907                      struct tr_peerIo * io,
1908                      struct peer_atom * atom,
1909                      tr_quark           client)
1910{
1911  tr_peer * peer;
1912  tr_peerMsgs * msgs;
1913  tr_swarm * swarm;
1914
1915  assert (atom != NULL);
1916  assert (tr_isTorrent (tor));
1917  assert (tor->swarm != NULL);
1918
1919  swarm = tor->swarm;
1920
1921  peer = (tr_peer*) tr_peerMsgsNew (tor, io, peerCallbackFunc, swarm);
1922  peer->atom = atom;
1923  peer->client = client;
1924  atom->peer = peer;
1925
1926  tr_ptrArrayInsertSorted (&swarm->peers, peer, peerCompare);
1927  ++swarm->stats.peerCount;
1928  ++swarm->stats.peerFromCount[atom->fromFirst];
1929
1930  assert (swarm->stats.peerCount == tr_ptrArraySize (&swarm->peers));
1931  assert (swarm->stats.peerFromCount[atom->fromFirst] <= swarm->stats.peerCount);
1932
1933  msgs = PEER_MSGS (peer);
1934  tr_peerMsgsUpdateActive (msgs, TR_UP);
1935  tr_peerMsgsUpdateActive (msgs, TR_DOWN);
1936}
1937
1938
1939/* FIXME: this is kind of a mess. */
1940static bool
1941myHandshakeDoneCB (tr_handshake  * handshake,
1942                   tr_peerIo     * io,
1943                   bool            readAnythingFromPeer,
1944                   bool            isConnected,
1945                   const uint8_t * peer_id,
1946                   void          * vmanager)
1947{
1948  bool ok = isConnected;
1949  bool success = false;
1950  tr_port port;
1951  const tr_address * addr;
1952  tr_peerMgr * manager = vmanager;
1953  tr_swarm  * s;
1954
1955  assert (io);
1956  assert (tr_isBool (ok));
1957
1958  s = tr_peerIoHasTorrentHash (io)
1959    ? getExistingSwarm (manager, tr_peerIoGetTorrentHash (io))
1960    : NULL;
1961
1962  if (tr_peerIoIsIncoming (io))
1963    tr_ptrArrayRemoveSortedPointer (&manager->incomingHandshakes,
1964                                    handshake, handshakeCompare);
1965  else if (s)
1966    tr_ptrArrayRemoveSortedPointer (&s->outgoingHandshakes,
1967                                    handshake, handshakeCompare);
1968
1969  if (s)
1970    swarmLock (s);
1971
1972  addr = tr_peerIoGetAddress (io, &port);
1973
1974  if (!ok || !s || !s->isRunning)
1975    {
1976      if (s)
1977        {
1978          struct peer_atom * atom = getExistingAtom (s, addr);
1979          if (atom)
1980            {
1981              ++atom->numFails;
1982
1983              if (!readAnythingFromPeer)
1984                {
1985                  tordbg (s, "marking peer %s as unreachable... numFails is %d", tr_atomAddrStr (atom), (int)atom->numFails);
1986                  atom->flags2 |= MYFLAG_UNREACHABLE;
1987                }
1988            }
1989        }
1990    }
1991  else /* looking good */
1992    {
1993      struct peer_atom * atom;
1994
1995      ensureAtomExists (s, addr, port, 0, -1, TR_PEER_FROM_INCOMING);
1996      atom = getExistingAtom (s, addr);
1997      atom->time = tr_time ();
1998      atom->piece_data_time = 0;
1999      atom->lastConnectionAt = tr_time ();
2000
2001      if (!tr_peerIoIsIncoming (io))
2002        {
2003          atom->flags |= ADDED_F_CONNECTABLE;
2004          atom->flags2 &= ~MYFLAG_UNREACHABLE;
2005        }
2006
2007      /* In principle, this flag specifies whether the peer groks uTP,
2008         not whether it's currently connected over uTP. */
2009      if (io->utp_socket)
2010        atom->flags |= ADDED_F_UTP_FLAGS;
2011
2012      if (atom->flags2 & MYFLAG_BANNED)
2013        {
2014          tordbg (s, "banned peer %s tried to reconnect",
2015                  tr_atomAddrStr (atom));
2016        }
2017      else if (tr_peerIoIsIncoming (io) && (getPeerCount (s) >= getMaxPeerCount (s->tor)))
2018        {
2019        }
2020      else
2021        {
2022          tr_peer * peer = atom->peer;
2023
2024          if (peer) /* we already have this peer */
2025            {
2026            }
2027          else
2028            {
2029              tr_quark client;
2030              tr_peerIo * io;
2031              char buf[128];
2032
2033              if (peer_id != NULL)
2034                client = tr_quark_new (tr_clientForId (buf, sizeof (buf), peer_id), -1);
2035              else
2036                client = TR_KEY_NONE;
2037
2038              io = tr_handshakeStealIO (handshake); /* this steals its refcount too, which is
2039                                                       balanced by our unref in peerDelete ()  */
2040              tr_peerIoSetParent (io, &s->tor->bandwidth);
2041              createBitTorrentPeer (s->tor, io, atom, client);
2042
2043              success = true;
2044            }
2045        }
2046    }
2047
2048  if (s != NULL)
2049    swarmUnlock (s);
2050
2051  return success;
2052}
2053
2054void
2055tr_peerMgrAddIncoming (tr_peerMgr       * manager,
2056                       tr_address       * addr,
2057                       tr_port            port,
2058                       tr_socket_t        socket,
2059                       struct UTPSocket * utp_socket)
2060{
2061  tr_session * session;
2062
2063  managerLock (manager);
2064
2065  assert (tr_isSession (manager->session));
2066  session = manager->session;
2067
2068  if (tr_sessionIsAddressBlocked (session, addr))
2069    {
2070      tr_logAddDebug ("Banned IP address \"%s\" tried to connect to us", tr_address_to_string (addr));
2071      if (socket != TR_BAD_SOCKET)
2072        tr_netClose (session, socket);
2073      else
2074        UTP_Close (utp_socket);
2075    }
2076  else if (getExistingHandshake (&manager->incomingHandshakes, addr))
2077    {
2078      if (socket != TR_BAD_SOCKET)
2079        tr_netClose (session, socket);
2080      else
2081        UTP_Close (utp_socket);
2082    }
2083  else /* we don't have a connection to them yet... */
2084    {
2085      tr_peerIo *    io;
2086      tr_handshake * handshake;
2087
2088      io = tr_peerIoNewIncoming (session, &session->bandwidth, addr, port, socket, utp_socket);
2089
2090      handshake = tr_handshakeNew (io,
2091                                   session->encryptionMode,
2092                                   myHandshakeDoneCB,
2093                                   manager);
2094
2095      tr_peerIoUnref (io); /* balanced by the implicit ref in tr_peerIoNewIncoming () */
2096
2097      tr_ptrArrayInsertSorted (&manager->incomingHandshakes, handshake,
2098                               handshakeCompare);
2099    }
2100
2101  managerUnlock (manager);
2102}
2103
2104void
2105tr_peerMgrAddPex (tr_torrent * tor, uint8_t from,
2106                  const tr_pex * pex, int8_t seedProbability)
2107{
2108  if (tr_isPex (pex)) /* safeguard against corrupt data */
2109    {
2110      tr_swarm * s = tor->swarm;
2111      managerLock (s->manager);
2112
2113      if (!tr_sessionIsAddressBlocked (s->manager->session, &pex->addr))
2114        if (tr_address_is_valid_for_peers (&pex->addr, pex->port))
2115          ensureAtomExists (s, &pex->addr, pex->port, pex->flags, seedProbability, from);
2116
2117      managerUnlock (s->manager);
2118    }
2119}
2120
2121void
2122tr_peerMgrMarkAllAsSeeds (tr_torrent * tor)
2123{
2124  tr_swarm * s = tor->swarm;
2125  const int n = tr_ptrArraySize (&s->pool);
2126  struct peer_atom ** it = (struct peer_atom**) tr_ptrArrayBase (&s->pool);
2127  struct peer_atom ** end = it + n;
2128
2129  while (it != end)
2130    atomSetSeed (s, *it++);
2131}
2132
2133tr_pex *
2134tr_peerMgrCompactToPex (const void    * compact,
2135                        size_t          compactLen,
2136                        const uint8_t * added_f,
2137                        size_t          added_f_len,
2138                        size_t        * pexCount)
2139{
2140  size_t i;
2141  size_t n = compactLen / 6;
2142  const uint8_t * walk = compact;
2143  tr_pex * pex = tr_new0 (tr_pex, n);
2144
2145  for (i=0; i<n; ++i)
2146    {
2147      pex[i].addr.type = TR_AF_INET;
2148      memcpy (&pex[i].addr.addr, walk, 4); walk += 4;
2149      memcpy (&pex[i].port, walk, 2); walk += 2;
2150      if (added_f && (n == added_f_len))
2151        pex[i].flags = added_f[i];
2152    }
2153
2154  *pexCount = n;
2155  return pex;
2156}
2157
2158tr_pex *
2159tr_peerMgrCompact6ToPex (const void    * compact,
2160                         size_t          compactLen,
2161                         const uint8_t * added_f,
2162                         size_t          added_f_len,
2163                         size_t        * pexCount)
2164{
2165  size_t i;
2166  size_t n = compactLen / 18;
2167  const uint8_t * walk = compact;
2168  tr_pex * pex = tr_new0 (tr_pex, n);
2169
2170  for (i=0; i<n; ++i)
2171    {
2172      pex[i].addr.type = TR_AF_INET6;
2173      memcpy (&pex[i].addr.addr.addr6.s6_addr, walk, 16); walk += 16;
2174      memcpy (&pex[i].port, walk, 2); walk += 2;
2175      if (added_f && (n == added_f_len))
2176        pex[i].flags = added_f[i];
2177    }
2178
2179  *pexCount = n;
2180  return pex;
2181}
2182
2183tr_pex *
2184tr_peerMgrArrayToPex (const void  * array,
2185                      size_t        arrayLen,
2186                      size_t      * pexCount)
2187{
2188  size_t i;
2189  size_t n = arrayLen / (sizeof (tr_address) + 2);
2190  const uint8_t * walk = array;
2191  tr_pex * pex = tr_new0 (tr_pex, n);
2192
2193  for (i=0 ; i<n ; ++i)
2194    {
2195      memcpy (&pex[i].addr, walk, sizeof (tr_address));
2196      memcpy (&pex[i].port, walk + sizeof (tr_address), 2);
2197      pex[i].flags = 0x00;
2198      walk += sizeof (tr_address) + 2;
2199    }
2200
2201  *pexCount = n;
2202  return pex;
2203}
2204
2205/**
2206***
2207**/
2208
2209void
2210tr_peerMgrGotBadPiece (tr_torrent * tor, tr_piece_index_t pieceIndex)
2211{
2212  int i;
2213  int n;
2214  tr_swarm * s = tor->swarm;
2215  const uint32_t byteCount = tr_torPieceCountBytes (tor, pieceIndex);
2216
2217  for (i=0, n=tr_ptrArraySize(&s->peers); i!=n; ++i)
2218    {
2219      tr_peer * peer = tr_ptrArrayNth (&s->peers, i);
2220
2221      if (tr_bitfieldHas (&peer->blame, pieceIndex))
2222        {
2223          tordbg (s, "peer %s contributed to corrupt piece (%d); now has %d strikes",
2224                  tr_atomAddrStr(peer->atom), pieceIndex, (int)peer->strikes + 1);
2225          addStrike (s, peer);
2226        }
2227    }
2228
2229
2230  tr_announcerAddBytes (tor, TR_ANN_CORRUPT, byteCount);
2231}
2232
2233int
2234tr_pexCompare (const void * va, const void * vb)
2235{
2236  int i;
2237  const tr_pex * a = va;
2238  const tr_pex * b = vb;
2239
2240  assert (tr_isPex (a));
2241  assert (tr_isPex (b));
2242
2243  if ((i = tr_address_compare (&a->addr, &b->addr)))
2244    return i;
2245
2246  if (a->port != b->port)
2247    return a->port < b->port ? -1 : 1;
2248
2249  return 0;
2250}
2251
2252/* better goes first */
2253static int
2254compareAtomsByUsefulness (const void * va, const void *vb)
2255{
2256  const struct peer_atom * a = * (const struct peer_atom**) va;
2257  const struct peer_atom * b = * (const struct peer_atom**) vb;
2258
2259  assert (tr_isAtom (a));
2260  assert (tr_isAtom (b));
2261
2262  if (a->piece_data_time != b->piece_data_time)
2263    return a->piece_data_time > b->piece_data_time ? -1 : 1;
2264  if (a->fromBest != b->fromBest)
2265    return a->fromBest < b->fromBest ? -1 : 1;
2266  if (a->numFails != b->numFails)
2267    return a->numFails < b->numFails ? -1 : 1;
2268
2269  return 0;
2270}
2271
2272static bool
2273isAtomInteresting (const tr_torrent * tor, struct peer_atom * atom)
2274{
2275  if (tr_torrentIsSeed (tor) && atomIsSeed (atom))
2276    return false;
2277
2278  if (peerIsInUse (tor->swarm, atom))
2279    return true;
2280
2281  if (isAtomBlocklisted (tor->session, atom))
2282    return false;
2283
2284  if (atom->flags2 & MYFLAG_BANNED)
2285    return false;
2286
2287  return true;
2288}
2289
2290int
2291tr_peerMgrGetPeers (tr_torrent   * tor,
2292                    tr_pex      ** setme_pex,
2293                    uint8_t        af,
2294                    uint8_t        list_mode,
2295                    int            maxCount)
2296{
2297  int i;
2298  int n;
2299  int count = 0;
2300  int atomCount = 0;
2301  const tr_swarm * s = tor->swarm;
2302  struct peer_atom ** atoms = NULL;
2303  tr_pex * pex;
2304  tr_pex * walk;
2305
2306  assert (tr_isTorrent (tor));
2307  assert (setme_pex != NULL);
2308  assert (af==TR_AF_INET || af==TR_AF_INET6);
2309  assert (list_mode==TR_PEERS_CONNECTED || list_mode==TR_PEERS_INTERESTING);
2310
2311  managerLock (s->manager);
2312
2313  /**
2314  ***  build a list of atoms
2315  **/
2316
2317  if (list_mode == TR_PEERS_CONNECTED) /* connected peers only */
2318    {
2319      int i;
2320      const tr_peer ** peers = (const tr_peer **) tr_ptrArrayBase (&s->peers);
2321      atomCount = tr_ptrArraySize (&s->peers);
2322      atoms = tr_new (struct peer_atom *, atomCount);
2323      for (i=0; i<atomCount; ++i)
2324        atoms[i] = peers[i]->atom;
2325    }
2326  else /* TR_PEERS_INTERESTING */
2327    {
2328      int i;
2329      struct peer_atom ** atomBase = (struct peer_atom**) tr_ptrArrayBase (&s->pool);
2330      n = tr_ptrArraySize (&s->pool);
2331      atoms = tr_new (struct peer_atom *, n);
2332      for (i=0; i<n; ++i)
2333        if (isAtomInteresting (tor, atomBase[i]))
2334          atoms[atomCount++] = atomBase[i];
2335    }
2336
2337  qsort (atoms, atomCount, sizeof (struct peer_atom *), compareAtomsByUsefulness);
2338
2339  /**
2340  ***  add the first N of them into our return list
2341  **/
2342
2343  n = MIN (atomCount, maxCount);
2344  pex = walk = tr_new0 (tr_pex, n);
2345
2346  for (i=0; i<atomCount && count<n; ++i)
2347    {
2348      const struct peer_atom * atom = atoms[i];
2349      if (atom->addr.type == af)
2350        {
2351          assert (tr_address_is_valid (&atom->addr));
2352          walk->addr = atom->addr;
2353          walk->port = atom->port;
2354          walk->flags = atom->flags;
2355          ++count;
2356          ++walk;
2357        }
2358    }
2359
2360  qsort (pex, count, sizeof (tr_pex), tr_pexCompare);
2361
2362  assert ((walk - pex) == count);
2363  *setme_pex = pex;
2364
2365  /* cleanup */
2366  tr_free (atoms);
2367  managerUnlock (s->manager);
2368  return count;
2369}
2370
2371static void atomPulse      (evutil_socket_t, short, void *);
2372static void bandwidthPulse (evutil_socket_t, short, void *);
2373static void rechokePulse   (evutil_socket_t, short, void *);
2374static void reconnectPulse (evutil_socket_t, short, void *);
2375
2376static struct event *
2377createTimer (tr_session * session, int msec, event_callback_fn callback, void * cbdata)
2378{
2379  struct event * timer = evtimer_new (session->event_base, callback, cbdata);
2380  tr_timerAddMsec (timer, msec);
2381  return timer;
2382}
2383
2384static void
2385ensureMgrTimersExist (struct tr_peerMgr * m)
2386{
2387  if (m->atomTimer == NULL)
2388    m->atomTimer = createTimer (m->session, ATOM_PERIOD_MSEC, atomPulse, m);
2389
2390  if (m->bandwidthTimer == NULL)
2391    m->bandwidthTimer = createTimer (m->session, BANDWIDTH_PERIOD_MSEC, bandwidthPulse, m);
2392
2393  if (m->rechokeTimer == NULL)
2394    m->rechokeTimer = createTimer (m->session, RECHOKE_PERIOD_MSEC, rechokePulse, m);
2395
2396  if (m->refillUpkeepTimer == NULL)
2397    m->refillUpkeepTimer = createTimer (m->session, REFILL_UPKEEP_PERIOD_MSEC, refillUpkeep, m);
2398}
2399
2400void
2401tr_peerMgrStartTorrent (tr_torrent * tor)
2402{
2403  tr_swarm * s;
2404
2405  assert (tr_isTorrent (tor));
2406  assert (tr_torrentIsLocked (tor));
2407
2408  s = tor->swarm;
2409  ensureMgrTimersExist (s->manager);
2410
2411  s->isRunning = true;
2412  s->maxPeers = tor->maxConnectedPeers;
2413  s->pieceSortState = PIECES_UNSORTED;
2414
2415  rechokePulse (0, 0, s->manager);
2416}
2417
2418static void removeAllPeers (tr_swarm *);
2419
2420static void
2421stopSwarm (tr_swarm * swarm)
2422{
2423  swarm->isRunning = false;
2424
2425  replicationFree (swarm);
2426  invalidatePieceSorting (swarm);
2427
2428  removeAllPeers (swarm);
2429
2430  /* disconnect the handshakes. handshakeAbort calls handshakeDoneCB (),
2431   * which removes the handshake from t->outgoingHandshakes... */
2432  while (!tr_ptrArrayEmpty (&swarm->outgoingHandshakes))
2433    tr_handshakeAbort (tr_ptrArrayNth (&swarm->outgoingHandshakes, 0));
2434}
2435
2436void
2437tr_peerMgrStopTorrent (tr_torrent * tor)
2438{
2439  assert (tr_isTorrent (tor));
2440  assert (tr_torrentIsLocked (tor));
2441
2442  stopSwarm (tor->swarm);
2443}
2444
2445void
2446tr_peerMgrAddTorrent (tr_peerMgr * manager, tr_torrent * tor)
2447{
2448  assert (tr_isTorrent (tor));
2449  assert (tr_torrentIsLocked (tor));
2450  assert (tor->swarm == NULL);
2451
2452  tor->swarm = swarmNew (manager, tor);
2453}
2454
2455void
2456tr_peerMgrRemoveTorrent (tr_torrent * tor)
2457{
2458  assert (tr_isTorrent (tor));
2459  assert (tr_torrentIsLocked (tor));
2460
2461  stopSwarm (tor->swarm);
2462  swarmFree (tor->swarm);
2463}
2464
2465void
2466tr_peerUpdateProgress (tr_torrent * tor, tr_peer * peer)
2467{
2468  const tr_bitfield * have = &peer->have;
2469
2470  if (tr_bitfieldHasAll (have))
2471    {
2472      peer->progress = 1.0;
2473    }
2474  else if (tr_bitfieldHasNone (have))
2475    {
2476      peer->progress = 0.0;
2477    }
2478  else
2479    {
2480      const float true_count = tr_bitfieldCountTrueBits (have);
2481
2482      if (tr_torrentHasMetadata (tor))
2483        {
2484          peer->progress = true_count / tor->info.pieceCount;
2485        }
2486      else /* without pieceCount, this result is only a best guess... */
2487        {
2488          peer->progress = true_count / (have->bit_count + 1);
2489        }
2490    }
2491
2492  /* clamp the progress range */
2493  if (peer->progress < 0.0)
2494    peer->progress = 0.0;
2495  if (peer->progress > 1.0)
2496    peer->progress = 1.0;
2497
2498  if (peer->atom && (peer->progress >= 1.0))
2499    atomSetSeed (tor->swarm, peer->atom);
2500}
2501
2502void
2503tr_peerMgrOnTorrentGotMetainfo (tr_torrent * tor)
2504{
2505  int i;
2506  int peerCount;
2507  tr_peer ** peers;
2508
2509  /* the webseed list may have changed... */
2510  rebuildWebseedArray (tor->swarm, tor);
2511
2512  /* some peer_msgs' progress fields may not be accurate if we
2513     didn't have the metadata before now... so refresh them all... */
2514  peerCount = tr_ptrArraySize (&tor->swarm->peers);
2515  peers = (tr_peer**) tr_ptrArrayBase (&tor->swarm->peers);
2516  for (i=0; i<peerCount; ++i)
2517    tr_peerUpdateProgress (tor, peers[i]);
2518
2519  /* update the bittorrent peers' willingnes... */
2520  for (i=0; i<peerCount; ++i)
2521    {
2522      tr_peerMsgsUpdateActive (tr_peerMsgsCast(peers[i]), TR_UP);
2523      tr_peerMsgsUpdateActive (tr_peerMsgsCast(peers[i]), TR_DOWN);
2524    }
2525}
2526
2527void
2528tr_peerMgrTorrentAvailability (const tr_torrent  * tor,
2529                               int8_t            * tab,
2530                               unsigned int        tabCount)
2531{
2532  assert (tr_isTorrent (tor));
2533  assert (tab != NULL);
2534  assert (tabCount > 0);
2535
2536  memset (tab, 0, tabCount);
2537
2538  if (tr_torrentHasMetadata (tor))
2539    {
2540      tr_piece_index_t i;
2541      const int peerCount = tr_ptrArraySize (&tor->swarm->peers);
2542      const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase (&tor->swarm->peers);
2543      const float interval = tor->info.pieceCount / (float)tabCount;
2544      const bool isSeed = tr_torrentGetCompleteness (tor) == TR_SEED;
2545
2546      for (i=0; i<tabCount; ++i)
2547        {
2548          const int piece = i * interval;
2549
2550          if (isSeed || tr_torrentPieceIsComplete (tor, piece))
2551            {
2552              tab[i] = -1;
2553            }
2554          else if (peerCount)
2555            {
2556              int j;
2557              for (j=0; j<peerCount; ++j)
2558                if (tr_bitfieldHas (&peers[j]->have, piece))
2559                  ++tab[i];
2560            }
2561        }
2562    }
2563}
2564
2565void
2566tr_swarmGetStats (const tr_swarm * swarm, tr_swarm_stats * setme)
2567{
2568  assert (swarm != NULL);
2569  assert (setme != NULL);
2570
2571  *setme = swarm->stats;
2572}
2573
2574void
2575tr_swarmIncrementActivePeers (tr_swarm * swarm, tr_direction direction, bool is_active)
2576{
2577  int n = swarm->stats.activePeerCount[direction];
2578
2579  if (is_active)
2580    ++n;
2581  else
2582    --n;
2583
2584  assert (0 <= n);
2585  assert (n <= swarm->stats.peerCount);
2586
2587  swarm->stats.activePeerCount[direction] = n;
2588}
2589
2590bool
2591tr_peerIsSeed (const tr_peer * peer)
2592{
2593  if (peer->progress >= 1.0)
2594    return true;
2595
2596  if (peer->atom && atomIsSeed (peer->atom))
2597    return true;
2598
2599  return false;
2600}
2601
2602/* count how many bytes we want that connected peers have */
2603uint64_t
2604tr_peerMgrGetDesiredAvailable (const tr_torrent * tor)
2605{
2606  size_t i;
2607  size_t n;
2608  uint64_t desiredAvailable;
2609  const tr_swarm * s;
2610
2611  assert (tr_isTorrent (tor));
2612
2613  /* common shortcuts... */
2614
2615  if (!tor->isRunning || tor->isStopping)
2616    return 0;
2617
2618  if (tr_torrentIsSeed (tor))
2619    return 0;
2620
2621  if (!tr_torrentHasMetadata (tor))
2622    return 0;
2623
2624  s = tor->swarm;
2625  if (s == NULL)
2626    return 0;
2627
2628  n = tr_ptrArraySize (&s->peers);
2629  if (n == 0)
2630    {
2631      return 0;
2632    }
2633  else
2634    {
2635      const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase (&s->peers);
2636      for (i=0; i<n; ++i)
2637        if (peers[i]->atom && atomIsSeed (peers[i]->atom))
2638          return tr_torrentGetLeftUntilDone (tor);
2639    }
2640
2641  if (!s->pieceReplication || !s->pieceReplicationSize)
2642    return 0;
2643
2644  /* do it the hard way */
2645
2646  desiredAvailable = 0;
2647  for (i=0, n=MIN (tor->info.pieceCount, s->pieceReplicationSize); i<n; ++i)
2648    if (!tor->info.pieces[i].dnd && (s->pieceReplication[i] > 0))
2649      desiredAvailable += tr_torrentMissingBytesInPiece (tor, i);
2650
2651  assert (desiredAvailable <= tor->info.totalSize);
2652  return desiredAvailable;
2653}
2654
2655double*
2656tr_peerMgrWebSpeeds_KBps (const tr_torrent * tor)
2657{
2658  unsigned int i;
2659  tr_swarm * s;
2660  unsigned int n;
2661  double * ret = NULL;
2662  const uint64_t now = tr_time_msec ();
2663
2664  assert (tr_isTorrent (tor));
2665
2666  s = tor->swarm;
2667  n = tr_ptrArraySize (&s->webseeds);
2668  ret = tr_new0 (double, n);
2669
2670  assert (s->manager != NULL);
2671  assert (n == tor->info.webseedCount);
2672
2673  for (i=0; i<n; ++i)
2674    {
2675      unsigned int Bps = 0;
2676      if (tr_peerIsTransferringPieces (tr_ptrArrayNth(&s->webseeds,i), now, TR_DOWN, &Bps))
2677        ret[i] = Bps / (double)tr_speed_K;
2678      else
2679        ret[i] = -1.0;
2680    }
2681
2682  return ret;
2683}
2684
2685struct tr_peer_stat *
2686tr_peerMgrPeerStats (const tr_torrent * tor, int * setmeCount)
2687{
2688  int i;
2689  int size = 0;
2690  tr_peer_stat * ret;
2691  const tr_swarm * s;
2692  tr_peer ** peers;
2693  const time_t now = tr_time ();
2694  const uint64_t now_msec = tr_time_msec ();
2695
2696  assert (tr_isTorrent (tor));
2697  assert (tor->swarm->manager != NULL);
2698
2699  s = tor->swarm;
2700  peers = (tr_peer**) tr_ptrArrayBase (&s->peers);
2701  size = tr_ptrArraySize (&s->peers);
2702  ret = tr_new0 (tr_peer_stat, size);
2703
2704  for (i=0; i<size; ++i)
2705    {
2706      char *                   pch;
2707      tr_peer *                peer = peers[i];
2708      tr_peerMsgs *            msgs = PEER_MSGS (peer);
2709      const struct peer_atom * atom = peer->atom;
2710      tr_peer_stat *           stat = ret + i;
2711
2712      tr_address_to_string_with_buf (&atom->addr, stat->addr, sizeof (stat->addr));
2713      tr_strlcpy (stat->client, tr_quark_get_string(peer->client,NULL), sizeof (stat->client));
2714      stat->port                = ntohs (peer->atom->port);
2715      stat->from                = atom->fromFirst;
2716      stat->progress            = peer->progress;
2717      stat->isUTP               = tr_peerMsgsIsUtpConnection (msgs);
2718      stat->isEncrypted         = tr_peerMsgsIsEncrypted (msgs);
2719      stat->rateToPeer_KBps     = toSpeedKBps (tr_peerGetPieceSpeed_Bps (peer, now_msec, TR_CLIENT_TO_PEER));
2720      stat->rateToClient_KBps   = toSpeedKBps (tr_peerGetPieceSpeed_Bps (peer, now_msec, TR_PEER_TO_CLIENT));
2721      stat->peerIsChoked        = tr_peerMsgsIsPeerChoked (msgs);
2722      stat->peerIsInterested    = tr_peerMsgsIsPeerInterested (msgs);
2723      stat->clientIsChoked      = tr_peerMsgsIsClientChoked (msgs);
2724      stat->clientIsInterested  = tr_peerMsgsIsClientInterested (msgs);
2725      stat->isIncoming          = tr_peerMsgsIsIncomingConnection (msgs);
2726      stat->isDownloadingFrom   = tr_peerMsgsIsActive (msgs, TR_PEER_TO_CLIENT);
2727      stat->isUploadingTo       = tr_peerMsgsIsActive (msgs, TR_CLIENT_TO_PEER);
2728      stat->isSeed              = tr_peerIsSeed (peer);
2729
2730      stat->blocksToPeer        = tr_historyGet (&peer->blocksSentToPeer,    now, CANCEL_HISTORY_SEC);
2731      stat->blocksToClient      = tr_historyGet (&peer->blocksSentToClient,  now, CANCEL_HISTORY_SEC);
2732      stat->cancelsToPeer       = tr_historyGet (&peer->cancelsSentToPeer,   now, CANCEL_HISTORY_SEC);
2733      stat->cancelsToClient     = tr_historyGet (&peer->cancelsSentToClient, now, CANCEL_HISTORY_SEC);
2734
2735      stat->pendingReqsToPeer   = peer->pendingReqsToPeer;
2736      stat->pendingReqsToClient = peer->pendingReqsToClient;
2737
2738      pch = stat->flagStr;
2739      if (stat->isUTP) *pch++ = 'T';
2740      if (s->optimistic == msgs) *pch++ = 'O';
2741      if (stat->isDownloadingFrom) *pch++ = 'D';
2742      else if (stat->clientIsInterested) *pch++ = 'd';
2743      if (stat->isUploadingTo) *pch++ = 'U';
2744      else if (stat->peerIsInterested) *pch++ = 'u';
2745      if (!stat->clientIsChoked && !stat->clientIsInterested) *pch++ = 'K';
2746      if (!stat->peerIsChoked && !stat->peerIsInterested) *pch++ = '?';
2747      if (stat->isEncrypted) *pch++ = 'E';
2748      if (stat->from == TR_PEER_FROM_DHT) *pch++ = 'H';
2749      else if (stat->from == TR_PEER_FROM_PEX) *pch++ = 'X';
2750      if (stat->isIncoming) *pch++ = 'I';
2751      *pch = '\0';
2752    }
2753
2754  *setmeCount = size;
2755  return ret;
2756}
2757
2758/***
2759****
2760****
2761***/
2762
2763void
2764tr_peerMgrClearInterest (tr_torrent * tor)
2765{
2766  int i;
2767  tr_swarm * s = tor->swarm;
2768  const int peerCount = tr_ptrArraySize (&s->peers);
2769
2770  assert (tr_isTorrent (tor));
2771  assert (tr_torrentIsLocked (tor));
2772
2773  for (i=0; i<peerCount; ++i)
2774    tr_peerMsgsSetInterested (tr_ptrArrayNth (&s->peers, i), false);
2775}
2776
2777/* does this peer have any pieces that we want? */
2778static bool
2779isPeerInteresting (tr_torrent     * const tor,
2780                   const bool     * const piece_is_interesting,
2781                   const tr_peer  * const peer)
2782{
2783  tr_piece_index_t i, n;
2784
2785  /* these cases should have already been handled by the calling code... */
2786  assert (!tr_torrentIsSeed (tor));
2787  assert (tr_torrentIsPieceTransferAllowed (tor, TR_PEER_TO_CLIENT));
2788
2789  if (tr_peerIsSeed (peer))
2790    return true;
2791
2792  for (i=0, n=tor->info.pieceCount; i<n; ++i)
2793    if (piece_is_interesting[i] && tr_bitfieldHas (&peer->have, i))
2794      return true;
2795
2796  return false;
2797}
2798
2799typedef enum
2800{
2801  RECHOKE_STATE_GOOD,
2802  RECHOKE_STATE_UNTESTED,
2803  RECHOKE_STATE_BAD
2804}
2805tr_rechoke_state;
2806
2807struct tr_rechoke_info
2808{
2809  tr_peer * peer;
2810  int salt;
2811  int rechoke_state;
2812};
2813
2814static int
2815compare_rechoke_info (const void * va, const void * vb)
2816{
2817  const struct tr_rechoke_info * a = va;
2818  const struct tr_rechoke_info * b = vb;
2819
2820  if (a->rechoke_state != b->rechoke_state)
2821    return a->rechoke_state - b->rechoke_state;
2822
2823  return a->salt - b->salt;
2824}
2825
2826/* determines who we send "interested" messages to */
2827static void
2828rechokeDownloads (tr_swarm * s)
2829{
2830  int i;
2831  int maxPeers = 0;
2832  int rechoke_count = 0;
2833  struct tr_rechoke_info * rechoke = NULL;
2834  const int MIN_INTERESTING_PEERS = 5;
2835  const int peerCount = tr_ptrArraySize (&s->peers);
2836  const time_t now = tr_time ();
2837
2838  /* some cases where this function isn't necessary */
2839  if (tr_torrentIsSeed (s->tor))
2840    return;
2841  if (!tr_torrentIsPieceTransferAllowed (s->tor, TR_PEER_TO_CLIENT))
2842    return;
2843
2844  /* decide HOW MANY peers to be interested in */
2845  {
2846    int blocks = 0;
2847    int cancels = 0;
2848    time_t timeSinceCancel;
2849
2850    /* Count up how many blocks & cancels each peer has.
2851     *
2852     * There are two situations where we send out cancels --
2853     *
2854     * 1. We've got unresponsive peers, which is handled by deciding
2855     *    -which- peers to be interested in.
2856     *
2857     * 2. We've hit our bandwidth cap, which is handled by deciding
2858     *    -how many- peers to be interested in.
2859     *
2860     * We're working on 2. here, so we need to ignore unresponsive
2861     * peers in our calculations lest they confuse Transmission into
2862     * thinking it's hit its bandwidth cap.
2863     */
2864    for (i=0; i<peerCount; ++i)
2865      {
2866        const tr_peer * peer = tr_ptrArrayNth (&s->peers, i);
2867        const int b = tr_historyGet (&peer->blocksSentToClient, now, CANCEL_HISTORY_SEC);
2868        const int c = tr_historyGet (&peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC);
2869
2870        if (b == 0) /* ignore unresponsive peers, as described above */
2871          continue;
2872
2873        blocks += b;
2874        cancels += c;
2875      }
2876
2877    if (cancels > 0)
2878      {
2879        /* cancelRate: of the block requests we've recently made, the percentage we cancelled.
2880         * higher values indicate more congestion. */
2881        const double cancelRate = cancels / (double)(cancels + blocks);
2882        const double mult = 1 - MIN (cancelRate, 0.5);
2883        maxPeers = s->interestedCount * mult;
2884        tordbg (s, "cancel rate is %.3f -- reducing the "
2885                   "number of peers we're interested in by %.0f percent",
2886                   cancelRate, mult * 100);
2887        s->lastCancel = now;
2888      }
2889
2890    timeSinceCancel = now - s->lastCancel;
2891    if (timeSinceCancel)
2892      {
2893        const int maxIncrease = 15;
2894        const time_t maxHistory = 2 * CANCEL_HISTORY_SEC;
2895        const double mult = MIN (timeSinceCancel, maxHistory) / (double) maxHistory;
2896        const int inc = maxIncrease * mult;
2897        maxPeers = s->maxPeers + inc;
2898        tordbg (s, "time since last cancel is %"PRIdMAX" -- increasing the "
2899                   "number of peers we're interested in by %d",
2900                   (intmax_t)timeSinceCancel, inc);
2901      }
2902  }
2903
2904  /* don't let the previous section's number tweaking go too far... */
2905  if (maxPeers < MIN_INTERESTING_PEERS)
2906    maxPeers = MIN_INTERESTING_PEERS;
2907  if (maxPeers > s->tor->maxConnectedPeers)
2908    maxPeers = s->tor->maxConnectedPeers;
2909
2910  s->maxPeers = maxPeers;
2911
2912  if (peerCount > 0)
2913    {
2914      bool * piece_is_interesting;
2915      const tr_torrent * const tor = s->tor;
2916      const int n = tor->info.pieceCount;
2917
2918      /* build a bitfield of interesting pieces... */
2919      piece_is_interesting = tr_new (bool, n);
2920      for (i=0; i<n; i++)
2921        piece_is_interesting[i] = !tor->info.pieces[i].dnd && !tr_torrentPieceIsComplete (tor, i);
2922
2923      /* decide WHICH peers to be interested in (based on their cancel-to-block ratio) */
2924      for (i=0; i<peerCount; ++i)
2925        {
2926          tr_peer * peer = tr_ptrArrayNth (&s->peers, i);
2927
2928          if (!isPeerInteresting (s->tor, piece_is_interesting, peer))
2929            {
2930              tr_peerMsgsSetInterested (PEER_MSGS(peer), false);
2931            }
2932          else
2933            {
2934              tr_rechoke_state rechoke_state;
2935              const int blocks = tr_historyGet (&peer->blocksSentToClient, now, CANCEL_HISTORY_SEC);
2936              const int cancels = tr_historyGet (&peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC);
2937
2938              if (!blocks && !cancels)
2939                rechoke_state = RECHOKE_STATE_UNTESTED;
2940              else if (!cancels)
2941                rechoke_state = RECHOKE_STATE_GOOD;
2942              else if (!blocks)
2943                rechoke_state = RECHOKE_STATE_BAD;
2944              else if ((cancels * 10) < blocks)
2945                rechoke_state = RECHOKE_STATE_GOOD;
2946              else
2947                rechoke_state = RECHOKE_STATE_BAD;
2948
2949              if (rechoke == NULL)
2950                rechoke = tr_new (struct tr_rechoke_info, peerCount);
2951
2952              rechoke[rechoke_count].peer = peer;
2953              rechoke[rechoke_count].rechoke_state = rechoke_state;
2954              rechoke[rechoke_count].salt = tr_rand_int_weak (INT_MAX);
2955              rechoke_count++;
2956            }
2957
2958        }
2959
2960      tr_free (piece_is_interesting);
2961    }
2962
2963  /* now that we know which & how many peers to be interested in... update the peer interest */
2964  qsort (rechoke, rechoke_count, sizeof (struct tr_rechoke_info), compare_rechoke_info);
2965  s->interestedCount = MIN (maxPeers, rechoke_count);
2966  for (i=0; i<rechoke_count; ++i)
2967    tr_peerMsgsSetInterested (PEER_MSGS(rechoke[i].peer), i<s->interestedCount);
2968
2969  /* cleanup */
2970  tr_free (rechoke);
2971}
2972
2973/**
2974***
2975**/
2976
2977struct ChokeData
2978{
2979  bool          isInterested;
2980  bool          wasChoked;
2981  bool          isChoked;
2982  int           rate;
2983  int           salt;
2984  tr_peerMsgs * msgs;
2985};
2986
2987static int
2988compareChoke (const void * va, const void * vb)
2989{
2990  const struct ChokeData * a = va;
2991  const struct ChokeData * b = vb;
2992
2993  if (a->rate != b->rate) /* prefer higher overall speeds */
2994    return a->rate > b->rate ? -1 : 1;
2995
2996  if (a->wasChoked != b->wasChoked) /* prefer unchoked */
2997    return a->wasChoked ? 1 : -1;
2998
2999  if (a->salt != b->salt) /* random order */
3000    return a->salt - b->salt;
3001
3002  return 0;
3003}
3004
3005/* is this a new connection? */
3006static bool
3007isNew (const tr_peerMsgs * msgs)
3008{
3009  return (msgs != NULL) && (tr_peerMsgsGetConnectionAge (msgs) < 45);
3010}
3011
3012/* get a rate for deciding which peers to choke and unchoke. */
3013static int
3014getRate (const tr_torrent * tor, struct peer_atom * atom, uint64_t now)
3015{
3016  unsigned int Bps;
3017
3018  if (tr_torrentIsSeed (tor))
3019    Bps = tr_peerGetPieceSpeed_Bps (atom->peer, now, TR_CLIENT_TO_PEER);
3020
3021  /* downloading a private torrent... take upload speed into account
3022   * because there may only be a small window of opportunity to share */
3023  else if (tr_torrentIsPrivate (tor))
3024    Bps = tr_peerGetPieceSpeed_Bps (atom->peer, now, TR_PEER_TO_CLIENT)
3025        + tr_peerGetPieceSpeed_Bps (atom->peer, now, TR_CLIENT_TO_PEER);
3026
3027  /* downloading a public torrent */
3028  else
3029    Bps = tr_peerGetPieceSpeed_Bps (atom->peer, now, TR_PEER_TO_CLIENT);
3030
3031  /* convert it to bytes per second */
3032  return Bps;
3033}
3034
3035static inline bool
3036isBandwidthMaxedOut (const tr_bandwidth * b,
3037                     const uint64_t now_msec, tr_direction dir)
3038{
3039  if (!tr_bandwidthIsLimited (b, dir))
3040    {
3041      return false;
3042    }
3043  else
3044    {
3045      const unsigned int got = tr_bandwidthGetPieceSpeed_Bps (b, now_msec, dir);
3046      const unsigned int want = tr_bandwidthGetDesiredSpeed_Bps (b, dir);
3047      return got >= want;
3048    }
3049}
3050
3051static void
3052rechokeUploads (tr_swarm * s, const uint64_t now)
3053{
3054  int i, size, unchokedInterested;
3055  const int peerCount = tr_ptrArraySize (&s->peers);
3056  tr_peer ** peers = (tr_peer**) tr_ptrArrayBase (&s->peers);
3057  struct ChokeData * choke = tr_new0 (struct ChokeData, peerCount);
3058  const tr_session * session = s->manager->session;
3059  const int chokeAll = !tr_torrentIsPieceTransferAllowed (s->tor, TR_CLIENT_TO_PEER);
3060  const bool isMaxedOut = isBandwidthMaxedOut (&s->tor->bandwidth, now, TR_UP);
3061
3062  assert (swarmIsLocked (s));
3063
3064  /* an optimistic unchoke peer's "optimistic"
3065   * state lasts for N calls to rechokeUploads (). */
3066  if (s->optimisticUnchokeTimeScaler > 0)
3067    s->optimisticUnchokeTimeScaler--;
3068  else
3069    s->optimistic = NULL;
3070
3071  /* sort the peers by preference and rate */
3072  for (i=0, size=0; i<peerCount; ++i)
3073    {
3074      tr_peer * peer = peers[i];
3075      tr_peerMsgs * msgs = PEER_MSGS (peer);
3076
3077      struct peer_atom * atom = peer->atom;
3078
3079      if (tr_peerIsSeed (peer)) /* choke seeds and partial seeds */
3080        {
3081          tr_peerMsgsSetChoke (PEER_MSGS(peer), true);
3082        }
3083      else if (chokeAll) /* choke everyone if we're not uploading */
3084        {
3085          tr_peerMsgsSetChoke (PEER_MSGS(peer), true);
3086        }
3087      else if (msgs != s->optimistic)
3088        {
3089          struct ChokeData * n = &choke[size++];
3090          n->msgs         = msgs;
3091          n->isInterested = tr_peerMsgsIsPeerInterested (msgs);
3092          n->wasChoked    = tr_peerMsgsIsPeerChoked (msgs);
3093          n->rate         = getRate (s->tor, atom, now);
3094          n->salt         = tr_rand_int_weak (INT_MAX);
3095          n->isChoked     = true;
3096        }
3097    }
3098
3099  qsort (choke, size, sizeof (struct ChokeData), compareChoke);
3100
3101  /**
3102   * Reciprocation and number of uploads capping is managed by unchoking
3103   * the N peers which have the best upload rate and are interested.
3104   * This maximizes the client's download rate. These N peers are
3105   * referred to as downloaders, because they are interested in downloading
3106   * from the client.
3107   *
3108   * Peers which have a better upload rate (as compared to the downloaders)
3109   * but aren't interested get unchoked. If they become interested, the
3110   * downloader with the worst upload rate gets choked. If a client has
3111   * a complete file, it uses its upload rate rather than its download
3112   * rate to decide which peers to unchoke.
3113   *
3114   * If our bandwidth is maxed out, don't unchoke any more peers.
3115   */
3116  unchokedInterested = 0;
3117  for (i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i)
3118    {
3119      choke[i].isChoked = isMaxedOut ? choke[i].wasChoked : false;
3120      if (choke[i].isInterested)
3121        ++unchokedInterested;
3122    }
3123
3124  /* optimistic unchoke */
3125  if (!s->optimistic && !isMaxedOut && (i<size))
3126    {
3127      int n;
3128      struct ChokeData * c;
3129      tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
3130
3131      for (; i<size; ++i)
3132        {
3133          if (choke[i].isInterested)
3134            {
3135              const tr_peerMsgs * msgs = choke[i].msgs;
3136              int x = 1, y;
3137              if (isNew (msgs)) x *= 3;
3138              for (y=0; y<x; ++y)
3139                tr_ptrArrayAppend (&randPool, &choke[i]);
3140            }
3141        }
3142
3143      if ((n = tr_ptrArraySize (&randPool)))
3144        {
3145          c = tr_ptrArrayNth (&randPool, tr_rand_int_weak (n));
3146          c->isChoked = false;
3147          s->optimistic = c->msgs;
3148          s->optimisticUnchokeTimeScaler = OPTIMISTIC_UNCHOKE_MULTIPLIER;
3149        }
3150
3151      tr_ptrArrayDestruct (&randPool, NULL);
3152    }
3153
3154  for (i=0; i<size; ++i)
3155    tr_peerMsgsSetChoke (choke[i].msgs, choke[i].isChoked);
3156
3157  /* cleanup */
3158  tr_free (choke);
3159}
3160
3161static void
3162rechokePulse (evutil_socket_t foo UNUSED, short bar UNUSED, void * vmgr)
3163{
3164  tr_torrent * tor = NULL;
3165  tr_peerMgr * mgr = vmgr;
3166  const uint64_t now = tr_time_msec ();
3167
3168  managerLock (mgr);
3169
3170  while ((tor = tr_torrentNext (mgr->session, tor)))
3171    {
3172      if (tor->isRunning)
3173        {
3174          tr_swarm * s = tor->swarm;
3175
3176          if (s->stats.peerCount > 0)
3177            {
3178              rechokeUploads (s, now);
3179              rechokeDownloads (s);
3180            }
3181        }
3182    }
3183
3184  tr_timerAddMsec (mgr->rechokeTimer, RECHOKE_PERIOD_MSEC);
3185  managerUnlock (mgr);
3186}
3187
3188/***
3189****
3190****  Life and Death
3191****
3192***/
3193
3194static bool
3195shouldPeerBeClosed (const tr_swarm   * s,
3196                    const tr_peer    * peer,
3197                    int                peerCount,
3198                    const time_t       now)
3199{
3200  const tr_torrent * tor = s->tor;
3201  const struct peer_atom * atom = peer->atom;
3202
3203  /* if it's marked for purging, close it */
3204  if (peer->doPurge)
3205    {
3206      tordbg (s, "purging peer %s because its doPurge flag is set",
3207              tr_atomAddrStr (atom));
3208      return true;
3209    }
3210
3211  /* disconnect if we're both seeds and enough time has passed for PEX */
3212  if (tr_torrentIsSeed (tor) && tr_peerIsSeed (peer))
3213    return !tr_torrentAllowsPex (tor) || (now-atom->time>=30);
3214
3215  /* disconnect if it's been too long since piece data has been transferred.
3216   * this is on a sliding scale based on number of available peers... */
3217  {
3218    const int relaxStrictnessIfFewerThanN = (int)((getMaxPeerCount (tor) * 0.9) + 0.5);
3219    /* if we have >= relaxIfFewerThan, strictness is 100%.
3220     * if we have zero connections, strictness is 0% */
3221    const float strictness = peerCount >= relaxStrictnessIfFewerThanN
3222                           ? 1.0
3223                           : peerCount / (float)relaxStrictnessIfFewerThanN;
3224    const int lo = MIN_UPLOAD_IDLE_SECS;
3225    const int hi = MAX_UPLOAD_IDLE_SECS;
3226    const int limit = hi - ((hi - lo) * strictness);
3227    const int idleTime = now - MAX (atom->time, atom->piece_data_time);
3228/*fprintf (stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit);*/
3229    if (idleTime > limit)
3230      {
3231        tordbg (s, "purging peer %s because it's been %d secs since we shared anything",
3232                tr_atomAddrStr (atom), idleTime);
3233        return true;
3234      }
3235  }
3236
3237  return false;
3238}
3239
3240static tr_peer **
3241getPeersToClose (tr_swarm * s, const time_t now_sec, int * setmeSize)
3242{
3243  int i, peerCount, outsize;
3244  struct tr_peer ** ret = NULL;
3245  tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek (&s->peers, &peerCount);
3246
3247  assert (swarmIsLocked (s));
3248
3249  for (i=outsize=0; i<peerCount; ++i)
3250    {
3251      if (shouldPeerBeClosed (s, peers[i], peerCount, now_sec))
3252        {
3253          if (ret == NULL)
3254            ret = tr_new (tr_peer *, peerCount);
3255          ret[outsize++] = peers[i];
3256        }
3257    }
3258
3259  *setmeSize = outsize;
3260  return ret;
3261}
3262
3263static int
3264getReconnectIntervalSecs (const struct peer_atom * atom, const time_t now)
3265{
3266  int sec;
3267  const bool unreachable = (atom->flags2 & MYFLAG_UNREACHABLE) != 0;
3268
3269  /* if we were recently connected to this peer and transferring piece
3270   * data, try to reconnect to them sooner rather that later -- we don't
3271   * want network troubles to get in the way of a good peer. */
3272  if (!unreachable && ((now - atom->piece_data_time) <= (MINIMUM_RECONNECT_INTERVAL_SECS * 2)))
3273    sec = MINIMUM_RECONNECT_INTERVAL_SECS;
3274
3275  /* otherwise, the interval depends on how many times we've tried
3276   * and failed to connect to the peer */
3277  else
3278    {
3279      int step = atom->numFails;
3280
3281      /* penalize peers that were unreachable the last time we tried */
3282      if (unreachable)
3283        step += 2;
3284
3285      switch (step)
3286        {
3287          case 0: sec = 0; break;
3288          case 1: sec = 10; break;
3289          case 2: sec = 60 * 2; break;
3290          case 3: sec = 60 * 15; break;
3291          case 4: sec = 60 * 30; break;
3292          case 5: sec = 60 * 60; break;
3293          default: sec = 60 * 120; break;
3294        }
3295    }
3296
3297  dbgmsg ("reconnect interval for %s is %d seconds", tr_atomAddrStr (atom), sec);
3298  return sec;
3299}
3300
3301static void
3302removePeer (tr_swarm * s, tr_peer * peer)
3303{
3304  struct peer_atom * atom = peer->atom;
3305
3306  assert (swarmIsLocked (s));
3307  assert (atom);
3308
3309  atom->time = tr_time ();
3310
3311  tr_ptrArrayRemoveSortedPointer (&s->peers, peer, peerCompare);
3312  --s->stats.peerCount;
3313  --s->stats.peerFromCount[atom->fromFirst];
3314
3315  if (replicationExists (s))
3316    tr_decrReplicationFromBitfield (s, &peer->have);
3317
3318  assert (s->stats.peerCount == tr_ptrArraySize (&s->peers));
3319  assert (s->stats.peerFromCount[atom->fromFirst] >= 0);
3320
3321  tr_peerFree (peer);
3322}
3323
3324static void
3325closePeer (tr_swarm * s, tr_peer * peer)
3326{
3327  struct peer_atom * atom;
3328
3329  assert (s != NULL);
3330  assert (peer != NULL);
3331
3332  atom = peer->atom;
3333
3334  /* if we transferred piece data, then they might be good peers,
3335     so reset their `numFails' weight to zero. otherwise we connected
3336     to them fruitlessly, so mark it as another fail */
3337  if (atom->piece_data_time)
3338    {
3339      tordbg (s, "resetting atom %s numFails to 0", tr_atomAddrStr (atom));
3340      atom->numFails = 0;
3341    }
3342  else
3343    {
3344      ++atom->numFails;
3345      tordbg (s, "incremented atom %s numFails to %d", tr_atomAddrStr (atom), (int)atom->numFails);
3346    }
3347
3348  tordbg (s, "removing bad peer %s", tr_atomAddrStr (peer->atom));
3349  removePeer (s, peer);
3350}
3351
3352static void
3353removeAllPeers (tr_swarm * s)
3354{
3355  while (!tr_ptrArrayEmpty (&s->peers))
3356    removePeer (s, tr_ptrArrayNth (&s->peers, 0));
3357
3358  assert (!s->stats.peerCount);
3359}
3360
3361static void
3362closeBadPeers (tr_swarm * s, const time_t now_sec)
3363{
3364  if (!tr_ptrArrayEmpty (&s->peers))
3365    {
3366      int i;
3367      int peerCount;
3368      struct tr_peer ** peers;
3369
3370      peers = getPeersToClose (s, now_sec, &peerCount);
3371      for (i=0; i<peerCount; ++i)
3372        closePeer (s, peers[i]);
3373      tr_free (peers);
3374    }
3375}
3376
3377struct peer_liveliness
3378{
3379  tr_peer * peer;
3380  void * clientData;
3381  time_t pieceDataTime;
3382  time_t time;
3383  unsigned int speed;
3384  bool doPurge;
3385};
3386
3387static int
3388comparePeerLiveliness (const void * va, const void * vb)
3389{
3390  const struct peer_liveliness * a = va;
3391  const struct peer_liveliness * b = vb;
3392
3393  if (a->doPurge != b->doPurge)
3394    return a->doPurge ? 1 : -1;
3395
3396  if (a->speed != b->speed) /* faster goes first */
3397    return a->speed > b->speed ? -1 : 1;
3398
3399  /* the one to give us data more recently goes first */
3400  if (a->pieceDataTime != b->pieceDataTime)
3401    return a->pieceDataTime > b->pieceDataTime ? -1 : 1;
3402
3403  /* the one we connected to most recently goes first */
3404  if (a->time != b->time)
3405    return a->time > b->time ? -1 : 1;
3406
3407  return 0;
3408}
3409
3410static void
3411sortPeersByLivelinessImpl (tr_peer  ** peers,
3412                           void     ** clientData,
3413                           int         n,
3414                           uint64_t    now,
3415                           int (*compare)(const void *va, const void *vb))
3416{
3417  int i;
3418  struct peer_liveliness *lives, *l;
3419
3420  /* build a sortable array of peer + extra info */
3421  lives = l = tr_new0 (struct peer_liveliness, n);
3422  for (i=0; i<n; ++i, ++l)
3423    {
3424      tr_peer * p = peers[i];
3425      l->peer = p;
3426      l->doPurge = p->doPurge;
3427      l->pieceDataTime = p->atom->piece_data_time;
3428      l->time = p->atom->time;
3429      l->speed = tr_peerGetPieceSpeed_Bps (p, now, TR_UP)
3430               + tr_peerGetPieceSpeed_Bps (p, now, TR_DOWN);
3431      if (clientData)
3432        l->clientData = clientData[i];
3433    }
3434
3435  /* sort 'em */
3436  assert (n == (l - lives));
3437  qsort (lives, n, sizeof (struct peer_liveliness), compare);
3438
3439  /* build the peer array */
3440  for (i=0, l=lives; i<n; ++i, ++l)
3441    {
3442      peers[i] = l->peer;
3443      if (clientData)
3444        clientData[i] = l->clientData;
3445    }
3446  assert (n == (l - lives));
3447
3448  /* cleanup */
3449  tr_free (lives);
3450}
3451
3452static void
3453sortPeersByLiveliness (tr_peer ** peers, void ** clientData, int n, uint64_t now)
3454{
3455  sortPeersByLivelinessImpl (peers, clientData, n, now, comparePeerLiveliness);
3456}
3457
3458
3459static void
3460enforceTorrentPeerLimit (tr_swarm * s, uint64_t now)
3461{
3462  int n = tr_ptrArraySize (&s->peers);
3463  const int max = tr_torrentGetPeerLimit (s->tor);
3464  if (n > max)
3465    {
3466      void * base = tr_ptrArrayBase (&s->peers);
3467      tr_peer ** peers = tr_memdup (base, n*sizeof (tr_peer*));
3468      sortPeersByLiveliness (peers, NULL, n, now);
3469      while (n > max)
3470        closePeer (s, peers[--n]);
3471      tr_free (peers);
3472    }
3473}
3474
3475static void
3476enforceSessionPeerLimit (tr_session * session, uint64_t now)
3477{
3478  int n = 0;
3479  tr_torrent * tor = NULL;
3480  const int max = tr_sessionGetPeerLimit (session);
3481
3482  /* count the total number of peers */
3483  while ((tor = tr_torrentNext (session, tor)))
3484    n += tr_ptrArraySize (&tor->swarm->peers);
3485
3486  /* if there are too many, prune out the worst */
3487  if (n > max)
3488    {
3489      tr_peer ** peers = tr_new (tr_peer*, n);
3490      tr_swarm ** swarms = tr_new (tr_swarm*, n);
3491
3492      /* populate the peer array */
3493      n = 0;
3494      tor = NULL;
3495      while ((tor = tr_torrentNext (session, tor)))
3496        {
3497          int i;
3498          tr_swarm * s = tor->swarm;
3499          const int tn = tr_ptrArraySize (&s->peers);
3500          for (i=0; i<tn; ++i, ++n)
3501            {
3502              peers[n] = tr_ptrArrayNth (&s->peers, i);
3503              swarms[n] = s;
3504            }
3505        }
3506
3507      /* sort 'em */
3508      sortPeersByLiveliness (peers, (void**)swarms, n, now);
3509
3510      /* cull out the crappiest */
3511      while (n-- > max)
3512        closePeer (swarms[n], peers[n]);
3513
3514      /* cleanup */
3515      tr_free (swarms);
3516      tr_free (peers);
3517    }
3518}
3519
3520static void makeNewPeerConnections (tr_peerMgr * mgr, const int max);
3521
3522static void
3523reconnectPulse (evutil_socket_t foo UNUSED, short bar UNUSED, void * vmgr)
3524{
3525  tr_torrent * tor;
3526  tr_peerMgr * mgr = vmgr;
3527  const time_t now_sec = tr_time ();
3528  const uint64_t now_msec = tr_time_msec ();
3529
3530  /**
3531  ***  enforce the per-session and per-torrent peer limits
3532  **/
3533
3534  /* if we're over the per-torrent peer limits, cull some peers */
3535  tor = NULL;
3536  while ((tor = tr_torrentNext (mgr->session, tor)))
3537    if (tor->isRunning)
3538      enforceTorrentPeerLimit (tor->swarm, now_msec);
3539
3540  /* if we're over the per-session peer limits, cull some peers */
3541  enforceSessionPeerLimit (mgr->session, now_msec);
3542
3543  /* remove crappy peers */
3544  tor = NULL;
3545  while ((tor = tr_torrentNext (mgr->session, tor)))
3546    if (!tor->swarm->isRunning)
3547      removeAllPeers (tor->swarm);
3548    else
3549      closeBadPeers (tor->swarm, now_sec);
3550
3551  /* try to make new peer connections */
3552  makeNewPeerConnections (mgr, MAX_CONNECTIONS_PER_PULSE);
3553}
3554
3555/****
3556*****
3557*****  BANDWIDTH ALLOCATION
3558*****
3559****/
3560
3561static void
3562pumpAllPeers (tr_peerMgr * mgr)
3563{
3564  tr_torrent * tor = NULL;
3565
3566  while ((tor = tr_torrentNext (mgr->session, tor)))
3567    {
3568      int j;
3569      tr_swarm * s = tor->swarm;
3570
3571      for (j=0; j<tr_ptrArraySize (&s->peers); ++j)
3572        tr_peerMsgsPulse (tr_ptrArrayNth (&s->peers, j));
3573    }
3574}
3575
3576
3577static void
3578queuePulseForeach (void * vtor)
3579{
3580  tr_torrent * tor = vtor;
3581
3582  tr_torrentStartNow (tor);
3583
3584  if (tor->queue_started_callback != NULL)
3585    (*tor->queue_started_callback)(tor, tor->queue_started_user_data);
3586}
3587
3588static void
3589queuePulse (tr_session * session, tr_direction dir)
3590{
3591  assert (tr_isSession (session));
3592  assert (tr_isDirection (dir));
3593
3594  if (tr_sessionGetQueueEnabled (session, dir))
3595    {
3596      tr_ptrArray torrents = TR_PTR_ARRAY_INIT;
3597
3598      tr_sessionGetNextQueuedTorrents (session,
3599                                       dir,
3600                                       tr_sessionCountQueueFreeSlots (session, dir),
3601                                       &torrents);
3602
3603      tr_ptrArrayForeach (&torrents, queuePulseForeach);
3604
3605      tr_ptrArrayDestruct (&torrents, NULL);
3606    }
3607}
3608
3609static void
3610bandwidthPulse (evutil_socket_t foo UNUSED, short bar UNUSED, void * vmgr)
3611{
3612  tr_torrent * tor;
3613  tr_peerMgr * mgr = vmgr;
3614  tr_session * session = mgr->session;
3615  managerLock (mgr);
3616
3617  /* FIXME: this next line probably isn't necessary... */
3618  pumpAllPeers (mgr);
3619
3620  /* allocate bandwidth to the peers */
3621  tr_bandwidthAllocate (&session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC);
3622  tr_bandwidthAllocate (&session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC);
3623
3624  /* torrent upkeep */
3625  tor = NULL;
3626  while ((tor = tr_torrentNext (session, tor)))
3627    {
3628      /* possibly stop torrents that have seeded enough */
3629      tr_torrentCheckSeedLimit (tor);
3630
3631      /* run the completeness check for any torrents that need it */
3632      if (tor->swarm->needsCompletenessCheck)
3633        {
3634          tor->swarm->needsCompletenessCheck  = false;
3635          tr_torrentRecheckCompleteness (tor);
3636        }
3637
3638      /* stop torrents that are ready to stop, but couldn't be stopped
3639         earlier during the peer-io callback call chain */
3640      if (tor->isStopping)
3641        tr_torrentStop (tor);
3642
3643      /* update the torrent's stats */
3644      tor->swarm->stats.activeWebseedCount = countActiveWebseeds (tor->swarm);
3645    }
3646
3647  /* pump the queues */
3648  queuePulse (session, TR_UP);
3649  queuePulse (session, TR_DOWN);
3650
3651  reconnectPulse (0, 0, mgr);
3652
3653  tr_timerAddMsec (mgr->bandwidthTimer, BANDWIDTH_PERIOD_MSEC);
3654  managerUnlock (mgr);
3655}
3656
3657/***
3658****
3659***/
3660
3661static int
3662compareAtomPtrsByAddress (const void * va, const void *vb)
3663{
3664  const struct peer_atom * a = * (const struct peer_atom**) va;
3665  const struct peer_atom * b = * (const struct peer_atom**) vb;
3666
3667  assert (tr_isAtom (a));
3668  assert (tr_isAtom (b));
3669
3670  return tr_address_compare (&a->addr, &b->addr);
3671}
3672
3673/* best come first, worst go last */
3674static int
3675compareAtomPtrsByShelfDate (const void * va, const void *vb)
3676{
3677  time_t atime;
3678  time_t btime;
3679  const struct peer_atom * a = * (const struct peer_atom**) va;
3680  const struct peer_atom * b = * (const struct peer_atom**) vb;
3681  const int data_time_cutoff_secs = 60 * 60;
3682  const time_t tr_now = tr_time ();
3683
3684  assert (tr_isAtom (a));
3685  assert (tr_isAtom (b));
3686
3687  /* primary key: the last piece data time *if* it was within the last hour */
3688  atime = a->piece_data_time; if (atime + data_time_cutoff_secs < tr_now) atime = 0;
3689  btime = b->piece_data_time; if (btime + data_time_cutoff_secs < tr_now) btime = 0;
3690  if (atime != btime)
3691    return atime > btime ? -1 : 1;
3692
3693  /* secondary key: shelf date. */
3694  if (a->shelf_date != b->shelf_date)
3695    return a->shelf_date > b->shelf_date ? -1 : 1;
3696
3697  return 0;
3698}
3699
3700static int
3701getMaxAtomCount (const tr_torrent * tor)
3702{
3703  return MIN (50, tor->maxConnectedPeers * 3);
3704}
3705
3706static void
3707atomPulse (evutil_socket_t foo UNUSED, short bar UNUSED, void * vmgr)
3708{
3709  tr_torrent * tor = NULL;
3710  tr_peerMgr * mgr = vmgr;
3711  managerLock (mgr);
3712
3713  while ((tor = tr_torrentNext (mgr->session, tor)))
3714    {
3715      int atomCount;
3716      tr_swarm * s = tor->swarm;
3717      const int maxAtomCount = getMaxAtomCount (tor);
3718      struct peer_atom ** atoms = (struct peer_atom**) tr_ptrArrayPeek (&s->pool, &atomCount);
3719
3720      if (atomCount > maxAtomCount) /* we've got too many atoms... time to prune */
3721        {
3722          int i;
3723          int keepCount = 0;
3724          int testCount = 0;
3725          struct peer_atom ** keep = tr_new (struct peer_atom*, atomCount);
3726          struct peer_atom ** test = tr_new (struct peer_atom*, atomCount);
3727
3728          /* keep the ones that are in use */
3729          for (i=0; i<atomCount; ++i)
3730            {
3731              struct peer_atom * atom = atoms[i];
3732              if (peerIsInUse (s, atom))
3733                keep[keepCount++] = atom;
3734              else
3735                test[testCount++] = atom;
3736            }
3737
3738          /* if there's room, keep the best of what's left */
3739          i = 0;
3740          if (keepCount < maxAtomCount)
3741            {
3742              qsort (test, testCount, sizeof (struct peer_atom *), compareAtomPtrsByShelfDate);
3743              while (i<testCount && keepCount<maxAtomCount)
3744                keep[keepCount++] = test[i++];
3745            }
3746
3747          /* free the culled atoms */
3748          while (i<testCount)
3749            tr_free (test[i++]);
3750
3751          /* rebuild Torrent.pool with what's left */
3752          tr_ptrArrayDestruct (&s->pool, NULL);
3753          s->pool = TR_PTR_ARRAY_INIT;
3754          qsort (keep, keepCount, sizeof (struct peer_atom *), compareAtomPtrsByAddress);
3755          for (i=0; i<keepCount; ++i)
3756            tr_ptrArrayAppend (&s->pool, keep[i]);
3757
3758          tordbg (s, "max atom count is %d... pruned from %d to %d\n", maxAtomCount, atomCount, keepCount);
3759
3760          /* cleanup */
3761          tr_free (test);
3762          tr_free (keep);
3763        }
3764    }
3765
3766  tr_timerAddMsec (mgr->atomTimer, ATOM_PERIOD_MSEC);
3767  managerUnlock (mgr);
3768}
3769
3770/***
3771****
3772****
3773****
3774***/
3775
3776/* is this atom someone that we'd want to initiate a connection to? */
3777static bool
3778isPeerCandidate (const tr_torrent * tor, struct peer_atom * atom, const time_t now)
3779{
3780  /* not if we're both seeds */
3781  if (tr_torrentIsSeed (tor) && atomIsSeed (atom))
3782    return false;
3783
3784  /* not if we've already got a connection to them... */
3785  if (peerIsInUse (tor->swarm, atom))
3786    return false;
3787
3788  /* not if we just tried them already */
3789  if ((now - atom->time) < getReconnectIntervalSecs (atom, now))
3790    return false;
3791
3792  /* not if they're blocklisted */
3793  if (isAtomBlocklisted (tor->session, atom))
3794    return false;
3795
3796  /* not if they're banned... */
3797  if (atom->flags2 & MYFLAG_BANNED)
3798    return false;
3799
3800  return true;
3801}
3802
3803struct peer_candidate
3804{
3805  uint64_t score;
3806  tr_torrent * tor;
3807  struct peer_atom * atom;
3808};
3809
3810static bool
3811torrentWasRecentlyStarted (const tr_torrent * tor)
3812{
3813  return difftime (tr_time (), tor->startDate) < 120;
3814}
3815
3816static inline uint64_t
3817addValToKey (uint64_t value, int width, uint64_t addme)
3818{
3819  value = (value << (uint64_t)width);
3820  value |= addme;
3821  return value;
3822}
3823
3824/* smaller value is better */
3825static uint64_t
3826getPeerCandidateScore (const tr_torrent * tor, const struct peer_atom * atom, uint8_t salt)
3827{
3828  uint64_t i;
3829  uint64_t score = 0;
3830  const bool failed = atom->lastConnectionAt < atom->lastConnectionAttemptAt;
3831
3832  /* prefer peers we've connected to, or never tried, over peers we failed to connect to. */
3833  i = failed ? 1 : 0;
3834  score = addValToKey (score, 1, i);
3835
3836  /* prefer the one we attempted least recently (to cycle through all peers) */
3837  i = atom->lastConnectionAttemptAt;
3838  score = addValToKey (score, 32, i);
3839
3840  /* prefer peers belonging to a torrent of a higher priority */
3841  switch (tr_torrentGetPriority (tor))
3842    {
3843      case TR_PRI_HIGH:    i = 0; break;
3844      case TR_PRI_NORMAL:  i = 1; break;
3845      case TR_PRI_LOW:     i = 2; break;
3846    }
3847  score = addValToKey (score, 4, i);
3848
3849  /* prefer recently-started torrents */
3850  i = torrentWasRecentlyStarted (tor) ? 0 : 1;
3851  score = addValToKey (score, 1, i);
3852
3853  /* prefer torrents we're downloading with */
3854  i = tr_torrentIsSeed (tor) ? 1 : 0;
3855  score = addValToKey (score, 1, i);
3856
3857  /* prefer peers that are known to be connectible */
3858  i = (atom->flags & ADDED_F_CONNECTABLE) ? 0 : 1;
3859  score = addValToKey (score, 1, i);
3860
3861  /* prefer peers that we might have a chance of uploading to...
3862  so lower seed probability is better */
3863  if (atom->seedProbability == 100) i = 101;
3864  else if (atom->seedProbability == -1) i = 100;
3865  else i = atom->seedProbability;
3866  score = addValToKey (score, 8, i);
3867
3868  /* Prefer peers that we got from more trusted sources.
3869   * lower `fromBest' values indicate more trusted sources */
3870  score = addValToKey (score, 4, atom->fromBest);
3871
3872  /* salt */
3873  score = addValToKey (score, 8, salt);
3874
3875  return score;
3876}
3877
3878static int
3879comparePeerCandidates (const void * va, const void * vb)
3880{
3881  int ret;
3882  const struct peer_candidate * a = va;
3883  const struct peer_candidate * b = vb;
3884
3885  if (a->score < b->score)
3886    ret = -1;
3887  else if (a->score > b->score)
3888    ret = 1;
3889  else
3890    ret = 0;
3891
3892  return ret;
3893}
3894
3895/* Partial sorting -- selecting the k best candidates
3896   Adapted from http://en.wikipedia.org/wiki/Selection_algorithm */
3897static void
3898selectPeerCandidates (struct peer_candidate * candidates, int candidate_count, int select_count)
3899{
3900  tr_quickfindFirstK (candidates,
3901                      candidate_count,
3902                      sizeof(struct peer_candidate),
3903                      comparePeerCandidates,
3904                      select_count);
3905}
3906
3907#ifndef NDEBUG
3908static bool
3909checkBestScoresComeFirst (const struct peer_candidate * candidates, int n, int k)
3910{
3911  int i;
3912  uint64_t worstFirstScore = 0;
3913  const int x = MIN (n, k) - 1;
3914
3915  for (i=0; i<x; i++)
3916    if (worstFirstScore < candidates[i].score)
3917      worstFirstScore = candidates[i].score;
3918
3919  for (i=0; i<x; i++)
3920    assert (candidates[i].score <= worstFirstScore);
3921
3922  for (i=x+1; i<n; i++)
3923    assert (candidates[i].score >= worstFirstScore);
3924
3925  return true;
3926}
3927#endif /* NDEBUG */
3928
3929/** @return an array of all the atoms we might want to connect to */
3930static struct peer_candidate*
3931getPeerCandidates (tr_session * session, int * candidateCount, int max)
3932{
3933  int atomCount;
3934  int peerCount;
3935  tr_torrent * tor;
3936  struct peer_candidate * candidates;
3937  struct peer_candidate * walk;
3938  const time_t now = tr_time ();
3939  const uint64_t now_msec = tr_time_msec ();
3940  /* leave 5% of connection slots for incoming connections -- ticket #2609 */
3941  const int maxCandidates = tr_sessionGetPeerLimit (session) * 0.95;
3942
3943  /* count how many peers and atoms we've got */
3944  tor= NULL;
3945  atomCount = 0;
3946  peerCount = 0;
3947  while ((tor = tr_torrentNext (session, tor)))
3948    {
3949      atomCount += tr_ptrArraySize (&tor->swarm->pool);
3950      peerCount += tr_ptrArraySize (&tor->swarm->peers);
3951    }
3952
3953  /* don't start any new handshakes if we're full up */
3954  if (maxCandidates <= peerCount)
3955    {
3956      *candidateCount = 0;
3957      return NULL;
3958    }
3959
3960  /* allocate an array of candidates */
3961  walk = candidates = tr_new (struct peer_candidate, atomCount);
3962
3963  /* populate the candidate array */
3964  tor = NULL;
3965  while ((tor = tr_torrentNext (session, tor)))
3966    {
3967      int i, nAtoms;
3968      struct peer_atom ** atoms;
3969
3970      if (!tor->swarm->isRunning)
3971        continue;
3972
3973      /* if we've already got enough peers in this torrent... */
3974      if (tr_torrentGetPeerLimit (tor) <= tr_ptrArraySize (&tor->swarm->peers))
3975        continue;
3976
3977      /* if we've already got enough speed in this torrent... */
3978      if (tr_torrentIsSeed (tor) && isBandwidthMaxedOut (&tor->bandwidth, now_msec, TR_UP))
3979        continue;
3980
3981      atoms = (struct peer_atom**) tr_ptrArrayPeek (&tor->swarm->pool, &nAtoms);
3982      for (i=0; i<nAtoms; ++i)
3983        {
3984          struct peer_atom * atom = atoms[i];
3985
3986          if (isPeerCandidate (tor, atom, now))
3987            {
3988              const uint8_t salt = tr_rand_int_weak (1024);
3989              walk->tor = tor;
3990              walk->atom = atom;
3991              walk->score = getPeerCandidateScore (tor, atom, salt);
3992              ++walk;
3993            }
3994        }
3995    }
3996
3997  *candidateCount = walk - candidates;
3998  if (walk != candidates)
3999    selectPeerCandidates (candidates, walk-candidates, max);
4000
4001  assert (checkBestScoresComeFirst (candidates, *candidateCount, max));
4002  return candidates;
4003}
4004
4005static void
4006initiateConnection (tr_peerMgr * mgr, tr_swarm * s, struct peer_atom * atom)
4007{
4008  tr_peerIo * io;
4009  const time_t now = tr_time ();
4010  bool utp = tr_sessionIsUTPEnabled (mgr->session) && !atom->utp_failed;
4011
4012  if (atom->fromFirst == TR_PEER_FROM_PEX)
4013    /* PEX has explicit signalling for uTP support.  If an atom
4014       originally came from PEX and doesn't have the uTP flag, skip the
4015       uTP connection attempt.  Are we being optimistic here? */
4016    utp = utp && (atom->flags & ADDED_F_UTP_FLAGS);
4017
4018  tordbg (s, "Starting an OUTGOING%s connection with %s",
4019          utp ? " µTP" : "", tr_atomAddrStr (atom));
4020
4021  io = tr_peerIoNewOutgoing (mgr->session,
4022                             &mgr->session->bandwidth,
4023                             &atom->addr,
4024                             atom->port,
4025                             s->tor->info.hash,
4026                             s->tor->completeness == TR_SEED,
4027                             utp);
4028
4029  if (io == NULL)
4030    {
4031      tordbg (s, "peerIo not created; marking peer %s as unreachable", tr_atomAddrStr (atom));
4032      atom->flags2 |= MYFLAG_UNREACHABLE;
4033      atom->numFails++;
4034    }
4035  else
4036    {
4037      tr_handshake * handshake = tr_handshakeNew (io,
4038                                                  mgr->session->encryptionMode,
4039                                                  myHandshakeDoneCB,
4040                                                  mgr);
4041
4042      assert (tr_peerIoGetTorrentHash (io));
4043
4044      tr_peerIoUnref (io); /* balanced by the initial ref
4045                              in tr_peerIoNewOutgoing () */
4046
4047      tr_ptrArrayInsertSorted (&s->outgoingHandshakes, handshake,
4048                               handshakeCompare);
4049    }
4050
4051  atom->lastConnectionAttemptAt = now;
4052  atom->time = now;
4053}
4054
4055static void
4056initiateCandidateConnection (tr_peerMgr * mgr, struct peer_candidate * c)
4057{
4058#if 0
4059  fprintf (stderr, "Starting an OUTGOING connection with %s - [%s] seedProbability==%d; %s, %s\n",
4060           tr_atomAddrStr (c->atom),
4061           tr_torrentName (c->tor),
4062           (int)c->atom->seedProbability,
4063           tr_torrentIsPrivate (c->tor) ? "private" : "public",
4064           tr_torrentIsSeed (c->tor) ? "seed" : "downloader");
4065#endif
4066
4067  initiateConnection (mgr, c->tor->swarm, c->atom);
4068}
4069
4070static void
4071makeNewPeerConnections (struct tr_peerMgr * mgr, const int max)
4072{
4073  int i, n;
4074  struct peer_candidate * candidates;
4075
4076  candidates = getPeerCandidates (mgr->session, &n, max);
4077
4078  for (i=0; i<n && i<max; ++i)
4079    initiateCandidateConnection (mgr, &candidates[i]);
4080
4081  tr_free (candidates);
4082}
Note: See TracBrowser for help on using the repository browser.