source: trunk/libtransmission/peer-mgr.c @ 14525

Last change on this file since 14525 was 14525, checked in by mikedld, 6 years ago

Fix some issues revealed by coverity

  • Property svn:keywords set to Date Rev Author Id
File size: 108.6 KB
Line 
1/*
2 * This file Copyright (C) 2007-2014 Mnemosyne LLC
3 *
4 * It may be used under the GNU GPL versions 2 or 3
5 * or any future license endorsed by Mnemosyne LLC.
6 *
7 * $Id: peer-mgr.c 14525 2015-05-09 08:37:55Z mikedld $
8 */
9
10#include <assert.h>
11#include <errno.h> /* error codes ERANGE, ... */
12#include <limits.h> /* INT_MAX */
13#include <string.h> /* memcpy, memcmp, strstr */
14#include <stdlib.h> /* qsort */
15
16#include <event2/event.h>
17
18#include <libutp/utp.h>
19
20#include "transmission.h"
21#include "announcer.h"
22#include "bandwidth.h"
23#include "blocklist.h"
24#include "cache.h"
25#include "clients.h"
26#include "completion.h"
27#include "crypto-utils.h"
28#include "handshake.h"
29#include "log.h"
30#include "net.h"
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-msgs.h"
34#include "ptrarray.h"
35#include "session.h"
36#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
37#include "torrent.h"
38#include "tr-utp.h"
39#include "utils.h"
40#include "webseed.h"
41
/**
 * Tunable constants for the peer manager.
 */
enum
{
  /* how frequently to cull old atoms */
  ATOM_PERIOD_MSEC = (60 * 1000),

  /* how frequently to change which peers are choked */
  RECHOKE_PERIOD_MSEC = (10 * 1000),

  /* an optimistically unchoked peer is immune from rechoking
     for this many calls to rechokeUploads (). */
  OPTIMISTIC_UNCHOKE_MULTIPLIER = 4,

  /* how frequently to reallocate bandwidth */
  BANDWIDTH_PERIOD_MSEC = 500,

  /* how frequently to age out old piece request lists */
  REFILL_UPKEEP_PERIOD_MSEC = (10 * 1000),

  /* how frequently to decide which peers live and die */
  RECONNECT_PERIOD_MSEC = 500,

  /* when many peers are available, keep idle ones this long */
  MIN_UPLOAD_IDLE_SECS = (60),

  /* when few peers are available, keep idle ones this long */
  MAX_UPLOAD_IDLE_SECS = (60 * 5),

  /* max number of peers to ask for per second overall.
   * this throttle is to avoid overloading the router */
  MAX_CONNECTIONS_PER_SECOND = 12,

  /* MAX_CONNECTIONS_PER_SECOND scaled down to one reconnect pulse.
   * Computed with integer arithmetic only: an enumerator's value must be
   * an integer constant expression, and floating-point math inside one
   * is accepted only as a compiler extension. */
  MAX_CONNECTIONS_PER_PULSE = ((MAX_CONNECTIONS_PER_SECOND * RECONNECT_PERIOD_MSEC) / 1000),

  /* number of bad pieces a peer is allowed to send before we ban them */
  MAX_BAD_PIECES_PER_PEER = 5,

  /* amount of time to keep a list of request pieces lying around
     before it's considered too old and needs to be rebuilt */
  PIECE_LIST_SHELF_LIFE_SECS = 60,

  /* use for bitwise operations w/peer_atom.flags2 */
  MYFLAG_BANNED = 1,

  /* use for bitwise operations w/peer_atom.flags2 */
  /* unreachable for now... but not banned.
   * if they try to connect to us it's okay */
  MYFLAG_UNREACHABLE = 2,

  /* the minimum we'll wait before attempting to reconnect to a peer */
  MINIMUM_RECONNECT_INTERVAL_SECS = 5,

  /** how long we'll let requests we've made linger before we cancel them */
  REQUEST_TTL_SECS = 90,

  NO_BLOCKS_CANCEL_HISTORY = 120,

  CANCEL_HISTORY_SEC = 60
};
100
/* a tr_peer_event whose fields are all zero / NULL */
const tr_peer_event TR_PEER_EVENT_INIT = { 0, 0, NULL, 0, 0, 0, 0 };

/* a tr_swarm_stats whose fields are all zero; swarmFree () resets a swarm's stats to this */
const tr_swarm_stats TR_SWARM_STATS_INIT = { { 0, 0 }, 0, 0, { 0, 0, 0, 0, 0, 0, 0 } };
104
105/**
106***
107**/
108
/**
 * Peer information that should be kept even before we've connected and
 * after we've disconnected. These are kept in a pool of peer_atoms to decide
 * which ones would make good candidates for connecting to, and to watch out
 * for banned peers.
 *
 * Atoms live in tr_swarm::pool and are looked up by address
 * (see getExistingAtom ()).
 *
 * @see tr_peer
 * @see tr_peerMsgs
 */
struct peer_atom
{
  uint8_t     fromFirst;          /* where the peer was first found */
  uint8_t     fromBest;           /* the "best" value of where the peer has been found */
  uint8_t     flags;              /* these match the added_f flags */
  uint8_t     flags2;             /* flags that aren't defined in added_f */
  int8_t      seedProbability;    /* how likely is this to be a seed... [0..100] or -1 for unknown */
  int8_t      blocklisted;        /* -1 for unknown, true for blocklisted, false for not blocklisted.
                                     this is a cache: see isAtomBlocklisted () */

  tr_port     port;
  bool        utp_failed;         /* We recently failed to connect over uTP */
  uint16_t    numFails;
  time_t      time;               /* when the peer's connection status last changed */
  time_t      piece_data_time;

  time_t      lastConnectionAttemptAt;
  time_t      lastConnectionAt;

  /* similar to a TTL field, but less rigid --
   * if the swarm is small, the atom will be kept past this date. */
  time_t      shelf_date;
  tr_peer   * peer;               /* will be NULL if not connected */
  tr_address  addr;
};
142
143#ifdef NDEBUG
144#define tr_isAtom(a) (TRUE)
145#else
146static bool
147tr_isAtom (const struct peer_atom * atom)
148{
149  return (atom != NULL)
150      && (atom->fromFirst < TR_PEER_FROM__MAX)
151      && (atom->fromBest < TR_PEER_FROM__MAX)
152      && (tr_address_is_valid (&atom->addr));
153}
154#endif
155
156static const char*
157tr_atomAddrStr (const struct peer_atom * atom)
158{
159  return atom ? tr_peerIoAddrStr (&atom->addr, atom->port) : "[no atom]";
160}
161
/**
 * One outstanding request for a block. tr_swarm::requests is an array of
 * these kept sorted by (block, peer) -- see compareReqByBlock ().
 */
struct block_request
{
  tr_block_index_t block; /* which block was requested */
  tr_peer * peer;         /* the peer the request was sent to */
  time_t sentAt;          /* tr_time () when the request was recorded */
};
168
/**
 * An entry in tr_swarm::pieces, the list of pieces we still want.
 * The fields feed the ordering in comparePieceByWeight ().
 */
struct weighted_piece
{
  tr_piece_index_t index; /* the piece's index in the torrent */
  int16_t salt;           /* random tie-breaker assigned in pieceListRebuild () */
  int16_t requestCount;   /* compared against the piece's missing blocks when weighting */
};
175
/** How tr_swarm::pieces is currently ordered; maintained by pieceListSort () */
enum piece_sort_state
{
  PIECES_UNSORTED,          /* no ordering guaranteed; see invalidatePieceSorting () */
  PIECES_SORTED_BY_INDEX,   /* ordered by comparePieceByIndex () */
  PIECES_SORTED_BY_WEIGHT   /* ordered by comparePieceByWeight () */
};
182
/** @brief Opaque, per-torrent data structure for peer connection information */
typedef struct tr_swarm
{
  tr_swarm_stats             stats;

  tr_ptrArray                outgoingHandshakes; /* tr_handshake */
  tr_ptrArray                pool; /* struct peer_atom */
  tr_ptrArray                peers; /* tr_peerMsgs */
  tr_ptrArray                webseeds; /* tr_webseed */

  tr_torrent               * tor;
  struct tr_peerMgr        * manager;

  tr_peerMsgs              * optimistic; /* the optimistic peer, or NULL if none */
  int                        optimisticUnchokeTimeScaler;

  bool                       isRunning;
  bool                       needsCompletenessCheck;

  /* array of outstanding block requests, sorted by compareReqByBlock ().
     requestCount entries are in use; requestAlloc entries are allocated. */
  struct block_request     * requests;
  int                        requestCount;
  int                        requestAlloc;

  /* the pieces we still want, ordered as described by pieceSortState */
  struct weighted_piece    * pieces;
  int                        pieceCount;
  enum piece_sort_state      pieceSortState;

  /* An array of pieceCount items stating how many peers have each piece.
     This is used to help us download pieces "rarest first."
     This may be NULL if we don't have metainfo yet, or if we're not
     downloading and don't care about rarity */
  uint16_t                 * pieceReplication;
  size_t                     pieceReplicationSize;

  int                        interestedCount;
  int                        maxPeers;
  time_t                     lastCancel;

  /* Before the endgame this should be 0. In endgame, it contains the average
   * number of pending requests per peer. Only peers which have more pending
   * requests are considered 'fast' and are allowed to request a block that's
   * already been requested from another (slower?) peer. */
  int                        endgame;
}
tr_swarm;
228
/**
 * Session-wide peer manager state: the owning session, the handshakes for
 * incoming connections, and the manager's periodic timers.
 */
struct tr_peerMgr
{
  tr_session    * session;            /* also supplies the lock; see managerLock () */
  tr_ptrArray     incomingHandshakes; /* tr_handshake */
  struct event  * bandwidthTimer;
  struct event  * rechokeTimer;
  struct event  * refillUpkeepTimer;
  struct event  * atomTimer;
};
238
/**
 * Emit a deep-log message tagged with the name of swarm @a t's torrent.
 * Nothing is formatted unless deep logging is enabled.
 * The argument is parenthesized so that any pointer expression
 * (e.g. `&swarm' or `cond ? a : b') expands correctly.
 */
#define tordbg(t, ...) \
  do \
    { \
      if (tr_logGetDeepEnabled ()) \
        tr_logAddDeep (__FILE__, __LINE__, tr_torrentName ((t)->tor), __VA_ARGS__); \
    } \
  while (0)
246
/**
 * Emit a deep-log message with no torrent name attached (NULL is passed
 * as the name). Nothing is formatted unless deep logging is enabled.
 */
#define dbgmsg(...) \
  do \
    { \
      if (tr_logGetDeepEnabled ()) \
        tr_logAddDeep (__FILE__, __LINE__, NULL, __VA_ARGS__); \
    } \
  while (0)
