source: trunk/libtransmission/peer-mgr.c @ 13903

Last change on this file since 13903 was 13903, checked in by jordan, 8 years ago

(libT) increment the announced downloadCount only when the piece becomes complete.

  • Property svn:keywords set to Date Rev Author Id
File size: 108.2 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2 (b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 13903 2013-01-30 20:06:12Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h> /* error codes ERANGE, ... */
15#include <limits.h> /* INT_MAX */
16#include <string.h> /* memcpy, memcmp, strstr */
17#include <stdlib.h> /* qsort */
18
19#include <event2/event.h>
20
21#include <libutp/utp.h>
22
23#include "transmission.h"
24#include "announcer.h"
25#include "bandwidth.h"
26#include "blocklist.h"
27#include "cache.h"
28#include "clients.h"
29#include "completion.h"
30#include "crypto.h"
31#include "handshake.h"
32#include "log.h"
33#include "net.h"
34#include "peer-io.h"
35#include "peer-mgr.h"
36#include "peer-msgs.h"
37#include "ptrarray.h"
38#include "session.h"
39#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
40#include "torrent.h"
41#include "tr-utp.h"
42#include "utils.h"
43#include "webseed.h"
44
45enum
46{
47  /* how frequently to cull old atoms */
48  ATOM_PERIOD_MSEC = (60 * 1000),
49
50  /* how frequently to change which peers are choked */
51  RECHOKE_PERIOD_MSEC = (10 * 1000),
52
53  /* an optimistically unchoked peer is immune from rechoking
54     for this many calls to rechokeUploads (). */
55  OPTIMISTIC_UNCHOKE_MULTIPLIER = 4,
56
57  /* how frequently to reallocate bandwidth */
58  BANDWIDTH_PERIOD_MSEC = 500,
59
60  /* how frequently to age out old piece request lists */
61  REFILL_UPKEEP_PERIOD_MSEC = (10 * 1000),
62
63  /* how frequently to decide which peers live and die */
64  RECONNECT_PERIOD_MSEC = 500,
65
66  /* when many peers are available, keep idle ones this long */
67  MIN_UPLOAD_IDLE_SECS = (60),
68
69  /* when few peers are available, keep idle ones this long */
70  MAX_UPLOAD_IDLE_SECS = (60 * 5),
71
72  /* max number of peers to ask for per second overall.
73   * this throttle is to avoid overloading the router */
74  MAX_CONNECTIONS_PER_SECOND = 12,
75
76  MAX_CONNECTIONS_PER_PULSE = (int)(MAX_CONNECTIONS_PER_SECOND * (RECONNECT_PERIOD_MSEC/1000.0)),
77
78  /* number of bad pieces a peer is allowed to send before we ban them */
79  MAX_BAD_PIECES_PER_PEER = 5,
80
81  /* amount of time to keep a list of request pieces lying around
82     before it's considered too old and needs to be rebuilt */
83  PIECE_LIST_SHELF_LIFE_SECS = 60,
84
85  /* use for bitwise operations w/peer_atom.flags2 */
86  MYFLAG_BANNED = 1,
87
88  /* use for bitwise operations w/peer_atom.flags2 */
89  /* unreachable for now... but not banned.
90   * if they try to connect to us it's okay */
91  MYFLAG_UNREACHABLE = 2,
92
93  /* the minimum we'll wait before attempting to reconnect to a peer */
94  MINIMUM_RECONNECT_INTERVAL_SECS = 5,
95
96  /** how long we'll let requests we've made linger before we cancel them */
97  REQUEST_TTL_SECS = 90,
98
99  NO_BLOCKS_CANCEL_HISTORY = 120,
100
101  CANCEL_HISTORY_SEC = 60
102};
103
104const tr_peer_event TR_PEER_EVENT_INIT = { 0, 0, NULL, 0, 0, 0, 0 };
105
106/**
107***
108**/
109
110/**
111 * Peer information that should be kept even before we've connected and
112 * after we've disconnected. These are kept in a pool of peer_atoms to decide
113 * which ones would make good candidates for connecting to, and to watch out
114 * for banned peers.
115 *
116 * @see tr_peer
117 * @see tr_peermsgs
118 */
119struct peer_atom
120{
121  uint8_t     fromFirst;          /* where the peer was first found */
122  uint8_t     fromBest;           /* the "best" value of where the peer has been found */
123  uint8_t     flags;              /* these match the added_f flags */
124  uint8_t     flags2;             /* flags that aren't defined in added_f */
125  int8_t      seedProbability;    /* how likely is this to be a seed... [0..100] or -1 for unknown */
126  int8_t      blocklisted;        /* -1 for unknown, true for blocklisted, false for not blocklisted */
127
128  tr_port     port;
129  bool        utp_failed;         /* We recently failed to connect over uTP */
130  uint16_t    numFails;
131  time_t      time;               /* when the peer's connection status last changed */
132  time_t      piece_data_time;
133
134  time_t      lastConnectionAttemptAt;
135  time_t      lastConnectionAt;
136
137  /* similar to a TTL field, but less rigid --
138   * if the swarm is small, the atom will be kept past this date. */
139  time_t      shelf_date;
140  tr_peer   * peer;               /* will be NULL if not connected */
141  tr_address  addr;
142};
143
144#ifdef NDEBUG
145#define tr_isAtom(a) (TRUE)
146#else
147static bool
148tr_isAtom (const struct peer_atom * atom)
149{
150  return (atom != NULL)
151      && (atom->fromFirst < TR_PEER_FROM__MAX)
152      && (atom->fromBest < TR_PEER_FROM__MAX)
153      && (tr_address_is_valid (&atom->addr));
154}
155#endif
156
157static const char*
158tr_atomAddrStr (const struct peer_atom * atom)
159{
160  return atom ? tr_peerIoAddrStr (&atom->addr, atom->port) : "[no atom]";
161}
162
163struct block_request
164{
165  tr_block_index_t block;
166  tr_peer * peer;
167  time_t sentAt;
168};
169
170struct weighted_piece
171{
172  tr_piece_index_t index;
173  int16_t salt;
174  int16_t requestCount;
175};
176
177enum piece_sort_state
178{
179  PIECES_UNSORTED,
180  PIECES_SORTED_BY_INDEX,
181  PIECES_SORTED_BY_WEIGHT
182};
183
184/** @brief Opaque, per-torrent data structure for peer connection information */
185typedef struct tr_swarm
186{
187  tr_ptrArray                outgoingHandshakes; /* tr_handshake */
188  tr_ptrArray                pool; /* struct peer_atom */
189  tr_ptrArray                peers; /* tr_peer */
190  tr_ptrArray                webseeds; /* tr_webseed */
191
192  tr_torrent               * tor;
193  struct tr_peerMgr        * manager;
194
195  tr_peer                  * optimistic; /* the optimistic peer, or NULL if none */
196  int                        optimisticUnchokeTimeScaler;
197
198  bool                       isRunning;
199  bool                       needsCompletenessCheck;
200
201  struct block_request     * requests;
202  int                        requestCount;
203  int                        requestAlloc;
204
205  struct weighted_piece    * pieces;
206  int                        pieceCount;
207  enum piece_sort_state      pieceSortState;
208
209  /* An array of pieceCount items stating how many peers have each piece.
210     This is used to help us for downloading pieces "rarest first."
211     This may be NULL if we don't have metainfo yet, or if we're not
212     downloading and don't care about rarity */
213  uint16_t                 * pieceReplication;
214  size_t                     pieceReplicationSize;
215
216  int                        interestedCount;
217  int                        maxPeers;
218  time_t                     lastCancel;
219
220  /* Before the endgame this should be 0. In endgame, is contains the average
221   * number of pending requests per peer. Only peers which have more pending
222   * requests are considered 'fast' are allowed to request a block that's
223   * already been requested from another (slower?) peer. */
224  int                        endgame;
225}
226tr_swarm;
227
228struct tr_peerMgr
229{
230  tr_session    * session;
231  tr_ptrArray     incomingHandshakes; /* tr_handshake */
232  struct event  * bandwidthTimer;
233  struct event  * rechokeTimer;
234  struct event  * refillUpkeepTimer;
235  struct event  * atomTimer;
236};
237
238#define tordbg(t, ...) \
239  do \
240    { \
241      if (tr_logGetDeepEnabled ()) \
242        tr_logAddDeep (__FILE__, __LINE__, tr_torrentName (t->tor), __VA_ARGS__); \
243    } \
244  while (0)
245
246#define dbgmsg(...) \
247  do \
248    { \
249      if (tr_logGetDeepEnabled ()) \
250        tr_logAddDeep (__FILE__, __LINE__, NULL, __VA_ARGS__); \
251    } \
252  while (0)
253
254/**
255***
256**/
257
258static inline void
259managerLock (const struct tr_peerMgr * manager)
260{
261  tr_sessionLock (manager->session);
262}
263
264static inline void
265managerUnlock (const struct tr_peerMgr * manager)
266{
267  tr_sessionUnlock (manager->session);
268}
269
270static inline void
271swarmLock (tr_swarm * swarm)
272{
273  managerLock (swarm->manager);
274}
275
276static inline void
277swarmUnlock (tr_swarm * swarm)
278{
279  managerUnlock (swarm->manager);
280}
281
282static inline int
283swarmIsLocked (const tr_swarm * swarm)
284{
285  return tr_sessionIsLocked (swarm->manager->session);
286}
287
288/**
289***
290**/
291
292static int
293handshakeCompareToAddr (const void * va, const void * vb)
294{
295  const tr_handshake * a = va;
296
297  return tr_address_compare (tr_handshakeGetAddr (a, NULL), vb);
298}
299
300static int
301handshakeCompare (const void * a, const void * b)
302{
303  return handshakeCompareToAddr (a, tr_handshakeGetAddr (b, NULL));
304}
305
306static inline tr_handshake*
307getExistingHandshake (tr_ptrArray * handshakes, const tr_address * addr)
308{
309  if (tr_ptrArrayEmpty (handshakes))
310    return NULL;
311
312  return tr_ptrArrayFindSorted (handshakes, addr, handshakeCompareToAddr);
313}
314
315static int
316comparePeerAtomToAddress (const void * va, const void * vb)
317{
318  const struct peer_atom * a = va;
319
320  return tr_address_compare (&a->addr, vb);
321}
322
323static int
324compareAtomsByAddress (const void * va, const void * vb)
325{
326  const struct peer_atom * b = vb;
327
328  assert (tr_isAtom (b));
329
330  return comparePeerAtomToAddress (va, &b->addr);
331}
332
333/**
334***
335**/
336
337const tr_address *
338tr_peerAddress (const tr_peer * peer)
339{
340  return &peer->atom->addr;
341}
342
343static tr_swarm *
344getExistingSwarm (tr_peerMgr *    manager,
345                  const uint8_t * hash)
346{
347  tr_torrent * tor = tr_torrentFindFromHash (manager->session, hash);
348
349  return tor == NULL ? NULL : tor->swarm;
350}
351
352static int
353peerCompare (const void * a, const void * b)
354{
355  return tr_address_compare (tr_peerAddress (a), tr_peerAddress (b));
356}
357
358static struct peer_atom*
359getExistingAtom (const tr_swarm   * cswarm,
360                 const tr_address * addr)
361{
362  tr_swarm * swarm = (tr_swarm*) cswarm;
363  return tr_ptrArrayFindSorted (&swarm->pool, addr, comparePeerAtomToAddress);
364}
365
366static bool
367peerIsInUse (const tr_swarm * cs, const struct peer_atom * atom)
368{
369  tr_swarm * s = (tr_swarm*) cs;
370
371  assert (swarmIsLocked (s));
372
373  return (atom->peer != NULL)
374      || getExistingHandshake (&s->outgoingHandshakes, &atom->addr)
375      || getExistingHandshake (&s->manager->incomingHandshakes, &atom->addr);
376}
377
378void
379tr_peerConstruct (tr_peer * peer)
380{
381  memset (peer, 0, sizeof (tr_peer));
382
383  peer->have = TR_BITFIELD_INIT;
384}
385
386static tr_peer*
387peerNew (struct peer_atom * atom)
388{
389  tr_peer * peer = tr_new (tr_peer, 1);
390  tr_peerConstruct (peer);
391
392  peer->atom = atom;
393  atom->peer = peer;
394
395  return peer;
396}
397
398static tr_peer*
399getPeer (tr_swarm * s, struct peer_atom * atom)
400{
401  tr_peer * peer;
402
403  assert (swarmIsLocked (s));
404
405  peer = atom->peer;
406
407  if (peer == NULL)
408    {
409      peer = peerNew (atom);
410      tr_bitfieldConstruct (&peer->have, s->tor->info.pieceCount);
411      tr_bitfieldConstruct (&peer->blame, s->tor->blockCount);
412      tr_ptrArrayInsertSorted (&s->peers, peer, peerCompare);
413    }
414
415  return peer;
416}
417
418static void peerDeclinedAllRequests (tr_swarm *, const tr_peer *);
419
420void
421tr_peerDestruct (tr_torrent * tor, tr_peer * peer)
422{
423  assert (peer != NULL);
424
425  peerDeclinedAllRequests (tor->swarm, peer);
426
427  if (peer->msgs != NULL)
428    tr_peerMsgsFree (peer->msgs);
429
430  if (peer->io)
431    {
432      tr_peerIoClear (peer->io);
433      tr_peerIoUnref (peer->io); /* balanced by the ref in handshakeDoneCB () */
434    }
435
436  tr_bitfieldDestruct (&peer->have);
437  tr_bitfieldDestruct (&peer->blame);
438
439  if (peer->atom)
440    peer->atom->peer = NULL;
441}
442
443static void
444peerDelete (tr_swarm * s, tr_peer * peer)
445{
446  tr_peerDestruct (s->tor, peer);
447  tr_free (peer);
448}
449
450static inline bool
451replicationExists (const tr_swarm * s)
452{
453  return s->pieceReplication != NULL;
454}
455
456static void
457replicationFree (tr_swarm * s)
458{
459  tr_free (s->pieceReplication);
460  s->pieceReplication = NULL;
461  s->pieceReplicationSize = 0;
462}
463
464static void
465replicationNew (tr_swarm * s)
466{
467  tr_piece_index_t piece_i;
468  const tr_piece_index_t piece_count = s->tor->info.pieceCount;
469  tr_peer ** peers = (tr_peer**) tr_ptrArrayBase (&s->peers);
470  const int peer_count = tr_ptrArraySize (&s->peers);
471
472  assert (!replicationExists (s));
473
474  s->pieceReplicationSize = piece_count;
475  s->pieceReplication = tr_new0 (uint16_t, piece_count);
476
477  for (piece_i=0; piece_i<piece_count; ++piece_i)
478    {
479      int peer_i;
480      uint16_t r = 0;
481
482      for (peer_i=0; peer_i<peer_count; ++peer_i)
483        if (tr_bitfieldHas (&peers[peer_i]->have, piece_i))
484          ++r;
485
486      s->pieceReplication[piece_i] = r;
487    }
488}
489
490static void
491swarmFree (void * vs)
492{
493  tr_swarm * s = vs;
494
495  assert (s);
496  assert (!s->isRunning);
497  assert (swarmIsLocked (s));
498  assert (tr_ptrArrayEmpty (&s->outgoingHandshakes));
499  assert (tr_ptrArrayEmpty (&s->peers));
500
501  tr_ptrArrayDestruct (&s->webseeds, (PtrArrayForeachFunc)tr_webseedFree);
502  tr_ptrArrayDestruct (&s->pool, (PtrArrayForeachFunc)tr_free);
503  tr_ptrArrayDestruct (&s->outgoingHandshakes, NULL);
504  tr_ptrArrayDestruct (&s->peers, NULL);
505
506  replicationFree (s);
507
508  tr_free (s->requests);
509  tr_free (s->pieces);
510  tr_free (s);
511}
512
513static void peerCallbackFunc (tr_peer *, const tr_peer_event *, void *);
514
515static void
516rebuildWebseedArray (tr_swarm * s, tr_torrent * tor)
517{
518  unsigned int i;
519  const tr_info * inf = &tor->info;
520
521  /* clear the array */
522  tr_ptrArrayDestruct (&s->webseeds, (PtrArrayForeachFunc)tr_webseedFree);
523  s->webseeds = TR_PTR_ARRAY_INIT;
524
525  /* repopulate it */
526  for (i = 0; i < inf->webseedCount; ++i)
527    {
528      tr_webseed * w = tr_webseedNew (tor, inf->webseeds[i], peerCallbackFunc, s);
529      tr_ptrArrayAppend (&s->webseeds, w);
530    }
531}
532
533static tr_swarm *
534swarmNew (tr_peerMgr * manager, tr_torrent * tor)
535{
536  tr_swarm * s;
537
538  s = tr_new0 (tr_swarm, 1);
539  s->manager = manager;
540  s->tor = tor;
541  s->pool = TR_PTR_ARRAY_INIT;
542  s->peers = TR_PTR_ARRAY_INIT;
543  s->webseeds = TR_PTR_ARRAY_INIT;
544  s->outgoingHandshakes = TR_PTR_ARRAY_INIT;
545
546  rebuildWebseedArray (s, tor);
547
548  return s;
549}
550
551static void ensureMgrTimersExist (struct tr_peerMgr * m);
552
553tr_peerMgr*
554tr_peerMgrNew (tr_session * session)
555{
556  tr_peerMgr * m = tr_new0 (tr_peerMgr, 1);
557  m->session = session;
558  m->incomingHandshakes = TR_PTR_ARRAY_INIT;
559  ensureMgrTimersExist (m);
560  return m;
561}
562
563static void
564deleteTimer (struct event ** t)
565{
566  if (*t != NULL)
567    {
568      event_free (*t);
569      *t = NULL;
570    }
571}
572
573static void
574deleteTimers (struct tr_peerMgr * m)
575{
576  deleteTimer (&m->atomTimer);
577  deleteTimer (&m->bandwidthTimer);
578  deleteTimer (&m->rechokeTimer);
579  deleteTimer (&m->refillUpkeepTimer);
580}
581
582void
583tr_peerMgrFree (tr_peerMgr * manager)
584{
585  managerLock (manager);
586
587  deleteTimers (manager);
588
589  /* free the handshakes. Abort invokes handshakeDoneCB (), which removes
590   * the item from manager->handshakes, so this is a little roundabout... */
591  while (!tr_ptrArrayEmpty (&manager->incomingHandshakes))
592    tr_handshakeAbort (tr_ptrArrayNth (&manager->incomingHandshakes, 0));
593
594  tr_ptrArrayDestruct (&manager->incomingHandshakes, NULL);
595
596  managerUnlock (manager);
597  tr_free (manager);
598}
599
600static int
601clientIsDownloadingFrom (const tr_torrent * tor, const tr_peer * peer)
602{
603  if (!tr_torrentHasMetadata (tor))
604    return true;
605
606  return peer->clientIsInterested && !peer->clientIsChoked;
607}
608
609static int
610clientIsUploadingTo (const tr_peer * peer)
611{
612  return peer->peerIsInterested && !peer->peerIsChoked;
613}
614
615/***
616****
617***/
618
619void
620tr_peerMgrOnBlocklistChanged (tr_peerMgr * mgr)
621{
622  tr_torrent * tor = NULL;
623  tr_session * session = mgr->session;
624
625  /* we cache whether or not a peer is blocklisted...
626     since the blocklist has changed, erase that cached value */
627  while ((tor = tr_torrentNext (session, tor)))
628    {
629      int i;
630      tr_swarm * s = tor->swarm;
631      const int n = tr_ptrArraySize (&s->pool);
632      for (i=0; i<n; ++i)
633        {
634          struct peer_atom * atom = tr_ptrArrayNth (&s->pool, i);
635          atom->blocklisted = -1;
636        }
637    }
638}
639
640static bool
641isAtomBlocklisted (tr_session * session, struct peer_atom * atom)
642{
643  if (atom->blocklisted < 0)
644    atom->blocklisted = tr_sessionIsAddressBlocked (session, &atom->addr);
645
646  assert (tr_isBool (atom->blocklisted));
647  return atom->blocklisted;
648}
649
650
651/***
652****
653***/
654
655static void
656atomSetSeedProbability (struct peer_atom * atom, int seedProbability)
657{
658  assert (atom != NULL);
659  assert (-1<=seedProbability && seedProbability<=100);
660
661  atom->seedProbability = seedProbability;
662
663  if (seedProbability == 100)
664    atom->flags |= ADDED_F_SEED_FLAG;
665  else if (seedProbability != -1)
666    atom->flags &= ~ADDED_F_SEED_FLAG;
667}
668
669static inline bool
670atomIsSeed (const struct peer_atom * atom)
671{
672  return atom->seedProbability == 100;
673}
674
675static void
676atomSetSeed (const tr_swarm * s, struct peer_atom * atom)
677{
678  if (!atomIsSeed (atom))
679    {
680      tordbg (s, "marking peer %s as a seed", tr_atomAddrStr (atom));
681
682      atomSetSeedProbability (atom, 100);
683    }
684}
685
686
687bool
688tr_peerMgrPeerIsSeed (const tr_torrent  * tor,
689                      const tr_address  * addr)
690{
691  bool isSeed = false;
692  const tr_swarm * s = tor->swarm;
693  const struct peer_atom * atom = getExistingAtom (s, addr);
694
695  if (atom)
696    isSeed = atomIsSeed (atom);
697
698  return isSeed;
699}
700
701void
702tr_peerMgrSetUtpSupported (tr_torrent * tor, const tr_address * addr)
703{
704  struct peer_atom * atom = getExistingAtom (tor->swarm, addr);
705
706  if (atom)
707    atom->flags |= ADDED_F_UTP_FLAGS;
708}
709
710void
711tr_peerMgrSetUtpFailed (tr_torrent *tor, const tr_address *addr, bool failed)
712{
713  struct peer_atom * atom = getExistingAtom (tor->swarm, addr);
714
715  if (atom)
716    atom->utp_failed = failed;
717}
718
719
720/**
721***  REQUESTS
722***
723*** There are two data structures associated with managing block requests:
724***
725*** 1. tr_swarm::requests, an array of "struct block_request" which keeps
726***    track of which blocks have been requested, and when, and by which peers.
727***    This is list is used for (a) cancelling requests that have been pending
728***    for too long and (b) avoiding duplicate requests before endgame.
729***
730*** 2. tr_swarm::pieces, an array of "struct weighted_piece" which lists the
731***    pieces that we want to request. It's used to decide which blocks to
732***    return next when tr_peerMgrGetBlockRequests () is called.
733**/
734
735/**
736*** struct block_request
737**/
738
739static int
740compareReqByBlock (const void * va, const void * vb)
741{
742  const struct block_request * a = va;
743  const struct block_request * b = vb;
744
745  /* primary key: block */
746  if (a->block < b->block) return -1;
747  if (a->block > b->block) return 1;
748
749  /* secondary key: peer */
750  if (a->peer < b->peer) return -1;
751  if (a->peer > b->peer) return 1;
752
753  return 0;
754}
755
756static void
757requestListAdd (tr_swarm * s, tr_block_index_t block, tr_peer * peer)
758{
759  struct block_request key;
760
761  /* ensure enough room is available... */
762  if (s->requestCount + 1 >= s->requestAlloc)
763    {
764      const int CHUNK_SIZE = 128;
765      s->requestAlloc += CHUNK_SIZE;
766      s->requests = tr_renew (struct block_request,
767                              s->requests, s->requestAlloc);
768    }
769
770  /* populate the record we're inserting */
771  key.block = block;
772  key.peer = peer;
773  key.sentAt = tr_time ();
774
775  /* insert the request to our array... */
776  {
777    bool exact;
778    const int pos = tr_lowerBound (&key, s->requests, s->requestCount,
779                                   sizeof (struct block_request),
780                                   compareReqByBlock, &exact);
781    assert (!exact);
782    memmove (s->requests + pos + 1,
783             s->requests + pos,
784             sizeof (struct block_request) * (s->requestCount++ - pos));
785    s->requests[pos] = key;
786  }
787
788  if (peer != NULL)
789    {
790      ++peer->pendingReqsToPeer;
791      assert (peer->pendingReqsToPeer >= 0);
792    }
793
794  /*fprintf (stderr, "added request of block %lu from peer %s... "
795                     "there are now %d block\n",
796                     (unsigned long)block, tr_atomAddrStr (peer->atom), s->requestCount);*/
797}
798
799static struct block_request *
800requestListLookup (tr_swarm * s, tr_block_index_t block, const tr_peer * peer)
801{
802  struct block_request key;
803  key.block = block;
804  key.peer = (tr_peer*) peer;
805
806  return bsearch (&key, s->requests, s->requestCount,
807                  sizeof (struct block_request),
808                  compareReqByBlock);
809}
810
811/**
812 * Find the peers are we currently requesting the block
813 * with index @a block from and append them to @a peerArr.
814 */
815static void
816getBlockRequestPeers (tr_swarm * s, tr_block_index_t block,
817                      tr_ptrArray * peerArr)
818{
819  bool exact;
820  int i, pos;
821  struct block_request key;
822
823  key.block = block;
824  key.peer = NULL;
825  pos = tr_lowerBound (&key, s->requests, s->requestCount,
826                       sizeof (struct block_request),
827                       compareReqByBlock, &exact);
828
829  assert (!exact); /* shouldn't have a request with .peer == NULL */
830
831  for (i=pos; i<s->requestCount; ++i)
832    {
833      if (s->requests[i].block != block)
834        break;
835      tr_ptrArrayAppend (peerArr, s->requests[i].peer);
836    }
837}
838
839static void
840decrementPendingReqCount (const struct block_request * b)
841{
842  if (b->peer != NULL)
843    if (b->peer->pendingReqsToPeer > 0)
844      --b->peer->pendingReqsToPeer;
845}
846
847static void
848requestListRemove (tr_swarm * s, tr_block_index_t block, const tr_peer * peer)
849{
850  const struct block_request * b = requestListLookup (s, block, peer);
851
852  if (b != NULL)
853    {
854      const int pos = b - s->requests;
855      assert (pos < s->requestCount);
856
857      decrementPendingReqCount (b);
858
859      tr_removeElementFromArray (s->requests,
860                                 pos,
861                                 sizeof (struct block_request),
862                                 s->requestCount--);
863
864      /*fprintf (stderr, "removing request of block %lu from peer %s... "
865                         "there are now %d block requests left\n",
866                         (unsigned long)block, tr_atomAddrStr (peer->atom), t->requestCount);*/
867    }
868}
869
870static int
871countActiveWebseeds (const tr_swarm * s)
872{
873  int activeCount = 0;
874  const tr_webseed ** w = (const tr_webseed **) tr_ptrArrayBase (&s->webseeds);
875  const tr_webseed ** const wend = w + tr_ptrArraySize (&s->webseeds);
876
877  for (; w!=wend; ++w)
878    if (tr_webseedIsActive (*w))
879      ++activeCount;
880
881  return activeCount;
882}
883
884static bool
885testForEndgame (const tr_swarm * s)
886{
887  /* we consider ourselves to be in endgame if the number of bytes
888     we've got requested is >= the number of bytes left to download */
889  return (s->requestCount * s->tor->blockSize)
890               >= tr_cpLeftUntilDone (&s->tor->completion);
891}
892
893static void
894updateEndgame (tr_swarm * s)
895{
896  assert (s->requestCount >= 0);
897
898  if (!testForEndgame (s))
899    {
900      /* not in endgame */
901      s->endgame = 0;
902    }
903  else if (!s->endgame) /* only recalculate when endgame first begins */
904    {
905      int numDownloading = 0;
906      const tr_peer ** p = (const tr_peer **) tr_ptrArrayBase (&s->peers);
907      const tr_peer ** const pend = p + tr_ptrArraySize (&s->peers);
908
909      /* add the active bittorrent peers... */
910      for (; p!=pend; ++p)
911        if ((*p)->pendingReqsToPeer > 0)
912          ++numDownloading;
913
914      /* add the active webseeds... */
915      numDownloading += countActiveWebseeds (s);
916
917      /* average number of pending requests per downloading peer */
918      s->endgame = s->requestCount / MAX (numDownloading, 1);
919    }
920}
921
922
923/****
924*****
925*****  Piece List Manipulation / Accessors
926*****
927****/
928
929static inline void
930invalidatePieceSorting (tr_swarm * s)
931{
932  s->pieceSortState = PIECES_UNSORTED;
933}
934
935static const tr_torrent * weightTorrent;
936
937static const uint16_t * weightReplication;
938
939static void
940setComparePieceByWeightTorrent (tr_swarm * s)
941{
942  if (!replicationExists (s))
943    replicationNew (s);
944
945  weightTorrent = s->tor;
946  weightReplication = s->pieceReplication;
947}
948
949/* we try to create a "weight" s.t. high-priority pieces come before others,
950 * and that partially-complete pieces come before empty ones. */
951static int
952comparePieceByWeight (const void * va, const void * vb)
953{
954  const struct weighted_piece * a = va;
955  const struct weighted_piece * b = vb;
956  int ia, ib, missing, pending;
957  const tr_torrent * tor = weightTorrent;
958  const uint16_t * rep = weightReplication;
959
960  /* primary key: weight */
961  missing = tr_cpMissingBlocksInPiece (&tor->completion, a->index);
962  pending = a->requestCount;
963  ia = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
964  missing = tr_cpMissingBlocksInPiece (&tor->completion, b->index);
965  pending = b->requestCount;
966  ib = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
967  if (ia < ib) return -1;
968  if (ia > ib) return 1;
969
970  /* secondary key: higher priorities go first */
971  ia = tor->info.pieces[a->index].priority;
972  ib = tor->info.pieces[b->index].priority;
973  if (ia > ib) return -1;
974  if (ia < ib) return 1;
975
976  /* tertiary key: rarest first. */
977  ia = rep[a->index];
978  ib = rep[b->index];
979  if (ia < ib) return -1;
980  if (ia > ib) return 1;
981
982  /* quaternary key: random */
983  if (a->salt < b->salt) return -1;
984  if (a->salt > b->salt) return 1;
985
986  /* okay, they're equal */
987  return 0;
988}
989
990static int
991comparePieceByIndex (const void * va, const void * vb)
992{
993  const struct weighted_piece * a = va;
994  const struct weighted_piece * b = vb;
995  if (a->index < b->index) return -1;
996  if (a->index > b->index) return 1;
997  return 0;
998}
999
1000static void
1001pieceListSort (tr_swarm * s, enum piece_sort_state state)
1002{
1003  assert (state==PIECES_SORTED_BY_INDEX
1004       || state==PIECES_SORTED_BY_WEIGHT);
1005
1006  if (state == PIECES_SORTED_BY_WEIGHT)
1007    {
1008      setComparePieceByWeightTorrent (s);
1009      qsort (s->pieces, s->pieceCount, sizeof (struct weighted_piece), comparePieceByWeight);
1010    }
1011  else
1012    {
1013      qsort (s->pieces, s->pieceCount, sizeof (struct weighted_piece), comparePieceByIndex);
1014    }
1015
1016  s->pieceSortState = state;
1017}
1018
1019/**
1020 * These functions are useful for testing, but too expensive for nightly builds.
1021 * let's leave it disabled but add an easy hook to compile it back in
1022 */
1023#if 1
1024#define assertWeightedPiecesAreSorted(t)
1025#define assertReplicationCountIsExact(t)
1026#else
1027static void
1028assertWeightedPiecesAreSorted (Torrent * t)
1029{
1030    if (!t->endgame)
1031    {
1032        int i;
1033        setComparePieceByWeightTorrent (t);
1034        for (i=0; i<t->pieceCount-1; ++i)
1035            assert (comparePieceByWeight (&t->pieces[i], &t->pieces[i+1]) <= 0);
1036    }
1037}
1038static void
1039assertReplicationCountIsExact (Torrent * t)
1040{
1041    /* This assert might fail due to errors of implementations in other
1042     * clients. It happens when receiving duplicate bitfields/HaveAll/HaveNone
1043     * from a client. If a such a behavior is noticed,
1044     * a bug report should be filled to the faulty client. */
1045
1046    size_t piece_i;
1047    const uint16_t * rep = t->pieceReplication;
1048    const size_t piece_count = t->pieceReplicationSize;
1049    const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase (&t->peers);
1050    const int peer_count = tr_ptrArraySize (&t->peers);
1051
1052    assert (piece_count == t->tor->info.pieceCount);
1053
1054    for (piece_i=0; piece_i<piece_count; ++piece_i)
1055    {
1056        int peer_i;
1057        uint16_t r = 0;
1058
1059        for (peer_i=0; peer_i<peer_count; ++peer_i)
1060            if (tr_bitsetHas (&peers[peer_i]->have, piece_i))
1061                ++r;
1062
1063        assert (rep[piece_i] == r);
1064    }
1065}
1066#endif
1067
1068static struct weighted_piece *
1069pieceListLookup (tr_swarm * s, tr_piece_index_t index)
1070{
1071  int i;
1072
1073  for (i=0; i<s->pieceCount; ++i)
1074    if (s->pieces[i].index == index)
1075      return &s->pieces[i];
1076
1077  return NULL;
1078}
1079
1080static void
1081pieceListRebuild (tr_swarm * s)
1082{
1083  if (!tr_torrentIsSeed (s->tor))
1084    {
1085      tr_piece_index_t i;
1086      tr_piece_index_t * pool;
1087      tr_piece_index_t poolCount = 0;
1088      const tr_torrent * tor = s->tor;
1089      const tr_info * inf = tr_torrentInfo (tor);
1090      struct weighted_piece * pieces;
1091      int pieceCount;
1092
1093      /* build the new list */
1094      pool = tr_new (tr_piece_index_t, inf->pieceCount);
1095      for (i=0; i<inf->pieceCount; ++i)
1096        if (!inf->pieces[i].dnd)
1097          if (!tr_cpPieceIsComplete (&tor->completion, i))
1098            pool[poolCount++] = i;
1099      pieceCount = poolCount;
1100      pieces = tr_new0 (struct weighted_piece, pieceCount);
1101      for (i=0; i<poolCount; ++i)
1102        {
1103          struct weighted_piece * piece = pieces + i;
1104          piece->index = pool[i];
1105          piece->requestCount = 0;
1106          piece->salt = tr_cryptoWeakRandInt (4096);
1107        }
1108
1109      /* if we already had a list of pieces, merge it into
1110       * the new list so we don't lose its requestCounts */
1111      if (s->pieces != NULL)
1112        {
1113          struct weighted_piece * o = s->pieces;
1114          struct weighted_piece * oend = o + s->pieceCount;
1115          struct weighted_piece * n = pieces;
1116          struct weighted_piece * nend = n + pieceCount;
1117
1118          pieceListSort (s, PIECES_SORTED_BY_INDEX);
1119
1120          while (o!=oend && n!=nend)
1121            {
1122              if (o->index < n->index)
1123                ++o;
1124              else if (o->index > n->index)
1125                ++n;
1126              else
1127                *n++ = *o++;
1128            }
1129
1130          tr_free (s->pieces);
1131        }
1132
1133      s->pieces = pieces;
1134      s->pieceCount = pieceCount;
1135
1136      pieceListSort (s, PIECES_SORTED_BY_WEIGHT);
1137
1138      /* cleanup */
1139      tr_free (pool);
1140    }
1141}
1142
1143static void
1144pieceListRemovePiece (tr_swarm * s, tr_piece_index_t piece)
1145{
1146  struct weighted_piece * p;
1147
1148  if ((p = pieceListLookup (s, piece)))
1149    {
1150      const int pos = p - s->pieces;
1151
1152      tr_removeElementFromArray (s->pieces,
1153                                 pos,
1154                                 sizeof (struct weighted_piece),
1155                                 s->pieceCount--);
1156
1157      if (s->pieceCount == 0)
1158        {
1159          tr_free (s->pieces);
1160          s->pieces = NULL;
1161        }
1162    }
1163}
1164
1165static void
1166pieceListResortPiece (tr_swarm * s, struct weighted_piece * p)
1167{
1168  int pos;
1169  bool isSorted = true;
1170
1171  if (p == NULL)
1172    return;
1173
1174  /* is the torrent already sorted? */
1175  pos = p - s->pieces;
1176  setComparePieceByWeightTorrent (s);
1177  if (isSorted && (pos > 0) && (comparePieceByWeight (p-1, p) > 0))
1178    isSorted = false;
1179  if (isSorted && (pos < s->pieceCount - 1) && (comparePieceByWeight (p, p+1) > 0))
1180    isSorted = false;
1181
1182  if (s->pieceSortState != PIECES_SORTED_BY_WEIGHT)
1183    {
1184      pieceListSort (s, PIECES_SORTED_BY_WEIGHT);
1185      isSorted = true;
1186    }
1187
1188  /* if it's not sorted, move it around */
1189  if (!isSorted)
1190    {
1191      bool exact;
1192      const struct weighted_piece tmp = *p;
1193
1194      tr_removeElementFromArray (s->pieces,
1195                                 pos,
1196                                 sizeof (struct weighted_piece),
1197                                 s->pieceCount--);
1198
1199      pos = tr_lowerBound (&tmp, s->pieces, s->pieceCount,
1200                           sizeof (struct weighted_piece),
1201                           comparePieceByWeight, &exact);
1202
1203      memmove (&s->pieces[pos + 1],
1204               &s->pieces[pos],
1205               sizeof (struct weighted_piece) * (s->pieceCount++ - pos));
1206
1207      s->pieces[pos] = tmp;
1208    }
1209
1210  assertWeightedPiecesAreSorted (s);
1211}
1212
1213static void
1214pieceListRemoveRequest (tr_swarm * s, tr_block_index_t block)
1215{
1216  struct weighted_piece * p;
1217  const tr_piece_index_t index = tr_torBlockPiece (s->tor, block);
1218
1219  if (((p = pieceListLookup (s, index))) && (p->requestCount > 0))
1220    {
1221      --p->requestCount;
1222      pieceListResortPiece (s, p);
1223    }
1224}
1225
1226
1227/****
1228*****
1229*****  Replication count (for rarest first policy)
1230*****
1231****/
1232
1233/**
1234 * Increase the replication count of this piece and sort it if the
1235 * piece list is already sorted
1236 */
1237static void
1238tr_incrReplicationOfPiece (tr_swarm * s, const size_t index)
1239{
1240  assert (replicationExists (s));
1241  assert (s->pieceReplicationSize == s->tor->info.pieceCount);
1242
1243  /* One more replication of this piece is present in the swarm */
1244  ++s->pieceReplication[index];
1245
1246  /* we only resort the piece if the list is already sorted */
1247  if (s->pieceSortState == PIECES_SORTED_BY_WEIGHT)
1248    pieceListResortPiece (s, pieceListLookup (s, index));
1249}
1250
1251/**
1252 * Increases the replication count of pieces present in the bitfield
1253 */
1254static void
1255tr_incrReplicationFromBitfield (tr_swarm * s, const tr_bitfield * b)
1256{
1257  size_t i;
1258  uint16_t * rep = s->pieceReplication;
1259  const size_t n = s->tor->info.pieceCount;
1260
1261  assert (replicationExists (s));
1262
1263  for (i=0; i<n; ++i)
1264    if (tr_bitfieldHas (b, i))
1265      ++rep[i];
1266
1267  if (s->pieceSortState == PIECES_SORTED_BY_WEIGHT)
1268    invalidatePieceSorting (s);
1269}
1270
1271/**
1272 * Increase the replication count of every piece
1273 */
1274static void
1275tr_incrReplication (tr_swarm * s)
1276{
1277  int i;
1278  const int n = s->pieceReplicationSize;
1279
1280  assert (replicationExists (s));
1281  assert (s->pieceReplicationSize == s->tor->info.pieceCount);
1282
1283  for (i=0; i<n; ++i)
1284    ++s->pieceReplication[i];
1285}
1286
1287/**
1288 * Decrease the replication count of pieces present in the bitset.
1289 */
1290static void
1291tr_decrReplicationFromBitfield (tr_swarm * s, const tr_bitfield * b)
1292{
1293  int i;
1294  const int n = s->pieceReplicationSize;
1295
1296  assert (replicationExists (s));
1297  assert (s->pieceReplicationSize == s->tor->info.pieceCount);
1298
1299  if (tr_bitfieldHasAll (b))
1300    {
1301      for (i=0; i<n; ++i)
1302        --s->pieceReplication[i];
1303    }
1304  else if (!tr_bitfieldHasNone (b))
1305    {
1306      for (i=0; i<n; ++i)
1307        if (tr_bitfieldHas (b, i))
1308          --s->pieceReplication[i];
1309
1310      if (s->pieceSortState == PIECES_SORTED_BY_WEIGHT)
1311        invalidatePieceSorting (s);
1312    }
1313}
1314
1315/**
1316***
1317**/
1318
1319void
1320tr_peerMgrRebuildRequests (tr_torrent * tor)
1321{
1322  assert (tr_isTorrent (tor));
1323
1324  pieceListRebuild (tor->swarm);
1325}
1326
1327void
1328tr_peerMgrGetNextRequests (tr_torrent           * tor,
1329                           tr_peer              * peer,
1330                           int                    numwant,
1331                           tr_block_index_t     * setme,
1332                           int                  * numgot,
1333                           bool                   get_intervals)
1334{
1335  int i;
1336  int got;
1337  tr_swarm * s;
1338  struct weighted_piece * pieces;
1339  const tr_bitfield * const have = &peer->have;
1340
1341  /* sanity clause */
1342  assert (tr_isTorrent (tor));
1343  assert (peer->clientIsInterested);
1344  assert (!peer->clientIsChoked);
1345  assert (numwant > 0);
1346
1347  /* walk through the pieces and find blocks that should be requested */
1348  got = 0;
1349  s = tor->swarm;
1350
1351  /* prep the pieces list */
1352  if (s->pieces == NULL)
1353    pieceListRebuild (s);
1354
1355  if (s->pieceSortState != PIECES_SORTED_BY_WEIGHT)
1356    pieceListSort (s, PIECES_SORTED_BY_WEIGHT);
1357
1358  assertReplicationCountIsExact (s);
1359  assertWeightedPiecesAreSorted (s);
1360
1361  updateEndgame (s);
1362  pieces = s->pieces;
1363  for (i=0; i<s->pieceCount && got<numwant; ++i)
1364    {
1365      struct weighted_piece * p = pieces + i;
1366
1367      /* if the peer has this piece that we want... */
1368      if (tr_bitfieldHas (have, p->index))
1369        {
1370          tr_block_index_t b;
1371          tr_block_index_t first;
1372          tr_block_index_t last;
1373          tr_ptrArray peerArr = TR_PTR_ARRAY_INIT;
1374
1375          tr_torGetPieceBlockRange (tor, p->index, &first, &last);
1376
1377          for (b=first; b<=last && (got<numwant || (get_intervals && setme[2*got-1] == b-1)); ++b)
1378            {
1379              int peerCount;
1380              tr_peer ** peers;
1381
1382              /* don't request blocks we've already got */
1383              if (tr_cpBlockIsComplete (&tor->completion, b))
1384                continue;
1385
1386              /* always add peer if this block has no peers yet */
1387              tr_ptrArrayClear (&peerArr);
1388              getBlockRequestPeers (s, b, &peerArr);
1389              peers = (tr_peer **) tr_ptrArrayPeek (&peerArr, &peerCount);
1390              if (peerCount != 0)
1391                {
1392                  /* don't make a second block request until the endgame */
1393                  if (!s->endgame)
1394                    continue;
1395
1396                  /* don't have more than two peers requesting this block */
1397                  if (peerCount > 1)
1398                    continue;
1399
1400                  /* don't send the same request to the same peer twice */
1401                  if (peer == peers[0])
1402                    continue;
1403
1404                  /* in the endgame allow an additional peer to download a
1405                     block but only if the peer seems to be handling requests
1406                     relatively fast */
1407                  if (peer->pendingReqsToPeer + numwant - got < s->endgame)
1408                    continue;
1409                }
1410
1411              /* update the caller's table */
1412              if (!get_intervals)
1413                {
1414                  setme[got++] = b;
1415                }
1416              /* if intervals are requested two array entries are necessarry:
1417                 one for the interval's starting block and one for its end block */
1418              else if (got && setme[2 * got - 1] == b - 1 && b != first)
1419                {
1420                  /* expand the last interval */
1421                  ++setme[2 * got - 1];
1422                }
1423              else
1424                {
1425                  /* begin a new interval */
1426                  setme[2 * got] = setme[2 * got + 1] = b;
1427                  ++got;
1428                }
1429
1430              /* update our own tables */
1431              requestListAdd (s, b, peer);
1432              ++p->requestCount;
1433            }
1434
1435          tr_ptrArrayDestruct (&peerArr, NULL);
1436        }
1437    }
1438
1439  /* In most cases we've just changed the weights of a small number of pieces.
1440   * So rather than qsort ()ing the entire array, it's faster to apply an
1441   * adaptive insertion sort algorithm. */
1442  if (got > 0)
1443    {
1444      /* not enough requests || last piece modified */
1445      if (i == s->pieceCount)
1446        --i;
1447
1448      setComparePieceByWeightTorrent (s);
1449      while (--i >= 0)
1450        {
1451          bool exact;
1452
1453          /* relative position! */
1454          const int newpos = tr_lowerBound (&s->pieces[i], &s->pieces[i + 1],
1455                                            s->pieceCount - (i + 1),
1456                                            sizeof (struct weighted_piece),
1457                                            comparePieceByWeight, &exact);
1458          if (newpos > 0)
1459            {
1460              const struct weighted_piece piece = s->pieces[i];
1461              memmove (&s->pieces[i],
1462                       &s->pieces[i + 1],
1463                       sizeof (struct weighted_piece) * (newpos));
1464              s->pieces[i + newpos] = piece;
1465            }
1466        }
1467    }
1468
1469  assertWeightedPiecesAreSorted (t);
1470  *numgot = got;
1471}
1472
1473bool
1474tr_peerMgrDidPeerRequest (const tr_torrent  * tor,
1475                          const tr_peer     * peer,
1476                          tr_block_index_t    block)
1477{
1478  return requestListLookup ((tr_swarm*)tor->swarm, block, peer) != NULL;
1479}
1480
1481/* cancel requests that are too old */
1482static void
1483refillUpkeep (int foo UNUSED, short bar UNUSED, void * vmgr)
1484{
1485    time_t now;
1486    time_t too_old;
1487    tr_torrent * tor;
1488    int cancel_buflen = 0;
1489    struct block_request * cancel = NULL;
1490    tr_peerMgr * mgr = vmgr;
1491    managerLock (mgr);
1492
1493    now = tr_time ();
1494    too_old = now - REQUEST_TTL_SECS;
1495
1496    /* alloc the temporary "cancel" buffer */
1497    tor = NULL;
1498    while ((tor = tr_torrentNext (mgr->session, tor)))
1499        cancel_buflen = MAX (cancel_buflen, tor->swarm->requestCount);
1500    if (cancel_buflen > 0)
1501        cancel = tr_new (struct block_request, cancel_buflen);
1502
1503    /* prune requests that are too old */
1504    tor = NULL;
1505    while ((tor = tr_torrentNext (mgr->session, tor)))
1506    {
1507        tr_swarm * s = tor->swarm;
1508        const int n = s->requestCount;
1509        if (n > 0)
1510        {
1511            int keepCount = 0;
1512            int cancelCount = 0;
1513            const struct block_request * it;
1514            const struct block_request * end;
1515
1516            for (it=s->requests, end=it+n; it!=end; ++it)
1517            {
1518                if ((it->sentAt <= too_old) && it->peer->msgs && !tr_peerMsgsIsReadingBlock (it->peer->msgs, it->block))
1519                    cancel[cancelCount++] = *it;
1520                else
1521                {
1522                    if (it != &s->requests[keepCount])
1523                        s->requests[keepCount] = *it;
1524                    keepCount++;
1525                }
1526            }
1527
1528            /* prune out the ones we aren't keeping */
1529            s->requestCount = keepCount;
1530
1531            /* send cancel messages for all the "cancel" ones */
1532            for (it=cancel, end=it+cancelCount; it!=end; ++it) {
1533                if ((it->peer != NULL) && (it->peer->msgs != NULL)) {
1534                    tr_historyAdd (&it->peer->cancelsSentToPeer, now, 1);
1535                    tr_peerMsgsCancel (it->peer->msgs, it->block);
1536                    decrementPendingReqCount (it);
1537                }
1538            }
1539
1540            /* decrement the pending request counts for the timed-out blocks */
1541            for (it=cancel, end=it+cancelCount; it!=end; ++it)
1542                pieceListRemoveRequest (s, it->block);
1543        }
1544    }
1545
1546    tr_free (cancel);
1547    tr_timerAddMsec (mgr->refillUpkeepTimer, REFILL_UPKEEP_PERIOD_MSEC);
1548    managerUnlock (mgr);
1549}
1550
1551static void
1552addStrike (tr_swarm * s, tr_peer * peer)
1553{
1554  tordbg (s, "increasing peer %s strike count to %d",
1555          tr_atomAddrStr (peer->atom), peer->strikes + 1);
1556
1557  if (++peer->strikes >= MAX_BAD_PIECES_PER_PEER)
1558    {
1559      struct peer_atom * atom = peer->atom;
1560      atom->flags2 |= MYFLAG_BANNED;
1561      peer->doPurge = 1;
1562      tordbg (s, "banning peer %s", tr_atomAddrStr (atom));
1563    }
1564}
1565
1566static void
1567peerSuggestedPiece (tr_swarm           * s UNUSED,
1568                    tr_peer            * peer UNUSED,
1569                    tr_piece_index_t     pieceIndex UNUSED,
1570                    int                  isFastAllowed UNUSED)
1571{
1572#if 0
1573    assert (t);
1574    assert (peer);
1575    assert (peer->msgs);
1576
1577    /* is this a valid piece? */
1578    if (pieceIndex >= t->tor->info.pieceCount)
1579        return;
1580
1581    /* don't ask for it if we've already got it */
1582    if (tr_cpPieceIsComplete (t->tor->completion, pieceIndex))
1583        return;
1584
1585    /* don't ask for it if they don't have it */
1586    if (!tr_bitfieldHas (peer->have, pieceIndex))
1587        return;
1588
1589    /* don't ask for it if we're choked and it's not fast */
1590    if (!isFastAllowed && peer->clientIsChoked)
1591        return;
1592
1593    /* request the blocks that we don't have in this piece */
1594    {
1595        tr_block_index_t b;
1596        tr_block_index_t first;
1597        tr_block_index_t last;
1598        const tr_torrent * tor = t->tor;
1599
1600        tr_torGetPieceBlockRange (t->tor, pieceIndex, &first, &last);
1601
1602        for (b=first; b<=last; ++b)
1603        {
1604            if (!tr_cpBlockIsComplete (tor->completion, b))
1605            {
1606                const uint32_t offset = getBlockOffsetInPiece (tor, b);
1607                const uint32_t length = tr_torBlockCountBytes (tor, b);
1608                tr_peerMsgsAddRequest (peer->msgs, pieceIndex, offset, length);
1609                incrementPieceRequests (t, pieceIndex);
1610            }
1611        }
1612    }
1613#endif
1614}
1615
1616static void
1617removeRequestFromTables (tr_swarm * s, tr_block_index_t block, const tr_peer * peer)
1618{
1619  requestListRemove (s, block, peer);
1620  pieceListRemoveRequest (s, block);
1621}
1622
1623/* peer choked us, or maybe it disconnected.
1624   either way we need to remove all its requests */
1625static void
1626peerDeclinedAllRequests (tr_swarm * s, const tr_peer * peer)
1627{
1628  int i, n;
1629  tr_block_index_t * blocks = tr_new (tr_block_index_t, s->requestCount);
1630
1631  for (i=n=0; i<s->requestCount; ++i)
1632    if (peer == s->requests[i].peer)
1633      blocks[n++] = s->requests[i].block;
1634
1635  for (i=0; i<n; ++i)
1636    removeRequestFromTables (s, blocks[i], peer);
1637
1638  tr_free (blocks);
1639}
1640
1641static void
1642cancelAllRequestsForBlock (tr_swarm          * s,
1643                           tr_block_index_t    block,
1644                           tr_peer           * no_notify)
1645{
1646  int i;
1647  int peerCount;
1648  tr_peer ** peers;
1649  tr_ptrArray peerArr;
1650
1651  peerArr = TR_PTR_ARRAY_INIT;
1652  getBlockRequestPeers (s, block, &peerArr);
1653  peers = (tr_peer **) tr_ptrArrayPeek (&peerArr, &peerCount);
1654  for (i=0; i<peerCount; ++i)
1655    {
1656      tr_peer * p = peers[i];
1657
1658      if ((p != no_notify) && (p->msgs != NULL))
1659        {
1660          tr_historyAdd (&p->cancelsSentToPeer, tr_time (), 1);
1661          tr_peerMsgsCancel (p->msgs, block);
1662        }
1663
1664      removeRequestFromTables (s, block, p);
1665    }
1666
1667  tr_ptrArrayDestruct (&peerArr, NULL);
1668}
1669
1670void
1671tr_peerMgrPieceCompleted (tr_torrent * tor, tr_piece_index_t p)
1672{
1673  bool pieceCameFromPeers = false;
1674  tr_swarm * const s = tor->swarm;
1675  const tr_peer ** peer = (const tr_peer **) tr_ptrArrayBase (&s->peers);
1676  const tr_peer ** const pend = peer + tr_ptrArraySize (&s->peers);
1677
1678  /* walk through our peers */
1679  for ( ; peer!=pend; ++peer)
1680    {
1681      /* notify the peer that we now have this piece */
1682      tr_peerMsgsHave ((*peer)->msgs, p);
1683
1684      if (!pieceCameFromPeers)
1685        pieceCameFromPeers = tr_bitfieldHas (&(*peer)->blame, p);
1686    }
1687
1688  if (pieceCameFromPeers) /* webseed downloads don't belong in announce totals */
1689    tr_announcerAddBytes (tor, TR_ANN_DOWN, tr_torPieceCountBytes (tor, p));
1690
1691  /* bookkeeping */
1692  pieceListRemovePiece (s, p);
1693  s->needsCompletenessCheck = true;
1694}
1695
1696static void
1697peerCallbackFunc (tr_peer * peer, const tr_peer_event * e, void * vs)
1698{
1699  tr_swarm * s = vs;
1700
1701  swarmLock (s);
1702
1703  assert (peer != NULL);
1704
1705  switch (e->eventType)
1706    {
1707      case TR_PEER_PEER_GOT_PIECE_DATA:
1708        {
1709          const time_t now = tr_time ();
1710          tr_torrent * tor = s->tor;
1711
1712          tor->uploadedCur += e->length;
1713          tr_announcerAddBytes (tor, TR_ANN_UP, e->length);
1714          tr_torrentSetActivityDate (tor, now);
1715          tr_torrentSetDirty (tor);
1716          tr_statsAddUploaded (tor->session, e->length);
1717
1718          if (peer->atom != NULL)
1719            peer->atom->piece_data_time = now;
1720
1721          break;
1722        }
1723
1724      case TR_PEER_CLIENT_GOT_PIECE_DATA:
1725        {
1726          const time_t now = tr_time ();
1727          tr_torrent * tor = s->tor;
1728
1729          tor->downloadedCur += e->length;
1730          tr_torrentSetActivityDate (tor, now);
1731          tr_torrentSetDirty (tor);
1732
1733          tr_statsAddDownloaded (tor->session, e->length);
1734
1735          if (peer->atom != NULL)
1736            peer->atom->piece_data_time = now;
1737
1738          break;
1739        }
1740
1741      case TR_PEER_CLIENT_GOT_HAVE:
1742        if (replicationExists (s))
1743          {
1744            tr_incrReplicationOfPiece (s, e->pieceIndex);
1745            assertReplicationCountIsExact (s);
1746          }
1747        break;
1748
1749      case TR_PEER_CLIENT_GOT_HAVE_ALL:
1750        if (replicationExists (s))
1751          {
1752            tr_incrReplication (s);
1753            assertReplicationCountIsExact (s);
1754          }
1755        break;
1756
1757      case TR_PEER_CLIENT_GOT_HAVE_NONE:
1758        /* noop */
1759        break;
1760
1761      case TR_PEER_CLIENT_GOT_BITFIELD:
1762        assert (e->bitfield != NULL);
1763        if (replicationExists (s))
1764          {
1765            tr_incrReplicationFromBitfield (s, e->bitfield);
1766            assertReplicationCountIsExact (s);
1767          }
1768        break;
1769
1770      case TR_PEER_CLIENT_GOT_REJ:
1771        {
1772          tr_block_index_t b = _tr_block (s->tor, e->pieceIndex, e->offset);
1773          if (b < s->tor->blockCount)
1774            removeRequestFromTables (s, b, peer);
1775          else
1776            tordbg (s, "Peer %s sent an out-of-range reject message",
1777                    tr_atomAddrStr (peer->atom));
1778          break;
1779        }
1780
1781      case TR_PEER_CLIENT_GOT_CHOKE:
1782        peerDeclinedAllRequests (s, peer);
1783        break;
1784
1785      case TR_PEER_CLIENT_GOT_PORT:
1786        if (peer->atom)
1787          peer->atom->port = e->port;
1788        break;
1789
1790      case TR_PEER_CLIENT_GOT_SUGGEST:
1791        peerSuggestedPiece (s, peer, e->pieceIndex, false);
1792        break;
1793
1794      case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1795        peerSuggestedPiece (s, peer, e->pieceIndex, true);
1796        break;
1797
1798      case TR_PEER_CLIENT_GOT_BLOCK:
1799        {
1800          tr_torrent * tor = s->tor;
1801          const tr_piece_index_t p = e->pieceIndex;
1802          const tr_block_index_t block = _tr_block (tor, p, e->offset);
1803          cancelAllRequestsForBlock (s, block, peer);
1804          tr_historyAdd (&peer->blocksSentToClient, tr_time(), 1);
1805          pieceListResortPiece (s, pieceListLookup (s, p));
1806          tr_torrentGotBlock (tor, block);
1807          break;
1808        }
1809
1810      case TR_PEER_ERROR:
1811        if ((e->err == ERANGE) || (e->err == EMSGSIZE) || (e->err == ENOTCONN))
1812          {
1813            /* some protocol error from the peer */
1814            peer->doPurge = 1;
1815            tordbg (s, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error",
1816                    tr_atomAddrStr (peer->atom));
1817          }
1818        else
1819          {
1820            tordbg (s, "unhandled error: %s", tr_strerror (e->err));
1821          }
1822        break;
1823
1824      default:
1825        assert (0);
1826    }
1827
1828  swarmUnlock (s);
1829}
1830
1831static int
1832getDefaultShelfLife (uint8_t from)
1833{
1834    /* in general, peers obtained from firsthand contact
1835     * are better than those from secondhand, etc etc */
1836    switch (from)
1837    {
1838        case TR_PEER_FROM_INCOMING : return 60 * 60 * 6;
1839        case TR_PEER_FROM_LTEP     : return 60 * 60 * 6;
1840        case TR_PEER_FROM_TRACKER  : return 60 * 60 * 3;
1841        case TR_PEER_FROM_DHT      : return 60 * 60 * 3;
1842        case TR_PEER_FROM_PEX      : return 60 * 60 * 2;
1843        case TR_PEER_FROM_RESUME   : return 60 * 60;
1844        case TR_PEER_FROM_LPD      : return 10 * 60;
1845        default                    : return 60 * 60;
1846    }
1847}
1848
1849static void
1850ensureAtomExists (tr_swarm          * s,
1851                  const tr_address  * addr,
1852                  const tr_port       port,
1853                  const uint8_t       flags,
1854                  const int8_t        seedProbability,
1855                  const uint8_t       from)
1856{
1857  struct peer_atom * a;
1858
1859  assert (tr_address_is_valid (addr));
1860  assert (from < TR_PEER_FROM__MAX);
1861
1862  a = getExistingAtom (s, addr);
1863
1864  if (a == NULL)
1865    {
1866      const int jitter = tr_cryptoWeakRandInt (60*10);
1867      a = tr_new0 (struct peer_atom, 1);
1868      a->addr = *addr;
1869      a->port = port;
1870      a->flags = flags;
1871      a->fromFirst = from;
1872      a->fromBest = from;
1873      a->shelf_date = tr_time () + getDefaultShelfLife (from) + jitter;
1874      a->blocklisted = -1;
1875      atomSetSeedProbability (a, seedProbability);
1876      tr_ptrArrayInsertSorted (&s->pool, a, compareAtomsByAddress);
1877
1878      tordbg (s, "got a new atom: %s", tr_atomAddrStr (a));
1879    }
1880  else
1881    {
1882      if (from < a->fromBest)
1883        a->fromBest = from;
1884
1885      if (a->seedProbability == -1)
1886        atomSetSeedProbability (a, seedProbability);
1887
1888      a->flags |= flags;
1889    }
1890}
1891
1892static int
1893getMaxPeerCount (const tr_torrent * tor)
1894{
1895  return tor->maxConnectedPeers;
1896}
1897
1898static int
1899getPeerCount (const tr_swarm * s)
1900{
1901  return tr_ptrArraySize (&s->peers);/* + tr_ptrArraySize (&t->outgoingHandshakes); */
1902}
1903
1904/* FIXME: this is kind of a mess. */
1905static bool
1906myHandshakeDoneCB (tr_handshake  * handshake,
1907                   tr_peerIo     * io,
1908                   bool            readAnythingFromPeer,
1909                   bool            isConnected,
1910                   const uint8_t * peer_id,
1911                   void          * vmanager)
1912{
1913  bool ok = isConnected;
1914  bool success = false;
1915  tr_port port;
1916  const tr_address * addr;
1917  tr_peerMgr * manager = vmanager;
1918  tr_swarm  * s;
1919  tr_handshake * ours;
1920
1921  assert (io);
1922  assert (tr_isBool (ok));
1923
1924  s = tr_peerIoHasTorrentHash (io)
1925    ? getExistingSwarm (manager, tr_peerIoGetTorrentHash (io))
1926    : NULL;
1927
1928  if (tr_peerIoIsIncoming (io))
1929    ours = tr_ptrArrayRemoveSorted (&manager->incomingHandshakes,
1930                                    handshake, handshakeCompare);
1931  else if (s)
1932    ours = tr_ptrArrayRemoveSorted (&s->outgoingHandshakes,
1933                                    handshake, handshakeCompare);
1934  else
1935    ours = handshake;
1936
1937  assert (ours);
1938  assert (ours == handshake);
1939
1940  if (s)
1941    swarmLock (s);
1942
1943  addr = tr_peerIoGetAddress (io, &port);
1944
1945  if (!ok || !s || !s->isRunning)
1946    {
1947      if (s)
1948        {
1949          struct peer_atom * atom = getExistingAtom (s, addr);
1950          if (atom)
1951            {
1952              ++atom->numFails;
1953
1954              if (!readAnythingFromPeer)
1955                {
1956                  tordbg (s, "marking peer %s as unreachable... numFails is %d", tr_atomAddrStr (atom), (int)atom->numFails);
1957                  atom->flags2 |= MYFLAG_UNREACHABLE;
1958                }
1959            }
1960        }
1961    }
1962  else /* looking good */
1963    {
1964      struct peer_atom * atom;
1965
1966      ensureAtomExists (s, addr, port, 0, -1, TR_PEER_FROM_INCOMING);
1967      atom = getExistingAtom (s, addr);
1968      atom->time = tr_time ();
1969      atom->piece_data_time = 0;
1970      atom->lastConnectionAt = tr_time ();
1971
1972      if (!tr_peerIoIsIncoming (io))
1973        {
1974          atom->flags |= ADDED_F_CONNECTABLE;
1975          atom->flags2 &= ~MYFLAG_UNREACHABLE;
1976        }
1977
1978      /* In principle, this flag specifies whether the peer groks uTP,
1979         not whether it's currently connected over uTP. */
1980      if (io->utp_socket)
1981        atom->flags |= ADDED_F_UTP_FLAGS;
1982
1983      if (atom->flags2 & MYFLAG_BANNED)
1984        {
1985          tordbg (s, "banned peer %s tried to reconnect",
1986                  tr_atomAddrStr (atom));
1987        }
1988      else if (tr_peerIoIsIncoming (io) && (getPeerCount (s) >= getMaxPeerCount (s->tor)))
1989        {
1990        }
1991      else
1992        {
1993          tr_peer * peer = atom->peer;
1994
1995          if (peer) /* we already have this peer */
1996            {
1997            }
1998          else
1999            {
2000              peer = getPeer (s, atom);
2001
2002              if (!peer_id)
2003                peer->client = TR_KEY_NONE;
2004              else
2005                {
2006                  char client[128];
2007                  tr_clientForId (client, sizeof (client), peer_id);
2008                  peer->client = tr_quark_new (client, -1);
2009                }
2010
2011              peer->io = tr_handshakeStealIO (handshake); /* this steals its refcount too, which is
2012                                                                balanced by our unref in peerDelete ()  */
2013              tr_peerIoSetParent (peer->io, &s->tor->bandwidth);
2014              tr_peerMsgsNew (s->tor, peer, peerCallbackFunc, s);
2015
2016              success = true;
2017            }
2018        }
2019    }
2020
2021  if (s != NULL)
2022    swarmUnlock (s);
2023
2024  return success;
2025}
2026
2027void
2028tr_peerMgrAddIncoming (tr_peerMgr       * manager,
2029                       tr_address       * addr,
2030                       tr_port            port,
2031                       int                socket,
2032                       struct UTPSocket * utp_socket)
2033{
2034  tr_session * session;
2035
2036  managerLock (manager);
2037
2038  assert (tr_isSession (manager->session));
2039  session = manager->session;
2040
2041  if (tr_sessionIsAddressBlocked (session, addr))
2042    {
2043      tr_logAddDebug ("Banned IP address \"%s\" tried to connect to us", tr_address_to_string (addr));
2044      if (socket >= 0)
2045        tr_netClose (session, socket);
2046      else
2047        UTP_Close (utp_socket);
2048    }
2049  else if (getExistingHandshake (&manager->incomingHandshakes, addr))
2050    {
2051      if (socket >= 0)
2052        tr_netClose (session, socket);
2053      else
2054        UTP_Close (utp_socket);
2055    }
2056  else /* we don't have a connection to them yet... */
2057    {
2058      tr_peerIo *    io;
2059      tr_handshake * handshake;
2060
2061      io = tr_peerIoNewIncoming (session, &session->bandwidth, addr, port, socket, utp_socket);
2062
2063      handshake = tr_handshakeNew (io,
2064                                   session->encryptionMode,
2065                                   myHandshakeDoneCB,
2066                                   manager);
2067
2068      tr_peerIoUnref (io); /* balanced by the implicit ref in tr_peerIoNewIncoming () */
2069
2070      tr_ptrArrayInsertSorted (&manager->incomingHandshakes, handshake,
2071                               handshakeCompare);
2072    }
2073
2074  managerUnlock (manager);
2075}
2076
2077void
2078tr_peerMgrAddPex (tr_torrent * tor, uint8_t from,
2079                  const tr_pex * pex, int8_t seedProbability)
2080{
2081  if (tr_isPex (pex)) /* safeguard against corrupt data */
2082    {
2083      tr_swarm * s = tor->swarm;
2084      managerLock (s->manager);
2085
2086      if (!tr_sessionIsAddressBlocked (s->manager->session, &pex->addr))
2087        if (tr_address_is_valid_for_peers (&pex->addr, pex->port))
2088          ensureAtomExists (s, &pex->addr, pex->port, pex->flags, seedProbability, from);
2089
2090      managerUnlock (s->manager);
2091    }
2092}
2093
2094void
2095tr_peerMgrMarkAllAsSeeds (tr_torrent * tor)
2096{
2097  tr_swarm * s = tor->swarm;
2098  const int n = tr_ptrArraySize (&s->pool);
2099  struct peer_atom ** it = (struct peer_atom**) tr_ptrArrayBase (&s->pool);
2100  struct peer_atom ** end = it + n;
2101
2102  while (it != end)
2103    atomSetSeed (s, *it++);
2104}
2105
2106tr_pex *
2107tr_peerMgrCompactToPex (const void    * compact,
2108                        size_t          compactLen,
2109                        const uint8_t * added_f,
2110                        size_t          added_f_len,
2111                        size_t        * pexCount)
2112{
2113  size_t i;
2114  size_t n = compactLen / 6;
2115  const uint8_t * walk = compact;
2116  tr_pex * pex = tr_new0 (tr_pex, n);
2117
2118  for (i=0; i<n; ++i)
2119    {
2120      pex[i].addr.type = TR_AF_INET;
2121      memcpy (&pex[i].addr.addr, walk, 4); walk += 4;
2122      memcpy (&pex[i].port, walk, 2); walk += 2;
2123      if (added_f && (n == added_f_len))
2124        pex[i].flags = added_f[i];
2125    }
2126
2127  *pexCount = n;
2128  return pex;
2129}
2130
2131tr_pex *
2132tr_peerMgrCompact6ToPex (const void    * compact,
2133                         size_t          compactLen,
2134                         const uint8_t * added_f,
2135                         size_t          added_f_len,
2136                         size_t        * pexCount)
2137{
2138  size_t i;
2139  size_t n = compactLen / 18;
2140  const uint8_t * walk = compact;
2141  tr_pex * pex = tr_new0 (tr_pex, n);
2142
2143  for (i=0; i<n; ++i)
2144    {
2145      pex[i].addr.type = TR_AF_INET6;
2146      memcpy (&pex[i].addr.addr.addr6.s6_addr, walk, 16); walk += 16;
2147      memcpy (&pex[i].port, walk, 2); walk += 2;
2148      if (added_f && (n == added_f_len))
2149        pex[i].flags = added_f[i];
2150    }
2151
2152  *pexCount = n;
2153  return pex;
2154}
2155
2156tr_pex *
2157tr_peerMgrArrayToPex (const void  * array,
2158                      size_t        arrayLen,
2159                      size_t      * pexCount)
2160{
2161  size_t i;
2162  size_t n = arrayLen / (sizeof (tr_address) + 2);
2163  const uint8_t * walk = array;
2164  tr_pex * pex = tr_new0 (tr_pex, n);
2165
2166  for (i=0 ; i<n ; ++i)
2167    {
2168      memcpy (&pex[i].addr, walk, sizeof (tr_address));
2169      memcpy (&pex[i].port, walk + sizeof (tr_address), 2);
2170      pex[i].flags = 0x00;
2171      walk += sizeof (tr_address) + 2;
2172    }
2173
2174  *pexCount = n;
2175  return pex;
2176}
2177
2178/**
2179***
2180**/
2181
2182void
2183tr_peerMgrGotBadPiece (tr_torrent * tor, tr_piece_index_t pieceIndex)
2184{
2185  int i;
2186  int n;
2187  tr_swarm * s = tor->swarm;
2188  const uint32_t byteCount = tr_torPieceCountBytes (tor, pieceIndex);
2189
2190  for (i=0, n=tr_ptrArraySize(&s->peers); i!=n; ++i)
2191    {
2192      tr_peer * peer = tr_ptrArrayNth (&s->peers, i);
2193
2194      if (tr_bitfieldHas (&peer->blame, pieceIndex))
2195        {
2196          tordbg (s, "peer %s contributed to corrupt piece (%d); now has %d strikes",
2197                  tr_atomAddrStr(peer->atom), pieceIndex, (int)peer->strikes + 1);
2198          addStrike (s, peer);
2199        }
2200    }
2201
2202
2203  tr_announcerAddBytes (tor, TR_ANN_CORRUPT, byteCount);
2204}
2205
2206int
2207tr_pexCompare (const void * va, const void * vb)
2208{
2209  int i;
2210  const tr_pex * a = va;
2211  const tr_pex * b = vb;
2212
2213  assert (tr_isPex (a));
2214  assert (tr_isPex (b));
2215
2216  if ((i = tr_address_compare (&a->addr, &b->addr)))
2217    return i;
2218
2219  if (a->port != b->port)
2220    return a->port < b->port ? -1 : 1;
2221
2222  return 0;
2223}
2224
2225/* better goes first */
2226static int
2227compareAtomsByUsefulness (const void * va, const void *vb)
2228{
2229  const struct peer_atom * a = * (const struct peer_atom**) va;
2230  const struct peer_atom * b = * (const struct peer_atom**) vb;
2231
2232  assert (tr_isAtom (a));
2233  assert (tr_isAtom (b));
2234
2235  if (a->piece_data_time != b->piece_data_time)
2236    return a->piece_data_time > b->piece_data_time ? -1 : 1;
2237  if (a->fromBest != b->fromBest)
2238    return a->fromBest < b->fromBest ? -1 : 1;
2239  if (a->numFails != b->numFails)
2240    return a->numFails < b->numFails ? -1 : 1;
2241
2242  return 0;
2243}
2244
2245static bool
2246isAtomInteresting (const tr_torrent * tor, struct peer_atom * atom)
2247{
2248  if (tr_torrentIsSeed (tor) && atomIsSeed (atom))
2249    return false;
2250
2251  if (peerIsInUse (tor->swarm, atom))
2252    return true;
2253
2254  if (isAtomBlocklisted (tor->session, atom))
2255    return false;
2256
2257  if (atom->flags2 & MYFLAG_BANNED)
2258    return false;
2259
2260  return true;
2261}
2262
2263int
2264tr_peerMgrGetPeers (tr_torrent   * tor,
2265                    tr_pex      ** setme_pex,
2266                    uint8_t        af,
2267                    uint8_t        list_mode,
2268                    int            maxCount)
2269{
2270  int i;
2271  int n;
2272  int count = 0;
2273  int atomCount = 0;
2274  const tr_swarm * s = tor->swarm;
2275  struct peer_atom ** atoms = NULL;
2276  tr_pex * pex;
2277  tr_pex * walk;
2278
2279  assert (tr_isTorrent (tor));
2280  assert (setme_pex != NULL);
2281  assert (af==TR_AF_INET || af==TR_AF_INET6);
2282  assert (list_mode==TR_PEERS_CONNECTED || list_mode==TR_PEERS_INTERESTING);
2283
2284  managerLock (s->manager);
2285
2286  /**
2287  ***  build a list of atoms
2288  **/
2289
2290  if (list_mode == TR_PEERS_CONNECTED) /* connected peers only */
2291    {
2292      int i;
2293      const tr_peer ** peers = (const tr_peer **) tr_ptrArrayBase (&s->peers);
2294      atomCount = tr_ptrArraySize (&s->peers);
2295      atoms = tr_new (struct peer_atom *, atomCount);
2296      for (i=0; i<atomCount; ++i)
2297        atoms[i] = peers[i]->atom;
2298    }
2299  else /* TR_PEERS_INTERESTING */
2300    {
2301      int i;
2302      struct peer_atom ** atomBase = (struct peer_atom**) tr_ptrArrayBase (&s->pool);
2303      n = tr_ptrArraySize (&s->pool);
2304      atoms = tr_new (struct peer_atom *, n);
2305      for (i=0; i<n; ++i)
2306        if (isAtomInteresting (tor, atomBase[i]))
2307          atoms[atomCount++] = atomBase[i];
2308    }
2309
2310  qsort (atoms, atomCount, sizeof (struct peer_atom *), compareAtomsByUsefulness);
2311
2312  /**
2313  ***  add the first N of them into our return list
2314  **/
2315
2316  n = MIN (atomCount, maxCount);
2317  pex = walk = tr_new0 (tr_pex, n);
2318
2319  for (i=0; i<atomCount && count<n; ++i)
2320    {
2321      const struct peer_atom * atom = atoms[i];
2322      if (atom->addr.type == af)
2323        {
2324          assert (tr_address_is_valid (&atom->addr));
2325          walk->addr = atom->addr;
2326          walk->port = atom->port;
2327          walk->flags = atom->flags;
2328          ++count;
2329          ++walk;
2330        }
2331    }
2332
2333  qsort (pex, count, sizeof (tr_pex), tr_pexCompare);
2334
2335  assert ((walk - pex) == count);
2336  *setme_pex = pex;
2337
2338  /* cleanup */
2339  tr_free (atoms);
2340  managerUnlock (s->manager);
2341  return count;
2342}
2343
2344static void atomPulse    (int, short, void *);
2345static void bandwidthPulse (int, short, void *);
2346static void rechokePulse (int, short, void *);
2347static void reconnectPulse (int, short, void *);
2348
2349static struct event *
2350createTimer (tr_session * session, int msec, void (*callback)(int, short, void *), void * cbdata)
2351{
2352  struct event * timer = evtimer_new (session->event_base, callback, cbdata);
2353  tr_timerAddMsec (timer, msec);
2354  return timer;
2355}
2356
2357static void
2358ensureMgrTimersExist (struct tr_peerMgr * m)
2359{
2360  if (m->atomTimer == NULL)
2361    m->atomTimer = createTimer (m->session, ATOM_PERIOD_MSEC, atomPulse, m);
2362
2363  if (m->bandwidthTimer == NULL)
2364    m->bandwidthTimer = createTimer (m->session, BANDWIDTH_PERIOD_MSEC, bandwidthPulse, m);
2365
2366  if (m->rechokeTimer == NULL)
2367    m->rechokeTimer = createTimer (m->session, RECHOKE_PERIOD_MSEC, rechokePulse, m);
2368
2369  if (m->refillUpkeepTimer == NULL)
2370    m->refillUpkeepTimer = createTimer (m->session, REFILL_UPKEEP_PERIOD_MSEC, refillUpkeep, m);
2371}
2372
2373void
2374tr_peerMgrStartTorrent (tr_torrent * tor)
2375{
2376  tr_swarm * s;
2377
2378  assert (tr_isTorrent (tor));
2379  assert (tr_torrentIsLocked (tor));
2380
2381  s = tor->swarm;
2382  ensureMgrTimersExist (s->manager);
2383
2384  s->isRunning = true;
2385  s->maxPeers = tor->maxConnectedPeers;
2386  s->pieceSortState = PIECES_UNSORTED;
2387
2388  rechokePulse (0, 0, s->manager);
2389}
2390
2391static void
2392stopSwarm (tr_swarm * swarm)
2393{
2394  tr_peer * peer;
2395
2396  swarm->isRunning = false;
2397
2398  replicationFree (swarm);
2399  invalidatePieceSorting (swarm);
2400
2401  /* disconnect the peers. */
2402  while ((peer = tr_ptrArrayPop (&swarm->peers)))
2403    peerDelete (swarm, peer);
2404
2405  /* disconnect the handshakes. handshakeAbort calls handshakeDoneCB (),
2406   * which removes the handshake from t->outgoingHandshakes... */
2407  while (!tr_ptrArrayEmpty (&swarm->outgoingHandshakes))
2408    tr_handshakeAbort (tr_ptrArrayNth (&swarm->outgoingHandshakes, 0));
2409}
2410
2411void
2412tr_peerMgrStopTorrent (tr_torrent * tor)
2413{
2414  assert (tr_isTorrent (tor));
2415  assert (tr_torrentIsLocked (tor));
2416
2417  stopSwarm (tor->swarm);
2418}
2419
2420void
2421tr_peerMgrAddTorrent (tr_peerMgr * manager, tr_torrent * tor)
2422{
2423  assert (tr_isTorrent (tor));
2424  assert (tr_torrentIsLocked (tor));
2425  assert (tor->swarm == NULL);
2426
2427  tor->swarm = swarmNew (manager, tor);
2428}
2429
2430void
2431tr_peerMgrRemoveTorrent (tr_torrent * tor)
2432{
2433  assert (tr_isTorrent (tor));
2434  assert (tr_torrentIsLocked (tor));
2435
2436  stopSwarm (tor->swarm);
2437  swarmFree (tor->swarm);
2438}
2439
2440void
2441tr_peerUpdateProgress (tr_torrent * tor, tr_peer * peer)
2442{
2443  const tr_bitfield * have = &peer->have;
2444
2445  if (tr_bitfieldHasAll (have))
2446    {
2447      peer->progress = 1.0;
2448    }
2449  else if (tr_bitfieldHasNone (have))
2450    {
2451      peer->progress = 0.0;
2452    }
2453  else
2454    {
2455      const float true_count = tr_bitfieldCountTrueBits (have);
2456
2457      if (tr_torrentHasMetadata (tor))
2458        {
2459          peer->progress = true_count / tor->info.pieceCount;
2460        }
2461      else /* without pieceCount, this result is only a best guess... */
2462        {
2463          peer->progress = true_count / (have->bit_count + 1);
2464        }
2465    }
2466
2467  /* clamp the progress range */
2468  if (peer->progress < 0.0)
2469    peer->progress = 0.0;
2470  if (peer->progress > 1.0)
2471    peer->progress = 1.0;
2472
2473  if (peer->atom && (peer->progress >= 1.0))
2474    atomSetSeed (tor->swarm, peer->atom);
2475}
2476
2477void
2478tr_peerMgrOnTorrentGotMetainfo (tr_torrent * tor)
2479{
2480  int i;
2481  int peerCount;
2482  tr_peer ** peers;
2483
2484  /* the webseed list may have changed... */
2485  rebuildWebseedArray (tor->swarm, tor);
2486
2487  /* some peer_msgs' progress fields may not be accurate if we
2488     didn't have the metadata before now... so refresh them all... */
2489  peerCount = tr_ptrArraySize (&tor->swarm->peers);
2490  peers = (tr_peer**) tr_ptrArrayBase (&tor->swarm->peers);
2491  for (i=0; i<peerCount; ++i)
2492    tr_peerUpdateProgress (tor, peers[i]);
2493}
2494
2495void
2496tr_peerMgrTorrentAvailability (const tr_torrent  * tor,
2497                               int8_t            * tab,
2498                               unsigned int        tabCount)
2499{
2500  assert (tr_isTorrent (tor));
2501  assert (tab != NULL);
2502  assert (tabCount > 0);
2503
2504  memset (tab, 0, tabCount);
2505
2506  if (tr_torrentHasMetadata (tor))
2507    {
2508      tr_piece_index_t i;
2509      const int peerCount = tr_ptrArraySize (&tor->swarm->peers);
2510      const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase (&tor->swarm->peers);
2511      const float interval = tor->info.pieceCount / (float)tabCount;
2512      const bool isSeed = tr_cpGetStatus (&tor->completion) == TR_SEED;
2513
2514      for (i=0; i<tabCount; ++i)
2515        {
2516          const int piece = i * interval;
2517
2518          if (isSeed || tr_cpPieceIsComplete (&tor->completion, piece))
2519            {
2520              tab[i] = -1;
2521            }
2522          else if (peerCount)
2523            {
2524              int j;
2525              for (j=0; j<peerCount; ++j)
2526                if (tr_bitfieldHas (&peers[j]->have, piece))
2527                  ++tab[i];
2528            }
2529        }
2530    }
2531}
2532
2533static bool
2534peerIsSeed (const tr_peer * peer)
2535{
2536    if (peer->progress >= 1.0)
2537        return true;
2538
2539    if (peer->atom && atomIsSeed (peer->atom))
2540        return true;
2541
2542    return false;
2543}
2544
2545/* count how many bytes we want that connected peers have */
2546uint64_t
2547tr_peerMgrGetDesiredAvailable (const tr_torrent * tor)
2548{
2549  size_t i;
2550  size_t n;
2551  uint64_t desiredAvailable;
2552  const tr_swarm * s = tor->swarm;
2553
2554  /* common shortcuts... */
2555
2556  if (tr_torrentIsSeed (s->tor))
2557    return 0;
2558
2559  if (!tr_torrentHasMetadata (tor))
2560    return 0;
2561
2562  n = tr_ptrArraySize (&s->peers);
2563  if (n == 0)
2564    {
2565      return 0;
2566    }
2567  else
2568    {
2569      const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase (&s->peers);
2570      for (i=0; i<n; ++i)
2571        if (peers[i]->atom && atomIsSeed (peers[i]->atom))
2572          return tr_cpLeftUntilDone (&tor->completion);
2573    }
2574
2575  if (!s->pieceReplication || !s->pieceReplicationSize)
2576    return 0;
2577
2578  /* do it the hard way */
2579
2580  desiredAvailable = 0;
2581  for (i=0, n=MIN (tor->info.pieceCount, s->pieceReplicationSize); i<n; ++i)
2582    if (!tor->info.pieces[i].dnd && (s->pieceReplication[i] > 0))
2583      desiredAvailable += tr_cpMissingBytesInPiece (&s->tor->completion, i);
2584
2585  assert (desiredAvailable <= tor->info.totalSize);
2586  return desiredAvailable;
2587}
2588
2589void
2590tr_peerMgrTorrentStats (tr_torrent  * tor,
2591                        int         * setmePeersConnected,
2592                        int         * setmeWebseedsSendingToUs,
2593                        int         * setmePeersSendingToUs,
2594                        int         * setmePeersGettingFromUs,
2595                        int         * setmePeersFrom)
2596{
2597  int i;
2598  int size;
2599  tr_swarm * s;
2600  const tr_peer ** peers;
2601
2602  assert (tr_isTorrent (tor));
2603
2604  *setmePeersConnected       = 0;
2605  *setmePeersGettingFromUs   = 0;
2606  *setmePeersSendingToUs     = 0;
2607  *setmeWebseedsSendingToUs  = 0;
2608
2609  s = tor->swarm;
2610  size = tr_ptrArraySize (&s->peers);
2611  peers = (const tr_peer **) tr_ptrArrayBase (&s->peers);
2612
2613  for (i=0; i<TR_PEER_FROM__MAX; ++i)
2614    setmePeersFrom[i] = 0;
2615
2616  for (i=0; i<size; ++i)
2617    {
2618      const tr_peer * peer = peers[i];
2619      const struct peer_atom * atom = peer->atom;
2620
2621      if (peer->io == NULL) /* not connected */
2622        continue;
2623
2624      ++*setmePeersConnected;
2625
2626      ++setmePeersFrom[atom->fromFirst];
2627
2628      if (clientIsDownloadingFrom (tor, peer))
2629        ++*setmePeersSendingToUs;
2630
2631      if (clientIsUploadingTo (peer))
2632        ++*setmePeersGettingFromUs;
2633    }
2634
2635  *setmeWebseedsSendingToUs = countActiveWebseeds (s);
2636}
2637
2638double*
2639tr_peerMgrWebSpeeds_KBps (const tr_torrent * tor)
2640{
2641  unsigned int i;
2642  unsigned int webseedCount;
2643  const tr_swarm * s;
2644  const tr_webseed ** webseeds;
2645  double * ret = NULL;
2646  const uint64_t now = tr_time_msec ();
2647
2648  assert (tr_isTorrent (tor));
2649
2650  s = tor->swarm;
2651  webseedCount = tr_ptrArraySize (&s->webseeds);
2652  webseeds = (const tr_webseed**) tr_ptrArrayBase (&s->webseeds);
2653  ret = tr_new0 (double, webseedCount);
2654
2655  assert (s->manager != NULL);
2656  assert (webseedCount == tor->info.webseedCount);
2657
2658  for (i=0; i<webseedCount; ++i)
2659    {
2660      unsigned int Bps;
2661      if (tr_webseedGetSpeed_Bps (webseeds[i], now, &Bps))
2662        ret[i] = Bps / (double)tr_speed_K;
2663      else
2664        ret[i] = -1.0;
2665    }
2666
2667  return ret;
2668}
2669
2670unsigned int
2671tr_peerGetPieceSpeed_Bps (const tr_peer * peer, uint64_t now, tr_direction direction)
2672{
2673  return peer->io ? tr_peerIoGetPieceSpeed_Bps (peer->io, now, direction) : 0.0;
2674}
2675
2676struct tr_peer_stat *
2677tr_peerMgrPeerStats (const tr_torrent * tor, int * setmeCount)
2678{
2679  int i;
2680  int size = 0;
2681  tr_peer_stat * ret;
2682  const tr_swarm * s;
2683  const tr_peer ** peers;
2684  const time_t now = tr_time ();
2685  const uint64_t now_msec = tr_time_msec ();
2686
2687  assert (tr_isTorrent (tor));
2688  assert (tor->swarm->manager != NULL);
2689
2690  s = tor->swarm;
2691  peers = (const tr_peer**) tr_ptrArrayBase (&s->peers);
2692  size = tr_ptrArraySize (&s->peers);
2693  ret = tr_new0 (tr_peer_stat, size);
2694
2695  for (i=0; i<size; ++i)
2696    {
2697      char *                   pch;
2698      const tr_peer *          peer = peers[i];
2699      const struct peer_atom * atom = peer->atom;
2700      tr_peer_stat *           stat = ret + i;
2701
2702      tr_address_to_string_with_buf (&atom->addr, stat->addr, sizeof (stat->addr));
2703      tr_strlcpy (stat->client, tr_quark_get_string(peer->client,NULL), sizeof (stat->client));
2704      stat->port                = ntohs (peer->atom->port);
2705      stat->from                = atom->fromFirst;
2706      stat->progress            = peer->progress;
2707      stat->isUTP               = peer->io->utp_socket != NULL;
2708      stat->isEncrypted         = tr_peerIoIsEncrypted (peer->io) ? 1 : 0;
2709      stat->rateToPeer_KBps     = toSpeedKBps (tr_peerGetPieceSpeed_Bps (peer, now_msec, TR_CLIENT_TO_PEER));
2710      stat->rateToClient_KBps   = toSpeedKBps (tr_peerGetPieceSpeed_Bps (peer, now_msec, TR_PEER_TO_CLIENT));
2711      stat->peerIsChoked        = peer->peerIsChoked;
2712      stat->peerIsInterested    = peer->peerIsInterested;
2713      stat->clientIsChoked      = peer->clientIsChoked;
2714      stat->clientIsInterested  = peer->clientIsInterested;
2715      stat->isIncoming          = tr_peerIoIsIncoming (peer->io);
2716      stat->isDownloadingFrom   = clientIsDownloadingFrom (tor, peer);
2717      stat->isUploadingTo       = clientIsUploadingTo (peer);
2718      stat->isSeed              = peerIsSeed (peer);
2719
2720      stat->blocksToPeer        = tr_historyGet (&peer->blocksSentToPeer,    now, CANCEL_HISTORY_SEC);
2721      stat->blocksToClient      = tr_historyGet (&peer->blocksSentToClient,  now, CANCEL_HISTORY_SEC);
2722      stat->cancelsToPeer       = tr_historyGet (&peer->cancelsSentToPeer,   now, CANCEL_HISTORY_SEC);
2723      stat->cancelsToClient     = tr_historyGet (&peer->cancelsSentToClient, now, CANCEL_HISTORY_SEC);
2724
2725      stat->pendingReqsToPeer   = peer->pendingReqsToPeer;
2726      stat->pendingReqsToClient = peer->pendingReqsToClient;
2727
2728      pch = stat->flagStr;
2729      if (stat->isUTP) *pch++ = 'T';
2730      if (s->optimistic == peer) *pch++ = 'O';
2731      if (stat->isDownloadingFrom) *pch++ = 'D';
2732      else if (stat->clientIsInterested) *pch++ = 'd';
2733      if (stat->isUploadingTo) *pch++ = 'U';
2734      else if (stat->peerIsInterested) *pch++ = 'u';
2735      if (!stat->clientIsChoked && !stat->clientIsInterested) *pch++ = 'K';
2736      if (!stat->peerIsChoked && !stat->peerIsInterested) *pch++ = '?';
2737      if (stat->isEncrypted) *pch++ = 'E';
2738      if (stat->from == TR_PEER_FROM_DHT) *pch++ = 'H';
2739      else if (stat->from == TR_PEER_FROM_PEX) *pch++ = 'X';
2740      if (stat->isIncoming) *pch++ = 'I';
2741      *pch = '\0';
2742    }
2743
2744  *setmeCount = size;
2745  return ret;
2746}
2747
2748/***
2749****
2750****
2751***/
2752
2753void
2754tr_peerMgrClearInterest (tr_torrent * tor)
2755{
2756  int i;
2757  tr_swarm * s = tor->swarm;
2758  const int peerCount = tr_ptrArraySize (&s->peers);
2759
2760  assert (tr_isTorrent (tor));
2761  assert (tr_torrentIsLocked (tor));
2762
2763  for (i=0; i<peerCount; ++i)
2764    {
2765      const tr_peer * peer = tr_ptrArrayNth (&s->peers, i);
2766      tr_peerMsgsSetInterested (peer->msgs, false);
2767    }
2768}
2769
2770/* does this peer have any pieces that we want? */
2771static bool
2772isPeerInteresting (tr_torrent     * const tor,
2773                   const bool     * const piece_is_interesting,
2774                   const tr_peer  * const peer)
2775{
2776  tr_piece_index_t i, n;
2777
2778  /* these cases should have already been handled by the calling code... */
2779  assert (!tr_torrentIsSeed (tor));
2780  assert (tr_torrentIsPieceTransferAllowed (tor, TR_PEER_TO_CLIENT));
2781
2782  if (peerIsSeed (peer))
2783    return true;
2784
2785  for (i=0, n=tor->info.pieceCount; i<n; ++i)
2786    if (piece_is_interesting[i] && tr_bitfieldHas (&peer->have, i))
2787      return true;
2788
2789  return false;
2790}
2791
2792typedef enum
2793{
2794  RECHOKE_STATE_GOOD,
2795  RECHOKE_STATE_UNTESTED,
2796  RECHOKE_STATE_BAD
2797}
2798tr_rechoke_state;
2799
2800struct tr_rechoke_info
2801{
2802  tr_peer * peer;
2803  int salt;
2804  int rechoke_state;
2805};
2806
2807static int
2808compare_rechoke_info (const void * va, const void * vb)
2809{
2810  const struct tr_rechoke_info * a = va;
2811  const struct tr_rechoke_info * b = vb;
2812
2813  if (a->rechoke_state != b->rechoke_state)
2814    return a->rechoke_state - b->rechoke_state;
2815
2816  return a->salt - b->salt;
2817}
2818
2819/* determines who we send "interested" messages to */
2820static void
2821rechokeDownloads (tr_swarm * s)
2822{
2823  int i;
2824  int maxPeers = 0;
2825  int rechoke_count = 0;
2826  struct tr_rechoke_info * rechoke = NULL;
2827  const int MIN_INTERESTING_PEERS = 5;
2828  const int peerCount = tr_ptrArraySize (&s->peers);
2829  const time_t now = tr_time ();
2830
2831  /* some cases where this function isn't necessary */
2832  if (tr_torrentIsSeed (s->tor))
2833    return;
2834  if (!tr_torrentIsPieceTransferAllowed (s->tor, TR_PEER_TO_CLIENT))
2835    return;
2836
2837  /* decide HOW MANY peers to be interested in */
2838  {
2839    int blocks = 0;
2840    int cancels = 0;
2841    time_t timeSinceCancel;
2842
2843    /* Count up how many blocks & cancels each peer has.
2844     *
2845     * There are two situations where we send out cancels --
2846     *
2847     * 1. We've got unresponsive peers, which is handled by deciding
2848     *    -which- peers to be interested in.
2849     *
2850     * 2. We've hit our bandwidth cap, which is handled by deciding
2851     *    -how many- peers to be interested in.
2852     *
2853     * We're working on 2. here, so we need to ignore unresponsive
2854     * peers in our calculations lest they confuse Transmission into
2855     * thinking it's hit its bandwidth cap.
2856     */
2857    for (i=0; i<peerCount; ++i)
2858      {
2859        const tr_peer * peer = tr_ptrArrayNth (&s->peers, i);
2860        const int b = tr_historyGet (&peer->blocksSentToClient, now, CANCEL_HISTORY_SEC);
2861        const int c = tr_historyGet (&peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC);
2862
2863        if (b == 0) /* ignore unresponsive peers, as described above */
2864          continue;
2865
2866        blocks += b;
2867        cancels += c;
2868      }
2869
2870    if (cancels > 0)
2871      {
2872        /* cancelRate: of the block requests we've recently made, the percentage we cancelled.
2873         * higher values indicate more congestion. */
2874        const double cancelRate = cancels / (double)(cancels + blocks);
2875        const double mult = 1 - MIN (cancelRate, 0.5);
2876        maxPeers = s->interestedCount * mult;
2877        tordbg (s, "cancel rate is %.3f -- reducing the "
2878                   "number of peers we're interested in by %.0f percent",
2879                   cancelRate, mult * 100);
2880        s->lastCancel = now;
2881      }
2882
2883    timeSinceCancel = now - s->lastCancel;
2884    if (timeSinceCancel)
2885      {
2886        const int maxIncrease = 15;
2887        const time_t maxHistory = 2 * CANCEL_HISTORY_SEC;
2888        const double mult = MIN (timeSinceCancel, maxHistory) / (double) maxHistory;
2889        const int inc = maxIncrease * mult;
2890        maxPeers = s->maxPeers + inc;
2891        tordbg (s, "time since last cancel is %li -- increasing the "
2892                   "number of peers we're interested in by %d",
2893                   timeSinceCancel, inc);
2894      }
2895  }
2896
2897  /* don't let the previous section's number tweaking go too far... */
2898  if (maxPeers < MIN_INTERESTING_PEERS)
2899    maxPeers = MIN_INTERESTING_PEERS;
2900  if (maxPeers > s->tor->maxConnectedPeers)
2901    maxPeers = s->tor->maxConnectedPeers;
2902
2903  s->maxPeers = maxPeers;
2904
2905  if (peerCount > 0)
2906    {
2907      bool * piece_is_interesting;
2908      const tr_torrent * const tor = s->tor;
2909      const int n = tor->info.pieceCount;
2910
2911      /* build a bitfield of interesting pieces... */
2912      piece_is_interesting = tr_new (bool, n);
2913      for (i=0; i<n; i++)
2914        piece_is_interesting[i] = !tor->info.pieces[i].dnd && !tr_cpPieceIsComplete (&tor->completion, i);
2915
2916      /* decide WHICH peers to be interested in (based on their cancel-to-block ratio) */
2917      for (i=0; i<peerCount; ++i)
2918        {
2919          tr_peer * peer = tr_ptrArrayNth (&s->peers, i);
2920
2921          if (!isPeerInteresting (s->tor, piece_is_interesting, peer))
2922            {
2923              tr_peerMsgsSetInterested (peer->msgs, false);
2924            }
2925          else
2926            {
2927              tr_rechoke_state rechoke_state;
2928              const int blocks = tr_historyGet (&peer->blocksSentToClient, now, CANCEL_HISTORY_SEC);
2929              const int cancels = tr_historyGet (&peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC);
2930
2931              if (!blocks && !cancels)
2932                rechoke_state = RECHOKE_STATE_UNTESTED;
2933              else if (!cancels)
2934                rechoke_state = RECHOKE_STATE_GOOD;
2935              else if (!blocks)
2936                rechoke_state = RECHOKE_STATE_BAD;
2937              else if ((cancels * 10) < blocks)
2938                rechoke_state = RECHOKE_STATE_GOOD;
2939              else
2940                rechoke_state = RECHOKE_STATE_BAD;
2941
2942              if (rechoke == NULL)
2943                rechoke = tr_new (struct tr_rechoke_info, peerCount);
2944
2945              rechoke[rechoke_count].peer = peer;
2946              rechoke[rechoke_count].rechoke_state = rechoke_state;
2947              rechoke[rechoke_count].salt = tr_cryptoWeakRandInt (INT_MAX);
2948              rechoke_count++;
2949            }
2950
2951        }
2952
2953      tr_free (piece_is_interesting);
2954    }
2955
2956  /* now that we know which & how many peers to be interested in... update the peer interest */
2957  qsort (rechoke, rechoke_count, sizeof (struct tr_rechoke_info), compare_rechoke_info);
2958  s->interestedCount = MIN (maxPeers, rechoke_count);
2959  for (i=0; i<rechoke_count; ++i)
2960    tr_peerMsgsSetInterested (rechoke[i].peer->msgs, i<s->interestedCount);
2961
2962  /* cleanup */
2963  tr_free (rechoke);
2964}
2965
2966/**
2967***
2968**/
2969
2970struct ChokeData
2971{
2972  bool      isInterested;
2973  bool      wasChoked;
2974  bool      isChoked;
2975  int       rate;
2976  int       salt;
2977  tr_peer * peer;
2978};
2979
2980static int
2981compareChoke (const void * va, const void * vb)
2982{
2983  const struct ChokeData * a = va;
2984  const struct ChokeData * b = vb;
2985
2986  if (a->rate != b->rate) /* prefer higher overall speeds */
2987    return a->rate > b->rate ? -1 : 1;
2988
2989  if (a->wasChoked != b->wasChoked) /* prefer unchoked */
2990    return a->wasChoked ? 1 : -1;
2991
2992  if (a->salt != b->salt) /* random order */
2993    return a->salt - b->salt;
2994
2995  return 0;
2996}
2997
2998/* is this a new connection? */
2999static int
3000isNew (const tr_peer * peer)
3001{
3002  return peer && peer->io && tr_peerIoGetAge (peer->io) < 45;
3003}
3004
3005/* get a rate for deciding which peers to choke and unchoke. */
3006static int
3007getRate (const tr_torrent * tor, struct peer_atom * atom, uint64_t now)
3008{
3009  unsigned int Bps;
3010
3011  if (tr_torrentIsSeed (tor))
3012    Bps = tr_peerGetPieceSpeed_Bps (atom->peer, now, TR_CLIENT_TO_PEER);
3013
3014  /* downloading a private torrent... take upload speed into account
3015   * because there may only be a small window of opportunity to share */
3016  else if (tr_torrentIsPrivate (tor))
3017    Bps = tr_peerGetPieceSpeed_Bps (atom->peer, now, TR_PEER_TO_CLIENT)
3018        + tr_peerGetPieceSpeed_Bps (atom->peer, now, TR_CLIENT_TO_PEER);
3019
3020  /* downloading a public torrent */
3021  else
3022    Bps = tr_peerGetPieceSpeed_Bps (atom->peer, now, TR_PEER_TO_CLIENT);
3023
3024  /* convert it to bytes per second */
3025  return Bps;
3026}
3027
3028static inline bool
3029isBandwidthMaxedOut (const tr_bandwidth * b,
3030                     const uint64_t now_msec, tr_direction dir)
3031{
3032  if (!tr_bandwidthIsLimited (b, dir))
3033    {
3034      return false;
3035    }
3036  else
3037    {
3038      const unsigned int got = tr_bandwidthGetPieceSpeed_Bps (b, now_msec, dir);
3039      const unsigned int want = tr_bandwidthGetDesiredSpeed_Bps (b, dir);
3040      return got >= want;
3041    }
3042}
3043
3044static void
3045rechokeUploads (tr_swarm * s, const uint64_t now)
3046{
3047  int i, size, unchokedInterested;
3048  const int peerCount = tr_ptrArraySize (&s->peers);
3049  tr_peer ** peers = (tr_peer**) tr_ptrArrayBase (&s->peers);
3050  struct ChokeData * choke = tr_new0 (struct ChokeData, peerCount);
3051  const tr_session * session = s->manager->session;
3052  const int chokeAll = !tr_torrentIsPieceTransferAllowed (s->tor, TR_CLIENT_TO_PEER);
3053  const bool isMaxedOut = isBandwidthMaxedOut (&s->tor->bandwidth, now, TR_UP);
3054
3055  assert (swarmIsLocked (s));
3056
3057  /* an optimistic unchoke peer's "optimistic"
3058   * state lasts for N calls to rechokeUploads (). */
3059  if (s->optimisticUnchokeTimeScaler > 0)
3060    s->optimisticUnchokeTimeScaler--;
3061  else
3062    s->optimistic = NULL;
3063
3064  /* sort the peers by preference and rate */
3065  for (i=0, size=0; i<peerCount; ++i)
3066    {
3067      tr_peer * peer = peers[i];
3068      struct peer_atom * atom = peer->atom;
3069
3070      if (peerIsSeed (peer)) /* choke seeds and partial seeds */
3071        {
3072          tr_peerMsgsSetChoke (peer->msgs, true);
3073        }
3074      else if (chokeAll) /* choke everyone if we're not uploading */
3075        {
3076          tr_peerMsgsSetChoke (peer->msgs, true);
3077        }
3078      else if (peer != s->optimistic)
3079        {
3080          struct ChokeData * n = &choke[size++];
3081          n->peer         = peer;
3082          n->isInterested = peer->peerIsInterested;
3083          n->wasChoked    = peer->peerIsChoked;
3084          n->rate         = getRate (s->tor, atom, now);
3085          n->salt         = tr_cryptoWeakRandInt (INT_MAX);
3086          n->isChoked     = true;
3087        }
3088    }
3089
3090  qsort (choke, size, sizeof (struct ChokeData), compareChoke);
3091
3092  /**
3093   * Reciprocation and number of uploads capping is managed by unchoking
3094   * the N peers which have the best upload rate and are interested.
3095   * This maximizes the client's download rate. These N peers are
3096   * referred to as downloaders, because they are interested in downloading
3097   * from the client.
3098   *
3099   * Peers which have a better upload rate (as compared to the downloaders)
3100   * but aren't interested get unchoked. If they become interested, the
3101   * downloader with the worst upload rate gets choked. If a client has
3102   * a complete file, it uses its upload rate rather than its download
3103   * rate to decide which peers to unchoke.
3104   *
3105   * If our bandwidth is maxed out, don't unchoke any more peers.
3106   */
3107  unchokedInterested = 0;
3108  for (i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i)
3109    {
3110      choke[i].isChoked = isMaxedOut ? choke[i].wasChoked : false;
3111      if (choke[i].isInterested)
3112        ++unchokedInterested;
3113    }
3114
3115  /* optimistic unchoke */
3116  if (!s->optimistic && !isMaxedOut && (i<size))
3117    {
3118      int n;
3119      struct ChokeData * c;
3120      tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
3121
3122      for (; i<size; ++i)
3123        {
3124          if (choke[i].isInterested)
3125            {
3126              const tr_peer * peer = choke[i].peer;
3127              int x = 1, y;
3128              if (isNew (peer)) x *= 3;
3129              for (y=0; y<x; ++y)
3130                tr_ptrArrayAppend (&randPool, &choke[i]);
3131            }
3132        }
3133
3134      if ((n = tr_ptrArraySize (&randPool)))
3135        {
3136          c = tr_ptrArrayNth (&randPool, tr_cryptoWeakRandInt (n));
3137          c->isChoked = false;
3138          s->optimistic = c->peer;
3139          s->optimisticUnchokeTimeScaler = OPTIMISTIC_UNCHOKE_MULTIPLIER;
3140        }
3141
3142      tr_ptrArrayDestruct (&randPool, NULL);
3143    }
3144
3145  for (i=0; i<size; ++i)
3146    tr_peerMsgsSetChoke (choke[i].peer->msgs, choke[i].isChoked);
3147
3148  /* cleanup */
3149  tr_free (choke);
3150}
3151
3152static void
3153rechokePulse (int foo UNUSED, short bar UNUSED, void * vmgr)
3154{
3155  tr_torrent * tor = NULL;
3156  tr_peerMgr * mgr = vmgr;
3157  const uint64_t now = tr_time_msec ();
3158
3159  managerLock (mgr);
3160
3161  while ((tor = tr_torrentNext (mgr->session, tor)))
3162    {
3163      if (tor->isRunning)
3164        {
3165          tr_swarm * s = tor->swarm;
3166          if (!tr_ptrArrayEmpty (&s->peers))
3167            {
3168              rechokeUploads (s, now);
3169              rechokeDownloads (s);
3170            }
3171        }
3172    }
3173
3174  tr_timerAddMsec (mgr->rechokeTimer, RECHOKE_PERIOD_MSEC);
3175  managerUnlock (mgr);
3176}
3177
3178/***
3179****
3180****  Life and Death
3181****
3182***/
3183
3184static bool
3185shouldPeerBeClosed (const tr_swarm   * s,
3186                    const tr_peer    * peer,
3187                    int                peerCount,
3188                    const time_t       now)
3189{
3190  const tr_torrent * tor = s->tor;
3191  const struct peer_atom * atom = peer->atom;
3192
3193  /* if it's marked for purging, close it */
3194  if (peer->doPurge)
3195    {
3196      tordbg (s, "purging peer %s because its doPurge flag is set",
3197              tr_atomAddrStr (atom));
3198      return true;
3199    }
3200
3201  /* disconnect if we're both seeds and enough time has passed for PEX */
3202  if (tr_torrentIsSeed (tor) && peerIsSeed (peer))
3203    return !tr_torrentAllowsPex (tor) || (now-atom->time>=30);
3204
3205  /* disconnect if it's been too long since piece data has been transferred.
3206   * this is on a sliding scale based on number of available peers... */
3207  {
3208    const int relaxStrictnessIfFewerThanN = (int)((getMaxPeerCount (tor) * 0.9) + 0.5);
3209    /* if we have >= relaxIfFewerThan, strictness is 100%.
3210     * if we have zero connections, strictness is 0% */
3211    const float strictness = peerCount >= relaxStrictnessIfFewerThanN
3212                           ? 1.0
3213                           : peerCount / (float)relaxStrictnessIfFewerThanN;
3214    const int lo = MIN_UPLOAD_IDLE_SECS;
3215    const int hi = MAX_UPLOAD_IDLE_SECS;
3216    const int limit = hi - ((hi - lo) * strictness);
3217    const int idleTime = now - MAX (atom->time, atom->piece_data_time);
3218/*fprintf (stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit);*/
3219    if (idleTime > limit)
3220      {
3221        tordbg (s, "purging peer %s because it's been %d secs since we shared anything",
3222                tr_atomAddrStr (atom), idleTime);
3223        return true;
3224      }
3225  }
3226
3227  return false;
3228}
3229
3230static tr_peer **
3231getPeersToClose (tr_swarm * s, const time_t now_sec, int * setmeSize)
3232{
3233  int i, peerCount, outsize;
3234  struct tr_peer ** ret = NULL;
3235  tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek (&s->peers, &peerCount);
3236
3237  assert (swarmIsLocked (s));
3238
3239  for (i=outsize=0; i<peerCount; ++i)
3240    {
3241      if (shouldPeerBeClosed (s, peers[i], peerCount, now_sec))
3242        {
3243          if (ret == NULL)
3244            ret = tr_new (tr_peer *, peerCount);
3245          ret[outsize++] = peers[i];
3246        }
3247    }
3248
3249  *setmeSize = outsize;
3250  return ret;
3251}
3252
3253static int
3254getReconnectIntervalSecs (const struct peer_atom * atom, const time_t now)
3255{
3256  int sec;
3257  const bool unreachable = (atom->flags2 & MYFLAG_UNREACHABLE) != 0;
3258
3259  /* if we were recently connected to this peer and transferring piece
3260   * data, try to reconnect to them sooner rather that later -- we don't
3261   * want network troubles to get in the way of a good peer. */
3262  if (!unreachable && ((now - atom->piece_data_time) <= (MINIMUM_RECONNECT_INTERVAL_SECS * 2)))
3263    sec = MINIMUM_RECONNECT_INTERVAL_SECS;
3264
3265  /* otherwise, the interval depends on how many times we've tried
3266   * and failed to connect to the peer */
3267  else
3268    {
3269      int step = atom->numFails;
3270
3271      /* penalize peers that were unreachable the last time we tried */
3272      if (unreachable)
3273        step += 2;
3274 
3275      switch (step)
3276        {
3277          case 0: sec = 0; break;
3278          case 1: sec = 10; break;
3279          case 2: sec = 60 * 2; break;
3280          case 3: sec = 60 * 15; break;
3281          case 4: sec = 60 * 30; break;
3282          case 5: sec = 60 * 60; break;
3283          default: sec = 60 * 120; break;
3284        }
3285    }
3286
3287  dbgmsg ("reconnect interval for %s is %d seconds", tr_atomAddrStr (atom), sec);
3288  return sec;
3289}
3290
3291static void
3292removePeer (tr_swarm * s, tr_peer * peer)
3293{
3294  tr_peer * removed;
3295  struct peer_atom * atom = peer->atom;
3296
3297  assert (swarmIsLocked (s));
3298  assert (atom);
3299
3300  atom->time = tr_time ();
3301
3302  removed = tr_ptrArrayRemoveSorted (&s->peers, peer, peerCompare);
3303
3304  if (replicationExists (s))
3305    tr_decrReplicationFromBitfield (s, &peer->have);
3306
3307  assert (removed == peer);
3308  peerDelete (s, removed);
3309}
3310
3311static void
3312closePeer (tr_swarm * s, tr_peer * peer)
3313{
3314  struct peer_atom * atom;
3315
3316  assert (s != NULL);
3317  assert (peer != NULL);
3318
3319  atom = peer->atom;
3320
3321  /* if we transferred piece data, then they might be good peers,
3322     so reset their `numFails' weight to zero. otherwise we connected
3323     to them fruitlessly, so mark it as another fail */
3324  if (atom->piece_data_time)
3325    {
3326      tordbg (s, "resetting atom %s numFails to 0", tr_atomAddrStr (atom));
3327      atom->numFails = 0;
3328    }
3329  else
3330    {
3331      ++atom->numFails;
3332      tordbg (s, "incremented atom %s numFails to %d", tr_atomAddrStr (atom), (int)atom->numFails);
3333    }
3334
3335  tordbg (s, "removing bad peer %s", tr_peerIoGetAddrStr (peer->io));
3336  removePeer (s, peer);
3337}
3338
3339static void
3340removeAllPeers (tr_swarm * s)
3341{
3342  while (!tr_ptrArrayEmpty (&s->peers))
3343    removePeer (s, tr_ptrArrayNth (&s->peers, 0));
3344}
3345
3346static void
3347closeBadPeers (tr_swarm * s, const time_t now_sec)
3348{
3349  if (!tr_ptrArrayEmpty (&s->peers))
3350    {
3351      int i;
3352      int peerCount;
3353      struct tr_peer ** peers;
3354
3355      peers = getPeersToClose (s, now_sec, &peerCount);
3356      for (i=0; i<peerCount; ++i)
3357        closePeer (s, peers[i]);
3358      tr_free (peers);
3359    }
3360}
3361
3362struct peer_liveliness
3363{
3364  tr_peer * peer;
3365  void * clientData;
3366  time_t pieceDataTime;
3367  time_t time;
3368  int speed;
3369  bool doPurge;
3370};
3371
3372static int
3373comparePeerLiveliness (const void * va, const void * vb)
3374{
3375  const struct peer_liveliness * a = va;
3376  const struct peer_liveliness * b = vb;
3377
3378  if (a->doPurge != b->doPurge)
3379    return a->doPurge ? 1 : -1;
3380
3381  if (a->speed != b->speed) /* faster goes first */
3382    return a->speed > b->speed ? -1 : 1;
3383
3384  /* the one to give us data more recently goes first */
3385  if (a->pieceDataTime != b->pieceDataTime)
3386    return a->pieceDataTime > b->pieceDataTime ? -1 : 1;
3387
3388  /* the one we connected to most recently goes first */
3389  if (a->time != b->time)
3390    return a->time > b->time ? -1 : 1;
3391
3392  return 0;
3393}
3394
3395static void
3396sortPeersByLivelinessImpl (tr_peer  ** peers,
3397                           void     ** clientData,
3398                           int         n,
3399                           uint64_t    now,
3400                           int (*compare)(const void *va, const void *vb))
3401{
3402  int i;
3403  struct peer_liveliness *lives, *l;
3404
3405  /* build a sortable array of peer + extra info */
3406  lives = l = tr_new0 (struct peer_liveliness, n);
3407  for (i=0; i<n; ++i, ++l)
3408    {
3409      tr_peer * p = peers[i];
3410      l->peer = p;
3411      l->doPurge = p->doPurge;
3412      l->pieceDataTime = p->atom->piece_data_time;
3413      l->time = p->atom->time;
3414      l->speed = tr_peerGetPieceSpeed_Bps (p, now, TR_UP)
3415               + tr_peerGetPieceSpeed_Bps (p, now, TR_DOWN);
3416      if (clientData)
3417        l->clientData = clientData[i];
3418    }
3419
3420  /* sort 'em */
3421  assert (n == (l - lives));
3422  qsort (lives, n, sizeof (struct peer_liveliness), compare);
3423
3424  /* build the peer array */
3425  for (i=0, l=lives; i<n; ++i, ++l)
3426    {
3427      peers[i] = l->peer;
3428      if (clientData)
3429        clientData[i] = l->clientData;
3430    }
3431  assert (n == (l - lives));
3432
3433  /* cleanup */
3434  tr_free (lives);
3435}
3436
3437static void
3438sortPeersByLiveliness (tr_peer ** peers, void ** clientData, int n, uint64_t now)
3439{
3440  sortPeersByLivelinessImpl (peers, clientData, n, now, comparePeerLiveliness);
3441}
3442
3443
3444static void
3445enforceTorrentPeerLimit (tr_swarm * s, uint64_t now)
3446{
3447  int n = tr_ptrArraySize (&s->peers);
3448  const int max = tr_torrentGetPeerLimit (s->tor);
3449  if (n > max)
3450    {
3451      void * base = tr_ptrArrayBase (&s->peers);
3452      tr_peer ** peers = tr_memdup (base, n*sizeof (tr_peer*));
3453      sortPeersByLiveliness (peers, NULL, n, now);
3454      while (n > max)
3455        closePeer (s, peers[--n]);
3456      tr_free (peers);
3457    }
3458}
3459
3460static void
3461enforceSessionPeerLimit (tr_session * session, uint64_t now)
3462{
3463  int n = 0;
3464  tr_torrent * tor = NULL;
3465  const int max = tr_sessionGetPeerLimit (session);
3466
3467  /* count the total number of peers */
3468  while ((tor = tr_torrentNext (session, tor)))
3469    n += tr_ptrArraySize (&tor->swarm->peers);
3470
3471  /* if there are too many, prune out the worst */
3472  if (n > max)
3473    {
3474      tr_peer ** peers = tr_new (tr_peer*, n);
3475      tr_swarm ** swarms = tr_new (tr_swarm*, n);
3476
3477      /* populate the peer array */
3478      n = 0;
3479      tor = NULL;
3480      while ((tor = tr_torrentNext (session, tor)))
3481        {
3482          int i;
3483          tr_swarm * s = tor->swarm;
3484          const int tn = tr_ptrArraySize (&s->peers);
3485          for (i=0; i<tn; ++i, ++n)
3486            {
3487              peers[n] = tr_ptrArrayNth (&s->peers, i);
3488              swarms[n] = s;
3489            }
3490        }
3491
3492      /* sort 'em */
3493      sortPeersByLiveliness (peers, (void**)swarms, n, now);
3494
3495      /* cull out the crappiest */
3496      while (n-- > max)
3497        closePeer (swarms[n], peers[n]);
3498
3499      /* cleanup */
3500      tr_free (swarms);
3501      tr_free (peers);
3502    }
3503}
3504
3505static void makeNewPeerConnections (tr_peerMgr * mgr, const int max);
3506
3507static void
3508reconnectPulse (int foo UNUSED, short bar UNUSED, void * vmgr)
3509{
3510  tr_torrent * tor;
3511  tr_peerMgr * mgr = vmgr;
3512  const time_t now_sec = tr_time ();
3513  const uint64_t now_msec = tr_time_msec ();
3514
3515  /**
3516  ***  enforce the per-session and per-torrent peer limits
3517  **/
3518
3519  /* if we're over the per-torrent peer limits, cull some peers */
3520  tor = NULL;
3521  while ((tor = tr_torrentNext (mgr->session, tor)))
3522    if (tor->isRunning)
3523      enforceTorrentPeerLimit (tor->swarm, now_msec);
3524
3525  /* if we're over the per-session peer limits, cull some peers */
3526  enforceSessionPeerLimit (mgr->session, now_msec);
3527
3528  /* remove crappy peers */
3529  tor = NULL;
3530  while ((tor = tr_torrentNext (mgr->session, tor)))
3531    if (!tor->swarm->isRunning)
3532      removeAllPeers (tor->swarm);
3533    else
3534      closeBadPeers (tor->swarm, now_sec);
3535
3536  /* try to make new peer connections */
3537  makeNewPeerConnections (mgr, MAX_CONNECTIONS_PER_PULSE);
3538}
3539
3540/****
3541*****
3542*****  BANDWIDTH ALLOCATION
3543*****
3544****/
3545
3546static void
3547pumpAllPeers (tr_peerMgr * mgr)
3548{
3549  tr_torrent * tor = NULL;
3550
3551  while ((tor = tr_torrentNext (mgr->session, tor)))
3552    {
3553      int j;
3554      tr_swarm * s = tor->swarm;
3555
3556      for (j=0; j<tr_ptrArraySize (&s->peers); ++j)
3557        {
3558          tr_peer * peer = tr_ptrArrayNth (&s->peers, j);
3559          tr_peerMsgsPulse (peer->msgs);
3560        }
3561    }
3562}
3563
3564
3565static void
3566queuePulseForeach (void * vtor)
3567{
3568  tr_torrent * tor = vtor;
3569
3570  tr_torrentStartNow (tor);
3571
3572  if (tor->queue_started_callback != NULL)
3573    (*tor->queue_started_callback)(tor, tor->queue_started_user_data);
3574}
3575
3576static void
3577queuePulse (tr_session * session, tr_direction dir)
3578{
3579  assert (tr_isSession (session));
3580  assert (tr_isDirection (dir));
3581
3582  if (tr_sessionGetQueueEnabled (session, dir))
3583    {
3584      tr_ptrArray torrents = TR_PTR_ARRAY_INIT;
3585
3586      tr_sessionGetNextQueuedTorrents (session,
3587                                       dir,
3588                                       tr_sessionCountQueueFreeSlots (session, dir),
3589                                       &torrents);
3590
3591      tr_ptrArrayForeach (&torrents, queuePulseForeach);
3592
3593      tr_ptrArrayDestruct (&torrents, NULL);
3594    }
3595}
3596
3597static void
3598bandwidthPulse (int foo UNUSED, short bar UNUSED, void * vmgr)
3599{
3600  tr_torrent * tor;
3601  tr_peerMgr * mgr = vmgr;
3602  tr_session * session = mgr->session;
3603  managerLock (mgr);
3604
3605  /* FIXME: this next line probably isn't necessary... */
3606  pumpAllPeers (mgr);
3607
3608  /* allocate bandwidth to the peers */
3609  tr_bandwidthAllocate (&session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC);
3610  tr_bandwidthAllocate (&session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC);
3611
3612  /* torrent upkeep */
3613  tor = NULL;
3614  while ((tor = tr_torrentNext (session, tor)))
3615    {
3616      /* possibly stop torrents that have seeded enough */
3617      tr_torrentCheckSeedLimit (tor);
3618
3619      /* run the completeness check for any torrents that need it */
3620      if (tor->swarm->needsCompletenessCheck)
3621        {
3622          tor->swarm->needsCompletenessCheck  = false;
3623          tr_torrentRecheckCompleteness (tor);
3624        }
3625
3626      /* stop torrents that are ready to stop, but couldn't be stopped
3627         earlier during the peer-io callback call chain */
3628      if (tor->isStopping)
3629        tr_torrentStop (tor);
3630    }
3631
3632  /* pump the queues */
3633  queuePulse (session, TR_UP);
3634  queuePulse (session, TR_DOWN);
3635
3636  reconnectPulse (0, 0, mgr);
3637
3638  tr_timerAddMsec (mgr->bandwidthTimer, BANDWIDTH_PERIOD_MSEC);
3639  managerUnlock (mgr);
3640}
3641
3642/***
3643****
3644***/
3645
3646static int
3647compareAtomPtrsByAddress (const void * va, const void *vb)
3648{
3649  const struct peer_atom * a = * (const struct peer_atom**) va;
3650  const struct peer_atom * b = * (const struct peer_atom**) vb;
3651
3652  assert (tr_isAtom (a));
3653  assert (tr_isAtom (b));
3654
3655  return tr_address_compare (&a->addr, &b->addr);
3656}
3657
3658/* best come first, worst go last */
3659static int
3660compareAtomPtrsByShelfDate (const void * va, const void *vb)
3661{
3662  time_t atime;
3663  time_t btime;
3664  const struct peer_atom * a = * (const struct peer_atom**) va;
3665  const struct peer_atom * b = * (const struct peer_atom**) vb;
3666  const int data_time_cutoff_secs = 60 * 60;
3667  const time_t tr_now = tr_time ();
3668
3669  assert (tr_isAtom (a));
3670  assert (tr_isAtom (b));
3671
3672  /* primary key: the last piece data time *if* it was within the last hour */
3673  atime = a->piece_data_time; if (atime + data_time_cutoff_secs < tr_now) atime = 0;
3674  btime = b->piece_data_time; if (btime + data_time_cutoff_secs < tr_now) btime = 0;
3675  if (atime != btime)
3676    return atime > btime ? -1 : 1;
3677
3678  /* secondary key: shelf date. */
3679  if (a->shelf_date != b->shelf_date)
3680    return a->shelf_date > b->shelf_date ? -1 : 1;
3681
3682  return 0;
3683}
3684
3685static int
3686getMaxAtomCount (const tr_torrent * tor)
3687{
3688  return MIN (50, tor->maxConnectedPeers * 3);
3689}
3690
3691static void
3692atomPulse (int foo UNUSED, short bar UNUSED, void * vmgr)
3693{
3694  tr_torrent * tor = NULL;
3695  tr_peerMgr * mgr = vmgr;
3696  managerLock (mgr);
3697
3698  while ((tor = tr_torrentNext (mgr->session, tor)))
3699    {
3700      int atomCount;
3701      tr_swarm * s = tor->swarm;
3702      const int maxAtomCount = getMaxAtomCount (tor);
3703      struct peer_atom ** atoms = (struct peer_atom**) tr_ptrArrayPeek (&s->pool, &atomCount);
3704
3705      if (atomCount > maxAtomCount) /* we've got too many atoms... time to prune */
3706        {
3707          int i;
3708          int keepCount = 0;
3709          int testCount = 0;
3710          struct peer_atom ** keep = tr_new (struct peer_atom*, atomCount);
3711          struct peer_atom ** test = tr_new (struct peer_atom*, atomCount);
3712
3713          /* keep the ones that are in use */
3714          for (i=0; i<atomCount; ++i)
3715            {
3716              struct peer_atom * atom = atoms[i];
3717              if (peerIsInUse (s, atom))
3718                keep[keepCount++] = atom;
3719              else
3720                test[testCount++] = atom;
3721            }
3722
3723          /* if there's room, keep the best of what's left */
3724          i = 0;
3725          if (keepCount < maxAtomCount)
3726            {
3727              qsort (test, testCount, sizeof (struct peer_atom *), compareAtomPtrsByShelfDate);
3728              while (i<testCount && keepCount<maxAtomCount)
3729                keep[keepCount++] = test[i++];
3730            }
3731
3732          /* free the culled atoms */
3733          while (i<testCount)
3734            tr_free (test[i++]);
3735
3736          /* rebuild Torrent.pool with what's left */
3737          tr_ptrArrayDestruct (&s->pool, NULL);
3738          s->pool = TR_PTR_ARRAY_INIT;
3739          qsort (keep, keepCount, sizeof (struct peer_atom *), compareAtomPtrsByAddress);
3740          for (i=0; i<keepCount; ++i)
3741            tr_ptrArrayAppend (&s->pool, keep[i]);
3742
3743          tordbg (s, "max atom count is %d... pruned from %d to %d\n", maxAtomCount, atomCount, keepCount);
3744
3745          /* cleanup */
3746          tr_free (test);
3747          tr_free (keep);
3748        }
3749    }
3750
3751  tr_timerAddMsec (mgr->atomTimer, ATOM_PERIOD_MSEC);
3752  managerUnlock (mgr);
3753}
3754
3755/***
3756****
3757****
3758****
3759***/
3760
3761/* is this atom someone that we'd want to initiate a connection to? */
3762static bool
3763isPeerCandidate (const tr_torrent * tor, struct peer_atom * atom, const time_t now)
3764{
3765  /* not if we're both seeds */
3766  if (tr_torrentIsSeed (tor) && atomIsSeed (atom))
3767    return false;
3768
3769  /* not if we've already got a connection to them... */
3770  if (peerIsInUse (tor->swarm, atom))
3771    return false;
3772
3773  /* not if we just tried them already */
3774  if ((now - atom->time) < getReconnectIntervalSecs (atom, now))
3775    return false;
3776
3777  /* not if they're blocklisted */
3778  if (isAtomBlocklisted (tor->session, atom))
3779    return false;
3780
3781  /* not if they're banned... */
3782  if (atom->flags2 & MYFLAG_BANNED)
3783    return false;
3784
3785  return true;
3786}
3787
3788struct peer_candidate
3789{
3790  uint64_t score;
3791  tr_torrent * tor;
3792  struct peer_atom * atom;
3793};
3794
3795static bool
3796torrentWasRecentlyStarted (const tr_torrent * tor)
3797{
3798  return difftime (tr_time (), tor->startDate) < 120;
3799}
3800
3801static inline uint64_t
3802addValToKey (uint64_t value, int width, uint64_t addme)
3803{
3804  value = (value << (uint64_t)width);
3805  value |= addme;
3806  return value;
3807}
3808
3809/* smaller value is better */
3810static uint64_t
3811getPeerCandidateScore (const tr_torrent * tor, const struct peer_atom * atom, uint8_t salt)
3812{
3813  uint64_t i;
3814  uint64_t score = 0;
3815  const bool failed = atom->lastConnectionAt < atom->lastConnectionAttemptAt;
3816
3817  /* prefer peers we've connected to, or never tried, over peers we failed to connect to. */
3818  i = failed ? 1 : 0;
3819  score = addValToKey (score, 1, i);
3820
3821  /* prefer the one we attempted least recently (to cycle through all peers) */
3822  i = atom->lastConnectionAttemptAt;
3823  score = addValToKey (score, 32, i);
3824
3825  /* prefer peers belonging to a torrent of a higher priority */
3826  switch (tr_torrentGetPriority (tor))
3827    {
3828      case TR_PRI_HIGH:    i = 0; break;
3829      case TR_PRI_NORMAL:  i = 1; break;
3830      case TR_PRI_LOW:     i = 2; break;
3831    }
3832  score = addValToKey (score, 4, i);
3833
3834  /* prefer recently-started torrents */
3835  i = torrentWasRecentlyStarted (tor) ? 0 : 1;
3836  score = addValToKey (score, 1, i);
3837
3838  /* prefer torrents we're downloading with */
3839  i = tr_torrentIsSeed (tor) ? 1 : 0;
3840  score = addValToKey (score, 1, i);
3841
3842  /* prefer peers that are known to be connectible */
3843  i = (atom->flags & ADDED_F_CONNECTABLE) ? 0 : 1;
3844  score = addValToKey (score, 1, i);
3845
3846  /* prefer peers that we might have a chance of uploading to...
3847  so lower seed probability is better */
3848  if (atom->seedProbability == 100) i = 101;
3849  else if (atom->seedProbability == -1) i = 100;
3850  else i = atom->seedProbability;
3851  score = addValToKey (score, 8, i);
3852
3853  /* Prefer peers that we got from more trusted sources.
3854   * lower `fromBest' values indicate more trusted sources */
3855  score = addValToKey (score, 4, atom->fromBest);
3856
3857  /* salt */
3858  score = addValToKey (score, 8, salt);
3859
3860  return score;
3861}
3862
3863static int
3864comparePeerCandidates (const void * va, const void * vb)
3865{
3866  int ret;
3867  const struct peer_candidate * a = va;
3868  const struct peer_candidate * b = vb;
3869
3870  if (a->score < b->score)
3871    ret = -1;
3872  else if (a->score > b->score)
3873    ret = 1;
3874  else
3875    ret = 0;
3876
3877  return ret;
3878}
3879
3880/* Partial sorting -- selecting the k best candidates
3881   Adapted from http://en.wikipedia.org/wiki/Selection_algorithm */
3882static void
3883selectPeerCandidates (struct peer_candidate * candidates, int candidate_count, int select_count)
3884{
3885  tr_quickfindFirstK (candidates,
3886                      candidate_count,
3887                      sizeof(struct peer_candidate),
3888                      comparePeerCandidates,
3889                      select_count);
3890}
3891
3892#ifndef NDEBUG
3893static bool
3894checkBestScoresComeFirst (const struct peer_candidate * candidates, int n, int k)
3895{
3896  int i;
3897  uint64_t worstFirstScore = 0;
3898  const int x = MIN (n, k) - 1;
3899
3900  for (i=0; i<x; i++)
3901    if (worstFirstScore < candidates[i].score)
3902      worstFirstScore = candidates[i].score;
3903
3904  for (i=0; i<x; i++)
3905    assert (candidates[i].score <= worstFirstScore);
3906
3907  for (i=x+1; i<n; i++)
3908    assert (candidates[i].score >= worstFirstScore);
3909
3910  return true;
3911}
3912#endif /* NDEBUG */
3913
3914/** @return an array of all the atoms we might want to connect to */
3915static struct peer_candidate*
3916getPeerCandidates (tr_session * session, int * candidateCount, int max)
3917{
3918  int atomCount;
3919  int peerCount;
3920  tr_torrent * tor;
3921  struct peer_candidate * candidates;
3922  struct peer_candidate * walk;
3923  const time_t now = tr_time ();
3924  const uint64_t now_msec = tr_time_msec ();
3925  /* leave 5% of connection slots for incoming connections -- ticket #2609 */
3926  const int maxCandidates = tr_sessionGetPeerLimit (session) * 0.95;
3927
3928  /* count how many peers and atoms we've got */
3929  tor= NULL;
3930  atomCount = 0;
3931  peerCount = 0;
3932  while ((tor = tr_torrentNext (session, tor)))
3933    {
3934      atomCount += tr_ptrArraySize (&tor->swarm->pool);
3935      peerCount += tr_ptrArraySize (&tor->swarm->peers);
3936    }
3937
3938  /* don't start any new handshakes if we're full up */
3939  if (maxCandidates <= peerCount)
3940    {
3941      *candidateCount = 0;
3942      return NULL;
3943    }
3944
3945  /* allocate an array of candidates */
3946  walk = candidates = tr_new (struct peer_candidate, atomCount);
3947
3948  /* populate the candidate array */
3949  tor = NULL;
3950  while ((tor = tr_torrentNext (session, tor)))
3951    {
3952      int i, nAtoms;
3953      struct peer_atom ** atoms;
3954
3955      if (!tor->swarm->isRunning)
3956        continue;
3957
3958      /* if we've already got enough peers in this torrent... */
3959      if (tr_torrentGetPeerLimit (tor) <= tr_ptrArraySize (&tor->swarm->peers))
3960        continue;
3961
3962      /* if we've already got enough speed in this torrent... */
3963      if (tr_torrentIsSeed (tor) && isBandwidthMaxedOut (&tor->bandwidth, now_msec, TR_UP))
3964        continue;
3965
3966      atoms = (struct peer_atom**) tr_ptrArrayPeek (&tor->swarm->pool, &nAtoms);
3967      for (i=0; i<nAtoms; ++i)
3968        {
3969          struct peer_atom * atom = atoms[i];
3970
3971          if (isPeerCandidate (tor, atom, now))
3972            {
3973              const uint8_t salt = tr_cryptoWeakRandInt (1024);
3974              walk->tor = tor;
3975              walk->atom = atom;
3976              walk->score = getPeerCandidateScore (tor, atom, salt);
3977              ++walk;
3978            }
3979        }
3980    }
3981
3982  *candidateCount = walk - candidates;
3983  if (walk != candidates)
3984    selectPeerCandidates (candidates, walk-candidates, max);
3985
3986  assert (checkBestScoresComeFirst (candidates, *candidateCount, max));
3987  return candidates;
3988}
3989
3990static void
3991initiateConnection (tr_peerMgr * mgr, tr_swarm * s, struct peer_atom * atom)
3992{
3993  tr_peerIo * io;
3994  const time_t now = tr_time ();
3995  bool utp = tr_sessionIsUTPEnabled (mgr->session) && !atom->utp_failed;
3996
3997  if (atom->fromFirst == TR_PEER_FROM_PEX)
3998    /* PEX has explicit signalling for uTP support.  If an atom
3999       originally came from PEX and doesn't have the uTP flag, skip the
4000       uTP connection attempt.  Are we being optimistic here? */
4001    utp = utp && (atom->flags & ADDED_F_UTP_FLAGS);
4002
4003  tordbg (s, "Starting an OUTGOING%s connection with %s",
4004          utp ? " µTP" : "", tr_atomAddrStr (atom));
4005
4006  io = tr_peerIoNewOutgoing (mgr->session,
4007                             &mgr->session->bandwidth,
4008                             &atom->addr,
4009                             atom->port,
4010                             s->tor->info.hash,
4011                             s->tor->completeness == TR_SEED,
4012                             utp);
4013
4014  if (io == NULL)
4015    {
4016      tordbg (s, "peerIo not created; marking peer %s as unreachable", tr_atomAddrStr (atom));
4017      atom->flags2 |= MYFLAG_UNREACHABLE;
4018      atom->numFails++;
4019    }
4020  else
4021    {
4022      tr_handshake * handshake = tr_handshakeNew (io,
4023                                                  mgr->session->encryptionMode,
4024                                                  myHandshakeDoneCB,
4025                                                  mgr);
4026
4027      assert (tr_peerIoGetTorrentHash (io));
4028
4029      tr_peerIoUnref (io); /* balanced by the initial ref
4030                              in tr_peerIoNewOutgoing () */
4031
4032      tr_ptrArrayInsertSorted (&s->outgoingHandshakes, handshake,
4033                               handshakeCompare);
4034    }
4035
4036  atom->lastConnectionAttemptAt = now;
4037  atom->time = now;
4038}
4039
4040static void
4041initiateCandidateConnection (tr_peerMgr * mgr, struct peer_candidate * c)
4042{
4043#if 0
4044  fprintf (stderr, "Starting an OUTGOING connection with %s - [%s] seedProbability==%d; %s, %s\n",
4045           tr_atomAddrStr (c->atom),
4046           tr_torrentName (c->tor),
4047           (int)c->atom->seedProbability,
4048           tr_torrentIsPrivate (c->tor) ? "private" : "public",
4049           tr_torrentIsSeed (c->tor) ? "seed" : "downloader");
4050#endif
4051
4052  initiateConnection (mgr, c->tor->swarm, c->atom);
4053}
4054
4055static void
4056makeNewPeerConnections (struct tr_peerMgr * mgr, const int max)
4057{
4058  int i, n;
4059  struct peer_candidate * candidates;
4060
4061  candidates = getPeerCandidates (mgr->session, &n, max);
4062
4063  for (i=0; i<n && i<max; ++i)
4064    initiateCandidateConnection (mgr, &candidates[i]);
4065
4066  tr_free (candidates);
4067}
Note: See TracBrowser for help on using the repository browser.