source: trunk/libtransmission/peer-mgr.c @ 14076

Last change on this file since 14076 was 14076, checked in by jordan, 9 years ago

undo previous commit's accidental inclusion of changes to peer-msgs.* and peer-mgr.c

  • Property svn:keywords set to Date Rev Author Id
File size: 108.6 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2 (b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 14076 2013-05-22 19:06:54Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h> /* error codes ERANGE, ... */
15#include <limits.h> /* INT_MAX */
16#include <string.h> /* memcpy, memcmp, strstr */
17#include <stdlib.h> /* qsort */
18
19#include <event2/event.h>
20
21#include <libutp/utp.h>
22
23#include "transmission.h"
24#include "announcer.h"
25#include "bandwidth.h"
26#include "blocklist.h"
27#include "cache.h"
28#include "clients.h"
29#include "completion.h"
30#include "crypto.h"
31#include "handshake.h"
32#include "log.h"
33#include "net.h"
34#include "peer-io.h"
35#include "peer-mgr.h"
36#include "peer-msgs.h"
37#include "ptrarray.h"
38#include "session.h"
39#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
40#include "torrent.h"
41#include "tr-utp.h"
42#include "utils.h"
43#include "webseed.h"
44
/**
 * Tunable constants for the peer manager: timer periods, idle limits,
 * connection throttles, ban thresholds, and the bit values stored in
 * peer_atom.flags2.
 */
enum
{
  /* how frequently to cull old atoms */
  ATOM_PERIOD_MSEC = (60 * 1000),

  /* how frequently to change which peers are choked */
  RECHOKE_PERIOD_MSEC = (10 * 1000),

  /* an optimistically unchoked peer is immune from rechoking
     for this many calls to rechokeUploads (). */
  OPTIMISTIC_UNCHOKE_MULTIPLIER = 4,

  /* how frequently to reallocate bandwidth */
  BANDWIDTH_PERIOD_MSEC = 500,

  /* how frequently to age out old piece request lists */
  REFILL_UPKEEP_PERIOD_MSEC = (10 * 1000),

  /* how frequently to decide which peers live and die */
  RECONNECT_PERIOD_MSEC = 500,

  /* when many peers are available, keep idle ones this long */
  MIN_UPLOAD_IDLE_SECS = (60),

  /* when few peers are available, keep idle ones this long */
  MAX_UPLOAD_IDLE_SECS = (60 * 5),

  /* max number of peers to ask for per second overall.
   * this throttle is to avoid overloading the router */
  MAX_CONNECTIONS_PER_SECOND = 12,

  /* the per-second limit scaled down to one reconnect pulse.
   * Computed with integer arithmetic so that the enumerator is a strict
   * integer constant expression; the resulting value (6) is the same as
   * the previous floating-point form produced. */
  MAX_CONNECTIONS_PER_PULSE = ((MAX_CONNECTIONS_PER_SECOND * RECONNECT_PERIOD_MSEC) / 1000),

  /* number of bad pieces a peer is allowed to send before we ban them */
  MAX_BAD_PIECES_PER_PEER = 5,

  /* amount of time to keep a list of request pieces lying around
     before it's considered too old and needs to be rebuilt */
  PIECE_LIST_SHELF_LIFE_SECS = 60,

  /* use for bitwise operations w/peer_atom.flags2 */
  MYFLAG_BANNED = 1,

  /* use for bitwise operations w/peer_atom.flags2 */
  /* unreachable for now... but not banned.
   * if they try to connect to us it's okay */
  MYFLAG_UNREACHABLE = 2,

  /* the minimum we'll wait before attempting to reconnect to a peer */
  MINIMUM_RECONNECT_INTERVAL_SECS = 5,

  /** how long we'll let requests we've made linger before we cancel them */
  REQUEST_TTL_SECS = 90,

  NO_BLOCKS_CANCEL_HISTORY = 120,

  CANCEL_HISTORY_SEC = 60
};
103
104const tr_peer_event TR_PEER_EVENT_INIT = { 0, 0, NULL, 0, 0, 0, 0 };
105
106/**
107***
108**/
109
110/**
111 * Peer information that should be kept even before we've connected and
112 * after we've disconnected. These are kept in a pool of peer_atoms to decide
113 * which ones would make good candidates for connecting to, and to watch out
114 * for banned peers.
115 *
116 * @see tr_peer
117 * @see tr_peerMsgs
118 */
119struct peer_atom
120{
121  uint8_t     fromFirst;          /* where the peer was first found */
122  uint8_t     fromBest;           /* the "best" value of where the peer has been found */
123  uint8_t     flags;              /* these match the added_f flags */
124  uint8_t     flags2;             /* flags that aren't defined in added_f */
125  int8_t      seedProbability;    /* how likely is this to be a seed... [0..100] or -1 for unknown */
126  int8_t      blocklisted;        /* -1 for unknown, true for blocklisted, false for not blocklisted */
127
128  tr_port     port;
129  bool        utp_failed;         /* We recently failed to connect over uTP */
130  uint16_t    numFails;
131  time_t      time;               /* when the peer's connection status last changed */
132  time_t      piece_data_time;
133
134  time_t      lastConnectionAttemptAt;
135  time_t      lastConnectionAt;
136
137  /* similar to a TTL field, but less rigid --
138   * if the swarm is small, the atom will be kept past this date. */
139  time_t      shelf_date;
140  tr_peer   * peer;               /* will be NULL if not connected */
141  tr_address  addr;
142};
143
144#ifdef NDEBUG
145#define tr_isAtom(a) (TRUE)
146#else
147static bool
148tr_isAtom (const struct peer_atom * atom)
149{
150  return (atom != NULL)
151      && (atom->fromFirst < TR_PEER_FROM__MAX)
152      && (atom->fromBest < TR_PEER_FROM__MAX)
153      && (tr_address_is_valid (&atom->addr));
154}
155#endif
156
157static const char*
158tr_atomAddrStr (const struct peer_atom * atom)
159{
160  return atom ? tr_peerIoAddrStr (&atom->addr, atom->port) : "[no atom]";
161}
162
163struct block_request
164{
165  tr_block_index_t block;
166  tr_peer * peer;
167  time_t sentAt;
168};
169
170struct weighted_piece
171{
172  tr_piece_index_t index;
173  int16_t salt;
174  int16_t requestCount;
175};
176
/* Records which ordering, if any, tr_swarm::pieces currently has. */
enum piece_sort_state
{
  PIECES_UNSORTED = 0,
  PIECES_SORTED_BY_INDEX = 1,
  PIECES_SORTED_BY_WEIGHT = 2
};
183
184/** @brief Opaque, per-torrent data structure for peer connection information */
185typedef struct tr_swarm
186{
187  tr_ptrArray                outgoingHandshakes; /* tr_handshake */
188  tr_ptrArray                pool; /* struct peer_atom */
189  tr_ptrArray                peers; /* tr_peerMsgs */
190  tr_ptrArray                webseeds; /* tr_webseed */
191
192  tr_torrent               * tor;
193  struct tr_peerMgr        * manager;
194
195  tr_peerMsgs              * optimistic; /* the optimistic peer, or NULL if none */
196  int                        optimisticUnchokeTimeScaler;
197
198  bool                       isRunning;
199  bool                       needsCompletenessCheck;
200
201  struct block_request     * requests;
202  int                        requestCount;
203  int                        requestAlloc;
204
205  struct weighted_piece    * pieces;
206  int                        pieceCount;
207  enum piece_sort_state      pieceSortState;
208
209  /* An array of pieceCount items stating how many peers have each piece.
210     This is used to help us for downloading pieces "rarest first."
211     This may be NULL if we don't have metainfo yet, or if we're not
212     downloading and don't care about rarity */
213  uint16_t                 * pieceReplication;
214  size_t                     pieceReplicationSize;
215
216  int                        interestedCount;
217  int                        maxPeers;
218  time_t                     lastCancel;
219
220  /* Before the endgame this should be 0. In endgame, is contains the average
221   * number of pending requests per peer. Only peers which have more pending
222   * requests are considered 'fast' are allowed to request a block that's
223   * already been requested from another (slower?) peer. */
224  int                        endgame;
225}
226tr_swarm;
227
228struct tr_peerMgr
229{
230  tr_session    * session;
231  tr_ptrArray     incomingHandshakes; /* tr_handshake */
232  struct event  * bandwidthTimer;
233  struct event  * rechokeTimer;
234  struct event  * refillUpkeepTimer;
235  struct event  * atomTimer;
236};
237
/* Deep-log a printf-style message tagged with the swarm's torrent name.
   `t' is a tr_swarm pointer; it is parenthesized in the expansion so that
   any pointer-valued expression can be passed safely. Nothing is
   formatted unless deep logging is enabled. */
#define tordbg(t, ...) \
  do \
    { \
      if (tr_logGetDeepEnabled ()) \
        tr_logAddDeep (__FILE__, __LINE__, tr_torrentName ((t)->tor), __VA_ARGS__); \
    } \
  while (0)
245
/* Deep-log a printf-style message that isn't tied to a torrent
   (NULL is passed as the name). No-op unless deep logging is enabled. */
#define dbgmsg(...) \
  do { \
    if (tr_logGetDeepEnabled ()) \
      tr_logAddDeep (__FILE__, __LINE__, NULL, __VA_ARGS__); \
  } while (0)
253
254/**
255*** tr_peer virtual functions
256**/
257
258static bool
259tr_peerIsTransferringPieces (const tr_peer * peer,
260                             uint64_t        now,
261                             tr_direction    direction,
262                             unsigned int  * Bps)
263{
264  assert (peer != NULL);
265  assert (peer->funcs != NULL);
266
267  return (*peer->funcs->is_transferring_pieces)(peer, now, direction, Bps);
268}
269
270unsigned int
271tr_peerGetPieceSpeed_Bps (const tr_peer    * peer,
272                          uint64_t           now,
273                          tr_direction       direction)
274{
275  unsigned int Bps = 0;
276  tr_peerIsTransferringPieces (peer, now, direction, &Bps);
277  return Bps;
278}
279
280static void
281tr_peerFree (tr_peer * peer)
282{
283  assert (peer != NULL);
284  assert (peer->funcs != NULL);
285
286  (*peer->funcs->destruct)(peer);
287
288  tr_free (peer);
289}
290
291void
292tr_peerConstruct (tr_peer * peer, const tr_torrent * tor)
293{
294  assert (peer != NULL);
295  assert (tr_isTorrent (tor));
296
297  memset (peer, 0, sizeof (tr_peer));
298
299  peer->client = TR_KEY_NONE;
300  peer->swarm = tor->swarm;
301  tr_bitfieldConstruct (&peer->have, tor->info.pieceCount);
302  tr_bitfieldConstruct (&peer->blame, tor->blockCount);
303}
304
305static void peerDeclinedAllRequests (tr_swarm *, const tr_peer *);
306
307void
308tr_peerDestruct (tr_peer * peer)
309{
310  assert (peer != NULL);
311
312  if (peer->swarm != NULL)
313    peerDeclinedAllRequests (peer->swarm, peer);
314
315  tr_bitfieldDestruct (&peer->have);
316  tr_bitfieldDestruct (&peer->blame);
317
318  if (peer->atom)
319    peer->atom->peer = NULL;
320}
321
322/**
323***
324**/
325
326static inline void
327managerLock (const struct tr_peerMgr * manager)
328{
329  tr_sessionLock (manager->session);
330}
331
332static inline void
333managerUnlock (const struct tr_peerMgr * manager)
334{
335  tr_sessionUnlock (manager->session);
336}
337
338static inline void
339swarmLock (tr_swarm * swarm)
340{
341  managerLock (swarm->manager);
342}
343
344static inline void
345swarmUnlock (tr_swarm * swarm)
346{
347  managerUnlock (swarm->manager);
348}
349
350static inline int
351swarmIsLocked (const tr_swarm * swarm)
352{
353  return tr_sessionIsLocked (swarm->manager->session);
354}
355
356/**
357***
358**/
359
360static int
361handshakeCompareToAddr (const void * va, const void * vb)
362{
363  const tr_handshake * a = va;
364
365  return tr_address_compare (tr_handshakeGetAddr (a, NULL), vb);
366}
367
368static int
369handshakeCompare (const void * a, const void * b)
370{
371  return handshakeCompareToAddr (a, tr_handshakeGetAddr (b, NULL));
372}
373
374static inline tr_handshake*
375getExistingHandshake (tr_ptrArray * handshakes, const tr_address * addr)
376{
377  if (tr_ptrArrayEmpty (handshakes))
378    return NULL;
379
380  return tr_ptrArrayFindSorted (handshakes, addr, handshakeCompareToAddr);
381}
382
383static int
384comparePeerAtomToAddress (const void * va, const void * vb)
385{
386  const struct peer_atom * a = va;
387
388  return tr_address_compare (&a->addr, vb);
389}
390
391static int
392compareAtomsByAddress (const void * va, const void * vb)
393{
394  const struct peer_atom * b = vb;
395
396  assert (tr_isAtom (b));
397
398  return comparePeerAtomToAddress (va, &b->addr);
399}
400
401/**
402***
403**/
404
405const tr_address *
406tr_peerAddress (const tr_peer * peer)
407{
408  return &peer->atom->addr;
409}
410
411static tr_swarm *
412getExistingSwarm (tr_peerMgr *    manager,
413                  const uint8_t * hash)
414{
415  tr_torrent * tor = tr_torrentFindFromHash (manager->session, hash);
416
417  return tor == NULL ? NULL : tor->swarm;
418}
419
420static int
421peerCompare (const void * a, const void * b)
422{
423  return tr_address_compare (tr_peerAddress (a), tr_peerAddress (b));
424}
425
426static struct peer_atom*
427getExistingAtom (const tr_swarm   * cswarm,
428                 const tr_address * addr)
429{
430  tr_swarm * swarm = (tr_swarm*) cswarm;
431  return tr_ptrArrayFindSorted (&swarm->pool, addr, comparePeerAtomToAddress);
432}
433
434static bool
435peerIsInUse (const tr_swarm * cs, const struct peer_atom * atom)
436{
437  tr_swarm * s = (tr_swarm*) cs;
438
439  assert (swarmIsLocked (s));
440
441  return (atom->peer != NULL)
442      || getExistingHandshake (&s->outgoingHandshakes, &atom->addr)
443      || getExistingHandshake (&s->manager->incomingHandshakes, &atom->addr);
444}
445
446static inline bool
447replicationExists (const tr_swarm * s)
448{
449  return s->pieceReplication != NULL;
450}
451
452static void
453replicationFree (tr_swarm * s)
454{
455  tr_free (s->pieceReplication);
456  s->pieceReplication = NULL;
457  s->pieceReplicationSize = 0;
458}
459
460static void
461replicationNew (tr_swarm * s)
462{
463  tr_piece_index_t piece_i;
464  const tr_piece_index_t piece_count = s->tor->info.pieceCount;
465  const int n = tr_ptrArraySize (&s->peers);
466
467  assert (!replicationExists (s));
468
469  s->pieceReplicationSize = piece_count;
470  s->pieceReplication = tr_new0 (uint16_t, piece_count);
471
472  for (piece_i=0; piece_i<piece_count; ++piece_i)
473    {
474      int peer_i;
475      uint16_t r = 0;
476
477      for (peer_i=0; peer_i<n; ++peer_i)
478        {
479          tr_peer * peer = tr_ptrArrayNth (&s->peers, peer_i);
480          if (tr_bitfieldHas (&peer->have, piece_i))
481            ++r;
482        }
483
484      s->pieceReplication[piece_i] = r;
485    }
486}
487
488static void
489swarmFree (void * vs)
490{
491  tr_swarm * s = vs;
492
493  assert (s);
494  assert (!s->isRunning);
495  assert (swarmIsLocked (s));
496  assert (tr_ptrArrayEmpty (&s->outgoingHandshakes));
497  assert (tr_ptrArrayEmpty (&s->peers));
498
499  tr_ptrArrayDestruct (&s->webseeds, (PtrArrayForeachFunc)tr_peerFree);
500  tr_ptrArrayDestruct (&s->pool, (PtrArrayForeachFunc)tr_free);
501  tr_ptrArrayDestruct (&s->outgoingHandshakes, NULL);
502  tr_ptrArrayDestruct (&s->peers, NULL);
503
504  replicationFree (s);
505
506  tr_free (s->requests);
507  tr_free (s->pieces);
508  tr_free (s);
509}
510
511static void peerCallbackFunc (tr_peer *, const tr_peer_event *, void *);
512
513static void
514rebuildWebseedArray (tr_swarm * s, tr_torrent * tor)
515{
516  unsigned int i;
517  const tr_info * inf = &tor->info;
518
519  /* clear the array */
520  tr_ptrArrayDestruct (&s->webseeds, (PtrArrayForeachFunc)tr_peerFree);
521  s->webseeds = TR_PTR_ARRAY_INIT;
522
523  /* repopulate it */
524  for (i=0; i<inf->webseedCount; ++i)
525    {
526      tr_webseed * w = tr_webseedNew (tor, inf->webseeds[i], peerCallbackFunc, s);
527      tr_ptrArrayAppend (&s->webseeds, w);
528    }
529}
530
531static tr_swarm *
532swarmNew (tr_peerMgr * manager, tr_torrent * tor)
533{
534  tr_swarm * s;
535
536  s = tr_new0 (tr_swarm, 1);
537  s->manager = manager;
538  s->tor = tor;
539  s->pool = TR_PTR_ARRAY_INIT;
540  s->peers = TR_PTR_ARRAY_INIT;
541  s->webseeds = TR_PTR_ARRAY_INIT;
542  s->outgoingHandshakes = TR_PTR_ARRAY_INIT;
543
544  rebuildWebseedArray (s, tor);
545
546  return s;
547}
548
549static void ensureMgrTimersExist (struct tr_peerMgr * m);
550
551tr_peerMgr*
552tr_peerMgrNew (tr_session * session)
553{
554  tr_peerMgr * m = tr_new0 (tr_peerMgr, 1);
555  m->session = session;
556  m->incomingHandshakes = TR_PTR_ARRAY_INIT;
557  ensureMgrTimersExist (m);
558  return m;
559}
560
/* Free the event that `*t' points to, if any, and reset `*t' to NULL. */
static void
deleteTimer (struct event ** t)
{
  if (*t == NULL)
    return;

  event_free (*t);
  *t = NULL;
}
570
571static void
572deleteTimers (struct tr_peerMgr * m)
573{
574  deleteTimer (&m->atomTimer);
575  deleteTimer (&m->bandwidthTimer);
576  deleteTimer (&m->rechokeTimer);
577  deleteTimer (&m->refillUpkeepTimer);
578}
579
580void
581tr_peerMgrFree (tr_peerMgr * manager)
582{
583  managerLock (manager);
584
585  deleteTimers (manager);
586
587  /* free the handshakes. Abort invokes handshakeDoneCB (), which removes
588   * the item from manager->handshakes, so this is a little roundabout... */
589  while (!tr_ptrArrayEmpty (&manager->incomingHandshakes))
590    tr_handshakeAbort (tr_ptrArrayNth (&manager->incomingHandshakes, 0));
591
592  tr_ptrArrayDestruct (&manager->incomingHandshakes, NULL);
593
594  managerUnlock (manager);
595  tr_free (manager);
596}
597
598static int
599clientIsDownloadingFrom (const tr_torrent * tor, const tr_peerMsgs * p)
600{
601  if (!tr_torrentHasMetadata (tor))
602    return true;
603
604  return tr_peerMsgsIsClientInterested (p) && !tr_peerMsgsIsClientChoked (p);
605}
606
607static int
608clientIsUploadingTo (const tr_peerMsgs * p)
609{
610  return tr_peerMsgsIsPeerInterested (p) && !tr_peerMsgsIsPeerChoked (p);
611}
612
613/***
614****
615***/
616
617void
618tr_peerMgrOnBlocklistChanged (tr_peerMgr * mgr)
619{
620  tr_torrent * tor = NULL;
621  tr_session * session = mgr->session;
622
623  /* we cache whether or not a peer is blocklisted...
624     since the blocklist has changed, erase that cached value */
625  while ((tor = tr_torrentNext (session, tor)))
626    {
627      int i;
628      tr_swarm * s = tor->swarm;
629      const int n = tr_ptrArraySize (&s->pool);
630      for (i=0; i<n; ++i)
631        {
632          struct peer_atom * atom = tr_ptrArrayNth (&s->pool, i);
633          atom->blocklisted = -1;
634        }
635    }
636}
637
638static bool
639isAtomBlocklisted (tr_session * session, struct peer_atom * atom)
640{
641  if (atom->blocklisted < 0)
642    atom->blocklisted = tr_sessionIsAddressBlocked (session, &atom->addr);
643
644  assert (tr_isBool (atom->blocklisted));
645  return atom->blocklisted;
646}
647
648
649/***
650****
651***/
652
653static void
654atomSetSeedProbability (struct peer_atom * atom, int seedProbability)
655{
656  assert (atom != NULL);
657  assert (-1<=seedProbability && seedProbability<=100);
658
659  atom->seedProbability = seedProbability;
660
661  if (seedProbability == 100)
662    atom->flags |= ADDED_F_SEED_FLAG;
663  else if (seedProbability != -1)
664    atom->flags &= ~ADDED_F_SEED_FLAG;
665}
666
667static inline bool
668atomIsSeed (const struct peer_atom * atom)
669{
670  return atom->seedProbability == 100;
671}
672
673static void
674atomSetSeed (const tr_swarm * s, struct peer_atom * atom)
675{
676  if (!atomIsSeed (atom))
677    {
678      tordbg (s, "marking peer %s as a seed", tr_atomAddrStr (atom));
679
680      atomSetSeedProbability (atom, 100);
681    }
682}
683
684
685bool
686tr_peerMgrPeerIsSeed (const tr_torrent  * tor,
687                      const tr_address  * addr)
688{
689  bool isSeed = false;
690  const tr_swarm * s = tor->swarm;
691  const struct peer_atom * atom = getExistingAtom (s, addr);
692
693  if (atom)
694    isSeed = atomIsSeed (atom);
695
696  return isSeed;
697}
698
699void
700tr_peerMgrSetUtpSupported (tr_torrent * tor, const tr_address * addr)
701{
702  struct peer_atom * atom = getExistingAtom (tor->swarm, addr);
703
704  if (atom)
705    atom->flags |= ADDED_F_UTP_FLAGS;
706}
707
708void
709tr_peerMgrSetUtpFailed (tr_torrent *tor, const tr_address *addr, bool failed)
710{
711  struct peer_atom * atom = getExistingAtom (tor->swarm, addr);
712
713  if (atom)
714    atom->utp_failed = failed;
715}
716
717
718/**
719***  REQUESTS
720***
721*** There are two data structures associated with managing block requests:
722***
723*** 1. tr_swarm::requests, an array of "struct block_request" which keeps
724***    track of which blocks have been requested, and when, and by which peers.
725***    This list is used for (a) cancelling requests that have been pending
726***    for too long and (b) avoiding duplicate requests before endgame.
727***
728*** 2. tr_swarm::pieces, an array of "struct weighted_piece" which lists the
729***    pieces that we want to request. It's used to decide which blocks to
730***    return next when tr_peerMgrGetBlockRequests () is called.
731**/
732
733/**
734*** struct block_request
735**/
736
737static int
738compareReqByBlock (const void * va, const void * vb)
739{
740  const struct block_request * a = va;
741  const struct block_request * b = vb;
742
743  /* primary key: block */
744  if (a->block < b->block) return -1;
745  if (a->block > b->block) return 1;
746
747  /* secondary key: peer */
748  if (a->peer < b->peer) return -1;
749  if (a->peer > b->peer) return 1;
750
751  return 0;
752}
753
754static void
755requestListAdd (tr_swarm * s, tr_block_index_t block, tr_peer * peer)
756{
757  struct block_request key;
758
759  /* ensure enough room is available... */
760  if (s->requestCount + 1 >= s->requestAlloc)
761    {
762      const int CHUNK_SIZE = 128;
763      s->requestAlloc += CHUNK_SIZE;
764      s->requests = tr_renew (struct block_request,
765                              s->requests, s->requestAlloc);
766    }
767
768  /* populate the record we're inserting */
769  key.block = block;
770  key.peer = peer;
771  key.sentAt = tr_time ();
772
773  /* insert the request to our array... */
774  {
775    bool exact;
776    const int pos = tr_lowerBound (&key, s->requests, s->requestCount,
777                                   sizeof (struct block_request),
778                                   compareReqByBlock, &exact);
779    assert (!exact);
780    memmove (s->requests + pos + 1,
781             s->requests + pos,
782             sizeof (struct block_request) * (s->requestCount++ - pos));
783    s->requests[pos] = key;
784  }
785
786  if (peer != NULL)
787    {
788      ++peer->pendingReqsToPeer;
789      assert (peer->pendingReqsToPeer >= 0);
790    }
791
792  /*fprintf (stderr, "added request of block %lu from peer %s... "
793                     "there are now %d block\n",
794                     (unsigned long)block, tr_atomAddrStr (peer->atom), s->requestCount);*/
795}
796
797static struct block_request *
798requestListLookup (tr_swarm * s, tr_block_index_t block, const tr_peer * peer)
799{
800  struct block_request key;
801  key.block = block;
802  key.peer = (tr_peer*) peer;
803
804  return bsearch (&key, s->requests, s->requestCount,
805                  sizeof (struct block_request),
806                  compareReqByBlock);
807}
808
809/**
810 * Find the peers are we currently requesting the block
811 * with index @a block from and append them to @a peerArr.
812 */
813static void
814getBlockRequestPeers (tr_swarm * s, tr_block_index_t block,
815                      tr_ptrArray * peerArr)
816{
817  bool exact;
818  int i, pos;
819  struct block_request key;
820
821  key.block = block;
822  key.peer = NULL;
823  pos = tr_lowerBound (&key, s->requests, s->requestCount,
824                       sizeof (struct block_request),
825                       compareReqByBlock, &exact);
826
827  assert (!exact); /* shouldn't have a request with .peer == NULL */
828
829  for (i=pos; i<s->requestCount; ++i)
830    {
831      if (s->requests[i].block != block)
832        break;
833      tr_ptrArrayAppend (peerArr, s->requests[i].peer);
834    }
835}
836
837static void
838decrementPendingReqCount (const struct block_request * b)
839{
840  if (b->peer != NULL)
841    if (b->peer->pendingReqsToPeer > 0)
842      --b->peer->pendingReqsToPeer;
843}
844
845static void
846requestListRemove (tr_swarm * s, tr_block_index_t block, const tr_peer * peer)
847{
848  const struct block_request * b = requestListLookup (s, block, peer);
849
850  if (b != NULL)
851    {
852      const int pos = b - s->requests;
853      assert (pos < s->requestCount);
854
855      decrementPendingReqCount (b);
856
857      tr_removeElementFromArray (s->requests,
858                                 pos,
859                                 sizeof (struct block_request),
860                                 s->requestCount--);
861
862      /*fprintf (stderr, "removing request of block %lu from peer %s... "
863                         "there are now %d block requests left\n",
864                         (unsigned long)block, tr_atomAddrStr (peer->atom), t->requestCount);*/
865    }
866}
867
868static int
869countActiveWebseeds (tr_swarm * s)
870{
871  int i;
872  int activeCount = 0;
873  const int n = tr_ptrArraySize (&s->webseeds);
874  const uint64_t now = tr_time_msec ();
875
876  for (i=0; i<n; ++i)
877    if (tr_peerIsTransferringPieces (tr_ptrArrayNth(&s->webseeds,i), now, TR_DOWN, NULL))
878      ++activeCount;
879
880  return activeCount;
881}
882
883static bool
884testForEndgame (const tr_swarm * s)
885{
886  /* we consider ourselves to be in endgame if the number of bytes
887     we've got requested is >= the number of bytes left to download */
888  return (s->requestCount * s->tor->blockSize)
889               >= tr_cpLeftUntilDone (&s->tor->completion);
890}
891
892static void
893updateEndgame (tr_swarm * s)
894{
895  assert (s->requestCount >= 0);
896
897  if (!testForEndgame (s))
898    {
899      /* not in endgame */
900      s->endgame = 0;
901    }
902  else if (!s->endgame) /* only recalculate when endgame first begins */
903    {
904      int i;
905      int numDownloading = 0;
906      const int n = tr_ptrArraySize (&s->peers);
907
908      /* add the active bittorrent peers... */
909      for (i=0; i<n; ++i)
910        {
911          const tr_peer * p = tr_ptrArrayNth (&s->peers, i);
912          if (p->pendingReqsToPeer > 0)
913            ++numDownloading;
914        }
915
916      /* add the active webseeds... */
917      numDownloading += countActiveWebseeds (s);
918
919      /* average number of pending requests per downloading peer */
920      s->endgame = s->requestCount / MAX (numDownloading, 1);
921    }
922}
923
924
925/****
926*****
927*****  Piece List Manipulation / Accessors
928*****
929****/
930
931static inline void
932invalidatePieceSorting (tr_swarm * s)
933{
934  s->pieceSortState = PIECES_UNSORTED;
935}
936
937static const tr_torrent * weightTorrent;
938
939static const uint16_t * weightReplication;
940
941static void
942setComparePieceByWeightTorrent (tr_swarm * s)
943{
944  if (!replicationExists (s))
945    replicationNew (s);
946
947  weightTorrent = s->tor;
948  weightReplication = s->pieceReplication;
949}
950
951/* we try to create a "weight" s.t. high-priority pieces come before others,
952 * and that partially-complete pieces come before empty ones. */
953static int
954comparePieceByWeight (const void * va, const void * vb)
955{
956  const struct weighted_piece * a = va;
957  const struct weighted_piece * b = vb;
958  int ia, ib, missing, pending;
959  const tr_torrent * tor = weightTorrent;
960  const uint16_t * rep = weightReplication;
961
962  /* primary key: weight */
963  missing = tr_cpMissingBlocksInPiece (&tor->completion, a->index);
964  pending = a->requestCount;
965  ia = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
966  missing = tr_cpMissingBlocksInPiece (&tor->completion, b->index);
967  pending = b->requestCount;
968  ib = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
969  if (ia < ib) return -1;
970  if (ia > ib) return 1;
971
972  /* secondary key: higher priorities go first */
973  ia = tor->info.pieces[a->index].priority;
974  ib = tor->info.pieces[b->index].priority;
975  if (ia > ib) return -1;
976  if (ia < ib) return 1;
977
978  /* tertiary key: rarest first. */
979  ia = rep[a->index];
980  ib = rep[b->index];
981  if (ia < ib) return -1;
982  if (ia > ib) return 1;
983
984  /* quaternary key: random */
985  if (a->salt < b->salt) return -1;
986  if (a->salt > b->salt) return 1;
987
988  /* okay, they're equal */
989  return 0;
990}
991
992static int
993comparePieceByIndex (const void * va, const void * vb)
994{
995  const struct weighted_piece * a = va;
996  const struct weighted_piece * b = vb;
997  if (a->index < b->index) return -1;
998  if (a->index > b->index) return 1;
999  return 0;
1000}
1001
1002static void
1003pieceListSort (tr_swarm * s, enum piece_sort_state state)
1004{
1005  assert (state==PIECES_SORTED_BY_INDEX
1006       || state==PIECES_SORTED_BY_WEIGHT);
1007
1008  if (state == PIECES_SORTED_BY_WEIGHT)
1009    {
1010      setComparePieceByWeightTorrent (s);
1011      qsort (s->pieces, s->pieceCount, sizeof (struct weighted_piece), comparePieceByWeight);
1012    }
1013  else
1014    {
1015      qsort (s->pieces, s->pieceCount, sizeof (struct weighted_piece), comparePieceByIndex);
1016    }
1017
1018  s->pieceSortState = state;
1019}
1020
/**
 * These functions are useful for testing, but too expensive for nightly builds.
 * Let's leave them disabled but keep an easy hook to compile them back in:
 * change the "#if 1" below to "#if 0".
 */
#if 1
#define assertWeightedPiecesAreSorted(t)
#define assertReplicationCountIsExact(t)
#else
/* Assert that, outside of endgame, the piece list is in weight order. */
static void
assertWeightedPiecesAreSorted (tr_swarm * t)
{
    if (!t->endgame)
    {
        int i;
        setComparePieceByWeightTorrent (t);
        for (i=0; i<t->pieceCount-1; ++i)
            assert (comparePieceByWeight (&t->pieces[i], &t->pieces[i+1]) <= 0);
    }
}
/* Assert that every entry of the replication table equals the number
   of connected peers whose `have' bitfield contains that piece. */
static void
assertReplicationCountIsExact (tr_swarm * t)
{
    /* This assert might fail due to errors of implementations in other
     * clients. It happens when receiving duplicate bitfields/HaveAll/HaveNone
     * from a client. If such a behavior is noticed,
     * a bug report should be filed to the faulty client. */

    size_t piece_i;
    const uint16_t * rep = t->pieceReplication;
    const size_t piece_count = t->pieceReplicationSize;
    const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase (&t->peers);
    const int peer_count = tr_ptrArraySize (&t->peers);

    assert (piece_count == t->tor->info.pieceCount);

    for (piece_i=0; piece_i<piece_count; ++piece_i)
    {
        int peer_i;
        uint16_t r = 0;

        for (peer_i=0; peer_i<peer_count; ++peer_i)
            if (tr_bitfieldHas (&peers[peer_i]->have, piece_i))
                ++r;

        assert (rep[piece_i] == r);
    }
}
#endif
1069
1070static struct weighted_piece *
1071pieceListLookup (tr_swarm * s, tr_piece_index_t index)
1072{
1073  int i;
1074
1075  for (i=0; i<s->pieceCount; ++i)
1076    if (s->pieces[i].index == index)
1077      return &s->pieces[i];
1078
1079  return NULL;
1080}
1081
1082static void
1083pieceListRebuild (tr_swarm * s)
1084{
1085  if (!tr_torrentIsSeed (s->tor))
1086    {
1087      tr_piece_index_t i;
1088      tr_piece_index_t * pool;
1089      tr_piece_index_t poolCount = 0;
1090      const tr_torrent * tor = s->tor;
1091      const tr_info * inf = tr_torrentInfo (tor);
1092      struct weighted_piece * pieces;
1093      int pieceCount;
1094
1095      /* build the new list */
1096      pool = tr_new (tr_piece_index_t, inf->pieceCount);
1097      for (i=0; i<inf->pieceCount; ++i)
1098        if (!inf->pieces[i].dnd)
1099          if (!tr_cpPieceIsComplete (&tor->completion, i))
1100            pool[poolCount++] = i;
1101      pieceCount = poolCount;
1102      pieces = tr_new0 (struct weighted_piece, pieceCount);
1103      for (i=0; i<poolCount; ++i)
1104        {
1105          struct weighted_piece * piece = pieces + i;
1106          piece->index = pool[i];
1107          piece->requestCount = 0;
1108          piece->salt = tr_cryptoWeakRandInt (4096);
1109        }
1110
1111      /* if we already had a list of pieces, merge it into
1112       * the new list so we don't lose its requestCounts */
1113      if (s->pieces != NULL)
1114        {
1115          struct weighted_piece * o = s->pieces;
1116          struct weighted_piece * oend = o + s->pieceCount;
1117          struct weighted_piece * n = pieces;
1118          struct weighted_piece * nend = n + pieceCount;
1119
1120          pieceListSort (s, PIECES_SORTED_BY_INDEX);
1121
1122          while (o!=oend && n!=nend)
1123            {
1124              if (o->index < n->index)
1125                ++o;
1126              else if (o->index > n->index)
1127                ++n;
1128              else
1129                *n++ = *o++;
1130            }
1131
1132          tr_free (s->pieces);
1133        }
1134
1135      s->pieces = pieces;
1136      s->pieceCount = pieceCount;
1137
1138      pieceListSort (s, PIECES_SORTED_BY_WEIGHT);
1139
1140      /* cleanup */
1141      tr_free (pool);
1142    }
1143}
1144
1145static void
1146pieceListRemovePiece (tr_swarm * s, tr_piece_index_t piece)
1147{
1148  struct weighted_piece * p;
1149
1150  if ((p = pieceListLookup (s, piece)))
1151    {
1152      const int pos = p - s->pieces;
1153
1154      tr_removeElementFromArray (s->pieces,
1155                                 pos,
1156                                 sizeof (struct weighted_piece),
1157                                 s->pieceCount--);
1158
1159      if (s->pieceCount == 0)
1160        {
1161          tr_free (s->pieces);
1162          s->pieces = NULL;
1163        }
1164    }
1165}
1166
1167static void
1168pieceListResortPiece (tr_swarm * s, struct weighted_piece * p)
1169{
1170  int pos;
1171  bool isSorted = true;
1172
1173  if (p == NULL)
1174    return;
1175
1176  /* is the torrent already sorted? */
1177  pos = p - s->pieces;
1178  setComparePieceByWeightTorrent (s);
1179  if (isSorted && (pos > 0) && (comparePieceByWeight (p-1, p) > 0))
1180    isSorted = false;
1181  if (isSorted && (pos < s->pieceCount - 1) && (comparePieceByWeight (p, p+1) > 0))
1182    isSorted = false;
1183
1184  if (s->pieceSortState != PIECES_SORTED_BY_WEIGHT)
1185    {
1186      pieceListSort (s, PIECES_SORTED_BY_WEIGHT);
1187      isSorted = true;
1188    }
1189
1190  /* if it's not sorted, move it around */
1191  if (!isSorted)
1192    {
1193      bool exact;
1194      const struct weighted_piece tmp = *p;
1195
1196      tr_removeElementFromArray (s->pieces,
1197                                 pos,
1198                                 sizeof (struct weighted_piece),
1199                                 s->pieceCount--);
1200
1201      pos = tr_lowerBound (&tmp, s->pieces, s->pieceCount,
1202                           sizeof (struct weighted_piece),
1203                           comparePieceByWeight, &exact);
1204
1205      memmove (&s->pieces[pos + 1],
1206               &s->pieces[pos],
1207               sizeof (struct weighted_piece) * (s->pieceCount++ - pos));
1208
1209      s->pieces[pos] = tmp;
1210    }
1211
1212  assertWeightedPiecesAreSorted (s);
1213}
1214
1215static void
1216pieceListRemoveRequest (tr_swarm * s, tr_block_index_t block)
1217{
1218  struct weighted_piece * p;
1219  const tr_piece_index_t index = tr_torBlockPiece (s->tor, block);
1220
1221  if (((p = pieceListLookup (s, index))) && (p->requestCount > 0))
1222    {
1223      --p->requestCount;
1224      pieceListResortPiece (s, p);
1225    }
1226}
1227
1228
1229/****
1230*****
1231*****  Replication count (for rarest first policy)
1232*****
1233****/
1234
1235/**
1236 * Increase the replication count of this piece and sort it if the
1237 * piece list is already sorted
1238 */
1239static void
1240tr_incrReplicationOfPiece (tr_swarm * s, const size_t index)
1241{
1242  assert (replicationExists (s));
1243  assert (s->pieceReplicationSize == s->tor->info.pieceCount);
1244
1245  /* One more replication of this piece is present in the swarm */
1246  ++s->pieceReplication[index];
1247
1248  /* we only resort the piece if the list is already sorted */
1249  if (s->pieceSortState == PIECES_SORTED_BY_WEIGHT)
1250    pieceListResortPiece (s, pieceListLookup (s, index));
1251}
1252
1253/**
1254 * Increases the replication count of pieces present in the bitfield
1255 */
1256static void
1257tr_incrReplicationFromBitfield (tr_swarm * s, const tr_bitfield * b)
1258{
1259  size_t i;
1260  uint16_t * rep = s->pieceReplication;
1261  const size_t n = s->tor->info.pieceCount;
1262
1263  assert (replicationExists (s));
1264
1265  for (i=0; i<n; ++i)
1266    if (tr_bitfieldHas (b, i))
1267      ++rep[i];
1268
1269  if (s->pieceSortState == PIECES_SORTED_BY_WEIGHT)
1270    invalidatePieceSorting (s);
1271}
1272
1273/**
1274 * Increase the replication count of every piece
1275 */
1276static void
1277tr_incrReplication (tr_swarm * s)
1278{
1279  int i;
1280  const int n = s->pieceReplicationSize;
1281
1282  assert (replicationExists (s));
1283  assert (s->pieceReplicationSize == s->tor->info.pieceCount);
1284
1285  for (i=0; i<n; ++i)
1286    ++s->pieceReplication[i];
1287}
1288
1289/**
1290 * Decrease the replication count of pieces present in the bitset.
1291 */
1292static void
1293tr_decrReplicationFromBitfield (tr_swarm * s, const tr_bitfield * b)
1294{
1295  int i;
1296  const int n = s->pieceReplicationSize;
1297
1298  assert (replicationExists (s));
1299  assert (s->pieceReplicationSize == s->tor->info.pieceCount);
1300
1301  if (tr_bitfieldHasAll (b))
1302    {
1303      for (i=0; i<n; ++i)
1304        --s->pieceReplication[i];
1305    }
1306  else if (!tr_bitfieldHasNone (b))
1307    {
1308      for (i=0; i<n; ++i)
1309        if (tr_bitfieldHas (b, i))
1310          --s->pieceReplication[i];
1311
1312      if (s->pieceSortState == PIECES_SORTED_BY_WEIGHT)
1313        invalidatePieceSorting (s);
1314    }
1315}
1316
1317/**
1318***
1319**/
1320
1321void
1322tr_peerMgrRebuildRequests (tr_torrent * tor)
1323{
1324  assert (tr_isTorrent (tor));
1325
1326  pieceListRebuild (tor->swarm);
1327}
1328
1329void
1330tr_peerMgrGetNextRequests (tr_torrent           * tor,
1331                           tr_peer              * peer,
1332                           int                    numwant,
1333                           tr_block_index_t     * setme,
1334                           int                  * numgot,
1335                           bool                   get_intervals)
1336{
1337  int i;
1338  int got;
1339  tr_swarm * s;
1340  struct weighted_piece * pieces;
1341  const tr_bitfield * const have = &peer->have;
1342
1343  /* sanity clause */
1344  assert (tr_isTorrent (tor));
1345  assert (numwant > 0);
1346
1347  /* walk through the pieces and find blocks that should be requested */
1348  got = 0;
1349  s = tor->swarm;
1350
1351  /* prep the pieces list */
1352  if (s->pieces == NULL)
1353    pieceListRebuild (s);
1354
1355  if (s->pieceSortState != PIECES_SORTED_BY_WEIGHT)
1356    pieceListSort (s, PIECES_SORTED_BY_WEIGHT);
1357
1358  assertReplicationCountIsExact (s);
1359  assertWeightedPiecesAreSorted (s);
1360
1361  updateEndgame (s);
1362  pieces = s->pieces;
1363  for (i=0; i<s->pieceCount && got<numwant; ++i)
1364    {
1365      struct weighted_piece * p = pieces + i;
1366
1367      /* if the peer has this piece that we want... */
1368      if (tr_bitfieldHas (have, p->index))
1369        {
1370          tr_block_index_t b;
1371          tr_block_index_t first;
1372          tr_block_index_t last;
1373          tr_ptrArray peerArr = TR_PTR_ARRAY_INIT;
1374
1375          tr_torGetPieceBlockRange (tor, p->index, &first, &last);
1376
1377          for (b=first; b<=last && (got<numwant || (get_intervals && setme[2*got-1] == b-1)); ++b)
1378            {
1379              int peerCount;
1380              tr_peer ** peers;
1381
1382              /* don't request blocks we've already got */
1383              if (tr_cpBlockIsComplete (&tor->completion, b))
1384                continue;
1385
1386              /* always add peer if this block has no peers yet */
1387              tr_ptrArrayClear (&peerArr);
1388              getBlockRequestPeers (s, b, &peerArr);
1389              peers = (tr_peer **) tr_ptrArrayPeek (&peerArr, &peerCount);
1390              if (peerCount != 0)
1391                {
1392                  /* don't make a second block request until the endgame */
1393                  if (!s->endgame)
1394                    continue;
1395
1396                  /* don't have more than two peers requesting this block */
1397                  if (peerCount > 1)
1398                    continue;
1399
1400                  /* don't send the same request to the same peer twice */
1401                  if (peer == peers[0])
1402                    continue;
1403
1404                  /* in the endgame allow an additional peer to download a
1405                     block but only if the peer seems to be handling requests
1406                     relatively fast */
1407                  if (peer->pendingReqsToPeer + numwant - got < s->endgame)
1408                    continue;
1409                }
1410
1411              /* update the caller's table */
1412              if (!get_intervals)
1413                {
1414                  setme[got++] = b;
1415                }
1416              /* if intervals are requested two array entries are necessarry:
1417                 one for the interval's starting block and one for its end block */
1418              else if (got && setme[2 * got - 1] == b - 1 && b != first)
1419                {
1420                  /* expand the last interval */
1421                  ++setme[2 * got - 1];
1422                }
1423              else
1424                {
1425                  /* begin a new interval */
1426                  setme[2 * got] = setme[2 * got + 1] = b;
1427                  ++got;
1428                }
1429
1430              /* update our own tables */
1431              requestListAdd (s, b, peer);
1432              ++p->requestCount;
1433            }
1434
1435          tr_ptrArrayDestruct (&peerArr, NULL);
1436        }
1437    }
1438
1439  /* In most cases we've just changed the weights of a small number of pieces.
1440   * So rather than qsort ()ing the entire array, it's faster to apply an
1441   * adaptive insertion sort algorithm. */
1442  if (got > 0)
1443    {
1444      /* not enough requests || last piece modified */
1445      if (i == s->pieceCount)
1446        --i;
1447
1448      setComparePieceByWeightTorrent (s);
1449      while (--i >= 0)
1450        {
1451          bool exact;
1452
1453          /* relative position! */
1454          const int newpos = tr_lowerBound (&s->pieces[i], &s->pieces[i + 1],
1455                                            s->pieceCount - (i + 1),
1456                                            sizeof (struct weighted_piece),
1457                                            comparePieceByWeight, &exact);
1458          if (newpos > 0)
1459            {
1460              const struct weighted_piece piece = s->pieces[i];
1461              memmove (&s->pieces[i],
1462                       &s->pieces[i + 1],
1463                       sizeof (struct weighted_piece) * (newpos));
1464              s->pieces[i + newpos] = piece;
1465            }
1466        }
1467    }
1468
1469  assertWeightedPiecesAreSorted (t);
1470  *numgot = got;
1471}
1472
1473bool
1474tr_peerMgrDidPeerRequest (const tr_torrent  * tor,
1475                          const tr_peer     * peer,
1476                          tr_block_index_t    block)
1477{
1478  return requestListLookup ((tr_swarm*)tor->swarm, block, peer) != NULL;
1479}
1480
1481/* cancel requests that are too old */
1482static void
1483refillUpkeep (int foo UNUSED, short bar UNUSED, void * vmgr)
1484{
1485    time_t now;
1486    time_t too_old;
1487    tr_torrent * tor;
1488    int cancel_buflen = 0;
1489    struct block_request * cancel = NULL;
1490    tr_peerMgr * mgr = vmgr;
1491    managerLock (mgr);
1492
1493    now = tr_time ();
1494    too_old = now - REQUEST_TTL_SECS;
1495
1496    /* alloc the temporary "cancel" buffer */
1497    tor = NULL;
1498    while ((tor = tr_torrentNext (mgr->session, tor)))
1499        cancel_buflen = MAX (cancel_buflen, tor->swarm->requestCount);
1500    if (cancel_buflen > 0)
1501        cancel = tr_new (struct block_request, cancel_buflen);
1502
1503    /* prune requests that are too old */
1504    tor = NULL;
1505    while ((tor = tr_torrentNext (mgr->session, tor)))
1506    {
1507        tr_swarm * s = tor->swarm;
1508        const int n = s->requestCount;
1509        if (n > 0)
1510        {
1511            int keepCount = 0;
1512            int cancelCount = 0;
1513            const struct block_request * it;
1514            const struct block_request * end;
1515
1516            for (it=s->requests, end=it+n; it!=end; ++it)
1517            {
1518                tr_peerMsgs * msgs = PEER_MSGS(it->peer);
1519
1520                if ((msgs !=NULL) && (it->sentAt <= too_old) && !tr_peerMsgsIsReadingBlock (msgs, it->block))
1521                    cancel[cancelCount++] = *it;
1522                else
1523                {
1524                    if (it != &s->requests[keepCount])
1525                        s->requests[keepCount] = *it;
1526                    keepCount++;
1527                }
1528            }
1529
1530            /* prune out the ones we aren't keeping */
1531            s->requestCount = keepCount;
1532
1533            /* send cancel messages for all the "cancel" ones */
1534            for (it=cancel, end=it+cancelCount; it!=end; ++it)
1535            {
1536              tr_peerMsgs * msgs = PEER_MSGS(it->peer);
1537
1538              if (msgs != NULL)
1539                {
1540                  tr_historyAdd (&it->peer->cancelsSentToPeer, now, 1);
1541                  tr_peerMsgsCancel (msgs, it->block);
1542                  decrementPendingReqCount (it);
1543                }
1544            }
1545
1546            /* decrement the pending request counts for the timed-out blocks */
1547            for (it=cancel, end=it+cancelCount; it!=end; ++it)
1548                pieceListRemoveRequest (s, it->block);
1549        }
1550    }
1551
1552    tr_free (cancel);
1553    tr_timerAddMsec (mgr->refillUpkeepTimer, REFILL_UPKEEP_PERIOD_MSEC);
1554    managerUnlock (mgr);
1555}
1556
1557static void
1558addStrike (tr_swarm * s, tr_peer * peer)
1559{
1560  tordbg (s, "increasing peer %s strike count to %d",
1561          tr_atomAddrStr (peer->atom), peer->strikes + 1);
1562
1563  if (++peer->strikes >= MAX_BAD_PIECES_PER_PEER)
1564    {
1565      struct peer_atom * atom = peer->atom;
1566      atom->flags2 |= MYFLAG_BANNED;
1567      peer->doPurge = 1;
1568      tordbg (s, "banning peer %s", tr_atomAddrStr (atom));
1569    }
1570}
1571
/**
 * Handler for a peer's "suggest piece" / "allowed fast" messages.
 *
 * This is currently a no-op: the whole body is compiled out with `#if 0`,
 * which is why every parameter is tagged UNUSED.
 *
 * NOTE(review): the disabled code refers to a variable `t` that is not a
 * parameter of this function, so it would need updating before it could
 * be re-enabled.
 */
static void
peerSuggestedPiece (tr_swarm           * s UNUSED,
                    tr_peer            * peer UNUSED,
                    tr_piece_index_t     pieceIndex UNUSED,
                    int                  isFastAllowed UNUSED)
{
#if 0
    assert (t);
    assert (peer);
    assert (peer->msgs);

    /* is this a valid piece? */
    if (pieceIndex >= t->tor->info.pieceCount)
        return;

    /* don't ask for it if we've already got it */
    if (tr_cpPieceIsComplete (t->tor->completion, pieceIndex))
        return;

    /* don't ask for it if they don't have it */
    if (!tr_bitfieldHas (peer->have, pieceIndex))
        return;

    /* don't ask for it if we're choked and it's not fast */
    if (!isFastAllowed && peer->clientIsChoked)
        return;

    /* request the blocks that we don't have in this piece */
    {
        tr_block_index_t b;
        tr_block_index_t first;
        tr_block_index_t last;
        const tr_torrent * tor = t->tor;

        tr_torGetPieceBlockRange (t->tor, pieceIndex, &first, &last);

        for (b=first; b<=last; ++b)
        {
            if (!tr_cpBlockIsComplete (tor->completion, b))
            {
                const uint32_t offset = getBlockOffsetInPiece (tor, b);
                const uint32_t length = tr_torBlockCountBytes (tor, b);
                tr_peerMsgsAddRequest (peer->msgs, pieceIndex, offset, length);
                incrementPieceRequests (t, pieceIndex);
            }
        }
    }
#endif
}
1621
/**
 * Forget a single request for `block` made to `peer`: remove it from the
 * swarm's request list, then update the request bookkeeping of the piece
 * that contains the block.
 */
static void
removeRequestFromTables (tr_swarm * s, tr_block_index_t block, const tr_peer * peer)
{
  requestListRemove (s, block, peer);
  pieceListRemoveRequest (s, block);
}
1628
1629/* peer choked us, or maybe it disconnected.
1630   either way we need to remove all its requests */
1631static void
1632peerDeclinedAllRequests (tr_swarm * s, const tr_peer * peer)
1633{
1634  int i, n;
1635  tr_block_index_t * blocks = tr_new (tr_block_index_t, s->requestCount);
1636
1637  for (i=n=0; i<s->requestCount; ++i)
1638    if (peer == s->requests[i].peer)
1639      blocks[n++] = s->requests[i].block;
1640
1641  for (i=0; i<n; ++i)
1642    removeRequestFromTables (s, blocks[i], peer);
1643
1644  tr_free (blocks);
1645}
1646
1647static void
1648cancelAllRequestsForBlock (tr_swarm          * s,
1649                           tr_block_index_t    block,
1650                           tr_peer           * no_notify)
1651{
1652  int i;
1653  int peerCount;
1654  tr_peer ** peers;
1655  tr_ptrArray peerArr;
1656
1657  peerArr = TR_PTR_ARRAY_INIT;
1658  getBlockRequestPeers (s, block, &peerArr);
1659  peers = (tr_peer **) tr_ptrArrayPeek (&peerArr, &peerCount);
1660  for (i=0; i<peerCount; ++i)
1661    {
1662      tr_peer * p = peers[i];
1663
1664      if ((p != no_notify) && (p != NULL))
1665        {
1666          tr_historyAdd (&p->cancelsSentToPeer, tr_time (), 1);
1667          tr_peerMsgsCancel (PEER_MSGS(p), block);
1668        }
1669
1670      removeRequestFromTables (s, block, p);
1671    }
1672
1673  tr_ptrArrayDestruct (&peerArr, NULL);
1674}
1675
1676void
1677tr_peerMgrPieceCompleted (tr_torrent * tor, tr_piece_index_t p)
1678{
1679  int i;
1680  bool pieceCameFromPeers = false;
1681  tr_swarm * const s = tor->swarm;
1682  const int n = tr_ptrArraySize (&s->peers);
1683
1684  /* walk through our peers */
1685  for (i=0; i<n; ++i)
1686    {
1687      tr_peer * peer = tr_ptrArrayNth (&s->peers, i);
1688
1689      /* notify the peer that we now have this piece */
1690      tr_peerMsgsHave (PEER_MSGS(peer), p);
1691
1692      if (!pieceCameFromPeers)
1693        pieceCameFromPeers = tr_bitfieldHas (&peer->blame, p);
1694    }
1695
1696  if (pieceCameFromPeers) /* webseed downloads don't belong in announce totals */
1697    tr_announcerAddBytes (tor, TR_ANN_DOWN, tr_torPieceCountBytes (tor, p));
1698
1699  /* bookkeeping */
1700  pieceListRemovePiece (s, p);
1701  s->needsCompletenessCheck = true;
1702}
1703
1704static void
1705peerCallbackFunc (tr_peer * peer, const tr_peer_event * e, void * vs)
1706{
1707  tr_swarm * s = vs;
1708
1709  swarmLock (s);
1710
1711  assert (peer != NULL);
1712
1713  switch (e->eventType)
1714    {
1715      case TR_PEER_PEER_GOT_PIECE_DATA:
1716        {
1717          const time_t now = tr_time ();
1718          tr_torrent * tor = s->tor;
1719
1720          tor->uploadedCur += e->length;
1721          tr_announcerAddBytes (tor, TR_ANN_UP, e->length);
1722          tr_torrentSetActivityDate (tor, now);
1723          tr_torrentSetDirty (tor);
1724          tr_statsAddUploaded (tor->session, e->length);
1725
1726          if (peer->atom != NULL)
1727            peer->atom->piece_data_time = now;
1728
1729          break;
1730        }
1731
1732      case TR_PEER_CLIENT_GOT_PIECE_DATA:
1733        {
1734          const time_t now = tr_time ();
1735          tr_torrent * tor = s->tor;
1736
1737          tor->downloadedCur += e->length;
1738          tr_torrentSetActivityDate (tor, now);
1739          tr_torrentSetDirty (tor);
1740
1741          tr_statsAddDownloaded (tor->session, e->length);
1742
1743          if (peer->atom != NULL)
1744            peer->atom->piece_data_time = now;
1745
1746          break;
1747        }
1748
1749      case TR_PEER_CLIENT_GOT_HAVE:
1750        if (replicationExists (s))
1751          {
1752            tr_incrReplicationOfPiece (s, e->pieceIndex);
1753            assertReplicationCountIsExact (s);
1754          }
1755        break;
1756
1757      case TR_PEER_CLIENT_GOT_HAVE_ALL:
1758        if (replicationExists (s))
1759          {
1760            tr_incrReplication (s);
1761            assertReplicationCountIsExact (s);
1762          }
1763        break;
1764
1765      case TR_PEER_CLIENT_GOT_HAVE_NONE:
1766        /* noop */
1767        break;
1768
1769      case TR_PEER_CLIENT_GOT_BITFIELD:
1770        assert (e->bitfield != NULL);
1771        if (replicationExists (s))
1772          {
1773            tr_incrReplicationFromBitfield (s, e->bitfield);
1774            assertReplicationCountIsExact (s);
1775          }
1776        break;
1777
1778      case TR_PEER_CLIENT_GOT_REJ:
1779        {
1780          tr_block_index_t b = _tr_block (s->tor, e->pieceIndex, e->offset);
1781          if (b < s->tor->blockCount)
1782            removeRequestFromTables (s, b, peer);
1783          else
1784            tordbg (s, "Peer %s sent an out-of-range reject message",
1785                    tr_atomAddrStr (peer->atom));
1786          break;
1787        }
1788
1789      case TR_PEER_CLIENT_GOT_CHOKE:
1790        peerDeclinedAllRequests (s, peer);
1791        break;
1792
1793      case TR_PEER_CLIENT_GOT_PORT:
1794        if (peer->atom)
1795          peer->atom->port = e->port;
1796        break;
1797
1798      case TR_PEER_CLIENT_GOT_SUGGEST:
1799        peerSuggestedPiece (s, peer, e->pieceIndex, false);
1800        break;
1801
1802      case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1803        peerSuggestedPiece (s, peer, e->pieceIndex, true);
1804        break;
1805
1806      case TR_PEER_CLIENT_GOT_BLOCK:
1807        {
1808          tr_torrent * tor = s->tor;
1809          const tr_piece_index_t p = e->pieceIndex;
1810          const tr_block_index_t block = _tr_block (tor, p, e->offset);
1811          cancelAllRequestsForBlock (s, block, peer);
1812          tr_historyAdd (&peer->blocksSentToClient, tr_time(), 1);
1813          pieceListResortPiece (s, pieceListLookup (s, p));
1814          tr_torrentGotBlock (tor, block);
1815          break;
1816        }
1817
1818      case TR_PEER_ERROR:
1819        if ((e->err == ERANGE) || (e->err == EMSGSIZE) || (e->err == ENOTCONN))
1820          {
1821            /* some protocol error from the peer */
1822            peer->doPurge = 1;
1823            tordbg (s, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error",
1824                    tr_atomAddrStr (peer->atom));
1825          }
1826        else
1827          {
1828            tordbg (s, "unhandled error: %s", tr_strerror (e->err));
1829          }
1830        break;
1831
1832      default:
1833        assert (0);
1834    }
1835
1836  swarmUnlock (s);
1837}
1838
1839static int
1840getDefaultShelfLife (uint8_t from)
1841{
1842    /* in general, peers obtained from firsthand contact
1843     * are better than those from secondhand, etc etc */
1844    switch (from)
1845    {
1846        case TR_PEER_FROM_INCOMING : return 60 * 60 * 6;
1847        case TR_PEER_FROM_LTEP     : return 60 * 60 * 6;
1848        case TR_PEER_FROM_TRACKER  : return 60 * 60 * 3;
1849        case TR_PEER_FROM_DHT      : return 60 * 60 * 3;
1850        case TR_PEER_FROM_PEX      : return 60 * 60 * 2;
1851        case TR_PEER_FROM_RESUME   : return 60 * 60;
1852        case TR_PEER_FROM_LPD      : return 10 * 60;
1853        default                    : return 60 * 60;
1854    }
1855}
1856
1857static void
1858ensureAtomExists (tr_swarm          * s,
1859                  const tr_address  * addr,
1860                  const tr_port       port,
1861                  const uint8_t       flags,
1862                  const int8_t        seedProbability,
1863                  const uint8_t       from)
1864{
1865  struct peer_atom * a;
1866
1867  assert (tr_address_is_valid (addr));
1868  assert (from < TR_PEER_FROM__MAX);
1869
1870  a = getExistingAtom (s, addr);
1871
1872  if (a == NULL)
1873    {
1874      const int jitter = tr_cryptoWeakRandInt (60*10);
1875      a = tr_new0 (struct peer_atom, 1);
1876      a->addr = *addr;
1877      a->port = port;
1878      a->flags = flags;
1879      a->fromFirst = from;
1880      a->fromBest = from;
1881      a->shelf_date = tr_time () + getDefaultShelfLife (from) + jitter;
1882      a->blocklisted = -1;
1883      atomSetSeedProbability (a, seedProbability);
1884      tr_ptrArrayInsertSorted (&s->pool, a, compareAtomsByAddress);
1885
1886      tordbg (s, "got a new atom: %s", tr_atomAddrStr (a));
1887    }
1888  else
1889    {
1890      if (from < a->fromBest)
1891        a->fromBest = from;
1892
1893      if (a->seedProbability == -1)
1894        atomSetSeedProbability (a, seedProbability);
1895
1896      a->flags |= flags;
1897    }
1898}
1899
/**
 * @return the torrent's limit on connected peers (tor->maxConnectedPeers)
 */
static int
getMaxPeerCount (const tr_torrent * tor)
{
  return tor->maxConnectedPeers;
}
1905
/**
 * @return the number of entries in the swarm's `peers` array.
 *         Outgoing handshakes are not included; see the
 *         commented-out term below.
 */
static int
getPeerCount (const tr_swarm * s)
{
  return tr_ptrArraySize (&s->peers);/* + tr_ptrArraySize (&t->outgoingHandshakes); */
}
1911
1912
1913static void
1914createBitTorrentPeer (tr_torrent       * tor,
1915                      struct tr_peerIo * io,
1916                      struct peer_atom * atom,
1917                      tr_quark           client)
1918{
1919  tr_peer * peer;
1920  tr_swarm * swarm;
1921
1922  assert (atom != NULL);
1923  assert (tr_isTorrent (tor));
1924  assert (tor->swarm != NULL);
1925
1926  swarm = tor->swarm;
1927
1928  peer = (tr_peer*) tr_peerMsgsNew (tor, io, peerCallbackFunc, swarm);
1929  peer->atom = atom;
1930  peer->client = client;
1931  atom->peer = peer;
1932
1933  tr_ptrArrayInsertSorted (&swarm->peers, peer, peerCompare);
1934}
1935
1936
1937/* FIXME: this is kind of a mess. */
1938static bool
1939myHandshakeDoneCB (tr_handshake  * handshake,
1940                   tr_peerIo     * io,
1941                   bool            readAnythingFromPeer,
1942                   bool            isConnected,
1943                   const uint8_t * peer_id,
1944                   void          * vmanager)
1945{
1946  bool ok = isConnected;
1947  bool success = false;
1948  tr_port port;
1949  const tr_address * addr;
1950  tr_peerMgr * manager = vmanager;
1951  tr_swarm  * s;
1952  tr_handshake * ours;
1953
1954  assert (io);
1955  assert (tr_isBool (ok));
1956
1957  s = tr_peerIoHasTorrentHash (io)
1958    ? getExistingSwarm (manager, tr_peerIoGetTorrentHash (io))
1959    : NULL;
1960
1961  if (tr_peerIoIsIncoming (io))
1962    ours = tr_ptrArrayRemoveSorted (&manager->incomingHandshakes,
1963                                    handshake, handshakeCompare);
1964  else if (s)
1965    ours = tr_ptrArrayRemoveSorted (&s->outgoingHandshakes,
1966                                    handshake, handshakeCompare);
1967  else
1968    ours = handshake;
1969
1970  assert (ours);
1971  assert (ours == handshake);
1972
1973  if (s)
1974    swarmLock (s);
1975
1976  addr = tr_peerIoGetAddress (io, &port);
1977
1978  if (!ok || !s || !s->isRunning)
1979    {
1980      if (s)
1981        {
1982          struct peer_atom * atom = getExistingAtom (s, addr);
1983          if (atom)
1984            {
1985              ++atom->numFails;
1986
1987              if (!readAnythingFromPeer)
1988                {
1989                  tordbg (s, "marking peer %s as unreachable... numFails is %d", tr_atomAddrStr (atom), (int)atom->numFails);
1990                  atom->flags2 |= MYFLAG_UNREACHABLE;
1991                }
1992            }
1993        }
1994    }
1995  else /* looking good */
1996    {
1997      struct peer_atom * atom;
1998
1999      ensureAtomExists (s, addr, port, 0, -1, TR_PEER_FROM_INCOMING);
2000      atom = getExistingAtom (s, addr);
2001      atom->time = tr_time ();
2002      atom->piece_data_time = 0;
2003      atom->lastConnectionAt = tr_time ();
2004
2005      if (!tr_peerIoIsIncoming (io))
2006        {
2007          atom->flags |= ADDED_F_CONNECTABLE;
2008          atom->flags2 &= ~MYFLAG_UNREACHABLE;
2009        }
2010
2011      /* In principle, this flag specifies whether the peer groks uTP,
2012         not whether it's currently connected over uTP. */
2013      if (io->utp_socket)
2014        atom->flags |= ADDED_F_UTP_FLAGS;
2015
2016      if (atom->flags2 & MYFLAG_BANNED)
2017        {
2018          tordbg (s, "banned peer %s tried to reconnect",
2019                  tr_atomAddrStr (atom));
2020        }
2021      else if (tr_peerIoIsIncoming (io) && (getPeerCount (s) >= getMaxPeerCount (s->tor)))
2022        {
2023        }
2024      else
2025        {
2026          tr_peer * peer = atom->peer;
2027
2028          if (peer) /* we already have this peer */
2029            {
2030            }
2031          else
2032            {
2033              tr_quark client;
2034              tr_peerIo * io;
2035              char buf[128];
2036
2037              if (peer_id != NULL)
2038                client = tr_quark_new (tr_clientForId (buf, sizeof (buf), peer_id), -1);
2039              else
2040                client = TR_KEY_NONE;
2041
2042              io = tr_handshakeStealIO (handshake); /* this steals its refcount too, which is
2043                                                       balanced by our unref in peerDelete ()  */
2044              tr_peerIoSetParent (io, &s->tor->bandwidth);
2045              createBitTorrentPeer (s->tor, io, atom, client);
2046
2047              success = true;
2048            }
2049        }
2050    }
2051
2052  if (s != NULL)
2053    swarmUnlock (s);
2054
2055  return success;
2056}
2057
2058void
2059tr_peerMgrAddIncoming (tr_peerMgr       * manager,
2060                       tr_address       * addr,
2061                       tr_port            port,
2062                       int                socket,
2063                       struct UTPSocket * utp_socket)
2064{
2065  tr_session * session;
2066
2067  managerLock (manager);
2068
2069  assert (tr_isSession (manager->session));
2070  session = manager->session;
2071
2072  if (tr_sessionIsAddressBlocked (session, addr))
2073    {
2074      tr_logAddDebug ("Banned IP address \"%s\" tried to connect to us", tr_address_to_string (addr));
2075      if (socket >= 0)
2076        tr_netClose (session, socket);
2077      else
2078        UTP_Close (utp_socket);
2079    }
2080  else if (getExistingHandshake (&manager->incomingHandshakes, addr))
2081    {
2082      if (socket >= 0)
2083        tr_netClose (session, socket);
2084      else
2085        UTP_Close (utp_socket);
2086    }
2087  else /* we don't have a connection to them yet... */
2088    {
2089      tr_peerIo *    io;
2090      tr_handshake * handshake;
2091
2092      io = tr_peerIoNewIncoming (session, &session->bandwidth, addr, port, socket, utp_socket);
2093
2094      handshake = tr_handshakeNew (io,
2095                                   session->encryptionMode,
2096                                   myHandshakeDoneCB,
2097                                   manager);
2098
2099      tr_peerIoUnref (io); /* balanced by the implicit ref in tr_peerIoNewIncoming () */
2100
2101      tr_ptrArrayInsertSorted (&manager->incomingHandshakes, handshake,
2102                               handshakeCompare);
2103    }
2104
2105  managerUnlock (manager);
2106}
2107
2108void
2109tr_peerMgrAddPex (tr_torrent * tor, uint8_t from,
2110                  const tr_pex * pex, int8_t seedProbability)
2111{
2112  if (tr_isPex (pex)) /* safeguard against corrupt data */
2113    {
2114      tr_swarm * s = tor->swarm;
2115      managerLock (s->manager);
2116
2117      if (!tr_sessionIsAddressBlocked (s->manager->session, &pex->addr))
2118        if (tr_address_is_valid_for_peers (&pex->addr, pex->port))
2119          ensureAtomExists (s, &pex->addr, pex->port, pex->flags, seedProbability, from);
2120
2121      managerUnlock (s->manager);
2122    }
2123}
2124
2125void
2126tr_peerMgrMarkAllAsSeeds (tr_torrent * tor)
2127{
2128  tr_swarm * s = tor->swarm;
2129  const int n = tr_ptrArraySize (&s->pool);
2130  struct peer_atom ** it = (struct peer_atom**) tr_ptrArrayBase (&s->pool);
2131  struct peer_atom ** end = it + n;
2132
2133  while (it != end)
2134    atomSetSeed (s, *it++);
2135}
2136
2137tr_pex *
2138tr_peerMgrCompactToPex (const void    * compact,
2139                        size_t          compactLen,
2140                        const uint8_t * added_f,
2141                        size_t          added_f_len,
2142                        size_t        * pexCount)
2143{
2144  size_t i;
2145  size_t n = compactLen / 6;
2146  const uint8_t * walk = compact;
2147  tr_pex * pex = tr_new0 (tr_pex, n);
2148
2149  for (i=0; i<n; ++i)
2150    {
2151      pex[i].addr.type = TR_AF_INET;
2152      memcpy (&pex[i].addr.addr, walk, 4); walk += 4;
2153      memcpy (&pex[i].port, walk, 2); walk += 2;
2154      if (added_f && (n == added_f_len))
2155        pex[i].flags = added_f[i];
2156    }
2157
2158  *pexCount = n;
2159  return pex;
2160}
2161
2162tr_pex *
2163tr_peerMgrCompact6ToPex (const void    * compact,
2164                         size_t          compactLen,
2165                         const uint8_t * added_f,
2166                         size_t          added_f_len,
2167                         size_t        * pexCount)
2168{
2169  size_t i;
2170  size_t n = compactLen / 18;
2171  const uint8_t * walk = compact;
2172  tr_pex * pex = tr_new0 (tr_pex, n);
2173
2174  for (i=0; i<n; ++i)
2175    {
2176      pex[i].addr.type = TR_AF_INET6;
2177      memcpy (&pex[i].addr.addr.addr6.s6_addr, walk, 16); walk += 16;
2178      memcpy (&pex[i].port, walk, 2); walk += 2;
2179      if (added_f && (n == added_f_len))
2180        pex[i].flags = added_f[i];
2181    }
2182
2183  *pexCount = n;
2184  return pex;
2185}
2186
2187tr_pex *
2188tr_peerMgrArrayToPex (const void  * array,
2189                      size_t        arrayLen,
2190                      size_t      * pexCount)
2191{
2192  size_t i;
2193  size_t n = arrayLen / (sizeof (tr_address) + 2);
2194  const uint8_t * walk = array;
2195  tr_pex * pex = tr_new0 (tr_pex, n);
2196
2197  for (i=0 ; i<n ; ++i)
2198    {
2199      memcpy (&pex[i].addr, walk, sizeof (tr_address));
2200      memcpy (&pex[i].port, walk + sizeof (tr_address), 2);
2201      pex[i].flags = 0x00;
2202      walk += sizeof (tr_address) + 2;
2203    }
2204
2205  *pexCount = n;
2206  return pex;
2207}
2208
2209/**
2210***
2211**/
2212
2213void
2214tr_peerMgrGotBadPiece (tr_torrent * tor, tr_piece_index_t pieceIndex)
2215{
2216  int i;
2217  int n;
2218  tr_swarm * s = tor->swarm;
2219  const uint32_t byteCount = tr_torPieceCountBytes (tor, pieceIndex);
2220
2221  for (i=0, n=tr_ptrArraySize(&s->peers); i!=n; ++i)
2222    {
2223      tr_peer * peer = tr_ptrArrayNth (&s->peers, i);
2224
2225      if (tr_bitfieldHas (&peer->blame, pieceIndex))
2226        {
2227          tordbg (s, "peer %s contributed to corrupt piece (%d); now has %d strikes",
2228                  tr_atomAddrStr(peer->atom), pieceIndex, (int)peer->strikes + 1);
2229          addStrike (s, peer);
2230        }
2231    }
2232
2233
2234  tr_announcerAddBytes (tor, TR_ANN_CORRUPT, byteCount);
2235}
2236
2237int
2238tr_pexCompare (const void * va, const void * vb)
2239{
2240  int i;
2241  const tr_pex * a = va;
2242  const tr_pex * b = vb;
2243
2244  assert (tr_isPex (a));
2245  assert (tr_isPex (b));
2246
2247  if ((i = tr_address_compare (&a->addr, &b->addr)))
2248    return i;
2249
2250  if (a->port != b->port)
2251    return a->port < b->port ? -1 : 1;
2252
2253  return 0;
2254}
2255
2256/* better goes first */
2257static int
2258compareAtomsByUsefulness (const void * va, const void *vb)
2259{
2260  const struct peer_atom * a = * (const struct peer_atom**) va;
2261  const struct peer_atom * b = * (const struct peer_atom**) vb;
2262
2263  assert (tr_isAtom (a));
2264  assert (tr_isAtom (b));
2265
2266  if (a->piece_data_time != b->piece_data_time)
2267    return a->piece_data_time > b->piece_data_time ? -1 : 1;
2268  if (a->fromBest != b->fromBest)
2269    return a->fromBest < b->fromBest ? -1 : 1;
2270  if (a->numFails != b->numFails)
2271    return a->numFails < b->numFails ? -1 : 1;
2272
2273  return 0;
2274}
2275
2276static bool
2277isAtomInteresting (const tr_torrent * tor, struct peer_atom * atom)
2278{
2279  if (tr_torrentIsSeed (tor) && atomIsSeed (atom))
2280    return false;
2281
2282  if (peerIsInUse (tor->swarm, atom))
2283    return true;
2284
2285  if (isAtomBlocklisted (tor->session, atom))
2286    return false;
2287
2288  if (atom->flags2 & MYFLAG_BANNED)
2289    return false;
2290
2291  return true;
2292}
2293
2294int
2295tr_peerMgrGetPeers (tr_torrent   * tor,
2296                    tr_pex      ** setme_pex,
2297                    uint8_t        af,
2298                    uint8_t        list_mode,
2299                    int            maxCount)
2300{
2301  int i;
2302  int n;
2303  int count = 0;
2304  int atomCount = 0;
2305  const tr_swarm * s = tor->swarm;
2306  struct peer_atom ** atoms = NULL;
2307  tr_pex * pex;
2308  tr_pex * walk;
2309
2310  assert (tr_isTorrent (tor));
2311  assert (setme_pex != NULL);
2312  assert (af==TR_AF_INET || af==TR_AF_INET6);
2313  assert (list_mode==TR_PEERS_CONNECTED || list_mode==TR_PEERS_INTERESTING);
2314
2315  managerLock (s->manager);
2316
2317  /**
2318  ***  build a list of atoms
2319  **/
2320
2321  if (list_mode == TR_PEERS_CONNECTED) /* connected peers only */
2322    {
2323      int i;
2324      const tr_peer ** peers = (const tr_peer **) tr_ptrArrayBase (&s->peers);
2325      atomCount = tr_ptrArraySize (&s->peers);
2326      atoms = tr_new (struct peer_atom *, atomCount);
2327      for (i=0; i<atomCount; ++i)
2328        atoms[i] = peers[i]->atom;
2329    }
2330  else /* TR_PEERS_INTERESTING */
2331    {
2332      int i;
2333      struct peer_atom ** atomBase = (struct peer_atom**) tr_ptrArrayBase (&s->pool);
2334      n = tr_ptrArraySize (&s->pool);
2335      atoms = tr_new (struct peer_atom *, n);
2336      for (i=0; i<n; ++i)
2337        if (isAtomInteresting (tor, atomBase[i]))
2338          atoms[atomCount++] = atomBase[i];
2339    }
2340
2341  qsort (atoms, atomCount, sizeof (struct peer_atom *), compareAtomsByUsefulness);
2342
2343  /**
2344  ***  add the first N of them into our return list
2345  **/
2346
2347  n = MIN (atomCount, maxCount);
2348  pex = walk = tr_new0 (tr_pex, n);
2349
2350  for (i=0; i<atomCount && count<n; ++i)
2351    {
2352      const struct peer_atom * atom = atoms[i];
2353      if (atom->addr.type == af)
2354        {
2355          assert (tr_address_is_valid (&atom->addr));
2356          walk->addr = atom->addr;
2357          walk->port = atom->port;
2358          walk->flags = atom->flags;
2359          ++count;
2360          ++walk;
2361        }
2362    }
2363
2364  qsort (pex, count, sizeof (tr_pex), tr_pexCompare);
2365
2366  assert ((walk - pex) == count);
2367  *setme_pex = pex;
2368
2369  /* cleanup */
2370  tr_free (atoms);
2371  managerUnlock (s->manager);
2372  return count;
2373}
2374
2375static void atomPulse    (int, short, void *);
2376static void bandwidthPulse (int, short, void *);
2377static void rechokePulse (int, short, void *);
2378static void reconnectPulse (int, short, void *);
2379
2380static struct event *
2381createTimer (tr_session * session, int msec, void (*callback)(int, short, void *), void * cbdata)
2382{
2383  struct event * timer = evtimer_new (session->event_base, callback, cbdata);
2384  tr_timerAddMsec (timer, msec);
2385  return timer;
2386}
2387
2388static void
2389ensureMgrTimersExist (struct tr_peerMgr * m)
2390{
2391  if (m->atomTimer == NULL)
2392    m->atomTimer = createTimer (m->session, ATOM_PERIOD_MSEC, atomPulse, m);
2393
2394  if (m->bandwidthTimer == NULL)
2395    m->bandwidthTimer = createTimer (m->session, BANDWIDTH_PERIOD_MSEC, bandwidthPulse, m);
2396
2397  if (m->rechokeTimer == NULL)
2398    m->rechokeTimer = createTimer (m->session, RECHOKE_PERIOD_MSEC, rechokePulse, m);
2399
2400  if (m->refillUpkeepTimer == NULL)
2401    m->refillUpkeepTimer = createTimer (m->session, REFILL_UPKEEP_PERIOD_MSEC, refillUpkeep, m);
2402}
2403
2404void
2405tr_peerMgrStartTorrent (tr_torrent * tor)
2406{
2407  tr_swarm * s;
2408
2409  assert (tr_isTorrent (tor));
2410  assert (tr_torrentIsLocked (tor));
2411
2412  s = tor->swarm;
2413  ensureMgrTimersExist (s->manager);
2414
2415  s->isRunning = true;
2416  s->maxPeers = tor->maxConnectedPeers;
2417  s->pieceSortState = PIECES_UNSORTED;
2418
2419  rechokePulse (0, 0, s->manager);
2420}
2421
2422static void
2423stopSwarm (tr_swarm * swarm)
2424{
2425  tr_peer * peer;
2426
2427  swarm->isRunning = false;
2428
2429  replicationFree (swarm);
2430  invalidatePieceSorting (swarm);
2431
2432  /* disconnect the peers. */
2433  while ((peer = tr_ptrArrayPop (&swarm->peers)))
2434    tr_peerFree (peer);
2435
2436  /* disconnect the handshakes. handshakeAbort calls handshakeDoneCB (),
2437   * which removes the handshake from t->outgoingHandshakes... */
2438  while (!tr_ptrArrayEmpty (&swarm->outgoingHandshakes))
2439    tr_handshakeAbort (tr_ptrArrayNth (&swarm->outgoingHandshakes, 0));
2440}
2441
2442void
2443tr_peerMgrStopTorrent (tr_torrent * tor)
2444{
2445  assert (tr_isTorrent (tor));
2446  assert (tr_torrentIsLocked (tor));
2447
2448  stopSwarm (tor->swarm);
2449}
2450
2451void
2452tr_peerMgrAddTorrent (tr_peerMgr * manager, tr_torrent * tor)
2453{
2454  assert (tr_isTorrent (tor));
2455  assert (tr_torrentIsLocked (tor));
2456  assert (tor->swarm == NULL);
2457
2458  tor->swarm = swarmNew (manager, tor);
2459}
2460
2461void
2462tr_peerMgrRemoveTorrent (tr_torrent * tor)
2463{
2464  assert (tr_isTorrent (tor));
2465  assert (tr_torrentIsLocked (tor));
2466
2467  stopSwarm (tor->swarm);
2468  swarmFree (tor->swarm);
2469}
2470
2471void
2472tr_peerUpdateProgress (tr_torrent * tor, tr_peer * peer)
2473{
2474  const tr_bitfield * have = &peer->have;
2475
2476  if (tr_bitfieldHasAll (have))
2477    {
2478      peer->progress = 1.0;
2479    }
2480  else if (tr_bitfieldHasNone (have))
2481    {
2482      peer->progress = 0.0;
2483    }
2484  else
2485    {
2486      const float true_count = tr_bitfieldCountTrueBits (have);
2487
2488      if (tr_torrentHasMetadata (tor))
2489        {
2490          peer->progress = true_count / tor->info.pieceCount;
2491        }
2492      else /* without pieceCount, this result is only a best guess... */
2493        {
2494          peer->progress = true_count / (have->bit_count + 1);
2495        }
2496    }
2497
2498  /* clamp the progress range */
2499  if (peer->progress < 0.0)
2500    peer->progress = 0.0;
2501  if (peer->progress > 1.0)
2502    peer->progress = 1.0;
2503
2504  if (peer->atom && (peer->progress >= 1.0))
2505    atomSetSeed (tor->swarm, peer->atom);
2506}
2507
2508void
2509tr_peerMgrOnTorrentGotMetainfo (tr_torrent * tor)
2510{
2511  int i;
2512  int peerCount;
2513  tr_peer ** peers;
2514
2515  /* the webseed list may have changed... */
2516  rebuildWebseedArray (tor->swarm, tor);
2517
2518  /* some peer_msgs' progress fields may not be accurate if we
2519     didn't have the metadata before now... so refresh them all... */
2520  peerCount = tr_ptrArraySize (&tor->swarm->peers);
2521  peers = (tr_peer**) tr_ptrArrayBase (&tor->swarm->peers);
2522  for (i=0; i<peerCount; ++i)
2523    tr_peerUpdateProgress (tor, peers[i]);
2524}
2525
2526void
2527tr_peerMgrTorrentAvailability (const tr_torrent  * tor,
2528                               int8_t            * tab,
2529                               unsigned int        tabCount)
2530{
2531  assert (tr_isTorrent (tor));
2532  assert (tab != NULL);
2533  assert (tabCount > 0);
2534
2535  memset (tab, 0, tabCount);
2536
2537  if (tr_torrentHasMetadata (tor))
2538    {
2539      tr_piece_index_t i;
2540      const int peerCount = tr_ptrArraySize (&tor->swarm->peers);
2541      const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase (&tor->swarm->peers);
2542      const float interval = tor->info.pieceCount / (float)tabCount;
2543      const bool isSeed = tr_cpGetStatus (&tor->completion) == TR_SEED;
2544
2545      for (i=0; i<tabCount; ++i)
2546        {
2547          const int piece = i * interval;
2548
2549          if (isSeed || tr_cpPieceIsComplete (&tor->completion, piece))
2550            {
2551              tab[i] = -1;
2552            }
2553          else if (peerCount)
2554            {
2555              int j;
2556              for (j=0; j<peerCount; ++j)
2557                if (tr_bitfieldHas (&peers[j]->have, piece))
2558                  ++tab[i];
2559            }
2560        }
2561    }
2562}
2563
2564static bool
2565peerIsSeed (const tr_peer * peer)
2566{
2567  if (peer->progress >= 1.0)
2568    return true;
2569
2570  if (peer->atom && atomIsSeed (peer->atom))
2571    return true;
2572
2573  return false;
2574}
2575
2576/* count how many bytes we want that connected peers have */
2577uint64_t
2578tr_peerMgrGetDesiredAvailable (const tr_torrent * tor)
2579{
2580  size_t i;
2581  size_t n;
2582  uint64_t desiredAvailable;
2583  const tr_swarm * s = tor->swarm;
2584
2585  /* common shortcuts... */
2586
2587  if (tr_torrentIsSeed (s->tor))
2588    return 0;
2589
2590  if (!tr_torrentHasMetadata (tor))
2591    return 0;
2592
2593  n = tr_ptrArraySize (&s->peers);
2594  if (n == 0)
2595    {
2596      return 0;
2597    }
2598  else
2599    {
2600      const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase (&s->peers);
2601      for (i=0; i<n; ++i)
2602        if (peers[i]->atom && atomIsSeed (peers[i]->atom))
2603          return tr_cpLeftUntilDone (&tor->completion);
2604    }
2605
2606  if (!s->pieceReplication || !s->pieceReplicationSize)
2607    return 0;
2608
2609  /* do it the hard way */
2610
2611  desiredAvailable = 0;
2612  for (i=0, n=MIN (tor->info.pieceCount, s->pieceReplicationSize); i<n; ++i)
2613    if (!tor->info.pieces[i].dnd && (s->pieceReplication[i] > 0))
2614      desiredAvailable += tr_cpMissingBytesInPiece (&s->tor->completion, i);
2615
2616  assert (desiredAvailable <= tor->info.totalSize);
2617  return desiredAvailable;
2618}
2619
2620void
2621tr_peerMgrTorrentStats (tr_torrent  * tor,
2622                        int         * setmePeersConnected,
2623                        int         * setmeWebseedsSendingToUs,
2624                        int         * setmePeersSendingToUs,
2625                        int         * setmePeersGettingFromUs,
2626                        int         * setmePeersFrom)
2627{
2628  int i;
2629  int n;
2630  tr_swarm * s;
2631
2632  assert (tr_isTorrent (tor));
2633
2634  *setmePeersConnected       = 0;
2635  *setmePeersGettingFromUs   = 0;
2636  *setmePeersSendingToUs     = 0;
2637  *setmeWebseedsSendingToUs  = 0;
2638
2639  s = tor->swarm;
2640  n = tr_ptrArraySize (&s->peers);
2641
2642  for (i=0; i<TR_PEER_FROM__MAX; ++i)
2643    setmePeersFrom[i] = 0;
2644
2645  for (i=0; i<n; ++i)
2646    {
2647      tr_peer * peer = tr_ptrArrayNth (&s->peers, i);
2648      tr_peerMsgs * msgs = PEER_MSGS (peer);
2649      const struct peer_atom * atom = peer->atom;
2650
2651      assert (msgs != NULL);
2652
2653      ++*setmePeersConnected;
2654
2655      ++setmePeersFrom[atom->fromFirst];
2656
2657      if (clientIsDownloadingFrom (tor, msgs))
2658        ++*setmePeersSendingToUs;
2659
2660      if (clientIsUploadingTo (msgs))
2661        ++*setmePeersGettingFromUs;
2662    }
2663
2664  *setmeWebseedsSendingToUs = countActiveWebseeds (s);
2665}
2666
2667double*
2668tr_peerMgrWebSpeeds_KBps (const tr_torrent * tor)
2669{
2670  unsigned int i;
2671  tr_swarm * s;
2672  unsigned int n;
2673  double * ret = NULL;
2674  const uint64_t now = tr_time_msec ();
2675
2676  assert (tr_isTorrent (tor));
2677
2678  s = tor->swarm;
2679  n = tr_ptrArraySize (&s->webseeds);
2680  ret = tr_new0 (double, n);
2681
2682  assert (s->manager != NULL);
2683  assert (n == tor->info.webseedCount);
2684
2685  for (i=0; i<n; ++i)
2686    {
2687      unsigned int Bps = 0;
2688      if (tr_peerIsTransferringPieces (tr_ptrArrayNth(&s->webseeds,i), now, TR_DOWN, &Bps))
2689        ret[i] = Bps / (double)tr_speed_K;
2690      else
2691        ret[i] = -1.0;
2692    }
2693
2694  return ret;
2695}
2696
2697struct tr_peer_stat *
2698tr_peerMgrPeerStats (const tr_torrent * tor, int * setmeCount)
2699{
2700  int i;
2701  int size = 0;
2702  tr_peer_stat * ret;
2703  const tr_swarm * s;
2704  tr_peer ** peers;
2705  const time_t now = tr_time ();
2706  const uint64_t now_msec = tr_time_msec ();
2707
2708  assert (tr_isTorrent (tor));
2709  assert (tor->swarm->manager != NULL);
2710
2711  s = tor->swarm;
2712  peers = (tr_peer**) tr_ptrArrayBase (&s->peers);
2713  size = tr_ptrArraySize (&s->peers);
2714  ret = tr_new0 (tr_peer_stat, size);
2715
2716  for (i=0; i<size; ++i)
2717    {
2718      char *                   pch;
2719      tr_peer *                peer = peers[i];
2720      tr_peerMsgs *            msgs = PEER_MSGS (peer);
2721      const struct peer_atom * atom = peer->atom;
2722      tr_peer_stat *           stat = ret + i;
2723
2724      tr_address_to_string_with_buf (&atom->addr, stat->addr, sizeof (stat->addr));
2725      tr_strlcpy (stat->client, tr_quark_get_string(peer->client,NULL), sizeof (stat->client));
2726      stat->port                = ntohs (peer->atom->port);
2727      stat->from                = atom->fromFirst;
2728      stat->progress            = peer->progress;
2729      stat->isUTP               = tr_peerMsgsIsUtpConnection (msgs);
2730      stat->isEncrypted         = tr_peerMsgsIsEncrypted (msgs);
2731      stat->rateToPeer_KBps     = toSpeedKBps (tr_peerGetPieceSpeed_Bps (peer, now_msec, TR_CLIENT_TO_PEER));
2732      stat->rateToClient_KBps   = toSpeedKBps (tr_peerGetPieceSpeed_Bps (peer, now_msec, TR_PEER_TO_CLIENT));
2733      stat->peerIsChoked        = tr_peerMsgsIsPeerChoked (msgs);
2734      stat->peerIsInterested    = tr_peerMsgsIsPeerInterested (msgs);
2735      stat->clientIsChoked      = tr_peerMsgsIsClientChoked (msgs);
2736      stat->clientIsInterested  = tr_peerMsgsIsClientInterested (msgs);
2737      stat->isIncoming          = tr_peerMsgsIsIncomingConnection (msgs);
2738      stat->isDownloadingFrom   = clientIsDownloadingFrom (tor, msgs);
2739      stat->isUploadingTo       = clientIsUploadingTo (msgs);
2740      stat->isSeed              = peerIsSeed (peer);
2741
2742      stat->blocksToPeer        = tr_historyGet (&peer->blocksSentToPeer,    now, CANCEL_HISTORY_SEC);
2743      stat->blocksToClient      = tr_historyGet (&peer->blocksSentToClient,  now, CANCEL_HISTORY_SEC);
2744      stat->cancelsToPeer       = tr_historyGet (&peer->cancelsSentToPeer,   now, CANCEL_HISTORY_SEC);
2745      stat->cancelsToClient     = tr_historyGet (&peer->cancelsSentToClient, now, CANCEL_HISTORY_SEC);
2746
2747      stat->pendingReqsToPeer   = peer->pendingReqsToPeer;
2748      stat->pendingReqsToClient = peer->pendingReqsToClient;
2749
2750      pch = stat->flagStr;
2751      if (stat->isUTP) *pch++ = 'T';
2752      if (s->optimistic == msgs) *pch++ = 'O';
2753      if (stat->isDownloadingFrom) *pch++ = 'D';
2754      else if (stat->clientIsInterested) *pch++ = 'd';
2755      if (stat->isUploadingTo) *pch++ = 'U';
2756      else if (stat->peerIsInterested) *pch++ = 'u';
2757      if (!stat->clientIsChoked && !stat->clientIsInterested) *pch++ = 'K';
2758      if (!stat->peerIsChoked && !stat->peerIsInterested) *pch++ = '?';
2759      if (stat->isEncrypted) *pch++ = 'E';
2760      if (stat->from == TR_PEER_FROM_DHT) *pch++ = 'H';
2761      else if (stat->from == TR_PEER_FROM_PEX) *pch++ = 'X';
2762      if (stat->isIncoming) *pch++ = 'I';
2763      *pch = '\0';
2764    }
2765
2766  *setmeCount = size;
2767  return ret;
2768}
2769
2770/***
2771****
2772****
2773***/
2774
2775void
2776tr_peerMgrClearInterest (tr_torrent * tor)
2777{
2778  int i;
2779  tr_swarm * s = tor->swarm;
2780  const int peerCount = tr_ptrArraySize (&s->peers);
2781
2782  assert (tr_isTorrent (tor));
2783  assert (tr_torrentIsLocked (tor));
2784
2785  for (i=0; i<peerCount; ++i)
2786    tr_peerMsgsSetInterested (tr_ptrArrayNth (&s->peers, i), false);
2787}
2788
2789/* does this peer have any pieces that we want? */
2790static bool
2791isPeerInteresting (tr_torrent     * const tor,
2792                   const bool     * const piece_is_interesting,
2793                   const tr_peer  * const peer)
2794{
2795  tr_piece_index_t i, n;
2796
2797  /* these cases should have already been handled by the calling code... */
2798  assert (!tr_torrentIsSeed (tor));
2799  assert (tr_torrentIsPieceTransferAllowed (tor, TR_PEER_TO_CLIENT));
2800
2801  if (peerIsSeed (peer))
2802    return true;
2803
2804  for (i=0, n=tor->info.pieceCount; i<n; ++i)
2805    if (piece_is_interesting[i] && tr_bitfieldHas (&peer->have, i))
2806      return true;
2807
2808  return false;
2809}
2810
/**
 * How rechokeDownloads () classifies an interesting peer, based on its
 * recent blocks-received and cancels-sent history.
 *
 * compare_rechoke_info () sorts by ascending numeric value, so the
 * declaration order is significant: GOOD peers sort first, then
 * UNTESTED ones, then BAD ones.
 */
typedef enum
{
  RECHOKE_STATE_GOOD,     /* no cancels, or far fewer cancels than blocks */
  RECHOKE_STATE_UNTESTED, /* neither blocks nor cancels in the history */
  RECHOKE_STATE_BAD       /* cancels without blocks, or too many cancels */
}
tr_rechoke_state;
2818
/**
 * One sortable entry per interesting peer, built by rechokeDownloads ()
 * and ordered by compare_rechoke_info ().
 */
struct tr_rechoke_info
{
  tr_peer * peer;    /* the peer being ranked */
  int salt;          /* random tie-breaker from tr_cryptoWeakRandInt (INT_MAX) */
  int rechoke_state; /* a tr_rechoke_state value; lower values sort first */
};
2825
2826static int
2827compare_rechoke_info (const void * va, const void * vb)
2828{
2829  const struct tr_rechoke_info * a = va;
2830  const struct tr_rechoke_info * b = vb;
2831
2832  if (a->rechoke_state != b->rechoke_state)
2833    return a->rechoke_state - b->rechoke_state;
2834
2835  return a->salt - b->salt;
2836}
2837
2838/* determines who we send "interested" messages to */
2839static void
2840rechokeDownloads (tr_swarm * s)
2841{
2842  int i;
2843  int maxPeers = 0;
2844  int rechoke_count = 0;
2845  struct tr_rechoke_info * rechoke = NULL;
2846  const int MIN_INTERESTING_PEERS = 5;
2847  const int peerCount = tr_ptrArraySize (&s->peers);
2848  const time_t now = tr_time ();
2849
2850  /* some cases where this function isn't necessary */
2851  if (tr_torrentIsSeed (s->tor))
2852    return;
2853  if (!tr_torrentIsPieceTransferAllowed (s->tor, TR_PEER_TO_CLIENT))
2854    return;
2855
2856  /* decide HOW MANY peers to be interested in */
2857  {
2858    int blocks = 0;
2859    int cancels = 0;
2860    time_t timeSinceCancel;
2861
2862    /* Count up how many blocks & cancels each peer has.
2863     *
2864     * There are two situations where we send out cancels --
2865     *
2866     * 1. We've got unresponsive peers, which is handled by deciding
2867     *    -which- peers to be interested in.
2868     *
2869     * 2. We've hit our bandwidth cap, which is handled by deciding
2870     *    -how many- peers to be interested in.
2871     *
2872     * We're working on 2. here, so we need to ignore unresponsive
2873     * peers in our calculations lest they confuse Transmission into
2874     * thinking it's hit its bandwidth cap.
2875     */
2876    for (i=0; i<peerCount; ++i)
2877      {
2878        const tr_peer * peer = tr_ptrArrayNth (&s->peers, i);
2879        const int b = tr_historyGet (&peer->blocksSentToClient, now, CANCEL_HISTORY_SEC);
2880        const int c = tr_historyGet (&peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC);
2881
2882        if (b == 0) /* ignore unresponsive peers, as described above */
2883          continue;
2884
2885        blocks += b;
2886        cancels += c;
2887      }
2888
2889    if (cancels > 0)
2890      {
2891        /* cancelRate: of the block requests we've recently made, the percentage we cancelled.
2892         * higher values indicate more congestion. */
2893        const double cancelRate = cancels / (double)(cancels + blocks);
2894        const double mult = 1 - MIN (cancelRate, 0.5);
2895        maxPeers = s->interestedCount * mult;
2896        tordbg (s, "cancel rate is %.3f -- reducing the "
2897                   "number of peers we're interested in by %.0f percent",
2898                   cancelRate, mult * 100);
2899        s->lastCancel = now;
2900      }
2901
2902    timeSinceCancel = now - s->lastCancel;
2903    if (timeSinceCancel)
2904      {
2905        const int maxIncrease = 15;
2906        const time_t maxHistory = 2 * CANCEL_HISTORY_SEC;
2907        const double mult = MIN (timeSinceCancel, maxHistory) / (double) maxHistory;
2908        const int inc = maxIncrease * mult;
2909        maxPeers = s->maxPeers + inc;
2910        tordbg (s, "time since last cancel is %li -- increasing the "
2911                   "number of peers we're interested in by %d",
2912                   timeSinceCancel, inc);
2913      }
2914  }
2915
2916  /* don't let the previous section's number tweaking go too far... */
2917  if (maxPeers < MIN_INTERESTING_PEERS)
2918    maxPeers = MIN_INTERESTING_PEERS;
2919  if (maxPeers > s->tor->maxConnectedPeers)
2920    maxPeers = s->tor->maxConnectedPeers;
2921
2922  s->maxPeers = maxPeers;
2923
2924  if (peerCount > 0)
2925    {
2926      bool * piece_is_interesting;
2927      const tr_torrent * const tor = s->tor;
2928      const int n = tor->info.pieceCount;
2929
2930      /* build a bitfield of interesting pieces... */
2931      piece_is_interesting = tr_new (bool, n);
2932      for (i=0; i<n; i++)
2933        piece_is_interesting[i] = !tor->info.pieces[i].dnd && !tr_cpPieceIsComplete (&tor->completion, i);
2934
2935      /* decide WHICH peers to be interested in (based on their cancel-to-block ratio) */
2936      for (i=0; i<peerCount; ++i)
2937        {
2938          tr_peer * peer = tr_ptrArrayNth (&s->peers, i);
2939
2940          if (!isPeerInteresting (s->tor, piece_is_interesting, peer))
2941            {
2942              tr_peerMsgsSetInterested (PEER_MSGS(peer), false);
2943            }
2944          else
2945            {
2946              tr_rechoke_state rechoke_state;
2947              const int blocks = tr_historyGet (&peer->blocksSentToClient, now, CANCEL_HISTORY_SEC);
2948              const int cancels = tr_historyGet (&peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC);
2949
2950              if (!blocks && !cancels)
2951                rechoke_state = RECHOKE_STATE_UNTESTED;
2952              else if (!cancels)
2953                rechoke_state = RECHOKE_STATE_GOOD;
2954              else if (!blocks)
2955                rechoke_state = RECHOKE_STATE_BAD;
2956              else if ((cancels * 10) < blocks)
2957                rechoke_state = RECHOKE_STATE_GOOD;
2958              else
2959                rechoke_state = RECHOKE_STATE_BAD;
2960
2961              if (rechoke == NULL)
2962                rechoke = tr_new (struct tr_rechoke_info, peerCount);
2963
2964              rechoke[rechoke_count].peer = peer;
2965              rechoke[rechoke_count].rechoke_state = rechoke_state;
2966              rechoke[rechoke_count].salt = tr_cryptoWeakRandInt (INT_MAX);
2967              rechoke_count++;
2968            }
2969
2970        }
2971
2972      tr_free (piece_is_interesting);
2973    }
2974
2975  /* now that we know which & how many peers to be interested in... update the peer interest */
2976  qsort (rechoke, rechoke_count, sizeof (struct tr_rechoke_info), compare_rechoke_info);
2977  s->interestedCount = MIN (maxPeers, rechoke_count);
2978  for (i=0; i<rechoke_count; ++i)
2979    tr_peerMsgsSetInterested (PEER_MSGS(rechoke[i].peer), i<s->interestedCount);
2980
2981  /* cleanup */
2982  tr_free (rechoke);
2983}
2984
2985/**
2986***
2987**/
2988
/**
 * One entry per choke candidate, built by rechokeUploads ()
 * and sorted with compareChoke ().
 */
struct ChokeData
{
  bool          isInterested; /* is the peer interested in us? */
  bool          wasChoked;    /* the peer's choke state before this pass */
  bool          isChoked;     /* the choke state this pass has decided on */
  int           rate;         /* ranking speed, from getRate () */
  int           salt;         /* random tie-breaker */
  tr_peerMsgs * msgs;         /* the peer's message handler */
};
2998
2999static int
3000compareChoke (const void * va, const void * vb)
3001{
3002  const struct ChokeData * a = va;
3003  const struct ChokeData * b = vb;
3004
3005  if (a->rate != b->rate) /* prefer higher overall speeds */
3006    return a->rate > b->rate ? -1 : 1;
3007
3008  if (a->wasChoked != b->wasChoked) /* prefer unchoked */
3009    return a->wasChoked ? 1 : -1;
3010
3011  if (a->salt != b->salt) /* random order */
3012    return a->salt - b->salt;
3013
3014  return 0;
3015}
3016
3017/* is this a new connection? */
3018static bool
3019isNew (const tr_peerMsgs * msgs)
3020{
3021  return (msgs != NULL) && (tr_peerMsgsGetConnectionAge (msgs) < 45);
3022}
3023
3024/* get a rate for deciding which peers to choke and unchoke. */
3025static int
3026getRate (const tr_torrent * tor, struct peer_atom * atom, uint64_t now)
3027{
3028  unsigned int Bps;
3029
3030  if (tr_torrentIsSeed (tor))
3031    Bps = tr_peerGetPieceSpeed_Bps (atom->peer, now, TR_CLIENT_TO_PEER);
3032
3033  /* downloading a private torrent... take upload speed into account
3034   * because there may only be a small window of opportunity to share */
3035  else if (tr_torrentIsPrivate (tor))
3036    Bps = tr_peerGetPieceSpeed_Bps (atom->peer, now, TR_PEER_TO_CLIENT)
3037        + tr_peerGetPieceSpeed_Bps (atom->peer, now, TR_CLIENT_TO_PEER);
3038
3039  /* downloading a public torrent */
3040  else
3041    Bps = tr_peerGetPieceSpeed_Bps (atom->peer, now, TR_PEER_TO_CLIENT);
3042
3043  /* convert it to bytes per second */
3044  return Bps;
3045}
3046
3047static inline bool
3048isBandwidthMaxedOut (const tr_bandwidth * b,
3049                     const uint64_t now_msec, tr_direction dir)
3050{
3051  if (!tr_bandwidthIsLimited (b, dir))
3052    {
3053      return false;
3054    }
3055  else
3056    {
3057      const unsigned int got = tr_bandwidthGetPieceSpeed_Bps (b, now_msec, dir);
3058      const unsigned int want = tr_bandwidthGetDesiredSpeed_Bps (b, dir);
3059      return got >= want;
3060    }
3061}
3062
3063static void
3064rechokeUploads (tr_swarm * s, const uint64_t now)
3065{
3066  int i, size, unchokedInterested;
3067  const int peerCount = tr_ptrArraySize (&s->peers);
3068  tr_peer ** peers = (tr_peer**) tr_ptrArrayBase (&s->peers);
3069  struct ChokeData * choke = tr_new0 (struct ChokeData, peerCount);
3070  const tr_session * session = s->manager->session;
3071  const int chokeAll = !tr_torrentIsPieceTransferAllowed (s->tor, TR_CLIENT_TO_PEER);
3072  const bool isMaxedOut = isBandwidthMaxedOut (&s->tor->bandwidth, now, TR_UP);
3073
3074  assert (swarmIsLocked (s));
3075
3076  /* an optimistic unchoke peer's "optimistic"
3077   * state lasts for N calls to rechokeUploads (). */
3078  if (s->optimisticUnchokeTimeScaler > 0)
3079    s->optimisticUnchokeTimeScaler--;
3080  else
3081    s->optimistic = NULL;
3082
3083  /* sort the peers by preference and rate */
3084  for (i=0, size=0; i<peerCount; ++i)
3085    {
3086      tr_peer * peer = peers[i];
3087      tr_peerMsgs * msgs = PEER_MSGS (peer);
3088
3089      struct peer_atom * atom = peer->atom;
3090
3091      if (peerIsSeed (peer)) /* choke seeds and partial seeds */
3092        {
3093          tr_peerMsgsSetChoke (PEER_MSGS(peer), true);
3094        }
3095      else if (chokeAll) /* choke everyone if we're not uploading */
3096        {
3097          tr_peerMsgsSetChoke (PEER_MSGS(peer), true);
3098        }
3099      else if (msgs != s->optimistic)
3100        {
3101          struct ChokeData * n = &choke[size++];
3102          n->msgs         = msgs;
3103          n->isInterested = tr_peerMsgsIsPeerInterested (msgs);
3104          n->wasChoked    = tr_peerMsgsIsPeerChoked (msgs);
3105          n->rate         = getRate (s->tor, atom, now);
3106          n->salt         = tr_cryptoWeakRandInt (INT_MAX);
3107          n->isChoked     = true;
3108        }
3109    }
3110
3111  qsort (choke, size, sizeof (struct ChokeData), compareChoke);
3112
3113  /**
3114   * Reciprocation and number of uploads capping is managed by unchoking
3115   * the N peers which have the best upload rate and are interested.
3116   * This maximizes the client's download rate. These N peers are
3117   * referred to as downloaders, because they are interested in downloading
3118   * from the client.
3119   *
3120   * Peers which have a better upload rate (as compared to the downloaders)
3121   * but aren't interested get unchoked. If they become interested, the
3122   * downloader with the worst upload rate gets choked. If a client has
3123   * a complete file, it uses its upload rate rather than its download
3124   * rate to decide which peers to unchoke.
3125   *
3126   * If our bandwidth is maxed out, don't unchoke any more peers.
3127   */
3128  unchokedInterested = 0;
3129  for (i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i)
3130    {
3131      choke[i].isChoked = isMaxedOut ? choke[i].wasChoked : false;
3132      if (choke[i].isInterested)
3133        ++unchokedInterested;
3134    }
3135
3136  /* optimistic unchoke */
3137  if (!s->optimistic && !isMaxedOut && (i<size))
3138    {
3139      int n;
3140      struct ChokeData * c;
3141      tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
3142
3143      for (; i<size; ++i)
3144        {
3145          if (choke[i].isInterested)
3146            {
3147              const tr_peerMsgs * msgs = choke[i].msgs;
3148              int x = 1, y;
3149              if (isNew (msgs)) x *= 3;
3150              for (y=0; y<x; ++y)
3151                tr_ptrArrayAppend (&randPool, &choke[i]);
3152            }
3153        }
3154
3155      if ((n = tr_ptrArraySize (&randPool)))
3156        {
3157          c = tr_ptrArrayNth (&randPool, tr_cryptoWeakRandInt (n));
3158          c->isChoked = false;
3159          s->optimistic = c->msgs;
3160          s->optimisticUnchokeTimeScaler = OPTIMISTIC_UNCHOKE_MULTIPLIER;
3161        }
3162
3163      tr_ptrArrayDestruct (&randPool, NULL);
3164    }
3165
3166  for (i=0; i<size; ++i)
3167    tr_peerMsgsSetChoke (choke[i].msgs, choke[i].isChoked);
3168
3169  /* cleanup */
3170  tr_free (choke);
3171}
3172
3173static void
3174rechokePulse (int foo UNUSED, short bar UNUSED, void * vmgr)
3175{
3176  tr_torrent * tor = NULL;
3177  tr_peerMgr * mgr = vmgr;
3178  const uint64_t now = tr_time_msec ();
3179
3180  managerLock (mgr);
3181
3182  while ((tor = tr_torrentNext (mgr->session, tor)))
3183    {
3184      if (tor->isRunning)
3185        {
3186          tr_swarm * s = tor->swarm;
3187          if (!tr_ptrArrayEmpty (&s->peers))
3188            {
3189              rechokeUploads (s, now);
3190              rechokeDownloads (s);
3191            }
3192        }
3193    }
3194
3195  tr_timerAddMsec (mgr->rechokeTimer, RECHOKE_PERIOD_MSEC);
3196  managerUnlock (mgr);
3197}
3198
3199/***
3200****
3201****  Life and Death
3202****
3203***/
3204
3205static bool
3206shouldPeerBeClosed (const tr_swarm   * s,
3207                    const tr_peer    * peer,
3208                    int                peerCount,
3209                    const time_t       now)
3210{
3211  const tr_torrent * tor = s->tor;
3212  const struct peer_atom * atom = peer->atom;
3213
3214  /* if it's marked for purging, close it */
3215  if (peer->doPurge)
3216    {
3217      tordbg (s, "purging peer %s because its doPurge flag is set",
3218              tr_atomAddrStr (atom));
3219      return true;
3220    }
3221
3222  /* disconnect if we're both seeds and enough time has passed for PEX */
3223  if (tr_torrentIsSeed (tor) && peerIsSeed (peer))
3224    return !tr_torrentAllowsPex (tor) || (now-atom->time>=30);
3225
3226  /* disconnect if it's been too long since piece data has been transferred.
3227   * this is on a sliding scale based on number of available peers... */
3228  {
3229    const int relaxStrictnessIfFewerThanN = (int)((getMaxPeerCount (tor) * 0.9) + 0.5);
3230    /* if we have >= relaxIfFewerThan, strictness is 100%.
3231     * if we have zero connections, strictness is 0% */
3232    const float strictness = peerCount >= relaxStrictnessIfFewerThanN
3233                           ? 1.0
3234                           : peerCount / (float)relaxStrictnessIfFewerThanN;
3235    const int lo = MIN_UPLOAD_IDLE_SECS;
3236    const int hi = MAX_UPLOAD_IDLE_SECS;
3237    const int limit = hi - ((hi - lo) * strictness);
3238    const int idleTime = now - MAX (atom->time, atom->piece_data_time);
3239/*fprintf (stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit);*/
3240    if (idleTime > limit)
3241      {
3242        tordbg (s, "purging peer %s because it's been %d secs since we shared anything",
3243                tr_atomAddrStr (atom), idleTime);
3244        return true;
3245      }
3246  }
3247
3248  return false;
3249}
3250
3251static tr_peer **
3252getPeersToClose (tr_swarm * s, const time_t now_sec, int * setmeSize)
3253{
3254  int i, peerCount, outsize;
3255  struct tr_peer ** ret = NULL;
3256  tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek (&s->peers, &peerCount);
3257
3258  assert (swarmIsLocked (s));
3259
3260  for (i=outsize=0; i<peerCount; ++i)
3261    {
3262      if (shouldPeerBeClosed (s, peers[i], peerCount, now_sec))
3263        {
3264          if (ret == NULL)
3265            ret = tr_new (tr_peer *, peerCount);
3266          ret[outsize++] = peers[i];
3267        }
3268    }
3269
3270  *setmeSize = outsize;
3271  return ret;
3272}
3273
3274static int
3275getReconnectIntervalSecs (const struct peer_atom * atom, const time_t now)
3276{
3277  int sec;
3278  const bool unreachable = (atom->flags2 & MYFLAG_UNREACHABLE) != 0;
3279
3280  /* if we were recently connected to this peer and transferring piece
3281   * data, try to reconnect to them sooner rather that later -- we don't
3282   * want network troubles to get in the way of a good peer. */
3283  if (!unreachable && ((now - atom->piece_data_time) <= (MINIMUM_RECONNECT_INTERVAL_SECS * 2)))
3284    sec = MINIMUM_RECONNECT_INTERVAL_SECS;
3285
3286  /* otherwise, the interval depends on how many times we've tried
3287   * and failed to connect to the peer */
3288  else
3289    {
3290      int step = atom->numFails;
3291
3292      /* penalize peers that were unreachable the last time we tried */
3293      if (unreachable)
3294        step += 2;
3295 
3296      switch (step)
3297        {
3298          case 0: sec = 0; break;
3299          case 1: sec = 10; break;
3300          case 2: sec = 60 * 2; break;
3301          case 3: sec = 60 * 15; break;
3302          case 4: sec = 60 * 30; break;
3303          case 5: sec = 60 * 60; break;
3304          default: sec = 60 * 120; break;
3305        }
3306    }
3307
3308  dbgmsg ("reconnect interval for %s is %d seconds", tr_atomAddrStr (atom), sec);
3309  return sec;
3310}
3311
3312static void
3313removePeer (tr_swarm * s, tr_peer * peer)
3314{
3315  tr_peer * removed;
3316  struct peer_atom * atom = peer->atom;
3317
3318  assert (swarmIsLocked (s));
3319  assert (atom);
3320
3321  atom->time = tr_time ();
3322
3323  removed = tr_ptrArrayRemoveSorted (&s->peers, peer, peerCompare);
3324
3325  if (replicationExists (s))
3326    tr_decrReplicationFromBitfield (s, &peer->have);
3327
3328  assert (removed == peer);
3329  tr_peerFree (removed);
3330}
3331
3332static void
3333closePeer (tr_swarm * s, tr_peer * peer)
3334{
3335  struct peer_atom * atom;
3336
3337  assert (s != NULL);
3338  assert (peer != NULL);
3339
3340  atom = peer->atom;
3341
3342  /* if we transferred piece data, then they might be good peers,
3343     so reset their `numFails' weight to zero. otherwise we connected
3344     to them fruitlessly, so mark it as another fail */
3345  if (atom->piece_data_time)
3346    {
3347      tordbg (s, "resetting atom %s numFails to 0", tr_atomAddrStr (atom));
3348      atom->numFails = 0;
3349    }
3350  else
3351    {
3352      ++atom->numFails;
3353      tordbg (s, "incremented atom %s numFails to %d", tr_atomAddrStr (atom), (int)atom->numFails);
3354    }
3355
3356  tordbg (s, "removing bad peer %s", tr_atomAddrStr (peer->atom));
3357  removePeer (s, peer);
3358}
3359
3360static void
3361removeAllPeers (tr_swarm * s)
3362{
3363  while (!tr_ptrArrayEmpty (&s->peers))
3364    removePeer (s, tr_ptrArrayNth (&s->peers, 0));
3365}
3366
3367static void
3368closeBadPeers (tr_swarm * s, const time_t now_sec)
3369{
3370  if (!tr_ptrArrayEmpty (&s->peers))
3371    {
3372      int i;
3373      int peerCount;
3374      struct tr_peer ** peers;
3375
3376      peers = getPeersToClose (s, now_sec, &peerCount);
3377      for (i=0; i<peerCount; ++i)
3378        closePeer (s, peers[i]);
3379      tr_free (peers);
3380    }
3381}
3382
3383struct peer_liveliness
3384{
3385  tr_peer * peer;
3386  void * clientData;
3387  time_t pieceDataTime;
3388  time_t time;
3389  int speed;
3390  bool doPurge;
3391};
3392
3393static int
3394comparePeerLiveliness (const void * va, const void * vb)
3395{
3396  const struct peer_liveliness * a = va;
3397  const struct peer_liveliness * b = vb;
3398
3399  if (a->doPurge != b->doPurge)
3400    return a->doPurge ? 1 : -1;
3401
3402  if (a->speed != b->speed) /* faster goes first */
3403    return a->speed > b->speed ? -1 : 1;
3404
3405  /* the one to give us data more recently goes first */
3406  if (a->pieceDataTime != b->pieceDataTime)
3407    return a->pieceDataTime > b->pieceDataTime ? -1 : 1;
3408
3409  /* the one we connected to most recently goes first */
3410  if (a->time != b->time)
3411    return a->time > b->time ? -1 : 1;
3412
3413  return 0;
3414}
3415
3416static void
3417sortPeersByLivelinessImpl (tr_peer  ** peers,
3418                           void     ** clientData,
3419                           int         n,
3420                           uint64_t    now,
3421                           int (*compare)(const void *va, const void *vb))
3422{
3423  int i;
3424  struct peer_liveliness *lives, *l;
3425
3426  /* build a sortable array of peer + extra info */
3427  lives = l = tr_new0 (struct peer_liveliness, n);
3428  for (i=0; i<n; ++i, ++l)
3429    {
3430      tr_peer * p = peers[i];
3431      l->peer = p;
3432      l->doPurge = p->doPurge;
3433      l->pieceDataTime = p->atom->piece_data_time;
3434      l->time = p->atom->time;
3435      l->speed = tr_peerGetPieceSpeed_Bps (p, now, TR_UP)
3436               + tr_peerGetPieceSpeed_Bps (p, now, TR_DOWN);
3437      if (clientData)
3438        l->clientData = clientData[i];
3439    }
3440
3441  /* sort 'em */
3442  assert (n == (l - lives));
3443  qsort (lives, n, sizeof (struct peer_liveliness), compare);
3444
3445  /* build the peer array */
3446  for (i=0, l=lives; i<n; ++i, ++l)
3447    {
3448      peers[i] = l->peer;
3449      if (clientData)
3450        clientData[i] = l->clientData;
3451    }
3452  assert (n == (l - lives));
3453
3454  /* cleanup */
3455  tr_free (lives);
3456}
3457
3458static void
3459sortPeersByLiveliness (tr_peer ** peers, void ** clientData, int n, uint64_t now)
3460{
3461  sortPeersByLivelinessImpl (peers, clientData, n, now, comparePeerLiveliness);
3462}
3463
3464
3465static void
3466enforceTorrentPeerLimit (tr_swarm * s, uint64_t now)
3467{
3468  int n = tr_ptrArraySize (&s->peers);
3469  const int max = tr_torrentGetPeerLimit (s->tor);
3470  if (n > max)
3471    {
3472      void * base = tr_ptrArrayBase (&s->peers);
3473      tr_peer ** peers = tr_memdup (base, n*sizeof (tr_peer*));
3474      sortPeersByLiveliness (peers, NULL, n, now);
3475      while (n > max)
3476        closePeer (s, peers[--n]);
3477      tr_free (peers);
3478    }
3479}
3480
3481static void
3482enforceSessionPeerLimit (tr_session * session, uint64_t now)
3483{
3484  int n = 0;
3485  tr_torrent * tor = NULL;
3486  const int max = tr_sessionGetPeerLimit (session);
3487
3488  /* count the total number of peers */
3489  while ((tor = tr_torrentNext (session, tor)))
3490    n += tr_ptrArraySize (&tor->swarm->peers);
3491
3492  /* if there are too many, prune out the worst */
3493  if (n > max)
3494    {
3495      tr_peer ** peers = tr_new (tr_peer*, n);
3496      tr_swarm ** swarms = tr_new (tr_swarm*, n);
3497
3498      /* populate the peer array */
3499      n = 0;
3500      tor = NULL;
3501      while ((tor = tr_torrentNext (session, tor)))
3502        {
3503          int i;
3504          tr_swarm * s = tor->swarm;
3505          const int tn = tr_ptrArraySize (&s->peers);
3506          for (i=0; i<tn; ++i, ++n)
3507            {
3508              peers[n] = tr_ptrArrayNth (&s->peers, i);
3509              swarms[n] = s;
3510            }
3511        }
3512
3513      /* sort 'em */
3514      sortPeersByLiveliness (peers, (void**)swarms, n, now);
3515
3516      /* cull out the crappiest */
3517      while (n-- > max)
3518        closePeer (swarms[n], peers[n]);
3519
3520      /* cleanup */
3521      tr_free (swarms);
3522      tr_free (peers);
3523    }
3524}
3525
3526static void makeNewPeerConnections (tr_peerMgr * mgr, const int max);
3527
3528static void
3529reconnectPulse (int foo UNUSED, short bar UNUSED, void * vmgr)
3530{
3531  tr_torrent * tor;
3532  tr_peerMgr * mgr = vmgr;
3533  const time_t now_sec = tr_time ();
3534  const uint64_t now_msec = tr_time_msec ();
3535
3536  /**
3537  ***  enforce the per-session and per-torrent peer limits
3538  **/
3539
3540  /* if we're over the per-torrent peer limits, cull some peers */
3541  tor = NULL;
3542  while ((tor = tr_torrentNext (mgr->session, tor)))
3543    if (tor->isRunning)
3544      enforceTorrentPeerLimit (tor->swarm, now_msec);
3545
3546  /* if we're over the per-session peer limits, cull some peers */
3547  enforceSessionPeerLimit (mgr->session, now_msec);
3548
3549  /* remove crappy peers */
3550  tor = NULL;
3551  while ((tor = tr_torrentNext (mgr->session, tor)))
3552    if (!tor->swarm->isRunning)
3553      removeAllPeers (tor->swarm);
3554    else
3555      closeBadPeers (tor->swarm, now_sec);
3556
3557  /* try to make new peer connections */
3558  makeNewPeerConnections (mgr, MAX_CONNECTIONS_PER_PULSE);
3559}
3560
3561/****
3562*****
3563*****  BANDWIDTH ALLOCATION
3564*****
3565****/
3566
3567static void
3568pumpAllPeers (tr_peerMgr * mgr)
3569{
3570  tr_torrent * tor = NULL;
3571
3572  while ((tor = tr_torrentNext (mgr->session, tor)))
3573    {
3574      int j;
3575      tr_swarm * s = tor->swarm;
3576
3577      for (j=0; j<tr_ptrArraySize (&s->peers); ++j)
3578        tr_peerMsgsPulse (tr_ptrArrayNth (&s->peers, j));
3579    }
3580}
3581
3582
3583static void
3584queuePulseForeach (void * vtor)
3585{
3586  tr_torrent * tor = vtor;
3587
3588  tr_torrentStartNow (tor);
3589
3590  if (tor->queue_started_callback != NULL)
3591    (*tor->queue_started_callback)(tor, tor->queue_started_user_data);
3592}
3593
3594static void
3595queuePulse (tr_session * session, tr_direction dir)
3596{
3597  assert (tr_isSession (session));
3598  assert (tr_isDirection (dir));
3599
3600  if (tr_sessionGetQueueEnabled (session, dir))
3601    {
3602      tr_ptrArray torrents = TR_PTR_ARRAY_INIT;
3603
3604      tr_sessionGetNextQueuedTorrents (session,
3605                                       dir,
3606                                       tr_sessionCountQueueFreeSlots (session, dir),
3607                                       &torrents);
3608
3609      tr_ptrArrayForeach (&torrents, queuePulseForeach);
3610
3611      tr_ptrArrayDestruct (&torrents, NULL);
3612    }
3613}
3614
3615static void
3616bandwidthPulse (int foo UNUSED, short bar UNUSED, void * vmgr)
3617{
3618  tr_torrent * tor;
3619  tr_peerMgr * mgr = vmgr;
3620  tr_session * session = mgr->session;
3621  managerLock (mgr);
3622
3623  /* FIXME: this next line probably isn't necessary... */
3624  pumpAllPeers (mgr);
3625
3626  /* allocate bandwidth to the peers */
3627  tr_bandwidthAllocate (&session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC);
3628  tr_bandwidthAllocate (&session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC);
3629
3630  /* torrent upkeep */
3631  tor = NULL;
3632  while ((tor = tr_torrentNext (session, tor)))
3633    {
3634      /* possibly stop torrents that have seeded enough */
3635      tr_torrentCheckSeedLimit (tor);
3636
3637      /* run the completeness check for any torrents that need it */
3638      if (tor->swarm->needsCompletenessCheck)
3639        {
3640          tor->swarm->needsCompletenessCheck  = false;
3641          tr_torrentRecheckCompleteness (tor);
3642        }
3643
3644      /* stop torrents that are ready to stop, but couldn't be stopped
3645         earlier during the peer-io callback call chain */
3646      if (tor->isStopping)
3647        tr_torrentStop (tor);
3648    }
3649
3650  /* pump the queues */
3651  queuePulse (session, TR_UP);
3652  queuePulse (session, TR_DOWN);
3653
3654  reconnectPulse (0, 0, mgr);
3655
3656  tr_timerAddMsec (mgr->bandwidthTimer, BANDWIDTH_PERIOD_MSEC);
3657  managerUnlock (mgr);
3658}
3659
3660/***
3661****
3662***/
3663
3664static int
3665compareAtomPtrsByAddress (const void * va, const void *vb)
3666{
3667  const struct peer_atom * a = * (const struct peer_atom**) va;
3668  const struct peer_atom * b = * (const struct peer_atom**) vb;
3669
3670  assert (tr_isAtom (a));
3671  assert (tr_isAtom (b));
3672
3673  return tr_address_compare (&a->addr, &b->addr);
3674}
3675
3676/* best come first, worst go last */
3677static int
3678compareAtomPtrsByShelfDate (const void * va, const void *vb)
3679{
3680  time_t atime;
3681  time_t btime;
3682  const struct peer_atom * a = * (const struct peer_atom**) va;
3683  const struct peer_atom * b = * (const struct peer_atom**) vb;
3684  const int data_time_cutoff_secs = 60 * 60;
3685  const time_t tr_now = tr_time ();
3686
3687  assert (tr_isAtom (a));
3688  assert (tr_isAtom (b));
3689
3690  /* primary key: the last piece data time *if* it was within the last hour */
3691  atime = a->piece_data_time; if (atime + data_time_cutoff_secs < tr_now) atime = 0;
3692  btime = b->piece_data_time; if (btime + data_time_cutoff_secs < tr_now) btime = 0;
3693  if (atime != btime)
3694    return atime > btime ? -1 : 1;
3695
3696  /* secondary key: shelf date. */
3697  if (a->shelf_date != b->shelf_date)
3698    return a->shelf_date > b->shelf_date ? -1 : 1;
3699
3700  return 0;
3701}
3702
3703static int
3704getMaxAtomCount (const tr_torrent * tor)
3705{
3706  return MIN (50, tor->maxConnectedPeers * 3);
3707}
3708
3709static void
3710atomPulse (int foo UNUSED, short bar UNUSED, void * vmgr)
3711{
3712  tr_torrent * tor = NULL;
3713  tr_peerMgr * mgr = vmgr;
3714  managerLock (mgr);
3715
3716  while ((tor = tr_torrentNext (mgr->session, tor)))
3717    {
3718      int atomCount;
3719      tr_swarm * s = tor->swarm;
3720      const int maxAtomCount = getMaxAtomCount (tor);
3721      struct peer_atom ** atoms = (struct peer_atom**) tr_ptrArrayPeek (&s->pool, &atomCount);
3722
3723      if (atomCount > maxAtomCount) /* we've got too many atoms... time to prune */
3724        {
3725          int i;
3726          int keepCount = 0;
3727          int testCount = 0;
3728          struct peer_atom ** keep = tr_new (struct peer_atom*, atomCount);
3729          struct peer_atom ** test = tr_new (struct peer_atom*, atomCount);
3730
3731          /* keep the ones that are in use */
3732          for (i=0; i<atomCount; ++i)
3733            {
3734              struct peer_atom * atom = atoms[i];
3735              if (peerIsInUse (s, atom))
3736                keep[keepCount++] = atom;
3737              else
3738                test[testCount++] = atom;
3739            }
3740
3741          /* if there's room, keep the best of what's left */
3742          i = 0;
3743          if (keepCount < maxAtomCount)
3744            {
3745              qsort (test, testCount, sizeof (struct peer_atom *), compareAtomPtrsByShelfDate);
3746              while (i<testCount && keepCount<maxAtomCount)
3747                keep[keepCount++] = test[i++];
3748            }
3749
3750          /* free the culled atoms */
3751          while (i<testCount)
3752            tr_free (test[i++]);
3753
3754          /* rebuild Torrent.pool with what's left */
3755          tr_ptrArrayDestruct (&s->pool, NULL);
3756          s->pool = TR_PTR_ARRAY_INIT;
3757          qsort (keep, keepCount, sizeof (struct peer_atom *), compareAtomPtrsByAddress);
3758          for (i=0; i<keepCount; ++i)
3759            tr_ptrArrayAppend (&s->pool, keep[i]);
3760
3761          tordbg (s, "max atom count is %d... pruned from %d to %d\n", maxAtomCount, atomCount, keepCount);
3762
3763          /* cleanup */
3764          tr_free (test);
3765          tr_free (keep);
3766        }
3767    }
3768
3769  tr_timerAddMsec (mgr->atomTimer, ATOM_PERIOD_MSEC);
3770  managerUnlock (mgr);
3771}
3772
3773/***
3774****
3775****
3776****
3777***/
3778
3779/* is this atom someone that we'd want to initiate a connection to? */
3780static bool
3781isPeerCandidate (const tr_torrent * tor, struct peer_atom * atom, const time_t now)
3782{
3783  /* not if we're both seeds */
3784  if (tr_torrentIsSeed (tor) && atomIsSeed (atom))
3785    return false;
3786
3787  /* not if we've already got a connection to them... */
3788  if (peerIsInUse (tor->swarm, atom))
3789    return false;
3790
3791  /* not if we just tried them already */
3792  if ((now - atom->time) < getReconnectIntervalSecs (atom, now))
3793    return false;
3794
3795  /* not if they're blocklisted */
3796  if (isAtomBlocklisted (tor->session, atom))
3797    return false;
3798
3799  /* not if they're banned... */
3800  if (atom->flags2 & MYFLAG_BANNED)
3801    return false;
3802
3803  return true;
3804}
3805
3806struct peer_candidate
3807{
3808  uint64_t score;
3809  tr_torrent * tor;
3810  struct peer_atom * atom;
3811};
3812
3813static bool
3814torrentWasRecentlyStarted (const tr_torrent * tor)
3815{
3816  return difftime (tr_time (), tor->startDate) < 120;
3817}
3818
/**
 * Append a field to a packed sort key: shift the key left by `width'
 * bits and OR `addme' into the vacated low bits.
 *
 * `addme' is not masked, so the caller must make sure it fits in
 * `width' bits or it will spill into the previously-added fields.
 *
 * @param value the key built so far
 * @param width number of bits to make room for
 * @param addme the field to append
 * @return the extended key
 */
static inline uint64_t
addValToKey (uint64_t value, int width, uint64_t addme)
{
  const uint64_t shifted = value << (uint64_t)width;

  return shifted | addme;
}
3826
3827/* smaller value is better */
3828static uint64_t
3829getPeerCandidateScore (const tr_torrent * tor, const struct peer_atom * atom, uint8_t salt)
3830{
3831  uint64_t i;
3832  uint64_t score = 0;
3833  const bool failed = atom->lastConnectionAt < atom->lastConnectionAttemptAt;
3834
3835  /* prefer peers we've connected to, or never tried, over peers we failed to connect to. */
3836  i = failed ? 1 : 0;
3837  score = addValToKey (score, 1, i);
3838
3839  /* prefer the one we attempted least recently (to cycle through all peers) */
3840  i = atom->lastConnectionAttemptAt;
3841  score = addValToKey (score, 32, i);
3842
3843  /* prefer peers belonging to a torrent of a higher priority */
3844  switch (tr_torrentGetPriority (tor))
3845    {
3846      case TR_PRI_HIGH:    i = 0; break;
3847      case TR_PRI_NORMAL:  i = 1; break;
3848      case TR_PRI_LOW:     i = 2; break;
3849    }
3850  score = addValToKey (score, 4, i);
3851
3852  /* prefer recently-started torrents */
3853  i = torrentWasRecentlyStarted (tor) ? 0 : 1;
3854  score = addValToKey (score, 1, i);
3855
3856  /* prefer torrents we're downloading with */
3857  i = tr_torrentIsSeed (tor) ? 1 : 0;
3858  score = addValToKey (score, 1, i);
3859
3860  /* prefer peers that are known to be connectible */
3861  i = (atom->flags & ADDED_F_CONNECTABLE) ? 0 : 1;
3862  score = addValToKey (score, 1, i);
3863
3864  /* prefer peers that we might have a chance of uploading to...
3865  so lower seed probability is better */
3866  if (atom->seedProbability == 100) i = 101;
3867  else if (atom->seedProbability == -1) i = 100;
3868  else i = atom->seedProbability;
3869  score = addValToKey (score, 8, i);
3870
3871  /* Prefer peers that we got from more trusted sources.
3872   * lower `fromBest' values indicate more trusted sources */
3873  score = addValToKey (score, 4, atom->fromBest);
3874
3875  /* salt */
3876  score = addValToKey (score, 8, salt);
3877
3878  return score;
3879}
3880
3881static int
3882comparePeerCandidates (const void * va, const void * vb)
3883{
3884  int ret;
3885  const struct peer_candidate * a = va;
3886  const struct peer_candidate * b = vb;
3887
3888  if (a->score < b->score)
3889    ret = -1;
3890  else if (a->score > b->score)
3891    ret = 1;
3892  else
3893    ret = 0;
3894
3895  return ret;
3896}
3897
3898/* Partial sorting -- selecting the k best candidates
3899   Adapted from http://en.wikipedia.org/wiki/Selection_algorithm */
3900static void
3901selectPeerCandidates (struct peer_candidate * candidates, int candidate_count, int select_count)
3902{
3903  tr_quickfindFirstK (candidates,
3904                      candidate_count,
3905                      sizeof(struct peer_candidate),
3906                      comparePeerCandidates,
3907                      select_count);
3908}
3909
3910#ifndef NDEBUG
3911static bool
3912checkBestScoresComeFirst (const struct peer_candidate * candidates, int n, int k)
3913{
3914  int i;
3915  uint64_t worstFirstScore = 0;
3916  const int x = MIN (n, k) - 1;
3917
3918  for (i=0; i<x; i++)
3919    if (worstFirstScore < candidates[i].score)
3920      worstFirstScore = candidates[i].score;
3921
3922  for (i=0; i<x; i++)
3923    assert (candidates[i].score <= worstFirstScore);
3924
3925  for (i=x+1; i<n; i++)
3926    assert (candidates[i].score >= worstFirstScore);
3927
3928  return true;
3929}
3930#endif /* NDEBUG */
3931
3932/** @return an array of all the atoms we might want to connect to */
3933static struct peer_candidate*
3934getPeerCandidates (tr_session * session, int * candidateCount, int max)
3935{
3936  int atomCount;
3937  int peerCount;
3938  tr_torrent * tor;
3939  struct peer_candidate * candidates;
3940  struct peer_candidate * walk;
3941  const time_t now = tr_time ();
3942  const uint64_t now_msec = tr_time_msec ();
3943  /* leave 5% of connection slots for incoming connections -- ticket #2609 */
3944  const int maxCandidates = tr_sessionGetPeerLimit (session) * 0.95;
3945
3946  /* count how many peers and atoms we've got */
3947  tor= NULL;
3948  atomCount = 0;
3949  peerCount = 0;
3950  while ((tor = tr_torrentNext (session, tor)))
3951    {
3952      atomCount += tr_ptrArraySize (&tor->swarm->pool);
3953      peerCount += tr_ptrArraySize (&tor->swarm->peers);
3954    }
3955
3956  /* don't start any new handshakes if we're full up */
3957  if (maxCandidates <= peerCount)
3958    {
3959      *candidateCount = 0;
3960      return NULL;
3961    }
3962
3963  /* allocate an array of candidates */
3964  walk = candidates = tr_new (struct peer_candidate, atomCount);
3965
3966  /* populate the candidate array */
3967  tor = NULL;
3968  while ((tor = tr_torrentNext (session, tor)))
3969    {
3970      int i, nAtoms;
3971      struct peer_atom ** atoms;
3972
3973      if (!tor->swarm->isRunning)
3974        continue;
3975
3976      /* if we've already got enough peers in this torrent... */
3977      if (tr_torrentGetPeerLimit (tor) <= tr_ptrArraySize (&tor->swarm->peers))
3978        continue;
3979
3980      /* if we've already got enough speed in this torrent... */
3981      if (tr_torrentIsSeed (tor) && isBandwidthMaxedOut (&tor->bandwidth, now_msec, TR_UP))
3982        continue;
3983
3984      atoms = (struct peer_atom**) tr_ptrArrayPeek (&tor->swarm->pool, &nAtoms);
3985      for (i=0; i<nAtoms; ++i)
3986        {
3987          struct peer_atom * atom = atoms[i];
3988
3989          if (isPeerCandidate (tor, atom, now))
3990            {
3991              const uint8_t salt = tr_cryptoWeakRandInt (1024);
3992              walk->tor = tor;
3993              walk->atom = atom;
3994              walk->score = getPeerCandidateScore (tor, atom, salt);
3995              ++walk;
3996            }
3997        }
3998    }
3999
4000  *candidateCount = walk - candidates;
4001  if (walk != candidates)
4002    selectPeerCandidates (candidates, walk-candidates, max);
4003
4004  assert (checkBestScoresComeFirst (candidates, *candidateCount, max));
4005  return candidates;
4006}
4007
4008static void
4009initiateConnection (tr_peerMgr * mgr, tr_swarm * s, struct peer_atom * atom)
4010{
4011  tr_peerIo * io;
4012  const time_t now = tr_time ();
4013  bool utp = tr_sessionIsUTPEnabled (mgr->session) && !atom->utp_failed;
4014
4015  if (atom->fromFirst == TR_PEER_FROM_PEX)
4016    /* PEX has explicit signalling for uTP support.  If an atom
4017       originally came from PEX and doesn't have the uTP flag, skip the
4018       uTP connection attempt.  Are we being optimistic here? */
4019    utp = utp && (atom->flags & ADDED_F_UTP_FLAGS);
4020
4021  tordbg (s, "Starting an OUTGOING%s connection with %s",
4022          utp ? " µTP" : "", tr_atomAddrStr (atom));
4023
4024  io = tr_peerIoNewOutgoing (mgr->session,
4025                             &mgr->session->bandwidth,
4026                             &atom->addr,
4027                             atom->port,
4028                             s->tor->info.hash,
4029                             s->tor->completeness == TR_SEED,
4030                             utp);
4031
4032  if (io == NULL)
4033    {
4034      tordbg (s, "peerIo not created; marking peer %s as unreachable", tr_atomAddrStr (atom));
4035      atom->flags2 |= MYFLAG_UNREACHABLE;
4036      atom->numFails++;
4037    }
4038  else
4039    {
4040      tr_handshake * handshake = tr_handshakeNew (io,
4041                                                  mgr->session->encryptionMode,
4042                                                  myHandshakeDoneCB,
4043                                                  mgr);
4044
4045      assert (tr_peerIoGetTorrentHash (io));
4046
4047      tr_peerIoUnref (io); /* balanced by the initial ref
4048                              in tr_peerIoNewOutgoing () */
4049
4050      tr_ptrArrayInsertSorted (&s->outgoingHandshakes, handshake,
4051                               handshakeCompare);
4052    }
4053
4054  atom->lastConnectionAttemptAt = now;
4055  atom->time = now;
4056}
4057
4058static void
4059initiateCandidateConnection (tr_peerMgr * mgr, struct peer_candidate * c)
4060{
4061#if 0
4062  fprintf (stderr, "Starting an OUTGOING connection with %s - [%s] seedProbability==%d; %s, %s\n",
4063           tr_atomAddrStr (c->atom),
4064           tr_torrentName (c->tor),
4065           (int)c->atom->seedProbability,
4066           tr_torrentIsPrivate (c->tor) ? "private" : "public",
4067           tr_torrentIsSeed (c->tor) ? "seed" : "downloader");
4068#endif
4069
4070  initiateConnection (mgr, c->tor->swarm, c->atom);
4071}
4072
4073static void
4074makeNewPeerConnections (struct tr_peerMgr * mgr, const int max)
4075{
4076  int i, n;
4077  struct peer_candidate * candidates;
4078
4079  candidates = getPeerCandidates (mgr->session, &n, max);
4080
4081  for (i=0; i<n && i<max; ++i)
4082    initiateCandidateConnection (mgr, &candidates[i]);
4083
4084  tr_free (candidates);
4085}
Note: See TracBrowser for help on using the repository browser.