254
255/**
256*** tr_peer virtual functions
257**/
258
259static bool
260tr_peerIsTransferringPieces (const tr_peer * peer,
261                             uint64_t        now,
262                             tr_direction    direction,
263                             unsigned int  * Bps)
264{
265  assert (peer != NULL);
266  assert (peer->funcs != NULL);
267
268  return (*peer->funcs->is_transferring_pieces)(peer, now, direction, Bps);
269}
270
271unsigned int
272tr_peerGetPieceSpeed_Bps (const tr_peer    * peer,
273                          uint64_t           now,
274                          tr_direction       direction)
275{
276  unsigned int Bps = 0;
277  tr_peerIsTransferringPieces (peer, now, direction, &Bps);
278  return Bps;
279}
280
281static void
282tr_peerFree (tr_peer * peer)
283{
284  assert (peer != NULL);
285  assert (peer->funcs != NULL);
286
287  (*peer->funcs->destruct)(peer);
288
289  tr_free (peer);
290}
291
292void
293tr_peerConstruct (tr_peer * peer, const tr_torrent * tor)
294{
295  assert (peer != NULL);
296  assert (tr_isTorrent (tor));
297
298  memset (peer, 0, sizeof (tr_peer));
299
300  peer->client = TR_KEY_NONE;
301  peer->swarm = tor->swarm;
302  tr_bitfieldConstruct (&peer->have, tor->info.pieceCount);
303  tr_bitfieldConstruct (&peer->blame, tor->blockCount);
304}
305
306static void peerDeclinedAllRequests (tr_swarm *, const tr_peer *);
307
308void
309tr_peerDestruct (tr_peer * peer)
310{
311  assert (peer != NULL);
312
313  if (peer->swarm != NULL)
314    peerDeclinedAllRequests (peer->swarm, peer);
315
316  tr_bitfieldDestruct (&peer->have);
317  tr_bitfieldDestruct (&peer->blame);
318
319  if (peer->atom)
320    peer->atom->peer = NULL;
321}
322
323/**
324***
325**/
326
327static inline void
328managerLock (const struct tr_peerMgr * manager)
329{
330  tr_sessionLock (manager->session);
331}
332
333static inline void
334managerUnlock (const struct tr_peerMgr * manager)
335{
336  tr_sessionUnlock (manager->session);
337}
338
339static inline void
340swarmLock (tr_swarm * swarm)
341{
342  managerLock (swarm->manager);
343}
344
345static inline void
346swarmUnlock (tr_swarm * swarm)
347{
348  managerUnlock (swarm->manager);
349}
350
351static inline int
352swarmIsLocked (const tr_swarm * swarm)
353{
354  return tr_sessionIsLocked (swarm->manager->session);
355}
356
357/**
358***
359**/
360
361static int
362handshakeCompareToAddr (const void * va, const void * vb)
363{
364  const tr_handshake * a = va;
365
366  return tr_address_compare (tr_handshakeGetAddr (a, NULL), vb);
367}
368
369static int
370handshakeCompare (const void * a, const void * b)
371{
372  return handshakeCompareToAddr (a, tr_handshakeGetAddr (b, NULL));
373}
374
375static inline tr_handshake*
376getExistingHandshake (tr_ptrArray * handshakes, const tr_address * addr)
377{
378  if (tr_ptrArrayEmpty (handshakes))
379    return NULL;
380
381  return tr_ptrArrayFindSorted (handshakes, addr, handshakeCompareToAddr);
382}
383
384static int
385comparePeerAtomToAddress (const void * va, const void * vb)
386{
387  const struct peer_atom * a = va;
388
389  return tr_address_compare (&a->addr, vb);
390}
391
392static int
393compareAtomsByAddress (const void * va, const void * vb)
394{
395  const struct peer_atom * b = vb;
396
397  assert (tr_isAtom (b));
398
399  return comparePeerAtomToAddress (va, &b->addr);
400}
401
402/**
403***
404**/
405
406const tr_address *
407tr_peerAddress (const tr_peer * peer)
408{
409  return &peer->atom->addr;
410}
411
412static tr_swarm *
413getExistingSwarm (tr_peerMgr *    manager,
414                  const uint8_t * hash)
415{
416  tr_torrent * tor = tr_torrentFindFromHash (manager->session, hash);
417
418  return tor == NULL ? NULL : tor->swarm;
419}
420
421static int
422peerCompare (const void * a, const void * b)
423{
424  return tr_address_compare (tr_peerAddress (a), tr_peerAddress (b));
425}
426
427static struct peer_atom*
428getExistingAtom (const tr_swarm   * cswarm,
429                 const tr_address * addr)
430{
431  tr_swarm * swarm = (tr_swarm*) cswarm;
432  return tr_ptrArrayFindSorted (&swarm->pool, addr, comparePeerAtomToAddress);
433}
434
435static bool
436peerIsInUse (const tr_swarm * cs, const struct peer_atom * atom)
437{
438  tr_swarm * s = (tr_swarm*) cs;
439
440  assert (swarmIsLocked (s));
441
442  return (atom->peer != NULL)
443      || getExistingHandshake (&s->outgoingHandshakes, &atom->addr)
444      || getExistingHandshake (&s->manager->incomingHandshakes, &atom->addr);
445}
446
447static inline bool
448replicationExists (const tr_swarm * s)
449{
450  return s->pieceReplication != NULL;
451}
452
453static void
454replicationFree (tr_swarm * s)
455{
456  tr_free (s->pieceReplication);
457  s->pieceReplication = NULL;
458  s->pieceReplicationSize = 0;
459}
460
461static void
462replicationNew (tr_swarm * s)
463{
464  tr_piece_index_t piece_i;
465  const tr_piece_index_t piece_count = s->tor->info.pieceCount;
466  const int n = tr_ptrArraySize (&s->peers);
467
468  assert (!replicationExists (s));
469
470  s->pieceReplicationSize = piece_count;
471  s->pieceReplication = tr_new0 (uint16_t, piece_count);
472
473  for (piece_i=0; piece_i<piece_count; ++piece_i)
474    {
475      int peer_i;
476      uint16_t r = 0;
477
478      for (peer_i=0; peer_i<n; ++peer_i)
479        {
480          tr_peer * peer = tr_ptrArrayNth (&s->peers, peer_i);
481          if (tr_bitfieldHas (&peer->have, piece_i))
482            ++r;
483        }
484
485      s->pieceReplication[piece_i] = r;
486    }
487}
488
489static void
490swarmFree (void * vs)
491{
492  tr_swarm * s = vs;
493
494  assert (s);
495  assert (!s->isRunning);
496  assert (swarmIsLocked (s));
497  assert (tr_ptrArrayEmpty (&s->outgoingHandshakes));
498  assert (tr_ptrArrayEmpty (&s->peers));
499
500  tr_ptrArrayDestruct (&s->webseeds, (PtrArrayForeachFunc)tr_peerFree);
501  tr_ptrArrayDestruct (&s->pool, (PtrArrayForeachFunc)tr_free);
502  tr_ptrArrayDestruct (&s->outgoingHandshakes, NULL);
503  tr_ptrArrayDestruct (&s->peers, NULL);
504  s->stats = TR_SWARM_STATS_INIT;
505
506  replicationFree (s);
507
508  tr_free (s->requests);
509  tr_free (s->pieces);
510  tr_free (s);
511}
512
513static void peerCallbackFunc (tr_peer *, const tr_peer_event *, void *);
514
515static void
516rebuildWebseedArray (tr_swarm * s, tr_torrent * tor)
517{
518  unsigned int i;
519  const tr_info * inf = &tor->info;
520
521  /* clear the array */
522  tr_ptrArrayDestruct (&s->webseeds, (PtrArrayForeachFunc)tr_peerFree);
523  s->webseeds = TR_PTR_ARRAY_INIT;
524  s->stats.activeWebseedCount = 0;
525
526  /* repopulate it */
527  for (i=0; i<inf->webseedCount; ++i)
528    {
529      tr_webseed * w = tr_webseedNew (tor, inf->webseeds[i], peerCallbackFunc, s);
530      tr_ptrArrayAppend (&s->webseeds, w);
531    }
532}
533
534static tr_swarm *
535swarmNew (tr_peerMgr * manager, tr_torrent * tor)
536{
537  tr_swarm * s;
538
539  s = tr_new0 (tr_swarm, 1);
540  s->manager = manager;
541  s->tor = tor;
542  s->pool = TR_PTR_ARRAY_INIT;
543  s->peers = TR_PTR_ARRAY_INIT;
544  s->webseeds = TR_PTR_ARRAY_INIT;
545  s->outgoingHandshakes = TR_PTR_ARRAY_INIT;
546
547  rebuildWebseedArray (s, tor);
548
549  return s;
550}
551
552static void ensureMgrTimersExist (struct tr_peerMgr * m);
553
554tr_peerMgr*
555tr_peerMgrNew (tr_session * session)
556{
557  tr_peerMgr * m = tr_new0 (tr_peerMgr, 1);
558  m->session = session;
559  m->incomingHandshakes = TR_PTR_ARRAY_INIT;
560  ensureMgrTimersExist (m);
561  return m;
562}
563
/** Free the event pointed to by @a t, if any, and reset the pointer to NULL */
static void
deleteTimer (struct event ** t)
{
  if (*t == NULL)
    return;

  event_free (*t);
  *t = NULL;
}
573
574static void
575deleteTimers (struct tr_peerMgr * m)
576{
577  deleteTimer (&m->atomTimer);
578  deleteTimer (&m->bandwidthTimer);
579  deleteTimer (&m->rechokeTimer);
580  deleteTimer (&m->refillUpkeepTimer);
581}
582
583void
584tr_peerMgrFree (tr_peerMgr * manager)
585{
586  managerLock (manager);
587
588  deleteTimers (manager);
589
590  /* free the handshakes. Abort invokes handshakeDoneCB (), which removes
591   * the item from manager->handshakes, so this is a little roundabout... */
592  while (!tr_ptrArrayEmpty (&manager->incomingHandshakes))
593    tr_handshakeAbort (tr_ptrArrayNth (&manager->incomingHandshakes, 0));
594
595  tr_ptrArrayDestruct (&manager->incomingHandshakes, NULL);
596
597  managerUnlock (manager);
598  tr_free (manager);
599}
600
601/***
602****
603***/
604
605void
606tr_peerMgrOnBlocklistChanged (tr_peerMgr * mgr)
607{
608  tr_torrent * tor = NULL;
609  tr_session * session = mgr->session;
610
611  /* we cache whether or not a peer is blocklisted...
612     since the blocklist has changed, erase that cached value */
613  while ((tor = tr_torrentNext (session, tor)))
614    {
615      int i;
616      tr_swarm * s = tor->swarm;
617      const int n = tr_ptrArraySize (&s->pool);
618      for (i=0; i<n; ++i)
619        {
620          struct peer_atom * atom = tr_ptrArrayNth (&s->pool, i);
621          atom->blocklisted = -1;
622        }
623    }
624}
625
626static bool
627isAtomBlocklisted (tr_session * session, struct peer_atom * atom)
628{
629  if (atom->blocklisted < 0)
630    atom->blocklisted = tr_sessionIsAddressBlocked (session, &atom->addr);
631
632  assert (tr_isBool (atom->blocklisted));
633  return atom->blocklisted;
634}
635
636
637/***
638****
639***/
640
641static void
642atomSetSeedProbability (struct peer_atom * atom, int seedProbability)
643{
644  assert (atom != NULL);
645  assert (-1<=seedProbability && seedProbability<=100);
646
647  atom->seedProbability = seedProbability;
648
649  if (seedProbability == 100)
650    atom->flags |= ADDED_F_SEED_FLAG;
651  else if (seedProbability != -1)
652    atom->flags &= ~ADDED_F_SEED_FLAG;
653}
654
655static inline bool
656atomIsSeed (const struct peer_atom * atom)
657{
658  return atom->seedProbability == 100;
659}
660
661static void
662atomSetSeed (const tr_swarm * s, struct peer_atom * atom)
663{
664  if (!atomIsSeed (atom))
665    {
666      tordbg (s, "marking peer %s as a seed", tr_atomAddrStr (atom));
667
668      atomSetSeedProbability (atom, 100);
669    }
670}
671
672
673bool
674tr_peerMgrPeerIsSeed (const tr_torrent  * tor,
675                      const tr_address  * addr)
676{
677  bool isSeed = false;
678  const tr_swarm * s = tor->swarm;
679  const struct peer_atom * atom = getExistingAtom (s, addr);
680
681  if (atom)
682    isSeed = atomIsSeed (atom);
683
684  return isSeed;
685}
686
687void
688tr_peerMgrSetUtpSupported (tr_torrent * tor, const tr_address * addr)
689{
690  struct peer_atom * atom = getExistingAtom (tor->swarm, addr);
691
692  if (atom)
693    atom->flags |= ADDED_F_UTP_FLAGS;
694}
695
696void
697tr_peerMgrSetUtpFailed (tr_torrent *tor, const tr_address *addr, bool failed)
698{
699  struct peer_atom * atom = getExistingAtom (tor->swarm, addr);
700
701  if (atom)
702    atom->utp_failed = failed;
703}
704
705
706/**
707***  REQUESTS
708***
709*** There are two data structures associated with managing block requests:
710***
711*** 1. tr_swarm::requests, an array of "struct block_request" which keeps
712***    track of which blocks have been requested, and when, and by which peers.
***    This list is used for (a) cancelling requests that have been pending
714***    for too long and (b) avoiding duplicate requests before endgame.
715***
716*** 2. tr_swarm::pieces, an array of "struct weighted_piece" which lists the
717***    pieces that we want to request. It's used to decide which blocks to
718***    return next when tr_peerMgrGetBlockRequests () is called.
719**/
720
721/**
722*** struct block_request
723**/
724
725static int
726compareReqByBlock (const void * va, const void * vb)
727{
728  const struct block_request * a = va;
729  const struct block_request * b = vb;
730
731  /* primary key: block */
732  if (a->block < b->block) return -1;
733  if (a->block > b->block) return 1;
734
735  /* secondary key: peer */
736  if (a->peer < b->peer) return -1;
737  if (a->peer > b->peer) return 1;
738
739  return 0;
740}
741
742static void
743requestListAdd (tr_swarm * s, tr_block_index_t block, tr_peer * peer)
744{
745  struct block_request key;
746
747  /* ensure enough room is available... */
748  if (s->requestCount + 1 >= s->requestAlloc)
749    {
750      const int CHUNK_SIZE = 128;
751      s->requestAlloc += CHUNK_SIZE;
752      s->requests = tr_renew (struct block_request,
753                              s->requests, s->requestAlloc);
754    }
755
756  /* populate the record we're inserting */
757  key.block = block;
758  key.peer = peer;
759  key.sentAt = tr_time ();
760
761  /* insert the request to our array... */
762  {
763    bool exact;
764    const int pos = tr_lowerBound (&key, s->requests, s->requestCount,
765                                   sizeof (struct block_request),
766                                   compareReqByBlock, &exact);
767    assert (!exact);
768    memmove (s->requests + pos + 1,
769             s->requests + pos,
770             sizeof (struct block_request) * (s->requestCount++ - pos));
771    s->requests[pos] = key;
772  }
773
774  if (peer != NULL)
775    {
776      ++peer->pendingReqsToPeer;
777      assert (peer->pendingReqsToPeer >= 0);
778    }
779
780  /*fprintf (stderr, "added request of block %lu from peer %s... "
781                     "there are now %d block\n",
782                     (unsigned long)block, tr_atomAddrStr (peer->atom), s->requestCount);*/
783}
784
785static struct block_request *
786requestListLookup (tr_swarm * s, tr_block_index_t block, const tr_peer * peer)
787{
788  struct block_request key;
789  key.block = block;
790  key.peer = (tr_peer*) peer;
791
792  return bsearch (&key, s->requests, s->requestCount,
793                  sizeof (struct block_request),
794                  compareReqByBlock);
795}
796
797/**
798 * Find the peers are we currently requesting the block
799 * with index @a block from and append them to @a peerArr.
800 */
801static void
802getBlockRequestPeers (tr_swarm * s, tr_block_index_t block,
803                      tr_ptrArray * peerArr)
804{
805  bool exact;
806  int i, pos;
807  struct block_request key;
808
809  key.block = block;
810  key.peer = NULL;
811  pos = tr_lowerBound (&key, s->requests, s->requestCount,
812                       sizeof (struct block_request),
813                       compareReqByBlock, &exact);
814
815  assert (!exact); /* shouldn't have a request with .peer == NULL */
816
817  for (i=pos; i<s->requestCount; ++i)
818    {
819      if (s->requests[i].block != block)
820        break;
821      tr_ptrArrayAppend (peerArr, s->requests[i].peer);
822    }
823}
824
825static void
826decrementPendingReqCount (const struct block_request * b)
827{
828  if (b->peer != NULL)
829    if (b->peer->pendingReqsToPeer > 0)
830      --b->peer->pendingReqsToPeer;
831}
832
833static void
834requestListRemove (tr_swarm * s, tr_block_index_t block, const tr_peer * peer)
835{
836  const struct block_request * b = requestListLookup (s, block, peer);
837
838  if (b != NULL)
839    {
840      const int pos = b - s->requests;
841      assert (pos < s->requestCount);
842
843      decrementPendingReqCount (b);
844
845      tr_removeElementFromArray (s->requests,
846                                 pos,
847                                 sizeof (struct block_request),
848                                 s->requestCount--);
849
850      /*fprintf (stderr, "removing request of block %lu from peer %s... "
851                         "there are now %d block requests left\n",
852                         (unsigned long)block, tr_atomAddrStr (peer->atom), t->requestCount);*/
853    }
854}
855
856static int
857countActiveWebseeds (tr_swarm * s)
858{
859  int activeCount = 0;
860
861  if (s->tor->isRunning && !tr_torrentIsSeed (s->tor))
862    {
863      int i;
864      const int n = tr_ptrArraySize (&s->webseeds);
865      const uint64_t now = tr_time_msec ();
866
867      for (i=0; i<n; ++i)
868        if (tr_peerIsTransferringPieces (tr_ptrArrayNth(&s->webseeds,i), now, TR_DOWN, NULL))
869          ++activeCount;
870    }
871
872  return activeCount;
873}
874
875static bool
876testForEndgame (const tr_swarm * s)
877{
878  /* we consider ourselves to be in endgame if the number of bytes
879     we've got requested is >= the number of bytes left to download */
880  return ((uint64_t) s->requestCount * s->tor->blockSize)
881               >= tr_torrentGetLeftUntilDone (s->tor);
882}
883
884static void
885updateEndgame (tr_swarm * s)
886{
887  assert (s->requestCount >= 0);
888
889  if (!testForEndgame (s))
890    {
891      /* not in endgame */
892      s->endgame = 0;
893    }
894  else if (!s->endgame) /* only recalculate when endgame first begins */
895    {
896      int i;
897      int numDownloading = 0;
898      const int n = tr_ptrArraySize (&s->peers);
899
900      /* add the active bittorrent peers... */
901      for (i=0; i<n; ++i)
902        {
903          const tr_peer * p = tr_ptrArrayNth (&s->peers, i);
904          if (p->pendingReqsToPeer > 0)
905            ++numDownloading;
906        }
907
908      /* add the active webseeds... */
909      numDownloading += countActiveWebseeds (s);
910
911      /* average number of pending requests per downloading peer */
912      s->endgame = s->requestCount / MAX (numDownloading, 1);
913    }
914}
915
916
917/****
918*****
919*****  Piece List Manipulation / Accessors
920*****
921****/
922
923static inline void
924invalidatePieceSorting (tr_swarm * s)
925{
926  s->pieceSortState = PIECES_UNSORTED;
927}
928
/* Context for comparePieceByWeight (): qsort ()'s comparator receives no
   user-data argument, so setComparePieceByWeightTorrent () stashes the
   torrent here before comparing. */
static const tr_torrent * weightTorrent;

/* The replication array matching weightTorrent; set alongside it. */
static const uint16_t * weightReplication;
932
933static void
934setComparePieceByWeightTorrent (tr_swarm * s)
935{
936  if (!replicationExists (s))
937    replicationNew (s);
938
939  weightTorrent = s->tor;
940  weightReplication = s->pieceReplication;
941}
942
943/* we try to create a "weight" s.t. high-priority pieces come before others,
944 * and that partially-complete pieces come before empty ones. */
945static int
946comparePieceByWeight (const void * va, const void * vb)
947{
948  const struct weighted_piece * a = va;
949  const struct weighted_piece * b = vb;
950  int ia, ib, missing, pending;
951  const tr_torrent * tor = weightTorrent;
952  const uint16_t * rep = weightReplication;
953
954  /* primary key: weight */
955  missing = tr_torrentMissingBlocksInPiece (tor, a->index);
956  pending = a->requestCount;
957  ia = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
958  missing = tr_torrentMissingBlocksInPiece (tor, b->index);
959  pending = b->requestCount;
960  ib = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
961  if (ia < ib) return -1;
962  if (ia > ib) return 1;
963
964  /* secondary key: higher priorities go first */
965  ia = tor->info.pieces[a->index].priority;
966  ib = tor->info.pieces[b->index].priority;
967  if (ia > ib) return -1;
968  if (ia < ib) return 1;
969
970  /* tertiary key: rarest first. */
971  ia = rep[a->index];
972  ib = rep[b->index];
973  if (ia < ib) return -1;
974  if (ia > ib) return 1;
975
976  /* quaternary key: random */
977  if (a->salt < b->salt) return -1;
978  if (a->salt > b->salt) return 1;
979
980  /* okay, they're equal */
981  return 0;
982}
983
984static int
985comparePieceByIndex (const void * va, const void * vb)
986{
987  const struct weighted_piece * a = va;
988  const struct weighted_piece * b = vb;
989  if (a->index < b->index) return -1;
990  if (a->index > b->index) return 1;
991  return 0;
992}
993
994static void
995pieceListSort (tr_swarm * s, enum piece_sort_state state)
996{
997  assert (state==PIECES_SORTED_BY_INDEX
998       || state==PIECES_SORTED_BY_WEIGHT);
999
1000  if (state == PIECES_SORTED_BY_WEIGHT)
1001    {
1002      setComparePieceByWeightTorrent (s);
1003      qsort (s->pieces, s->pieceCount, sizeof (struct weighted_piece), comparePieceByWeight);
1004    }
1005  else
1006    {
1007      qsort (s->pieces, s->pieceCount, sizeof (struct weighted_piece), comparePieceByIndex);
1008    }
1009
1010  s->pieceSortState = state;
1011}
1012
/**
 * These functions are useful for testing, but too expensive for nightly builds.
 * Let's leave them disabled but keep an easy hook to compile them back in:
 * change the `#if 1' below to `#if 0'.
 *
 * The disabled implementations take a tr_swarm and use tr_bitfieldHas (),
 * matching the rest of this file, so that flipping the switch compiles.
 */
