source: trunk/libtransmission/peer-mgr.c

Last change on this file was 14666, checked in by mikedld, 5 years ago

Use TR_BAD_SIZE instead of -1 in tr_quark_new() calls

Extend quark test to improve branch coverage.

  • Property svn:keywords set to Date Rev Author Id
File size: 108.6 KB
Line 
1/*
2 * This file Copyright (C) 2007-2014 Mnemosyne LLC
3 *
4 * It may be used under the GNU GPL versions 2 or 3
5 * or any future license endorsed by Mnemosyne LLC.
6 *
7 * $Id: peer-mgr.c 14666 2016-01-07 19:20:14Z mikedld $
8 */
9
10#include <assert.h>
11#include <errno.h> /* error codes ERANGE, ... */
12#include <limits.h> /* INT_MAX */
13#include <string.h> /* memcpy, memcmp, strstr */
14#include <stdlib.h> /* qsort */
15
16#include <event2/event.h>
17
18#include <libutp/utp.h>
19
20#include "transmission.h"
21#include "announcer.h"
22#include "bandwidth.h"
23#include "blocklist.h"
24#include "cache.h"
25#include "clients.h"
26#include "completion.h"
27#include "crypto-utils.h"
28#include "handshake.h"
29#include "log.h"
30#include "net.h"
31#include "peer-io.h"
32#include "peer-mgr.h"
33#include "peer-msgs.h"
34#include "ptrarray.h"
35#include "session.h"
36#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
37#include "torrent.h"
38#include "tr-utp.h"
39#include "utils.h"
40#include "webseed.h"
41
42enum
43{
44  /* how frequently to cull old atoms */
45  ATOM_PERIOD_MSEC = (60 * 1000),
46
47  /* how frequently to change which peers are choked */
48  RECHOKE_PERIOD_MSEC = (10 * 1000),
49
50  /* an optimistically unchoked peer is immune from rechoking
51     for this many calls to rechokeUploads (). */
52  OPTIMISTIC_UNCHOKE_MULTIPLIER = 4,
53
54  /* how frequently to reallocate bandwidth */
55  BANDWIDTH_PERIOD_MSEC = 500,
56
57  /* how frequently to age out old piece request lists */
58  REFILL_UPKEEP_PERIOD_MSEC = (10 * 1000),
59
60  /* how frequently to decide which peers live and die */
61  RECONNECT_PERIOD_MSEC = 500,
62
63  /* when many peers are available, keep idle ones this long */
64  MIN_UPLOAD_IDLE_SECS = (60),
65
66  /* when few peers are available, keep idle ones this long */
67  MAX_UPLOAD_IDLE_SECS = (60 * 5),
68
69  /* max number of peers to ask for per second overall.
70   * this throttle is to avoid overloading the router */
71  MAX_CONNECTIONS_PER_SECOND = 12,
72
73  MAX_CONNECTIONS_PER_PULSE = (int)(MAX_CONNECTIONS_PER_SECOND * (RECONNECT_PERIOD_MSEC/1000.0)),
74
75  /* number of bad pieces a peer is allowed to send before we ban them */
76  MAX_BAD_PIECES_PER_PEER = 5,
77
78  /* amount of time to keep a list of request pieces lying around
79     before it's considered too old and needs to be rebuilt */
80  PIECE_LIST_SHELF_LIFE_SECS = 60,
81
82  /* use for bitwise operations w/peer_atom.flags2 */
83  MYFLAG_BANNED = 1,
84
85  /* use for bitwise operations w/peer_atom.flags2 */
86  /* unreachable for now... but not banned.
87   * if they try to connect to us it's okay */
88  MYFLAG_UNREACHABLE = 2,
89
90  /* the minimum we'll wait before attempting to reconnect to a peer */
91  MINIMUM_RECONNECT_INTERVAL_SECS = 5,
92
93  /** how long we'll let requests we've made linger before we cancel them */
94  REQUEST_TTL_SECS = 90,
95
96  NO_BLOCKS_CANCEL_HISTORY = 120,
97
98  CANCEL_HISTORY_SEC = 60
99};
100
101const tr_peer_event TR_PEER_EVENT_INIT = { 0, 0, NULL, 0, 0, 0, 0 };
102
103const tr_swarm_stats TR_SWARM_STATS_INIT = { { 0, 0 }, 0, 0, { 0, 0, 0, 0, 0, 0, 0 } };
104
105/**
106***
107**/
108
109/**
110 * Peer information that should be kept even before we've connected and
111 * after we've disconnected. These are kept in a pool of peer_atoms to decide
112 * which ones would make good candidates for connecting to, and to watch out
113 * for banned peers.
114 *
115 * @see tr_peer
116 * @see tr_peerMsgs
117 */
118struct peer_atom
119{
120  uint8_t     fromFirst;          /* where the peer was first found */
121  uint8_t     fromBest;           /* the "best" value of where the peer has been found */
122  uint8_t     flags;              /* these match the added_f flags */
123  uint8_t     flags2;             /* flags that aren't defined in added_f */
124  int8_t      seedProbability;    /* how likely is this to be a seed... [0..100] or -1 for unknown */
125  int8_t      blocklisted;        /* -1 for unknown, true for blocklisted, false for not blocklisted */
126
127  tr_port     port;
128  bool        utp_failed;         /* We recently failed to connect over uTP */
129  uint16_t    numFails;
130  time_t      time;               /* when the peer's connection status last changed */
131  time_t      piece_data_time;
132
133  time_t      lastConnectionAttemptAt;
134  time_t      lastConnectionAt;
135
136  /* similar to a TTL field, but less rigid --
137   * if the swarm is small, the atom will be kept past this date. */
138  time_t      shelf_date;
139  tr_peer   * peer;               /* will be NULL if not connected */
140  tr_address  addr;
141};
142
143#ifdef NDEBUG
144#define tr_isAtom(a) (TRUE)
145#else
146static bool
147tr_isAtom (const struct peer_atom * atom)
148{
149  return (atom != NULL)
150      && (atom->fromFirst < TR_PEER_FROM__MAX)
151      && (atom->fromBest < TR_PEER_FROM__MAX)
152      && (tr_address_is_valid (&atom->addr));
153}
154#endif
155
156static const char*
157tr_atomAddrStr (const struct peer_atom * atom)
158{
159  return atom ? tr_peerIoAddrStr (&atom->addr, atom->port) : "[no atom]";
160}
161
162struct block_request
163{
164  tr_block_index_t block;
165  tr_peer * peer;
166  time_t sentAt;
167};
168
169struct weighted_piece
170{
171  tr_piece_index_t index;
172  int16_t salt;
173  int16_t requestCount;
174};
175
176enum piece_sort_state
177{
178  PIECES_UNSORTED,
179  PIECES_SORTED_BY_INDEX,
180  PIECES_SORTED_BY_WEIGHT
181};
182
183/** @brief Opaque, per-torrent data structure for peer connection information */
184typedef struct tr_swarm
185{
186  tr_swarm_stats             stats;
187
188  tr_ptrArray                outgoingHandshakes; /* tr_handshake */
189  tr_ptrArray                pool; /* struct peer_atom */
190  tr_ptrArray                peers; /* tr_peerMsgs */
191  tr_ptrArray                webseeds; /* tr_webseed */
192
193  tr_torrent               * tor;
194  struct tr_peerMgr        * manager;
195
196  tr_peerMsgs              * optimistic; /* the optimistic peer, or NULL if none */
197  int                        optimisticUnchokeTimeScaler;
198
199  bool                       isRunning;
200  bool                       needsCompletenessCheck;
201
202  struct block_request     * requests;
203  int                        requestCount;
204  int                        requestAlloc;
205
206  struct weighted_piece    * pieces;
207  int                        pieceCount;
208  enum piece_sort_state      pieceSortState;
209
210  /* An array of pieceCount items stating how many peers have each piece.
211     This is used to help us for downloading pieces "rarest first."
212     This may be NULL if we don't have metainfo yet, or if we're not
213     downloading and don't care about rarity */
214  uint16_t                 * pieceReplication;
215  size_t                     pieceReplicationSize;
216
217  int                        interestedCount;
218  int                        maxPeers;
219  time_t                     lastCancel;
220
221  /* Before the endgame this should be 0. In endgame, is contains the average
222   * number of pending requests per peer. Only peers which have more pending
223   * requests are considered 'fast' are allowed to request a block that's
224   * already been requested from another (slower?) peer. */
225  int                        endgame;
226}
227tr_swarm;
228
229struct tr_peerMgr
230{
231  tr_session    * session;
232  tr_ptrArray     incomingHandshakes; /* tr_handshake */
233  struct event  * bandwidthTimer;
234  struct event  * rechokeTimer;
235  struct event  * refillUpkeepTimer;
236  struct event  * atomTimer;
237};
238
239#define tordbg(t, ...) \
240  do \
241    { \
242      if (tr_logGetDeepEnabled ()) \
243        tr_logAddDeep (__FILE__, __LINE__, tr_torrentName (t->tor), __VA_ARGS__); \
244    } \
245  while (0)
246
247#define dbgmsg(...) \
248  do \
249    { \
250      if (tr_logGetDeepEnabled ()) \
251        tr_logAddDeep (__FILE__, __LINE__, NULL, __VA_ARGS__); \
252    } \
253  while (0)
254
255/**
256*** tr_peer virtual functions
257**/
258
259static bool
260tr_peerIsTransferringPieces (const tr_peer * peer,
261                             uint64_t        now,
262                             tr_direction    direction,
263                             unsigned int  * Bps)
264{
265  assert (peer != NULL);
266  assert (peer->funcs != NULL);
267
268  return (*peer->funcs->is_transferring_pieces)(peer, now, direction, Bps);
269}
270
271unsigned int
272tr_peerGetPieceSpeed_Bps (const tr_peer    * peer,
273                          uint64_t           now,
274                          tr_direction       direction)
275{
276  unsigned int Bps = 0;
277  tr_peerIsTransferringPieces (peer, now, direction, &Bps);
278  return Bps;
279}
280
281static void
282tr_peerFree (tr_peer * peer)
283{
284  assert (peer != NULL);
285  assert (peer->funcs != NULL);
286
287  (*peer->funcs->destruct)(peer);
288
289  tr_free (peer);
290}
291
292void
293tr_peerConstruct (tr_peer * peer, const tr_torrent * tor)
294{
295  assert (peer != NULL);
296  assert (tr_isTorrent (tor));
297
298  memset (peer, 0, sizeof (tr_peer));
299
300  peer->client = TR_KEY_NONE;
301  peer->swarm = tor->swarm;
302  tr_bitfieldConstruct (&peer->have, tor->info.pieceCount);
303  tr_bitfieldConstruct (&peer->blame, tor->blockCount);
304}
305
306static void peerDeclinedAllRequests (tr_swarm *, const tr_peer *);
307
308void
309tr_peerDestruct (tr_peer * peer)
310{
311  assert (peer != NULL);
312
313  if (peer->swarm != NULL)
314    peerDeclinedAllRequests (peer->swarm, peer);
315
316  tr_bitfieldDestruct (&peer->have);
317  tr_bitfieldDestruct (&peer->blame);
318
319  if (peer->atom)
320    peer->atom->peer = NULL;
321}
322
323/**
324***
325**/
326
327static inline void
328managerLock (const struct tr_peerMgr * manager)
329{
330  tr_sessionLock (manager->session);
331}
332
333static inline void
334managerUnlock (const struct tr_peerMgr * manager)
335{
336  tr_sessionUnlock (manager->session);
337}
338
339static inline void
340swarmLock (tr_swarm * swarm)
341{
342  managerLock (swarm->manager);
343}
344
345static inline void
346swarmUnlock (tr_swarm * swarm)
347{
348  managerUnlock (swarm->manager);
349}
350
351#ifndef NDEBUG
352
353static inline int
354swarmIsLocked (const tr_swarm * swarm)
355{
356  return tr_sessionIsLocked (swarm->manager->session);
357}
358
359#endif /* NDEBUG */
360
361/**
362***
363**/
364
365static int
366handshakeCompareToAddr (const void * va, const void * vb)
367{
368  const tr_handshake * a = va;
369
370  return tr_address_compare (tr_handshakeGetAddr (a, NULL), vb);
371}
372
373static int
374handshakeCompare (const void * a, const void * b)
375{
376  return handshakeCompareToAddr (a, tr_handshakeGetAddr (b, NULL));
377}
378
379static inline tr_handshake*
380getExistingHandshake (tr_ptrArray * handshakes, const tr_address * addr)
381{
382  if (tr_ptrArrayEmpty (handshakes))
383    return NULL;
384
385  return tr_ptrArrayFindSorted (handshakes, addr, handshakeCompareToAddr);
386}
387
388static int
389comparePeerAtomToAddress (const void * va, const void * vb)
390{
391  const struct peer_atom * a = va;
392
393  return tr_address_compare (&a->addr, vb);
394}
395
396static int
397compareAtomsByAddress (const void * va, const void * vb)
398{
399  const struct peer_atom * b = vb;
400
401  assert (tr_isAtom (b));
402
403  return comparePeerAtomToAddress (va, &b->addr);
404}
405
406/**
407***
408**/
409
410const tr_address *
411tr_peerAddress (const tr_peer * peer)
412{
413  return &peer->atom->addr;
414}
415
416static tr_swarm *
417getExistingSwarm (tr_peerMgr *    manager,
418                  const uint8_t * hash)
419{
420  tr_torrent * tor = tr_torrentFindFromHash (manager->session, hash);
421
422  return tor == NULL ? NULL : tor->swarm;
423}
424
425static int
426peerCompare (const void * a, const void * b)
427{
428  return tr_address_compare (tr_peerAddress (a), tr_peerAddress (b));
429}
430
431static struct peer_atom*
432getExistingAtom (const tr_swarm   * cswarm,
433                 const tr_address * addr)
434{
435  tr_swarm * swarm = (tr_swarm*) cswarm;
436  return tr_ptrArrayFindSorted (&swarm->pool, addr, comparePeerAtomToAddress);
437}
438
439static bool
440peerIsInUse (const tr_swarm * cs, const struct peer_atom * atom)
441{
442  tr_swarm * s = (tr_swarm*) cs;
443
444  assert (swarmIsLocked (s));
445
446  return (atom->peer != NULL)
447      || getExistingHandshake (&s->outgoingHandshakes, &atom->addr)
448      || getExistingHandshake (&s->manager->incomingHandshakes, &atom->addr);
449}
450
451static inline bool
452replicationExists (const tr_swarm * s)
453{
454  return s->pieceReplication != NULL;
455}
456
457static void
458replicationFree (tr_swarm * s)
459{
460  tr_free (s->pieceReplication);
461  s->pieceReplication = NULL;
462  s->pieceReplicationSize = 0;
463}
464
465static void
466replicationNew (tr_swarm * s)
467{
468  tr_piece_index_t piece_i;
469  const tr_piece_index_t piece_count = s->tor->info.pieceCount;
470  const int n = tr_ptrArraySize (&s->peers);
471
472  assert (!replicationExists (s));
473
474  s->pieceReplicationSize = piece_count;
475  s->pieceReplication = tr_new0 (uint16_t, piece_count);
476
477  for (piece_i=0; piece_i<piece_count; ++piece_i)
478    {
479      int peer_i;
480      uint16_t r = 0;
481
482      for (peer_i=0; peer_i<n; ++peer_i)
483        {
484          tr_peer * peer = tr_ptrArrayNth (&s->peers, peer_i);
485          if (tr_bitfieldHas (&peer->have, piece_i))
486            ++r;
487        }
488
489      s->pieceReplication[piece_i] = r;
490    }
491}
492
493static void
494swarmFree (void * vs)
495{
496  tr_swarm * s = vs;
497
498  assert (s);
499  assert (!s->isRunning);
500  assert (swarmIsLocked (s));
501  assert (tr_ptrArrayEmpty (&s->outgoingHandshakes));
502  assert (tr_ptrArrayEmpty (&s->peers));
503
504  tr_ptrArrayDestruct (&s->webseeds, (PtrArrayForeachFunc)tr_peerFree);
505  tr_ptrArrayDestruct (&s->pool, (PtrArrayForeachFunc)tr_free);
506  tr_ptrArrayDestruct (&s->outgoingHandshakes, NULL);
507  tr_ptrArrayDestruct (&s->peers, NULL);
508  s->stats = TR_SWARM_STATS_INIT;
509
510  replicationFree (s);
511
512  tr_free (s->requests);
513  tr_free (s->pieces);
514  tr_free (s);
515}
516
517static void peerCallbackFunc (tr_peer *, const tr_peer_event *, void *);
518
519static void
520rebuildWebseedArray (tr_swarm * s, tr_torrent * tor)
521{
522  unsigned int i;
523  const tr_info * inf = &tor->info;
524
525  /* clear the array */
526  tr_ptrArrayDestruct (&s->webseeds, (PtrArrayForeachFunc)tr_peerFree);
527  s->webseeds = TR_PTR_ARRAY_INIT;
528  s->stats.activeWebseedCount = 0;
529
530  /* repopulate it */
531  for (i=0; i<inf->webseedCount; ++i)
532    {
533      tr_webseed * w = tr_webseedNew (tor, inf->webseeds[i], peerCallbackFunc, s);
534      tr_ptrArrayAppend (&s->webseeds, w);
535    }
536}
537
538static tr_swarm *
539swarmNew (tr_peerMgr * manager, tr_torrent * tor)
540{
541  tr_swarm * s;
542
543  s = tr_new0 (tr_swarm, 1);
544  s->manager = manager;
545  s->tor = tor;
546  s->pool = TR_PTR_ARRAY_INIT;
547  s->peers = TR_PTR_ARRAY_INIT;
548  s->webseeds = TR_PTR_ARRAY_INIT;
549  s->outgoingHandshakes = TR_PTR_ARRAY_INIT;
550
551  rebuildWebseedArray (s, tor);
552
553  return s;
554}
555
556static void ensureMgrTimersExist (struct tr_peerMgr * m);
557
558tr_peerMgr*
559tr_peerMgrNew (tr_session * session)
560{
561  tr_peerMgr * m = tr_new0 (tr_peerMgr, 1);
562  m->session = session;
563  m->incomingHandshakes = TR_PTR_ARRAY_INIT;
564  ensureMgrTimersExist (m);
565  return m;
566}
567
568static void
569deleteTimer (struct event ** t)
570{
571  if (*t != NULL)
572    {
573      event_free (*t);
574      *t = NULL;
575    }
576}
577
578static void
579deleteTimers (struct tr_peerMgr * m)
580{
581  deleteTimer (&m->atomTimer);
582  deleteTimer (&m->bandwidthTimer);
583  deleteTimer (&m->rechokeTimer);
584  deleteTimer (&m->refillUpkeepTimer);
585}
586
587void
588tr_peerMgrFree (tr_peerMgr * manager)
589{
590  managerLock (manager);
591
592  deleteTimers (manager);
593
594  /* free the handshakes. Abort invokes handshakeDoneCB (), which removes
595   * the item from manager->handshakes, so this is a little roundabout... */
596  while (!tr_ptrArrayEmpty (&manager->incomingHandshakes))
597    tr_handshakeAbort (tr_ptrArrayNth (&manager->incomingHandshakes, 0));
598
599  tr_ptrArrayDestruct (&manager->incomingHandshakes, NULL);
600
601  managerUnlock (manager);
602  tr_free (manager);
603}
604
605/***
606****
607***/
608
609void
610tr_peerMgrOnBlocklistChanged (tr_peerMgr * mgr)
611{
612  tr_torrent * tor = NULL;
613  tr_session * session = mgr->session;
614
615  /* we cache whether or not a peer is blocklisted...
616     since the blocklist has changed, erase that cached value */
617  while ((tor = tr_torrentNext (session, tor)))
618    {
619      int i;
620      tr_swarm * s = tor->swarm;
621      const int n = tr_ptrArraySize (&s->pool);
622      for (i=0; i<n; ++i)
623        {
624          struct peer_atom * atom = tr_ptrArrayNth (&s->pool, i);
625          atom->blocklisted = -1;
626        }
627    }
628}
629
630static bool
631isAtomBlocklisted (tr_session * session, struct peer_atom * atom)
632{
633  if (atom->blocklisted < 0)
634    atom->blocklisted = tr_sessionIsAddressBlocked (session, &atom->addr);
635
636  assert (tr_isBool (atom->blocklisted));
637  return atom->blocklisted;
638}
639
640
641/***
642****
643***/
644
645static void
646atomSetSeedProbability (struct peer_atom * atom, int seedProbability)
647{
648  assert (atom != NULL);
649  assert (-1<=seedProbability && seedProbability<=100);
650
651  atom->seedProbability = seedProbability;
652
653  if (seedProbability == 100)
654    atom->flags |= ADDED_F_SEED_FLAG;
655  else if (seedProbability != -1)
656    atom->flags &= ~ADDED_F_SEED_FLAG;
657}
658
659static inline bool
660atomIsSeed (const struct peer_atom * atom)
661{
662  return atom->seedProbability == 100;
663}
664
665static void
666atomSetSeed (const tr_swarm * s, struct peer_atom * atom)
667{
668  if (!atomIsSeed (atom))
669    {
670      tordbg (s, "marking peer %s as a seed", tr_atomAddrStr (atom));
671
672      atomSetSeedProbability (atom, 100);
673    }
674}
675
676
677bool
678tr_peerMgrPeerIsSeed (const tr_torrent  * tor,
679                      const tr_address  * addr)
680{
681  bool isSeed = false;
682  const tr_swarm * s = tor->swarm;
683  const struct peer_atom * atom = getExistingAtom (s, addr);
684
685  if (atom)
686    isSeed = atomIsSeed (atom);
687
688  return isSeed;
689}
690
691void
692tr_peerMgrSetUtpSupported (tr_torrent * tor, const tr_address * addr)
693{
694  struct peer_atom * atom = getExistingAtom (tor->swarm, addr);
695
696  if (atom)
697    atom->flags |= ADDED_F_UTP_FLAGS;
698}
699
700void
701tr_peerMgrSetUtpFailed (tr_torrent *tor, const tr_address *addr, bool failed)
702{
703  struct peer_atom * atom = getExistingAtom (tor->swarm, addr);
704
705  if (atom)
706    atom->utp_failed = failed;
707}
708
709
710/**
711***  REQUESTS
712***
713*** There are two data structures associated with managing block requests:
714***
715*** 1. tr_swarm::requests, an array of "struct block_request" which keeps
716***    track of which blocks have been requested, and when, and by which peers.
717***    This is list is used for (a) cancelling requests that have been pending
718***    for too long and (b) avoiding duplicate requests before endgame.
719***
720*** 2. tr_swarm::pieces, an array of "struct weighted_piece" which lists the
721***    pieces that we want to request. It's used to decide which blocks to
722***    return next when tr_peerMgrGetBlockRequests () is called.
723**/
724
725/**
726*** struct block_request
727**/
728
729static int
730compareReqByBlock (const void * va, const void * vb)
731{
732  const struct block_request * a = va;
733  const struct block_request * b = vb;
734
735  /* primary key: block */
736  if (a->block < b->block) return -1;
737  if (a->block > b->block) return 1;
738
739  /* secondary key: peer */
740  if (a->peer < b->peer) return -1;
741  if (a->peer > b->peer) return 1;
742
743  return 0;
744}
745
746static void
747requestListAdd (tr_swarm * s, tr_block_index_t block, tr_peer * peer)
748{
749  struct block_request key;
750
751  /* ensure enough room is available... */
752  if (s->requestCount + 1 >= s->requestAlloc)
753    {
754      const int CHUNK_SIZE = 128;
755      s->requestAlloc += CHUNK_SIZE;
756      s->requests = tr_renew (struct block_request,
757                              s->requests, s->requestAlloc);
758    }
759
760  /* populate the record we're inserting */
761  key.block = block;
762  key.peer = peer;
763  key.sentAt = tr_time ();
764
765  /* insert the request to our array... */
766  {
767    bool exact;
768    const int pos = tr_lowerBound (&key, s->requests, s->requestCount,
769                                   sizeof (struct block_request),
770                                   compareReqByBlock, &exact);
771    assert (!exact);
772    memmove (s->requests + pos + 1,
773             s->requests + pos,
774             sizeof (struct block_request) * (s->requestCount++ - pos));
775    s->requests[pos] = key;
776  }
777
778  if (peer != NULL)
779    {
780      ++peer->pendingReqsToPeer;
781      assert (peer->pendingReqsToPeer >= 0);
782    }
783
784  /*fprintf (stderr, "added request of block %lu from peer %s... "
785                     "there are now %d block\n",
786                     (unsigned long)block, tr_atomAddrStr (peer->atom), s->requestCount);*/
787}
788
789static struct block_request *
790requestListLookup (tr_swarm * s, tr_block_index_t block, const tr_peer * peer)
791{
792  struct block_request key;
793  key.block = block;
794  key.peer = (tr_peer*) peer;
795
796  return bsearch (&key, s->requests, s->requestCount,
797                  sizeof (struct block_request),
798                  compareReqByBlock);
799}
800
801/**
802 * Find the peers are we currently requesting the block
803 * with index @a block from and append them to @a peerArr.
804 */
805static void
806getBlockRequestPeers (tr_swarm * s, tr_block_index_t block,
807                      tr_ptrArray * peerArr)
808{
809  bool exact;
810  int i, pos;
811  struct block_request key;
812
813  key.block = block;
814  key.peer = NULL;
815  pos = tr_lowerBound (&key, s->requests, s->requestCount,
816                       sizeof (struct block_request),
817                       compareReqByBlock, &exact);
818
819  assert (!exact); /* shouldn't have a request with .peer == NULL */
820
821  for (i=pos; i<s->requestCount; ++i)
822    {
823      if (s->requests[i].block != block)
824        break;
825      tr_ptrArrayAppend (peerArr, s->requests[i].peer);
826    }
827}
828
829static void
830decrementPendingReqCount (const struct block_request * b)
831{
832  if (b->peer != NULL)
833    if (b->peer->pendingReqsToPeer > 0)
834      --b->peer->pendingReqsToPeer;
835}
836
837static void
838requestListRemove (tr_swarm * s, tr_block_index_t block, const tr_peer * peer)
839{
840  const struct block_request * b = requestListLookup (s, block, peer);
841
842  if (b != NULL)
843    {
844      const int pos = b - s->requests;
845      assert (pos < s->requestCount);
846
847      decrementPendingReqCount (b);
848
849      tr_removeElementFromArray (s->requests,
850                                 pos,
851                                 sizeof (struct block_request),
852                                 s->requestCount--);
853
854      /*fprintf (stderr, "removing request of block %lu from peer %s... "
855                         "there are now %d block requests left\n",
856                         (unsigned long)block, tr_atomAddrStr (peer->atom), t->requestCount);*/
857    }
858}
859
860static int
861countActiveWebseeds (tr_swarm * s)
862{
863  int activeCount = 0;
864
865  if (s->tor->isRunning && !tr_torrentIsSeed (s->tor))
866    {
867      int i;
868      const int n = tr_ptrArraySize (&s->webseeds);
869      const uint64_t now = tr_time_msec ();
870
871      for (i=0; i<n; ++i)
872        if (tr_peerIsTransferringPieces (tr_ptrArrayNth(&s->webseeds,i), now, TR_DOWN, NULL))
873          ++activeCount;
874    }
875
876  return activeCount;
877}
878
879static bool
880testForEndgame (const tr_swarm * s)
881{
882  /* we consider ourselves to be in endgame if the number of bytes
883     we've got requested is >= the number of bytes left to download */
884  return ((uint64_t) s->requestCount * s->tor->blockSize)
885               >= tr_torrentGetLeftUntilDone (s->tor);
886}
887
888static void
889updateEndgame (tr_swarm * s)
890{
891  assert (s->requestCount >= 0);
892
893  if (!testForEndgame (s))
894    {
895      /* not in endgame */
896      s->endgame = 0;
897    }
898  else if (!s->endgame) /* only recalculate when endgame first begins */
899    {
900      int i;
901      int numDownloading = 0;
902      const int n = tr_ptrArraySize (&s->peers);
903
904      /* add the active bittorrent peers... */
905      for (i=0; i<n; ++i)
906        {
907          const tr_peer * p = tr_ptrArrayNth (&s->peers, i);
908          if (p->pendingReqsToPeer > 0)
909            ++numDownloading;
910        }
911
912      /* add the active webseeds... */
913      numDownloading += countActiveWebseeds (s);
914
915      /* average number of pending requests per downloading peer */
916      s->endgame = s->requestCount / MAX (numDownloading, 1);
917    }
918}
919
920
921/****
922*****
923*****  Piece List Manipulation / Accessors
924*****
925****/
926
927static inline void
928invalidatePieceSorting (tr_swarm * s)
929{
930  s->pieceSortState = PIECES_UNSORTED;
931}
932
933static const tr_torrent * weightTorrent;
934
935static const uint16_t * weightReplication;
936
937static void
938setComparePieceByWeightTorrent (tr_swarm * s)
939{
940  if (!replicationExists (s))
941    replicationNew (s);
942
943  weightTorrent = s->tor;
944  weightReplication = s->pieceReplication;
945}
946
947/* we try to create a "weight" s.t. high-priority pieces come before others,
948 * and that partially-complete pieces come before empty ones. */
949static int
950comparePieceByWeight (const void * va, const void * vb)
951{
952  const struct weighted_piece * a = va;
953  const struct weighted_piece * b = vb;
954  int ia, ib, missing, pending;
955  const tr_torrent * tor = weightTorrent;
956  const uint16_t * rep = weightReplication;
957
958  /* primary key: weight */
959  missing = tr_torrentMissingBlocksInPiece (tor, a->index);
960  pending = a->requestCount;
961  ia = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
962  missing = tr_torrentMissingBlocksInPiece (tor, b->index);
963  pending = b->requestCount;
964  ib = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
965  if (ia < ib) return -1;
966  if (ia > ib) return 1;
967
968  /* secondary key: higher priorities go first */
969  ia = tor->info.pieces[a->index].priority;
970  ib = tor->info.pieces[b->index].priority;
971  if (ia > ib) return -1;
972  if (ia < ib) return 1;
973
974  /* tertiary key: rarest first. */
975  ia = rep[a->index];
976  ib = rep[b->index];
977  if (ia < ib) return -1;
978  if (ia > ib) return 1;
979
980  /* quaternary key: random */
981  if (a->salt < b->salt) return -1;
982  if (a->salt > b->salt) return 1;
983
984  /* okay, they're equal */
985  return 0;
986}
987
988static int
989comparePieceByIndex (const void * va, const void * vb)
990{
991  const struct weighted_piece * a = va;
992  const struct weighted_piece * b = vb;
993  if (a->index < b->index) return -1;
994  if (a->index > b->index) return 1;
995  return 0;
996}
997
998static void
999pieceListSort (tr_swarm * s, enum piece_sort_state state)
1000{
1001  assert (state==PIECES_SORTED_BY_INDEX
1002       || state==PIECES_SORTED_BY_WEIGHT);
1003
1004  if (state == PIECES_SORTED_BY_WEIGHT)
1005    {
1006      setComparePieceByWeightTorrent (s);
1007      qsort (s->pieces, s->pieceCount, sizeof (struct weighted_piece), comparePieceByWeight);
1008    }
1009  else
1010    {
1011      qsort (s->pieces, s->pieceCount, sizeof (struct weighted_piece), comparePieceByIndex);
1012    }
1013
1014  s->pieceSortState = state;
1015}
1016
1017/**
1018 * These functions are useful for testing, but too expensive for nightly builds.
1019 * let's leave it disabled but add an easy hook to compile it back in
1020 */
1021#if 1
1022#define assertWeightedPiecesAreSorted(t)
1023#define assertReplicationCountIsExact(t)
1024#else
1025static void
1026assertWeightedPiecesAreSorted (Torrent * t)
1027{
1028    if (!t->endgame)
1029    {
1030        int i;
1031        setComparePieceByWeightTorrent (t);
1032        for (i=0; i<t->pieceCount-1; ++i)
1033            assert (comparePieceByWeight (&t->pieces[i], &t->pieces[i+1]) <= 0);
1034    }
1035}
1036static void
1037assertReplicationCountIsExact (Torrent * t)
1038{
1039    /* This assert might fail due to errors of implementations in other
1040     * clients. It happens when receiving duplicate bitfields/HaveAll/HaveNone
1041     * from a client. If a such a behavior is noticed,
1042     * a bug report should be filled to the faulty client. */
1043
1044    size_t piece_i;
1045    const uint16_t * rep = t->pieceReplication;
1046    const size_t piece_count = t->pieceReplicationSize;
1047    const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase (&t->peers);
1048    const int peer_count = tr_ptrArraySize (&t->peers);
1049
1050    assert (piece_count == t->tor->info.pieceCount);
1051
1052    for (piece_i=0; piece_i<piece_count; ++piece_i)
1053    {
1054        int peer_i;
1055        uint16_t r = 0;
1056
1057        for (peer_i=0; peer_i<peer_count; ++peer_i)
1058            if (tr_bitsetHas (&peers[peer_i]->have, piece_i))
1059                ++r;
1060
1061        assert (rep[piece_i] == r);
1062    }
1063}
1064#endif
1065
1066static struct weighted_piece *
1067pieceListLookup (tr_swarm * s, tr_piece_index_t index)
1068{
1069  int i;
1070
1071  for (i=0; i<s->pieceCount; ++i)
1072    if (s->pieces[i].index == index)
1073      return &s->pieces[i];
1074
1075  return NULL;
1076}
1077
1078static void
1079pieceListRebuild (tr_swarm * s)
1080{
1081  if (!tr_torrentIsSeed (s->tor))
1082    {
1083      tr_piece_index_t i;
1084      tr_piece_index_t * pool;
1085      tr_piece_index_t poolCount = 0;
1086      const tr_torrent * tor = s->tor;
1087      const tr_info * inf = tr_torrentInfo (tor);
1088      struct weighted_piece * pieces;
1089      int pieceCount;
1090
1091      /* build the new list */
1092      pool = tr_new (tr_piece_index_t, inf->pieceCount);
1093      for (i=0; i<inf->pieceCount; ++i)
1094        if (!inf->pieces[i].dnd)
1095          if (!tr_torrentPieceIsComplete (tor, i))
1096            pool[poolCount++] = i;
1097      pieceCount = poolCount;
1098      pieces = tr_new0 (struct weighted_piece, pieceCount);
1099      for (i=0; i<poolCount; ++i)
1100        {
1101          struct weighted_piece * piece = pieces + i;
1102          piece->index = pool[i];
1103          piece->requestCount = 0;
1104          piece->salt = tr_rand_int_weak (4096);
1105        }
1106
1107      /* if we already had a list of pieces, merge it into
1108       * the new list so we don't lose its requestCounts */
1109      if (s->pieces != NULL)
1110        {
1111          struct weighted_piece * o = s->pieces;
1112          struct weighted_piece * oend = o + s->pieceCount;
1113          struct weighted_piece * n = pieces;
1114          struct weighted_piece * nend = n + pieceCount;
1115
1116          pieceListSort (s, PIECES_SORTED_BY_INDEX);
1117
1118          while (o!=oend && n!=nend)
1119            {
1120              if (o->index < n->index)
1121                ++o;
1122              else if (o->index > n->index)
1123                ++n;
1124              else
1125                *n++ = *o++;
1126            }
1127
1128          tr_free (s->pieces);
1129        }
1130
1131      s->pieces = pieces;
1132      s->pieceCount = pieceCount;
1133
1134      pieceListSort (s, PIECES_SORTED_BY_WEIGHT);
1135
1136      /* cleanup */
1137      tr_free (pool);
1138    }
1139}
1140
1141static void
1142pieceListRemovePiece (tr_swarm * s, tr_piece_index_t piece)
1143{
1144  struct weighted_piece * p;
1145
1146  if ((p = pieceListLookup (s, piece)))
1147    {
1148      const int pos = p - s->pieces;
1149
1150      tr_removeElementFromArray (s->pieces,
1151                                 pos,
1152                                 sizeof (struct weighted_piece),
1153                                 s->pieceCount--);
1154
1155      if (s->pieceCount == 0)
1156        {
1157          tr_free (s->pieces);
1158          s->pieces = NULL;
1159        }
1160    }
1161}
1162
1163static void
1164pieceListResortPiece (tr_swarm * s, struct weighted_piece * p)
1165{
1166  int pos;
1167  bool isSorted = true;
1168
1169  if (p == NULL)
1170    return;
1171
1172  /* is the torrent already sorted? */
1173  pos = p - s->pieces;
1174  setComparePieceByWeightTorrent (s);
1175  if (isSorted && (pos > 0) && (comparePieceByWeight (p-1, p) > 0))
1176    isSorted = false;
1177  if (isSorted && (pos < s->pieceCount - 1) && (comparePieceByWeight (p, p+1) > 0))
1178    isSorted = false;
1179
1180  if (s->pieceSortState != PIECES_SORTED_BY_WEIGHT)
1181    {
1182      pieceListSort (s, PIECES_SORTED_BY_WEIGHT);
1183      isSorted = true;
1184    }
1185
1186  /* if it's not sorted, move it around */
1187  if (!isSorted)
1188    {
1189      bool exact;
1190      const struct weighted_piece tmp = *p;
1191
1192      tr_removeElementFromArray (s->pieces,
1193                                 pos,
1194                                 sizeof (struct weighted_piece),
1195                                 s->pieceCount--);
1196
1197      pos = tr_lowerBound (&tmp, s->pieces, s->pieceCount,
1198                           sizeof (struct weighted_piece),
1199                           comparePieceByWeight, &exact);
1200
1201      memmove (&s->pieces[pos + 1],
1202               &s->pieces[pos],
1203               sizeof (struct weighted_piece) * (s->pieceCount++ - pos));
1204
1205      s->pieces[pos] = tmp;
1206    }
1207
1208  assertWeightedPiecesAreSorted (s);
1209}
1210
1211static void
1212pieceListRemoveRequest (tr_swarm * s, tr_block_index_t block)
1213{
1214  struct weighted_piece * p;
1215  const tr_piece_index_t index = tr_torBlockPiece (s->tor, block);
1216
1217  if (((p = pieceListLookup (s, index))) && (p->requestCount > 0))
1218    {
1219      --p->requestCount;
1220      pieceListResortPiece (s, p);
1221    }
1222}
1223
1224
1225/****
1226*****
1227*****  Replication count (for rarest first policy)
1228*****
1229****/
1230
1231/**
1232 * Increase the replication count of this piece and sort it if the
1233 * piece list is already sorted
1234 */
1235static void
1236tr_incrReplicationOfPiece (tr_swarm * s, const size_t index)
1237{
1238  assert (replicationExists (s));
1239  assert (s->pieceReplicationSize == s->tor->info.pieceCount);
1240
1241  /* One more replication of this piece is present in the swarm */
1242  ++s->pieceReplication[index];
1243
1244  /* we only resort the piece if the list is already sorted */
1245  if (s->pieceSortState == PIECES_SORTED_BY_WEIGHT)
1246    pieceListResortPiece (s, pieceListLookup (s, index));
1247}
1248
1249/**
1250 * Increases the replication count of pieces present in the bitfield
1251 */
1252static void
1253tr_incrReplicationFromBitfield (tr_swarm * s, const tr_bitfield * b)
1254{
1255  size_t i;
1256  uint16_t * rep = s->pieceReplication;
1257  const size_t n = s->tor->info.pieceCount;
1258
1259  assert (replicationExists (s));
1260
1261  for (i=0; i<n; ++i)
1262    if (tr_bitfieldHas (b, i))
1263      ++rep[i];
1264
1265  if (s->pieceSortState == PIECES_SORTED_BY_WEIGHT)
1266    invalidatePieceSorting (s);
1267}
1268
1269/**
1270 * Increase the replication count of every piece
1271 */
1272static void
1273tr_incrReplication (tr_swarm * s)
1274{
1275  int i;
1276  const int n = s->pieceReplicationSize;
1277
1278  assert (replicationExists (s));
1279  assert (s->pieceReplicationSize == s->tor->info.pieceCount);
1280
1281  for (i=0; i<n; ++i)
1282    ++s->pieceReplication[i];
1283}
1284
1285/**
1286 * Decrease the replication count of pieces present in the bitset.
1287 */
1288static void
1289tr_decrReplicationFromBitfield (tr_swarm * s, const tr_bitfield * b)
1290{
1291  int i;
1292  const int n = s->pieceReplicationSize;
1293
1294  assert (replicationExists (s));
1295  assert (s->pieceReplicationSize == s->tor->info.pieceCount);
1296
1297  if (tr_bitfieldHasAll (b))
1298    {
1299      for (i=0; i<n; ++i)
1300        --s->pieceReplication[i];
1301    }
1302  else if (!tr_bitfieldHasNone (b))
1303    {
1304      for (i=0; i<n; ++i)
1305        if (tr_bitfieldHas (b, i))
1306          --s->pieceReplication[i];
1307
1308      if (s->pieceSortState == PIECES_SORTED_BY_WEIGHT)
1309        invalidatePieceSorting (s);
1310    }
1311}
1312
1313/**
1314***
1315**/
1316
1317void
1318tr_peerMgrRebuildRequests (tr_torrent * tor)
1319{
1320  assert (tr_isTorrent (tor));
1321
1322  pieceListRebuild (tor->swarm);
1323}
1324
1325void
1326tr_peerMgrGetNextRequests (tr_torrent           * tor,
1327                           tr_peer              * peer,
1328                           int                    numwant,
1329                           tr_block_index_t     * setme,
1330                           int                  * numgot,
1331                           bool                   get_intervals)
1332{
1333  int i;
1334  int got;
1335  tr_swarm * s;
1336  struct weighted_piece * pieces;
1337  const tr_bitfield * const have = &peer->have;
1338
1339  /* sanity clause */
1340  assert (tr_isTorrent (tor));
1341  assert (numwant > 0);
1342
1343  /* walk through the pieces and find blocks that should be requested */
1344  got = 0;
1345  s = tor->swarm;
1346
1347  /* prep the pieces list */
1348  if (s->pieces == NULL)
1349    pieceListRebuild (s);
1350
1351  if (s->pieceSortState != PIECES_SORTED_BY_WEIGHT)
1352    pieceListSort (s, PIECES_SORTED_BY_WEIGHT);
1353
1354  assertReplicationCountIsExact (s);
1355  assertWeightedPiecesAreSorted (s);
1356
1357  updateEndgame (s);
1358  pieces = s->pieces;
1359  for (i=0; i<s->pieceCount && got<numwant; ++i)
1360    {
1361      struct weighted_piece * p = pieces + i;
1362
1363      /* if the peer has this piece that we want... */
1364      if (tr_bitfieldHas (have, p->index))
1365        {
1366          tr_block_index_t b;
1367          tr_block_index_t first;
1368          tr_block_index_t last;
1369          tr_ptrArray peerArr = TR_PTR_ARRAY_INIT;
1370
1371          tr_torGetPieceBlockRange (tor, p->index, &first, &last);
1372
1373          for (b=first; b<=last && (got<numwant || (get_intervals && setme[2*got-1] == b-1)); ++b)
1374            {
1375              int peerCount;
1376              tr_peer ** peers;
1377
1378              /* don't request blocks we've already got */
1379              if (tr_torrentBlockIsComplete (tor, b))
1380                continue;
1381
1382              /* always add peer if this block has no peers yet */
1383              tr_ptrArrayClear (&peerArr);
1384              getBlockRequestPeers (s, b, &peerArr);
1385              peers = (tr_peer **) tr_ptrArrayPeek (&peerArr, &peerCount);
1386              if (peerCount != 0)
1387                {
1388                  /* don't make a second block request until the endgame */
1389                  if (!s->endgame)
1390                    continue;
1391
1392                  /* don't have more than two peers requesting this block */
1393                  if (peerCount > 1)
1394                    continue;
1395
1396                  /* don't send the same request to the same peer twice */
1397                  if (peer == peers[0])
1398                    continue;
1399
1400                  /* in the endgame allow an additional peer to download a
1401                     block but only if the peer seems to be handling requests
1402                     relatively fast */
1403                  if (peer->pendingReqsToPeer + numwant - got < s->endgame)
1404                    continue;
1405                }
1406
1407              /* update the caller's table */
1408              if (!get_intervals)
1409                {
1410                  setme[got++] = b;
1411                }
1412              /* if intervals are requested two array entries are necessarry:
1413                 one for the interval's starting block and one for its end block */
1414              else if (got && setme[2 * got - 1] == b - 1 && b != first)
1415                {
1416                  /* expand the last interval */
1417                  ++setme[2 * got - 1];
1418                }
1419              else
1420                {
1421                  /* begin a new interval */
1422                  setme[2 * got] = setme[2 * got + 1] = b;
1423                  ++got;
1424                }
1425
1426              /* update our own tables */
1427              requestListAdd (s, b, peer);
1428              ++p->requestCount;
1429            }
1430
1431          tr_ptrArrayDestruct (&peerArr, NULL);
1432        }
1433    }
1434
1435  /* In most cases we've just changed the weights of a small number of pieces.
1436   * So rather than qsort ()ing the entire array, it's faster to apply an
1437   * adaptive insertion sort algorithm. */
1438  if (got > 0)
1439    {
1440      /* not enough requests || last piece modified */
1441      if (i == s->pieceCount)
1442        --i;
1443
1444      setComparePieceByWeightTorrent (s);
1445      while (--i >= 0)
1446        {
1447          bool exact;
1448
1449          /* relative position! */
1450          const int newpos = tr_lowerBound (&s->pieces[i], &s->pieces[i + 1],
1451                                            s->pieceCount - (i + 1),
1452                                            sizeof (struct weighted_piece),
1453                                            comparePieceByWeight, &exact);
1454          if (newpos > 0)
1455            {
1456              const struct weighted_piece piece = s->pieces[i];
1457              memmove (&s->pieces[i],
1458                       &s->pieces[i + 1],
1459                       sizeof (struct weighted_piece) * (newpos));
1460              s->pieces[i + newpos] = piece;
1461            }
1462        }
1463    }
1464
1465  assertWeightedPiecesAreSorted (t);
1466  *numgot = got;
1467}
1468
1469bool
1470tr_peerMgrDidPeerRequest (const tr_torrent  * tor,
1471                          const tr_peer     * peer,
1472                          tr_block_index_t    block)
1473{
1474  return requestListLookup ((tr_swarm*)tor->swarm, block, peer) != NULL;
1475}
1476
1477/* cancel requests that are too old */
1478static void
1479refillUpkeep (evutil_socket_t foo UNUSED, short bar UNUSED, void * vmgr)
1480{
1481    time_t now;
1482    time_t too_old;
1483    tr_torrent * tor;
1484    int cancel_buflen = 0;
1485    struct block_request * cancel = NULL;
1486    tr_peerMgr * mgr = vmgr;
1487    managerLock (mgr);
1488
1489    now = tr_time ();
1490    too_old = now - REQUEST_TTL_SECS;
1491
1492    /* alloc the temporary "cancel" buffer */
1493    tor = NULL;
1494    while ((tor = tr_torrentNext (mgr->session, tor)))
1495        cancel_buflen = MAX (cancel_buflen, tor->swarm->requestCount);
1496    if (cancel_buflen > 0)
1497        cancel = tr_new (struct block_request, cancel_buflen);
1498
1499    /* prune requests that are too old */
1500    tor = NULL;
1501    while ((tor = tr_torrentNext (mgr->session, tor)))
1502    {
1503        tr_swarm * s = tor->swarm;
1504        const int n = s->requestCount;
1505        if (n > 0)
1506        {
1507            int keepCount = 0;
1508            int cancelCount = 0;
1509            const struct block_request * it;
1510            const struct block_request * end;
1511
1512            for (it=s->requests, end=it+n; it!=end; ++it)
1513            {
1514                tr_peerMsgs * msgs = PEER_MSGS(it->peer);
1515
1516                if ((msgs !=NULL) && (it->sentAt <= too_old) && !tr_peerMsgsIsReadingBlock (msgs, it->block))
1517                    cancel[cancelCount++] = *it;
1518                else
1519                {
1520                    if (it != &s->requests[keepCount])
1521                        s->requests[keepCount] = *it;
1522                    keepCount++;
1523                }
1524            }
1525
1526            /* prune out the ones we aren't keeping */
1527            s->requestCount = keepCount;
1528
1529            /* send cancel messages for all the "cancel" ones */
1530            for (it=cancel, end=it+cancelCount; it!=end; ++it)
1531            {
1532              tr_peerMsgs * msgs = PEER_MSGS(it->peer);
1533
1534              if (msgs != NULL)
1535                {
1536                  tr_historyAdd (&it->peer->cancelsSentToPeer, now, 1);
1537                  tr_peerMsgsCancel (msgs, it->block);
1538                  decrementPendingReqCount (it);
1539                }
1540            }
1541
1542            /* decrement the pending request counts for the timed-out blocks */
1543            for (it=cancel, end=it+cancelCount; it!=end; ++it)
1544                pieceListRemoveRequest (s, it->block);
1545        }
1546    }
1547
1548    tr_free (cancel);
1549    tr_timerAddMsec (mgr->refillUpkeepTimer, REFILL_UPKEEP_PERIOD_MSEC);
1550    managerUnlock (mgr);
1551}
1552
1553static void
1554addStrike (tr_swarm * s, tr_peer * peer)
1555{
1556  tordbg (s, "increasing peer %s strike count to %d",
1557          tr_atomAddrStr (peer->atom), peer->strikes + 1);
1558
1559  if (++peer->strikes >= MAX_BAD_PIECES_PER_PEER)
1560    {
1561      struct peer_atom * atom = peer->atom;
1562      atom->flags2 |= MYFLAG_BANNED;
1563      peer->doPurge = true;
1564      tordbg (s, "banning peer %s", tr_atomAddrStr (atom));
1565    }
1566}
1567
1568static void
1569peerSuggestedPiece (tr_swarm           * s UNUSED,
1570                    tr_peer            * peer UNUSED,
1571                    tr_piece_index_t     pieceIndex UNUSED,
1572                    int                  isFastAllowed UNUSED)
1573{
1574#if 0
1575    assert (t);
1576    assert (peer);
1577    assert (peer->msgs);
1578
1579    /* is this a valid piece? */
1580    if (pieceIndex >= t->tor->info.pieceCount)
1581        return;
1582
1583    /* don't ask for it if we've already got it */
1584    if (tr_torrentPieceIsComplete (t->tor, pieceIndex))
1585        return;
1586
1587    /* don't ask for it if they don't have it */
1588    if (!tr_bitfieldHas (peer->have, pieceIndex))
1589        return;
1590
1591    /* don't ask for it if we're choked and it's not fast */
1592    if (!isFastAllowed && peer->clientIsChoked)
1593        return;
1594
1595    /* request the blocks that we don't have in this piece */
1596    {
1597        tr_block_index_t b;
1598        tr_block_index_t first;
1599        tr_block_index_t last;
1600        const tr_torrent * tor = t->tor;
1601
1602        tr_torGetPieceBlockRange (t->tor, pieceIndex, &first, &last);
1603
1604        for (b=first; b<=last; ++b)
1605        {
1606            if (tr_torrentBlockIsComplete (tor, b))
1607            {
1608                const uint32_t offset = getBlockOffsetInPiece (tor, b);
1609                const uint32_t length = tr_torBlockCountBytes (tor, b);
1610                tr_peerMsgsAddRequest (peer->msgs, pieceIndex, offset, length);
1611                incrementPieceRequests (t, pieceIndex);
1612            }
1613        }
1614    }
1615#endif
1616}
1617
1618static void
1619removeRequestFromTables (tr_swarm * s, tr_block_index_t block, const tr_peer * peer)
1620{
1621  requestListRemove (s, block, peer);
1622  pieceListRemoveRequest (s, block);
1623}
1624
1625/* peer choked us, or maybe it disconnected.
1626   either way we need to remove all its requests */
1627static void
1628peerDeclinedAllRequests (tr_swarm * s, const tr_peer * peer)
1629{
1630  int i, n;
1631  tr_block_index_t * blocks = tr_new (tr_block_index_t, s->requestCount);
1632
1633  for (i=n=0; i<s->requestCount; ++i)
1634    if (peer == s->requests[i].peer)
1635      blocks[n++] = s->requests[i].block;
1636
1637  for (i=0; i<n; ++i)
1638    removeRequestFromTables (s, blocks[i], peer);
1639
1640  tr_free (blocks);
1641}
1642
1643static void
1644cancelAllRequestsForBlock (tr_swarm          * s,
1645                           tr_block_index_t    block,
1646                           tr_peer           * no_notify)
1647{
1648  int i;
1649  int peerCount;
1650  tr_peer ** peers;
1651  tr_ptrArray peerArr;
1652
1653  peerArr = TR_PTR_ARRAY_INIT;
1654  getBlockRequestPeers (s, block, &peerArr);
1655  peers = (tr_peer **) tr_ptrArrayPeek (&peerArr, &peerCount);
1656  for (i=0; i<peerCount; ++i)
1657    {
1658      tr_peer * p = peers[i];
1659
1660      if ((p != no_notify) && tr_isPeerMsgs (p))
1661        {
1662          tr_historyAdd (&p->cancelsSentToPeer, tr_time (), 1);
1663          tr_peerMsgsCancel (PEER_MSGS(p), block);
1664        }
1665
1666      removeRequestFromTables (s, block, p);
1667    }
1668
1669  tr_ptrArrayDestruct (&peerArr, NULL);
1670}
1671
1672void
1673tr_peerMgrPieceCompleted (tr_torrent * tor, tr_piece_index_t p)
1674{
1675  int i;
1676  bool pieceCameFromPeers = false;
1677  tr_swarm * const s = tor->swarm;
1678  const int n = tr_ptrArraySize (&s->peers);
1679
1680  /* walk through our peers */
1681  for (i=0; i<n; ++i)
1682    {
1683      tr_peer * peer = tr_ptrArrayNth (&s->peers, i);
1684
1685      /* notify the peer that we now have this piece */
1686      tr_peerMsgsHave (PEER_MSGS(peer), p);
1687
1688      if (!pieceCameFromPeers)
1689        pieceCameFromPeers = tr_bitfieldHas (&peer->blame, p);
1690    }
1691
1692  if (pieceCameFromPeers) /* webseed downloads don't belong in announce totals */
1693    tr_announcerAddBytes (tor, TR_ANN_DOWN, tr_torPieceCountBytes (tor, p));
1694
1695  /* bookkeeping */
1696  pieceListRemovePiece (s, p);
1697  s->needsCompletenessCheck = true;
1698}
1699
1700static void
1701peerCallbackFunc (tr_peer * peer, const tr_peer_event * e, void * vs)
1702{
1703  tr_swarm * s = vs;
1704
1705  swarmLock (s);
1706
1707  assert (peer != NULL);
1708
1709  switch (e->eventType)
1710    {
1711      case TR_PEER_PEER_GOT_PIECE_DATA:
1712        {
1713          const time_t now = tr_time ();
1714          tr_torrent * tor = s->tor;
1715
1716          tor->uploadedCur += e->length;
1717          tr_announcerAddBytes (tor, TR_ANN_UP, e->length);
1718          tr_torrentSetActivityDate (tor, now);
1719          tr_torrentSetDirty (tor);
1720          tr_statsAddUploaded (tor->session, e->length);
1721
1722          if (peer->atom != NULL)
1723            peer->atom->piece_data_time = now;
1724
1725          break;
1726        }
1727
1728      case TR_PEER_CLIENT_GOT_PIECE_DATA:
1729        {
1730          const time_t now = tr_time ();
1731          tr_torrent * tor = s->tor;
1732
1733          tor->downloadedCur += e->length;
1734          tr_torrentSetActivityDate (tor, now);
1735          tr_torrentSetDirty (tor);
1736
1737          tr_statsAddDownloaded (tor->session, e->length);
1738
1739          if (peer->atom != NULL)
1740            peer->atom->piece_data_time = now;
1741
1742          break;
1743        }
1744
1745      case TR_PEER_CLIENT_GOT_HAVE:
1746        if (replicationExists (s))
1747          {
1748            tr_incrReplicationOfPiece (s, e->pieceIndex);
1749            assertReplicationCountIsExact (s);
1750          }
1751        break;
1752
1753      case TR_PEER_CLIENT_GOT_HAVE_ALL:
1754        if (replicationExists (s))
1755          {
1756            tr_incrReplication (s);
1757            assertReplicationCountIsExact (s);
1758          }
1759        break;
1760
1761      case TR_PEER_CLIENT_GOT_HAVE_NONE:
1762        /* noop */
1763        break;
1764
1765      case TR_PEER_CLIENT_GOT_BITFIELD:
1766        assert (e->bitfield != NULL);
1767        if (replicationExists (s))
1768          {
1769            tr_incrReplicationFromBitfield (s, e->bitfield);
1770            assertReplicationCountIsExact (s);
1771          }
1772        break;
1773
1774      case TR_PEER_CLIENT_GOT_REJ:
1775        {
1776          tr_block_index_t b = _tr_block (s->tor, e->pieceIndex, e->offset);
1777          if (b < s->tor->blockCount)
1778            removeRequestFromTables (s, b, peer);
1779          else
1780            tordbg (s, "Peer %s sent an out-of-range reject message",
1781                    tr_atomAddrStr (peer->atom));
1782          break;
1783        }
1784
1785      case TR_PEER_CLIENT_GOT_CHOKE:
1786        peerDeclinedAllRequests (s, peer);
1787        break;
1788
1789      case TR_PEER_CLIENT_GOT_PORT:
1790        if (peer->atom)
1791          peer->atom->port = e->port;
1792        break;
1793
1794      case TR_PEER_CLIENT_GOT_SUGGEST:
1795        peerSuggestedPiece (s, peer, e->pieceIndex, false);
1796        break;
1797
1798      case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1799        peerSuggestedPiece (s, peer, e->pieceIndex, true);
1800        break;
1801
1802      case TR_PEER_CLIENT_GOT_BLOCK:
1803        {
1804          tr_torrent * tor = s->tor;
1805          const tr_piece_index_t p = e->pieceIndex;
1806          const tr_block_index_t block = _tr_block (tor, p, e->offset);
1807          cancelAllRequestsForBlock (s, block, peer);
1808          tr_historyAdd (&peer->blocksSentToClient, tr_time(), 1);
1809          pieceListResortPiece (s, pieceListLookup (s, p));
1810          tr_torrentGotBlock (tor, block);
1811          break;
1812        }
1813
1814      case TR_PEER_ERROR:
1815        if ((e->err == ERANGE) || (e->err == EMSGSIZE) || (e->err == ENOTCONN))
1816          {
1817            /* some protocol error from the peer */
1818            peer->doPurge = true;
1819            tordbg (s, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error",
1820                    tr_atomAddrStr (peer->atom));
1821          }
1822        else
1823          {
1824            tordbg (s, "unhandled error: %s", tr_strerror (e->err));
1825          }
1826        break;
1827
1828      default:
1829        assert (0);
1830    }
1831
1832  swarmUnlock (s);
1833}
1834
1835static int
1836getDefaultShelfLife (uint8_t from)
1837{
1838    /* in general, peers obtained from firsthand contact
1839     * are better than those from secondhand, etc etc */
1840    switch (from)
1841    {
1842        case TR_PEER_FROM_INCOMING : return 60 * 60 * 6;
1843        case TR_PEER_FROM_LTEP     : return 60 * 60 * 6;
1844        case TR_PEER_FROM_TRACKER  : return 60 * 60 * 3;
1845        case TR_PEER_FROM_DHT      : return 60 * 60 * 3;
1846        case TR_PEER_FROM_PEX      : return 60 * 60 * 2;
1847        case TR_PEER_FROM_RESUME   : return 60 * 60;
1848        case TR_PEER_FROM_LPD      : return 10 * 60;
1849        default                    : return 60 * 60;
1850    }
1851}
1852
1853static void
1854ensureAtomExists (tr_swarm          * s,
1855                  const tr_address  * addr,
1856                  const tr_port       port,
1857                  const uint8_t       flags,
1858                  const int8_t        seedProbability,
1859                  const uint8_t       from)
1860{
1861  struct peer_atom * a;
1862
1863  assert (tr_address_is_valid (addr));
1864  assert (from < TR_PEER_FROM__MAX);
1865
1866  a = getExistingAtom (s, addr);
1867
1868  if (a == NULL)
1869    {
1870      const int jitter = tr_rand_int_weak (60*10);
1871      a = tr_new0 (struct peer_atom, 1);
1872      a->addr = *addr;
1873      a->port = port;
1874      a->flags = flags;
1875      a->fromFirst = from;
1876      a->fromBest = from;
1877      a->shelf_date = tr_time () + getDefaultShelfLife (from) + jitter;
1878      a->blocklisted = -1;
1879      atomSetSeedProbability (a, seedProbability);
1880      tr_ptrArrayInsertSorted (&s->pool, a, compareAtomsByAddress);
1881
1882      tordbg (s, "got a new atom: %s", tr_atomAddrStr (a));
1883    }
1884  else
1885    {
1886      if (from < a->fromBest)
1887        a->fromBest = from;
1888
1889      if (a->seedProbability == -1)
1890        atomSetSeedProbability (a, seedProbability);
1891
1892      a->flags |= flags;
1893    }
1894}
1895
1896static int
1897getMaxPeerCount (const tr_torrent * tor)
1898{
1899  return tor->maxConnectedPeers;
1900}
1901
1902static int
1903getPeerCount (const tr_swarm * s)
1904{
1905  return tr_ptrArraySize (&s->peers);/* + tr_ptrArraySize (&t->outgoingHandshakes); */
1906}
1907
1908
1909static void
1910createBitTorrentPeer (tr_torrent       * tor,
1911                      struct tr_peerIo * io,
1912                      struct peer_atom * atom,
1913                      tr_quark           client)
1914{
1915  tr_peer * peer;
1916  tr_peerMsgs * msgs;
1917  tr_swarm * swarm;
1918
1919  assert (atom != NULL);
1920  assert (tr_isTorrent (tor));
1921  assert (tor->swarm != NULL);
1922
1923  swarm = tor->swarm;
1924
1925  peer = (tr_peer*) tr_peerMsgsNew (tor, io, peerCallbackFunc, swarm);
1926  peer->atom = atom;
1927  peer->client = client;
1928  atom->peer = peer;
1929
1930  tr_ptrArrayInsertSorted (&swarm->peers, peer, peerCompare);
1931  ++swarm->stats.peerCount;
1932  ++swarm->stats.peerFromCount[atom->fromFirst];
1933
1934  assert (swarm->stats.peerCount == tr_ptrArraySize (&swarm->peers));
1935  assert (swarm->stats.peerFromCount[atom->fromFirst] <= swarm->stats.peerCount);
1936
1937  msgs = PEER_MSGS (peer);
1938  tr_peerMsgsUpdateActive (msgs, TR_UP);
1939  tr_peerMsgsUpdateActive (msgs, TR_DOWN);
1940}
1941
1942
1943/* FIXME: this is kind of a mess. */
1944static bool
1945myHandshakeDoneCB (tr_handshake  * handshake,
1946                   tr_peerIo     * io,
1947                   bool            readAnythingFromPeer,
1948                   bool            isConnected,
1949                   const uint8_t * peer_id,
1950                   void          * vmanager)
1951{
1952  bool ok = isConnected;
1953  bool success = false;
1954  tr_port port;
1955  const tr_address * addr;
1956  tr_peerMgr * manager = vmanager;
1957  tr_swarm  * s;
1958
1959  assert (io);
1960  assert (tr_isBool (ok));
1961
1962  s = tr_peerIoHasTorrentHash (io)
1963    ? getExistingSwarm (manager, tr_peerIoGetTorrentHash (io))
1964    : NULL;
1965
1966  if (tr_peerIoIsIncoming (io))
1967    tr_ptrArrayRemoveSortedPointer (&manager->incomingHandshakes,
1968                                    handshake, handshakeCompare);
1969  else if (s)
1970    tr_ptrArrayRemoveSortedPointer (&s->outgoingHandshakes,
1971                                    handshake, handshakeCompare);
1972
1973  if (s)
1974    swarmLock (s);
1975
1976  addr = tr_peerIoGetAddress (io, &port);
1977
1978  if (!ok || !s || !s->isRunning)
1979    {
1980      if (s)
1981        {
1982          struct peer_atom * atom = getExistingAtom (s, addr);
1983          if (atom)
1984            {
1985              ++atom->numFails;
1986
1987              if (!readAnythingFromPeer)
1988                {
1989                  tordbg (s, "marking peer %s as unreachable... numFails is %d", tr_atomAddrStr (atom), (int)atom->numFails);
1990                  atom->flags2 |= MYFLAG_UNREACHABLE;
1991                }
1992            }
1993        }
1994    }
1995  else /* looking good */
1996    {
1997      struct peer_atom * atom;
1998
1999      ensureAtomExists (s, addr, port, 0, -1, TR_PEER_FROM_INCOMING);
2000      atom = getExistingAtom (s, addr);
2001
2002      assert (atom != NULL);
2003
2004      atom->time = tr_time ();
2005      atom->piece_data_time = 0;
2006      atom->lastConnectionAt = tr_time ();
2007
2008      if (!tr_peerIoIsIncoming (io))
2009        {
2010          atom->flags |= ADDED_F_CONNECTABLE;
2011          atom->flags2 &= ~MYFLAG_UNREACHABLE;
2012        }
2013
2014      /* In principle, this flag specifies whether the peer groks uTP,
2015         not whether it's currently connected over uTP. */
2016      if (io->utp_socket)
2017        atom->flags |= ADDED_F_UTP_FLAGS;
2018
2019      if (atom->flags2 & MYFLAG_BANNED)
2020        {
2021          tordbg (s, "banned peer %s tried to reconnect",
2022                  tr_atomAddrStr (atom));
2023        }
2024      else if (tr_peerIoIsIncoming (io) && (getPeerCount (s) >= getMaxPeerCount (s->tor)))
2025        {
2026        }
2027      else
2028        {
2029          tr_peer * peer = atom->peer;
2030
2031          if (peer) /* we already have this peer */
2032            {
2033            }
2034          else
2035            {
2036              tr_quark client;
2037              tr_peerIo * io;
2038              char buf[128];
2039
2040              if (peer_id != NULL)
2041                client = tr_quark_new (tr_clientForId (buf, sizeof (buf), peer_id), TR_BAD_SIZE);
2042              else
2043                client = TR_KEY_NONE;
2044
2045              io = tr_handshakeStealIO (handshake); /* this steals its refcount too, which is
2046                                                       balanced by our unref in peerDelete ()  */
2047              tr_peerIoSetParent (io, &s->tor->bandwidth);
2048              createBitTorrentPeer (s->tor, io, atom, client);
2049
2050              success = true;
2051            }
2052        }
2053    }
2054
2055  if (s != NULL)
2056    swarmUnlock (s);
2057
2058  return success;
2059}
2060
2061void
2062tr_peerMgrAddIncoming (tr_peerMgr       * manager,
2063                       tr_address       * addr,
2064                       tr_port            port,
2065                       tr_socket_t        socket,
2066                       struct UTPSocket * utp_socket)
2067{
2068  tr_session * session;
2069
2070  managerLock (manager);
2071
2072  assert (tr_isSession (manager->session));
2073  session = manager->session;
2074
2075  if (tr_sessionIsAddressBlocked (session, addr))
2076    {
2077      tr_logAddDebug ("Banned IP address \"%s\" tried to connect to us", tr_address_to_string (addr));
2078      if (socket != TR_BAD_SOCKET)
2079        tr_netClose (session, socket);
2080      else
2081        UTP_Close (utp_socket);
2082    }
2083  else if (getExistingHandshake (&manager->incomingHandshakes, addr))
2084    {
2085      if (socket != TR_BAD_SOCKET)
2086        tr_netClose (session, socket);
2087      else
2088        UTP_Close (utp_socket);
2089    }
2090  else /* we don't have a connection to them yet... */
2091    {
2092      tr_peerIo *    io;
2093      tr_handshake * handshake;
2094
2095      io = tr_peerIoNewIncoming (session, &session->bandwidth, addr, port, socket, utp_socket);
2096
2097      handshake = tr_handshakeNew (io,
2098                                   session->encryptionMode,
2099                                   myHandshakeDoneCB,
2100                                   manager);
2101
2102      tr_peerIoUnref (io); /* balanced by the implicit ref in tr_peerIoNewIncoming () */
2103
2104      tr_ptrArrayInsertSorted (&manager->incomingHandshakes, handshake,
2105                               handshakeCompare);
2106    }
2107
2108  managerUnlock (manager);
2109}
2110
2111void
2112tr_peerMgrAddPex (tr_torrent * tor, uint8_t from,
2113                  const tr_pex * pex, int8_t seedProbability)
2114{
2115  if (tr_isPex (pex)) /* safeguard against corrupt data */
2116    {
2117      tr_swarm * s = tor->swarm;
2118      managerLock (s->manager);
2119
2120      if (!tr_sessionIsAddressBlocked (s->manager->session, &pex->addr))
2121        if (tr_address_is_valid_for_peers (&pex->addr, pex->port))
2122          ensureAtomExists (s, &pex->addr, pex->port, pex->flags, seedProbability, from);
2123
2124      managerUnlock (s->manager);
2125    }
2126}
2127
2128void
2129tr_peerMgrMarkAllAsSeeds (tr_torrent * tor)
2130{
2131  tr_swarm * s = tor->swarm;
2132  const int n = tr_ptrArraySize (&s->pool);
2133  struct peer_atom ** it = (struct peer_atom**) tr_ptrArrayBase (&s->pool);
2134  struct peer_atom ** end = it + n;
2135
2136  while (it != end)
2137    atomSetSeed (s, *it++);
2138}
2139
2140tr_pex *
2141tr_peerMgrCompactToPex (const void    * compact,
2142                        size_t          compactLen,
2143                        const uint8_t * added_f,
2144                        size_t          added_f_len,
2145                        size_t        * pexCount)
2146{
2147  size_t i;
2148  size_t n = compactLen / 6;
2149  const uint8_t * walk = compact;
2150  tr_pex * pex = tr_new0 (tr_pex, n);
2151
2152  for (i=0; i<n; ++i)
2153    {
2154      pex[i].addr.type = TR_AF_INET;
2155      memcpy (&pex[i].addr.addr, walk, 4); walk += 4;
2156      memcpy (&pex[i].port, walk, 2); walk += 2;
2157      if (added_f && (n == added_f_len))
2158        pex[i].flags = added_f[i];
2159    }
2160
2161  *pexCount = n;
2162  return pex;
2163}
2164
2165tr_pex *
2166tr_peerMgrCompact6ToPex (const void    * compact,
2167                         size_t          compactLen,
2168                         const uint8_t * added_f,
2169                         size_t          added_f_len,
2170                         size_t        * pexCount)
2171{
2172  size_t i;
2173  size_t n = compactLen / 18;
2174  const uint8_t * walk = compact;
2175  tr_pex * pex = tr_new0 (tr_pex, n);
2176
2177  for (i=0; i<n; ++i)
2178    {
2179      pex[i].addr.type = TR_AF_INET6;
2180      memcpy (&pex[i].addr.addr.addr6.s6_addr, walk, 16); walk += 16;
2181      memcpy (&pex[i].port, walk, 2); walk += 2;
2182      if (added_f && (n == added_f_len))
2183        pex[i].flags = added_f[i];
2184    }
2185
2186  *pexCount = n;
2187  return pex;
2188}
2189
2190tr_pex *
2191tr_peerMgrArrayToPex (const void  * array,
2192                      size_t        arrayLen,
2193                      size_t      * pexCount)
2194{
2195  size_t i;
2196  size_t n = arrayLen / (sizeof (tr_address) + 2);
2197  const uint8_t * walk = array;
2198  tr_pex * pex = tr_new0 (tr_pex, n);
2199
2200  for (i=0 ; i<n ; ++i)
2201    {
2202      memcpy (&pex[i].addr, walk, sizeof (tr_address));
2203      memcpy (&pex[i].port, walk + sizeof (tr_address), 2);
2204      pex[i].flags = 0x00;
2205      walk += sizeof (tr_address) + 2;
2206    }
2207
2208  *pexCount = n;
2209  return pex;
2210}
2211
2212/**
2213***
2214**/
2215
2216void
2217tr_peerMgrGotBadPiece (tr_torrent * tor, tr_piece_index_t pieceIndex)
2218{
2219  int i;
2220  int n;
2221  tr_swarm * s = tor->swarm;
2222  const uint32_t byteCount = tr_torPieceCountBytes (tor, pieceIndex);
2223
2224  for (i=0, n=tr_ptrArraySize(&s->peers); i!=n; ++i)
2225    {
2226      tr_peer * peer = tr_ptrArrayNth (&s->peers, i);
2227
2228      if (tr_bitfieldHas (&peer->blame, pieceIndex))
2229        {
2230          tordbg (s, "peer %s contributed to corrupt piece (%d); now has %d strikes",
2231                  tr_atomAddrStr(peer->atom), pieceIndex, (int)peer->strikes + 1);
2232          addStrike (s, peer);
2233        }
2234    }
2235
2236
2237  tr_announcerAddBytes (tor, TR_ANN_CORRUPT, byteCount);
2238}
2239
2240int
2241tr_pexCompare (const void * va, const void * vb)
2242{
2243  int i;
2244  const tr_pex * a = va;
2245  const tr_pex * b = vb;
2246
2247  assert (tr_isPex (a));
2248  assert (tr_isPex (b));
2249
2250  if ((i = tr_address_compare (&a->addr, &b->addr)))
2251    return i;
2252
2253  if (a->port != b->port)
2254    return a->port < b->port ? -1 : 1;
2255
2256  return 0;
2257}
2258
2259/* better goes first */
2260static int
2261compareAtomsByUsefulness (const void * va, const void *vb)
2262{
2263  const struct peer_atom * a = * (const struct peer_atom* const *) va;
2264  const struct peer_atom * b = * (const struct peer_atom* const *) vb;
2265
2266  assert (tr_isAtom (a));
2267  assert (tr_isAtom (b));
2268
2269  if (a->piece_data_time != b->piece_data_time)
2270    return a->piece_data_time > b->piece_data_time ? -1 : 1;
2271  if (a->fromBest != b->fromBest)
2272    return a->fromBest < b->fromBest ? -1 : 1;
2273  if (a->numFails != b->numFails)
2274    return a->numFails < b->numFails ? -1 : 1;
2275
2276  return 0;
2277}
2278
2279static bool
2280isAtomInteresting (const tr_torrent * tor, struct peer_atom * atom)
2281{
2282  if (tr_torrentIsSeed (tor) && atomIsSeed (atom))
2283    return false;
2284
2285  if (peerIsInUse (tor->swarm, atom))
2286    return true;
2287
2288  if (isAtomBlocklisted (tor->session, atom))
2289    return false;
2290
2291  if (atom->flags2 & MYFLAG_BANNED)
2292    return false;
2293
2294  return true;
2295}
2296
2297int
2298tr_peerMgrGetPeers (tr_torrent   * tor,
2299                    tr_pex      ** setme_pex,
2300                    uint8_t        af,
2301                    uint8_t        list_mode,
2302                    int            maxCount)
2303{
2304  int i;
2305  int n;
2306  int count = 0;
2307  int atomCount = 0;
2308  const tr_swarm * s = tor->swarm;
2309  struct peer_atom ** atoms = NULL;
2310  tr_pex * pex;
2311  tr_pex * walk;
2312
2313  assert (tr_isTorrent (tor));
2314  assert (setme_pex != NULL);
2315  assert (af==TR_AF_INET || af==TR_AF_INET6);
2316  assert (list_mode==TR_PEERS_CONNECTED || list_mode==TR_PEERS_INTERESTING);
2317
2318  managerLock (s->manager);
2319
2320  /**
2321  ***  build a list of atoms
2322  **/
2323
2324  if (list_mode == TR_PEERS_CONNECTED) /* connected peers only */
2325    {
2326      int i;
2327      const tr_peer ** peers = (const tr_peer **) tr_ptrArrayBase (&s->peers);
2328      atomCount = tr_ptrArraySize (&s->peers);
2329      atoms = tr_new (struct peer_atom *, atomCount);
2330      for (i=0; i<atomCount; ++i)
2331        atoms[i] = peers[i]->atom;
2332    }
2333  else /* TR_PEERS_INTERESTING */
2334    {
2335      int i;
2336      struct peer_atom ** atomBase = (struct peer_atom**) tr_ptrArrayBase (&s->pool);
2337      n = tr_ptrArraySize (&s->pool);
2338      atoms = tr_new (struct peer_atom *, n);
2339      for (i=0; i<n; ++i)
2340        if (isAtomInteresting (tor, atomBase[i]))
2341          atoms[atomCount++] = atomBase[i];
2342    }
2343
2344  qsort (atoms, atomCount, sizeof (struct peer_atom *), compareAtomsByUsefulness);
2345
2346  /**
2347  ***  add the first N of them into our return list
2348  **/
2349
2350  n = MIN (atomCount, maxCount);
2351  pex = walk = tr_new0 (tr_pex, n);
2352
2353  for (i=0; i<atomCount && count<n; ++i)
2354    {
2355      const struct peer_atom * atom = atoms[i];
2356      if (atom->addr.type == af)
2357        {
2358          assert (tr_address_is_valid (&atom->addr));
2359          walk->addr = atom->addr;
2360          walk->port = atom->port;
2361          walk->flags = atom->flags;
2362          ++count;
2363          ++walk;
2364        }
2365    }
2366
2367  qsort (pex, count, sizeof (tr_pex), tr_pexCompare);
2368
2369  assert ((walk - pex) == count);
2370  *setme_pex = pex;
2371
2372  /* cleanup */
2373  tr_free (atoms);
2374  managerUnlock (s->manager);
2375  return count;
2376}
2377
2378static void atomPulse      (evutil_socket_t, short, void *);
2379static void bandwidthPulse (evutil_socket_t, short, void *);
2380static void rechokePulse   (evutil_socket_t, short, void *);
2381static void reconnectPulse (evutil_socket_t, short, void *);
2382
2383static struct event *
2384createTimer (tr_session * session, int msec, event_callback_fn callback, void * cbdata)
2385{
2386  struct event * timer = evtimer_new (session->event_base, callback, cbdata);
2387  tr_timerAddMsec (timer, msec);
2388  return timer;
2389}
2390
2391static void
2392ensureMgrTimersExist (struct tr_peerMgr * m)
2393{
2394  if (m->atomTimer == NULL)
2395    m->atomTimer = createTimer (m->session, ATOM_PERIOD_MSEC, atomPulse, m);
2396
2397  if (m->bandwidthTimer == NULL)
2398    m->bandwidthTimer = createTimer (m->session, BANDWIDTH_PERIOD_MSEC, bandwidthPulse, m);
2399
2400  if (m->rechokeTimer == NULL)
2401    m->rechokeTimer = createTimer (m->session, RECHOKE_PERIOD_MSEC, rechokePulse, m);
2402
2403  if (m->refillUpkeepTimer == NULL)
2404    m->refillUpkeepTimer = createTimer (m->session, REFILL_UPKEEP_PERIOD_MSEC, refillUpkeep, m);
2405}
2406
2407void
2408tr_peerMgrStartTorrent (tr_torrent * tor)
2409{
2410  tr_swarm * s;
2411
2412  assert (tr_isTorrent (tor));
2413  assert (tr_torrentIsLocked (tor));
2414
2415  s = tor->swarm;
2416  ensureMgrTimersExist (s->manager);
2417
2418  s->isRunning = true;
2419  s->maxPeers = tor->maxConnectedPeers;
2420  s->pieceSortState = PIECES_UNSORTED;
2421
2422  rechokePulse (0, 0, s->manager);
2423}
2424
2425static void removeAllPeers (tr_swarm *);
2426
2427static void
2428stopSwarm (tr_swarm * swarm)
2429{
2430  swarm->isRunning = false;
2431
2432  replicationFree (swarm);
2433  invalidatePieceSorting (swarm);
2434
2435  removeAllPeers (swarm);
2436
2437  /* disconnect the handshakes. handshakeAbort calls handshakeDoneCB (),
2438   * which removes the handshake from t->outgoingHandshakes... */
2439  while (!tr_ptrArrayEmpty (&swarm->outgoingHandshakes))
2440    tr_handshakeAbort (tr_ptrArrayNth (&swarm->outgoingHandshakes, 0));
2441}
2442
2443void
2444tr_peerMgrStopTorrent (tr_torrent * tor)
2445{
2446  assert (tr_isTorrent (tor));
2447  assert (tr_torrentIsLocked (tor));
2448
2449  stopSwarm (tor->swarm);
2450}
2451
2452void
2453tr_peerMgrAddTorrent (tr_peerMgr * manager, tr_torrent * tor)
2454{
2455  assert (tr_isTorrent (tor));
2456  assert (tr_torrentIsLocked (tor));
2457  assert (tor->swarm == NULL);
2458
2459  tor->swarm = swarmNew (manager, tor);
2460}
2461
2462void
2463tr_peerMgrRemoveTorrent (tr_torrent * tor)
2464{
2465  assert (tr_isTorrent (tor));
2466  assert (tr_torrentIsLocked (tor));
2467
2468  stopSwarm (tor->swarm);
2469  swarmFree (tor->swarm);
2470}
2471
2472void
2473tr_peerUpdateProgress (tr_torrent * tor, tr_peer * peer)
2474{
2475  const tr_bitfield * have = &peer->have;
2476
2477  if (tr_bitfieldHasAll (have))
2478    {
2479      peer->progress = 1.0;
2480    }
2481  else if (tr_bitfieldHasNone (have))
2482    {
2483      peer->progress = 0.0;
2484    }
2485  else
2486    {
2487      const float true_count = tr_bitfieldCountTrueBits (have);
2488
2489      if (tr_torrentHasMetadata (tor))
2490        {
2491          peer->progress = true_count / tor->info.pieceCount;
2492        }
2493      else /* without pieceCount, this result is only a best guess... */
2494        {
2495          peer->progress = true_count / (have->bit_count + 1);
2496        }
2497    }
2498
2499  /* clamp the progress range */
2500  if (peer->progress < 0.0)
2501    peer->progress = 0.0;
2502  if (peer->progress > 1.0)
2503    peer->progress = 1.0;
2504
2505  if (peer->atom && (peer->progress >= 1.0))
2506    atomSetSeed (tor->swarm, peer->atom);
2507}
2508
2509void
2510tr_peerMgrOnTorrentGotMetainfo (tr_torrent * tor)
2511{
2512  int i;
2513  int peerCount;
2514  tr_peer ** peers;
2515
2516  /* the webseed list may have changed... */
2517  rebuildWebseedArray (tor->swarm, tor);
2518
2519  /* some peer_msgs' progress fields may not be accurate if we
2520     didn't have the metadata before now... so refresh them all... */
2521  peerCount = tr_ptrArraySize (&tor->swarm->peers);
2522  peers = (tr_peer**) tr_ptrArrayBase (&tor->swarm->peers);
2523  for (i=0; i<peerCount; ++i)
2524    tr_peerUpdateProgress (tor, peers[i]);
2525
2526  /* update the bittorrent peers' willingnes... */
2527  for (i=0; i<peerCount; ++i)
2528    {
2529      tr_peerMsgsUpdateActive (tr_peerMsgsCast(peers[i]), TR_UP);
2530      tr_peerMsgsUpdateActive (tr_peerMsgsCast(peers[i]), TR_DOWN);
2531    }
2532}
2533
2534void
2535tr_peerMgrTorrentAvailability (const tr_torrent  * tor,
2536                               int8_t            * tab,
2537                               unsigned int        tabCount)
2538{
2539  assert (tr_isTorrent (tor));
2540  assert (tab != NULL);
2541  assert (tabCount > 0);
2542
2543  memset (tab, 0, tabCount);
2544
2545  if (tr_torrentHasMetadata (tor))
2546    {
2547      tr_piece_index_t i;
2548      const int peerCount = tr_ptrArraySize (&tor->swarm->peers);
2549      const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase (&tor->swarm->peers);
2550      const float interval = tor->info.pieceCount / (float)tabCount;
2551      const bool isSeed = tr_torrentGetCompleteness (tor) == TR_SEED;
2552
2553      for (i=0; i<tabCount; ++i)
2554        {
2555          const int piece = i * interval;
2556
2557          if (isSeed || tr_torrentPieceIsComplete (tor, piece))
2558            {
2559              tab[i] = -1;
2560            }
2561          else if (peerCount)
2562            {
2563              int j;
2564              for (j=0; j<peerCount; ++j)
2565                if (tr_bitfieldHas (&peers[j]->have, piece))
2566                  ++tab[i];
2567            }
2568        }
2569    }
2570}
2571
2572void
2573tr_swarmGetStats (const tr_swarm * swarm, tr_swarm_stats * setme)
2574{
2575  assert (swarm != NULL);
2576  assert (setme != NULL);
2577
2578  *setme = swarm->stats;
2579}
2580
2581void
2582tr_swarmIncrementActivePeers (tr_swarm * swarm, tr_direction direction, bool is_active)
2583{
2584  int n = swarm->stats.activePeerCount[direction];
2585
2586  if (is_active)
2587    ++n;
2588  else
2589    --n;
2590
2591  assert (0 <= n);
2592  assert (n <= swarm->stats.peerCount);
2593
2594  swarm->stats.activePeerCount[direction] = n;
2595}
2596
2597bool
2598tr_peerIsSeed (const tr_peer * peer)
2599{
2600  if (peer->progress >= 1.0)
2601    return true;
2602
2603  if (peer->atom && atomIsSeed (peer->atom))
2604    return true;
2605
2606  return false;
2607}
2608
2609/* count how many bytes we want that connected peers have */
2610uint64_t
2611tr_peerMgrGetDesiredAvailable (const tr_torrent * tor)
2612{
2613  size_t i;
2614  size_t n;
2615  uint64_t desiredAvailable;
2616  const tr_swarm * s;
2617
2618  assert (tr_isTorrent (tor));
2619
2620  /* common shortcuts... */
2621
2622  if (!tor->isRunning || tor->isStopping ||
2623      tr_torrentIsSeed (tor) || !tr_torrentHasMetadata (tor))
2624    return 0;
2625
2626  s = tor->swarm;
2627  if (s == NULL || !s->isRunning)
2628    return 0;
2629
2630  n = tr_ptrArraySize (&s->peers);
2631  if (n == 0)
2632    {
2633      return 0;
2634    }
2635  else
2636    {
2637      const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase (&s->peers);
2638      for (i=0; i<n; ++i)
2639        if (peers[i]->atom && atomIsSeed (peers[i]->atom))
2640          return tr_torrentGetLeftUntilDone (tor);
2641    }
2642
2643  if (!s->pieceReplication || !s->pieceReplicationSize)
2644    return 0;
2645
2646  /* do it the hard way */
2647
2648  desiredAvailable = 0;
2649  for (i=0, n=MIN (tor->info.pieceCount, s->pieceReplicationSize); i<n; ++i)
2650    if (!tor->info.pieces[i].dnd && (s->pieceReplication[i] > 0))
2651      desiredAvailable += tr_torrentMissingBytesInPiece (tor, i);
2652
2653  assert (desiredAvailable <= tor->info.totalSize);
2654  return desiredAvailable;
2655}
2656
2657double*
2658tr_peerMgrWebSpeeds_KBps (const tr_torrent * tor)
2659{
2660  unsigned int i;
2661  tr_swarm * s;
2662  unsigned int n;
2663  double * ret = NULL;
2664  const uint64_t now = tr_time_msec ();
2665
2666  assert (tr_isTorrent (tor));
2667
2668  s = tor->swarm;
2669  n = tr_ptrArraySize (&s->webseeds);
2670  ret = tr_new0 (double, n);
2671
2672  assert (s->manager != NULL);
2673  assert (n == tor->info.webseedCount);
2674
2675  for (i=0; i<n; ++i)
2676    {
2677      unsigned int Bps = 0;
2678      if (tr_peerIsTransferringPieces (tr_ptrArrayNth(&s->webseeds,i), now, TR_DOWN, &Bps))
2679        ret[i] = Bps / (double)tr_speed_K;
2680      else
2681        ret[i] = -1.0;
2682    }
2683
2684  return ret;
2685}
2686
2687struct tr_peer_stat *
2688tr_peerMgrPeerStats (const tr_torrent * tor, int * setmeCount)
2689{
2690  int i;
2691  int size = 0;
2692  tr_peer_stat * ret;
2693  const tr_swarm * s;
2694  tr_peer ** peers;
2695  const time_t now = tr_time ();
2696  const uint64_t now_msec = tr_time_msec ();
2697
2698  assert (tr_isTorrent (tor));
2699  assert (tor->swarm->manager != NULL);
2700
2701  s = tor->swarm;
2702  peers = (tr_peer**) tr_ptrArrayBase (&s->peers);
2703  size = tr_ptrArraySize (&s->peers);
2704  ret = tr_new0 (tr_peer_stat, size);
2705
2706  for (i=0; i<size; ++i)
2707    {
2708      char *                   pch;
2709      tr_peer *                peer = peers[i];
2710      tr_peerMsgs *            msgs = PEER_MSGS (peer);
2711      const struct peer_atom * atom = peer->atom;
2712      tr_peer_stat *           stat = ret + i;
2713
2714      tr_address_to_string_with_buf (&atom->addr, stat->addr, sizeof (stat->addr));
2715      tr_strlcpy (stat->client, tr_quark_get_string(peer->client,NULL), sizeof (stat->client));
2716      stat->port                = ntohs (peer->atom->port);
2717      stat->from                = atom->fromFirst;
2718      stat->progress            = peer->progress;
2719      stat->isUTP               = tr_peerMsgsIsUtpConnection (msgs);
2720      stat->isEncrypted         = tr_peerMsgsIsEncrypted (msgs);
2721      stat->rateToPeer_KBps     = toSpeedKBps (tr_peerGetPieceSpeed_Bps (peer, now_msec, TR_CLIENT_TO_PEER));
2722      stat->rateToClient_KBps   = toSpeedKBps (tr_peerGetPieceSpeed_Bps (peer, now_msec, TR_PEER_TO_CLIENT));
2723      stat->peerIsChoked        = tr_peerMsgsIsPeerChoked (msgs);
2724      stat->peerIsInterested    = tr_peerMsgsIsPeerInterested (msgs);
2725      stat->clientIsChoked      = tr_peerMsgsIsClientChoked (msgs);
2726      stat->clientIsInterested  = tr_peerMsgsIsClientInterested (msgs);
2727      stat->isIncoming          = tr_peerMsgsIsIncomingConnection (msgs);
2728      stat->isDownloadingFrom   = tr_peerMsgsIsActive (msgs, TR_PEER_TO_CLIENT);
2729      stat->isUploadingTo       = tr_peerMsgsIsActive (msgs, TR_CLIENT_TO_PEER);
2730      stat->isSeed              = tr_peerIsSeed (peer);
2731
2732      stat->blocksToPeer        = tr_historyGet (&peer->blocksSentToPeer,    now, CANCEL_HISTORY_SEC);
2733      stat->blocksToClient      = tr_historyGet (&peer->blocksSentToClient,  now, CANCEL_HISTORY_SEC);
2734      stat->cancelsToPeer       = tr_historyGet (&peer->cancelsSentToPeer,   now, CANCEL_HISTORY_SEC);
2735      stat->cancelsToClient     = tr_historyGet (&peer->cancelsSentToClient, now, CANCEL_HISTORY_SEC);
2736
2737      stat->pendingReqsToPeer   = peer->pendingReqsToPeer;
2738      stat->pendingReqsToClient = peer->pendingReqsToClient;
2739
2740      pch = stat->flagStr;
2741      if (stat->isUTP) *pch++ = 'T';
2742      if (s->optimistic == msgs) *pch++ = 'O';
2743      if (stat->isDownloadingFrom) *pch++ = 'D';
2744      else if (stat->clientIsInterested) *pch++ = 'd';
2745      if (stat->isUploadingTo) *pch++ = 'U';
2746      else if (stat->peerIsInterested) *pch++ = 'u';
2747      if (!stat->clientIsChoked && !stat->clientIsInterested) *pch++ = 'K';
2748      if (!stat->peerIsChoked && !stat->peerIsInterested) *pch++ = '?';
2749      if (stat->isEncrypted) *pch++ = 'E';
2750      if (stat->from == TR_PEER_FROM_DHT) *pch++ = 'H';
2751      else if (stat->from == TR_PEER_FROM_PEX) *pch++ = 'X';
2752      if (stat->isIncoming) *pch++ = 'I';
2753      *pch = '\0';
2754    }
2755
2756  *setmeCount = size;
2757  return ret;
2758}
2759
2760/***
2761****
2762****
2763***/
2764
2765void
2766tr_peerMgrClearInterest (tr_torrent * tor)
2767{
2768  int i;
2769  tr_swarm * s = tor->swarm;
2770  const int peerCount = tr_ptrArraySize (&s->peers);
2771
2772  assert (tr_isTorrent (tor));
2773  assert (tr_torrentIsLocked (tor));
2774
2775  for (i=0; i<peerCount; ++i)
2776    tr_peerMsgsSetInterested (tr_ptrArrayNth (&s->peers, i), false);
2777}
2778
2779/* does this peer have any pieces that we want? */
2780static bool
2781isPeerInteresting (tr_torrent     * const tor,
2782                   const bool     * const piece_is_interesting,
2783                   const tr_peer  * const peer)
2784{
2785  tr_piece_index_t i, n;
2786
2787  /* these cases should have already been handled by the calling code... */
2788  assert (!tr_torrentIsSeed (tor));
2789  assert (tr_torrentIsPieceTransferAllowed (tor, TR_PEER_TO_CLIENT));
2790
2791  if (tr_peerIsSeed (peer))
2792    return true;
2793
2794  for (i=0, n=tor->info.pieceCount; i<n; ++i)
2795    if (piece_is_interesting[i] && tr_bitfieldHas (&peer->have, i))
2796      return true;
2797
2798  return false;
2799}
2800
2801typedef enum
2802{
2803  RECHOKE_STATE_GOOD,
2804  RECHOKE_STATE_UNTESTED,
2805  RECHOKE_STATE_BAD
2806}
2807tr_rechoke_state;
2808
2809struct tr_rechoke_info
2810{
2811  tr_peer * peer;
2812  int salt;
2813  int rechoke_state;
2814};
2815
2816static int
2817compare_rechoke_info (const void * va, const void * vb)
2818{
2819  const struct tr_rechoke_info * a = va;
2820  const struct tr_rechoke_info * b = vb;
2821
2822  if (a->rechoke_state != b->rechoke_state)
2823    return a->rechoke_state - b->rechoke_state;
2824
2825  return a->salt - b->salt;
2826}
2827
2828/* determines who we send "interested" messages to */
2829static void
2830rechokeDownloads (tr_swarm * s)
2831{
2832  int i;
2833  int maxPeers = 0;
2834  int rechoke_count = 0;
2835  struct tr_rechoke_info * rechoke = NULL;
2836  const int MIN_INTERESTING_PEERS = 5;
2837  const int peerCount = tr_ptrArraySize (&s->peers);
2838  const time_t now = tr_time ();
2839
2840  /* some cases where this function isn't necessary */
2841  if (tr_torrentIsSeed (s->tor))
2842    return;
2843  if (!tr_torrentIsPieceTransferAllowed (s->tor, TR_PEER_TO_CLIENT))
2844    return;
2845
2846  /* decide HOW MANY peers to be interested in */
2847  {
2848    int blocks = 0;
2849    int cancels = 0;
2850    time_t timeSinceCancel;
2851
2852    /* Count up how many blocks & cancels each peer has.
2853     *
2854     * There are two situations where we send out cancels --
2855     *
2856     * 1. We've got unresponsive peers, which is handled by deciding
2857     *    -which- peers to be interested in.
2858     *
2859     * 2. We've hit our bandwidth cap, which is handled by deciding
2860     *    -how many- peers to be interested in.
2861     *
2862     * We're working on 2. here, so we need to ignore unresponsive
2863     * peers in our calculations lest they confuse Transmission into
2864     * thinking it's hit its bandwidth cap.
2865     */
2866    for (i=0; i<peerCount; ++i)
2867      {
2868        const tr_peer * peer = tr_ptrArrayNth (&s->peers, i);
2869        const int b = tr_historyGet (&peer->blocksSentToClient, now, CANCEL_HISTORY_SEC);
2870        const int c = tr_historyGet (&peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC);
2871
2872        if (b == 0) /* ignore unresponsive peers, as described above */
2873          continue;
2874
2875        blocks += b;
2876        cancels += c;
2877      }
2878
2879    if (cancels > 0)
2880      {
2881        /* cancelRate: of the block requests we've recently made, the percentage we cancelled.
2882         * higher values indicate more congestion. */
2883        const double cancelRate = cancels / (double)(cancels + blocks);
2884        const double mult = 1 - MIN (cancelRate, 0.5);
2885        maxPeers = s->interestedCount * mult;
2886        tordbg (s, "cancel rate is %.3f -- reducing the "
2887                   "number of peers we're interested in by %.0f percent",
2888                   cancelRate, mult * 100);
2889        s->lastCancel = now;
2890      }
2891
2892    timeSinceCancel = now - s->lastCancel;
2893    if (timeSinceCancel)
2894      {
2895        const int maxIncrease = 15;
2896        const time_t maxHistory = 2 * CANCEL_HISTORY_SEC;
2897        const double mult = MIN (timeSinceCancel, maxHistory) / (double) maxHistory;
2898        const int inc = maxIncrease * mult;
2899        maxPeers = s->maxPeers + inc;
2900        tordbg (s, "time since last cancel is %jd -- increasing the "
2901                   "number of peers we're interested in by %d",
2902                   (intmax_t)timeSinceCancel, inc);
2903      }
2904  }
2905
2906  /* don't let the previous section's number tweaking go too far... */
2907  if (maxPeers < MIN_INTERESTING_PEERS)
2908    maxPeers = MIN_INTERESTING_PEERS;
2909  if (maxPeers > s->tor->maxConnectedPeers)
2910    maxPeers = s->tor->maxConnectedPeers;
2911
2912  s->maxPeers = maxPeers;
2913
2914  if (peerCount > 0)
2915    {
2916      bool * piece_is_interesting;
2917      const tr_torrent * const tor = s->tor;
2918      const int n = tor->info.pieceCount;
2919
2920      /* build a bitfield of interesting pieces... */
2921      piece_is_interesting = tr_new (bool, n);
2922      for (i=0; i<n; i++)
2923        piece_is_interesting[i] = !tor->info.pieces[i].dnd && !tr_torrentPieceIsComplete (tor, i);
2924
2925      /* decide WHICH peers to be interested in (based on their cancel-to-block ratio) */
2926      for (i=0; i<peerCount; ++i)
2927        {
2928          tr_peer * peer = tr_ptrArrayNth (&s->peers, i);
2929
2930          if (!isPeerInteresting (s->tor, piece_is_interesting, peer))
2931            {
2932              tr_peerMsgsSetInterested (PEER_MSGS(peer), false);
2933            }
2934          else
2935            {
2936              tr_rechoke_state rechoke_state;
2937              const int blocks = tr_historyGet (&peer->blocksSentToClient, now, CANCEL_HISTORY_SEC);
2938              const int cancels = tr_historyGet (&peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC);
2939
2940              if (!blocks && !cancels)
2941                rechoke_state = RECHOKE_STATE_UNTESTED;
2942              else if (!cancels)
2943                rechoke_state = RECHOKE_STATE_GOOD;
2944              else if (!blocks)
2945                rechoke_state = RECHOKE_STATE_BAD;
2946              else if ((cancels * 10) < blocks)
2947                rechoke_state = RECHOKE_STATE_GOOD;
2948              else
2949                rechoke_state = RECHOKE_STATE_BAD;
2950
2951              if (rechoke == NULL)
2952                rechoke = tr_new (struct tr_rechoke_info, peerCount);
2953
2954              rechoke[rechoke_count].peer = peer;
2955              rechoke[rechoke_count].rechoke_state = rechoke_state;
2956              rechoke[rechoke_count].salt = tr_rand_int_weak (INT_MAX);
2957              rechoke_count++;
2958            }
2959
2960        }
2961
2962      tr_free (piece_is_interesting);
2963    }
2964
2965  /* now that we know which & how many peers to be interested in... update the peer interest */
2966  qsort (rechoke, rechoke_count, sizeof (struct tr_rechoke_info), compare_rechoke_info);
2967  s->interestedCount = MIN (maxPeers, rechoke_count);
2968  for (i=0; i<rechoke_count; ++i)
2969    tr_peerMsgsSetInterested (PEER_MSGS(rechoke[i].peer), i<s->interestedCount);
2970
2971  /* cleanup */
2972  tr_free (rechoke);
2973}
2974
2975/**
2976***
2977**/
2978
2979struct ChokeData
2980{
2981  bool          isInterested;
2982  bool          wasChoked;
2983  bool          isChoked;
2984  int           rate;
2985  int           salt;
2986  tr_peerMsgs * msgs;
2987};
2988
2989static int
2990compareChoke (const void * va, const void * vb)
2991{
2992  const struct ChokeData * a = va;
2993  const struct ChokeData * b = vb;
2994
2995  if (a->rate != b->rate) /* prefer higher overall speeds */
2996    return a->rate > b->rate ? -1 : 1;
2997
2998  if (a->wasChoked != b->wasChoked) /* prefer unchoked */
2999    return a->wasChoked ? 1 : -1;
3000
3001  if (a->salt != b->salt) /* random order */
3002    return a->salt - b->salt;
3003
3004  return 0;
3005}
3006
3007/* is this a new connection? */
3008static bool
3009isNew (const tr_peerMsgs * msgs)
3010{
3011  return (msgs != NULL) && (tr_peerMsgsGetConnectionAge (msgs) < 45);
3012}
3013
3014/* get a rate for deciding which peers to choke and unchoke. */
3015static int
3016getRate (const tr_torrent * tor, struct peer_atom * atom, uint64_t now)
3017{
3018  unsigned int Bps;
3019
3020  if (tr_torrentIsSeed (tor))
3021    Bps = tr_peerGetPieceSpeed_Bps (atom->peer, now, TR_CLIENT_TO_PEER);
3022
3023  /* downloading a private torrent... take upload speed into account
3024   * because there may only be a small window of opportunity to share */
3025  else if (tr_torrentIsPrivate (tor))
3026    Bps = tr_peerGetPieceSpeed_Bps (atom->peer, now, TR_PEER_TO_CLIENT)
3027        + tr_peerGetPieceSpeed_Bps (atom->peer, now, TR_CLIENT_TO_PEER);
3028
3029  /* downloading a public torrent */
3030  else
3031    Bps = tr_peerGetPieceSpeed_Bps (atom->peer, now, TR_PEER_TO_CLIENT);
3032
3033  /* convert it to bytes per second */
3034  return Bps;
3035}
3036
3037static inline bool
3038isBandwidthMaxedOut (const tr_bandwidth * b,
3039                     const uint64_t now_msec, tr_direction dir)
3040{
3041  if (!tr_bandwidthIsLimited (b, dir))
3042    {
3043      return false;
3044    }
3045  else
3046    {
3047      const unsigned int got = tr_bandwidthGetPieceSpeed_Bps (b, now_msec, dir);
3048      const unsigned int want = tr_bandwidthGetDesiredSpeed_Bps (b, dir);
3049      return got >= want;
3050    }
3051}
3052
3053static void
3054rechokeUploads (tr_swarm * s, const uint64_t now)
3055{
3056  int i, size, unchokedInterested;
3057  const int peerCount = tr_ptrArraySize (&s->peers);
3058  tr_peer ** peers = (tr_peer**) tr_ptrArrayBase (&s->peers);
3059  struct ChokeData * choke = tr_new0 (struct ChokeData, peerCount);
3060  const tr_session * session = s->manager->session;
3061  const bool chokeAll = !tr_torrentIsPieceTransferAllowed (s->tor, TR_CLIENT_TO_PEER);
3062  const bool isMaxedOut = isBandwidthMaxedOut (&s->tor->bandwidth, now, TR_UP);
3063
3064  assert (swarmIsLocked (s));
3065
3066  /* an optimistic unchoke peer's "optimistic"
3067   * state lasts for N calls to rechokeUploads (). */
3068  if (s->optimisticUnchokeTimeScaler > 0)
3069    s->optimisticUnchokeTimeScaler--;
3070  else
3071    s->optimistic = NULL;
3072
3073  /* sort the peers by preference and rate */
3074  for (i=0, size=0; i<peerCount; ++i)
3075    {
3076      tr_peer * peer = peers[i];
3077      tr_peerMsgs * msgs = PEER_MSGS (peer);
3078
3079      struct peer_atom * atom = peer->atom;
3080
3081      if (tr_peerIsSeed (peer)) /* choke seeds and partial seeds */
3082        {
3083          tr_peerMsgsSetChoke (PEER_MSGS(peer), true);
3084        }
3085      else if (chokeAll) /* choke everyone if we're not uploading */
3086        {
3087          tr_peerMsgsSetChoke (PEER_MSGS(peer), true);
3088        }
3089      else if (msgs != s->optimistic)
3090        {
3091          struct ChokeData * n = &choke[size++];
3092          n->msgs         = msgs;
3093          n->isInterested = tr_peerMsgsIsPeerInterested (msgs);
3094          n->wasChoked    = tr_peerMsgsIsPeerChoked (msgs);
3095          n->rate         = getRate (s->tor, atom, now);
3096          n->salt         = tr_rand_int_weak (INT_MAX);
3097          n->isChoked     = true;
3098        }
3099    }
3100
3101  qsort (choke, size, sizeof (struct ChokeData), compareChoke);
3102
3103  /**
3104   * Reciprocation and number of uploads capping is managed by unchoking
3105   * the N peers which have the best upload rate and are interested.
3106   * This maximizes the client's download rate. These N peers are
3107   * referred to as downloaders, because they are interested in downloading
3108   * from the client.
3109   *
3110   * Peers which have a better upload rate (as compared to the downloaders)
3111   * but aren't interested get unchoked. If they become interested, the
3112   * downloader with the worst upload rate gets choked. If a client has
3113   * a complete file, it uses its upload rate rather than its download
3114   * rate to decide which peers to unchoke.
3115   *
3116   * If our bandwidth is maxed out, don't unchoke any more peers.
3117   */
3118  unchokedInterested = 0;
3119  for (i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i)
3120    {
3121      choke[i].isChoked = isMaxedOut ? choke[i].wasChoked : false;
3122      if (choke[i].isInterested)
3123        ++unchokedInterested;
3124    }
3125
3126  /* optimistic unchoke */
3127  if (!s->optimistic && !isMaxedOut && (i<size))
3128    {
3129      int n;
3130      struct ChokeData * c;
3131      tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
3132
3133      for (; i<size; ++i)
3134        {
3135          if (choke[i].isInterested)
3136            {
3137              const tr_peerMsgs * msgs = choke[i].msgs;
3138              int x = 1, y;
3139              if (isNew (msgs)) x *= 3;
3140              for (y=0; y<x; ++y)
3141                tr_ptrArrayAppend (&randPool, &choke[i]);
3142            }
3143        }
3144
3145      if ((n = tr_ptrArraySize (&randPool)))
3146        {
3147          c = tr_ptrArrayNth (&randPool, tr_rand_int_weak (n));
3148          c->isChoked = false;
3149          s->optimistic = c->msgs;
3150          s->optimisticUnchokeTimeScaler = OPTIMISTIC_UNCHOKE_MULTIPLIER;
3151        }
3152
3153      tr_ptrArrayDestruct (&randPool, NULL);
3154    }
3155
3156  for (i=0; i<size; ++i)
3157    tr_peerMsgsSetChoke (choke[i].msgs, choke[i].isChoked);
3158
3159  /* cleanup */
3160  tr_free (choke);
3161}
3162
3163static void
3164rechokePulse (evutil_socket_t foo UNUSED, short bar UNUSED, void * vmgr)
3165{
3166  tr_torrent * tor = NULL;
3167  tr_peerMgr * mgr = vmgr;
3168  const uint64_t now = tr_time_msec ();
3169
3170  managerLock (mgr);
3171
3172  while ((tor = tr_torrentNext (mgr->session, tor)))
3173    {
3174      if (tor->isRunning)
3175        {
3176          tr_swarm * s = tor->swarm;
3177
3178          if (s->stats.peerCount > 0)
3179            {
3180              rechokeUploads (s, now);
3181              rechokeDownloads (s);
3182            }
3183        }
3184    }
3185
3186  tr_timerAddMsec (mgr->rechokeTimer, RECHOKE_PERIOD_MSEC);
3187  managerUnlock (mgr);
3188}
3189
3190/***
3191****
3192****  Life and Death
3193****
3194***/
3195
3196static bool
3197shouldPeerBeClosed (const tr_swarm   * s,
3198                    const tr_peer    * peer,
3199                    int                peerCount,
3200                    const time_t       now)
3201{
3202  const tr_torrent * tor = s->tor;
3203  const struct peer_atom * atom = peer->atom;
3204
3205  /* if it's marked for purging, close it */
3206  if (peer->doPurge)
3207    {
3208      tordbg (s, "purging peer %s because its doPurge flag is set",
3209              tr_atomAddrStr (atom));
3210      return true;
3211    }
3212
3213  /* disconnect if we're both seeds and enough time has passed for PEX */
3214  if (tr_torrentIsSeed (tor) && tr_peerIsSeed (peer))
3215    return !tr_torrentAllowsPex (tor) || (now-atom->time>=30);
3216
3217  /* disconnect if it's been too long since piece data has been transferred.
3218   * this is on a sliding scale based on number of available peers... */
3219  {
3220    const int relaxStrictnessIfFewerThanN = (int)((getMaxPeerCount (tor) * 0.9) + 0.5);
3221    /* if we have >= relaxIfFewerThan, strictness is 100%.
3222     * if we have zero connections, strictness is 0% */
3223    const float strictness = peerCount >= relaxStrictnessIfFewerThanN
3224                           ? 1.0
3225                           : peerCount / (float)relaxStrictnessIfFewerThanN;
3226    const int lo = MIN_UPLOAD_IDLE_SECS;
3227    const int hi = MAX_UPLOAD_IDLE_SECS;
3228    const int limit = hi - ((hi - lo) * strictness);
3229    const int idleTime = now - MAX (atom->time, atom->piece_data_time);
3230/*fprintf (stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit);*/
3231    if (idleTime > limit)
3232      {
3233        tordbg (s, "purging peer %s because it's been %d secs since we shared anything",
3234                tr_atomAddrStr (atom), idleTime);
3235        return true;
3236      }
3237  }
3238
3239  return false;
3240}
3241
3242static tr_peer **
3243getPeersToClose (tr_swarm * s, const time_t now_sec, int * setmeSize)
3244{
3245  int i, peerCount, outsize;
3246  struct tr_peer ** ret = NULL;
3247  tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek (&s->peers, &peerCount);
3248
3249  assert (swarmIsLocked (s));
3250
3251  for (i=outsize=0; i<peerCount; ++i)
3252    {
3253      if (shouldPeerBeClosed (s, peers[i], peerCount, now_sec))
3254        {
3255          if (ret == NULL)
3256            ret = tr_new (tr_peer *, peerCount);
3257          ret[outsize++] = peers[i];
3258        }
3259    }
3260
3261  *setmeSize = outsize;
3262  return ret;
3263}
3264
3265static int
3266getReconnectIntervalSecs (const struct peer_atom * atom, const time_t now)
3267{
3268  int sec;
3269  const bool unreachable = (atom->flags2 & MYFLAG_UNREACHABLE) != 0;
3270
3271  /* if we were recently connected to this peer and transferring piece
3272   * data, try to reconnect to them sooner rather that later -- we don't
3273   * want network troubles to get in the way of a good peer. */
3274  if (!unreachable && ((now - atom->piece_data_time) <= (MINIMUM_RECONNECT_INTERVAL_SECS * 2)))
3275    sec = MINIMUM_RECONNECT_INTERVAL_SECS;
3276
3277  /* otherwise, the interval depends on how many times we've tried
3278   * and failed to connect to the peer */
3279  else
3280    {
3281      int step = atom->numFails;
3282
3283      /* penalize peers that were unreachable the last time we tried */
3284      if (unreachable)
3285        step += 2;
3286
3287      switch (step)
3288        {
3289          case 0: sec = 0; break;
3290          case 1: sec = 10; break;
3291          case 2: sec = 60 * 2; break;
3292          case 3: sec = 60 * 15; break;
3293          case 4: sec = 60 * 30; break;
3294          case 5: sec = 60 * 60; break;
3295          default: sec = 60 * 120; break;
3296        }
3297    }
3298
3299  dbgmsg ("reconnect interval for %s is %d seconds", tr_atomAddrStr (atom), sec);
3300  return sec;
3301}
3302
3303static void
3304removePeer (tr_swarm * s, tr_peer * peer)
3305{
3306  struct peer_atom * atom = peer->atom;
3307
3308  assert (swarmIsLocked (s));
3309  assert (atom);
3310
3311  atom->time = tr_time ();
3312
3313  tr_ptrArrayRemoveSortedPointer (&s->peers, peer, peerCompare);
3314  --s->stats.peerCount;
3315  --s->stats.peerFromCount[atom->fromFirst];
3316
3317  if (replicationExists (s))
3318    tr_decrReplicationFromBitfield (s, &peer->have);
3319
3320  assert (s->stats.peerCount == tr_ptrArraySize (&s->peers));
3321  assert (s->stats.peerFromCount[atom->fromFirst] >= 0);
3322
3323  tr_peerFree (peer);
3324}
3325
3326static void
3327closePeer (tr_swarm * s, tr_peer * peer)
3328{
3329  struct peer_atom * atom;
3330
3331  assert (s != NULL);
3332  assert (peer != NULL);
3333
3334  atom = peer->atom;
3335
3336  /* if we transferred piece data, then they might be good peers,
3337     so reset their `numFails' weight to zero. otherwise we connected
3338     to them fruitlessly, so mark it as another fail */
3339  if (atom->piece_data_time)
3340    {
3341      tordbg (s, "resetting atom %s numFails to 0", tr_atomAddrStr (atom));
3342      atom->numFails = 0;
3343    }
3344  else
3345    {
3346      ++atom->numFails;
3347      tordbg (s, "incremented atom %s numFails to %d", tr_atomAddrStr (atom), (int)atom->numFails);
3348    }
3349
3350  tordbg (s, "removing bad peer %s", tr_atomAddrStr (peer->atom));
3351  removePeer (s, peer);
3352}
3353
3354static void
3355removeAllPeers (tr_swarm * s)
3356{
3357  while (!tr_ptrArrayEmpty (&s->peers))
3358    removePeer (s, tr_ptrArrayNth (&s->peers, 0));
3359
3360  assert (!s->stats.peerCount);
3361}
3362
3363static void
3364closeBadPeers (tr_swarm * s, const time_t now_sec)
3365{
3366  if (!tr_ptrArrayEmpty (&s->peers))
3367    {
3368      int i;
3369      int peerCount;
3370      struct tr_peer ** peers;
3371
3372      peers = getPeersToClose (s, now_sec, &peerCount);
3373      for (i=0; i<peerCount; ++i)
3374        closePeer (s, peers[i]);
3375      tr_free (peers);
3376    }
3377}
3378
3379struct peer_liveliness
3380{
3381  tr_peer * peer;
3382  void * clientData;
3383  time_t pieceDataTime;
3384  time_t time;
3385  unsigned int speed;
3386  bool doPurge;
3387};
3388
3389static int
3390comparePeerLiveliness (const void * va, const void * vb)
3391{
3392  const struct peer_liveliness * a = va;
3393  const struct peer_liveliness * b = vb;
3394
3395  if (a->doPurge != b->doPurge)
3396    return a->doPurge ? 1 : -1;
3397
3398  if (a->speed != b->speed) /* faster goes first */
3399    return a->speed > b->speed ? -1 : 1;
3400
3401  /* the one to give us data more recently goes first */
3402  if (a->pieceDataTime != b->pieceDataTime)
3403    return a->pieceDataTime > b->pieceDataTime ? -1 : 1;
3404
3405  /* the one we connected to most recently goes first */
3406  if (a->time != b->time)
3407    return a->time > b->time ? -1 : 1;
3408
3409  return 0;
3410}
3411
3412static void
3413sortPeersByLivelinessImpl (tr_peer  ** peers,
3414                           void     ** clientData,
3415                           int         n,
3416                           uint64_t    now,
3417                           int (*compare)(const void *va, const void *vb))
3418{
3419  int i;
3420  struct peer_liveliness *lives, *l;
3421
3422  /* build a sortable array of peer + extra info */
3423  lives = l = tr_new0 (struct peer_liveliness, n);
3424  for (i=0; i<n; ++i, ++l)
3425    {
3426      tr_peer * p = peers[i];
3427      l->peer = p;
3428      l->doPurge = p->doPurge;
3429      l->pieceDataTime = p->atom->piece_data_time;
3430      l->time = p->atom->time;
3431      l->speed = tr_peerGetPieceSpeed_Bps (p, now, TR_UP)
3432               + tr_peerGetPieceSpeed_Bps (p, now, TR_DOWN);
3433      if (clientData)
3434        l->clientData = clientData[i];
3435    }
3436
3437  /* sort 'em */
3438  assert (n == (l - lives));
3439  qsort (lives, n, sizeof (struct peer_liveliness), compare);
3440
3441  /* build the peer array */
3442  for (i=0, l=lives; i<n; ++i, ++l)
3443    {
3444      peers[i] = l->peer;
3445      if (clientData)
3446        clientData[i] = l->clientData;
3447    }
3448  assert (n == (l - lives));
3449
3450  /* cleanup */
3451  tr_free (lives);
3452}
3453
3454static void
3455sortPeersByLiveliness (tr_peer ** peers, void ** clientData, int n, uint64_t now)
3456{
3457  sortPeersByLivelinessImpl (peers, clientData, n, now, comparePeerLiveliness);
3458}
3459
3460
3461static void
3462enforceTorrentPeerLimit (tr_swarm * s, uint64_t now)
3463{
3464  int n = tr_ptrArraySize (&s->peers);
3465  const int max = tr_torrentGetPeerLimit (s->tor);
3466  if (n > max)
3467    {
3468      void * base = tr_ptrArrayBase (&s->peers);
3469      tr_peer ** peers = tr_memdup (base, n*sizeof (tr_peer*));
3470      sortPeersByLiveliness (peers, NULL, n, now);
3471      while (n > max)
3472        closePeer (s, peers[--n]);
3473      tr_free (peers);
3474    }
3475}
3476
3477static void
3478enforceSessionPeerLimit (tr_session * session, uint64_t now)
3479{
3480  int n = 0;
3481  tr_torrent * tor = NULL;
3482  const int max = tr_sessionGetPeerLimit (session);
3483
3484  /* count the total number of peers */
3485  while ((tor = tr_torrentNext (session, tor)))
3486    n += tr_ptrArraySize (&tor->swarm->peers);
3487
3488  /* if there are too many, prune out the worst */
3489  if (n > max)
3490    {
3491      tr_peer ** peers = tr_new (tr_peer*, n);
3492      tr_swarm ** swarms = tr_new (tr_swarm*, n);
3493
3494      /* populate the peer array */
3495      n = 0;
3496      tor = NULL;
3497      while ((tor = tr_torrentNext (session, tor)))
3498        {
3499          int i;
3500          tr_swarm * s = tor->swarm;
3501          const int tn = tr_ptrArraySize (&s->peers);
3502          for (i=0; i<tn; ++i, ++n)
3503            {
3504              peers[n] = tr_ptrArrayNth (&s->peers, i);
3505              swarms[n] = s;
3506            }
3507        }
3508
3509      /* sort 'em */
3510      sortPeersByLiveliness (peers, (void**)swarms, n, now);
3511
3512      /* cull out the crappiest */
3513      while (n-- > max)
3514        closePeer (swarms[n], peers[n]);
3515
3516      /* cleanup */
3517      tr_free (swarms);
3518      tr_free (peers);
3519    }
3520}
3521
3522static void makeNewPeerConnections (tr_peerMgr * mgr, const int max);
3523
3524static void
3525reconnectPulse (evutil_socket_t foo UNUSED, short bar UNUSED, void * vmgr)
3526{
3527  tr_torrent * tor;
3528  tr_peerMgr * mgr = vmgr;
3529  const time_t now_sec = tr_time ();
3530  const uint64_t now_msec = tr_time_msec ();
3531
3532  /**
3533  ***  enforce the per-session and per-torrent peer limits
3534  **/
3535
3536  /* if we're over the per-torrent peer limits, cull some peers */
3537  tor = NULL;
3538  while ((tor = tr_torrentNext (mgr->session, tor)))
3539    if (tor->isRunning)
3540      enforceTorrentPeerLimit (tor->swarm, now_msec);
3541
3542  /* if we're over the per-session peer limits, cull some peers */
3543  enforceSessionPeerLimit (mgr->session, now_msec);
3544
3545  /* remove crappy peers */
3546  tor = NULL;
3547  while ((tor = tr_torrentNext (mgr->session, tor)))
3548    if (!tor->swarm->isRunning)
3549      removeAllPeers (tor->swarm);
3550    else
3551      closeBadPeers (tor->swarm, now_sec);
3552
3553  /* try to make new peer connections */
3554  makeNewPeerConnections (mgr, MAX_CONNECTIONS_PER_PULSE);
3555}
3556
3557/****
3558*****
3559*****  BANDWIDTH ALLOCATION
3560*****
3561****/
3562
3563static void
3564pumpAllPeers (tr_peerMgr * mgr)
3565{
3566  tr_torrent * tor = NULL;
3567
3568  while ((tor = tr_torrentNext (mgr->session, tor)))
3569    {
3570      int j;
3571      tr_swarm * s = tor->swarm;
3572
3573      for (j=0; j<tr_ptrArraySize (&s->peers); ++j)
3574        tr_peerMsgsPulse (tr_ptrArrayNth (&s->peers, j));
3575    }
3576}
3577
3578
3579static void
3580queuePulseForeach (void * vtor)
3581{
3582  tr_torrent * tor = vtor;
3583
3584  tr_torrentStartNow (tor);
3585
3586  if (tor->queue_started_callback != NULL)
3587    (*tor->queue_started_callback)(tor, tor->queue_started_user_data);
3588}
3589
3590static void
3591queuePulse (tr_session * session, tr_direction dir)
3592{
3593  assert (tr_isSession (session));
3594  assert (tr_isDirection (dir));
3595
3596  if (tr_sessionGetQueueEnabled (session, dir))
3597    {
3598      tr_ptrArray torrents = TR_PTR_ARRAY_INIT;
3599
3600      tr_sessionGetNextQueuedTorrents (session,
3601                                       dir,
3602                                       tr_sessionCountQueueFreeSlots (session, dir),
3603                                       &torrents);
3604
3605      tr_ptrArrayForeach (&torrents, queuePulseForeach);
3606
3607      tr_ptrArrayDestruct (&torrents, NULL);
3608    }
3609}
3610
3611static void
3612bandwidthPulse (evutil_socket_t foo UNUSED, short bar UNUSED, void * vmgr)
3613{
3614  tr_torrent * tor;
3615  tr_peerMgr * mgr = vmgr;
3616  tr_session * session = mgr->session;
3617  managerLock (mgr);
3618
3619  /* FIXME: this next line probably isn't necessary... */
3620  pumpAllPeers (mgr);
3621
3622  /* allocate bandwidth to the peers */
3623  tr_bandwidthAllocate (&session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC);
3624  tr_bandwidthAllocate (&session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC);
3625
3626  /* torrent upkeep */
3627  tor = NULL;
3628  while ((tor = tr_torrentNext (session, tor)))
3629    {
3630      /* possibly stop torrents that have seeded enough */
3631      tr_torrentCheckSeedLimit (tor);
3632
3633      /* run the completeness check for any torrents that need it */
3634      if (tor->swarm->needsCompletenessCheck)
3635        {
3636          tor->swarm->needsCompletenessCheck  = false;
3637          tr_torrentRecheckCompleteness (tor);
3638        }
3639
3640      /* stop torrents that are ready to stop, but couldn't be stopped
3641         earlier during the peer-io callback call chain */
3642      if (tor->isStopping)
3643        tr_torrentStop (tor);
3644
3645      /* update the torrent's stats */
3646      tor->swarm->stats.activeWebseedCount = countActiveWebseeds (tor->swarm);
3647    }
3648
3649  /* pump the queues */
3650  queuePulse (session, TR_UP);
3651  queuePulse (session, TR_DOWN);
3652
3653  reconnectPulse (0, 0, mgr);
3654
3655  tr_timerAddMsec (mgr->bandwidthTimer, BANDWIDTH_PERIOD_MSEC);
3656  managerUnlock (mgr);
3657}
3658
3659/***
3660****
3661***/
3662
3663static int
3664compareAtomPtrsByAddress (const void * va, const void *vb)
3665{
3666  const struct peer_atom * a = * (const struct peer_atom* const *) va;
3667  const struct peer_atom * b = * (const struct peer_atom* const *) vb;
3668
3669  assert (tr_isAtom (a));
3670  assert (tr_isAtom (b));
3671
3672  return tr_address_compare (&a->addr, &b->addr);
3673}
3674
3675/* best come first, worst go last */
3676static int
3677compareAtomPtrsByShelfDate (const void * va, const void *vb)
3678{
3679  time_t atime;
3680  time_t btime;
3681  const struct peer_atom * a = * (const struct peer_atom* const *) va;
3682  const struct peer_atom * b = * (const struct peer_atom* const *) vb;
3683  const int data_time_cutoff_secs = 60 * 60;
3684  const time_t tr_now = tr_time ();
3685
3686  assert (tr_isAtom (a));
3687  assert (tr_isAtom (b));
3688
3689  /* primary key: the last piece data time *if* it was within the last hour */
3690  atime = a->piece_data_time; if (atime + data_time_cutoff_secs < tr_now) atime = 0;
3691  btime = b->piece_data_time; if (btime + data_time_cutoff_secs < tr_now) btime = 0;
3692  if (atime != btime)
3693    return atime > btime ? -1 : 1;
3694
3695  /* secondary key: shelf date. */
3696  if (a->shelf_date != b->shelf_date)
3697    return a->shelf_date > b->shelf_date ? -1 : 1;
3698
3699  return 0;
3700}
3701
3702static int
3703getMaxAtomCount (const tr_torrent * tor)
3704{
3705  return MIN (50, tor->maxConnectedPeers * 3);
3706}
3707
3708static void
3709atomPulse (evutil_socket_t foo UNUSED, short bar UNUSED, void * vmgr)
3710{
3711  tr_torrent * tor = NULL;
3712  tr_peerMgr * mgr = vmgr;
3713  managerLock (mgr);
3714
3715  while ((tor = tr_torrentNext (mgr->session, tor)))
3716    {
3717      int atomCount;
3718      tr_swarm * s = tor->swarm;
3719      const int maxAtomCount = getMaxAtomCount (tor);
3720      struct peer_atom ** atoms = (struct peer_atom**) tr_ptrArrayPeek (&s->pool, &atomCount);
3721
3722      if (atomCount > maxAtomCount) /* we've got too many atoms... time to prune */
3723        {
3724          int i;
3725          int keepCount = 0;
3726          int testCount = 0;
3727          struct peer_atom ** keep = tr_new (struct peer_atom*, atomCount);
3728          struct peer_atom ** test = tr_new (struct peer_atom*, atomCount);
3729
3730          /* keep the ones that are in use */
3731          for (i=0; i<atomCount; ++i)
3732            {
3733              struct peer_atom * atom = atoms[i];
3734              if (peerIsInUse (s, atom))
3735                keep[keepCount++] = atom;
3736              else
3737                test[testCount++] = atom;
3738            }
3739
3740          /* if there's room, keep the best of what's left */
3741          i = 0;
3742          if (keepCount < maxAtomCount)
3743            {
3744              qsort (test, testCount, sizeof (struct peer_atom *), compareAtomPtrsByShelfDate);
3745              while (i<testCount && keepCount<maxAtomCount)
3746                keep[keepCount++] = test[i++];
3747            }
3748
3749          /* free the culled atoms */
3750          while (i<testCount)
3751            tr_free (test[i++]);
3752
3753          /* rebuild Torrent.pool with what's left */
3754          tr_ptrArrayDestruct (&s->pool, NULL);
3755          s->pool = TR_PTR_ARRAY_INIT;
3756          qsort (keep, keepCount, sizeof (struct peer_atom *), compareAtomPtrsByAddress);
3757          for (i=0; i<keepCount; ++i)
3758            tr_ptrArrayAppend (&s->pool, keep[i]);
3759
3760          tordbg (s, "max atom count is %d... pruned from %d to %d\n", maxAtomCount, atomCount, keepCount);
3761
3762          /* cleanup */
3763          tr_free (test);
3764          tr_free (keep);
3765        }
3766    }
3767
3768  tr_timerAddMsec (mgr->atomTimer, ATOM_PERIOD_MSEC);
3769  managerUnlock (mgr);
3770}
3771
3772/***
3773****
3774****
3775****
3776***/
3777
3778/* is this atom someone that we'd want to initiate a connection to? */
3779static bool
3780isPeerCandidate (const tr_torrent * tor, struct peer_atom * atom, const time_t now)
3781{
3782  /* not if we're both seeds */
3783  if (tr_torrentIsSeed (tor) && atomIsSeed (atom))
3784    return false;
3785
3786  /* not if we've already got a connection to them... */
3787  if (peerIsInUse (tor->swarm, atom))
3788    return false;
3789
3790  /* not if we just tried them already */
3791  if ((now - atom->time) < getReconnectIntervalSecs (atom, now))
3792    return false;
3793
3794  /* not if they're blocklisted */
3795  if (isAtomBlocklisted (tor->session, atom))
3796    return false;
3797
3798  /* not if they're banned... */
3799  if (atom->flags2 & MYFLAG_BANNED)
3800    return false;
3801
3802  return true;
3803}
3804
3805struct peer_candidate
3806{
3807  uint64_t score;
3808  tr_torrent * tor;
3809  struct peer_atom * atom;
3810};
3811
3812static bool
3813torrentWasRecentlyStarted (const tr_torrent * tor)
3814{
3815  return difftime (tr_time (), tor->startDate) < 120;
3816}
3817
3818static inline uint64_t
3819addValToKey (uint64_t value, int width, uint64_t addme)
3820{
3821  value = (value << (uint64_t)width);
3822  value |= addme;
3823  return value;
3824}
3825
3826/* smaller value is better */
3827static uint64_t
3828getPeerCandidateScore (const tr_torrent * tor, const struct peer_atom * atom, uint8_t salt)
3829{
3830  uint64_t i;
3831  uint64_t score = 0;
3832  const bool failed = atom->lastConnectionAt < atom->lastConnectionAttemptAt;
3833
3834  /* prefer peers we've connected to, or never tried, over peers we failed to connect to. */
3835  i = failed ? 1 : 0;
3836  score = addValToKey (score, 1, i);
3837
3838  /* prefer the one we attempted least recently (to cycle through all peers) */
3839  i = atom->lastConnectionAttemptAt;
3840  score = addValToKey (score, 32, i);
3841
3842  /* prefer peers belonging to a torrent of a higher priority */
3843  switch (tr_torrentGetPriority (tor))
3844    {
3845      case TR_PRI_HIGH:    i = 0; break;
3846      case TR_PRI_NORMAL:  i = 1; break;
3847      case TR_PRI_LOW:     i = 2; break;
3848    }
3849  score = addValToKey (score, 4, i);
3850
3851  /* prefer recently-started torrents */
3852  i = torrentWasRecentlyStarted (tor) ? 0 : 1;
3853  score = addValToKey (score, 1, i);
3854
3855  /* prefer torrents we're downloading with */
3856  i = tr_torrentIsSeed (tor) ? 1 : 0;
3857  score = addValToKey (score, 1, i);
3858
3859  /* prefer peers that are known to be connectible */
3860  i = (atom->flags & ADDED_F_CONNECTABLE) ? 0 : 1;
3861  score = addValToKey (score, 1, i);
3862
3863  /* prefer peers that we might have a chance of uploading to...
3864  so lower seed probability is better */
3865  if (atom->seedProbability == 100) i = 101;
3866  else if (atom->seedProbability == -1) i = 100;
3867  else i = atom->seedProbability;
3868  score = addValToKey (score, 8, i);
3869
3870  /* Prefer peers that we got from more trusted sources.
3871   * lower `fromBest' values indicate more trusted sources */
3872  score = addValToKey (score, 4, atom->fromBest);
3873
3874  /* salt */
3875  score = addValToKey (score, 8, salt);
3876
3877  return score;
3878}
3879
3880static int
3881comparePeerCandidates (const void * va, const void * vb)
3882{
3883  int ret;
3884  const struct peer_candidate * a = va;
3885  const struct peer_candidate * b = vb;
3886
3887  if (a->score < b->score)
3888    ret = -1;
3889  else if (a->score > b->score)
3890    ret = 1;
3891  else
3892    ret = 0;
3893
3894  return ret;
3895}
3896
3897/* Partial sorting -- selecting the k best candidates
3898   Adapted from http://en.wikipedia.org/wiki/Selection_algorithm */
3899static void
3900selectPeerCandidates (struct peer_candidate * candidates, int candidate_count, int select_count)
3901{
3902  tr_quickfindFirstK (candidates,
3903                      candidate_count,
3904                      sizeof(struct peer_candidate),
3905                      comparePeerCandidates,
3906                      select_count);
3907}
3908
3909#ifndef NDEBUG
3910static bool
3911checkBestScoresComeFirst (const struct peer_candidate * candidates, int n, int k)
3912{
3913  int i;
3914  uint64_t worstFirstScore = 0;
3915  const int x = MIN (n, k) - 1;
3916
3917  for (i=0; i<x; i++)
3918    if (worstFirstScore < candidates[i].score)
3919      worstFirstScore = candidates[i].score;
3920
3921  for (i=0; i<x; i++)
3922    assert (candidates[i].score <= worstFirstScore);
3923
3924  for (i=x+1; i<n; i++)
3925    assert (candidates[i].score >= worstFirstScore);
3926
3927  return true;
3928}
3929#endif /* NDEBUG */
3930
3931/** @return an array of all the atoms we might want to connect to */
3932static struct peer_candidate*
3933getPeerCandidates (tr_session * session, int * candidateCount, int max)
3934{
3935  int atomCount;
3936  int peerCount;
3937  tr_torrent * tor;
3938  struct peer_candidate * candidates;
3939  struct peer_candidate * walk;
3940  const time_t now = tr_time ();
3941  const uint64_t now_msec = tr_time_msec ();
3942  /* leave 5% of connection slots for incoming connections -- ticket #2609 */
3943  const int maxCandidates = tr_sessionGetPeerLimit (session) * 0.95;
3944
3945  /* count how many peers and atoms we've got */
3946  tor= NULL;
3947  atomCount = 0;
3948  peerCount = 0;
3949  while ((tor = tr_torrentNext (session, tor)))
3950    {
3951      atomCount += tr_ptrArraySize (&tor->swarm->pool);
3952      peerCount += tr_ptrArraySize (&tor->swarm->peers);
3953    }
3954
3955  /* don't start any new handshakes if we're full up */
3956  if (maxCandidates <= peerCount)
3957    {
3958      *candidateCount = 0;
3959      return NULL;
3960    }
3961
3962  /* allocate an array of candidates */
3963  walk = candidates = tr_new (struct peer_candidate, atomCount);
3964
3965  /* populate the candidate array */
3966  tor = NULL;
3967  while ((tor = tr_torrentNext (session, tor)))
3968    {
3969      int i, nAtoms;
3970      struct peer_atom ** atoms;
3971
3972      if (!tor->swarm->isRunning)
3973        continue;
3974
3975      /* if we've already got enough peers in this torrent... */
3976      if (tr_torrentGetPeerLimit (tor) <= tr_ptrArraySize (&tor->swarm->peers))
3977        continue;
3978
3979      /* if we've already got enough speed in this torrent... */
3980      if (tr_torrentIsSeed (tor) && isBandwidthMaxedOut (&tor->bandwidth, now_msec, TR_UP))
3981        continue;
3982
3983      atoms = (struct peer_atom**) tr_ptrArrayPeek (&tor->swarm->pool, &nAtoms);
3984      for (i=0; i<nAtoms; ++i)
3985        {
3986          struct peer_atom * atom = atoms[i];
3987
3988          if (isPeerCandidate (tor, atom, now))
3989            {
3990              const uint8_t salt = tr_rand_int_weak (1024);
3991              walk->tor = tor;
3992              walk->atom = atom;
3993              walk->score = getPeerCandidateScore (tor, atom, salt);
3994              ++walk;
3995            }
3996        }
3997    }
3998
3999  *candidateCount = walk - candidates;
4000  if (walk != candidates)
4001    selectPeerCandidates (candidates, walk-candidates, max);
4002
4003  assert (checkBestScoresComeFirst (candidates, *candidateCount, max));
4004  return candidates;
4005}
4006
4007static void
4008initiateConnection (tr_peerMgr * mgr, tr_swarm * s, struct peer_atom * atom)
4009{
4010  tr_peerIo * io;
4011  const time_t now = tr_time ();
4012  bool utp = tr_sessionIsUTPEnabled (mgr->session) && !atom->utp_failed;
4013
4014  if (atom->fromFirst == TR_PEER_FROM_PEX)
4015    /* PEX has explicit signalling for uTP support.  If an atom
4016       originally came from PEX and doesn't have the uTP flag, skip the
4017       uTP connection attempt.  Are we being optimistic here? */
4018    utp = utp && (atom->flags & ADDED_F_UTP_FLAGS);
4019
4020  tordbg (s, "Starting an OUTGOING%s connection with %s",
4021          utp ? " µTP" : "", tr_atomAddrStr (atom));
4022
4023  io = tr_peerIoNewOutgoing (mgr->session,
4024                             &mgr->session->bandwidth,
4025                             &atom->addr,
4026                             atom->port,
4027                             s->tor->info.hash,
4028                             s->tor->completeness == TR_SEED,
4029                             utp);
4030
4031  if (io == NULL)
4032    {
4033      tordbg (s, "peerIo not created; marking peer %s as unreachable", tr_atomAddrStr (atom));
4034      atom->flags2 |= MYFLAG_UNREACHABLE;
4035      atom->numFails++;
4036    }
4037  else
4038    {
4039      tr_handshake * handshake = tr_handshakeNew (io,
4040                                                  mgr->session->encryptionMode,
4041                                                  myHandshakeDoneCB,
4042                                                  mgr);
4043
4044      assert (tr_peerIoGetTorrentHash (io));
4045
4046      tr_peerIoUnref (io); /* balanced by the initial ref
4047                              in tr_peerIoNewOutgoing () */
4048
4049      tr_ptrArrayInsertSorted (&s->outgoingHandshakes, handshake,
4050                               handshakeCompare);
4051    }
4052
4053  atom->lastConnectionAttemptAt = now;
4054  atom->time = now;
4055}
4056
4057static void
4058initiateCandidateConnection (tr_peerMgr * mgr, struct peer_candidate * c)
4059{
4060#if 0
4061  fprintf (stderr, "Starting an OUTGOING connection with %s - [%s] seedProbability==%d; %s, %s\n",
4062           tr_atomAddrStr (c->atom),
4063           tr_torrentName (c->tor),
4064           (int)c->atom->seedProbability,
4065           tr_torrentIsPrivate (c->tor) ? "private" : "public",
4066           tr_torrentIsSeed (c->tor) ? "seed" : "downloader");
4067#endif
4068
4069  initiateConnection (mgr, c->tor->swarm, c->atom);
4070}
4071
4072static void
4073makeNewPeerConnections (struct tr_peerMgr * mgr, const int max)
4074{
4075  int i, n;
4076  struct peer_candidate * candidates;
4077
4078  candidates = getPeerCandidates (mgr->session, &n, max);
4079
4080  for (i=0; i<n && i<max; ++i)
4081    initiateCandidateConnection (mgr, &candidates[i]);
4082
4083  tr_free (candidates);
4084}
Note: See TracBrowser for help on using the repository browser.