source: trunk/libtransmission/peer-mgr.c @ 13900

Last change on this file since 13900 was 13900, checked in by jordan, 8 years ago

(libT) peer-mgr doesn't care about non-piece data being transferred, so don't notify it when it happens

  • Property svn:keywords set to Date Rev Author Id
File size: 108.1 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2 (b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 13900 2013-01-30 18:00:03Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h> /* error codes ERANGE, ... */
15#include <limits.h> /* INT_MAX */
16#include <string.h> /* memcpy, memcmp, strstr */
17#include <stdlib.h> /* qsort */
18
19#include <event2/event.h>
20
21#include <libutp/utp.h>
22
23#include "transmission.h"
24#include "announcer.h"
25#include "bandwidth.h"
26#include "blocklist.h"
27#include "cache.h"
28#include "clients.h"
29#include "completion.h"
30#include "crypto.h"
31#include "handshake.h"
32#include "log.h"
33#include "net.h"
34#include "peer-io.h"
35#include "peer-mgr.h"
36#include "peer-msgs.h"
37#include "ptrarray.h"
38#include "session.h"
39#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
40#include "torrent.h"
41#include "tr-utp.h"
42#include "utils.h"
43#include "webseed.h"
44
/* Tunables and flag bits used by the peer manager. */
enum
{
  /* how frequently to cull old atoms */
  ATOM_PERIOD_MSEC = (60 * 1000),

  /* how frequently to change which peers are choked */
  RECHOKE_PERIOD_MSEC = (10 * 1000),

  /* an optimistically unchoked peer is immune from rechoking
     for this many calls to rechokeUploads (). */
  OPTIMISTIC_UNCHOKE_MULTIPLIER = 4,

  /* how frequently to reallocate bandwidth */
  BANDWIDTH_PERIOD_MSEC = 500,

  /* how frequently to age out old piece request lists */
  REFILL_UPKEEP_PERIOD_MSEC = (10 * 1000),

  /* how frequently to decide which peers live and die */
  RECONNECT_PERIOD_MSEC = 500,

  /* when many peers are available, keep idle ones this long */
  MIN_UPLOAD_IDLE_SECS = (60),

  /* when few peers are available, keep idle ones this long */
  MAX_UPLOAD_IDLE_SECS = (60 * 5),

  /* max number of peers to ask for per second overall.
   * this throttle is to avoid overloading the router */
  MAX_CONNECTIONS_PER_SECOND = 12,

  /* the per-second limit scaled down to one reconnect pulse.
   * Integer arithmetic only: an enumerator must be an integer constant
   * expression, and a floating constant is only allowed there as the
   * immediate operand of a cast. The result truncates toward zero. */
  MAX_CONNECTIONS_PER_PULSE = ((MAX_CONNECTIONS_PER_SECOND * RECONNECT_PERIOD_MSEC) / 1000),

  /* number of bad pieces a peer is allowed to send before we ban them */
  MAX_BAD_PIECES_PER_PEER = 5,

  /* amount of time to keep a list of request pieces lying around
     before it's considered too old and needs to be rebuilt */
  PIECE_LIST_SHELF_LIFE_SECS = 60,

  /* use for bitwise operations w/peer_atom.flags2 */
  MYFLAG_BANNED = 1,

  /* use for bitwise operations w/peer_atom.flags2 */
  /* unreachable for now... but not banned.
   * if they try to connect to us it's okay */
  MYFLAG_UNREACHABLE = 2,

  /* the minimum we'll wait before attempting to reconnect to a peer */
  MINIMUM_RECONNECT_INTERVAL_SECS = 5,

  /** how long we'll let requests we've made linger before we cancel them */
  REQUEST_TTL_SECS = 90,

  /* NOTE(review): the next two are not used in the visible part of this
   * file; their exact meaning should be confirmed at their use sites. */
  NO_BLOCKS_CANCEL_HISTORY = 120,

  CANCEL_HISTORY_SEC = 60
};
103
104const tr_peer_event TR_PEER_EVENT_INIT = { 0, 0, NULL, 0, 0, 0, 0 };
105
106/**
107***
108**/
109
110/**
111 * Peer information that should be kept even before we've connected and
112 * after we've disconnected. These are kept in a pool of peer_atoms to decide
113 * which ones would make good candidates for connecting to, and to watch out
114 * for banned peers.
115 *
116 * @see tr_peer
117 * @see tr_peermsgs
118 */
119struct peer_atom
120{
121  uint8_t     fromFirst;          /* where the peer was first found */
122  uint8_t     fromBest;           /* the "best" value of where the peer has been found */
123  uint8_t     flags;              /* these match the added_f flags */
124  uint8_t     flags2;             /* flags that aren't defined in added_f */
125  int8_t      seedProbability;    /* how likely is this to be a seed... [0..100] or -1 for unknown */
126  int8_t      blocklisted;        /* -1 for unknown, true for blocklisted, false for not blocklisted */
127
128  tr_port     port;
129  bool        utp_failed;         /* We recently failed to connect over uTP */
130  uint16_t    numFails;
131  time_t      time;               /* when the peer's connection status last changed */
132  time_t      piece_data_time;
133
134  time_t      lastConnectionAttemptAt;
135  time_t      lastConnectionAt;
136
137  /* similar to a TTL field, but less rigid --
138   * if the swarm is small, the atom will be kept past this date. */
139  time_t      shelf_date;
140  tr_peer   * peer;               /* will be NULL if not connected */
141  tr_address  addr;
142};
143
144#ifdef NDEBUG
145#define tr_isAtom(a) (TRUE)
146#else
147static bool
148tr_isAtom (const struct peer_atom * atom)
149{
150  return (atom != NULL)
151      && (atom->fromFirst < TR_PEER_FROM__MAX)
152      && (atom->fromBest < TR_PEER_FROM__MAX)
153      && (tr_address_is_valid (&atom->addr));
154}
155#endif
156
157static const char*
158tr_atomAddrStr (const struct peer_atom * atom)
159{
160  return atom ? tr_peerIoAddrStr (&atom->addr, atom->port) : "[no atom]";
161}
162
163struct block_request
164{
165  tr_block_index_t block;
166  tr_peer * peer;
167  time_t sentAt;
168};
169
170struct weighted_piece
171{
172  tr_piece_index_t index;
173  int16_t salt;
174  int16_t requestCount;
175};
176
/* Records which ordering, if any, tr_swarm::pieces currently has. */
enum piece_sort_state
{
  PIECES_UNSORTED,          /* order is unknown / stale */
  PIECES_SORTED_BY_INDEX,   /* ordered by comparePieceByIndex () */
  PIECES_SORTED_BY_WEIGHT   /* ordered by comparePieceByWeight () */
};
183
184/** @brief Opaque, per-torrent data structure for peer connection information */
185typedef struct tr_swarm
186{
187  tr_ptrArray                outgoingHandshakes; /* tr_handshake */
188  tr_ptrArray                pool; /* struct peer_atom */
189  tr_ptrArray                peers; /* tr_peer */
190  tr_ptrArray                webseeds; /* tr_webseed */
191
192  tr_torrent               * tor;
193  struct tr_peerMgr        * manager;
194
195  tr_peer                  * optimistic; /* the optimistic peer, or NULL if none */
196  int                        optimisticUnchokeTimeScaler;
197
198  bool                       isRunning;
199  bool                       needsCompletenessCheck;
200
201  struct block_request     * requests;
202  int                        requestCount;
203  int                        requestAlloc;
204
205  struct weighted_piece    * pieces;
206  int                        pieceCount;
207  enum piece_sort_state      pieceSortState;
208
209  /* An array of pieceCount items stating how many peers have each piece.
210     This is used to help us for downloading pieces "rarest first."
211     This may be NULL if we don't have metainfo yet, or if we're not
212     downloading and don't care about rarity */
213  uint16_t                 * pieceReplication;
214  size_t                     pieceReplicationSize;
215
216  int                        interestedCount;
217  int                        maxPeers;
218  time_t                     lastCancel;
219
220  /* Before the endgame this should be 0. In endgame, is contains the average
221   * number of pending requests per peer. Only peers which have more pending
222   * requests are considered 'fast' are allowed to request a block that's
223   * already been requested from another (slower?) peer. */
224  int                        endgame;
225}
226tr_swarm;
227
228struct tr_peerMgr
229{
230  tr_session    * session;
231  tr_ptrArray     incomingHandshakes; /* tr_handshake */
232  struct event  * bandwidthTimer;
233  struct event  * rechokeTimer;
234  struct event  * refillUpkeepTimer;
235  struct event  * atomTimer;
236};
237
/* Deep-log a printf-style message tagged with the name of swarm `t`'s
   torrent. Nothing is logged unless deep logging is enabled.
   `t` is parenthesized so any pointer expression may be passed. */
#define tordbg(t, ...) \
  do \
    { \
      if (tr_logGetDeepEnabled ()) \
        tr_logAddDeep (__FILE__, __LINE__, tr_torrentName ((t)->tor), __VA_ARGS__); \
    } \
  while (0)
245
/* Deep-log a printf-style message that is not tied to any torrent
   (the name argument is NULL). No-op unless deep logging is enabled. */
#define dbgmsg(...) \
  do { \
    if (tr_logGetDeepEnabled ()) \
      tr_logAddDeep (__FILE__, __LINE__, NULL, __VA_ARGS__); \
  } while (0)
253
254/**
255***
256**/
257
258static inline void
259managerLock (const struct tr_peerMgr * manager)
260{
261  tr_sessionLock (manager->session);
262}
263
264static inline void
265managerUnlock (const struct tr_peerMgr * manager)
266{
267  tr_sessionUnlock (manager->session);
268}
269
270static inline void
271swarmLock (tr_swarm * swarm)
272{
273  managerLock (swarm->manager);
274}
275
276static inline void
277swarmUnlock (tr_swarm * swarm)
278{
279  managerUnlock (swarm->manager);
280}
281
282static inline int
283swarmIsLocked (const tr_swarm * swarm)
284{
285  return tr_sessionIsLocked (swarm->manager->session);
286}
287
288/**
289***
290**/
291
292static int
293handshakeCompareToAddr (const void * va, const void * vb)
294{
295  const tr_handshake * a = va;
296
297  return tr_address_compare (tr_handshakeGetAddr (a, NULL), vb);
298}
299
300static int
301handshakeCompare (const void * a, const void * b)
302{
303  return handshakeCompareToAddr (a, tr_handshakeGetAddr (b, NULL));
304}
305
306static inline tr_handshake*
307getExistingHandshake (tr_ptrArray * handshakes, const tr_address * addr)
308{
309  if (tr_ptrArrayEmpty (handshakes))
310    return NULL;
311
312  return tr_ptrArrayFindSorted (handshakes, addr, handshakeCompareToAddr);
313}
314
315static int
316comparePeerAtomToAddress (const void * va, const void * vb)
317{
318  const struct peer_atom * a = va;
319
320  return tr_address_compare (&a->addr, vb);
321}
322
323static int
324compareAtomsByAddress (const void * va, const void * vb)
325{
326  const struct peer_atom * b = vb;
327
328  assert (tr_isAtom (b));
329
330  return comparePeerAtomToAddress (va, &b->addr);
331}
332
333/**
334***
335**/
336
337const tr_address *
338tr_peerAddress (const tr_peer * peer)
339{
340  return &peer->atom->addr;
341}
342
343static tr_swarm *
344getExistingSwarm (tr_peerMgr *    manager,
345                  const uint8_t * hash)
346{
347  tr_torrent * tor = tr_torrentFindFromHash (manager->session, hash);
348
349  return tor == NULL ? NULL : tor->swarm;
350}
351
352static int
353peerCompare (const void * a, const void * b)
354{
355  return tr_address_compare (tr_peerAddress (a), tr_peerAddress (b));
356}
357
358static struct peer_atom*
359getExistingAtom (const tr_swarm   * cswarm,
360                 const tr_address * addr)
361{
362  tr_swarm * swarm = (tr_swarm*) cswarm;
363  return tr_ptrArrayFindSorted (&swarm->pool, addr, comparePeerAtomToAddress);
364}
365
366static bool
367peerIsInUse (const tr_swarm * cs, const struct peer_atom * atom)
368{
369  tr_swarm * s = (tr_swarm*) cs;
370
371  assert (swarmIsLocked (s));
372
373  return (atom->peer != NULL)
374      || getExistingHandshake (&s->outgoingHandshakes, &atom->addr)
375      || getExistingHandshake (&s->manager->incomingHandshakes, &atom->addr);
376}
377
378void
379tr_peerConstruct (tr_peer * peer)
380{
381  memset (peer, 0, sizeof (tr_peer));
382
383  peer->have = TR_BITFIELD_INIT;
384}
385
386static tr_peer*
387peerNew (struct peer_atom * atom)
388{
389  tr_peer * peer = tr_new (tr_peer, 1);
390  tr_peerConstruct (peer);
391
392  peer->atom = atom;
393  atom->peer = peer;
394
395  return peer;
396}
397
398static tr_peer*
399getPeer (tr_swarm * s, struct peer_atom * atom)
400{
401  tr_peer * peer;
402
403  assert (swarmIsLocked (s));
404
405  peer = atom->peer;
406
407  if (peer == NULL)
408    {
409      peer = peerNew (atom);
410      tr_bitfieldConstruct (&peer->have, s->tor->info.pieceCount);
411      tr_bitfieldConstruct (&peer->blame, s->tor->blockCount);
412      tr_ptrArrayInsertSorted (&s->peers, peer, peerCompare);
413    }
414
415  return peer;
416}
417
418static void peerDeclinedAllRequests (tr_swarm *, const tr_peer *);
419
420void
421tr_peerDestruct (tr_torrent * tor, tr_peer * peer)
422{
423  assert (peer != NULL);
424
425  peerDeclinedAllRequests (tor->swarm, peer);
426
427  if (peer->msgs != NULL)
428    tr_peerMsgsFree (peer->msgs);
429
430  if (peer->io)
431    {
432      tr_peerIoClear (peer->io);
433      tr_peerIoUnref (peer->io); /* balanced by the ref in handshakeDoneCB () */
434    }
435
436  tr_bitfieldDestruct (&peer->have);
437  tr_bitfieldDestruct (&peer->blame);
438
439  if (peer->atom)
440    peer->atom->peer = NULL;
441}
442
443static void
444peerDelete (tr_swarm * s, tr_peer * peer)
445{
446  tr_peerDestruct (s->tor, peer);
447  tr_free (peer);
448}
449
450static inline bool
451replicationExists (const tr_swarm * s)
452{
453  return s->pieceReplication != NULL;
454}
455
456static void
457replicationFree (tr_swarm * s)
458{
459  tr_free (s->pieceReplication);
460  s->pieceReplication = NULL;
461  s->pieceReplicationSize = 0;
462}
463
464static void
465replicationNew (tr_swarm * s)
466{
467  tr_piece_index_t piece_i;
468  const tr_piece_index_t piece_count = s->tor->info.pieceCount;
469  tr_peer ** peers = (tr_peer**) tr_ptrArrayBase (&s->peers);
470  const int peer_count = tr_ptrArraySize (&s->peers);
471
472  assert (!replicationExists (s));
473
474  s->pieceReplicationSize = piece_count;
475  s->pieceReplication = tr_new0 (uint16_t, piece_count);
476
477  for (piece_i=0; piece_i<piece_count; ++piece_i)
478    {
479      int peer_i;
480      uint16_t r = 0;
481
482      for (peer_i=0; peer_i<peer_count; ++peer_i)
483        if (tr_bitfieldHas (&peers[peer_i]->have, piece_i))
484          ++r;
485
486      s->pieceReplication[piece_i] = r;
487    }
488}
489
490static void
491swarmFree (void * vs)
492{
493  tr_swarm * s = vs;
494
495  assert (s);
496  assert (!s->isRunning);
497  assert (swarmIsLocked (s));
498  assert (tr_ptrArrayEmpty (&s->outgoingHandshakes));
499  assert (tr_ptrArrayEmpty (&s->peers));
500
501  tr_ptrArrayDestruct (&s->webseeds, (PtrArrayForeachFunc)tr_webseedFree);
502  tr_ptrArrayDestruct (&s->pool, (PtrArrayForeachFunc)tr_free);
503  tr_ptrArrayDestruct (&s->outgoingHandshakes, NULL);
504  tr_ptrArrayDestruct (&s->peers, NULL);
505
506  replicationFree (s);
507
508  tr_free (s->requests);
509  tr_free (s->pieces);
510  tr_free (s);
511}
512
513static void peerCallbackFunc (tr_peer *, const tr_peer_event *, void *);
514
515static void
516rebuildWebseedArray (tr_swarm * s, tr_torrent * tor)
517{
518  unsigned int i;
519  const tr_info * inf = &tor->info;
520
521  /* clear the array */
522  tr_ptrArrayDestruct (&s->webseeds, (PtrArrayForeachFunc)tr_webseedFree);
523  s->webseeds = TR_PTR_ARRAY_INIT;
524
525  /* repopulate it */
526  for (i = 0; i < inf->webseedCount; ++i)
527    {
528      tr_webseed * w = tr_webseedNew (tor, inf->webseeds[i], peerCallbackFunc, s);
529      tr_ptrArrayAppend (&s->webseeds, w);
530    }
531}
532
533static tr_swarm *
534swarmNew (tr_peerMgr * manager, tr_torrent * tor)
535{
536  tr_swarm * s;
537
538  s = tr_new0 (tr_swarm, 1);
539  s->manager = manager;
540  s->tor = tor;
541  s->pool = TR_PTR_ARRAY_INIT;
542  s->peers = TR_PTR_ARRAY_INIT;
543  s->webseeds = TR_PTR_ARRAY_INIT;
544  s->outgoingHandshakes = TR_PTR_ARRAY_INIT;
545
546  rebuildWebseedArray (s, tor);
547
548  return s;
549}
550
551static void ensureMgrTimersExist (struct tr_peerMgr * m);
552
553tr_peerMgr*
554tr_peerMgrNew (tr_session * session)
555{
556  tr_peerMgr * m = tr_new0 (tr_peerMgr, 1);
557  m->session = session;
558  m->incomingHandshakes = TR_PTR_ARRAY_INIT;
559  ensureMgrTimersExist (m);
560  return m;
561}
562
/* Free the event that `*t` points to, if any, and reset `*t` to NULL. */
static void
deleteTimer (struct event ** t)
{
  if (*t == NULL)
    return;

  event_free (*t);
  *t = NULL;
}
572
573static void
574deleteTimers (struct tr_peerMgr * m)
575{
576  deleteTimer (&m->atomTimer);
577  deleteTimer (&m->bandwidthTimer);
578  deleteTimer (&m->rechokeTimer);
579  deleteTimer (&m->refillUpkeepTimer);
580}
581
582void
583tr_peerMgrFree (tr_peerMgr * manager)
584{
585  managerLock (manager);
586
587  deleteTimers (manager);
588
589  /* free the handshakes. Abort invokes handshakeDoneCB (), which removes
590   * the item from manager->handshakes, so this is a little roundabout... */
591  while (!tr_ptrArrayEmpty (&manager->incomingHandshakes))
592    tr_handshakeAbort (tr_ptrArrayNth (&manager->incomingHandshakes, 0));
593
594  tr_ptrArrayDestruct (&manager->incomingHandshakes, NULL);
595
596  managerUnlock (manager);
597  tr_free (manager);
598}
599
600static int
601clientIsDownloadingFrom (const tr_torrent * tor, const tr_peer * peer)
602{
603  if (!tr_torrentHasMetadata (tor))
604    return true;
605
606  return peer->clientIsInterested && !peer->clientIsChoked;
607}
608
609static int
610clientIsUploadingTo (const tr_peer * peer)
611{
612  return peer->peerIsInterested && !peer->peerIsChoked;
613}
614
615/***
616****
617***/
618
619void
620tr_peerMgrOnBlocklistChanged (tr_peerMgr * mgr)
621{
622  tr_torrent * tor = NULL;
623  tr_session * session = mgr->session;
624
625  /* we cache whether or not a peer is blocklisted...
626     since the blocklist has changed, erase that cached value */
627  while ((tor = tr_torrentNext (session, tor)))
628    {
629      int i;
630      tr_swarm * s = tor->swarm;
631      const int n = tr_ptrArraySize (&s->pool);
632      for (i=0; i<n; ++i)
633        {
634          struct peer_atom * atom = tr_ptrArrayNth (&s->pool, i);
635          atom->blocklisted = -1;
636        }
637    }
638}
639
640static bool
641isAtomBlocklisted (tr_session * session, struct peer_atom * atom)
642{
643  if (atom->blocklisted < 0)
644    atom->blocklisted = tr_sessionIsAddressBlocked (session, &atom->addr);
645
646  assert (tr_isBool (atom->blocklisted));
647  return atom->blocklisted;
648}
649
650
651/***
652****
653***/
654
655static void
656atomSetSeedProbability (struct peer_atom * atom, int seedProbability)
657{
658  assert (atom != NULL);
659  assert (-1<=seedProbability && seedProbability<=100);
660
661  atom->seedProbability = seedProbability;
662
663  if (seedProbability == 100)
664    atom->flags |= ADDED_F_SEED_FLAG;
665  else if (seedProbability != -1)
666    atom->flags &= ~ADDED_F_SEED_FLAG;
667}
668
669static inline bool
670atomIsSeed (const struct peer_atom * atom)
671{
672  return atom->seedProbability == 100;
673}
674
675static void
676atomSetSeed (const tr_swarm * s, struct peer_atom * atom)
677{
678  if (!atomIsSeed (atom))
679    {
680      tordbg (s, "marking peer %s as a seed", tr_atomAddrStr (atom));
681
682      atomSetSeedProbability (atom, 100);
683    }
684}
685
686
687bool
688tr_peerMgrPeerIsSeed (const tr_torrent  * tor,
689                      const tr_address  * addr)
690{
691  bool isSeed = false;
692  const tr_swarm * s = tor->swarm;
693  const struct peer_atom * atom = getExistingAtom (s, addr);
694
695  if (atom)
696    isSeed = atomIsSeed (atom);
697
698  return isSeed;
699}
700
701void
702tr_peerMgrSetUtpSupported (tr_torrent * tor, const tr_address * addr)
703{
704  struct peer_atom * atom = getExistingAtom (tor->swarm, addr);
705
706  if (atom)
707    atom->flags |= ADDED_F_UTP_FLAGS;
708}
709
710void
711tr_peerMgrSetUtpFailed (tr_torrent *tor, const tr_address *addr, bool failed)
712{
713  struct peer_atom * atom = getExistingAtom (tor->swarm, addr);
714
715  if (atom)
716    atom->utp_failed = failed;
717}
718
719
/**
***  REQUESTS
***
*** There are two data structures associated with managing block requests:
***
*** 1. tr_swarm::requests, an array of "struct block_request" which keeps
***    track of which blocks have been requested, and when, and from which
***    peers. This list is used for (a) cancelling requests that have been
***    pending for too long and (b) avoiding duplicate requests before endgame.
***
*** 2. tr_swarm::pieces, an array of "struct weighted_piece" which lists the
***    pieces that we want to request. It's used to decide which blocks to
***    return next when tr_peerMgrGetBlockRequests () is called.
**/
734
735/**
736*** struct block_request
737**/
738
739static int
740compareReqByBlock (const void * va, const void * vb)
741{
742  const struct block_request * a = va;
743  const struct block_request * b = vb;
744
745  /* primary key: block */
746  if (a->block < b->block) return -1;
747  if (a->block > b->block) return 1;
748
749  /* secondary key: peer */
750  if (a->peer < b->peer) return -1;
751  if (a->peer > b->peer) return 1;
752
753  return 0;
754}
755
756static void
757requestListAdd (tr_swarm * s, tr_block_index_t block, tr_peer * peer)
758{
759  struct block_request key;
760
761  /* ensure enough room is available... */
762  if (s->requestCount + 1 >= s->requestAlloc)
763    {
764      const int CHUNK_SIZE = 128;
765      s->requestAlloc += CHUNK_SIZE;
766      s->requests = tr_renew (struct block_request,
767                              s->requests, s->requestAlloc);
768    }
769
770  /* populate the record we're inserting */
771  key.block = block;
772  key.peer = peer;
773  key.sentAt = tr_time ();
774
775  /* insert the request to our array... */
776  {
777    bool exact;
778    const int pos = tr_lowerBound (&key, s->requests, s->requestCount,
779                                   sizeof (struct block_request),
780                                   compareReqByBlock, &exact);
781    assert (!exact);
782    memmove (s->requests + pos + 1,
783             s->requests + pos,
784             sizeof (struct block_request) * (s->requestCount++ - pos));
785    s->requests[pos] = key;
786  }
787
788  if (peer != NULL)
789    {
790      ++peer->pendingReqsToPeer;
791      assert (peer->pendingReqsToPeer >= 0);
792    }
793
794  /*fprintf (stderr, "added request of block %lu from peer %s... "
795                     "there are now %d block\n",
796                     (unsigned long)block, tr_atomAddrStr (peer->atom), s->requestCount);*/
797}
798
799static struct block_request *
800requestListLookup (tr_swarm * s, tr_block_index_t block, const tr_peer * peer)
801{
802  struct block_request key;
803  key.block = block;
804  key.peer = (tr_peer*) peer;
805
806  return bsearch (&key, s->requests, s->requestCount,
807                  sizeof (struct block_request),
808                  compareReqByBlock);
809}
810
811/**
812 * Find the peers are we currently requesting the block
813 * with index @a block from and append them to @a peerArr.
814 */
815static void
816getBlockRequestPeers (tr_swarm * s, tr_block_index_t block,
817                      tr_ptrArray * peerArr)
818{
819  bool exact;
820  int i, pos;
821  struct block_request key;
822
823  key.block = block;
824  key.peer = NULL;
825  pos = tr_lowerBound (&key, s->requests, s->requestCount,
826                       sizeof (struct block_request),
827                       compareReqByBlock, &exact);
828
829  assert (!exact); /* shouldn't have a request with .peer == NULL */
830
831  for (i=pos; i<s->requestCount; ++i)
832    {
833      if (s->requests[i].block != block)
834        break;
835      tr_ptrArrayAppend (peerArr, s->requests[i].peer);
836    }
837}
838
839static void
840decrementPendingReqCount (const struct block_request * b)
841{
842  if (b->peer != NULL)
843    if (b->peer->pendingReqsToPeer > 0)
844      --b->peer->pendingReqsToPeer;
845}
846
847static void
848requestListRemove (tr_swarm * s, tr_block_index_t block, const tr_peer * peer)
849{
850  const struct block_request * b = requestListLookup (s, block, peer);
851
852  if (b != NULL)
853    {
854      const int pos = b - s->requests;
855      assert (pos < s->requestCount);
856
857      decrementPendingReqCount (b);
858
859      tr_removeElementFromArray (s->requests,
860                                 pos,
861                                 sizeof (struct block_request),
862                                 s->requestCount--);
863
864      /*fprintf (stderr, "removing request of block %lu from peer %s... "
865                         "there are now %d block requests left\n",
866                         (unsigned long)block, tr_atomAddrStr (peer->atom), t->requestCount);*/
867    }
868}
869
870static int
871countActiveWebseeds (const tr_swarm * s)
872{
873  int activeCount = 0;
874  const tr_webseed ** w = (const tr_webseed **) tr_ptrArrayBase (&s->webseeds);
875  const tr_webseed ** const wend = w + tr_ptrArraySize (&s->webseeds);
876
877  for (; w!=wend; ++w)
878    if (tr_webseedIsActive (*w))
879      ++activeCount;
880
881  return activeCount;
882}
883
884static bool
885testForEndgame (const tr_swarm * s)
886{
887  /* we consider ourselves to be in endgame if the number of bytes
888     we've got requested is >= the number of bytes left to download */
889  return (s->requestCount * s->tor->blockSize)
890               >= tr_cpLeftUntilDone (&s->tor->completion);
891}
892
893static void
894updateEndgame (tr_swarm * s)
895{
896  assert (s->requestCount >= 0);
897
898  if (!testForEndgame (s))
899    {
900      /* not in endgame */
901      s->endgame = 0;
902    }
903  else if (!s->endgame) /* only recalculate when endgame first begins */
904    {
905      int numDownloading = 0;
906      const tr_peer ** p = (const tr_peer **) tr_ptrArrayBase (&s->peers);
907      const tr_peer ** const pend = p + tr_ptrArraySize (&s->peers);
908
909      /* add the active bittorrent peers... */
910      for (; p!=pend; ++p)
911        if ((*p)->pendingReqsToPeer > 0)
912          ++numDownloading;
913
914      /* add the active webseeds... */
915      numDownloading += countActiveWebseeds (s);
916
917      /* average number of pending requests per downloading peer */
918      s->endgame = s->requestCount / MAX (numDownloading, 1);
919    }
920}
921
922
923/****
924*****
925*****  Piece List Manipulation / Accessors
926*****
927****/
928
929static inline void
930invalidatePieceSorting (tr_swarm * s)
931{
932  s->pieceSortState = PIECES_UNSORTED;
933}
934
935static const tr_torrent * weightTorrent;
936
937static const uint16_t * weightReplication;
938
939static void
940setComparePieceByWeightTorrent (tr_swarm * s)
941{
942  if (!replicationExists (s))
943    replicationNew (s);
944
945  weightTorrent = s->tor;
946  weightReplication = s->pieceReplication;
947}
948
949/* we try to create a "weight" s.t. high-priority pieces come before others,
950 * and that partially-complete pieces come before empty ones. */
951static int
952comparePieceByWeight (const void * va, const void * vb)
953{
954  const struct weighted_piece * a = va;
955  const struct weighted_piece * b = vb;
956  int ia, ib, missing, pending;
957  const tr_torrent * tor = weightTorrent;
958  const uint16_t * rep = weightReplication;
959
960  /* primary key: weight */
961  missing = tr_cpMissingBlocksInPiece (&tor->completion, a->index);
962  pending = a->requestCount;
963  ia = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
964  missing = tr_cpMissingBlocksInPiece (&tor->completion, b->index);
965  pending = b->requestCount;
966  ib = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
967  if (ia < ib) return -1;
968  if (ia > ib) return 1;
969
970  /* secondary key: higher priorities go first */
971  ia = tor->info.pieces[a->index].priority;
972  ib = tor->info.pieces[b->index].priority;
973  if (ia > ib) return -1;
974  if (ia < ib) return 1;
975
976  /* tertiary key: rarest first. */
977  ia = rep[a->index];
978  ib = rep[b->index];
979  if (ia < ib) return -1;
980  if (ia > ib) return 1;
981
982  /* quaternary key: random */
983  if (a->salt < b->salt) return -1;
984  if (a->salt > b->salt) return 1;
985
986  /* okay, they're equal */
987  return 0;
988}
989
990static int
991comparePieceByIndex (const void * va, const void * vb)
992{
993  const struct weighted_piece * a = va;
994  const struct weighted_piece * b = vb;
995  if (a->index < b->index) return -1;
996  if (a->index > b->index) return 1;
997  return 0;
998}
999
1000static void
1001pieceListSort (tr_swarm * s, enum piece_sort_state state)
1002{
1003  assert (state==PIECES_SORTED_BY_INDEX
1004       || state==PIECES_SORTED_BY_WEIGHT);
1005
1006  if (state == PIECES_SORTED_BY_WEIGHT)
1007    {
1008      setComparePieceByWeightTorrent (s);
1009      qsort (s->pieces, s->pieceCount, sizeof (struct weighted_piece), comparePieceByWeight);
1010    }
1011  else
1012    {
1013      qsort (s->pieces, s->pieceCount, sizeof (struct weighted_piece), comparePieceByIndex);
1014    }
1015
1016  s->pieceSortState = state;
1017}
1018
/**
 * These functions are useful for testing, but too expensive for nightly builds.
 * Let's leave them disabled but keep an easy hook to compile them back in:
 * change the `#if 1` below to `#if 0`.
 */
#if 1
#define assertWeightedPiecesAreSorted(s)
#define assertReplicationCountIsExact(s)
#else
/* Outside endgame, check that every adjacent pair in the piece list is
   in comparePieceByWeight () order. */
static void
assertWeightedPiecesAreSorted (tr_swarm * s)
{
  if (!s->endgame)
    {
      int i;
      setComparePieceByWeightTorrent (s);
      for (i=0; i<s->pieceCount-1; ++i)
        assert (comparePieceByWeight (&s->pieces[i], &s->pieces[i+1]) <= 0);
    }
}

/* Recount, for every piece, how many connected peers have it and check
   that the result matches the stored replication array. */
static void
assertReplicationCountIsExact (tr_swarm * s)
{
  /* This assert might fail due to errors of implementations in other
   * clients. It happens when receiving duplicate bitfields/HaveAll/HaveNone
   * from a client. If such a behavior is noticed,
   * a bug report should be filed against the faulty client. */

  size_t piece_i;
  const uint16_t * rep = s->pieceReplication;
  const size_t piece_count = s->pieceReplicationSize;
  const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase (&s->peers);
  const int peer_count = tr_ptrArraySize (&s->peers);

  assert (piece_count == s->tor->info.pieceCount);

  for (piece_i=0; piece_i<piece_count; ++piece_i)
    {
      int peer_i;
      uint16_t r = 0;

      for (peer_i=0; peer_i<peer_count; ++peer_i)
        if (tr_bitfieldHas (&peers[peer_i]->have, piece_i))
          ++r;

      assert (rep[piece_i] == r);
    }
}
#endif
1067
1068static struct weighted_piece *
1069pieceListLookup (tr_swarm * s, tr_piece_index_t index)
1070{
1071  int i;
1072
1073  for (i=0; i<s->pieceCount; ++i)
1074    if (s->pieces[i].index == index)
1075      return &s->pieces[i];
1076
1077  return NULL;
1078}
1079
1080static void
1081pieceListRebuild (tr_swarm * s)
1082{
1083  if (!tr_torrentIsSeed (s->tor))
1084    {
1085      tr_piece_index_t i;
1086      tr_piece_index_t * pool;
1087      tr_piece_index_t poolCount = 0;
1088      const tr_torrent * tor = s->tor;
1089      const tr_info * inf = tr_torrentInfo (tor);
1090      struct weighted_piece * pieces;
1091      int pieceCount;
1092
1093      /* build the new list */
1094      pool = tr_new (tr_piece_index_t, inf->pieceCount);
1095      for (i=0; i<inf->pieceCount; ++i)
1096        if (!inf->pieces[i].dnd)
1097          if (!tr_cpPieceIsComplete (&tor->completion, i))
1098            pool[poolCount++] = i;
1099      pieceCount = poolCount;
1100      pieces = tr_new0 (struct weighted_piece, pieceCount);
1101      for (i=0; i<poolCount; ++i)
1102        {
1103          struct weighted_piece * piece = pieces + i;
1104          piece->index = pool[i];
1105          piece->requestCount = 0;
1106          piece->salt = tr_cryptoWeakRandInt (4096);
1107        }
1108
1109      /* if we already had a list of pieces, merge it into
1110       * the new list so we don't lose its requestCounts */
1111      if (s->pieces != NULL)
1112        {
1113          struct weighted_piece * o = s->pieces;
1114          struct weighted_piece * oend = o + s->pieceCount;
1115          struct weighted_piece * n = pieces;
1116          struct weighted_piece * nend = n + pieceCount;
1117
1118          pieceListSort (s, PIECES_SORTED_BY_INDEX);
1119
1120          while (o!=oend && n!=nend)
1121            {
1122              if (o->index < n->index)
1123                ++o;
1124              else if (o->index > n->index)
1125                ++n;
1126              else
1127                *n++ = *o++;
1128            }
1129
1130          tr_free (s->pieces);
1131        }
1132
1133      s->pieces = pieces;
1134      s->pieceCount = pieceCount;
1135
1136      pieceListSort (s, PIECES_SORTED_BY_WEIGHT);
1137
1138      /* cleanup */
1139      tr_free (pool);
1140    }
1141}
1142
1143static void
1144pieceListRemovePiece (tr_swarm * s, tr_piece_index_t piece)
1145{
1146  struct weighted_piece * p;
1147
1148  if ((p = pieceListLookup (s, piece)))
1149    {
1150      const int pos = p - s->pieces;
1151
1152      tr_removeElementFromArray (s->pieces,
1153                                 pos,
1154                                 sizeof (struct weighted_piece),
1155                                 s->pieceCount--);
1156
1157      if (s->pieceCount == 0)
1158        {
1159          tr_free (s->pieces);
1160          s->pieces = NULL;
1161        }
1162    }
1163}
1164
1165static void
1166pieceListResortPiece (tr_swarm * s, struct weighted_piece * p)
1167{
1168  int pos;
1169  bool isSorted = true;
1170
1171  if (p == NULL)
1172    return;
1173
1174  /* is the torrent already sorted? */
1175  pos = p - s->pieces;
1176  setComparePieceByWeightTorrent (s);
1177  if (isSorted && (pos > 0) && (comparePieceByWeight (p-1, p) > 0))
1178    isSorted = false;
1179  if (isSorted && (pos < s->pieceCount - 1) && (comparePieceByWeight (p, p+1) > 0))
1180    isSorted = false;
1181
1182  if (s->pieceSortState != PIECES_SORTED_BY_WEIGHT)
1183    {
1184      pieceListSort (s, PIECES_SORTED_BY_WEIGHT);
1185      isSorted = true;
1186    }
1187
1188  /* if it's not sorted, move it around */
1189  if (!isSorted)
1190    {
1191      bool exact;
1192      const struct weighted_piece tmp = *p;
1193
1194      tr_removeElementFromArray (s->pieces,
1195                                 pos,
1196                                 sizeof (struct weighted_piece),
1197                                 s->pieceCount--);
1198
1199      pos = tr_lowerBound (&tmp, s->pieces, s->pieceCount,
1200                           sizeof (struct weighted_piece),
1201                           comparePieceByWeight, &exact);
1202
1203      memmove (&s->pieces[pos + 1],
1204               &s->pieces[pos],
1205               sizeof (struct weighted_piece) * (s->pieceCount++ - pos));
1206
1207      s->pieces[pos] = tmp;
1208    }
1209
1210  assertWeightedPiecesAreSorted (s);
1211}
1212
1213static void
1214pieceListRemoveRequest (tr_swarm * s, tr_block_index_t block)
1215{
1216  struct weighted_piece * p;
1217  const tr_piece_index_t index = tr_torBlockPiece (s->tor, block);
1218
1219  if (((p = pieceListLookup (s, index))) && (p->requestCount > 0))
1220    {
1221      --p->requestCount;
1222      pieceListResortPiece (s, p);
1223    }
1224}
1225
1226
1227/****
1228*****
1229*****  Replication count (for rarest first policy)
1230*****
1231****/
1232
1233/**
1234 * Increase the replication count of this piece and sort it if the
1235 * piece list is already sorted
1236 */
1237static void
1238tr_incrReplicationOfPiece (tr_swarm * s, const size_t index)
1239{
1240  assert (replicationExists (s));
1241  assert (s->pieceReplicationSize == s->tor->info.pieceCount);
1242
1243  /* One more replication of this piece is present in the swarm */
1244  ++s->pieceReplication[index];
1245
1246  /* we only resort the piece if the list is already sorted */
1247  if (s->pieceSortState == PIECES_SORTED_BY_WEIGHT)
1248    pieceListResortPiece (s, pieceListLookup (s, index));
1249}
1250
1251/**
1252 * Increases the replication count of pieces present in the bitfield
1253 */
1254static void
1255tr_incrReplicationFromBitfield (tr_swarm * s, const tr_bitfield * b)
1256{
1257  size_t i;
1258  uint16_t * rep = s->pieceReplication;
1259  const size_t n = s->tor->info.pieceCount;
1260
1261  assert (replicationExists (s));
1262
1263  for (i=0; i<n; ++i)
1264    if (tr_bitfieldHas (b, i))
1265      ++rep[i];
1266
1267  if (s->pieceSortState == PIECES_SORTED_BY_WEIGHT)
1268    invalidatePieceSorting (s);
1269}
1270
1271/**
1272 * Increase the replication count of every piece
1273 */
1274static void
1275tr_incrReplication (tr_swarm * s)
1276{
1277  int i;
1278  const int n = s->pieceReplicationSize;
1279
1280  assert (replicationExists (s));
1281  assert (s->pieceReplicationSize == s->tor->info.pieceCount);
1282
1283  for (i=0; i<n; ++i)
1284    ++s->pieceReplication[i];
1285}
1286
1287/**
1288 * Decrease the replication count of pieces present in the bitset.
1289 */
1290static void
1291tr_decrReplicationFromBitfield (tr_swarm * s, const tr_bitfield * b)
1292{
1293  int i;
1294  const int n = s->pieceReplicationSize;
1295
1296  assert (replicationExists (s));
1297  assert (s->pieceReplicationSize == s->tor->info.pieceCount);
1298
1299  if (tr_bitfieldHasAll (b))
1300    {
1301      for (i=0; i<n; ++i)
1302        --s->pieceReplication[i];
1303    }
1304  else if (!tr_bitfieldHasNone (b))
1305    {
1306      for (i=0; i<n; ++i)
1307        if (tr_bitfieldHas (b, i))
1308          --s->pieceReplication[i];
1309
1310      if (s->pieceSortState == PIECES_SORTED_BY_WEIGHT)
1311        invalidatePieceSorting (s);
1312    }
1313}
1314
1315/**
1316***
1317**/
1318
1319void
1320tr_peerMgrRebuildRequests (tr_torrent * tor)
1321{
1322  assert (tr_isTorrent (tor));
1323
1324  pieceListRebuild (tor->swarm);
1325}
1326
1327void
1328tr_peerMgrGetNextRequests (tr_torrent           * tor,
1329                           tr_peer              * peer,
1330                           int                    numwant,
1331                           tr_block_index_t     * setme,
1332                           int                  * numgot,
1333                           bool                   get_intervals)
1334{
1335  int i;
1336  int got;
1337  tr_swarm * s;
1338  struct weighted_piece * pieces;
1339  const tr_bitfield * const have = &peer->have;
1340
1341  /* sanity clause */
1342  assert (tr_isTorrent (tor));
1343  assert (peer->clientIsInterested);
1344  assert (!peer->clientIsChoked);
1345  assert (numwant > 0);
1346
1347  /* walk through the pieces and find blocks that should be requested */
1348  got = 0;
1349  s = tor->swarm;
1350
1351  /* prep the pieces list */
1352  if (s->pieces == NULL)
1353    pieceListRebuild (s);
1354
1355  if (s->pieceSortState != PIECES_SORTED_BY_WEIGHT)
1356    pieceListSort (s, PIECES_SORTED_BY_WEIGHT);
1357
1358  assertReplicationCountIsExact (s);
1359  assertWeightedPiecesAreSorted (s);
1360
1361  updateEndgame (s);
1362  pieces = s->pieces;
1363  for (i=0; i<s->pieceCount && got<numwant; ++i)
1364    {
1365      struct weighted_piece * p = pieces + i;
1366
1367      /* if the peer has this piece that we want... */
1368      if (tr_bitfieldHas (have, p->index))
1369        {
1370          tr_block_index_t b;
1371          tr_block_index_t first;
1372          tr_block_index_t last;
1373          tr_ptrArray peerArr = TR_PTR_ARRAY_INIT;
1374
1375          tr_torGetPieceBlockRange (tor, p->index, &first, &last);
1376
1377          for (b=first; b<=last && (got<numwant || (get_intervals && setme[2*got-1] == b-1)); ++b)
1378            {
1379              int peerCount;
1380              tr_peer ** peers;
1381
1382              /* don't request blocks we've already got */
1383              if (tr_cpBlockIsComplete (&tor->completion, b))
1384                continue;
1385
1386              /* always add peer if this block has no peers yet */
1387              tr_ptrArrayClear (&peerArr);
1388              getBlockRequestPeers (s, b, &peerArr);
1389              peers = (tr_peer **) tr_ptrArrayPeek (&peerArr, &peerCount);
1390              if (peerCount != 0)
1391                {
1392                  /* don't make a second block request until the endgame */
1393                  if (!s->endgame)
1394                    continue;
1395
1396                  /* don't have more than two peers requesting this block */
1397                  if (peerCount > 1)
1398                    continue;
1399
1400                  /* don't send the same request to the same peer twice */
1401                  if (peer == peers[0])
1402                    continue;
1403
1404                  /* in the endgame allow an additional peer to download a
1405                     block but only if the peer seems to be handling requests
1406                     relatively fast */
1407                  if (peer->pendingReqsToPeer + numwant - got < s->endgame)
1408                    continue;
1409                }
1410
1411              /* update the caller's table */
1412              if (!get_intervals)
1413                {
1414                  setme[got++] = b;
1415                }
1416              /* if intervals are requested two array entries are necessarry:
1417                 one for the interval's starting block and one for its end block */
1418              else if (got && setme[2 * got - 1] == b - 1 && b != first)
1419                {
1420                  /* expand the last interval */
1421                  ++setme[2 * got - 1];
1422                }
1423              else
1424                {
1425                  /* begin a new interval */
1426                  setme[2 * got] = setme[2 * got + 1] = b;
1427                  ++got;
1428                }
1429
1430              /* update our own tables */
1431              requestListAdd (s, b, peer);
1432              ++p->requestCount;
1433            }
1434
1435          tr_ptrArrayDestruct (&peerArr, NULL);
1436        }
1437    }
1438
1439  /* In most cases we've just changed the weights of a small number of pieces.
1440   * So rather than qsort ()ing the entire array, it's faster to apply an
1441   * adaptive insertion sort algorithm. */
1442  if (got > 0)
1443    {
1444      /* not enough requests || last piece modified */
1445      if (i == s->pieceCount)
1446        --i;
1447
1448      setComparePieceByWeightTorrent (s);
1449      while (--i >= 0)
1450        {
1451          bool exact;
1452
1453          /* relative position! */
1454          const int newpos = tr_lowerBound (&s->pieces[i], &s->pieces[i + 1],
1455                                            s->pieceCount - (i + 1),
1456                                            sizeof (struct weighted_piece),
1457                                            comparePieceByWeight, &exact);
1458          if (newpos > 0)
1459            {
1460              const struct weighted_piece piece = s->pieces[i];
1461              memmove (&s->pieces[i],
1462                       &s->pieces[i + 1],
1463                       sizeof (struct weighted_piece) * (newpos));
1464              s->pieces[i + newpos] = piece;
1465            }
1466        }
1467    }
1468
1469  assertWeightedPiecesAreSorted (t);
1470  *numgot = got;
1471}
1472
1473bool
1474tr_peerMgrDidPeerRequest (const tr_torrent  * tor,
1475                          const tr_peer     * peer,
1476                          tr_block_index_t    block)
1477{
1478  return requestListLookup ((tr_swarm*)tor->swarm, block, peer) != NULL;
1479}
1480
1481/* cancel requests that are too old */
1482static void
1483refillUpkeep (int foo UNUSED, short bar UNUSED, void * vmgr)
1484{
1485    time_t now;
1486    time_t too_old;
1487    tr_torrent * tor;
1488    int cancel_buflen = 0;
1489    struct block_request * cancel = NULL;
1490    tr_peerMgr * mgr = vmgr;
1491    managerLock (mgr);
1492
1493    now = tr_time ();
1494    too_old = now - REQUEST_TTL_SECS;
1495
1496    /* alloc the temporary "cancel" buffer */
1497    tor = NULL;
1498    while ((tor = tr_torrentNext (mgr->session, tor)))
1499        cancel_buflen = MAX (cancel_buflen, tor->swarm->requestCount);
1500    if (cancel_buflen > 0)
1501        cancel = tr_new (struct block_request, cancel_buflen);
1502
1503    /* prune requests that are too old */
1504    tor = NULL;
1505    while ((tor = tr_torrentNext (mgr->session, tor)))
1506    {
1507        tr_swarm * s = tor->swarm;
1508        const int n = s->requestCount;
1509        if (n > 0)
1510        {
1511            int keepCount = 0;
1512            int cancelCount = 0;
1513            const struct block_request * it;
1514            const struct block_request * end;
1515
1516            for (it=s->requests, end=it+n; it!=end; ++it)
1517            {
1518                if ((it->sentAt <= too_old) && it->peer->msgs && !tr_peerMsgsIsReadingBlock (it->peer->msgs, it->block))
1519                    cancel[cancelCount++] = *it;
1520                else
1521                {
1522                    if (it != &s->requests[keepCount])
1523                        s->requests[keepCount] = *it;
1524                    keepCount++;
1525                }
1526            }
1527
1528            /* prune out the ones we aren't keeping */
1529            s->requestCount = keepCount;
1530
1531            /* send cancel messages for all the "cancel" ones */
1532            for (it=cancel, end=it+cancelCount; it!=end; ++it) {
1533                if ((it->peer != NULL) && (it->peer->msgs != NULL)) {
1534                    tr_historyAdd (&it->peer->cancelsSentToPeer, now, 1);
1535                    tr_peerMsgsCancel (it->peer->msgs, it->block);
1536                    decrementPendingReqCount (it);
1537                }
1538            }
1539
1540            /* decrement the pending request counts for the timed-out blocks */
1541            for (it=cancel, end=it+cancelCount; it!=end; ++it)
1542                pieceListRemoveRequest (s, it->block);
1543        }
1544    }
1545
1546    tr_free (cancel);
1547    tr_timerAddMsec (mgr->refillUpkeepTimer, REFILL_UPKEEP_PERIOD_MSEC);
1548    managerUnlock (mgr);
1549}
1550
1551static void
1552addStrike (tr_swarm * s, tr_peer * peer)
1553{
1554  tordbg (s, "increasing peer %s strike count to %d",
1555          tr_atomAddrStr (peer->atom), peer->strikes + 1);
1556
1557  if (++peer->strikes >= MAX_BAD_PIECES_PER_PEER)
1558    {
1559      struct peer_atom * atom = peer->atom;
1560      atom->flags2 |= MYFLAG_BANNED;
1561      peer->doPurge = 1;
1562      tordbg (s, "banning peer %s", tr_atomAddrStr (atom));
1563    }
1564}
1565
1566static void
1567peerSuggestedPiece (tr_swarm           * s UNUSED,
1568                    tr_peer            * peer UNUSED,
1569                    tr_piece_index_t     pieceIndex UNUSED,
1570                    int                  isFastAllowed UNUSED)
1571{
1572#if 0
1573    assert (t);
1574    assert (peer);
1575    assert (peer->msgs);
1576
1577    /* is this a valid piece? */
1578    if (pieceIndex >= t->tor->info.pieceCount)
1579        return;
1580
1581    /* don't ask for it if we've already got it */
1582    if (tr_cpPieceIsComplete (t->tor->completion, pieceIndex))
1583        return;
1584
1585    /* don't ask for it if they don't have it */
1586    if (!tr_bitfieldHas (peer->have, pieceIndex))
1587        return;
1588
1589    /* don't ask for it if we're choked and it's not fast */
1590    if (!isFastAllowed && peer->clientIsChoked)
1591        return;
1592
1593    /* request the blocks that we don't have in this piece */
1594    {
1595        tr_block_index_t b;
1596        tr_block_index_t first;
1597        tr_block_index_t last;
1598        const tr_torrent * tor = t->tor;
1599
1600        tr_torGetPieceBlockRange (t->tor, pieceIndex, &first, &last);
1601
1602        for (b=first; b<=last; ++b)
1603        {
1604            if (!tr_cpBlockIsComplete (tor->completion, b))
1605            {
1606                const uint32_t offset = getBlockOffsetInPiece (tor, b);
1607                const uint32_t length = tr_torBlockCountBytes (tor, b);
1608                tr_peerMsgsAddRequest (peer->msgs, pieceIndex, offset, length);
1609                incrementPieceRequests (t, pieceIndex);
1610            }
1611        }
1612    }
1613#endif
1614}
1615
1616static void
1617removeRequestFromTables (tr_swarm * s, tr_block_index_t block, const tr_peer * peer)
1618{
1619  requestListRemove (s, block, peer);
1620  pieceListRemoveRequest (s, block);
1621}
1622
1623/* peer choked us, or maybe it disconnected.
1624   either way we need to remove all its requests */
1625static void
1626peerDeclinedAllRequests (tr_swarm * s, const tr_peer * peer)
1627{
1628  int i, n;
1629  tr_block_index_t * blocks = tr_new (tr_block_index_t, s->requestCount);
1630
1631  for (i=n=0; i<s->requestCount; ++i)
1632    if (peer == s->requests[i].peer)
1633      blocks[n++] = s->requests[i].block;
1634
1635  for (i=0; i<n; ++i)
1636    removeRequestFromTables (s, blocks[i], peer);
1637
1638  tr_free (blocks);
1639}
1640
1641static void
1642cancelAllRequestsForBlock (tr_swarm          * s,
1643                           tr_block_index_t    block,
1644                           tr_peer           * no_notify)
1645{
1646  int i;
1647  int peerCount;
1648  tr_peer ** peers;
1649  tr_ptrArray peerArr;
1650
1651  peerArr = TR_PTR_ARRAY_INIT;
1652  getBlockRequestPeers (s, block, &peerArr);
1653  peers = (tr_peer **) tr_ptrArrayPeek (&peerArr, &peerCount);
1654  for (i=0; i<peerCount; ++i)
1655    {
1656      tr_peer * p = peers[i];
1657
1658      if ((p != no_notify) && (p->msgs != NULL))
1659        {
1660          tr_historyAdd (&p->cancelsSentToPeer, tr_time (), 1);
1661          tr_peerMsgsCancel (p->msgs, block);
1662        }
1663
1664      removeRequestFromTables (s, block, p);
1665    }
1666
1667  tr_ptrArrayDestruct (&peerArr, NULL);
1668}
1669
1670void
1671tr_peerMgrPieceCompleted (tr_torrent * tor, tr_piece_index_t p)
1672{
1673  int i;
1674  int peerCount;
1675  tr_peer ** peers;
1676  tr_swarm * s = tor->swarm;
1677
1678  /* notify the peers that we now have this piece */
1679  peerCount = tr_ptrArraySize (&s->peers);
1680  peers = (tr_peer**) tr_ptrArrayBase (&s->peers);
1681  for (i=0; i<peerCount; ++i)
1682    tr_peerMsgsHave (peers[i]->msgs, p);
1683
1684  /* bookkeeping */
1685  pieceListRemovePiece (s, p);
1686  s->needsCompletenessCheck = true;
1687}
1688
1689static void
1690peerCallbackFunc (tr_peer * peer, const tr_peer_event * e, void * vs)
1691{
1692  tr_swarm * s = vs;
1693
1694  swarmLock (s);
1695
1696  assert (peer != NULL);
1697
1698  switch (e->eventType)
1699    {
1700      case TR_PEER_PEER_GOT_PIECE_DATA:
1701        {
1702          const time_t now = tr_time ();
1703          tr_torrent * tor = s->tor;
1704
1705          tor->uploadedCur += e->length;
1706          tr_announcerAddBytes (tor, TR_ANN_UP, e->length);
1707          tr_torrentSetActivityDate (tor, now);
1708          tr_torrentSetDirty (tor);
1709          tr_statsAddUploaded (tor->session, e->length);
1710
1711          if (peer->atom != NULL)
1712            peer->atom->piece_data_time = now;
1713
1714          break;
1715        }
1716
1717      case TR_PEER_CLIENT_GOT_PIECE_DATA:
1718        {
1719          const time_t now = tr_time ();
1720          tr_torrent * tor = s->tor;
1721
1722          tor->downloadedCur += e->length;
1723          tr_torrentSetActivityDate (tor, now);
1724          tr_torrentSetDirty (tor);
1725
1726          tr_statsAddDownloaded (tor->session, e->length);
1727
1728          if (peer->atom != NULL)
1729            peer->atom->piece_data_time = now;
1730
1731          break;
1732        }
1733
1734      case TR_PEER_CLIENT_GOT_HAVE:
1735        if (replicationExists (s))
1736          {
1737            tr_incrReplicationOfPiece (s, e->pieceIndex);
1738            assertReplicationCountIsExact (s);
1739          }
1740        break;
1741
1742      case TR_PEER_CLIENT_GOT_HAVE_ALL:
1743        if (replicationExists (s))
1744          {
1745            tr_incrReplication (s);
1746            assertReplicationCountIsExact (s);
1747          }
1748        break;
1749
1750      case TR_PEER_CLIENT_GOT_HAVE_NONE:
1751        /* noop */
1752        break;
1753
1754      case TR_PEER_CLIENT_GOT_BITFIELD:
1755        assert (e->bitfield != NULL);
1756        if (replicationExists (s))
1757          {
1758            tr_incrReplicationFromBitfield (s, e->bitfield);
1759            assertReplicationCountIsExact (s);
1760          }
1761        break;
1762
1763      case TR_PEER_CLIENT_GOT_REJ:
1764        {
1765          tr_block_index_t b = _tr_block (s->tor, e->pieceIndex, e->offset);
1766          if (b < s->tor->blockCount)
1767            removeRequestFromTables (s, b, peer);
1768          else
1769            tordbg (s, "Peer %s sent an out-of-range reject message",
1770                    tr_atomAddrStr (peer->atom));
1771          break;
1772        }
1773
1774      case TR_PEER_CLIENT_GOT_CHOKE:
1775        peerDeclinedAllRequests (s, peer);
1776        break;
1777
1778      case TR_PEER_CLIENT_GOT_PORT:
1779        if (peer->atom)
1780          peer->atom->port = e->port;
1781        break;
1782
1783      case TR_PEER_CLIENT_GOT_SUGGEST:
1784        peerSuggestedPiece (s, peer, e->pieceIndex, false);
1785        break;
1786
1787      case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1788        peerSuggestedPiece (s, peer, e->pieceIndex, true);
1789        break;
1790
1791      case TR_PEER_CLIENT_GOT_BLOCK:
1792        {
1793          tr_torrent * tor = s->tor;
1794          const tr_piece_index_t p = e->pieceIndex;
1795          const tr_block_index_t block = _tr_block (tor, p, e->offset);
1796          if (peer->msgs != NULL) /* webseed downloads don't belong in announce totals */
1797            tr_announcerAddBytes (tor, TR_ANN_DOWN, tr_torPieceCountBytes (tor, p));
1798          cancelAllRequestsForBlock (s, block, peer);
1799          tr_historyAdd (&peer->blocksSentToClient, tr_time(), 1);
1800          pieceListResortPiece (s, pieceListLookup (s, p));
1801          tr_torrentGotBlock (tor, block);
1802          break;
1803        }
1804
1805      case TR_PEER_ERROR:
1806        if ((e->err == ERANGE) || (e->err == EMSGSIZE) || (e->err == ENOTCONN))
1807          {
1808            /* some protocol error from the peer */
1809            peer->doPurge = 1;
1810            tordbg (s, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error",
1811                    tr_atomAddrStr (peer->atom));
1812          }
1813        else
1814          {
1815            tordbg (s, "unhandled error: %s", tr_strerror (e->err));
1816          }
1817        break;
1818
1819      default:
1820        assert (0);
1821    }
1822
1823  swarmUnlock (s);
1824}
1825
1826static int
1827getDefaultShelfLife (uint8_t from)
1828{
1829    /* in general, peers obtained from firsthand contact
1830     * are better than those from secondhand, etc etc */
1831    switch (from)
1832    {
1833        case TR_PEER_FROM_INCOMING : return 60 * 60 * 6;
1834        case TR_PEER_FROM_LTEP     : return 60 * 60 * 6;
1835        case TR_PEER_FROM_TRACKER  : return 60 * 60 * 3;
1836        case TR_PEER_FROM_DHT      : return 60 * 60 * 3;
1837        case TR_PEER_FROM_PEX      : return 60 * 60 * 2;
1838        case TR_PEER_FROM_RESUME   : return 60 * 60;
1839        case TR_PEER_FROM_LPD      : return 10 * 60;
1840        default                    : return 60 * 60;
1841    }
1842}
1843
1844static void
1845ensureAtomExists (tr_swarm          * s,
1846                  const tr_address  * addr,
1847                  const tr_port       port,
1848                  const uint8_t       flags,
1849                  const int8_t        seedProbability,
1850                  const uint8_t       from)
1851{
1852  struct peer_atom * a;
1853
1854  assert (tr_address_is_valid (addr));
1855  assert (from < TR_PEER_FROM__MAX);
1856
1857  a = getExistingAtom (s, addr);
1858
1859  if (a == NULL)
1860    {
1861      const int jitter = tr_cryptoWeakRandInt (60*10);
1862      a = tr_new0 (struct peer_atom, 1);
1863      a->addr = *addr;
1864      a->port = port;
1865      a->flags = flags;
1866      a->fromFirst = from;
1867      a->fromBest = from;
1868      a->shelf_date = tr_time () + getDefaultShelfLife (from) + jitter;
1869      a->blocklisted = -1;
1870      atomSetSeedProbability (a, seedProbability);
1871      tr_ptrArrayInsertSorted (&s->pool, a, compareAtomsByAddress);
1872
1873      tordbg (s, "got a new atom: %s", tr_atomAddrStr (a));
1874    }
1875  else
1876    {
1877      if (from < a->fromBest)
1878        a->fromBest = from;
1879
1880      if (a->seedProbability == -1)
1881        atomSetSeedProbability (a, seedProbability);
1882
1883      a->flags |= flags;
1884    }
1885}
1886
1887static int
1888getMaxPeerCount (const tr_torrent * tor)
1889{
1890  return tor->maxConnectedPeers;
1891}
1892
1893static int
1894getPeerCount (const tr_swarm * s)
1895{
1896  return tr_ptrArraySize (&s->peers);/* + tr_ptrArraySize (&t->outgoingHandshakes); */
1897}
1898
1899/* FIXME: this is kind of a mess. */
1900static bool
1901myHandshakeDoneCB (tr_handshake  * handshake,
1902                   tr_peerIo     * io,
1903                   bool            readAnythingFromPeer,
1904                   bool            isConnected,
1905                   const uint8_t * peer_id,
1906                   void          * vmanager)
1907{
1908  bool ok = isConnected;
1909  bool success = false;
1910  tr_port port;
1911  const tr_address * addr;
1912  tr_peerMgr * manager = vmanager;
1913  tr_swarm  * s;
1914  tr_handshake * ours;
1915
1916  assert (io);
1917  assert (tr_isBool (ok));
1918
1919  s = tr_peerIoHasTorrentHash (io)
1920    ? getExistingSwarm (manager, tr_peerIoGetTorrentHash (io))
1921    : NULL;
1922
1923  if (tr_peerIoIsIncoming (io))
1924    ours = tr_ptrArrayRemoveSorted (&manager->incomingHandshakes,
1925                                    handshake, handshakeCompare);
1926  else if (s)
1927    ours = tr_ptrArrayRemoveSorted (&s->outgoingHandshakes,
1928                                    handshake, handshakeCompare);
1929  else
1930    ours = handshake;
1931
1932  assert (ours);
1933  assert (ours == handshake);
1934
1935  if (s)
1936    swarmLock (s);
1937
1938  addr = tr_peerIoGetAddress (io, &port);
1939
1940  if (!ok || !s || !s->isRunning)
1941    {
1942      if (s)
1943        {
1944          struct peer_atom * atom = getExistingAtom (s, addr);
1945          if (atom)
1946            {
1947              ++atom->numFails;
1948
1949              if (!readAnythingFromPeer)
1950                {
1951                  tordbg (s, "marking peer %s as unreachable... numFails is %d", tr_atomAddrStr (atom), (int)atom->numFails);
1952                  atom->flags2 |= MYFLAG_UNREACHABLE;
1953                }
1954            }
1955        }
1956    }
1957  else /* looking good */
1958    {
1959      struct peer_atom * atom;
1960
1961      ensureAtomExists (s, addr, port, 0, -1, TR_PEER_FROM_INCOMING);
1962      atom = getExistingAtom (s, addr);
1963      atom->time = tr_time ();
1964      atom->piece_data_time = 0;
1965      atom->lastConnectionAt = tr_time ();
1966
1967      if (!tr_peerIoIsIncoming (io))
1968        {
1969          atom->flags |= ADDED_F_CONNECTABLE;
1970          atom->flags2 &= ~MYFLAG_UNREACHABLE;
1971        }
1972
1973      /* In principle, this flag specifies whether the peer groks uTP,
1974         not whether it's currently connected over uTP. */
1975      if (io->utp_socket)
1976        atom->flags |= ADDED_F_UTP_FLAGS;
1977
1978      if (atom->flags2 & MYFLAG_BANNED)
1979        {
1980          tordbg (s, "banned peer %s tried to reconnect",
1981                  tr_atomAddrStr (atom));
1982        }
1983      else if (tr_peerIoIsIncoming (io) && (getPeerCount (s) >= getMaxPeerCount (s->tor)))
1984        {
1985        }
1986      else
1987        {
1988          tr_peer * peer = atom->peer;
1989
1990          if (peer) /* we already have this peer */
1991            {
1992            }
1993          else
1994            {
1995              peer = getPeer (s, atom);
1996
1997              if (!peer_id)
1998                peer->client = TR_KEY_NONE;
1999              else
2000                {
2001                  char client[128];
2002                  tr_clientForId (client, sizeof (client), peer_id);
2003                  peer->client = tr_quark_new (client, -1);
2004                }
2005
2006              peer->io = tr_handshakeStealIO (handshake); /* this steals its refcount too, which is
2007                                                                balanced by our unref in peerDelete ()  */
2008              tr_peerIoSetParent (peer->io, &s->tor->bandwidth);
2009              tr_peerMsgsNew (s->tor, peer, peerCallbackFunc, s);
2010
2011              success = true;
2012            }
2013        }
2014    }
2015
2016  if (s != NULL)
2017    swarmUnlock (s);
2018
2019  return success;
2020}
2021
2022void
2023tr_peerMgrAddIncoming (tr_peerMgr       * manager,
2024                       tr_address       * addr,
2025                       tr_port            port,
2026                       int                socket,
2027                       struct UTPSocket * utp_socket)
2028{
2029  tr_session * session;
2030
2031  managerLock (manager);
2032
2033  assert (tr_isSession (manager->session));
2034  session = manager->session;
2035
2036  if (tr_sessionIsAddressBlocked (session, addr))
2037    {
2038      tr_logAddDebug ("Banned IP address \"%s\" tried to connect to us", tr_address_to_string (addr));
2039      if (socket >= 0)
2040        tr_netClose (session, socket);
2041      else
2042        UTP_Close (utp_socket);
2043    }
2044  else if (getExistingHandshake (&manager->incomingHandshakes, addr))
2045    {
2046      if (socket >= 0)
2047        tr_netClose (session, socket);
2048      else
2049        UTP_Close (utp_socket);
2050    }
2051  else /* we don't have a connection to them yet... */
2052    {
2053      tr_peerIo *    io;
2054      tr_handshake * handshake;
2055
2056      io = tr_peerIoNewIncoming (session, &session->bandwidth, addr, port, socket, utp_socket);
2057
2058      handshake = tr_handshakeNew (io,
2059                                   session->encryptionMode,
2060                                   myHandshakeDoneCB,
2061                                   manager);
2062
2063      tr_peerIoUnref (io); /* balanced by the implicit ref in tr_peerIoNewIncoming () */
2064
2065      tr_ptrArrayInsertSorted (&manager->incomingHandshakes, handshake,
2066                               handshakeCompare);
2067    }
2068
2069  managerUnlock (manager);
2070}
2071
2072void
2073tr_peerMgrAddPex (tr_torrent * tor, uint8_t from,
2074                  const tr_pex * pex, int8_t seedProbability)
2075{
2076  if (tr_isPex (pex)) /* safeguard against corrupt data */
2077    {
2078      tr_swarm * s = tor->swarm;
2079      managerLock (s->manager);
2080
2081      if (!tr_sessionIsAddressBlocked (s->manager->session, &pex->addr))
2082        if (tr_address_is_valid_for_peers (&pex->addr, pex->port))
2083          ensureAtomExists (s, &pex->addr, pex->port, pex->flags, seedProbability, from);
2084
2085      managerUnlock (s->manager);
2086    }
2087}
2088
2089void
2090tr_peerMgrMarkAllAsSeeds (tr_torrent * tor)
2091{
2092  tr_swarm * s = tor->swarm;
2093  const int n = tr_ptrArraySize (&s->pool);
2094  struct peer_atom ** it = (struct peer_atom**) tr_ptrArrayBase (&s->pool);
2095  struct peer_atom ** end = it + n;
2096
2097  while (it != end)
2098    atomSetSeed (s, *it++);
2099}
2100
2101tr_pex *
2102tr_peerMgrCompactToPex (const void    * compact,
2103                        size_t          compactLen,
2104                        const uint8_t * added_f,
2105                        size_t          added_f_len,
2106                        size_t        * pexCount)
2107{
2108  size_t i;
2109  size_t n = compactLen / 6;
2110  const uint8_t * walk = compact;
2111  tr_pex * pex = tr_new0 (tr_pex, n);
2112
2113  for (i=0; i<n; ++i)
2114    {
2115      pex[i].addr.type = TR_AF_INET;
2116      memcpy (&pex[i].addr.addr, walk, 4); walk += 4;
2117      memcpy (&pex[i].port, walk, 2); walk += 2;
2118      if (added_f && (n == added_f_len))
2119        pex[i].flags = added_f[i];
2120    }
2121
2122  *pexCount = n;
2123  return pex;
2124}
2125
2126tr_pex *
2127tr_peerMgrCompact6ToPex (const void    * compact,
2128                         size_t          compactLen,
2129                         const uint8_t * added_f,
2130                         size_t          added_f_len,
2131                         size_t        * pexCount)
2132{
2133  size_t i;
2134  size_t n = compactLen / 18;
2135  const uint8_t * walk = compact;
2136  tr_pex * pex = tr_new0 (tr_pex, n);
2137
2138  for (i=0; i<n; ++i)
2139    {
2140      pex[i].addr.type = TR_AF_INET6;
2141      memcpy (&pex[i].addr.addr.addr6.s6_addr, walk, 16); walk += 16;
2142      memcpy (&pex[i].port, walk, 2); walk += 2;
2143      if (added_f && (n == added_f_len))
2144        pex[i].flags = added_f[i];
2145    }
2146
2147  *pexCount = n;
2148  return pex;
2149}
2150
2151tr_pex *
2152tr_peerMgrArrayToPex (const void  * array,
2153                      size_t        arrayLen,
2154                      size_t      * pexCount)
2155{
2156  size_t i;
2157  size_t n = arrayLen / (sizeof (tr_address) + 2);
2158  const uint8_t * walk = array;
2159  tr_pex * pex = tr_new0 (tr_pex, n);
2160
2161  for (i=0 ; i<n ; ++i)
2162    {
2163      memcpy (&pex[i].addr, walk, sizeof (tr_address));
2164      memcpy (&pex[i].port, walk + sizeof (tr_address), 2);
2165      pex[i].flags = 0x00;
2166      walk += sizeof (tr_address) + 2;
2167    }
2168
2169  *pexCount = n;
2170  return pex;
2171}
2172
2173/**
2174***
2175**/
2176
2177void
2178tr_peerMgrGotBadPiece (tr_torrent * tor, tr_piece_index_t pieceIndex)
2179{
2180  int i;
2181  int n;
2182  tr_swarm * s = tor->swarm;
2183  const uint32_t byteCount = tr_torPieceCountBytes (tor, pieceIndex);
2184
2185  for (i=0, n=tr_ptrArraySize(&s->peers); i!=n; ++i)
2186    {
2187      tr_peer * peer = tr_ptrArrayNth (&s->peers, i);
2188
2189      if (tr_bitfieldHas (&peer->blame, pieceIndex))
2190        {
2191          tordbg (s, "peer %s contributed to corrupt piece (%d); now has %d strikes",
2192                  tr_atomAddrStr(peer->atom), pieceIndex, (int)peer->strikes + 1);
2193          addStrike (s, peer);
2194        }
2195    }
2196
2197
2198  tr_announcerAddBytes (tor, TR_ANN_CORRUPT, byteCount);
2199}
2200
2201int
2202tr_pexCompare (const void * va, const void * vb)
2203{
2204  int i;
2205  const tr_pex * a = va;
2206  const tr_pex * b = vb;
2207
2208  assert (tr_isPex (a));
2209  assert (tr_isPex (b));
2210
2211  if ((i = tr_address_compare (&a->addr, &b->addr)))
2212    return i;
2213
2214  if (a->port != b->port)
2215    return a->port < b->port ? -1 : 1;
2216
2217  return 0;
2218}
2219
2220/* better goes first */
2221static int
2222compareAtomsByUsefulness (const void * va, const void *vb)
2223{
2224  const struct peer_atom * a = * (const struct peer_atom**) va;
2225  const struct peer_atom * b = * (const struct peer_atom**) vb;
2226
2227  assert (tr_isAtom (a));
2228  assert (tr_isAtom (b));
2229
2230  if (a->piece_data_time != b->piece_data_time)
2231    return a->piece_data_time > b->piece_data_time ? -1 : 1;
2232  if (a->fromBest != b->fromBest)
2233    return a->fromBest < b->fromBest ? -1 : 1;
2234  if (a->numFails != b->numFails)
2235    return a->numFails < b->numFails ? -1 : 1;
2236
2237  return 0;
2238}
2239
2240static bool
2241isAtomInteresting (const tr_torrent * tor, struct peer_atom * atom)
2242{
2243  if (tr_torrentIsSeed (tor) && atomIsSeed (atom))
2244    return false;
2245
2246  if (peerIsInUse (tor->swarm, atom))
2247    return true;
2248
2249  if (isAtomBlocklisted (tor->session, atom))
2250    return false;
2251
2252  if (atom->flags2 & MYFLAG_BANNED)
2253    return false;
2254
2255  return true;
2256}
2257
2258int
2259tr_peerMgrGetPeers (tr_torrent   * tor,
2260                    tr_pex      ** setme_pex,
2261                    uint8_t        af,
2262                    uint8_t        list_mode,
2263                    int            maxCount)
2264{
2265  int i;
2266  int n;
2267  int count = 0;
2268  int atomCount = 0;
2269  const tr_swarm * s = tor->swarm;
2270  struct peer_atom ** atoms = NULL;
2271  tr_pex * pex;
2272  tr_pex * walk;
2273
2274  assert (tr_isTorrent (tor));
2275  assert (setme_pex != NULL);
2276  assert (af==TR_AF_INET || af==TR_AF_INET6);
2277  assert (list_mode==TR_PEERS_CONNECTED || list_mode==TR_PEERS_INTERESTING);
2278
2279  managerLock (s->manager);
2280
2281  /**
2282  ***  build a list of atoms
2283  **/
2284
2285  if (list_mode == TR_PEERS_CONNECTED) /* connected peers only */
2286    {
2287      int i;
2288      const tr_peer ** peers = (const tr_peer **) tr_ptrArrayBase (&s->peers);
2289      atomCount = tr_ptrArraySize (&s->peers);
2290      atoms = tr_new (struct peer_atom *, atomCount);
2291      for (i=0; i<atomCount; ++i)
2292        atoms[i] = peers[i]->atom;
2293    }
2294  else /* TR_PEERS_INTERESTING */
2295    {
2296      int i;
2297      struct peer_atom ** atomBase = (struct peer_atom**) tr_ptrArrayBase (&s->pool);
2298      n = tr_ptrArraySize (&s->pool);
2299      atoms = tr_new (struct peer_atom *, n);
2300      for (i=0; i<n; ++i)
2301        if (isAtomInteresting (tor, atomBase[i]))
2302          atoms[atomCount++] = atomBase[i];
2303    }
2304
2305  qsort (atoms, atomCount, sizeof (struct peer_atom *), compareAtomsByUsefulness);
2306
2307  /**
2308  ***  add the first N of them into our return list
2309  **/
2310
2311  n = MIN (atomCount, maxCount);
2312  pex = walk = tr_new0 (tr_pex, n);
2313
2314  for (i=0; i<atomCount && count<n; ++i)
2315    {
2316      const struct peer_atom * atom = atoms[i];
2317      if (atom->addr.type == af)
2318        {
2319          assert (tr_address_is_valid (&atom->addr));
2320          walk->addr = atom->addr;
2321          walk->port = atom->port;
2322          walk->flags = atom->flags;
2323          ++count;
2324          ++walk;
2325        }
2326    }
2327
2328  qsort (pex, count, sizeof (tr_pex), tr_pexCompare);
2329
2330  assert ((walk - pex) == count);
2331  *setme_pex = pex;
2332
2333  /* cleanup */
2334  tr_free (atoms);
2335  managerUnlock (s->manager);
2336  return count;
2337}
2338
/* forward declarations of the pulse callbacks; they all share the
   (int, short, void*) callback signature that createTimer () accepts */
static void atomPulse      (int fd, short what, void * cbdata);
static void bandwidthPulse (int fd, short what, void * cbdata);
static void rechokePulse   (int fd, short what, void * cbdata);
static void reconnectPulse (int fd, short what, void * cbdata);
2343
2344static struct event *
2345createTimer (tr_session * session, int msec, void (*callback)(int, short, void *), void * cbdata)
2346{
2347  struct event * timer = evtimer_new (session->event_base, callback, cbdata);
2348  tr_timerAddMsec (timer, msec);
2349  return timer;
2350}
2351
2352static void
2353ensureMgrTimersExist (struct tr_peerMgr * m)
2354{
2355  if (m->atomTimer == NULL)
2356    m->atomTimer = createTimer (m->session, ATOM_PERIOD_MSEC, atomPulse, m);
2357
2358  if (m->bandwidthTimer == NULL)
2359    m->bandwidthTimer = createTimer (m->session, BANDWIDTH_PERIOD_MSEC, bandwidthPulse, m);
2360
2361  if (m->rechokeTimer == NULL)
2362    m->rechokeTimer = createTimer (m->session, RECHOKE_PERIOD_MSEC, rechokePulse, m);
2363
2364  if (m->refillUpkeepTimer == NULL)
2365    m->refillUpkeepTimer = createTimer (m->session, REFILL_UPKEEP_PERIOD_MSEC, refillUpkeep, m);
2366}
2367
2368void
2369tr_peerMgrStartTorrent (tr_torrent * tor)
2370{
2371  tr_swarm * s;
2372
2373  assert (tr_isTorrent (tor));
2374  assert (tr_torrentIsLocked (tor));
2375
2376  s = tor->swarm;
2377  ensureMgrTimersExist (s->manager);
2378
2379  s->isRunning = true;
2380  s->maxPeers = tor->maxConnectedPeers;
2381  s->pieceSortState = PIECES_UNSORTED;
2382
2383  rechokePulse (0, 0, s->manager);
2384}
2385
2386static void
2387stopSwarm (tr_swarm * swarm)
2388{
2389  tr_peer * peer;
2390
2391  swarm->isRunning = false;
2392
2393  replicationFree (swarm);
2394  invalidatePieceSorting (swarm);
2395
2396  /* disconnect the peers. */
2397  while ((peer = tr_ptrArrayPop (&swarm->peers)))
2398    peerDelete (swarm, peer);
2399
2400  /* disconnect the handshakes. handshakeAbort calls handshakeDoneCB (),
2401   * which removes the handshake from t->outgoingHandshakes... */
2402  while (!tr_ptrArrayEmpty (&swarm->outgoingHandshakes))
2403    tr_handshakeAbort (tr_ptrArrayNth (&swarm->outgoingHandshakes, 0));
2404}
2405
2406void
2407tr_peerMgrStopTorrent (tr_torrent * tor)
2408{
2409  assert (tr_isTorrent (tor));
2410  assert (tr_torrentIsLocked (tor));
2411
2412  stopSwarm (tor->swarm);
2413}
2414
2415void
2416tr_peerMgrAddTorrent (tr_peerMgr * manager, tr_torrent * tor)
2417{
2418  assert (tr_isTorrent (tor));
2419  assert (tr_torrentIsLocked (tor));
2420  assert (tor->swarm == NULL);
2421
2422  tor->swarm = swarmNew (manager, tor);
2423}
2424
2425void
2426tr_peerMgrRemoveTorrent (tr_torrent * tor)
2427{
2428  assert (tr_isTorrent (tor));
2429  assert (tr_torrentIsLocked (tor));
2430
2431  stopSwarm (tor->swarm);
2432  swarmFree (tor->swarm);
2433}
2434
2435void
2436tr_peerUpdateProgress (tr_torrent * tor, tr_peer * peer)
2437{
2438  const tr_bitfield * have = &peer->have;
2439
2440  if (tr_bitfieldHasAll (have))
2441    {
2442      peer->progress = 1.0;
2443    }
2444  else if (tr_bitfieldHasNone (have))
2445    {
2446      peer->progress = 0.0;
2447    }
2448  else
2449    {
2450      const float true_count = tr_bitfieldCountTrueBits (have);
2451
2452      if (tr_torrentHasMetadata (tor))
2453        {
2454          peer->progress = true_count / tor->info.pieceCount;
2455        }
2456      else /* without pieceCount, this result is only a best guess... */
2457        {
2458          peer->progress = true_count / (have->bit_count + 1);
2459        }
2460    }
2461
2462  /* clamp the progress range */
2463  if (peer->progress < 0.0)
2464    peer->progress = 0.0;
2465  if (peer->progress > 1.0)
2466    peer->progress = 1.0;
2467
2468  if (peer->atom && (peer->progress >= 1.0))
2469    atomSetSeed (tor->swarm, peer->atom);
2470}
2471
2472void
2473tr_peerMgrOnTorrentGotMetainfo (tr_torrent * tor)
2474{
2475  int i;
2476  int peerCount;
2477  tr_peer ** peers;
2478
2479  /* the webseed list may have changed... */
2480  rebuildWebseedArray (tor->swarm, tor);
2481
2482  /* some peer_msgs' progress fields may not be accurate if we
2483     didn't have the metadata before now... so refresh them all... */
2484  peerCount = tr_ptrArraySize (&tor->swarm->peers);
2485  peers = (tr_peer**) tr_ptrArrayBase (&tor->swarm->peers);
2486  for (i=0; i<peerCount; ++i)
2487    tr_peerUpdateProgress (tor, peers[i]);
2488}
2489
2490void
2491tr_peerMgrTorrentAvailability (const tr_torrent  * tor,
2492                               int8_t            * tab,
2493                               unsigned int        tabCount)
2494{
2495  assert (tr_isTorrent (tor));
2496  assert (tab != NULL);
2497  assert (tabCount > 0);
2498
2499  memset (tab, 0, tabCount);
2500
2501  if (tr_torrentHasMetadata (tor))
2502    {
2503      tr_piece_index_t i;
2504      const int peerCount = tr_ptrArraySize (&tor->swarm->peers);
2505      const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase (&tor->swarm->peers);
2506      const float interval = tor->info.pieceCount / (float)tabCount;
2507      const bool isSeed = tr_cpGetStatus (&tor->completion) == TR_SEED;
2508
2509      for (i=0; i<tabCount; ++i)
2510        {
2511          const int piece = i * interval;
2512
2513          if (isSeed || tr_cpPieceIsComplete (&tor->completion, piece))
2514            {
2515              tab[i] = -1;
2516            }
2517          else if (peerCount)
2518            {
2519              int j;
2520              for (j=0; j<peerCount; ++j)
2521                if (tr_bitfieldHas (&peers[j]->have, piece))
2522                  ++tab[i];
2523            }
2524        }
2525    }
2526}
2527
2528static bool
2529peerIsSeed (const tr_peer * peer)
2530{
2531    if (peer->progress >= 1.0)
2532        return true;
2533
2534    if (peer->atom && atomIsSeed (peer->atom))
2535        return true;
2536
2537    return false;
2538}
2539
2540/* count how many bytes we want that connected peers have */
2541uint64_t
2542tr_peerMgrGetDesiredAvailable (const tr_torrent * tor)
2543{
2544  size_t i;
2545  size_t n;
2546  uint64_t desiredAvailable;
2547  const tr_swarm * s = tor->swarm;
2548
2549  /* common shortcuts... */
2550
2551  if (tr_torrentIsSeed (s->tor))
2552    return 0;
2553
2554  if (!tr_torrentHasMetadata (tor))
2555    return 0;
2556
2557  n = tr_ptrArraySize (&s->peers);
2558  if (n == 0)
2559    {
2560      return 0;
2561    }
2562  else
2563    {
2564      const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase (&s->peers);
2565      for (i=0; i<n; ++i)
2566        if (peers[i]->atom && atomIsSeed (peers[i]->atom))
2567          return tr_cpLeftUntilDone (&tor->completion);
2568    }
2569
2570  if (!s->pieceReplication || !s->pieceReplicationSize)
2571    return 0;
2572
2573  /* do it the hard way */
2574
2575  desiredAvailable = 0;
2576  for (i=0, n=MIN (tor->info.pieceCount, s->pieceReplicationSize); i<n; ++i)
2577    if (!tor->info.pieces[i].dnd && (s->pieceReplication[i] > 0))
2578      desiredAvailable += tr_cpMissingBytesInPiece (&s->tor->completion, i);
2579
2580  assert (desiredAvailable <= tor->info.totalSize);
2581  return desiredAvailable;
2582}
2583
2584void
2585tr_peerMgrTorrentStats (tr_torrent  * tor,
2586                        int         * setmePeersConnected,
2587                        int         * setmeWebseedsSendingToUs,
2588                        int         * setmePeersSendingToUs,
2589                        int         * setmePeersGettingFromUs,
2590                        int         * setmePeersFrom)
2591{
2592  int i;
2593  int size;
2594  tr_swarm * s;
2595  const tr_peer ** peers;
2596
2597  assert (tr_isTorrent (tor));
2598
2599  *setmePeersConnected       = 0;
2600  *setmePeersGettingFromUs   = 0;
2601  *setmePeersSendingToUs     = 0;
2602  *setmeWebseedsSendingToUs  = 0;
2603
2604  s = tor->swarm;
2605  size = tr_ptrArraySize (&s->peers);
2606  peers = (const tr_peer **) tr_ptrArrayBase (&s->peers);
2607
2608  for (i=0; i<TR_PEER_FROM__MAX; ++i)
2609    setmePeersFrom[i] = 0;
2610
2611  for (i=0; i<size; ++i)
2612    {
2613      const tr_peer * peer = peers[i];
2614      const struct peer_atom * atom = peer->atom;
2615
2616      if (peer->io == NULL) /* not connected */
2617        continue;
2618
2619      ++*setmePeersConnected;
2620
2621      ++setmePeersFrom[atom->fromFirst];
2622
2623      if (clientIsDownloadingFrom (tor, peer))
2624        ++*setmePeersSendingToUs;
2625
2626      if (clientIsUploadingTo (peer))
2627        ++*setmePeersGettingFromUs;
2628    }
2629
2630  *setmeWebseedsSendingToUs = countActiveWebseeds (s);
2631}
2632
2633double*
2634tr_peerMgrWebSpeeds_KBps (const tr_torrent * tor)
2635{
2636  unsigned int i;
2637  unsigned int webseedCount;
2638  const tr_swarm * s;
2639  const tr_webseed ** webseeds;
2640  double * ret = NULL;
2641  const uint64_t now = tr_time_msec ();
2642
2643  assert (tr_isTorrent (tor));
2644
2645  s = tor->swarm;
2646  webseedCount = tr_ptrArraySize (&s->webseeds);
2647  webseeds = (const tr_webseed**) tr_ptrArrayBase (&s->webseeds);
2648  ret = tr_new0 (double, webseedCount);
2649
2650  assert (s->manager != NULL);
2651  assert (webseedCount == tor->info.webseedCount);
2652
2653  for (i=0; i<webseedCount; ++i)
2654    {
2655      unsigned int Bps;
2656      if (tr_webseedGetSpeed_Bps (webseeds[i], now, &Bps))
2657        ret[i] = Bps / (double)tr_speed_K;
2658      else
2659        ret[i] = -1.0;
2660    }
2661
2662  return ret;
2663}
2664
2665unsigned int
2666tr_peerGetPieceSpeed_Bps (const tr_peer * peer, uint64_t now, tr_direction direction)
2667{
2668  return peer->io ? tr_peerIoGetPieceSpeed_Bps (peer->io, now, direction) : 0.0;
2669}
2670
2671struct tr_peer_stat *
2672tr_peerMgrPeerStats (const tr_torrent * tor, int * setmeCount)
2673{
2674  int i;
2675  int size = 0;
2676  tr_peer_stat * ret;
2677  const tr_swarm * s;
2678  const tr_peer ** peers;
2679  const time_t now = tr_time ();
2680  const uint64_t now_msec = tr_time_msec ();
2681
2682  assert (tr_isTorrent (tor));
2683  assert (tor->swarm->manager != NULL);
2684
2685  s = tor->swarm;
2686  peers = (const tr_peer**) tr_ptrArrayBase (&s->peers);
2687  size = tr_ptrArraySize (&s->peers);
2688  ret = tr_new0 (tr_peer_stat, size);
2689
2690  for (i=0; i<size; ++i)
2691    {
2692      char *                   pch;
2693      const tr_peer *          peer = peers[i];
2694      const struct peer_atom * atom = peer->atom;
2695      tr_peer_stat *           stat = ret + i;
2696
2697      tr_address_to_string_with_buf (&atom->addr, stat->addr, sizeof (stat->addr));
2698      tr_strlcpy (stat->client, tr_quark_get_string(peer->client,NULL), sizeof (stat->client));
2699      stat->port                = ntohs (peer->atom->port);
2700      stat->from                = atom->fromFirst;
2701      stat->progress            = peer->progress;
2702      stat->isUTP               = peer->io->utp_socket != NULL;
2703      stat->isEncrypted         = tr_peerIoIsEncrypted (peer->io) ? 1 : 0;
2704      stat->rateToPeer_KBps     = toSpeedKBps (tr_peerGetPieceSpeed_Bps (peer, now_msec, TR_CLIENT_TO_PEER));
2705      stat->rateToClient_KBps   = toSpeedKBps (tr_peerGetPieceSpeed_Bps (peer, now_msec, TR_PEER_TO_CLIENT));
2706      stat->peerIsChoked        = peer->peerIsChoked;
2707      stat->peerIsInterested    = peer->peerIsInterested;
2708      stat->clientIsChoked      = peer->clientIsChoked;
2709      stat->clientIsInterested  = peer->clientIsInterested;
2710      stat->isIncoming          = tr_peerIoIsIncoming (peer->io);
2711      stat->isDownloadingFrom   = clientIsDownloadingFrom (tor, peer);
2712      stat->isUploadingTo       = clientIsUploadingTo (peer);
2713      stat->isSeed              = peerIsSeed (peer);
2714
2715      stat->blocksToPeer        = tr_historyGet (&peer->blocksSentToPeer,    now, CANCEL_HISTORY_SEC);
2716      stat->blocksToClient      = tr_historyGet (&peer->blocksSentToClient,  now, CANCEL_HISTORY_SEC);
2717      stat->cancelsToPeer       = tr_historyGet (&peer->cancelsSentToPeer,   now, CANCEL_HISTORY_SEC);
2718      stat->cancelsToClient     = tr_historyGet (&peer->cancelsSentToClient, now, CANCEL_HISTORY_SEC);
2719
2720      stat->pendingReqsToPeer   = peer->pendingReqsToPeer;
2721      stat->pendingReqsToClient = peer->pendingReqsToClient;
2722
2723      pch = stat->flagStr;
2724      if (stat->isUTP) *pch++ = 'T';
2725      if (s->optimistic == peer) *pch++ = 'O';
2726      if (stat->isDownloadingFrom) *pch++ = 'D';
2727      else if (stat->clientIsInterested) *pch++ = 'd';
2728      if (stat->isUploadingTo) *pch++ = 'U';
2729      else if (stat->peerIsInterested) *pch++ = 'u';
2730      if (!stat->clientIsChoked && !stat->clientIsInterested) *pch++ = 'K';
2731      if (!stat->peerIsChoked && !stat->peerIsInterested) *pch++ = '?';
2732      if (stat->isEncrypted) *pch++ = 'E';
2733      if (stat->from == TR_PEER_FROM_DHT) *pch++ = 'H';
2734      else if (stat->from == TR_PEER_FROM_PEX) *pch++ = 'X';
2735      if (stat->isIncoming) *pch++ = 'I';
2736      *pch = '\0';
2737    }
2738
2739  *setmeCount = size;
2740  return ret;
2741}
2742
2743/***
2744****
2745****
2746***/
2747
2748void
2749tr_peerMgrClearInterest (tr_torrent * tor)
2750{
2751  int i;
2752  tr_swarm * s = tor->swarm;
2753  const int peerCount = tr_ptrArraySize (&s->peers);
2754
2755  assert (tr_isTorrent (tor));
2756  assert (tr_torrentIsLocked (tor));
2757
2758  for (i=0; i<peerCount; ++i)
2759    {
2760      const tr_peer * peer = tr_ptrArrayNth (&s->peers, i);
2761      tr_peerMsgsSetInterested (peer->msgs, false);
2762    }
2763}
2764
2765/* does this peer have any pieces that we want? */
2766static bool
2767isPeerInteresting (tr_torrent     * const tor,
2768                   const bool     * const piece_is_interesting,
2769                   const tr_peer  * const peer)
2770{
2771  tr_piece_index_t i, n;
2772
2773  /* these cases should have already been handled by the calling code... */
2774  assert (!tr_torrentIsSeed (tor));
2775  assert (tr_torrentIsPieceTransferAllowed (tor, TR_PEER_TO_CLIENT));
2776
2777  if (peerIsSeed (peer))
2778    return true;
2779
2780  for (i=0, n=tor->info.pieceCount; i<n; ++i)
2781    if (piece_is_interesting[i] && tr_bitfieldHas (&peer->have, i))
2782      return true;
2783
2784  return false;
2785}
2786
/* How rechokeDownloads () rates a peer, based on its recent history of
 * blocks sent to us and cancels sent to it.
 *
 * The declaration order matters: compare_rechoke_info () sorts by this
 * value in ascending order, so GOOD peers come first and BAD peers last. */
typedef enum
{
  RECHOKE_STATE_GOOD,     /* no recent cancels, or cancels well below blocks */
  RECHOKE_STATE_UNTESTED, /* no recent blocks and no recent cancels */
  RECHOKE_STATE_BAD       /* recent cancels without enough blocks to offset them */
}
tr_rechoke_state;
2794
/* One sortable entry per interesting peer, built by rechokeDownloads ()
 * and ordered by compare_rechoke_info (). */
struct tr_rechoke_info
{
  tr_peer * peer;    /* the peer this entry describes */
  int salt;          /* random tiebreaker among peers in the same state */
  int rechoke_state; /* a tr_rechoke_state value; primary sort key */
};
2801
2802static int
2803compare_rechoke_info (const void * va, const void * vb)
2804{
2805  const struct tr_rechoke_info * a = va;
2806  const struct tr_rechoke_info * b = vb;
2807
2808  if (a->rechoke_state != b->rechoke_state)
2809    return a->rechoke_state - b->rechoke_state;
2810
2811  return a->salt - b->salt;
2812}
2813
2814/* determines who we send "interested" messages to */
2815static void
2816rechokeDownloads (tr_swarm * s)
2817{
2818  int i;
2819  int maxPeers = 0;
2820  int rechoke_count = 0;
2821  struct tr_rechoke_info * rechoke = NULL;
2822  const int MIN_INTERESTING_PEERS = 5;
2823  const int peerCount = tr_ptrArraySize (&s->peers);
2824  const time_t now = tr_time ();
2825
2826  /* some cases where this function isn't necessary */
2827  if (tr_torrentIsSeed (s->tor))
2828    return;
2829  if (!tr_torrentIsPieceTransferAllowed (s->tor, TR_PEER_TO_CLIENT))
2830    return;
2831
2832  /* decide HOW MANY peers to be interested in */
2833  {
2834    int blocks = 0;
2835    int cancels = 0;
2836    time_t timeSinceCancel;
2837
2838    /* Count up how many blocks & cancels each peer has.
2839     *
2840     * There are two situations where we send out cancels --
2841     *
2842     * 1. We've got unresponsive peers, which is handled by deciding
2843     *    -which- peers to be interested in.
2844     *
2845     * 2. We've hit our bandwidth cap, which is handled by deciding
2846     *    -how many- peers to be interested in.
2847     *
2848     * We're working on 2. here, so we need to ignore unresponsive
2849     * peers in our calculations lest they confuse Transmission into
2850     * thinking it's hit its bandwidth cap.
2851     */
2852    for (i=0; i<peerCount; ++i)
2853      {
2854        const tr_peer * peer = tr_ptrArrayNth (&s->peers, i);
2855        const int b = tr_historyGet (&peer->blocksSentToClient, now, CANCEL_HISTORY_SEC);
2856        const int c = tr_historyGet (&peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC);
2857
2858        if (b == 0) /* ignore unresponsive peers, as described above */
2859          continue;
2860
2861        blocks += b;
2862        cancels += c;
2863      }
2864
2865    if (cancels > 0)
2866      {
2867        /* cancelRate: of the block requests we've recently made, the percentage we cancelled.
2868         * higher values indicate more congestion. */
2869        const double cancelRate = cancels / (double)(cancels + blocks);
2870        const double mult = 1 - MIN (cancelRate, 0.5);
2871        maxPeers = s->interestedCount * mult;
2872        tordbg (s, "cancel rate is %.3f -- reducing the "
2873                   "number of peers we're interested in by %.0f percent",
2874                   cancelRate, mult * 100);
2875        s->lastCancel = now;
2876      }
2877
2878    timeSinceCancel = now - s->lastCancel;
2879    if (timeSinceCancel)
2880      {
2881        const int maxIncrease = 15;
2882        const time_t maxHistory = 2 * CANCEL_HISTORY_SEC;
2883        const double mult = MIN (timeSinceCancel, maxHistory) / (double) maxHistory;
2884        const int inc = maxIncrease * mult;
2885        maxPeers = s->maxPeers + inc;
2886        tordbg (s, "time since last cancel is %li -- increasing the "
2887                   "number of peers we're interested in by %d",
2888                   timeSinceCancel, inc);
2889      }
2890  }
2891
2892  /* don't let the previous section's number tweaking go too far... */
2893  if (maxPeers < MIN_INTERESTING_PEERS)
2894    maxPeers = MIN_INTERESTING_PEERS;
2895  if (maxPeers > s->tor->maxConnectedPeers)
2896    maxPeers = s->tor->maxConnectedPeers;
2897
2898  s->maxPeers = maxPeers;
2899
2900  if (peerCount > 0)
2901    {
2902      bool * piece_is_interesting;
2903      const tr_torrent * const tor = s->tor;
2904      const int n = tor->info.pieceCount;
2905
2906      /* build a bitfield of interesting pieces... */
2907      piece_is_interesting = tr_new (bool, n);
2908      for (i=0; i<n; i++)
2909        piece_is_interesting[i] = !tor->info.pieces[i].dnd && !tr_cpPieceIsComplete (&tor->completion, i);
2910
2911      /* decide WHICH peers to be interested in (based on their cancel-to-block ratio) */
2912      for (i=0; i<peerCount; ++i)
2913        {
2914          tr_peer * peer = tr_ptrArrayNth (&s->peers, i);
2915
2916          if (!isPeerInteresting (s->tor, piece_is_interesting, peer))
2917            {
2918              tr_peerMsgsSetInterested (peer->msgs, false);
2919            }
2920          else
2921            {
2922              tr_rechoke_state rechoke_state;
2923              const int blocks = tr_historyGet (&peer->blocksSentToClient, now, CANCEL_HISTORY_SEC);
2924              const int cancels = tr_historyGet (&peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC);
2925
2926              if (!blocks && !cancels)
2927                rechoke_state = RECHOKE_STATE_UNTESTED;
2928              else if (!cancels)
2929                rechoke_state = RECHOKE_STATE_GOOD;
2930              else if (!blocks)
2931                rechoke_state = RECHOKE_STATE_BAD;
2932              else if ((cancels * 10) < blocks)
2933                rechoke_state = RECHOKE_STATE_GOOD;
2934              else
2935                rechoke_state = RECHOKE_STATE_BAD;
2936
2937              if (rechoke == NULL)
2938                rechoke = tr_new (struct tr_rechoke_info, peerCount);
2939
2940              rechoke[rechoke_count].peer = peer;
2941              rechoke[rechoke_count].rechoke_state = rechoke_state;
2942              rechoke[rechoke_count].salt = tr_cryptoWeakRandInt (INT_MAX);
2943              rechoke_count++;
2944            }
2945
2946        }
2947
2948      tr_free (piece_is_interesting);
2949    }
2950
2951  /* now that we know which & how many peers to be interested in... update the peer interest */
2952  qsort (rechoke, rechoke_count, sizeof (struct tr_rechoke_info), compare_rechoke_info);
2953  s->interestedCount = MIN (maxPeers, rechoke_count);
2954  for (i=0; i<rechoke_count; ++i)
2955    tr_peerMsgsSetInterested (rechoke[i].peer->msgs, i<s->interestedCount);
2956
2957  /* cleanup */
2958  tr_free (rechoke);
2959}
2960
2961/**
2962***
2963**/
2964
/* Per-peer scratch record used by rechokeUploads () while it decides
 * which peers to choke; sorted with compareChoke (). */
struct ChokeData
{
  bool      isInterested; /* copy of peer->peerIsInterested */
  bool      wasChoked;    /* copy of peer->peerIsChoked, i.e. the state before this pass */
  bool      isChoked;     /* the decision of this pass; starts out true */
  int       rate;         /* speed from getRate (); higher sorts first */
  int       salt;         /* random tiebreaker */
  tr_peer * peer;         /* the peer this record describes */
};
2974
2975static int
2976compareChoke (const void * va, const void * vb)
2977{
2978  const struct ChokeData * a = va;
2979  const struct ChokeData * b = vb;
2980
2981  if (a->rate != b->rate) /* prefer higher overall speeds */
2982    return a->rate > b->rate ? -1 : 1;
2983
2984  if (a->wasChoked != b->wasChoked) /* prefer unchoked */
2985    return a->wasChoked ? 1 : -1;
2986
2987  if (a->salt != b->salt) /* random order */
2988    return a->salt - b->salt;
2989
2990  return 0;
2991}
2992
2993/* is this a new connection? */
2994static int
2995isNew (const tr_peer * peer)
2996{
2997  return peer && peer->io && tr_peerIoGetAge (peer->io) < 45;
2998}
2999
3000/* get a rate for deciding which peers to choke and unchoke. */
3001static int
3002getRate (const tr_torrent * tor, struct peer_atom * atom, uint64_t now)
3003{
3004  unsigned int Bps;
3005
3006  if (tr_torrentIsSeed (tor))
3007    Bps = tr_peerGetPieceSpeed_Bps (atom->peer, now, TR_CLIENT_TO_PEER);
3008
3009  /* downloading a private torrent... take upload speed into account
3010   * because there may only be a small window of opportunity to share */
3011  else if (tr_torrentIsPrivate (tor))
3012    Bps = tr_peerGetPieceSpeed_Bps (atom->peer, now, TR_PEER_TO_CLIENT)
3013        + tr_peerGetPieceSpeed_Bps (atom->peer, now, TR_CLIENT_TO_PEER);
3014
3015  /* downloading a public torrent */
3016  else
3017    Bps = tr_peerGetPieceSpeed_Bps (atom->peer, now, TR_PEER_TO_CLIENT);
3018
3019  /* convert it to bytes per second */
3020  return Bps;
3021}
3022
3023static inline bool
3024isBandwidthMaxedOut (const tr_bandwidth * b,
3025                     const uint64_t now_msec, tr_direction dir)
3026{
3027  if (!tr_bandwidthIsLimited (b, dir))
3028    {
3029      return false;
3030    }
3031  else
3032    {
3033      const unsigned int got = tr_bandwidthGetPieceSpeed_Bps (b, now_msec, dir);
3034      const unsigned int want = tr_bandwidthGetDesiredSpeed_Bps (b, dir);
3035      return got >= want;
3036    }
3037}
3038
3039static void
3040rechokeUploads (tr_swarm * s, const uint64_t now)
3041{
3042  int i, size, unchokedInterested;
3043  const int peerCount = tr_ptrArraySize (&s->peers);
3044  tr_peer ** peers = (tr_peer**) tr_ptrArrayBase (&s->peers);
3045  struct ChokeData * choke = tr_new0 (struct ChokeData, peerCount);
3046  const tr_session * session = s->manager->session;
3047  const int chokeAll = !tr_torrentIsPieceTransferAllowed (s->tor, TR_CLIENT_TO_PEER);
3048  const bool isMaxedOut = isBandwidthMaxedOut (&s->tor->bandwidth, now, TR_UP);
3049
3050  assert (swarmIsLocked (s));
3051
3052  /* an optimistic unchoke peer's "optimistic"
3053   * state lasts for N calls to rechokeUploads (). */
3054  if (s->optimisticUnchokeTimeScaler > 0)
3055    s->optimisticUnchokeTimeScaler--;
3056  else
3057    s->optimistic = NULL;
3058
3059  /* sort the peers by preference and rate */
3060  for (i=0, size=0; i<peerCount; ++i)
3061    {
3062      tr_peer * peer = peers[i];
3063      struct peer_atom * atom = peer->atom;
3064
3065      if (peerIsSeed (peer)) /* choke seeds and partial seeds */
3066        {
3067          tr_peerMsgsSetChoke (peer->msgs, true);
3068        }
3069      else if (chokeAll) /* choke everyone if we're not uploading */
3070        {
3071          tr_peerMsgsSetChoke (peer->msgs, true);
3072        }
3073      else if (peer != s->optimistic)
3074        {
3075          struct ChokeData * n = &choke[size++];
3076          n->peer         = peer;
3077          n->isInterested = peer->peerIsInterested;
3078          n->wasChoked    = peer->peerIsChoked;
3079          n->rate         = getRate (s->tor, atom, now);
3080          n->salt         = tr_cryptoWeakRandInt (INT_MAX);
3081          n->isChoked     = true;
3082        }
3083    }
3084
3085  qsort (choke, size, sizeof (struct ChokeData), compareChoke);
3086
3087  /**
3088   * Reciprocation and number of uploads capping is managed by unchoking
3089   * the N peers which have the best upload rate and are interested.
3090   * This maximizes the client's download rate. These N peers are
3091   * referred to as downloaders, because they are interested in downloading
3092   * from the client.
3093   *
3094   * Peers which have a better upload rate (as compared to the downloaders)
3095   * but aren't interested get unchoked. If they become interested, the
3096   * downloader with the worst upload rate gets choked. If a client has
3097   * a complete file, it uses its upload rate rather than its download
3098   * rate to decide which peers to unchoke.
3099   *
3100   * If our bandwidth is maxed out, don't unchoke any more peers.
3101   */
3102  unchokedInterested = 0;
3103  for (i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i)
3104    {
3105      choke[i].isChoked = isMaxedOut ? choke[i].wasChoked : false;
3106      if (choke[i].isInterested)
3107        ++unchokedInterested;
3108    }
3109
3110  /* optimistic unchoke */
3111  if (!s->optimistic && !isMaxedOut && (i<size))
3112    {
3113      int n;
3114      struct ChokeData * c;
3115      tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
3116
3117      for (; i<size; ++i)
3118        {
3119          if (choke[i].isInterested)
3120            {
3121              const tr_peer * peer = choke[i].peer;
3122              int x = 1, y;
3123              if (isNew (peer)) x *= 3;
3124              for (y=0; y<x; ++y)
3125                tr_ptrArrayAppend (&randPool, &choke[i]);
3126            }
3127        }
3128
3129      if ((n = tr_ptrArraySize (&randPool)))
3130        {
3131          c = tr_ptrArrayNth (&randPool, tr_cryptoWeakRandInt (n));
3132          c->isChoked = false;
3133          s->optimistic = c->peer;
3134          s->optimisticUnchokeTimeScaler = OPTIMISTIC_UNCHOKE_MULTIPLIER;
3135        }
3136
3137      tr_ptrArrayDestruct (&randPool, NULL);
3138    }
3139
3140  for (i=0; i<size; ++i)
3141    tr_peerMsgsSetChoke (choke[i].peer->msgs, choke[i].isChoked);
3142
3143  /* cleanup */
3144  tr_free (choke);
3145}
3146
3147static void
3148rechokePulse (int foo UNUSED, short bar UNUSED, void * vmgr)
3149{
3150  tr_torrent * tor = NULL;
3151  tr_peerMgr * mgr = vmgr;
3152  const uint64_t now = tr_time_msec ();
3153
3154  managerLock (mgr);
3155
3156  while ((tor = tr_torrentNext (mgr->session, tor)))
3157    {
3158      if (tor->isRunning)
3159        {
3160          tr_swarm * s = tor->swarm;
3161          if (!tr_ptrArrayEmpty (&s->peers))
3162            {
3163              rechokeUploads (s, now);
3164              rechokeDownloads (s);
3165            }
3166        }
3167    }
3168
3169  tr_timerAddMsec (mgr->rechokeTimer, RECHOKE_PERIOD_MSEC);
3170  managerUnlock (mgr);
3171}
3172
3173/***
3174****
3175****  Life and Death
3176****
3177***/
3178
3179static bool
3180shouldPeerBeClosed (const tr_swarm   * s,
3181                    const tr_peer    * peer,
3182                    int                peerCount,
3183                    const time_t       now)
3184{
3185  const tr_torrent * tor = s->tor;
3186  const struct peer_atom * atom = peer->atom;
3187
3188  /* if it's marked for purging, close it */
3189  if (peer->doPurge)
3190    {
3191      tordbg (s, "purging peer %s because its doPurge flag is set",
3192              tr_atomAddrStr (atom));
3193      return true;
3194    }
3195
3196  /* disconnect if we're both seeds and enough time has passed for PEX */
3197  if (tr_torrentIsSeed (tor) && peerIsSeed (peer))
3198    return !tr_torrentAllowsPex (tor) || (now-atom->time>=30);
3199
3200  /* disconnect if it's been too long since piece data has been transferred.
3201   * this is on a sliding scale based on number of available peers... */
3202  {
3203    const int relaxStrictnessIfFewerThanN = (int)((getMaxPeerCount (tor) * 0.9) + 0.5);
3204    /* if we have >= relaxIfFewerThan, strictness is 100%.
3205     * if we have zero connections, strictness is 0% */
3206    const float strictness = peerCount >= relaxStrictnessIfFewerThanN
3207                           ? 1.0
3208                           : peerCount / (float)relaxStrictnessIfFewerThanN;
3209    const int lo = MIN_UPLOAD_IDLE_SECS;
3210    const int hi = MAX_UPLOAD_IDLE_SECS;
3211    const int limit = hi - ((hi - lo) * strictness);
3212    const int idleTime = now - MAX (atom->time, atom->piece_data_time);
3213/*fprintf (stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit);*/
3214    if (idleTime > limit)
3215      {
3216        tordbg (s, "purging peer %s because it's been %d secs since we shared anything",
3217                tr_atomAddrStr (atom), idleTime);
3218        return true;
3219      }
3220  }
3221
3222  return false;
3223}
3224
3225static tr_peer **
3226getPeersToClose (tr_swarm * s, const time_t now_sec, int * setmeSize)
3227{
3228  int i, peerCount, outsize;
3229  struct tr_peer ** ret = NULL;
3230  tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek (&s->peers, &peerCount);
3231
3232  assert (swarmIsLocked (s));
3233
3234  for (i=outsize=0; i<peerCount; ++i)
3235    {
3236      if (shouldPeerBeClosed (s, peers[i], peerCount, now_sec))
3237        {
3238          if (ret == NULL)
3239            ret = tr_new (tr_peer *, peerCount);
3240          ret[outsize++] = peers[i];
3241        }
3242    }
3243
3244  *setmeSize = outsize;
3245  return ret;
3246}
3247
3248static int
3249getReconnectIntervalSecs (const struct peer_atom * atom, const time_t now)
3250{
3251  int sec;
3252  const bool unreachable = (atom->flags2 & MYFLAG_UNREACHABLE) != 0;
3253
3254  /* if we were recently connected to this peer and transferring piece
3255   * data, try to reconnect to them sooner rather that later -- we don't
3256   * want network troubles to get in the way of a good peer. */
3257  if (!unreachable && ((now - atom->piece_data_time) <= (MINIMUM_RECONNECT_INTERVAL_SECS * 2)))
3258    sec = MINIMUM_RECONNECT_INTERVAL_SECS;
3259
3260  /* otherwise, the interval depends on how many times we've tried
3261   * and failed to connect to the peer */
3262  else
3263    {
3264      int step = atom->numFails;
3265
3266      /* penalize peers that were unreachable the last time we tried */
3267      if (unreachable)
3268        step += 2;
3269 
3270      switch (step)
3271        {
3272          case 0: sec = 0; break;
3273          case 1: sec = 10; break;
3274          case 2: sec = 60 * 2; break;
3275          case 3: sec = 60 * 15; break;
3276          case 4: sec = 60 * 30; break;
3277          case 5: sec = 60 * 60; break;
3278          default: sec = 60 * 120; break;
3279        }
3280    }
3281
3282  dbgmsg ("reconnect interval for %s is %d seconds", tr_atomAddrStr (atom), sec);
3283  return sec;
3284}
3285
3286static void
3287removePeer (tr_swarm * s, tr_peer * peer)
3288{
3289  tr_peer * removed;
3290  struct peer_atom * atom = peer->atom;
3291
3292  assert (swarmIsLocked (s));
3293  assert (atom);
3294
3295  atom->time = tr_time ();
3296
3297  removed = tr_ptrArrayRemoveSorted (&s->peers, peer, peerCompare);
3298
3299  if (replicationExists (s))
3300    tr_decrReplicationFromBitfield (s, &peer->have);
3301
3302  assert (removed == peer);
3303  peerDelete (s, removed);
3304}
3305
3306static void
3307closePeer (tr_swarm * s, tr_peer * peer)
3308{
3309  struct peer_atom * atom;
3310
3311  assert (s != NULL);
3312  assert (peer != NULL);
3313
3314  atom = peer->atom;
3315
3316  /* if we transferred piece data, then they might be good peers,
3317     so reset their `numFails' weight to zero. otherwise we connected
3318     to them fruitlessly, so mark it as another fail */
3319  if (atom->piece_data_time)
3320    {
3321      tordbg (s, "resetting atom %s numFails to 0", tr_atomAddrStr (atom));
3322      atom->numFails = 0;
3323    }
3324  else
3325    {
3326      ++atom->numFails;
3327      tordbg (s, "incremented atom %s numFails to %d", tr_atomAddrStr (atom), (int)atom->numFails);
3328    }
3329
3330  tordbg (s, "removing bad peer %s", tr_peerIoGetAddrStr (peer->io));
3331  removePeer (s, peer);
3332}
3333
3334static void
3335removeAllPeers (tr_swarm * s)
3336{
3337  while (!tr_ptrArrayEmpty (&s->peers))
3338    removePeer (s, tr_ptrArrayNth (&s->peers, 0));
3339}
3340
3341static void
3342closeBadPeers (tr_swarm * s, const time_t now_sec)
3343{
3344  if (!tr_ptrArrayEmpty (&s->peers))
3345    {
3346      int i;
3347      int peerCount;
3348      struct tr_peer ** peers;
3349
3350      peers = getPeersToClose (s, now_sec, &peerCount);
3351      for (i=0; i<peerCount; ++i)
3352        closePeer (s, peers[i]);
3353      tr_free (peers);
3354    }
3355}
3356
3357struct peer_liveliness
3358{
3359  tr_peer * peer;
3360  void * clientData;
3361  time_t pieceDataTime;
3362  time_t time;
3363  int speed;
3364  bool doPurge;
3365};
3366
3367static int
3368comparePeerLiveliness (const void * va, const void * vb)
3369{
3370  const struct peer_liveliness * a = va;
3371  const struct peer_liveliness * b = vb;
3372
3373  if (a->doPurge != b->doPurge)
3374    return a->doPurge ? 1 : -1;
3375
3376  if (a->speed != b->speed) /* faster goes first */
3377    return a->speed > b->speed ? -1 : 1;
3378
3379  /* the one to give us data more recently goes first */
3380  if (a->pieceDataTime != b->pieceDataTime)
3381    return a->pieceDataTime > b->pieceDataTime ? -1 : 1;
3382
3383  /* the one we connected to most recently goes first */
3384  if (a->time != b->time)
3385    return a->time > b->time ? -1 : 1;
3386
3387  return 0;
3388}
3389
3390static void
3391sortPeersByLivelinessImpl (tr_peer  ** peers,
3392                           void     ** clientData,
3393                           int         n,
3394                           uint64_t    now,
3395                           int (*compare)(const void *va, const void *vb))
3396{
3397  int i;
3398  struct peer_liveliness *lives, *l;
3399
3400  /* build a sortable array of peer + extra info */
3401  lives = l = tr_new0 (struct peer_liveliness, n);
3402  for (i=0; i<n; ++i, ++l)
3403    {
3404      tr_peer * p = peers[i];
3405      l->peer = p;
3406      l->doPurge = p->doPurge;
3407      l->pieceDataTime = p->atom->piece_data_time;
3408      l->time = p->atom->time;
3409      l->speed = tr_peerGetPieceSpeed_Bps (p, now, TR_UP)
3410               + tr_peerGetPieceSpeed_Bps (p, now, TR_DOWN);
3411      if (clientData)
3412        l->clientData = clientData[i];
3413    }
3414
3415  /* sort 'em */
3416  assert (n == (l - lives));
3417  qsort (lives, n, sizeof (struct peer_liveliness), compare);
3418
3419  /* build the peer array */
3420  for (i=0, l=lives; i<n; ++i, ++l)
3421    {
3422      peers[i] = l->peer;
3423      if (clientData)
3424        clientData[i] = l->clientData;
3425    }
3426  assert (n == (l - lives));
3427
3428  /* cleanup */
3429  tr_free (lives);
3430}
3431
3432static void
3433sortPeersByLiveliness (tr_peer ** peers, void ** clientData, int n, uint64_t now)
3434{
3435  sortPeersByLivelinessImpl (peers, clientData, n, now, comparePeerLiveliness);
3436}
3437
3438
3439static void
3440enforceTorrentPeerLimit (tr_swarm * s, uint64_t now)
3441{
3442  int n = tr_ptrArraySize (&s->peers);
3443  const int max = tr_torrentGetPeerLimit (s->tor);
3444  if (n > max)
3445    {
3446      void * base = tr_ptrArrayBase (&s->peers);
3447      tr_peer ** peers = tr_memdup (base, n*sizeof (tr_peer*));
3448      sortPeersByLiveliness (peers, NULL, n, now);
3449      while (n > max)
3450        closePeer (s, peers[--n]);
3451      tr_free (peers);
3452    }
3453}
3454
3455static void
3456enforceSessionPeerLimit (tr_session * session, uint64_t now)
3457{
3458  int n = 0;
3459  tr_torrent * tor = NULL;
3460  const int max = tr_sessionGetPeerLimit (session);
3461
3462  /* count the total number of peers */
3463  while ((tor = tr_torrentNext (session, tor)))
3464    n += tr_ptrArraySize (&tor->swarm->peers);
3465
3466  /* if there are too many, prune out the worst */
3467  if (n > max)
3468    {
3469      tr_peer ** peers = tr_new (tr_peer*, n);
3470      tr_swarm ** swarms = tr_new (tr_swarm*, n);
3471
3472      /* populate the peer array */
3473      n = 0;
3474      tor = NULL;
3475      while ((tor = tr_torrentNext (session, tor)))
3476        {
3477          int i;
3478          tr_swarm * s = tor->swarm;
3479          const int tn = tr_ptrArraySize (&s->peers);
3480          for (i=0; i<tn; ++i, ++n)
3481            {
3482              peers[n] = tr_ptrArrayNth (&s->peers, i);
3483              swarms[n] = s;
3484            }
3485        }
3486
3487      /* sort 'em */
3488      sortPeersByLiveliness (peers, (void**)swarms, n, now);
3489
3490      /* cull out the crappiest */
3491      while (n-- > max)
3492        closePeer (swarms[n], peers[n]);
3493
3494      /* cleanup */
3495      tr_free (swarms);
3496      tr_free (peers);
3497    }
3498}
3499
3500static void makeNewPeerConnections (tr_peerMgr * mgr, const int max);
3501
3502static void
3503reconnectPulse (int foo UNUSED, short bar UNUSED, void * vmgr)
3504{
3505  tr_torrent * tor;
3506  tr_peerMgr * mgr = vmgr;
3507  const time_t now_sec = tr_time ();
3508  const uint64_t now_msec = tr_time_msec ();
3509
3510  /**
3511  ***  enforce the per-session and per-torrent peer limits
3512  **/
3513
3514  /* if we're over the per-torrent peer limits, cull some peers */
3515  tor = NULL;
3516  while ((tor = tr_torrentNext (mgr->session, tor)))
3517    if (tor->isRunning)
3518      enforceTorrentPeerLimit (tor->swarm, now_msec);
3519
3520  /* if we're over the per-session peer limits, cull some peers */
3521  enforceSessionPeerLimit (mgr->session, now_msec);
3522
3523  /* remove crappy peers */
3524  tor = NULL;
3525  while ((tor = tr_torrentNext (mgr->session, tor)))
3526    if (!tor->swarm->isRunning)
3527      removeAllPeers (tor->swarm);
3528    else
3529      closeBadPeers (tor->swarm, now_sec);
3530
3531  /* try to make new peer connections */
3532  makeNewPeerConnections (mgr, MAX_CONNECTIONS_PER_PULSE);
3533}
3534
3535/****
3536*****
3537*****  BANDWIDTH ALLOCATION
3538*****
3539****/
3540
3541static void
3542pumpAllPeers (tr_peerMgr * mgr)
3543{
3544  tr_torrent * tor = NULL;
3545
3546  while ((tor = tr_torrentNext (mgr->session, tor)))
3547    {
3548      int j;
3549      tr_swarm * s = tor->swarm;
3550
3551      for (j=0; j<tr_ptrArraySize (&s->peers); ++j)
3552        {
3553          tr_peer * peer = tr_ptrArrayNth (&s->peers, j);
3554          tr_peerMsgsPulse (peer->msgs);
3555        }
3556    }
3557}
3558
3559
3560static void
3561queuePulseForeach (void * vtor)
3562{
3563  tr_torrent * tor = vtor;
3564
3565  tr_torrentStartNow (tor);
3566
3567  if (tor->queue_started_callback != NULL)
3568    (*tor->queue_started_callback)(tor, tor->queue_started_user_data);
3569}
3570
3571static void
3572queuePulse (tr_session * session, tr_direction dir)
3573{
3574  assert (tr_isSession (session));
3575  assert (tr_isDirection (dir));
3576
3577  if (tr_sessionGetQueueEnabled (session, dir))
3578    {
3579      tr_ptrArray torrents = TR_PTR_ARRAY_INIT;
3580
3581      tr_sessionGetNextQueuedTorrents (session,
3582                                       dir,
3583                                       tr_sessionCountQueueFreeSlots (session, dir),
3584                                       &torrents);
3585
3586      tr_ptrArrayForeach (&torrents, queuePulseForeach);
3587
3588      tr_ptrArrayDestruct (&torrents, NULL);
3589    }
3590}
3591
3592static void
3593bandwidthPulse (int foo UNUSED, short bar UNUSED, void * vmgr)
3594{
3595  tr_torrent * tor;
3596  tr_peerMgr * mgr = vmgr;
3597  tr_session * session = mgr->session;
3598  managerLock (mgr);
3599
3600  /* FIXME: this next line probably isn't necessary... */
3601  pumpAllPeers (mgr);
3602
3603  /* allocate bandwidth to the peers */
3604  tr_bandwidthAllocate (&session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC);
3605  tr_bandwidthAllocate (&session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC);
3606
3607  /* torrent upkeep */
3608  tor = NULL;
3609  while ((tor = tr_torrentNext (session, tor)))
3610    {
3611      /* possibly stop torrents that have seeded enough */
3612      tr_torrentCheckSeedLimit (tor);
3613
3614      /* run the completeness check for any torrents that need it */
3615      if (tor->swarm->needsCompletenessCheck)
3616        {
3617          tor->swarm->needsCompletenessCheck  = false;
3618          tr_torrentRecheckCompleteness (tor);
3619        }
3620
3621      /* stop torrents that are ready to stop, but couldn't be stopped
3622         earlier during the peer-io callback call chain */
3623      if (tor->isStopping)
3624        tr_torrentStop (tor);
3625    }
3626
3627  /* pump the queues */
3628  queuePulse (session, TR_UP);
3629  queuePulse (session, TR_DOWN);
3630
3631  reconnectPulse (0, 0, mgr);
3632
3633  tr_timerAddMsec (mgr->bandwidthTimer, BANDWIDTH_PERIOD_MSEC);
3634  managerUnlock (mgr);
3635}
3636
3637/***
3638****
3639***/
3640
3641static int
3642compareAtomPtrsByAddress (const void * va, const void *vb)
3643{
3644  const struct peer_atom * a = * (const struct peer_atom**) va;
3645  const struct peer_atom * b = * (const struct peer_atom**) vb;
3646
3647  assert (tr_isAtom (a));
3648  assert (tr_isAtom (b));
3649
3650  return tr_address_compare (&a->addr, &b->addr);
3651}
3652
3653/* best come first, worst go last */
3654static int
3655compareAtomPtrsByShelfDate (const void * va, const void *vb)
3656{
3657  time_t atime;
3658  time_t btime;
3659  const struct peer_atom * a = * (const struct peer_atom**) va;
3660  const struct peer_atom * b = * (const struct peer_atom**) vb;
3661  const int data_time_cutoff_secs = 60 * 60;
3662  const time_t tr_now = tr_time ();
3663
3664  assert (tr_isAtom (a));
3665  assert (tr_isAtom (b));
3666
3667  /* primary key: the last piece data time *if* it was within the last hour */
3668  atime = a->piece_data_time; if (atime + data_time_cutoff_secs < tr_now) atime = 0;
3669  btime = b->piece_data_time; if (btime + data_time_cutoff_secs < tr_now) btime = 0;
3670  if (atime != btime)
3671    return atime > btime ? -1 : 1;
3672
3673  /* secondary key: shelf date. */
3674  if (a->shelf_date != b->shelf_date)
3675    return a->shelf_date > b->shelf_date ? -1 : 1;
3676
3677  return 0;
3678}
3679
3680static int
3681getMaxAtomCount (const tr_torrent * tor)
3682{
3683  return MIN (50, tor->maxConnectedPeers * 3);
3684}
3685
3686static void
3687atomPulse (int foo UNUSED, short bar UNUSED, void * vmgr)
3688{
3689  tr_torrent * tor = NULL;
3690  tr_peerMgr * mgr = vmgr;
3691  managerLock (mgr);
3692
3693  while ((tor = tr_torrentNext (mgr->session, tor)))
3694    {
3695      int atomCount;
3696      tr_swarm * s = tor->swarm;
3697      const int maxAtomCount = getMaxAtomCount (tor);
3698      struct peer_atom ** atoms = (struct peer_atom**) tr_ptrArrayPeek (&s->pool, &atomCount);
3699
3700      if (atomCount > maxAtomCount) /* we've got too many atoms... time to prune */
3701        {
3702          int i;
3703          int keepCount = 0;
3704          int testCount = 0;
3705          struct peer_atom ** keep = tr_new (struct peer_atom*, atomCount);
3706          struct peer_atom ** test = tr_new (struct peer_atom*, atomCount);
3707
3708          /* keep the ones that are in use */
3709          for (i=0; i<atomCount; ++i)
3710            {
3711              struct peer_atom * atom = atoms[i];
3712              if (peerIsInUse (s, atom))
3713                keep[keepCount++] = atom;
3714              else
3715                test[testCount++] = atom;
3716            }
3717
3718          /* if there's room, keep the best of what's left */
3719          i = 0;
3720          if (keepCount < maxAtomCount)
3721            {
3722              qsort (test, testCount, sizeof (struct peer_atom *), compareAtomPtrsByShelfDate);
3723              while (i<testCount && keepCount<maxAtomCount)
3724                keep[keepCount++] = test[i++];
3725            }
3726
3727          /* free the culled atoms */
3728          while (i<testCount)
3729            tr_free (test[i++]);
3730
3731          /* rebuild Torrent.pool with what's left */
3732          tr_ptrArrayDestruct (&s->pool, NULL);
3733          s->pool = TR_PTR_ARRAY_INIT;
3734          qsort (keep, keepCount, sizeof (struct peer_atom *), compareAtomPtrsByAddress);
3735          for (i=0; i<keepCount; ++i)
3736            tr_ptrArrayAppend (&s->pool, keep[i]);
3737
3738          tordbg (s, "max atom count is %d... pruned from %d to %d\n", maxAtomCount, atomCount, keepCount);
3739
3740          /* cleanup */
3741          tr_free (test);
3742          tr_free (keep);
3743        }
3744    }
3745
3746  tr_timerAddMsec (mgr->atomTimer, ATOM_PERIOD_MSEC);
3747  managerUnlock (mgr);
3748}
3749
3750/***
3751****
3752****
3753****
3754***/
3755
3756/* is this atom someone that we'd want to initiate a connection to? */
3757static bool
3758isPeerCandidate (const tr_torrent * tor, struct peer_atom * atom, const time_t now)
3759{
3760  /* not if we're both seeds */
3761  if (tr_torrentIsSeed (tor) && atomIsSeed (atom))
3762    return false;
3763
3764  /* not if we've already got a connection to them... */
3765  if (peerIsInUse (tor->swarm, atom))
3766    return false;
3767
3768  /* not if we just tried them already */
3769  if ((now - atom->time) < getReconnectIntervalSecs (atom, now))
3770    return false;
3771
3772  /* not if they're blocklisted */
3773  if (isAtomBlocklisted (tor->session, atom))
3774    return false;
3775
3776  /* not if they're banned... */
3777  if (atom->flags2 & MYFLAG_BANNED)
3778    return false;
3779
3780  return true;
3781}
3782
3783struct peer_candidate
3784{
3785  uint64_t score;
3786  tr_torrent * tor;
3787  struct peer_atom * atom;
3788};
3789
3790static bool
3791torrentWasRecentlyStarted (const tr_torrent * tor)
3792{
3793  return difftime (tr_time (), tor->startDate) < 120;
3794}
3795
/**
 * Appends a field to a packed sort key.
 *
 * The existing key is shifted left by `width' bits and `addme' is OR'ed
 * into the result. `addme' is not masked, so a value wider than `width'
 * bits also sets bits in the fields that were added before it.
 *
 * @param value the key built so far
 * @param width the number of bits to make room for
 * @param addme the value for the new low-order field
 * @return the extended key
 */
static inline uint64_t
addValToKey (uint64_t value, int width, uint64_t addme)
{
  const uint64_t shifted = value << (uint64_t)width;

  return shifted | addme;
}
3803
3804/* smaller value is better */
3805static uint64_t
3806getPeerCandidateScore (const tr_torrent * tor, const struct peer_atom * atom, uint8_t salt)
3807{
3808  uint64_t i;
3809  uint64_t score = 0;
3810  const bool failed = atom->lastConnectionAt < atom->lastConnectionAttemptAt;
3811
3812  /* prefer peers we've connected to, or never tried, over peers we failed to connect to. */
3813  i = failed ? 1 : 0;
3814  score = addValToKey (score, 1, i);
3815
3816  /* prefer the one we attempted least recently (to cycle through all peers) */
3817  i = atom->lastConnectionAttemptAt;
3818  score = addValToKey (score, 32, i);
3819
3820  /* prefer peers belonging to a torrent of a higher priority */
3821  switch (tr_torrentGetPriority (tor))
3822    {
3823      case TR_PRI_HIGH:    i = 0; break;
3824      case TR_PRI_NORMAL:  i = 1; break;
3825      case TR_PRI_LOW:     i = 2; break;
3826    }
3827  score = addValToKey (score, 4, i);
3828
3829  /* prefer recently-started torrents */
3830  i = torrentWasRecentlyStarted (tor) ? 0 : 1;
3831  score = addValToKey (score, 1, i);
3832
3833  /* prefer torrents we're downloading with */
3834  i = tr_torrentIsSeed (tor) ? 1 : 0;
3835  score = addValToKey (score, 1, i);
3836
3837  /* prefer peers that are known to be connectible */
3838  i = (atom->flags & ADDED_F_CONNECTABLE) ? 0 : 1;
3839  score = addValToKey (score, 1, i);
3840
3841  /* prefer peers that we might have a chance of uploading to...
3842  so lower seed probability is better */
3843  if (atom->seedProbability == 100) i = 101;
3844  else if (atom->seedProbability == -1) i = 100;
3845  else i = atom->seedProbability;
3846  score = addValToKey (score, 8, i);
3847
3848  /* Prefer peers that we got from more trusted sources.
3849   * lower `fromBest' values indicate more trusted sources */
3850  score = addValToKey (score, 4, atom->fromBest);
3851
3852  /* salt */
3853  score = addValToKey (score, 8, salt);
3854
3855  return score;
3856}
3857
3858static int
3859comparePeerCandidates (const void * va, const void * vb)
3860{
3861  int ret;
3862  const struct peer_candidate * a = va;
3863  const struct peer_candidate * b = vb;
3864
3865  if (a->score < b->score)
3866    ret = -1;
3867  else if (a->score > b->score)
3868    ret = 1;
3869  else
3870    ret = 0;
3871
3872  return ret;
3873}
3874
3875/* Partial sorting -- selecting the k best candidates
3876   Adapted from http://en.wikipedia.org/wiki/Selection_algorithm */
3877static void
3878selectPeerCandidates (struct peer_candidate * candidates, int candidate_count, int select_count)
3879{
3880  tr_quickfindFirstK (candidates,
3881                      candidate_count,
3882                      sizeof(struct peer_candidate),
3883                      comparePeerCandidates,
3884                      select_count);
3885}
3886
3887#ifndef NDEBUG
3888static bool
3889checkBestScoresComeFirst (const struct peer_candidate * candidates, int n, int k)
3890{
3891  int i;
3892  uint64_t worstFirstScore = 0;
3893  const int x = MIN (n, k) - 1;
3894
3895  for (i=0; i<x; i++)
3896    if (worstFirstScore < candidates[i].score)
3897      worstFirstScore = candidates[i].score;
3898
3899  for (i=0; i<x; i++)
3900    assert (candidates[i].score <= worstFirstScore);
3901
3902  for (i=x+1; i<n; i++)
3903    assert (candidates[i].score >= worstFirstScore);
3904
3905  return true;
3906}
3907#endif /* NDEBUG */
3908
3909/** @return an array of all the atoms we might want to connect to */
3910static struct peer_candidate*
3911getPeerCandidates (tr_session * session, int * candidateCount, int max)
3912{
3913  int atomCount;
3914  int peerCount;
3915  tr_torrent * tor;
3916  struct peer_candidate * candidates;
3917  struct peer_candidate * walk;
3918  const time_t now = tr_time ();
3919  const uint64_t now_msec = tr_time_msec ();
3920  /* leave 5% of connection slots for incoming connections -- ticket #2609 */
3921  const int maxCandidates = tr_sessionGetPeerLimit (session) * 0.95;
3922
3923  /* count how many peers and atoms we've got */
3924  tor= NULL;
3925  atomCount = 0;
3926  peerCount = 0;
3927  while ((tor = tr_torrentNext (session, tor)))
3928    {
3929      atomCount += tr_ptrArraySize (&tor->swarm->pool);
3930      peerCount += tr_ptrArraySize (&tor->swarm->peers);
3931    }
3932
3933  /* don't start any new handshakes if we're full up */
3934  if (maxCandidates <= peerCount)
3935    {
3936      *candidateCount = 0;
3937      return NULL;
3938    }
3939
3940  /* allocate an array of candidates */
3941  walk = candidates = tr_new (struct peer_candidate, atomCount);
3942
3943  /* populate the candidate array */
3944  tor = NULL;
3945  while ((tor = tr_torrentNext (session, tor)))
3946    {
3947      int i, nAtoms;
3948      struct peer_atom ** atoms;
3949
3950      if (!tor->swarm->isRunning)
3951        continue;
3952
3953      /* if we've already got enough peers in this torrent... */
3954      if (tr_torrentGetPeerLimit (tor) <= tr_ptrArraySize (&tor->swarm->peers))
3955        continue;
3956
3957      /* if we've already got enough speed in this torrent... */
3958      if (tr_torrentIsSeed (tor) && isBandwidthMaxedOut (&tor->bandwidth, now_msec, TR_UP))
3959        continue;
3960
3961      atoms = (struct peer_atom**) tr_ptrArrayPeek (&tor->swarm->pool, &nAtoms);
3962      for (i=0; i<nAtoms; ++i)
3963        {
3964          struct peer_atom * atom = atoms[i];
3965
3966          if (isPeerCandidate (tor, atom, now))
3967            {
3968              const uint8_t salt = tr_cryptoWeakRandInt (1024);
3969              walk->tor = tor;
3970              walk->atom = atom;
3971              walk->score = getPeerCandidateScore (tor, atom, salt);
3972              ++walk;
3973            }
3974        }
3975    }
3976
3977  *candidateCount = walk - candidates;
3978  if (walk != candidates)
3979    selectPeerCandidates (candidates, walk-candidates, max);
3980
3981  assert (checkBestScoresComeFirst (candidates, *candidateCount, max));
3982  return candidates;
3983}
3984
3985static void
3986initiateConnection (tr_peerMgr * mgr, tr_swarm * s, struct peer_atom * atom)
3987{
3988  tr_peerIo * io;
3989  const time_t now = tr_time ();
3990  bool utp = tr_sessionIsUTPEnabled (mgr->session) && !atom->utp_failed;
3991
3992  if (atom->fromFirst == TR_PEER_FROM_PEX)
3993    /* PEX has explicit signalling for uTP support.  If an atom
3994       originally came from PEX and doesn't have the uTP flag, skip the
3995       uTP connection attempt.  Are we being optimistic here? */
3996    utp = utp && (atom->flags & ADDED_F_UTP_FLAGS);
3997
3998  tordbg (s, "Starting an OUTGOING%s connection with %s",
3999          utp ? " µTP" : "", tr_atomAddrStr (atom));
4000
4001  io = tr_peerIoNewOutgoing (mgr->session,
4002                             &mgr->session->bandwidth,
4003                             &atom->addr,
4004                             atom->port,
4005                             s->tor->info.hash,
4006                             s->tor->completeness == TR_SEED,
4007                             utp);
4008
4009  if (io == NULL)
4010    {
4011      tordbg (s, "peerIo not created; marking peer %s as unreachable", tr_atomAddrStr (atom));
4012      atom->flags2 |= MYFLAG_UNREACHABLE;
4013      atom->numFails++;
4014    }
4015  else
4016    {
4017      tr_handshake * handshake = tr_handshakeNew (io,
4018                                                  mgr->session->encryptionMode,
4019                                                  myHandshakeDoneCB,
4020                                                  mgr);
4021
4022      assert (tr_peerIoGetTorrentHash (io));
4023
4024      tr_peerIoUnref (io); /* balanced by the initial ref
4025                              in tr_peerIoNewOutgoing () */
4026
4027      tr_ptrArrayInsertSorted (&s->outgoingHandshakes, handshake,
4028                               handshakeCompare);
4029    }
4030
4031  atom->lastConnectionAttemptAt = now;
4032  atom->time = now;
4033}
4034
4035static void
4036initiateCandidateConnection (tr_peerMgr * mgr, struct peer_candidate * c)
4037{
4038#if 0
4039  fprintf (stderr, "Starting an OUTGOING connection with %s - [%s] seedProbability==%d; %s, %s\n",
4040           tr_atomAddrStr (c->atom),
4041           tr_torrentName (c->tor),
4042           (int)c->atom->seedProbability,
4043           tr_torrentIsPrivate (c->tor) ? "private" : "public",
4044           tr_torrentIsSeed (c->tor) ? "seed" : "downloader");
4045#endif
4046
4047  initiateConnection (mgr, c->tor->swarm, c->atom);
4048}
4049
4050static void
4051makeNewPeerConnections (struct tr_peerMgr * mgr, const int max)
4052{
4053  int i, n;
4054  struct peer_candidate * candidates;
4055
4056  candidates = getPeerCandidates (mgr->session, &n, max);
4057
4058  for (i=0; i<n && i<max; ++i)
4059    initiateCandidateConnection (mgr, &candidates[i]);
4060
4061  tr_free (candidates);
4062}
Note: See TracBrowser for help on using the repository browser.