#if 1
#define assertWeightedPiecesAreSorted(t)
#define assertReplicationCountIsExact(t)
#else
/* verify that, outside of endgame, t->pieces is ordered by comparePieceByWeight () */
static void
assertWeightedPiecesAreSorted (tr_swarm * t)
{
    if (!t->endgame)
    {
        int i;
        setComparePieceByWeightTorrent (t);
        for (i=0; i<t->pieceCount-1; ++i)
            assert (comparePieceByWeight (&t->pieces[i], &t->pieces[i+1]) <= 0);
    }
}
/* verify that t->pieceReplication matches a fresh count over t->peers */
static void
assertReplicationCountIsExact (tr_swarm * t)
{
    /* This assert might fail due to errors of implementations in other
     * clients. It happens when receiving duplicate bitfields/HaveAll/HaveNone
     * from a client. If such a behavior is noticed,
     * a bug report should be filed with the faulty client. */

    size_t piece_i;
    const uint16_t * rep = t->pieceReplication;
    const size_t piece_count = t->pieceReplicationSize;
    const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase (&t->peers);
    const int peer_count = tr_ptrArraySize (&t->peers);

    assert (piece_count == t->tor->info.pieceCount);

    for (piece_i=0; piece_i<piece_count; ++piece_i)
    {
        int peer_i;
        uint16_t r = 0;

        for (peer_i=0; peer_i<peer_count; ++peer_i)
            if (tr_bitfieldHas (&peers[peer_i]->have, piece_i))
                ++r;

        assert (rep[piece_i] == r);
    }
}
#endif
1061
1062static struct weighted_piece *
1063pieceListLookup (tr_swarm * s, tr_piece_index_t index)
1064{
1065  int i;
1066
1067  for (i=0; i<s->pieceCount; ++i)
1068    if (s->pieces[i].index == index)
1069      return &s->pieces[i];
1070
1071  return NULL;
1072}
1073
1074static void
1075pieceListRebuild (tr_swarm * s)
1076{
1077  if (!tr_torrentIsSeed (s->tor))
1078    {
1079      tr_piece_index_t i;
1080      tr_piece_index_t * pool;
1081      tr_piece_index_t poolCount = 0;
1082      const tr_torrent * tor = s->tor;
1083      const tr_info * inf = tr_torrentInfo (tor);
1084      struct weighted_piece * pieces;
1085      int pieceCount;
1086
1087      /* build the new list */
1088      pool = tr_new (tr_piece_index_t, inf->pieceCount);
1089      for (i=0; i<inf->pieceCount; ++i)
1090        if (!inf->pieces[i].dnd)
1091          if (!tr_torrentPieceIsComplete (tor, i))
1092            pool[poolCount++] = i;
1093      pieceCount = poolCount;
1094      pieces = tr_new0 (struct weighted_piece, pieceCount);
1095      for (i=0; i<poolCount; ++i)
1096        {
1097          struct weighted_piece * piece = pieces + i;
1098          piece->index = pool[i];
1099          piece->requestCount = 0;
1100          piece->salt = tr_rand_int_weak (4096);
1101        }
1102
1103      /* if we already had a list of pieces, merge it into
1104       * the new list so we don't lose its requestCounts */
1105      if (s->pieces != NULL)
1106        {
1107          struct weighted_piece * o = s->pieces;
1108          struct weighted_piece * oend = o + s->pieceCount;
1109          struct weighted_piece * n = pieces;
1110          struct weighted_piece * nend = n + pieceCount;
1111
1112          pieceListSort (s, PIECES_SORTED_BY_INDEX);
1113
1114          while (o!=oend && n!=nend)
1115            {
1116              if (o->index < n->index)
1117                ++o;
1118              else if (o->index > n->index)
1119                ++n;
1120              else
1121                *n++ = *o++;
1122            }
1123
1124          tr_free (s->pieces);
1125        }
1126
1127      s->pieces = pieces;
1128      s->pieceCount = pieceCount;
1129
1130      pieceListSort (s, PIECES_SORTED_BY_WEIGHT);
1131
1132      /* cleanup */
1133      tr_free (pool);
1134    }
1135}
1136
1137static void
1138pieceListRemovePiece (tr_swarm * s, tr_piece_index_t piece)
1139{
1140  struct weighted_piece * p;
1141
1142  if ((p = pieceListLookup (s, piece)))
1143    {
1144      const int pos = p - s->pieces;
1145
1146      tr_removeElementFromArray (s->pieces,
1147                                 pos,
1148                                 sizeof (struct weighted_piece),
1149                                 s->pieceCount--);
1150
1151      if (s->pieceCount == 0)
1152        {
1153          tr_free (s->pieces);
1154          s->pieces = NULL;
1155        }
1156    }
1157}
1158
1159static void
1160pieceListResortPiece (tr_swarm * s, struct weighted_piece * p)
1161{
1162  int pos;
1163  bool isSorted = true;
1164
1165  if (p == NULL)
1166    return;
1167
1168  /* is the torrent already sorted? */
1169  pos = p - s->pieces;
1170  setComparePieceByWeightTorrent (s);
1171  if (isSorted && (pos > 0) && (comparePieceByWeight (p-1, p) > 0))
1172    isSorted = false;
1173  if (isSorted && (pos < s->pieceCount - 1) && (comparePieceByWeight (p, p+1) > 0))
1174    isSorted = false;
1175
1176  if (s->pieceSortState != PIECES_SORTED_BY_WEIGHT)
1177    {
1178      pieceListSort (s, PIECES_SORTED_BY_WEIGHT);
1179      isSorted = true;
1180    }
1181
1182  /* if it's not sorted, move it around */
1183  if (!isSorted)
1184    {
1185      bool exact;
1186      const struct weighted_piece tmp = *p;
1187
1188      tr_removeElementFromArray (s->pieces,
1189                                 pos,
1190                                 sizeof (struct weighted_piece),
1191                                 s->pieceCount--);
1192
1193      pos = tr_lowerBound (&tmp, s->pieces, s->pieceCount,
1194                           sizeof (struct weighted_piece),
1195                           comparePieceByWeight, &exact);
1196
1197      memmove (&s->pieces[pos + 1],
1198               &s->pieces[pos],
1199               sizeof (struct weighted_piece) * (s->pieceCount++ - pos));
1200
1201      s->pieces[pos] = tmp;
1202    }
1203
1204  assertWeightedPiecesAreSorted (s);
1205}
1206
1207static void
1208pieceListRemoveRequest (tr_swarm * s, tr_block_index_t block)
1209{
1210  struct weighted_piece * p;
1211  const tr_piece_index_t index = tr_torBlockPiece (s->tor, block);
1212
1213  if (((p = pieceListLookup (s, index))) && (p->requestCount > 0))
1214    {
1215      --p->requestCount;
1216      pieceListResortPiece (s, p);
1217    }
1218}
1219
1220
1221/****
1222*****
1223*****  Replication count (for rarest first policy)
1224*****
1225****/
1226
1227/**
1228 * Increase the replication count of this piece and sort it if the
1229 * piece list is already sorted
1230 */
1231static void
1232tr_incrReplicationOfPiece (tr_swarm * s, const size_t index)
1233{
1234  assert (replicationExists (s));
1235  assert (s->pieceReplicationSize == s->tor->info.pieceCount);
1236
1237  /* One more replication of this piece is present in the swarm */
1238  ++s->pieceReplication[index];
1239
1240  /* we only resort the piece if the list is already sorted */
1241  if (s->pieceSortState == PIECES_SORTED_BY_WEIGHT)
1242    pieceListResortPiece (s, pieceListLookup (s, index));
1243}
1244
1245/**
1246 * Increases the replication count of pieces present in the bitfield
1247 */
1248static void
1249tr_incrReplicationFromBitfield (tr_swarm * s, const tr_bitfield * b)
1250{
1251  size_t i;
1252  uint16_t * rep = s->pieceReplication;
1253  const size_t n = s->tor->info.pieceCount;
1254
1255  assert (replicationExists (s));
1256
1257  for (i=0; i<n; ++i)
1258    if (tr_bitfieldHas (b, i))
1259      ++rep[i];
1260
1261  if (s->pieceSortState == PIECES_SORTED_BY_WEIGHT)
1262    invalidatePieceSorting (s);
1263}
1264
1265/**
1266 * Increase the replication count of every piece
1267 */
1268static void
1269tr_incrReplication (tr_swarm * s)
1270{
1271  int i;
1272  const int n = s->pieceReplicationSize;
1273
1274  assert (replicationExists (s));
1275  assert (s->pieceReplicationSize == s->tor->info.pieceCount);
1276
1277  for (i=0; i<n; ++i)
1278    ++s->pieceReplication[i];
1279}
1280
1281/**
1282 * Decrease the replication count of pieces present in the bitset.
1283 */
1284static void
1285tr_decrReplicationFromBitfield (tr_swarm * s, const tr_bitfield * b)
1286{
1287  int i;
1288  const int n = s->pieceReplicationSize;
1289
1290  assert (replicationExists (s));
1291  assert (s->pieceReplicationSize == s->tor->info.pieceCount);
1292
1293  if (tr_bitfieldHasAll (b))
1294    {
1295      for (i=0; i<n; ++i)
1296        --s->pieceReplication[i];
1297    }
1298  else if (!tr_bitfieldHasNone (b))
1299    {
1300      for (i=0; i<n; ++i)
1301        if (tr_bitfieldHas (b, i))
1302          --s->pieceReplication[i];
1303
1304      if (s->pieceSortState == PIECES_SORTED_BY_WEIGHT)
1305        invalidatePieceSorting (s);
1306    }
1307}
1308
1309/**
1310***
1311**/
1312
1313void
1314tr_peerMgrRebuildRequests (tr_torrent * tor)
1315{
1316  assert (tr_isTorrent (tor));
1317
1318  pieceListRebuild (tor->swarm);
1319}
1320
1321void
1322tr_peerMgrGetNextRequests (tr_torrent           * tor,
1323                           tr_peer              * peer,
1324                           int                    numwant,
1325                           tr_block_index_t     * setme,
1326                           int                  * numgot,
1327                           bool                   get_intervals)
1328{
1329  int i;
1330  int got;
1331  tr_swarm * s;
1332  struct weighted_piece * pieces;
1333  const tr_bitfield * const have = &peer->have;
1334
1335  /* sanity clause */
1336  assert (tr_isTorrent (tor));
1337  assert (numwant > 0);
1338
1339  /* walk through the pieces and find blocks that should be requested */
1340  got = 0;
1341  s = tor->swarm;
1342
1343  /* prep the pieces list */
1344  if (s->pieces == NULL)
1345    pieceListRebuild (s);
1346
1347  if (s->pieceSortState != PIECES_SORTED_BY_WEIGHT)
1348    pieceListSort (s, PIECES_SORTED_BY_WEIGHT);
1349
1350  assertReplicationCountIsExact (s);
1351  assertWeightedPiecesAreSorted (s);
1352
1353  updateEndgame (s);
1354  pieces = s->pieces;
1355  for (i=0; i<s->pieceCount && got<numwant; ++i)
1356    {
1357      struct weighted_piece * p = pieces + i;
1358
1359      /* if the peer has this piece that we want... */
1360      if (tr_bitfieldHas (have, p->index))
1361        {
1362          tr_block_index_t b;
1363          tr_block_index_t first;
1364          tr_block_index_t last;
1365          tr_ptrArray peerArr = TR_PTR_ARRAY_INIT;
1366
1367          tr_torGetPieceBlockRange (tor, p->index, &first, &last);
1368
1369          for (b=first; b<=last && (got<numwant || (get_intervals && setme[2*got-1] == b-1)); ++b)
1370            {
1371              int peerCount;
1372              tr_peer ** peers;
1373
1374              /* don't request blocks we've already got */
1375              if (tr_torrentBlockIsComplete (tor, b))
1376                continue;
1377
1378              /* always add peer if this block has no peers yet */
1379              tr_ptrArrayClear (&peerArr);
1380              getBlockRequestPeers (s, b, &peerArr);
1381              peers = (tr_peer **) tr_ptrArrayPeek (&peerArr, &peerCount);
1382              if (peerCount != 0)
1383                {
1384                  /* don't make a second block request until the endgame */
1385                  if (!s->endgame)
1386                    continue;
1387
1388                  /* don't have more than two peers requesting this block */
1389                  if (peerCount > 1)
1390                    continue;
1391
1392                  /* don't send the same request to the same peer twice */
1393                  if (peer == peers[0])
1394                    continue;
1395
1396                  /* in the endgame allow an additional peer to download a
1397                     block but only if the peer seems to be handling requests
1398                     relatively fast */
1399                  if (peer->pendingReqsToPeer + numwant - got < s->endgame)
1400                    continue;
1401                }
1402
1403              /* update the caller's table */
1404              if (!get_intervals)
1405                {
1406                  setme[got++] = b;
1407                }
1408              /* if intervals are requested two array entries are necessarry:
1409                 one for the interval's starting block and one for its end block */
1410              else if (got && setme[2 * got - 1] == b - 1 && b != first)
1411                {
1412                  /* expand the last interval */
1413                  ++setme[2 * got - 1];
1414                }
1415              else
1416                {
1417                  /* begin a new interval */
1418                  setme[2 * got] = setme[2 * got + 1] = b;
1419                  ++got;
1420                }
1421
1422              /* update our own tables */
1423              requestListAdd (s, b, peer);
1424              ++p->requestCount;
1425            }
1426
1427          tr_ptrArrayDestruct (&peerArr, NULL);
1428        }
1429    }
1430
1431  /* In most cases we've just changed the weights of a small number of pieces.
1432   * So rather than qsort ()ing the entire array, it's faster to apply an
1433   * adaptive insertion sort algorithm. */
1434  if (got > 0)
1435    {
1436      /* not enough requests || last piece modified */
1437      if (i == s->pieceCount)
1438        --i;
1439
1440      setComparePieceByWeightTorrent (s);
1441      while (--i >= 0)
1442        {
1443          bool exact;
1444
1445          /* relative position! */
1446          const int newpos = tr_lowerBound (&s->pieces[i], &s->pieces[i + 1],
1447                                            s->pieceCount - (i + 1),
1448                                            sizeof (struct weighted_piece),
1449                                            comparePieceByWeight, &exact);
1450          if (newpos > 0)
1451            {
1452              const struct weighted_piece piece = s->pieces[i];
1453              memmove (&s->pieces[i],
1454                       &s->pieces[i + 1],
1455                       sizeof (struct weighted_piece) * (newpos));
1456              s->pieces[i + newpos] = piece;
1457            }
1458        }
1459    }
1460
1461  assertWeightedPiecesAreSorted (t);
1462  *numgot = got;
1463}
1464
1465bool
1466tr_peerMgrDidPeerRequest (const tr_torrent  * tor,
1467                          const tr_peer     * peer,
1468                          tr_block_index_t    block)
1469{
1470  return requestListLookup ((tr_swarm*)tor->swarm, block, peer) != NULL;
1471}
1472
1473/* cancel requests that are too old */
1474static void
1475refillUpkeep (evutil_socket_t foo UNUSED, short bar UNUSED, void * vmgr)
1476{
1477    time_t now;
1478    time_t too_old;
1479    tr_torrent * tor;
1480    int cancel_buflen = 0;
1481    struct block_request * cancel = NULL;
1482    tr_peerMgr * mgr = vmgr;
1483    managerLock (mgr);
1484
1485    now = tr_time ();
1486    too_old = now - REQUEST_TTL_SECS;
1487
1488    /* alloc the temporary "cancel" buffer */
1489    tor = NULL;
1490    while ((tor = tr_torrentNext (mgr->session, tor)))
1491        cancel_buflen = MAX (cancel_buflen, tor->swarm->requestCount);
1492    if (cancel_buflen > 0)
1493        cancel = tr_new (struct block_request, cancel_buflen);
1494
1495    /* prune requests that are too old */
1496    tor = NULL;
1497    while ((tor = tr_torrentNext (mgr->session, tor)))
1498    {
1499        tr_swarm * s = tor->swarm;
1500        const int n = s->requestCount;
1501        if (n > 0)
1502        {
1503            int keepCount = 0;
1504            int cancelCount = 0;
1505            const struct block_request * it;
1506            const struct block_request * end;
1507
1508            for (it=s->requests, end=it+n; it!=end; ++it)
1509            {
1510                tr_peerMsgs * msgs = PEER_MSGS(it->peer);
1511
1512                if ((msgs !=NULL) && (it->sentAt <= too_old) && !tr_peerMsgsIsReadingBlock (msgs, it->block))
1513                    cancel[cancelCount++] = *it;
1514                else
1515                {
1516                    if (it != &s->requests[keepCount])
1517                        s->requests[keepCount] = *it;
1518                    keepCount++;
1519                }
1520            }
1521
1522            /* prune out the ones we aren't keeping */
1523            s->requestCount = keepCount;
1524
1525            /* send cancel messages for all the "cancel" ones */
1526            for (it=cancel, end=it+cancelCount; it!=end; ++it)
1527            {
1528              tr_peerMsgs * msgs = PEER_MSGS(it->peer);
1529
1530              if (msgs != NULL)
1531                {
1532                  tr_historyAdd (&it->peer->cancelsSentToPeer, now, 1);
1533                  tr_peerMsgsCancel (msgs, it->block);
1534                  decrementPendingReqCount (it);
1535                }
1536            }
1537
1538            /* decrement the pending request counts for the timed-out blocks */
1539            for (it=cancel, end=it+cancelCount; it!=end; ++it)
1540                pieceListRemoveRequest (s, it->block);
1541        }
1542    }
1543
1544    tr_free (cancel);
1545    tr_timerAddMsec (mgr->refillUpkeepTimer, REFILL_UPKEEP_PERIOD_MSEC);
1546    managerUnlock (mgr);
1547}
1548
1549static void
1550addStrike (tr_swarm * s, tr_peer * peer)
1551{
1552  tordbg (s, "increasing peer %s strike count to %d",
1553          tr_atomAddrStr (peer->atom), peer->strikes + 1);
1554
1555  if (++peer->strikes >= MAX_BAD_PIECES_PER_PEER)
1556    {
1557      struct peer_atom * atom = peer->atom;
1558      atom->flags2 |= MYFLAG_BANNED;
1559      peer->doPurge = true;
1560      tordbg (s, "banning peer %s", tr_atomAddrStr (atom));
1561    }
1562}
1563
1564static void
1565peerSuggestedPiece (tr_swarm           * s UNUSED,
1566                    tr_peer            * peer UNUSED,
1567                    tr_piece_index_t     pieceIndex UNUSED,
1568                    int                  isFastAllowed UNUSED)
1569{
1570#if 0
1571    assert (t);
1572    assert (peer);
1573    assert (peer->msgs);
1574
1575    /* is this a valid piece? */
1576    if (pieceIndex >= t->tor->info.pieceCount)
1577        return;
1578
1579    /* don't ask for it if we've already got it */
1580    if (tr_torrentPieceIsComplete (t->tor, pieceIndex))
1581        return;
1582
1583    /* don't ask for it if they don't have it */
1584    if (!tr_bitfieldHas (peer->have, pieceIndex))
1585        return;
1586
1587    /* don't ask for it if we're choked and it's not fast */
1588    if (!isFastAllowed && peer->clientIsChoked)
1589        return;
1590
1591    /* request the blocks that we don't have in this piece */
1592    {
1593        tr_block_index_t b;
1594        tr_block_index_t first;
1595        tr_block_index_t last;
1596        const tr_torrent * tor = t->tor;
1597
1598        tr_torGetPieceBlockRange (t->tor, pieceIndex, &first, &last);
1599
1600        for (b=first; b<=last; ++b)
1601        {
1602            if (tr_torrentBlockIsComplete (tor, b))
1603            {
1604                const uint32_t offset = getBlockOffsetInPiece (tor, b);
1605                const uint32_t length = tr_torBlockCountBytes (tor, b);
1606                tr_peerMsgsAddRequest (peer->msgs, pieceIndex, offset, length);
1607                incrementPieceRequests (t, pieceIndex);
1608            }
1609        }
1610    }
1611#endif
1612}
1613
1614static void
1615removeRequestFromTables (tr_swarm * s, tr_block_index_t block, const tr_peer * peer)
1616{
1617  requestListRemove (s, block, peer);
1618  pieceListRemoveRequest (s, block);
1619}
1620
1621/* peer choked us, or maybe it disconnected.
1622   either way we need to remove all its requests */
1623static void
1624peerDeclinedAllRequests (tr_swarm * s, const tr_peer * peer)
1625{
1626  int i, n;
1627  tr_block_index_t * blocks = tr_new (tr_block_index_t, s->requestCount);
1628
1629  for (i=n=0; i<s->requestCount; ++i)
1630    if (peer == s->requests[i].peer)
1631      blocks[n++] = s->requests[i].block;
1632
1633  for (i=0; i<n; ++i)
1634    removeRequestFromTables (s, blocks[i], peer);
1635
1636  tr_free (blocks);
1637}
1638
1639static void
1640cancelAllRequestsForBlock (tr_swarm          * s,
1641                           tr_block_index_t    block,
1642                           tr_peer           * no_notify)
1643{
1644  int i;
1645  int peerCount;
1646  tr_peer ** peers;
1647  tr_ptrArray peerArr;
1648
1649  peerArr = TR_PTR_ARRAY_INIT;
1650  getBlockRequestPeers (s, block, &peerArr);
1651  peers = (tr_peer **) tr_ptrArrayPeek (&peerArr, &peerCount);
1652  for (i=0; i<peerCount; ++i)
1653    {
1654      tr_peer * p = peers[i];
1655
1656      if ((p != no_notify) && tr_isPeerMsgs (p))
1657        {
1658          tr_historyAdd (&p->cancelsSentToPeer, tr_time (), 1);
1659          tr_peerMsgsCancel (PEER_MSGS(p), block);
1660        }
1661
1662      removeRequestFromTables (s, block, p);
1663    }
1664
1665  tr_ptrArrayDestruct (&peerArr, NULL);
1666}
1667
1668void
1669tr_peerMgrPieceCompleted (tr_torrent * tor, tr_piece_index_t p)
1670{
1671  int i;
1672  bool pieceCameFromPeers = false;
1673  tr_swarm * const s = tor->swarm;
1674  const int n = tr_ptrArraySize (&s->peers);
1675
1676  /* walk through our peers */
1677  for (i=0; i<n; ++i)
1678    {
1679      tr_peer * peer = tr_ptrArrayNth (&s->peers, i);
1680
1681      /* notify the peer that we now have this piece */
1682      tr_peerMsgsHave (PEER_MSGS(peer), p);
1683
1684      if (!pieceCameFromPeers)
1685        pieceCameFromPeers = tr_bitfieldHas (&peer->blame, p);
1686    }
1687
1688  if (pieceCameFromPeers) /* webseed downloads don't belong in announce totals */
1689    tr_announcerAddBytes (tor, TR_ANN_DOWN, tr_torPieceCountBytes (tor, p));
1690
1691  /* bookkeeping */
1692  pieceListRemovePiece (s, p);
1693  s->needsCompletenessCheck = true;
1694}
1695
1696static void
1697peerCallbackFunc (tr_peer * peer, const tr_peer_event * e, void * vs)
1698{
1699  tr_swarm * s = vs;
1700
1701  swarmLock (s);
1702
1703  assert (peer != NULL);
1704
1705  switch (e->eventType)
1706    {
1707      case TR_PEER_PEER_GOT_PIECE_DATA:
1708        {
1709          const time_t now = tr_time ();
1710          tr_torrent * tor = s->tor;
1711
1712          tor->uploadedCur += e->length;
1713          tr_announcerAddBytes (tor, TR_ANN_UP, e->length);
1714          tr_torrentSetActivityDate (tor, now);
1715          tr_torrentSetDirty (tor);
1716          tr_statsAddUploaded (tor->session, e->length);
1717
1718          if (peer->atom != NULL)
1719            peer->atom->piece_data_time = now;
1720
1721          break;
1722        }
1723
1724      case TR_PEER_CLIENT_GOT_PIECE_DATA:
1725        {
1726          const time_t now = tr_time ();
1727          tr_torrent * tor = s->tor;
1728
1729          tor->downloadedCur += e->length;
1730          tr_torrentSetActivityDate (tor, now);
1731          tr_torrentSetDirty (tor);
1732
1733          tr_statsAddDownloaded (tor->session, e->length);
1734
1735          if (peer->atom != NULL)
1736            peer->atom->piece_data_time = now;
1737
1738          break;
1739        }
1740
1741      case TR_PEER_CLIENT_GOT_HAVE:
1742        if (replicationExists (s))
1743          {
1744            tr_incrReplicationOfPiece (s, e->pieceIndex);
1745            assertReplicationCountIsExact (s);
1746          }
1747        break;
1748
1749      case TR_PEER_CLIENT_GOT_HAVE_ALL:
1750        if (replicationExists (s))
1751          {
1752            tr_incrReplication (s);
1753            assertReplicationCountIsExact (s);
1754          }
1755        break;
1756
1757      case TR_PEER_CLIENT_GOT_HAVE_NONE:
1758        /* noop */
1759        break;
1760
1761      case TR_PEER_CLIENT_GOT_BITFIELD:
1762        assert (e->bitfield != NULL);
1763        if (replicationExists (s))
1764          {
1765            tr_incrReplicationFromBitfield (s, e->bitfield);
1766            assertReplicationCountIsExact (s);
1767          }
1768        break;
1769
1770      case TR_PEER_CLIENT_GOT_REJ:
1771        {
1772          tr_block_index_t b = _tr_block (s->tor, e->pieceIndex, e->offset);
1773          if (b < s->tor->blockCount)
1774            removeRequestFromTables (s, b, peer);
1775          else
1776            tordbg (s, "Peer %s sent an out-of-range reject message",
1777                    tr_atomAddrStr (peer->atom));
1778          break;
1779        }
1780
1781      case TR_PEER_CLIENT_GOT_CHOKE:
1782        peerDeclinedAllRequests (s, peer);
1783        break;
1784
1785      case TR_PEER_CLIENT_GOT_PORT:
1786        if (peer->atom)
1787          peer->atom->port = e->port;
1788        break;
1789
1790      case TR_PEER_CLIENT_GOT_SUGGEST:
1791        peerSuggestedPiece (s, peer, e->pieceIndex, false);
1792        break;
1793
1794      case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1795        peerSuggestedPiece (s, peer, e->pieceIndex, true);
1796        break;
1797
1798      case TR_PEER_CLIENT_GOT_BLOCK:
1799        {
1800          tr_torrent * tor = s->tor;
1801          const tr_piece_index_t p = e->pieceIndex;
1802          const tr_block_index_t block = _tr_block (tor, p, e->offset);
1803          cancelAllRequestsForBlock (s, block, peer);
1804          tr_historyAdd (&peer->blocksSentToClient, tr_time(), 1);
1805          pieceListResortPiece (s, pieceListLookup (s, p));
1806          tr_torrentGotBlock (tor, block);
1807          break;
1808        }
1809
1810      case TR_PEER_ERROR:
1811        if ((e->err == ERANGE) || (e->err == EMSGSIZE) || (e->err == ENOTCONN))
1812          {
1813            /* some protocol error from the peer */
1814            peer->doPurge = true;
1815            tordbg (s, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error",
1816                    tr_atomAddrStr (peer->atom));
1817          }
1818        else
1819          {
1820            tordbg (s, "unhandled error: %s", tr_strerror (e->err));
1821          }
1822        break;
1823
1824      default:
1825        assert (0);
1826    }
1827
1828  swarmUnlock (s);
1829}
1830
1831static int
1832getDefaultShelfLife (uint8_t from)
1833{
1834    /* in general, peers obtained from firsthand contact
1835     * are better than those from secondhand, etc etc */
1836    switch (from)
1837    {
1838        case TR_PEER_FROM_INCOMING : return 60 * 60 * 6;
1839        case TR_PEER_FROM_LTEP     : return 60 * 60 * 6;
1840        case TR_PEER_FROM_TRACKER  : return 60 * 60 * 3;
1841        case TR_PEER_FROM_DHT      : return 60 * 60 * 3;
1842        case TR_PEER_FROM_PEX      : return 60 * 60 * 2;
1843        case TR_PEER_FROM_RESUME   : return 60 * 60;
1844        case TR_PEER_FROM_LPD      : return 10 * 60;
1845        default                    : return 60 * 60;
1846    }
1847}
1848
1849static void
1850ensureAtomExists (tr_swarm          * s,
1851                  const tr_address  * addr,
1852                  const tr_port       port,
1853                  const uint8_t       flags,
1854                  const int8_t        seedProbability,
1855                  const uint8_t       from)
1856{
1857  struct peer_atom * a;
1858
1859  assert (tr_address_is_valid (addr));
1860  assert (from < TR_PEER_FROM__MAX);
1861
1862  a = getExistingAtom (s, addr);
1863
1864  if (a == NULL)
1865    {
1866      const int jitter = tr_rand_int_weak (60*10);
1867      a = tr_new0 (struct peer_atom, 1);
1868      a->addr = *addr;
1869      a->port = port;
1870      a->flags = flags;
1871      a->fromFirst = from;
1872      a->fromBest = from;
1873      a->shelf_date = tr_time () + getDefaultShelfLife (from) + jitter;
1874      a->blocklisted = -1;
1875      atomSetSeedProbability (a, seedProbability);
1876      tr_ptrArrayInsertSorted (&s->pool, a, compareAtomsByAddress);
1877
1878      tordbg (s, "got a new atom: %s", tr_atomAddrStr (a));
1879    }
1880  else
1881    {
1882      if (from < a->fromBest)
1883        a->fromBest = from;
1884
1885      if (a->seedProbability == -1)
1886        atomSetSeedProbability (a, seedProbability);
1887
1888      a->flags |= flags;
1889    }
1890}
1891
1892static int
1893getMaxPeerCount (const tr_torrent * tor)
1894{
1895  return tor->maxConnectedPeers;
1896}
1897
1898static int
1899getPeerCount (const tr_swarm * s)
1900{
1901  return tr_ptrArraySize (&s->peers);/* + tr_ptrArraySize (&t->outgoingHandshakes); */
1902}
1903
1904
1905static void
1906createBitTorrentPeer (tr_torrent       * tor,
1907                      struct tr_peerIo * io,
1908                      struct peer_atom * atom,
1909                      tr_quark           client)
1910{
1911  tr_peer * peer;
1912  tr_peerMsgs * msgs;
1913  tr_swarm * swarm;
1914
1915  assert (atom != NULL);
1916  assert (tr_isTorrent (tor));
1917  assert (tor->swarm != NULL);
1918
1919  swarm = tor->swarm;
1920
1921  peer = (tr_peer*) tr_peerMsgsNew (tor, io, peerCallbackFunc, swarm);
1922  peer->atom = atom;
1923  peer->client = client;
1924  atom->peer = peer;
1925
1926  tr_ptrArrayInsertSorted (&swarm->peers, peer, peerCompare);
1927  ++swarm->stats.peerCount;
1928  ++swarm->stats.peerFromCount[atom->fromFirst];
1929
1930  assert (swarm->stats.peerCount == tr_ptrArraySize (&swarm->peers));
1931  assert (swarm->stats.peerFromCount[atom->fromFirst] <= swarm->stats.peerCount);
1932
1933  msgs = PEER_MSGS (peer);
1934  tr_peerMsgsUpdateActive (msgs, TR_UP);
1935  tr_peerMsgsUpdateActive (msgs, TR_DOWN);
1936}
1937
1938
1939/* FIXME: this is kind of a mess. */
1940static bool
1941myHandshakeDoneCB (tr_handshake  * handshake,
1942                   tr_peerIo     * io,
1943                   bool            readAnythingFromPeer,
1944                   bool            isConnected,
1945                   const uint8_t * peer_id,
1946                   void          * vmanager)
1947{
1948  bool ok = isConnected;
1949  bool success = false;
1950  tr_port port;
1951  const tr_address * addr;
1952  tr_peerMgr * manager = vmanager;
1953  tr_swarm  * s;
1954
1955  assert (io);
1956  assert (tr_isBool (ok));
1957
1958  s = tr_peerIoHasTorrentHash (io)
1959    ? getExistingSwarm (manager, tr_peerIoGetTorrentHash (io))
1960    : NULL;
1961
1962  if (tr_peerIoIsIncoming (io))
1963    tr_ptrArrayRemoveSortedPointer (&manager->incomingHandshakes,
1964                                    handshake, handshakeCompare);
1965  else if (s)
1966    tr_ptrArrayRemoveSortedPointer (&s->outgoingHandshakes,
1967                                    handshake, handshakeCompare);
1968
1969  if (s)
1970    swarmLock (s);
1971
1972  addr = tr_peerIoGetAddress (io, &port);
1973
1974  if (!ok || !s || !s->isRunning)
1975    {
1976      if (s)
1977        {
1978          struct peer_atom * atom = getExistingAtom (s, addr);
1979          if (atom)
1980            {
1981              ++atom->numFails;
1982
1983              if (!readAnythingFromPeer)
1984                {
1985                  tordbg (s, "marking peer %s as unreachable... numFails is %d", tr_atomAddrStr (atom), (int)atom->numFails);
1986                  atom->flags2 |= MYFLAG_UNREACHABLE;
1987                }
1988            }
1989        }
1990    }
1991  else /* looking good */
1992    {
1993      struct peer_atom * atom;
1994
1995      ensureAtomExists (s, addr, port, 0, -1, TR_PEER_FROM_INCOMING);
1996      atom = getExistingAtom (s, addr);
1997
1998      assert (atom != NULL);
1999
2000      atom->time = tr_time ();
2001      atom->piece_data_time = 0;
2002      atom->lastConnectionAt = tr_time ();
2003
2004      if (!tr_peerIoIsIncoming (io))
2005        {
2006          atom->flags |= ADDED_F_CONNECTABLE;
2007          atom->flags2 &= ~MYFLAG_UNREACHABLE;
2008        }
2009
2010      /* In principle, this flag specifies whether the peer groks uTP,
2011         not whether it's currently connected over uTP. */
2012      if (io->utp_socket)
2013        atom->flags |= ADDED_F_UTP_FLAGS;
2014
2015      if (atom->flags2 & MYFLAG_BANNED)
2016        {
2017          tordbg (s, "banned peer %s tried to reconnect",
2018                  tr_atomAddrStr (atom));
2019        }
2020      else if (tr_peerIoIsIncoming (io) && (getPeerCount (s) >= getMaxPeerCount (s->tor)))
2021        {
2022        }
2023      else
2024        {
2025          tr_peer * peer = atom->peer;
2026
2027          if (peer) /* we already have this peer */
2028            {
2029            }
2030          else
2031            {
2032              tr_quark client;
2033              tr_peerIo * io;
2034              char buf[128];
2035
2036              if (peer_id != NULL)
2037                client = tr_quark_new (tr_clientForId (buf, sizeof (buf), peer_id), -1);
2038              else
2039                client = TR_KEY_NONE;
2040
2041              io = tr_handshakeStealIO (handshake); /* this steals its refcount too, which is
2042                                                       balanced by our unref in peerDelete ()  */
2043              tr_peerIoSetParent (io, &s->tor->bandwidth);
2044              createBitTorrentPeer (s->tor, io, atom, client);
2045
2046              success = true;
2047            }
2048        }
2049    }
2050
2051  if (s != NULL)
2052    swarmUnlock (s);
2053
2054  return success;
2055}
2056
2057void
2058tr_peerMgrAddIncoming (tr_peerMgr       * manager,
2059                       tr_address       * addr,
2060                       tr_port            port,
2061                       tr_socket_t        socket,
2062                       struct UTPSocket * utp_socket)
2063{
2064  tr_session * session;
2065
2066  managerLock (manager);
2067
2068  assert (tr_isSession (manager->session));
2069  session = manager->session;
2070
2071  if (tr_sessionIsAddressBlocked (session, addr))
2072    {
2073      tr_logAddDebug ("Banned IP address \"%s\" tried to connect to us", tr_address_to_string (addr));
2074      if (socket != TR_BAD_SOCKET)
2075        tr_netClose (session, socket);
2076      else
2077        UTP_Close (utp_socket);
2078    }
2079  else if (getExistingHandshake (&manager->incomingHandshakes, addr))
2080    {
2081      if (socket != TR_BAD_SOCKET)
2082        tr_netClose (session, socket);
2083      else
2084        UTP_Close (utp_socket);
2085    }
2086  else /* we don't have a connection to them yet... */
2087    {
2088      tr_peerIo *    io;
2089      tr_handshake * handshake;
2090
2091      io = tr_peerIoNewIncoming (session, &session->bandwidth, addr, port, socket, utp_socket);
2092
2093      handshake = tr_handshakeNew (io,
2094                                   session->encryptionMode,
2095                                   myHandshakeDoneCB,
2096                                   manager);
2097
2098      tr_peerIoUnref (io); /* balanced by the implicit ref in tr_peerIoNewIncoming () */
2099
2100      tr_ptrArrayInsertSorted (&manager->incomingHandshakes, handshake,
2101                               handshakeCompare);
2102    }
2103
2104  managerUnlock (manager);
2105}
2106
2107void
2108tr_peerMgrAddPex (tr_torrent * tor, uint8_t from,
2109                  const tr_pex * pex, int8_t seedProbability)
2110{
2111  if (tr_isPex (pex)) /* safeguard against corrupt data */
2112    {
2113      tr_swarm * s = tor->swarm;
2114      managerLock (s->manager);
2115
2116      if (!tr_sessionIsAddressBlocked (s->manager->session, &pex->addr))
2117        if (tr_address_is_valid_for_peers (&pex->addr, pex->port))
2118          ensureAtomExists (s, &pex->addr, pex->port, pex->flags, seedProbability, from);
2119
2120      managerUnlock (s->manager);
2121    }
2122}
2123
2124void
2125tr_peerMgrMarkAllAsSeeds (tr_torrent * tor)
2126{
2127  tr_swarm * s = tor->swarm;
2128  const int n = tr_ptrArraySize (&s->pool);
2129  struct peer_atom ** it = (struct peer_atom**) tr_ptrArrayBase (&s->pool);
2130  struct peer_atom ** end = it + n;
2131
2132  while (it != end)
2133    atomSetSeed (s, *it++);
2134}
2135
2136tr_pex *
2137tr_peerMgrCompactToPex (const void    * compact,
2138                        size_t          compactLen,
2139                        const uint8_t * added_f,
2140                        size_t          added_f_len,
2141                        size_t        * pexCount)
2142{
2143  size_t i;
2144  size_t n = compactLen / 6;
2145  const uint8_t * walk = compact;
2146  tr_pex * pex = tr_new0 (tr_pex, n);
2147
2148  for (i=0; i<n; ++i)
2149    {
2150      pex[i].addr.type = TR_AF_INET;
2151      memcpy (&pex[i].addr.addr, walk, 4); walk += 4;
2152      memcpy (&pex[i].port, walk, 2); walk += 2;
2153      if (added_f && (n == added_f_len))
2154        pex[i].flags = added_f[i];
2155    }
2156
2157  *pexCount = n;
2158  return pex;
2159}
2160
2161tr_pex *
2162tr_peerMgrCompact6ToPex (const void    * compact,
2163                         size_t          compactLen,
2164                         const uint8_t * added_f,
2165                         size_t          added_f_len,
2166                         size_t        * pexCount)
2167{
2168  size_t i;
2169  size_t n = compactLen / 18;
2170  const uint8_t * walk = compact;
2171  tr_pex * pex = tr_new0 (tr_pex, n);
2172
2173  for (i=0; i<n; ++i)
2174    {
2175      pex[i].addr.type = TR_AF_INET6;
2176      memcpy (&pex[i].addr.addr.addr6.s6_addr, walk, 16); walk += 16;
2177      memcpy (&pex[i].port, walk, 2); walk += 2;
2178      if (added_f && (n == added_f_len))
2179        pex[i].flags = added_f[i];
2180    }
2181
2182  *pexCount = n;
2183  return pex;
2184}
2185
2186tr_pex *
2187tr_peerMgrArrayToPex (const void  * array,
2188                      size_t        arrayLen,
2189                      size_t      * pexCount)
2190{
2191  size_t i;
2192  size_t n = arrayLen / (sizeof (tr_address) + 2);
2193  const uint8_t * walk = array;
2194  tr_pex * pex = tr_new0 (tr_pex, n);
2195
2196  for (i=0 ; i<n ; ++i)
2197    {
2198      memcpy (&pex[i].addr, walk, sizeof (tr_address));
2199      memcpy (&pex[i].port, walk + sizeof (tr_address), 2);
2200      pex[i].flags = 0x00;
2201      walk += sizeof (tr_address) + 2;
2202    }
2203
2204  *pexCount = n;
2205  return pex;
2206}
2207
2208/**
2209***
2210**/
2211
2212void
2213tr_peerMgrGotBadPiece (tr_torrent * tor, tr_piece_index_t pieceIndex)
2214{
2215  int i;
2216  int n;
2217  tr_swarm * s = tor->swarm;
2218  const uint32_t byteCount = tr_torPieceCountBytes (tor, pieceIndex);
2219
2220  for (i=0, n=tr_ptrArraySize(&s->peers); i!=n; ++i)
2221    {
2222      tr_peer * peer = tr_ptrArrayNth (&s->peers, i);
2223
2224      if (tr_bitfieldHas (&peer->blame, pieceIndex))
2225        {
2226          tordbg (s, "peer %s contributed to corrupt piece (%d); now has %d strikes",
2227                  tr_atomAddrStr(peer->atom), pieceIndex, (int)peer->strikes + 1);
2228          addStrike (s, peer);
2229        }
2230    }
2231
2232
2233  tr_announcerAddBytes (tor, TR_ANN_CORRUPT, byteCount);
2234}
2235
2236int
2237tr_pexCompare (const void * va, const void * vb)
2238{
2239  int i;
2240  const tr_pex * a = va;
2241  const tr_pex * b = vb;
2242
2243  assert (tr_isPex (a));
2244  assert (tr_isPex (b));
2245
2246  if ((i = tr_address_compare (&a->addr, &b->addr)))
2247    return i;
2248
2249  if (a->port != b->port)
2250    return a->port < b->port ? -1 : 1;
2251
2252  return 0;
2253}
2254
2255/* better goes first */
2256static int
2257compareAtomsByUsefulness (const void * va, const void *vb)
2258{
2259  const struct peer_atom * a = * (const struct peer_atom**) va;
2260  const struct peer_atom * b = * (const struct peer_atom**) vb;
2261
2262  assert (tr_isAtom (a));
2263  assert (tr_isAtom (b));
2264
2265  if (a->piece_data_time != b->piece_data_time)
2266    return a->piece_data_time > b->piece_data_time ? -1 : 1;
2267  if (a->fromBest != b->fromBest)
2268    return a->fromBest < b->fromBest ? -1 : 1;
2269  if (a->numFails != b->numFails)
2270    return a->numFails < b->numFails ? -1 : 1;
2271
2272  return 0;
2273}
2274
2275static bool
2276isAtomInteresting (const tr_torrent * tor, struct peer_atom * atom)
2277{
2278  if (tr_torrentIsSeed (tor) && atomIsSeed (atom))
2279    return false;
2280
2281  if (peerIsInUse (tor->swarm, atom))
2282    return true;
2283
2284  if (isAtomBlocklisted (tor->session, atom))
2285    return false;
2286
2287  if (atom->flags2 & MYFLAG_BANNED)
2288    return false;
2289
2290  return true;
2291}
2292
2293int
2294tr_peerMgrGetPeers (tr_torrent   * tor,
2295                    tr_pex      ** setme_pex,
2296                    uint8_t        af,
2297                    uint8_t        list_mode,
2298                    int            maxCount)
2299{
2300  int i;
2301  int n;
2302  int count = 0;
2303  int atomCount = 0;
2304  const tr_swarm * s = tor->swarm;
2305  struct peer_atom ** atoms = NULL;
2306  tr_pex * pex;
2307  tr_pex * walk;
2308
2309  assert (tr_isTorrent (tor));
2310  assert (setme_pex != NULL);
2311  assert (af==TR_AF_INET || af==TR_AF_INET6);
2312  assert (list_mode==TR_PEERS_CONNECTED || list_mode==TR_PEERS_INTERESTING);
2313
2314  managerLock (s->manager);
2315
2316  /**
2317  ***  build a list of atoms
2318  **/
2319
2320  if (list_mode == TR_PEERS_CONNECTED) /* connected peers only */
2321    {
2322      int i;
2323      const tr_peer ** peers = (const tr_peer **) tr_ptrArrayBase (&s->peers);
2324      atomCount = tr_ptrArraySize (&s->peers);
2325      atoms = tr_new (struct peer_atom *, atomCount);
2326      for (i=0; i<atomCount; ++i)
2327        atoms[i] = peers[i]->atom;
2328    }
2329  else /* TR_PEERS_INTERESTING */
2330    {
2331      int i;
2332      struct peer_atom ** atomBase = (struct peer_atom**) tr_ptrArrayBase (&s->pool);
2333      n = tr_ptrArraySize (&s->pool);
2334      atoms = tr_new (struct peer_atom *, n);
2335      for (i=0; i<n; ++i)
2336        if (isAtomInteresting (tor, atomBase[i]))
2337          atoms[atomCount++] = atomBase[i];
2338    }
2339
2340  qsort (atoms, atomCount, sizeof (struct peer_atom *), compareAtomsByUsefulness);
2341
2342  /**
2343  ***  add the first N of them into our return list
2344  **/
2345
2346  n = MIN (atomCount, maxCount);
2347  pex = walk = tr_new0 (tr_pex, n);
2348
2349  for (i=0; i<atomCount && count<n; ++i)
2350    {
2351      const struct peer_atom * atom = atoms[i];
2352      if (atom->addr.type == af)
2353        {
2354          assert (tr_address_is_valid (&atom->addr));
2355          walk->addr = atom->addr;
2356          walk->port = atom->port;
2357          walk->flags = atom->flags;
2358          ++count;
2359          ++walk;
2360        }
2361    }
2362
2363  qsort (pex, count, sizeof (tr_pex), tr_pexCompare);
2364
2365  assert ((walk - pex) == count);
2366  *setme_pex = pex;
2367
2368  /* cleanup */
2369  tr_free (atoms);
2370  managerUnlock (s->manager);
2371  return count;
2372}
2373
2374static void atomPulse      (evutil_socket_t, short, void *);
2375static void bandwidthPulse (evutil_socket_t, short, void *);
2376static void rechokePulse   (evutil_socket_t, short, void *);
2377static void reconnectPulse (evutil_socket_t, short, void *);
2378
2379static struct event *
2380createTimer (tr_session * session, int msec, event_callback_fn callback, void * cbdata)
2381{
2382  struct event * timer = evtimer_new (session->event_base, callback, cbdata);
2383  tr_timerAddMsec (timer, msec);
2384  return timer;
2385}
2386
2387static void
2388ensureMgrTimersExist (struct tr_peerMgr * m)
2389{
2390  if (m->atomTimer == NULL)
2391    m->atomTimer = createTimer (m->session, ATOM_PERIOD_MSEC, atomPulse, m);
2392
2393  if (m->bandwidthTimer == NULL)
2394    m->bandwidthTimer = createTimer (m->session, BANDWIDTH_PERIOD_MSEC, bandwidthPulse, m);
2395
2396  if (m->rechokeTimer == NULL)
2397    m->rechokeTimer = createTimer (m->session, RECHOKE_PERIOD_MSEC, rechokePulse, m);
2398
2399  if (m->refillUpkeepTimer == NULL)
2400    m->refillUpkeepTimer = createTimer (m->session, REFILL_UPKEEP_PERIOD_MSEC, refillUpkeep, m);
2401}
2402
2403void
2404tr_peerMgrStartTorrent (tr_torrent * tor)
2405{
2406  tr_swarm * s;
2407
2408  assert (tr_isTorrent (tor));
2409  assert (tr_torrentIsLocked (tor));
2410
2411  s = tor->swarm;
2412  ensureMgrTimersExist (s->manager);
2413
2414  s->isRunning = true;
2415  s->maxPeers = tor->maxConnectedPeers;
2416  s->pieceSortState = PIECES_UNSORTED;
2417
2418  rechokePulse (0, 0, s->manager);
2419}
2420
2421static void removeAllPeers (tr_swarm *);
2422
2423static void
2424stopSwarm (tr_swarm * swarm)
2425{
2426  swarm->isRunning = false;
2427
2428  replicationFree (swarm);
2429  invalidatePieceSorting (swarm);
2430
2431  removeAllPeers (swarm);
2432
2433  /* disconnect the handshakes. handshakeAbort calls handshakeDoneCB (),
2434   * which removes the handshake from t->outgoingHandshakes... */
2435  while (!tr_ptrArrayEmpty (&swarm->outgoingHandshakes))
2436    tr_handshakeAbort (tr_ptrArrayNth (&swarm->outgoingHandshakes, 0));
2437}
2438
2439void
2440tr_peerMgrStopTorrent (tr_torrent * tor)
2441{
2442  assert (tr_isTorrent (tor));
2443  assert (tr_torrentIsLocked (tor));
2444
2445  stopSwarm (tor->swarm);
2446}
2447
2448void
2449tr_peerMgrAddTorrent (tr_peerMgr * manager, tr_torrent * tor)
2450{
2451  assert (tr_isTorrent (tor));
2452  assert (tr_torrentIsLocked (tor));
2453  assert (tor->swarm == NULL);
2454
2455  tor->swarm = swarmNew (manager, tor);
2456}
2457
2458void
2459tr_peerMgrRemoveTorrent (tr_torrent * tor)
2460{
2461  assert (tr_isTorrent (tor));
2462  assert (tr_torrentIsLocked (tor));
2463
2464  stopSwarm (tor->swarm);
2465  swarmFree (tor->swarm);
2466}
2467
2468void
2469tr_peerUpdateProgress (tr_torrent * tor, tr_peer * peer)
2470{
2471  const tr_bitfield * have = &peer->have;
2472
2473  if (tr_bitfieldHasAll (have))
2474    {
2475      peer->progress = 1.0;
2476    }
2477  else if (tr_bitfieldHasNone (have))
2478    {
2479      peer->progress = 0.0;
2480    }
2481  else
2482    {
2483      const float true_count = tr_bitfieldCountTrueBits (have);
2484
2485      if (tr_torrentHasMetadata (tor))
2486        {
2487          peer->progress = true_count / tor->info.pieceCount;
2488        }
2489      else /* without pieceCount, this result is only a best guess... */
2490        {
2491          peer->progress = true_count / (have->bit_count + 1);
2492        }
2493    }
2494
2495  /* clamp the progress range */
2496  if (peer->progress < 0.0)
2497    peer->progress = 0.0;
2498  if (peer->progress > 1.0)
2499    peer->progress = 1.0;
2500
2501  if (peer->atom && (peer->progress >= 1.0))
2502    atomSetSeed (tor->swarm, peer->atom);
2503}
2504
2505void
2506tr_peerMgrOnTorrentGotMetainfo (tr_torrent * tor)
2507{
2508  int i;
2509  int peerCount;
2510  tr_peer ** peers;
2511
2512  /* the webseed list may have changed... */
2513  rebuildWebseedArray (tor->swarm, tor);
2514
2515  /* some peer_msgs' progress fields may not be accurate if we
2516     didn't have the metadata before now... so refresh them all... */
2517  peerCount = tr_ptrArraySize (&tor->swarm->peers);
2518  peers = (tr_peer**) tr_ptrArrayBase (&tor->swarm->peers);
2519  for (i=0; i<peerCount; ++i)
2520    tr_peerUpdateProgress (tor, peers[i]);
2521
2522  /* update the bittorrent peers' willingnes... */
2523  for (i=0; i<peerCount; ++i)
2524    {
2525      tr_peerMsgsUpdateActive (tr_peerMsgsCast(peers[i]), TR_UP);
2526      tr_peerMsgsUpdateActive (tr_peerMsgsCast(peers[i]), TR_DOWN);
2527    }
2528}
2529
2530void
2531tr_peerMgrTorrentAvailability (const tr_torrent  * tor,
2532                               int8_t            * tab,
2533                               unsigned int        tabCount)
2534{
2535  assert (tr_isTorrent (tor));
2536  assert (tab != NULL);
2537  assert (tabCount > 0);
2538
2539  memset (tab, 0, tabCount);
2540
2541  if (tr_torrentHasMetadata (tor))
2542    {
2543      tr_piece_index_t i;
2544      const int peerCount = tr_ptrArraySize (&tor->swarm->peers);
2545      const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase (&tor->swarm->peers);
2546      const float interval = tor->info.pieceCount / (float)tabCount;
2547      const bool isSeed = tr_torrentGetCompleteness (tor) == TR_SEED;
2548
2549      for (i=0; i<tabCount; ++i)
2550        {
2551          const int piece = i * interval;
2552
2553          if (isSeed || tr_torrentPieceIsComplete (tor, piece))
2554            {
2555              tab[i] = -1;
2556            }
2557          else if (peerCount)
2558            {
2559              int j;
2560              for (j=0; j<peerCount; ++j)
2561                if (tr_bitfieldHas (&peers[j]->have, piece))
2562                  ++tab[i];
2563            }
2564        }
2565    }
2566}
2567
2568void
2569tr_swarmGetStats (const tr_swarm * swarm, tr_swarm_stats * setme)
2570{
2571  assert (swarm != NULL);
2572  assert (setme != NULL);
2573
2574  *setme = swarm->stats;
2575}
2576
2577void
2578tr_swarmIncrementActivePeers (tr_swarm * swarm, tr_direction direction, bool is_active)
2579{
2580  int n = swarm->stats.activePeerCount[direction];
2581
2582  if (is_active)
2583    ++n;
2584  else
2585    --n;
2586
2587  assert (0 <= n);
2588  assert (n <= swarm->stats.peerCount);
2589
2590  swarm->stats.activePeerCount[direction] = n;
2591}
2592
2593bool
2594tr_peerIsSeed (const tr_peer * peer)
2595{
2596  if (peer->progress >= 1.0)
2597    return true;
2598
2599  if (peer->atom && atomIsSeed (peer->atom))
2600    return true;
2601
2602  return false;
2603}
2604
2605/* count how many bytes we want that connected peers have */
2606uint64_t
2607tr_peerMgrGetDesiredAvailable (const tr_torrent * tor)
2608{
2609  size_t i;
2610  size_t n;
2611  uint64_t desiredAvailable;
2612  const tr_swarm * s;
2613
2614  assert (tr_isTorrent (tor));
2615
2616  /* common shortcuts... */
2617
2618  if (!tor->isRunning || tor->isStopping)
2619    return 0;
2620
2621  if (tr_torrentIsSeed (tor))
2622    return 0;
2623
2624  if (!tr_torrentHasMetadata (tor))
2625    return 0;
2626
2627  s = tor->swarm;
2628  if (s == NULL)
2629    return 0;
2630
2631  n = tr_ptrArraySize (&s->peers);
2632  if (n == 0)
2633    {
2634      return 0;
2635    }
2636  else
2637    {
2638      const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase (&s->peers);
2639      for (i=0; i<n; ++i)
2640        if (peers[i]->atom && atomIsSeed (peers[i]->atom))
2641          return tr_torrentGetLeftUntilDone (tor);
2642    }
2643
2644  if (!s->pieceReplication || !s->pieceReplicationSize)
2645    return 0;
2646
2647  /* do it the hard way */
2648
2649  desiredAvailable = 0;
2650  for (i=0, n=MIN (tor->info.pieceCount, s->pieceReplicationSize); i<n; ++i)
2651    if (!tor->info.pieces[i].dnd && (s->pieceReplication[i] > 0))
2652      desiredAvailable += tr_torrentMissingBytesInPiece (tor, i);
2653
2654  assert (desiredAvailable <= tor->info.totalSize);
2655  return desiredAvailable;
2656}
2657
2658double*
2659tr_peerMgrWebSpeeds_KBps (const tr_torrent * tor)
2660{
2661  unsigned int i;
2662  tr_swarm * s;
2663  unsigned int n;
2664  double * ret = NULL;
2665  const uint64_t now = tr_time_msec ();
2666
2667  assert (tr_isTorrent (tor));
2668
2669  s = tor->swarm;
2670  n = tr_ptrArraySize (&s->webseeds);
2671  ret = tr_new0 (double, n);
2672
2673  assert (s->manager != NULL);
2674  assert (n == tor->info.webseedCount);
2675
2676  for (i=0; i<n; ++i)
2677    {
2678      unsigned int Bps = 0;
2679      if (tr_peerIsTransferringPieces (tr_ptrArrayNth(&s->webseeds,i), now, TR_DOWN, &Bps))
2680        ret[i] = Bps / (double)tr_speed_K;
2681      else
2682        ret[i] = -1.0;
2683    }
2684
2685  return ret;
2686}
2687
2688struct tr_peer_stat *
2689tr_peerMgrPeerStats (const tr_torrent * tor, int * setmeCount)
2690{
2691  int i;
2692  int size = 0;
2693  tr_peer_stat * ret;
2694  const tr_swarm * s;
2695  tr_peer ** peers;
2696  const time_t now = tr_time ();
2697  const uint64_t now_msec = tr_time_msec ();
2698
2699  assert (tr_isTorrent (tor));
2700  assert (tor->swarm->manager != NULL);
2701
2702  s = tor->swarm;
2703  peers = (tr_peer**) tr_ptrArrayBase (&s->peers);
2704  size = tr_ptrArraySize (&s->peers);
2705  ret = tr_new0 (tr_peer_stat, size);
2706
2707  for (i=0; i<size; ++i)
2708    {
2709      char *                   pch;
2710      tr_peer *                peer = peers[i];
2711      tr_peerMsgs *            msgs = PEER_MSGS (peer);
2712      const struct peer_atom * atom = peer->atom;
2713      tr_peer_stat *           stat = ret + i;
2714
2715      tr_address_to_string_with_buf (&atom->addr, stat->addr, sizeof (stat->addr));
2716      tr_strlcpy (stat->client, tr_quark_get_string(peer->client,NULL), sizeof (stat->client));
2717      stat->port                = ntohs (peer->atom->port);
2718      stat->from                = atom->fromFirst;
2719      stat->progress            = peer->progress;
2720      stat->isUTP               = tr_peerMsgsIsUtpConnection (msgs);
2721      stat->isEncrypted         = tr_peerMsgsIsEncrypted (msgs);
2722      stat->rateToPeer_KBps     = toSpeedKBps (tr_peerGetPieceSpeed_Bps (peer, now_msec, TR_CLIENT_TO_PEER));
2723      stat->rateToClient_KBps   = toSpeedKBps (tr_peerGetPieceSpeed_Bps (peer, now_msec, TR_PEER_TO_CLIENT));
2724      stat->peerIsChoked        = tr_peerMsgsIsPeerChoked (msgs);
2725      stat->peerIsInterested    = tr_peerMsgsIsPeerInterested (msgs);
2726      stat->clientIsChoked      = tr_peerMsgsIsClientChoked (msgs);
2727      stat->clientIsInterested  = tr_peerMsgsIsClientInterested (msgs);
2728      stat->isIncoming          = tr_peerMsgsIsIncomingConnection (msgs);
2729      stat->isDownloadingFrom   = tr_peerMsgsIsActive (msgs, TR_PEER_TO_CLIENT);
2730      stat->isUploadingTo       = tr_peerMsgsIsActive (msgs, TR_CLIENT_TO_PEER);
2731      stat->isSeed              = tr_peerIsSeed (peer);
2732
2733      stat->blocksToPeer        = tr_historyGet (&peer->blocksSentToPeer,    now, CANCEL_HISTORY_SEC);
2734      stat->blocksToClient      = tr_historyGet (&peer->blocksSentToClient,  now, CANCEL_HISTORY_SEC);
2735      stat->cancelsToPeer       = tr_historyGet (&peer->cancelsSentToPeer,   now, CANCEL_HISTORY_SEC);
2736      stat->cancelsToClient     = tr_historyGet (&peer->cancelsSentToClient, now, CANCEL_HISTORY_SEC);
2737
2738      stat->pendingReqsToPeer   = peer->pendingReqsToPeer;
2739      stat->pendingReqsToClient = peer->pendingReqsToClient;
2740
2741      pch = stat->flagStr;
2742      if (stat->isUTP) *pch++ = 'T';
2743      if (s->optimistic == msgs) *pch++ = 'O';
2744      if (stat->isDownloadingFrom) *pch++ = 'D';
2745      else if (stat->clientIsInterested) *pch++ = 'd';
2746      if (stat->isUploadingTo) *pch++ = 'U';
2747      else if (stat->peerIsInterested) *pch++ = 'u';
2748      if (!stat->clientIsChoked && !stat->clientIsInterested) *pch++ = 'K';
2749      if (!stat->peerIsChoked && !stat->peerIsInterested) *pch++ = '?';
2750      if (stat->isEncrypted) *pch++ = 'E';
2751      if (stat->from == TR_PEER_FROM_DHT) *pch++ = 'H';
2752      else if (stat->from == TR_PEER_FROM_PEX) *pch++ = 'X';
2753      if (stat->isIncoming) *pch++ = 'I';
2754      *pch = '\0';
2755    }
2756
2757  *setmeCount = size;
2758  return ret;
2759}
2760
2761/***
2762****
2763****
2764***/
2765
2766void
2767tr_peerMgrClearInterest (tr_torrent * tor)
2768{
2769  int i;
2770  tr_swarm * s = tor->swarm;
2771  const int peerCount = tr_ptrArraySize (&s->peers);
2772
2773  assert (tr_isTorrent (tor));
2774  assert (tr_torrentIsLocked (tor));
2775
2776  for (i=0; i<peerCount; ++i)
2777    tr_peerMsgsSetInterested (tr_ptrArrayNth (&s->peers, i), false);
2778}
2779
2780/* does this peer have any pieces that we want? */
2781static bool
2782isPeerInteresting (tr_torrent     * const tor,
2783                   const bool     * const piece_is_interesting,
2784                   const tr_peer  * const peer)
2785{
2786  tr_piece_index_t i, n;
2787
2788  /* these cases should have already been handled by the calling code... */
2789  assert (!tr_torrentIsSeed (tor));
2790  assert (tr_torrentIsPieceTransferAllowed (tor, TR_PEER_TO_CLIENT));
2791
2792  if (tr_peerIsSeed (peer))
2793    return true;
2794
2795  for (i=0, n=tor->info.pieceCount; i<n; ++i)
2796    if (piece_is_interesting[i] && tr_bitfieldHas (&peer->have, i))
2797      return true;
2798
2799  return false;
2800}
2801
2802typedef enum
2803{
2804  RECHOKE_STATE_GOOD,
2805  RECHOKE_STATE_UNTESTED,
2806  RECHOKE_STATE_BAD
2807}
2808tr_rechoke_state;
2809
2810struct tr_rechoke_info
2811{
2812  tr_peer * peer;
2813  int salt;
2814  int rechoke_state;
2815};
2816
2817static int
2818compare_rechoke_info (const void * va, const void * vb)
2819{
2820  const struct tr_rechoke_info * a = va;
2821  const struct tr_rechoke_info * b = vb;
2822
2823  if (a->rechoke_state != b->rechoke_state)
2824    return a->rechoke_state - b->rechoke_state;
2825
2826  return a->salt - b->salt;
2827}
2828
2829/* determines who we send "interested" messages to */
2830static void
2831rechokeDownloads (tr_swarm * s)
2832{
2833  int i;
2834  int maxPeers = 0;
2835  int rechoke_count = 0;
2836  struct tr_rechoke_info * rechoke = NULL;
2837  const int MIN_INTERESTING_PEERS = 5;
2838  const int peerCount = tr_ptrArraySize (&s->peers);
2839  const time_t now = tr_time ();
2840
2841  /* some cases where this function isn't necessary */
2842  if (tr_torrentIsSeed (s->tor))
2843    return;
2844  if (!tr_torrentIsPieceTransferAllowed (s->tor, TR_PEER_TO_CLIENT))
2845    return;
2846
2847  /* decide HOW MANY peers to be interested in */
2848  {
2849    int blocks = 0;
2850    int cancels = 0;
2851    time_t timeSinceCancel;
2852
2853    /* Count up how many blocks & cancels each peer has.
2854     *
2855     * There are two situations where we send out cancels --
2856     *
2857     * 1. We've got unresponsive peers, which is handled by deciding
2858     *    -which- peers to be interested in.
2859     *
2860     * 2. We've hit our bandwidth cap, which is handled by deciding
2861     *    -how many- peers to be interested in.
2862     *
2863     * We're working on 2. here, so we need to ignore unresponsive
2864     * peers in our calculations lest they confuse Transmission into
2865     * thinking it's hit its bandwidth cap.
2866     */
2867    for (i=0; i<peerCount; ++i)
2868      {
2869        const tr_peer * peer = tr_ptrArrayNth (&s->peers, i);
2870        const int b = tr_historyGet (&peer->blocksSentToClient, now, CANCEL_HISTORY_SEC);
2871        const int c = tr_historyGet (&peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC);
2872
2873        if (b == 0) /* ignore unresponsive peers, as described above */
2874          continue;
2875
2876        blocks += b;
2877        cancels += c;
2878      }
2879
2880    if (cancels > 0)
2881      {
2882        /* cancelRate: of the block requests we've recently made, the percentage we cancelled.
2883         * higher values indicate more congestion. */
2884        const double cancelRate = cancels / (double)(cancels + blocks);
2885        const double mult = 1 - MIN (cancelRate, 0.5);
2886        maxPeers = s->interestedCount * mult;
2887        tordbg (s, "cancel rate is %.3f -- reducing the "
2888                   "number of peers we're interested in by %.0f percent",
2889                   cancelRate, mult * 100);
2890        s->lastCancel = now;
2891      }
2892
2893    timeSinceCancel = now - s->lastCancel;
2894    if (timeSinceCancel)
2895      {
2896        const int maxIncrease = 15;
2897        const time_t maxHistory = 2 * CANCEL_HISTORY_SEC;
2898        const double mult = MIN (timeSinceCancel, maxHistory) / (double) maxHistory;
2899        const int inc = maxIncrease * mult;
2900        maxPeers = s->maxPeers + inc;
2901        tordbg (s, "time since last cancel is %"PRIdMAX" -- increasing the "
2902                   "number of peers we're interested in by %d",
2903                   (intmax_t)timeSinceCancel, inc);
2904      }
2905  }
2906
2907  /* don't let the previous section's number tweaking go too far... */
2908  if (maxPeers < MIN_INTERESTING_PEERS)
2909    maxPeers = MIN_INTERESTING_PEERS;
2910  if (maxPeers > s->tor->maxConnectedPeers)
2911    maxPeers = s->tor->maxConnectedPeers;
2912
2913  s->maxPeers = maxPeers;
2914
2915  if (peerCount > 0)
2916    {
2917      bool * piece_is_interesting;
2918      const tr_torrent * const tor = s->tor;
2919      const int n = tor->info.pieceCount;
2920
2921      /* build a bitfield of interesting pieces... */
2922      piece_is_interesting = tr_new (bool, n);
2923      for (i=0; i<n; i++)
2924        piece_is_interesting[i] = !tor->info.pieces[i].dnd && !tr_torrentPieceIsComplete (tor, i);
2925
2926      /* decide WHICH peers to be interested in (based on their cancel-to-block ratio) */
2927      for (i=0; i<peerCount; ++i)
2928        {
2929          tr_peer * peer = tr_ptrArrayNth (&s->peers, i);
2930
2931          if (!isPeerInteresting (s->tor, piece_is_interesting, peer))
2932            {
2933              tr_peerMsgsSetInterested (PEER_MSGS(peer), false);
2934            }
2935          else
2936            {
2937              tr_rechoke_state rechoke_state;
2938              const int blocks = tr_historyGet (&peer->blocksSentToClient, now, CANCEL_HISTORY_SEC);
2939              const int cancels = tr_historyGet (&peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC);
2940
2941              if (!blocks && !cancels)
2942                rechoke_state = RECHOKE_STATE_UNTESTED;
2943              else if (!cancels)
2944                rechoke_state = RECHOKE_STATE_GOOD;
2945              else if (!blocks)
2946                rechoke_state = RECHOKE_STATE_BAD;
2947              else if ((cancels * 10) < blocks)
2948                rechoke_state = RECHOKE_STATE_GOOD;
2949              else
2950                rechoke_state = RECHOKE_STATE_BAD;
2951
2952              if (rechoke == NULL)
2953                rechoke = tr_new (struct tr_rechoke_info, peerCount);
2954
2955              rechoke[rechoke_count].peer = peer;
2956              rechoke[rechoke_count].rechoke_state = rechoke_state;
2957              rechoke[rechoke_count].salt = tr_rand_int_weak (INT_MAX);
2958              rechoke_count++;
2959            }
2960
2961        }
2962
2963      tr_free (piece_is_interesting);
2964    }
2965
2966  /* now that we know which & how many peers to be interested in... update the peer interest */
2967  qsort (rechoke, rechoke_count, sizeof (struct tr_rechoke_info), compare_rechoke_info);
2968  s->interestedCount = MIN (maxPeers, rechoke_count);
2969  for (i=0; i<rechoke_count; ++i)
2970    tr_peerMsgsSetInterested (PEER_MSGS(rechoke[i].peer), i<s->interestedCount);
2971
2972  /* cleanup */
2973  tr_free (rechoke);
2974}
2975
2976/**
2977***
2978**/
2979
2980struct ChokeData
2981{
2982  bool          isInterested;
2983  bool          wasChoked;
2984  bool          isChoked;
2985  int           rate;
2986  int           salt;
2987  tr_peerMsgs * msgs;
2988};
2989
2990static int
2991compareChoke (const void * va, const void * vb)
2992{
2993  const struct ChokeData * a = va;
2994  const struct ChokeData * b = vb;
2995
2996  if (a->rate != b->rate) /* prefer higher overall speeds */
2997    return a->rate > b->rate ? -1 : 1;
2998
2999  if (a->wasChoked != b->wasChoked) /* prefer unchoked */
3000    return a->wasChoked ? 1 : -1;
3001
3002  if (a->salt != b->salt) /* random order */
3003    return a->salt - b->salt;
3004
3005  return 0;
3006}
3007
3008/* is this a new connection? */
3009static bool
3010isNew (const tr_peerMsgs * msgs)
3011{
3012  return (msgs != NULL) && (tr_peerMsgsGetConnectionAge (msgs) < 45);
3013}
3014
3015/* get a rate for deciding which peers to choke and unchoke. */
3016static int
3017getRate (const tr_torrent * tor, struct peer_atom * atom, uint64_t now)
3018{
3019  unsigned int Bps;
3020
3021  if (tr_torrentIsSeed (tor))
3022    Bps = tr_peerGetPieceSpeed_Bps (atom->peer, now, TR_CLIENT_TO_PEER);
3023
3024  /* downloading a private torrent... take upload speed into account
3025   * because there may only be a small window of opportunity to share */
3026  else if (tr_torrentIsPrivate (tor))
3027    Bps = tr_peerGetPieceSpeed_Bps (atom->peer, now, TR_PEER_TO_CLIENT)
3028        + tr_peerGetPieceSpeed_Bps (atom->peer, now, TR_CLIENT_TO_PEER);
3029
3030  /* downloading a public torrent */
3031  else
3032    Bps = tr_peerGetPieceSpeed_Bps (atom->peer, now, TR_PEER_TO_CLIENT);
3033
3034  /* convert it to bytes per second */
3035  return Bps;
3036}
3037
3038static inline bool
3039isBandwidthMaxedOut (const tr_bandwidth * b,
3040                     const uint64_t now_msec, tr_direction dir)
3041{
3042  if (!tr_bandwidthIsLimited (b, dir))
3043    {
3044      return false;
3045    }
3046  else
3047    {
3048      const unsigned int got = tr_bandwidthGetPieceSpeed_Bps (b, now_msec, dir);
3049      const unsigned int want = tr_bandwidthGetDesiredSpeed_Bps (b, dir);
3050      return got >= want;
3051    }
3052}
3053
3054static void
3055rechokeUploads (tr_swarm * s, const uint64_t now)
3056{
3057  int i, size, unchokedInterested;
3058  const int peerCount = tr_ptrArraySize (&s->peers);
3059  tr_peer ** peers = (tr_peer**) tr_ptrArrayBase (&s->peers);
3060  struct ChokeData * choke = tr_new0 (struct ChokeData, peerCount);
3061  const tr_session * session = s->manager->session;
3062  const int chokeAll = !tr_torrentIsPieceTransferAllowed (s->tor, TR_CLIENT_TO_PEER);
3063  const bool isMaxedOut = isBandwidthMaxedOut (&s->tor->bandwidth, now, TR_UP);
3064
3065  assert (swarmIsLocked (s));
3066
3067  /* an optimistic unchoke peer's "optimistic"
3068   * state lasts for N calls to rechokeUploads (). */
3069  if (s->optimisticUnchokeTimeScaler > 0)
3070    s->optimisticUnchokeTimeScaler--;
3071  else
3072    s->optimistic = NULL;
3073
3074  /* sort the peers by preference and rate */
3075  for (i=0, size=0; i<peerCount; ++i)
3076    {
3077      tr_peer * peer = peers[i];
3078      tr_peerMsgs * msgs = PEER_MSGS (peer);
3079
3080      struct peer_atom * atom = peer->atom;
3081
3082      if (tr_peerIsSeed (peer)) /* choke seeds and partial seeds */
3083        {
3084          tr_peerMsgsSetChoke (PEER_MSGS(peer), true);
3085        }
3086      else if (chokeAll) /* choke everyone if we're not uploading */
3087        {
3088          tr_peerMsgsSetChoke (PEER_MSGS(peer), true);
3089        }
3090      else if (msgs != s->optimistic)
3091        {
3092          struct ChokeData * n = &choke[size++];
3093          n->msgs         = msgs;
3094          n->isInterested = tr_peerMsgsIsPeerInterested (msgs);
3095          n->wasChoked    = tr_peerMsgsIsPeerChoked (msgs);
3096          n->rate         = getRate (s->tor, atom, now);
3097          n->salt         = tr_rand_int_weak (INT_MAX);
3098          n->isChoked     = true;
3099        }
3100    }
3101
3102  qsort (choke, size, sizeof (struct ChokeData), compareChoke);
3103
3104  /**
3105   * Reciprocation and number of uploads capping is managed by unchoking
3106   * the N peers which have the best upload rate and are interested.
3107   * This maximizes the client's download rate. These N peers are
3108   * referred to as downloaders, because they are interested in downloading
3109   * from the client.
3110   *
3111   * Peers which have a better upload rate (as compared to the downloaders)
3112   * but aren't interested get unchoked. If they become interested, the
3113   * downloader with the worst upload rate gets choked. If a client has
3114   * a complete file, it uses its upload rate rather than its download
3115   * rate to decide which peers to unchoke.
3116   *
3117   * If our bandwidth is maxed out, don't unchoke any more peers.
3118   */
3119  unchokedInterested = 0;
3120  for (i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i)
3121    {
3122      choke[i].isChoked = isMaxedOut ? choke[i].wasChoked : false;
3123      if (choke[i].isInterested)
3124        ++unchokedInterested;
3125    }
3126
3127  /* optimistic unchoke */
3128  if (!s->optimistic && !isMaxedOut && (i<size))
3129    {
3130      int n;
3131      struct ChokeData * c;
3132      tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
3133
3134      for (; i<size; ++i)
3135        {
3136          if (choke[i].isInterested)
3137            {
3138              const tr_peerMsgs * msgs = choke[i].msgs;
3139              int x = 1, y;
3140              if (isNew (msgs)) x *= 3;
3141              for (y=0; y<x; ++y)
3142                tr_ptrArrayAppend (&randPool, &choke[i]);
3143            }
3144        }
3145
3146      if ((n = tr_ptrArraySize (&randPool)))
3147        {
3148          c = tr_ptrArrayNth (&randPool, tr_rand_int_weak (n));
3149          c->isChoked = false;
3150          s->optimistic = c->msgs;
3151          s->optimisticUnchokeTimeScaler = OPTIMISTIC_UNCHOKE_MULTIPLIER;
3152        }
3153
3154      tr_ptrArrayDestruct (&randPool, NULL);
3155    }
3156
3157  for (i=0; i<size; ++i)
3158    tr_peerMsgsSetChoke (choke[i].msgs, choke[i].isChoked);
3159
3160  /* cleanup */
3161  tr_free (choke);
3162}
3163
3164static void
3165rechokePulse (evutil_socket_t foo UNUSED, short bar UNUSED, void * vmgr)
3166{
3167  tr_torrent * tor = NULL;
3168  tr_peerMgr * mgr = vmgr;
3169  const uint64_t now = tr_time_msec ();
3170
3171  managerLock (mgr);
3172
3173  while ((tor = tr_torrentNext (mgr->session, tor)))
3174    {
3175      if (tor->isRunning)
3176        {
3177          tr_swarm * s = tor->swarm;
3178
3179          if (s->stats.peerCount > 0)
3180            {
3181              rechokeUploads (s, now);
3182              rechokeDownloads (s);
3183            }
3184        }
3185    }
3186
3187  tr_timerAddMsec (mgr->rechokeTimer, RECHOKE_PERIOD_MSEC);
3188  managerUnlock (mgr);
3189}
3190
3191/***
3192****
3193****  Life and Death
3194****
3195***/
3196
3197static bool
3198shouldPeerBeClosed (const tr_swarm   * s,
3199                    const tr_peer    * peer,
3200                    int                peerCount,
3201                    const time_t       now)
3202{
3203  const tr_torrent * tor = s->tor;
3204  const struct peer_atom * atom = peer->atom;
3205
3206  /* if it's marked for purging, close it */
3207  if (peer->doPurge)
3208    {
3209      tordbg (s, "purging peer %s because its doPurge flag is set",
3210              tr_atomAddrStr (atom));
3211      return true;
3212    }
3213
3214  /* disconnect if we're both seeds and enough time has passed for PEX */
3215  if (tr_torrentIsSeed (tor) && tr_peerIsSeed (peer))
3216    return !tr_torrentAllowsPex (tor) || (now-atom->time>=30);
3217
3218  /* disconnect if it's been too long since piece data has been transferred.
3219   * this is on a sliding scale based on number of available peers... */
3220  {
3221    const int relaxStrictnessIfFewerThanN = (int)((getMaxPeerCount (tor) * 0.9) + 0.5);
3222    /* if we have >= relaxIfFewerThan, strictness is 100%.
3223     * if we have zero connections, strictness is 0% */
3224    const float strictness = peerCount >= relaxStrictnessIfFewerThanN
3225                           ? 1.0
3226                           : peerCount / (float)relaxStrictnessIfFewerThanN;
3227    const int lo = MIN_UPLOAD_IDLE_SECS;
3228    const int hi = MAX_UPLOAD_IDLE_SECS;
3229    const int limit = hi - ((hi - lo) * strictness);
3230    const int idleTime = now - MAX (atom->time, atom->piece_data_time);
3231/*fprintf (stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit);*/
3232    if (idleTime > limit)
3233      {
3234        tordbg (s, "purging peer %s because it's been %d secs since we shared anything",
3235                tr_atomAddrStr (atom), idleTime);
3236        return true;
3237      }
3238  }
3239
3240  return false;
3241}
3242
3243static tr_peer **
3244getPeersToClose (tr_swarm * s, const time_t now_sec, int * setmeSize)
3245{
3246  int i, peerCount, outsize;
3247  struct tr_peer ** ret = NULL;
3248  tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek (&s->peers, &peerCount);
3249
3250  assert (swarmIsLocked (s));
3251
3252  for (i=outsize=0; i<peerCount; ++i)
3253    {
3254      if (shouldPeerBeClosed (s, peers[i], peerCount, now_sec))
3255        {
3256          if (ret == NULL)
3257            ret = tr_new (tr_peer *, peerCount);
3258          ret[outsize++] = peers[i];
3259        }
3260    }
3261
3262  *setmeSize = outsize;
3263  return ret;
3264}
3265
3266static int
3267getReconnectIntervalSecs (const struct peer_atom * atom, const time_t now)
3268{
3269  int sec;
3270  const bool unreachable = (atom->flags2 & MYFLAG_UNREACHABLE) != 0;
3271
3272  /* if we were recently connected to this peer and transferring piece
3273   * data, try to reconnect to them sooner rather that later -- we don't
3274   * want network troubles to get in the way of a good peer. */
3275  if (!unreachable && ((now - atom->piece_data_time) <= (MINIMUM_RECONNECT_INTERVAL_SECS * 2)))
3276    sec = MINIMUM_RECONNECT_INTERVAL_SECS;
3277
3278  /* otherwise, the interval depends on how many times we've tried
3279   * and failed to connect to the peer */
3280  else
3281    {
3282      int step = atom->numFails;
3283
3284      /* penalize peers that were unreachable the last time we tried */
3285      if (unreachable)
3286        step += 2;
3287
3288      switch (step)
3289        {
3290          case 0: sec = 0; break;
3291          case 1: sec = 10; break;
3292          case 2: sec = 60 * 2; break;
3293          case 3: sec = 60 * 15; break;
3294          case 4: sec = 60 * 30; break;
3295          case 5: sec = 60 * 60; break;
3296          default: sec = 60 * 120; break;
3297        }
3298    }
3299
3300  dbgmsg ("reconnect interval for %s is %d seconds", tr_atomAddrStr (atom), sec);
3301  return sec;
3302}
3303
3304static void
3305removePeer (tr_swarm * s, tr_peer * peer)
3306{
3307  struct peer_atom * atom = peer->atom;
3308
3309  assert (swarmIsLocked (s));
3310  assert (atom);
3311
3312  atom->time = tr_time ();
3313
3314  tr_ptrArrayRemoveSortedPointer (&s->peers, peer, peerCompare);
3315  --s->stats.peerCount;
3316  --s->stats.peerFromCount[atom->fromFirst];
3317
3318  if (replicationExists (s))
3319    tr_decrReplicationFromBitfield (s, &peer->have);
3320
3321  assert (s->stats.peerCount == tr_ptrArraySize (&s->peers));
3322  assert (s->stats.peerFromCount[atom->fromFirst] >= 0);
3323
3324  tr_peerFree (peer);
3325}
3326
3327static void
3328closePeer (tr_swarm * s, tr_peer * peer)
3329{
3330  struct peer_atom * atom;
3331
3332  assert (s != NULL);
3333  assert (peer != NULL);
3334
3335  atom = peer->atom;
3336
3337  /* if we transferred piece data, then they might be good peers,
3338     so reset their `numFails' weight to zero. otherwise we connected
3339     to them fruitlessly, so mark it as another fail */
3340  if (atom->piece_data_time)
3341    {
3342      tordbg (s, "resetting atom %s numFails to 0", tr_atomAddrStr (atom));
3343      atom->numFails = 0;
3344    }
3345  else
3346    {
3347      ++atom->numFails;
3348      tordbg (s, "incremented atom %s numFails to %d", tr_atomAddrStr (atom), (int)atom->numFails);
3349    }
3350
3351  tordbg (s, "removing bad peer %s", tr_atomAddrStr (peer->atom));
3352  removePeer (s, peer);
3353}
3354
3355static void
3356removeAllPeers (tr_swarm * s)
3357{
3358  while (!tr_ptrArrayEmpty (&s->peers))
3359    removePeer (s, tr_ptrArrayNth (&s->peers, 0));
3360
3361  assert (!s->stats.peerCount);
3362}
3363
3364static void
3365closeBadPeers (tr_swarm * s, const time_t now_sec)
3366{
3367  if (!tr_ptrArrayEmpty (&s->peers))
3368    {
3369      int i;
3370      int peerCount;
3371      struct tr_peer ** peers;
3372
3373      peers = getPeersToClose (s, now_sec, &peerCount);
3374      for (i=0; i<peerCount; ++i)
3375        closePeer (s, peers[i]);
3376      tr_free (peers);
3377    }
3378}
3379
3380struct peer_liveliness
3381{
3382  tr_peer * peer;
3383  void * clientData;
3384  time_t pieceDataTime;
3385  time_t time;
3386  unsigned int speed;
3387  bool doPurge;
3388};
3389
3390static int
3391comparePeerLiveliness (const void * va, const void * vb)
3392{
3393  const struct peer_liveliness * a = va;
3394  const struct peer_liveliness * b = vb;
3395
3396  if (a->doPurge != b->doPurge)
3397    return a->doPurge ? 1 : -1;
3398
3399  if (a->speed != b->speed) /* faster goes first */
3400    return a->speed > b->speed ? -1 : 1;
3401
3402  /* the one to give us data more recently goes first */
3403  if (a->pieceDataTime != b->pieceDataTime)
3404    return a->pieceDataTime > b->pieceDataTime ? -1 : 1;
3405
3406  /* the one we connected to most recently goes first */
3407  if (a->time != b->time)
3408    return a->time > b->time ? -1 : 1;
3409
3410  return 0;
3411}
3412
3413static void
3414sortPeersByLivelinessImpl (tr_peer  ** peers,
3415                           void     ** clientData,
3416                           int         n,
3417                           uint64_t    now,
3418                           int (*compare)(const void *va, const void *vb))
3419{
3420  int i;
3421  struct peer_liveliness *lives, *l;
3422
3423  /* build a sortable array of peer + extra info */
3424  lives = l = tr_new0 (struct peer_liveliness, n);
3425  for (i=0; i<n; ++i, ++l)
3426    {
3427      tr_peer * p = peers[i];
3428      l->peer = p;
3429      l->doPurge = p->doPurge;
3430      l->pieceDataTime = p->atom->piece_data_time;
3431      l->time = p->atom->time;
3432      l->speed = tr_peerGetPieceSpeed_Bps (p, now, TR_UP)
3433               + tr_peerGetPieceSpeed_Bps (p, now, TR_DOWN);
3434      if (clientData)
3435        l->clientData = clientData[i];
3436    }
3437
3438  /* sort 'em */
3439  assert (n == (l - lives));
3440  qsort (lives, n, sizeof (struct peer_liveliness), compare);
3441
3442  /* build the peer array */
3443  for (i=0, l=lives; i<n; ++i, ++l)
3444    {
3445      peers[i] = l->peer;
3446      if (clientData)
3447        clientData[i] = l->clientData;
3448    }
3449  assert (n == (l - lives));
3450
3451  /* cleanup */
3452  tr_free (lives);
3453}
3454
3455static void
3456sortPeersByLiveliness (tr_peer ** peers, void ** clientData, int n, uint64_t now)
3457{
3458  sortPeersByLivelinessImpl (peers, clientData, n, now, comparePeerLiveliness);
3459}
3460
3461
3462static void
3463enforceTorrentPeerLimit (tr_swarm * s, uint64_t now)
3464{
3465  int n = tr_ptrArraySize (&s->peers);
3466  const int max = tr_torrentGetPeerLimit (s->tor);
3467  if (n > max)
3468    {
3469      void * base = tr_ptrArrayBase (&s->peers);
3470      tr_peer ** peers = tr_memdup (base, n*sizeof (tr_peer*));
3471      sortPeersByLiveliness (peers, NULL, n, now);
3472      while (n > max)
3473        closePeer (s, peers[--n]);
3474      tr_free (peers);
3475    }
3476}
3477
3478static void
3479enforceSessionPeerLimit (tr_session * session, uint64_t now)
3480{
3481  int n = 0;
3482  tr_torrent * tor = NULL;
3483  const int max = tr_sessionGetPeerLimit (session);
3484
3485  /* count the total number of peers */
3486  while ((tor = tr_torrentNext (session, tor)))
3487    n += tr_ptrArraySize (&tor->swarm->peers);
3488
3489  /* if there are too many, prune out the worst */
3490  if (n > max)
3491    {
3492      tr_peer ** peers = tr_new (tr_peer*, n);
3493      tr_swarm ** swarms = tr_new (tr_swarm*, n);
3494
3495      /* populate the peer array */
3496      n = 0;
3497      tor = NULL;
3498      while ((tor = tr_torrentNext (session, tor)))
3499        {
3500          int i;
3501          tr_swarm * s = tor->swarm;
3502          const int tn = tr_ptrArraySize (&s->peers);
3503          for (i=0; i<tn; ++i, ++n)
3504            {
3505              peers[n] = tr_ptrArrayNth (&s->peers, i);
3506              swarms[n] = s;
3507            }
3508        }
3509
3510      /* sort 'em */
3511      sortPeersByLiveliness (peers, (void**)swarms, n, now);
3512
3513      /* cull out the crappiest */
3514      while (n-- > max)
3515        closePeer (swarms[n], peers[n]);
3516
3517      /* cleanup */
3518      tr_free (swarms);
3519      tr_free (peers);
3520    }
3521}
3522
3523static void makeNewPeerConnections (tr_peerMgr * mgr, const int max);
3524
3525static void
3526reconnectPulse (evutil_socket_t foo UNUSED, short bar UNUSED, void * vmgr)
3527{
3528  tr_torrent * tor;
3529  tr_peerMgr * mgr = vmgr;
3530  const time_t now_sec = tr_time ();
3531  const uint64_t now_msec = tr_time_msec ();
3532
3533  /**
3534  ***  enforce the per-session and per-torrent peer limits
3535  **/
3536
3537  /* if we're over the per-torrent peer limits, cull some peers */
3538  tor = NULL;
3539  while ((tor = tr_torrentNext (mgr->session, tor)))
3540    if (tor->isRunning)
3541      enforceTorrentPeerLimit (tor->swarm, now_msec);
3542
3543  /* if we're over the per-session peer limits, cull some peers */
3544  enforceSessionPeerLimit (mgr->session, now_msec);
3545
3546  /* remove crappy peers */
3547  tor = NULL;
3548  while ((tor = tr_torrentNext (mgr->session, tor)))
3549    if (!tor->swarm->isRunning)
3550      removeAllPeers (tor->swarm);
3551    else
3552      closeBadPeers (tor->swarm, now_sec);
3553
3554  /* try to make new peer connections */
3555  makeNewPeerConnections (mgr, MAX_CONNECTIONS_PER_PULSE);
3556}
3557
3558/****
3559*****
3560*****  BANDWIDTH ALLOCATION
3561*****
3562****/
3563
3564static void
3565pumpAllPeers (tr_peerMgr * mgr)
3566{
3567  tr_torrent * tor = NULL;
3568
3569  while ((tor = tr_torrentNext (mgr->session, tor)))
3570    {
3571      int j;
3572      tr_swarm * s = tor->swarm;
3573
3574      for (j=0; j<tr_ptrArraySize (&s->peers); ++j)
3575        tr_peerMsgsPulse (tr_ptrArrayNth (&s->peers, j));
3576    }
3577}
3578
3579
3580static void
3581queuePulseForeach (void * vtor)
3582{
3583  tr_torrent * tor = vtor;
3584
3585  tr_torrentStartNow (tor);
3586
3587  if (tor->queue_started_callback != NULL)
3588    (*tor->queue_started_callback)(tor, tor->queue_started_user_data);
3589}
3590
3591static void
3592queuePulse (tr_session * session, tr_direction dir)
3593{
3594  assert (tr_isSession (session));
3595  assert (tr_isDirection (dir));
3596
3597  if (tr_sessionGetQueueEnabled (session, dir))
3598    {
3599      tr_ptrArray torrents = TR_PTR_ARRAY_INIT;
3600
3601      tr_sessionGetNextQueuedTorrents (session,
3602                                       dir,
3603                                       tr_sessionCountQueueFreeSlots (session, dir),
3604                                       &torrents);
3605
3606      tr_ptrArrayForeach (&torrents, queuePulseForeach);
3607
3608      tr_ptrArrayDestruct (&torrents, NULL);
3609    }
3610}
3611
3612static void
3613bandwidthPulse (evutil_socket_t foo UNUSED, short bar UNUSED, void * vmgr)
3614{
3615  tr_torrent * tor;
3616  tr_peerMgr * mgr = vmgr;
3617  tr_session * session = mgr->session;
3618  managerLock (mgr);
3619
3620  /* FIXME: this next line probably isn't necessary... */
3621  pumpAllPeers (mgr);
3622
3623  /* allocate bandwidth to the peers */
3624  tr_bandwidthAllocate (&session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC);
3625  tr_bandwidthAllocate (&session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC);
3626
3627  /* torrent upkeep */
3628  tor = NULL;
3629  while ((tor = tr_torrentNext (session, tor)))
3630    {
3631      /* possibly stop torrents that have seeded enough */
3632      tr_torrentCheckSeedLimit (tor);
3633
3634      /* run the completeness check for any torrents that need it */
3635      if (tor->swarm->needsCompletenessCheck)
3636        {
3637          tor->swarm->needsCompletenessCheck  = false;
3638          tr_torrentRecheckCompleteness (tor);
3639        }
3640
3641      /* stop torrents that are ready to stop, but couldn't be stopped
3642         earlier during the peer-io callback call chain */
3643      if (tor->isStopping)
3644        tr_torrentStop (tor);
3645
3646      /* update the torrent's stats */
3647      tor->swarm->stats.activeWebseedCount = countActiveWebseeds (tor->swarm);
3648    }
3649
3650  /* pump the queues */
3651  queuePulse (session, TR_UP);
3652  queuePulse (session, TR_DOWN);
3653
3654  reconnectPulse (0, 0, mgr);
3655
3656  tr_timerAddMsec (mgr->bandwidthTimer, BANDWIDTH_PERIOD_MSEC);
3657  managerUnlock (mgr);
3658}
3659
3660/***
3661****
3662***/
3663
3664static int
3665compareAtomPtrsByAddress (const void * va, const void *vb)
3666{
3667  const struct peer_atom * a = * (const struct peer_atom**) va;
3668  const struct peer_atom * b = * (const struct peer_atom**) vb;
3669
3670  assert (tr_isAtom (a));
3671  assert (tr_isAtom (b));
3672
3673  return tr_address_compare (&a->addr, &b->addr);
3674}
3675
3676/* best come first, worst go last */
3677static int
3678compareAtomPtrsByShelfDate (const void * va, const void *vb)
3679{
3680  time_t atime;
3681  time_t btime;
3682  const struct peer_atom * a = * (const struct peer_atom**) va;
3683  const struct peer_atom * b = * (const struct peer_atom**) vb;
3684  const int data_time_cutoff_secs = 60 * 60;
3685  const time_t tr_now = tr_time ();
3686
3687  assert (tr_isAtom (a));
3688  assert (tr_isAtom (b));
3689
3690  /* primary key: the last piece data time *if* it was within the last hour */
3691  atime = a->piece_data_time; if (atime + data_time_cutoff_secs < tr_now) atime = 0;
3692  btime = b->piece_data_time; if (btime + data_time_cutoff_secs < tr_now) btime = 0;
3693  if (atime != btime)
3694    return atime > btime ? -1 : 1;
3695
3696  /* secondary key: shelf date. */
3697  if (a->shelf_date != b->shelf_date)
3698    return a->shelf_date > b->shelf_date ? -1 : 1;
3699
3700  return 0;
3701}
3702
3703static int
3704getMaxAtomCount (const tr_torrent * tor)
3705{
3706  return MIN (50, tor->maxConnectedPeers * 3);
3707}
3708
3709static void
3710atomPulse (evutil_socket_t foo UNUSED, short bar UNUSED, void * vmgr)
3711{
3712  tr_torrent * tor = NULL;
3713  tr_peerMgr * mgr = vmgr;
3714  managerLock (mgr);
3715
3716  while ((tor = tr_torrentNext (mgr->session, tor)))
3717    {
3718      int atomCount;
3719      tr_swarm * s = tor->swarm;
3720      const int maxAtomCount = getMaxAtomCount (tor);
3721      struct peer_atom ** atoms = (struct peer_atom**) tr_ptrArrayPeek (&s->pool, &atomCount);
3722
3723      if (atomCount > maxAtomCount) /* we've got too many atoms... time to prune */
3724        {
3725          int i;
3726          int keepCount = 0;
3727          int testCount = 0;
3728          struct peer_atom ** keep = tr_new (struct peer_atom*, atomCount);
3729          struct peer_atom ** test = tr_new (struct peer_atom*, atomCount);
3730
3731          /* keep the ones that are in use */
3732          for (i=0; i<atomCount; ++i)
3733            {
3734              struct peer_atom * atom = atoms[i];
3735              if (peerIsInUse (s, atom))
3736                keep[keepCount++] = atom;
3737              else
3738                test[testCount++] = atom;
3739            }
3740
3741          /* if there's room, keep the best of what's left */
3742          i = 0;
3743          if (keepCount < maxAtomCount)
3744            {
3745              qsort (test, testCount, sizeof (struct peer_atom *), compareAtomPtrsByShelfDate);
3746              while (i<testCount && keepCount<maxAtomCount)
3747                keep[keepCount++] = test[i++];
3748            }
3749
3750          /* free the culled atoms */
3751          while (i<testCount)
3752            tr_free (test[i++]);
3753
3754          /* rebuild Torrent.pool with what's left */
3755          tr_ptrArrayDestruct (&s->pool, NULL);
3756          s->pool = TR_PTR_ARRAY_INIT;
3757          qsort (keep, keepCount, sizeof (struct peer_atom *), compareAtomPtrsByAddress);
3758          for (i=0; i<keepCount; ++i)
3759            tr_ptrArrayAppend (&s->pool, keep[i]);
3760
3761          tordbg (s, "max atom count is %d... pruned from %d to %d\n", maxAtomCount, atomCount, keepCount);
3762
3763          /* cleanup */
3764          tr_free (test);
3765          tr_free (keep);
3766        }
3767    }
3768
3769  tr_timerAddMsec (mgr->atomTimer, ATOM_PERIOD_MSEC);
3770  managerUnlock (mgr);
3771}
3772
3773/***
3774****
3775****
3776****
3777***/
3778
3779/* is this atom someone that we'd want to initiate a connection to? */
3780static bool
3781isPeerCandidate (const tr_torrent * tor, struct peer_atom * atom, const time_t now)
3782{
3783  /* not if we're both seeds */
3784  if (tr_torrentIsSeed (tor) && atomIsSeed (atom))
3785    return false;
3786
3787  /* not if we've already got a connection to them... */
3788  if (peerIsInUse (tor->swarm, atom))
3789    return false;
3790
3791  /* not if we just tried them already */
3792  if ((now - atom->time) < getReconnectIntervalSecs (atom, now))
3793    return false;
3794
3795  /* not if they're blocklisted */
3796  if (isAtomBlocklisted (tor->session, atom))
3797    return false;
3798
3799  /* not if they're banned... */
3800  if (atom->flags2 & MYFLAG_BANNED)
3801    return false;
3802
3803  return true;
3804}
3805
3806struct peer_candidate
3807{
3808  uint64_t score;
3809  tr_torrent * tor;
3810  struct peer_atom * atom;
3811};
3812
3813static bool
3814torrentWasRecentlyStarted (const tr_torrent * tor)
3815{
3816  return difftime (tr_time (), tor->startDate) < 120;
3817}
3818
/**
 * Append a field to a packed sort key.
 *
 * The existing key is shifted left by `width' bits and `addme' is OR'd
 * into the low bits.  `addme' is not masked, so callers must pass a value
 * that fits in `width' bits.
 *
 * @param value the key built so far
 * @param width number of bits to make room for
 * @param addme the field to place in the newly opened low bits
 * @return the extended key
 */
static inline uint64_t
addValToKey (uint64_t value, int width, uint64_t addme)
{
  const uint64_t shifted = value << (uint64_t)width;

  return shifted | addme;
}
3826
3827/* smaller value is better */
3828static uint64_t
3829getPeerCandidateScore (const tr_torrent * tor, const struct peer_atom * atom, uint8_t salt)
3830{
3831  uint64_t i;
3832  uint64_t score = 0;
3833  const bool failed = atom->lastConnectionAt < atom->lastConnectionAttemptAt;
3834
3835  /* prefer peers we've connected to, or never tried, over peers we failed to connect to. */
3836  i = failed ? 1 : 0;
3837  score = addValToKey (score, 1, i);
3838
3839  /* prefer the one we attempted least recently (to cycle through all peers) */
3840  i = atom->lastConnectionAttemptAt;
3841  score = addValToKey (score, 32, i);
3842
3843  /* prefer peers belonging to a torrent of a higher priority */
3844  switch (tr_torrentGetPriority (tor))
3845    {
3846      case TR_PRI_HIGH:    i = 0; break;
3847      case TR_PRI_NORMAL:  i = 1; break;
3848      case TR_PRI_LOW:     i = 2; break;
3849    }
3850  score = addValToKey (score, 4, i);
3851
3852  /* prefer recently-started torrents */
3853  i = torrentWasRecentlyStarted (tor) ? 0 : 1;
3854  score = addValToKey (score, 1, i);
3855
3856  /* prefer torrents we're downloading with */
3857  i = tr_torrentIsSeed (tor) ? 1 : 0;
3858  score = addValToKey (score, 1, i);
3859
3860  /* prefer peers that are known to be connectible */
3861  i = (atom->flags & ADDED_F_CONNECTABLE) ? 0 : 1;
3862  score = addValToKey (score, 1, i);
3863
3864  /* prefer peers that we might have a chance of uploading to...
3865  so lower seed probability is better */
3866  if (atom->seedProbability == 100) i = 101;
3867  else if (atom->seedProbability == -1) i = 100;
3868  else i = atom->seedProbability;
3869  score = addValToKey (score, 8, i);
3870
3871  /* Prefer peers that we got from more trusted sources.
3872   * lower `fromBest' values indicate more trusted sources */
3873  score = addValToKey (score, 4, atom->fromBest);
3874
3875  /* salt */
3876  score = addValToKey (score, 8, salt);
3877
3878  return score;
3879}
3880
3881static int
3882comparePeerCandidates (const void * va, const void * vb)
3883{
3884  int ret;
3885  const struct peer_candidate * a = va;
3886  const struct peer_candidate * b = vb;
3887
3888  if (a->score < b->score)
3889    ret = -1;
3890  else if (a->score > b->score)
3891    ret = 1;
3892  else
3893    ret = 0;
3894
3895  return ret;
3896}
3897
3898/* Partial sorting -- selecting the k best candidates
3899   Adapted from http://en.wikipedia.org/wiki/Selection_algorithm */
3900static void
3901selectPeerCandidates (struct peer_candidate * candidates, int candidate_count, int select_count)
3902{
3903  tr_quickfindFirstK (candidates,
3904                      candidate_count,
3905                      sizeof(struct peer_candidate),
3906                      comparePeerCandidates,
3907                      select_count);
3908}
3909
3910#ifndef NDEBUG
3911static bool
3912checkBestScoresComeFirst (const struct peer_candidate * candidates, int n, int k)
3913{
3914  int i;
3915  uint64_t worstFirstScore = 0;
3916  const int x = MIN (n, k) - 1;
3917
3918  for (i=0; i<x; i++)
3919    if (worstFirstScore < candidates[i].score)
3920      worstFirstScore = candidates[i].score;
3921
3922  for (i=0; i<x; i++)
3923    assert (candidates[i].score <= worstFirstScore);
3924
3925  for (i=x+1; i<n; i++)
3926    assert (candidates[i].score >= worstFirstScore);
3927
3928  return true;
3929}
3930#endif /* NDEBUG */
3931
3932/** @return an array of all the atoms we might want to connect to */
3933static struct peer_candidate*
3934getPeerCandidates (tr_session * session, int * candidateCount, int max)
3935{
3936  int atomCount;
3937  int peerCount;
3938  tr_torrent * tor;
3939  struct peer_candidate * candidates;
3940  struct peer_candidate * walk;
3941  const time_t now = tr_time ();
3942  const uint64_t now_msec = tr_time_msec ();
3943  /* leave 5% of connection slots for incoming connections -- ticket #2609 */
3944  const int maxCandidates = tr_sessionGetPeerLimit (session) * 0.95;
3945
3946  /* count how many peers and atoms we've got */
3947  tor= NULL;
3948  atomCount = 0;
3949  peerCount = 0;
3950  while ((tor = tr_torrentNext (session, tor)))
3951    {
3952      atomCount += tr_ptrArraySize (&tor->swarm->pool);
3953      peerCount += tr_ptrArraySize (&tor->swarm->peers);
3954    }
3955
3956  /* don't start any new handshakes if we're full up */
3957  if (maxCandidates <= peerCount)
3958    {
3959      *candidateCount = 0;
3960      return NULL;
3961    }
3962
3963  /* allocate an array of candidates */
3964  walk = candidates = tr_new (struct peer_candidate, atomCount);
3965
3966  /* populate the candidate array */
3967  tor = NULL;
3968  while ((tor = tr_torrentNext (session, tor)))
3969    {
3970      int i, nAtoms;
3971      struct peer_atom ** atoms;
3972
3973      if (!tor->swarm->isRunning)
3974        continue;
3975
3976      /* if we've already got enough peers in this torrent... */
3977      if (tr_torrentGetPeerLimit (tor) <= tr_ptrArraySize (&tor->swarm->peers))
3978        continue;
3979
3980      /* if we've already got enough speed in this torrent... */
3981      if (tr_torrentIsSeed (tor) && isBandwidthMaxedOut (&tor->bandwidth, now_msec, TR_UP))
3982        continue;
3983
3984      atoms = (struct peer_atom**) tr_ptrArrayPeek (&tor->swarm->pool, &nAtoms);
3985      for (i=0; i<nAtoms; ++i)
3986        {
3987          struct peer_atom * atom = atoms[i];
3988
3989          if (isPeerCandidate (tor, atom, now))
3990            {
3991              const uint8_t salt = tr_rand_int_weak (1024);
3992              walk->tor = tor;
3993              walk->atom = atom;
3994              walk->score = getPeerCandidateScore (tor, atom, salt);
3995              ++walk;
3996            }
3997        }
3998    }
3999
4000  *candidateCount = walk - candidates;
4001  if (walk != candidates)
4002    selectPeerCandidates (candidates, walk-candidates, max);
4003
4004  assert (checkBestScoresComeFirst (candidates, *candidateCount, max));
4005  return candidates;
4006}
4007
4008static void
4009initiateConnection (tr_peerMgr * mgr, tr_swarm * s, struct peer_atom * atom)
4010{
4011  tr_peerIo * io;
4012  const time_t now = tr_time ();
4013  bool utp = tr_sessionIsUTPEnabled (mgr->session) && !atom->utp_failed;
4014
4015  if (atom->fromFirst == TR_PEER_FROM_PEX)
4016    /* PEX has explicit signalling for uTP support.  If an atom
4017       originally came from PEX and doesn't have the uTP flag, skip the
4018       uTP connection attempt.  Are we being optimistic here? */
4019    utp = utp && (atom->flags & ADDED_F_UTP_FLAGS);
4020
4021  tordbg (s, "Starting an OUTGOING%s connection with %s",
4022          utp ? " µTP" : "", tr_atomAddrStr (atom));
4023
4024  io = tr_peerIoNewOutgoing (mgr->session,
4025                             &mgr->session->bandwidth,
4026                             &atom->addr,
4027                             atom->port,
4028                             s->tor->info.hash,
4029                             s->tor->completeness == TR_SEED,
4030                             utp);
4031
4032  if (io == NULL)
4033    {
4034      tordbg (s, "peerIo not created; marking peer %s as unreachable", tr_atomAddrStr (atom));
4035      atom->flags2 |= MYFLAG_UNREACHABLE;
4036      atom->numFails++;
4037    }
4038  else
4039    {
4040      tr_handshake * handshake = tr_handshakeNew (io,
4041                                                  mgr->session->encryptionMode,
4042                                                  myHandshakeDoneCB,
4043                                                  mgr);
4044
4045      assert (tr_peerIoGetTorrentHash (io));
4046
4047      tr_peerIoUnref (io); /* balanced by the initial ref
4048                              in tr_peerIoNewOutgoing () */
4049
4050      tr_ptrArrayInsertSorted (&s->outgoingHandshakes, handshake,
4051                               handshakeCompare);
4052    }
4053
4054  atom->lastConnectionAttemptAt = now;
4055  atom->time = now;
4056}
4057
4058static void
4059initiateCandidateConnection (tr_peerMgr * mgr, struct peer_candidate * c)
4060{
4061#if 0
4062  fprintf (stderr, "Starting an OUTGOING connection with %s - [%s] seedProbability==%d; %s, %s\n",
4063           tr_atomAddrStr (c->atom),
4064           tr_torrentName (c->tor),
4065           (int)c->atom->seedProbability,
4066           tr_torrentIsPrivate (c->tor) ? "private" : "public",
4067           tr_torrentIsSeed (c->tor) ? "seed" : "downloader");
4068#endif
4069
4070  initiateConnection (mgr, c->tor->swarm, c->atom);
4071}
4072
4073static void
4074makeNewPeerConnections (struct tr_peerMgr * mgr, const int max)
4075{
4076  int i, n;
4077  struct peer_candidate * candidates;
4078
4079  candidates = getPeerCandidates (mgr->session, &n, max);
4080
4081  for (i=0; i<n && i<max; ++i)
4082    initiateCandidateConnection (mgr, &candidates[i]);
4083
4084  tr_free (candidates);
4085}
Note: See TracBrowser for help on using the repository browser.