source: trunk/libtransmission/peer-mgr.c @ 12918

Last change on this file since 12918 was 12905, checked in by jordan, 10 years ago

#4496 'freeze when having a huge torrent' -- more tweaks based on Shark reports from MechMK1

  • Property svn:keywords set to Date Rev Author Id
File size: 115.9 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 12905 2011-09-21 23:04:39Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h> /* error codes ERANGE, ... */
15#include <limits.h> /* INT_MAX */
16#include <string.h> /* memcpy, memcmp, strstr */
17#include <stdlib.h> /* qsort */
18
19#include <event2/event.h>
20#include <libutp/utp.h>
21
22#include "transmission.h"
23#include "announcer.h"
24#include "bandwidth.h"
25#include "blocklist.h"
26#include "cache.h"
27#include "clients.h"
28#include "completion.h"
29#include "crypto.h"
30#include "handshake.h"
31#include "net.h"
32#include "peer-io.h"
33#include "peer-mgr.h"
34#include "peer-msgs.h"
35#include "ptrarray.h"
36#include "session.h"
37#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
38#include "torrent.h"
39#include "utils.h"
40#include "webseed.h"
41
42enum
43{
44    /* how frequently to cull old atoms */
45    ATOM_PERIOD_MSEC = ( 60 * 1000 ),
46
47    /* how frequently to change which peers are choked */
48    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
49
50    /* an optimistically unchoked peer is immune from rechoking
51       for this many calls to rechokeUploads(). */
52    OPTIMISTIC_UNCHOKE_MULTIPLIER = 4,
53
54    /* how frequently to reallocate bandwidth */
55    BANDWIDTH_PERIOD_MSEC = 500,
56
57    /* how frequently to age out old piece request lists */
58    REFILL_UPKEEP_PERIOD_MSEC = ( 10 * 1000 ),
59
60    /* how frequently to decide which peers live and die */
61    RECONNECT_PERIOD_MSEC = 500,
62
63    /* when many peers are available, keep idle ones this long */
64    MIN_UPLOAD_IDLE_SECS = ( 60 ),
65
66    /* when few peers are available, keep idle ones this long */
67    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
68
69    /* max number of peers to ask for per second overall.
70     * this throttle is to avoid overloading the router */
71    MAX_CONNECTIONS_PER_SECOND = 12,
72
73    MAX_CONNECTIONS_PER_PULSE = (int)(MAX_CONNECTIONS_PER_SECOND * (RECONNECT_PERIOD_MSEC/1000.0)),
74
75    /* number of bad pieces a peer is allowed to send before we ban them */
76    MAX_BAD_PIECES_PER_PEER = 5,
77
78    /* amount of time to keep a list of request pieces lying around
79       before it's considered too old and needs to be rebuilt */
80    PIECE_LIST_SHELF_LIFE_SECS = 60,
81
82    /* use for bitwise operations w/peer_atom.flags2 */
83    MYFLAG_BANNED = 1,
84
85    /* use for bitwise operations w/peer_atom.flags2 */
86    /* unreachable for now... but not banned.
87     * if they try to connect to us it's okay */
88    MYFLAG_UNREACHABLE = 2,
89
90    /* the minimum we'll wait before attempting to reconnect to a peer */
91    MINIMUM_RECONNECT_INTERVAL_SECS = 5,
92
93    /** how long we'll let requests we've made linger before we cancel them */
94    REQUEST_TTL_SECS = 120,
95
96    NO_BLOCKS_CANCEL_HISTORY = 120,
97
98    CANCEL_HISTORY_SEC = 60
99};
100
101const tr_peer_event TR_PEER_EVENT_INIT = { 0, 0, NULL, 0, 0, 0, false, 0 };
102
103/**
104***
105**/
106
107/**
108 * Peer information that should be kept even before we've connected and
109 * after we've disconnected. These are kept in a pool of peer_atoms to decide
110 * which ones would make good candidates for connecting to, and to watch out
111 * for banned peers.
112 *
113 * @see tr_peer
114 * @see tr_peermsgs
115 */
116struct peer_atom
117{
118    uint8_t     fromFirst;          /* where the peer was first found */
119    uint8_t     fromBest;           /* the "best" value of where the peer has been found */
120    uint8_t     flags;              /* these match the added_f flags */
121    uint8_t     flags2;             /* flags that aren't defined in added_f */
122    int8_t      seedProbability;    /* how likely is this to be a seed... [0..100] or -1 for unknown */
123    int8_t      blocklisted;        /* -1 for unknown, true for blocklisted, false for not blocklisted */
124
125    tr_port     port;
126    bool        utp_failed;         /* We recently failed to connect over uTP */
127    uint16_t    numFails;
128    time_t      time;               /* when the peer's connection status last changed */
129    time_t      piece_data_time;
130
131    time_t      lastConnectionAttemptAt;
132    time_t      lastConnectionAt;
133
134    /* similar to a TTL field, but less rigid --
135     * if the swarm is small, the atom will be kept past this date. */
136    time_t      shelf_date;
137    tr_peer   * peer;               /* will be NULL if not connected */
138    tr_address  addr;
139};
140
141#ifdef NDEBUG
142#define tr_isAtom(a) (TRUE)
143#else
144static bool
145tr_isAtom( const struct peer_atom * atom )
146{
147    return ( atom != NULL )
148        && ( atom->fromFirst < TR_PEER_FROM__MAX )
149        && ( atom->fromBest < TR_PEER_FROM__MAX )
150        && ( tr_address_is_valid( &atom->addr ) );
151}
152#endif
153
154static const char*
155tr_atomAddrStr( const struct peer_atom * atom )
156{
157    return atom ? tr_peerIoAddrStr( &atom->addr, atom->port ) : "[no atom]";
158}
159
160struct block_request
161{
162    tr_block_index_t block;
163    tr_peer * peer;
164    time_t sentAt;
165};
166
167struct weighted_piece
168{
169    tr_piece_index_t index;
170    int16_t salt;
171    int16_t requestCount;
172};
173
174enum piece_sort_state
175{
176    PIECES_UNSORTED,
177    PIECES_SORTED_BY_INDEX,
178    PIECES_SORTED_BY_WEIGHT
179};
180
181/** @brief Opaque, per-torrent data structure for peer connection information */
182typedef struct tr_torrent_peers
183{
184    tr_ptrArray                outgoingHandshakes; /* tr_handshake */
185    tr_ptrArray                pool; /* struct peer_atom */
186    tr_ptrArray                peers; /* tr_peer */
187    tr_ptrArray                webseeds; /* tr_webseed */
188
189    tr_torrent               * tor;
190    struct tr_peerMgr        * manager;
191
192    tr_peer                  * optimistic; /* the optimistic peer, or NULL if none */
193    int                        optimisticUnchokeTimeScaler;
194
195    bool                       isRunning;
196    bool                       needsCompletenessCheck;
197
198    struct block_request     * requests;
199    int                        requestCount;
200    int                        requestAlloc;
201
202    struct weighted_piece    * pieces;
203    int                        pieceCount;
204    enum piece_sort_state      pieceSortState;
205
206    /* An array of pieceCount items stating how many peers have each piece.
207       This is used to help us for downloading pieces "rarest first."
208       This may be NULL if we don't have metainfo yet, or if we're not
209       downloading and don't care about rarity */
210    uint16_t                 * pieceReplication;
211    size_t                     pieceReplicationSize;
212
213    int                        interestedCount;
214    int                        maxPeers;
215    time_t                     lastCancel;
216
217    /* Before the endgame this should be 0. In endgame, is contains the average
218     * number of pending requests per peer. Only peers which have more pending
219     * requests are considered 'fast' are allowed to request a block that's
220     * already been requested from another (slower?) peer. */
221    int                        endgame;
222}
223Torrent;
224
225struct tr_peerMgr
226{
227    tr_session    * session;
228    tr_ptrArray     incomingHandshakes; /* tr_handshake */
229    struct event  * bandwidthTimer;
230    struct event  * rechokeTimer;
231    struct event  * refillUpkeepTimer;
232    struct event  * atomTimer;
233};
234
235#define tordbg( t, ... ) \
236    do { \
237        if( tr_deepLoggingIsActive( ) ) \
238            tr_deepLog( __FILE__, __LINE__, tr_torrentName( t->tor ), __VA_ARGS__ ); \
239    } while( 0 )
240
241#define dbgmsg( ... ) \
242    do { \
243        if( tr_deepLoggingIsActive( ) ) \
244            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
245    } while( 0 )
246
247/**
248***
249**/
250
251static inline void
252managerLock( const struct tr_peerMgr * manager )
253{
254    tr_sessionLock( manager->session );
255}
256
257static inline void
258managerUnlock( const struct tr_peerMgr * manager )
259{
260    tr_sessionUnlock( manager->session );
261}
262
263static inline void
264torrentLock( Torrent * torrent )
265{
266    managerLock( torrent->manager );
267}
268
269static inline void
270torrentUnlock( Torrent * torrent )
271{
272    managerUnlock( torrent->manager );
273}
274
275static inline int
276torrentIsLocked( const Torrent * t )
277{
278    return tr_sessionIsLocked( t->manager->session );
279}
280
281/**
282***
283**/
284
285static int
286handshakeCompareToAddr( const void * va, const void * vb )
287{
288    const tr_handshake * a = va;
289
290    return tr_address_compare( tr_handshakeGetAddr( a, NULL ), vb );
291}
292
293static int
294handshakeCompare( const void * a, const void * b )
295{
296    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
297}
298
299static inline tr_handshake*
300getExistingHandshake( tr_ptrArray * handshakes, const tr_address * addr )
301{
302    if( tr_ptrArrayEmpty( handshakes ) )
303        return NULL;
304
305    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
306}
307
308static int
309comparePeerAtomToAddress( const void * va, const void * vb )
310{
311    const struct peer_atom * a = va;
312
313    return tr_address_compare( &a->addr, vb );
314}
315
316static int
317compareAtomsByAddress( const void * va, const void * vb )
318{
319    const struct peer_atom * b = vb;
320
321    assert( tr_isAtom( b ) );
322
323    return comparePeerAtomToAddress( va, &b->addr );
324}
325
326/**
327***
328**/
329
330const tr_address *
331tr_peerAddress( const tr_peer * peer )
332{
333    return &peer->atom->addr;
334}
335
336static Torrent*
337getExistingTorrent( tr_peerMgr *    manager,
338                    const uint8_t * hash )
339{
340    tr_torrent * tor = tr_torrentFindFromHash( manager->session, hash );
341
342    return tor == NULL ? NULL : tor->torrentPeers;
343}
344
345static int
346peerCompare( const void * a, const void * b )
347{
348    return tr_address_compare( tr_peerAddress( a ), tr_peerAddress( b ) );
349}
350
351static struct peer_atom*
352getExistingAtom( const Torrent    * t,
353                 const tr_address * addr )
354{
355    Torrent * tt = (Torrent*)t;
356    assert( torrentIsLocked( t ) );
357    return tr_ptrArrayFindSorted( &tt->pool, addr, comparePeerAtomToAddress );
358}
359
360static bool
361peerIsInUse( const Torrent * ct, const struct peer_atom * atom )
362{
363    Torrent * t = (Torrent*) ct;
364
365    assert( torrentIsLocked ( t ) );
366
367    return ( atom->peer != NULL )
368        || getExistingHandshake( &t->outgoingHandshakes, &atom->addr )
369        || getExistingHandshake( &t->manager->incomingHandshakes, &atom->addr );
370}
371
372void
373tr_peerConstruct( tr_peer * peer )
374{
375    memset( peer, 0, sizeof( tr_peer ) );
376
377    peer->have = TR_BITFIELD_INIT;
378}
379
380static tr_peer*
381peerNew( struct peer_atom * atom )
382{
383    tr_peer * peer = tr_new( tr_peer, 1 );
384    tr_peerConstruct( peer );
385
386    peer->atom = atom;
387    atom->peer = peer;
388
389    return peer;
390}
391
392static tr_peer*
393getPeer( Torrent * torrent, struct peer_atom * atom )
394{
395    tr_peer * peer;
396
397    assert( torrentIsLocked( torrent ) );
398
399    peer = atom->peer;
400
401    if( peer == NULL )
402    {
403        peer = peerNew( atom );
404        tr_bitfieldConstruct( &peer->have, torrent->tor->blockCount );
405        tr_bitfieldConstruct( &peer->blame, torrent->tor->blockCount );
406        tr_ptrArrayInsertSorted( &torrent->peers, peer, peerCompare );
407    }
408
409    return peer;
410}
411
412static void peerDeclinedAllRequests( Torrent *, const tr_peer * );
413
414void
415tr_peerDestruct( tr_torrent * tor, tr_peer * peer )
416{
417    assert( peer != NULL );
418
419    peerDeclinedAllRequests( tor->torrentPeers, peer );
420
421    if( peer->msgs != NULL )
422        tr_peerMsgsFree( peer->msgs );
423
424    if( peer->io ) {
425        tr_peerIoClear( peer->io );
426        tr_peerIoUnref( peer->io ); /* balanced by the ref in handshakeDoneCB() */
427    }
428
429    tr_bitfieldDestruct( &peer->have );
430    tr_bitfieldDestruct( &peer->blame );
431    tr_free( peer->client );
432
433    if( peer->atom )
434        peer->atom->peer = NULL;
435}
436
437static void
438peerDelete( Torrent * t, tr_peer * peer )
439{
440    tr_peerDestruct( t->tor, peer );
441    tr_free( peer );
442}
443
444static bool
445replicationExists( const Torrent * t )
446{
447    return t->pieceReplication != NULL;
448}
449
450static void
451replicationFree( Torrent * t )
452{
453    tr_free( t->pieceReplication );
454    t->pieceReplication = NULL;
455    t->pieceReplicationSize = 0;
456}
457
458static void
459replicationNew( Torrent * t )
460{
461    tr_piece_index_t piece_i;
462    const tr_piece_index_t piece_count = t->tor->info.pieceCount;
463    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
464    const int peer_count = tr_ptrArraySize( &t->peers );
465
466    assert( !replicationExists( t ) );
467
468    t->pieceReplicationSize = piece_count;
469    t->pieceReplication = tr_new0( uint16_t, piece_count );
470
471    for( piece_i=0; piece_i<piece_count; ++piece_i )
472    {
473        int peer_i;
474        uint16_t r = 0;
475
476        for( peer_i=0; peer_i<peer_count; ++peer_i )
477            if( tr_bitfieldHas( &peers[peer_i]->have, piece_i ) )
478                ++r;
479
480        t->pieceReplication[piece_i] = r;
481    }
482}
483
484static void
485torrentFree( void * vt )
486{
487    Torrent * t = vt;
488
489    assert( t );
490    assert( !t->isRunning );
491    assert( torrentIsLocked( t ) );
492    assert( tr_ptrArrayEmpty( &t->outgoingHandshakes ) );
493    assert( tr_ptrArrayEmpty( &t->peers ) );
494
495    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
496    tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
497    tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
498    tr_ptrArrayDestruct( &t->peers, NULL );
499
500    replicationFree( t );
501
502    tr_free( t->requests );
503    tr_free( t->pieces );
504    tr_free( t );
505}
506
507static void peerCallbackFunc( tr_peer *, const tr_peer_event *, void * );
508
509static Torrent*
510torrentNew( tr_peerMgr * manager, tr_torrent * tor )
511{
512    int       i;
513    Torrent * t;
514
515    t = tr_new0( Torrent, 1 );
516    t->manager = manager;
517    t->tor = tor;
518    t->pool = TR_PTR_ARRAY_INIT;
519    t->peers = TR_PTR_ARRAY_INIT;
520    t->webseeds = TR_PTR_ARRAY_INIT;
521    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
522
523    for( i = 0; i < tor->info.webseedCount; ++i )
524    {
525        tr_webseed * w =
526            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
527        tr_ptrArrayAppend( &t->webseeds, w );
528    }
529
530    return t;
531}
532
533static void ensureMgrTimersExist( struct tr_peerMgr * m );
534
535tr_peerMgr*
536tr_peerMgrNew( tr_session * session )
537{
538    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
539    m->session = session;
540    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
541    ensureMgrTimersExist( m );
542    return m;
543}
544
545static void
546deleteTimer( struct event ** t )
547{
548    if( *t != NULL )
549    {
550        event_free( *t );
551        *t = NULL;
552    }
553}
554
555static void
556deleteTimers( struct tr_peerMgr * m )
557{
558    deleteTimer( &m->atomTimer );
559    deleteTimer( &m->bandwidthTimer );
560    deleteTimer( &m->rechokeTimer );
561    deleteTimer( &m->refillUpkeepTimer );
562}
563
564void
565tr_peerMgrFree( tr_peerMgr * manager )
566{
567    managerLock( manager );
568
569    deleteTimers( manager );
570
571    /* free the handshakes. Abort invokes handshakeDoneCB(), which removes
572     * the item from manager->handshakes, so this is a little roundabout... */
573    while( !tr_ptrArrayEmpty( &manager->incomingHandshakes ) )
574        tr_handshakeAbort( tr_ptrArrayNth( &manager->incomingHandshakes, 0 ) );
575
576    tr_ptrArrayDestruct( &manager->incomingHandshakes, NULL );
577
578    managerUnlock( manager );
579    tr_free( manager );
580}
581
582static int
583clientIsDownloadingFrom( const tr_torrent * tor, const tr_peer * peer )
584{
585    if( !tr_torrentHasMetadata( tor ) )
586        return true;
587
588    return peer->clientIsInterested && !peer->clientIsChoked;
589}
590
591static int
592clientIsUploadingTo( const tr_peer * peer )
593{
594    return peer->peerIsInterested && !peer->peerIsChoked;
595}
596
597/***
598****
599***/
600
601void
602tr_peerMgrOnBlocklistChanged( tr_peerMgr * mgr )
603{
604    tr_torrent * tor = NULL;
605    tr_session * session = mgr->session;
606
607    /* we cache whether or not a peer is blocklisted...
608       since the blocklist has changed, erase that cached value */
609    while(( tor = tr_torrentNext( session, tor )))
610    {
611        int i;
612        Torrent * t = tor->torrentPeers;
613        const int n = tr_ptrArraySize( &t->pool );
614        for( i=0; i<n; ++i ) {
615            struct peer_atom * atom = tr_ptrArrayNth( &t->pool, i );
616            atom->blocklisted = -1;
617        }
618    }
619}
620
621static bool
622isAtomBlocklisted( tr_session * session, struct peer_atom * atom )
623{
624    if( atom->blocklisted < 0 )
625        atom->blocklisted = tr_sessionIsAddressBlocked( session, &atom->addr );
626
627    assert( tr_isBool( atom->blocklisted ) );
628    return atom->blocklisted;
629}
630
631
632/***
633****
634***/
635
636static void
637atomSetSeedProbability( struct peer_atom * atom, int seedProbability )
638{
639    assert( atom != NULL );
640    assert( -1<=seedProbability && seedProbability<=100 );
641
642    atom->seedProbability = seedProbability;
643
644    if( seedProbability == 100 )
645        atom->flags |= ADDED_F_SEED_FLAG;
646    else if( seedProbability != -1 )
647        atom->flags &= ~ADDED_F_SEED_FLAG;
648}
649
650static inline bool
651atomIsSeed( const struct peer_atom * atom )
652{
653    return atom->seedProbability == 100;
654}
655
656static void
657atomSetSeed( const Torrent * t, struct peer_atom * atom )
658{
659    if( !atomIsSeed( atom ) )
660    {
661        tordbg( t, "marking peer %s as a seed", tr_atomAddrStr( atom ) );
662
663        atomSetSeedProbability( atom, 100 );
664    }
665}
666
667
668bool
669tr_peerMgrPeerIsSeed( const tr_torrent  * tor,
670                      const tr_address  * addr )
671{
672    bool isSeed = false;
673    const Torrent * t = tor->torrentPeers;
674    const struct peer_atom * atom = getExistingAtom( t, addr );
675
676    if( atom )
677        isSeed = atomIsSeed( atom );
678
679    return isSeed;
680}
681
682void
683tr_peerMgrSetUtpSupported( tr_torrent * tor, const tr_address * addr )
684{
685    struct peer_atom * atom = getExistingAtom( tor->torrentPeers, addr );
686
687    if( atom )
688        atom->flags |= ADDED_F_UTP_FLAGS;
689}
690
691void
692tr_peerMgrSetUtpFailed( tr_torrent *tor, const tr_address *addr, bool failed )
693{
694    struct peer_atom * atom = getExistingAtom( tor->torrentPeers, addr );
695
696    if( atom )
697        atom->utp_failed = failed;
698}
699
700
701/**
702***  REQUESTS
703***
704*** There are two data structures associated with managing block requests:
705***
706*** 1. Torrent::requests, an array of "struct block_request" which keeps
707***    track of which blocks have been requested, and when, and by which peers.
708***    This is list is used for (a) cancelling requests that have been pending
709***    for too long and (b) avoiding duplicate requests before endgame.
710***
711*** 2. Torrent::pieces, an array of "struct weighted_piece" which lists the
712***    pieces that we want to request. It's used to decide which blocks to
713***    return next when tr_peerMgrGetBlockRequests() is called.
714**/
715
716/**
717*** struct block_request
718**/
719
720static int
721compareReqByBlock( const void * va, const void * vb )
722{
723    const struct block_request * a = va;
724    const struct block_request * b = vb;
725
726    /* primary key: block */
727    if( a->block < b->block ) return -1;
728    if( a->block > b->block ) return 1;
729
730    /* secondary key: peer */
731    if( a->peer < b->peer ) return -1;
732    if( a->peer > b->peer ) return 1;
733
734    return 0;
735}
736
737static void
738requestListAdd( Torrent * t, tr_block_index_t block, tr_peer * peer )
739{
740    struct block_request key;
741
742    /* ensure enough room is available... */
743    if( t->requestCount + 1 >= t->requestAlloc )
744    {
745        const int CHUNK_SIZE = 128;
746        t->requestAlloc += CHUNK_SIZE;
747        t->requests = tr_renew( struct block_request,
748                                t->requests, t->requestAlloc );
749    }
750
751    /* populate the record we're inserting */
752    key.block = block;
753    key.peer = peer;
754    key.sentAt = tr_time( );
755
756    /* insert the request to our array... */
757    {
758        bool exact;
759        const int pos = tr_lowerBound( &key, t->requests, t->requestCount,
760                                       sizeof( struct block_request ),
761                                       compareReqByBlock, &exact );
762        assert( !exact );
763        memmove( t->requests + pos + 1,
764                 t->requests + pos,
765                 sizeof( struct block_request ) * ( t->requestCount++ - pos ) );
766        t->requests[pos] = key;
767    }
768
769    if( peer != NULL )
770    {
771        ++peer->pendingReqsToPeer;
772        assert( peer->pendingReqsToPeer >= 0 );
773    }
774
775    /*fprintf( stderr, "added request of block %lu from peer %s... "
776                       "there are now %d block\n",
777                       (unsigned long)block, tr_atomAddrStr( peer->atom ), t->requestCount );*/
778}
779
780static struct block_request *
781requestListLookup( Torrent * t, tr_block_index_t block, const tr_peer * peer )
782{
783    struct block_request key;
784    key.block = block;
785    key.peer = (tr_peer*) peer;
786
787    return bsearch( &key, t->requests, t->requestCount,
788                    sizeof( struct block_request ),
789                    compareReqByBlock );
790}
791
792/**
793 * Find the peers are we currently requesting the block
794 * with index @a block from and append them to @a peerArr.
795 */
796static void
797getBlockRequestPeers( Torrent * t, tr_block_index_t block,
798                      tr_ptrArray * peerArr )
799{
800    bool exact;
801    int i, pos;
802    struct block_request key;
803
804    key.block = block;
805    key.peer = NULL;
806    pos = tr_lowerBound( &key, t->requests, t->requestCount,
807                         sizeof( struct block_request ),
808                         compareReqByBlock, &exact );
809
810    assert( !exact ); /* shouldn't have a request with .peer == NULL */
811
812    for( i = pos; i < t->requestCount; ++i )
813    {
814        if( t->requests[i].block != block )
815            break;
816        tr_ptrArrayAppend( peerArr, t->requests[i].peer );
817    }
818}
819
820static void
821decrementPendingReqCount( const struct block_request * b )
822{
823    if( b->peer != NULL )
824        if( b->peer->pendingReqsToPeer > 0 )
825            --b->peer->pendingReqsToPeer;
826}
827
828static void
829requestListRemove( Torrent * t, tr_block_index_t block, const tr_peer * peer )
830{
831    const struct block_request * b = requestListLookup( t, block, peer );
832    if( b != NULL )
833    {
834        const int pos = b - t->requests;
835        assert( pos < t->requestCount );
836
837        decrementPendingReqCount( b );
838
839        tr_removeElementFromArray( t->requests,
840                                   pos,
841                                   sizeof( struct block_request ),
842                                   t->requestCount-- );
843
844        /*fprintf( stderr, "removing request of block %lu from peer %s... "
845                           "there are now %d block requests left\n",
846                           (unsigned long)block, tr_atomAddrStr( peer->atom ), t->requestCount );*/
847    }
848}
849
850static int
851countActiveWebseeds( const Torrent * t )
852{
853    int activeCount = 0;
854    const tr_webseed ** w = (const tr_webseed **) tr_ptrArrayBase( &t->webseeds );
855    const tr_webseed ** const wend = w + tr_ptrArraySize( &t->webseeds );
856
857    for( ; w!=wend; ++w )
858        if( tr_webseedIsActive( *w ) )
859            ++activeCount;
860
861    return activeCount;
862}
863
864static bool
865testForEndgame( const Torrent * t )
866{
867    /* we consider ourselves to be in endgame if the number of bytes
868       we've got requested is >= the number of bytes left to download */
869    return ( t->requestCount * t->tor->blockSize )
870               >= tr_cpLeftUntilDone( &t->tor->completion );
871}
872
873static void
874updateEndgame( Torrent * t )
875{
876    assert( t->requestCount >= 0 );
877
878    if( !testForEndgame( t ) )
879    {
880        /* not in endgame */
881        t->endgame = 0;
882    }
883    else if( !t->endgame ) /* only recalculate when endgame first begins */
884    {
885        int numDownloading = 0;
886        const tr_peer ** p = (const tr_peer **) tr_ptrArrayBase( &t->peers );
887        const tr_peer ** const pend = p + tr_ptrArraySize( &t->peers );
888
889        /* add the active bittorrent peers... */
890        for( ; p!=pend; ++p )
891            if( (*p)->pendingReqsToPeer > 0 )
892                ++numDownloading;
893
894        /* add the active webseeds... */
895        numDownloading += countActiveWebseeds( t );
896
897        /* average number of pending requests per downloading peer */
898        t->endgame = t->requestCount / MAX( numDownloading, 1 );
899    }
900}
901
902
903/****
904*****
905*****  Piece List Manipulation / Accessors
906*****
907****/
908
909static inline void
910invalidatePieceSorting( Torrent * t )
911{
912    t->pieceSortState = PIECES_UNSORTED;
913}
914
915static const tr_torrent * weightTorrent;
916
917static const uint16_t * weightReplication;
918
919static void
920setComparePieceByWeightTorrent( Torrent * t )
921{
922    if( !replicationExists( t ) )
923        replicationNew( t );
924
925    weightTorrent = t->tor;
926    weightReplication = t->pieceReplication;
927}
928
929/* we try to create a "weight" s.t. high-priority pieces come before others,
930 * and that partially-complete pieces come before empty ones. */
931static int
932comparePieceByWeight( const void * va, const void * vb )
933{
934    const struct weighted_piece * a = va;
935    const struct weighted_piece * b = vb;
936    int ia, ib, missing, pending;
937    const tr_torrent * tor = weightTorrent;
938    const uint16_t * rep = weightReplication;
939
940    /* primary key: weight */
941    missing = tr_cpMissingBlocksInPiece( &tor->completion, a->index );
942    pending = a->requestCount;
943    ia = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
944    missing = tr_cpMissingBlocksInPiece( &tor->completion, b->index );
945    pending = b->requestCount;
946    ib = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
947    if( ia < ib ) return -1;
948    if( ia > ib ) return 1;
949
950    /* secondary key: higher priorities go first */
951    ia = tor->info.pieces[a->index].priority;
952    ib = tor->info.pieces[b->index].priority;
953    if( ia > ib ) return -1;
954    if( ia < ib ) return 1;
955
956    /* tertiary key: rarest first. */
957    ia = rep[a->index];
958    ib = rep[b->index];
959    if( ia < ib ) return -1;
960    if( ia > ib ) return 1;
961
962    /* quaternary key: random */
963    if( a->salt < b->salt ) return -1;
964    if( a->salt > b->salt ) return 1;
965
966    /* okay, they're equal */
967    return 0;
968}
969
970static int
971comparePieceByIndex( const void * va, const void * vb )
972{
973    const struct weighted_piece * a = va;
974    const struct weighted_piece * b = vb;
975    if( a->index < b->index ) return -1;
976    if( a->index > b->index ) return 1;
977    return 0;
978}
979
980static void
981pieceListSort( Torrent * t, enum piece_sort_state state )
982{
983    assert( state==PIECES_SORTED_BY_INDEX
984         || state==PIECES_SORTED_BY_WEIGHT );
985
986
987    if( state == PIECES_SORTED_BY_WEIGHT )
988    {
989        setComparePieceByWeightTorrent( t );
990        qsort( t->pieces, t->pieceCount, sizeof( struct weighted_piece ), comparePieceByWeight );
991    }
992    else
993        qsort( t->pieces, t->pieceCount, sizeof( struct weighted_piece ), comparePieceByIndex );
994
995    t->pieceSortState = state;
996}
997
998/**
999 * These functions are useful for testing, but too expensive for nightly builds.
1000 * let's leave it disabled but add an easy hook to compile it back in
1001 */
1002#if 1
1003#define assertWeightedPiecesAreSorted(t)
1004#define assertReplicationCountIsExact(t)
1005#else
1006static void
1007assertWeightedPiecesAreSorted( Torrent * t )
1008{
1009    if( !t->endgame )
1010    {
1011        int i;
1012        setComparePieceByWeightTorrent( t );
1013        for( i=0; i<t->pieceCount-1; ++i )
1014            assert( comparePieceByWeight( &t->pieces[i], &t->pieces[i+1] ) <= 0 );
1015    }
1016}
1017static void
1018assertReplicationCountIsExact( Torrent * t )
1019{
1020    /* This assert might fail due to errors of implementations in other
1021     * clients. It happens when receiving duplicate bitfields/HaveAll/HaveNone
1022     * from a client. If a such a behavior is noticed,
1023     * a bug report should be filled to the faulty client. */
1024
1025    size_t piece_i;
1026    const uint16_t * rep = t->pieceReplication;
1027    const size_t piece_count = t->pieceReplicationSize;
1028    const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1029    const int peer_count = tr_ptrArraySize( &t->peers );
1030
1031    assert( piece_count == t->tor->info.pieceCount );
1032
1033    for( piece_i=0; piece_i<piece_count; ++piece_i )
1034    {
1035        int peer_i;
1036        uint16_t r = 0;
1037
1038        for( peer_i=0; peer_i<peer_count; ++peer_i )
1039            if( tr_bitsetHas( &peers[peer_i]->have, piece_i ) )
1040                ++r;
1041
1042        assert( rep[piece_i] == r );
1043    }
1044}
1045#endif
1046
1047static struct weighted_piece *
1048pieceListLookup( Torrent * t, tr_piece_index_t index )
1049{
1050    int i;
1051
1052    for( i=0; i<t->pieceCount; ++i )
1053        if( t->pieces[i].index == index )
1054            return &t->pieces[i];
1055
1056    return NULL;
1057}
1058
1059static void
1060pieceListRebuild( Torrent * t )
1061{
1062
1063    if( !tr_torrentIsSeed( t->tor ) )
1064    {
1065        tr_piece_index_t i;
1066        tr_piece_index_t * pool;
1067        tr_piece_index_t poolCount = 0;
1068        const tr_torrent * tor = t->tor;
1069        const tr_info * inf = tr_torrentInfo( tor );
1070        struct weighted_piece * pieces;
1071        int pieceCount;
1072
1073        /* build the new list */
1074        pool = tr_new( tr_piece_index_t, inf->pieceCount );
1075        for( i=0; i<inf->pieceCount; ++i )
1076            if( !inf->pieces[i].dnd )
1077                if( !tr_cpPieceIsComplete( &tor->completion, i ) )
1078                    pool[poolCount++] = i;
1079        pieceCount = poolCount;
1080        pieces = tr_new0( struct weighted_piece, pieceCount );
1081        for( i=0; i<poolCount; ++i ) {
1082            struct weighted_piece * piece = pieces + i;
1083            piece->index = pool[i];
1084            piece->requestCount = 0;
1085            piece->salt = tr_cryptoWeakRandInt( 4096 );
1086        }
1087
1088        /* if we already had a list of pieces, merge it into
1089         * the new list so we don't lose its requestCounts */
1090        if( t->pieces != NULL )
1091        {
1092            struct weighted_piece * o = t->pieces;
1093            struct weighted_piece * oend = o + t->pieceCount;
1094            struct weighted_piece * n = pieces;
1095            struct weighted_piece * nend = n + pieceCount;
1096
1097            pieceListSort( t, PIECES_SORTED_BY_INDEX );
1098
1099            while( o!=oend && n!=nend ) {
1100                if( o->index < n->index )
1101                    ++o;
1102                else if( o->index > n->index )
1103                    ++n;
1104                else
1105                    *n++ = *o++;
1106            }
1107
1108            tr_free( t->pieces );
1109        }
1110
1111        t->pieces = pieces;
1112        t->pieceCount = pieceCount;
1113
1114        pieceListSort( t, PIECES_SORTED_BY_WEIGHT );
1115
1116        /* cleanup */
1117        tr_free( pool );
1118    }
1119}
1120
1121static void
1122pieceListRemovePiece( Torrent * t, tr_piece_index_t piece )
1123{
1124    struct weighted_piece * p;
1125
1126    if(( p = pieceListLookup( t, piece )))
1127    {
1128        const int pos = p - t->pieces;
1129
1130        tr_removeElementFromArray( t->pieces,
1131                                   pos,
1132                                   sizeof( struct weighted_piece ),
1133                                   t->pieceCount-- );
1134
1135        if( t->pieceCount == 0 )
1136        {
1137            tr_free( t->pieces );
1138            t->pieces = NULL;
1139        }
1140    }
1141}
1142
1143static void
1144pieceListResortPiece( Torrent * t, struct weighted_piece * p )
1145{
1146    int pos;
1147    bool isSorted = true;
1148
1149    if( p == NULL )
1150        return;
1151
1152    /* is the torrent already sorted? */
1153    pos = p - t->pieces;
1154    setComparePieceByWeightTorrent( t );
1155    if( isSorted && ( pos > 0 ) && ( comparePieceByWeight( p-1, p ) > 0 ) )
1156        isSorted = false;
1157    if( isSorted && ( pos < t->pieceCount - 1 ) && ( comparePieceByWeight( p, p+1 ) > 0 ) )
1158        isSorted = false;
1159
1160    if( t->pieceSortState != PIECES_SORTED_BY_WEIGHT )
1161    {
1162       pieceListSort( t, PIECES_SORTED_BY_WEIGHT);
1163       isSorted = true;
1164    }
1165
1166    /* if it's not sorted, move it around */
1167    if( !isSorted )
1168    {
1169        bool exact;
1170        const struct weighted_piece tmp = *p;
1171
1172        tr_removeElementFromArray( t->pieces,
1173                                   pos,
1174                                   sizeof( struct weighted_piece ),
1175                                   t->pieceCount-- );
1176
1177        pos = tr_lowerBound( &tmp, t->pieces, t->pieceCount,
1178                             sizeof( struct weighted_piece ),
1179                             comparePieceByWeight, &exact );
1180
1181        memmove( &t->pieces[pos + 1],
1182                 &t->pieces[pos],
1183                 sizeof( struct weighted_piece ) * ( t->pieceCount++ - pos ) );
1184
1185        t->pieces[pos] = tmp;
1186    }
1187
1188    assertWeightedPiecesAreSorted( t );
1189}
1190
1191static void
1192pieceListRemoveRequest( Torrent * t, tr_block_index_t block )
1193{
1194    struct weighted_piece * p;
1195    const tr_piece_index_t index = tr_torBlockPiece( t->tor, block );
1196
1197    if( ((p = pieceListLookup( t, index ))) && ( p->requestCount > 0 ) )
1198    {
1199        --p->requestCount;
1200        pieceListResortPiece( t, p );
1201    }
1202}
1203
1204
1205/****
1206*****
1207*****  Replication count ( for rarest first policy )
1208*****
1209****/
1210
1211/**
1212 * Increase the replication count of this piece and sort it if the
1213 * piece list is already sorted
1214 */
1215static void
1216tr_incrReplicationOfPiece( Torrent * t, const size_t index )
1217{
1218    assert( replicationExists( t ) );
1219    assert( t->pieceReplicationSize == t->tor->info.pieceCount );
1220
1221    /* One more replication of this piece is present in the swarm */
1222    ++t->pieceReplication[index];
1223
1224    /* we only resort the piece if the list is already sorted */
1225    if( t->pieceSortState == PIECES_SORTED_BY_WEIGHT )
1226        pieceListResortPiece( t, pieceListLookup( t, index ) );
1227}
1228
1229/**
1230 * Increases the replication count of pieces present in the bitfield
1231 */
1232static void
1233tr_incrReplicationFromBitfield( Torrent * t, const tr_bitfield * b )
1234{
1235    size_t i;
1236    uint16_t * rep = t->pieceReplication;
1237    const size_t n = t->tor->info.pieceCount;
1238
1239    assert( replicationExists( t ) );
1240
1241    for( i=0; i<n; ++i )
1242        if( tr_bitfieldHas( b, i ) )
1243            ++rep[i];
1244
1245    if( t->pieceSortState == PIECES_SORTED_BY_WEIGHT )
1246        invalidatePieceSorting( t );
1247}
1248
1249/**
1250 * Increase the replication count of every piece
1251 */
1252static void
1253tr_incrReplication( Torrent * t )
1254{
1255    int i;
1256    const int n = t->pieceReplicationSize;
1257
1258    assert( replicationExists( t ) );
1259    assert( t->pieceReplicationSize == t->tor->info.pieceCount );
1260
1261    for( i=0; i<n; ++i )
1262        ++t->pieceReplication[i];
1263}
1264
1265/**
1266 * Decrease the replication count of pieces present in the bitset.
1267 */
1268static void
1269tr_decrReplicationFromBitfield( Torrent * t, const tr_bitfield * b )
1270{
1271    int i;
1272    const int n = t->pieceReplicationSize;
1273
1274    assert( replicationExists( t ) );
1275    assert( t->pieceReplicationSize == t->tor->info.pieceCount );
1276
1277    if( tr_bitfieldHasAll( b ) )
1278    {
1279        for( i=0; i<n; ++i )
1280            --t->pieceReplication[i];
1281    }
1282    else if ( !tr_bitfieldHasNone( b ) )
1283    {
1284        for( i=0; i<n; ++i )
1285            if( tr_bitfieldHas( b, i ) )
1286                --t->pieceReplication[i];
1287
1288        if( t->pieceSortState == PIECES_SORTED_BY_WEIGHT )
1289            invalidatePieceSorting( t );
1290    }
1291}
1292
1293/**
1294***
1295**/
1296
1297void
1298tr_peerMgrRebuildRequests( tr_torrent * tor )
1299{
1300    assert( tr_isTorrent( tor ) );
1301
1302    pieceListRebuild( tor->torrentPeers );
1303}
1304
1305void
1306tr_peerMgrGetNextRequests( tr_torrent           * tor,
1307                           tr_peer              * peer,
1308                           int                    numwant,
1309                           tr_block_index_t     * setme,
1310                           int                  * numgot,
1311                           bool                   get_intervals )
1312{
1313    int i;
1314    int got;
1315    Torrent * t;
1316    struct weighted_piece * pieces;
1317    const tr_bitfield * const have = &peer->have;
1318
1319    /* sanity clause */
1320    assert( tr_isTorrent( tor ) );
1321    assert( peer->clientIsInterested );
1322    assert( !peer->clientIsChoked );
1323    assert( numwant > 0 );
1324
1325    /* walk through the pieces and find blocks that should be requested */
1326    got = 0;
1327    t = tor->torrentPeers;
1328
1329    /* prep the pieces list */
1330    if( t->pieces == NULL )
1331        pieceListRebuild( t );
1332
1333    if( t->pieceSortState != PIECES_SORTED_BY_WEIGHT )
1334        pieceListSort( t, PIECES_SORTED_BY_WEIGHT );
1335
1336    assertReplicationCountIsExact( t );
1337    assertWeightedPiecesAreSorted( t );
1338
1339    updateEndgame( t );
1340    pieces = t->pieces;
1341    for( i=0; i<t->pieceCount && got<numwant; ++i )
1342    {
1343        struct weighted_piece * p = pieces + i;
1344
1345        /* if the peer has this piece that we want... */
1346        if( tr_bitfieldHas( have, p->index ) )
1347        {
1348            tr_block_index_t b;
1349            tr_block_index_t first;
1350            tr_block_index_t last;
1351            tr_ptrArray peerArr = TR_PTR_ARRAY_INIT;
1352
1353            tr_torGetPieceBlockRange( tor, p->index, &first, &last );
1354
1355            for( b=first; b<=last && (got<numwant || (get_intervals && setme[2*got-1] == b-1)); ++b )
1356            {
1357                int peerCount;
1358                tr_peer ** peers;
1359
1360                /* don't request blocks we've already got */
1361                if( tr_cpBlockIsComplete( &tor->completion, b ) )
1362                    continue;
1363
1364                /* always add peer if this block has no peers yet */
1365                tr_ptrArrayClear( &peerArr );
1366                getBlockRequestPeers( t, b, &peerArr );
1367                peers = (tr_peer **) tr_ptrArrayPeek( &peerArr, &peerCount );
1368                if( peerCount != 0 )
1369                {
1370                    /* don't make a second block request until the endgame */
1371                    if( !t->endgame )
1372                        continue;
1373
1374                    /* don't have more than two peers requesting this block */
1375                    if( peerCount > 1 )
1376                        continue;
1377
1378                    /* don't send the same request to the same peer twice */
1379                    if( peer == peers[0] )
1380                        continue;
1381
1382                    /* in the endgame allow an additional peer to download a
1383                       block but only if the peer seems to be handling requests
1384                       relatively fast */
1385                    if( peer->pendingReqsToPeer + numwant - got < t->endgame )
1386                        continue;
1387                }
1388
1389                /* update the caller's table */
1390                if( !get_intervals ) {
1391                    setme[got++] = b;
1392                }
1393                /* if intervals are requested two array entries are necessarry:
1394                   one for the interval's starting block and one for its end block */
1395                else if( got && setme[2 * got - 1] == b - 1 && b != first ) {
1396                    /* expand the last interval */
1397                    ++setme[2 * got - 1];
1398                }
1399                else {
1400                    /* begin a new interval */
1401                    setme[2 * got] = setme[2 * got + 1] = b;
1402                    ++got;
1403                }
1404
1405                /* update our own tables */
1406                requestListAdd( t, b, peer );
1407                ++p->requestCount;
1408            }
1409
1410            tr_ptrArrayDestruct( &peerArr, NULL );
1411        }
1412    }
1413
1414    /* In most cases we've just changed the weights of a small number of pieces.
1415     * So rather than qsort()ing the entire array, it's faster to apply an
1416     * adaptive insertion sort algorithm. */
1417    if( got > 0 )
1418    {
1419        /* not enough requests || last piece modified */
1420        if ( i == t->pieceCount ) --i;
1421
1422        setComparePieceByWeightTorrent( t );
1423        while( --i >= 0 )
1424        {
1425            bool exact;
1426
1427            /* relative position! */
1428            const int newpos = tr_lowerBound( &t->pieces[i], &t->pieces[i + 1],
1429                                              t->pieceCount - (i + 1),
1430                                              sizeof( struct weighted_piece ),
1431                                              comparePieceByWeight, &exact );
1432            if( newpos > 0 )
1433            {
1434                const struct weighted_piece piece = t->pieces[i];
1435                memmove( &t->pieces[i],
1436                         &t->pieces[i + 1],
1437                         sizeof( struct weighted_piece ) * ( newpos ) );
1438                t->pieces[i + newpos] = piece;
1439            }
1440        }
1441    }
1442
1443    assertWeightedPiecesAreSorted( t );
1444    *numgot = got;
1445}
1446
1447bool
1448tr_peerMgrDidPeerRequest( const tr_torrent  * tor,
1449                          const tr_peer     * peer,
1450                          tr_block_index_t    block )
1451{
1452    const Torrent * t = tor->torrentPeers;
1453    return requestListLookup( (Torrent*)t, block, peer ) != NULL;
1454}
1455
1456/* cancel requests that are too old */
1457static void
1458refillUpkeep( int foo UNUSED, short bar UNUSED, void * vmgr )
1459{
1460    time_t now;
1461    time_t too_old;
1462    tr_torrent * tor;
1463    int cancel_buflen = 0;
1464    struct block_request * cancel = NULL;
1465    tr_peerMgr * mgr = vmgr;
1466    managerLock( mgr );
1467
1468    now = tr_time( );
1469    too_old = now - REQUEST_TTL_SECS;
1470
1471    /* alloc the temporary "cancel" buffer */
1472    tor = NULL;
1473    while(( tor = tr_torrentNext( mgr->session, tor )))
1474        cancel_buflen = MAX( cancel_buflen, tor->torrentPeers->requestCount );
1475    if( cancel_buflen > 0 )
1476        cancel = tr_new( struct block_request, cancel_buflen );
1477
1478    /* prune requests that are too old */
1479    tor = NULL;
1480    while(( tor = tr_torrentNext( mgr->session, tor )))
1481    {
1482        Torrent * t = tor->torrentPeers;
1483        const int n = t->requestCount;
1484        if( n > 0 )
1485        {
1486            int keepCount = 0;
1487            int cancelCount = 0;
1488            const struct block_request * it;
1489            const struct block_request * end;
1490
1491            for( it=t->requests, end=it+n; it!=end; ++it )
1492            {
1493                if( ( it->sentAt <= too_old ) && it->peer->msgs && !tr_peerMsgsIsReadingBlock( it->peer->msgs, it->block ) )
1494                    cancel[cancelCount++] = *it;
1495                else
1496                {
1497                    if( it != &t->requests[keepCount] )
1498                        t->requests[keepCount] = *it;
1499                    keepCount++;
1500                }
1501            }
1502
1503            /* prune out the ones we aren't keeping */
1504            t->requestCount = keepCount;
1505
1506            /* send cancel messages for all the "cancel" ones */
1507            for( it=cancel, end=it+cancelCount; it!=end; ++it ) {
1508                if( ( it->peer != NULL ) && ( it->peer->msgs != NULL ) ) {
1509                    tr_historyAdd( &it->peer->cancelsSentToPeer, now, 1 );
1510                    tr_peerMsgsCancel( it->peer->msgs, it->block );
1511                    decrementPendingReqCount( it );
1512                }
1513            }
1514
1515            /* decrement the pending request counts for the timed-out blocks */
1516            for( it=cancel, end=it+cancelCount; it!=end; ++it )
1517                pieceListRemoveRequest( t, it->block );
1518        }
1519    }
1520
1521    tr_free( cancel );
1522    tr_timerAddMsec( mgr->refillUpkeepTimer, REFILL_UPKEEP_PERIOD_MSEC );
1523    managerUnlock( mgr );
1524}
1525
1526static void
1527addStrike( Torrent * t, tr_peer * peer )
1528{
1529    tordbg( t, "increasing peer %s strike count to %d",
1530            tr_atomAddrStr( peer->atom ), peer->strikes + 1 );
1531
1532    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
1533    {
1534        struct peer_atom * atom = peer->atom;
1535        atom->flags2 |= MYFLAG_BANNED;
1536        peer->doPurge = 1;
1537        tordbg( t, "banning peer %s", tr_atomAddrStr( atom ) );
1538    }
1539}
1540
1541static void
1542gotBadPiece( Torrent * t, tr_piece_index_t pieceIndex )
1543{
1544    tr_torrent *   tor = t->tor;
1545    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
1546
1547    tor->corruptCur += byteCount;
1548    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1549
1550    tr_announcerAddBytes( tor, TR_ANN_CORRUPT, byteCount );
1551}
1552
1553static void
1554peerSuggestedPiece( Torrent            * t UNUSED,
1555                    tr_peer            * peer UNUSED,
1556                    tr_piece_index_t     pieceIndex UNUSED,
1557                    int                  isFastAllowed UNUSED )
1558{
1559#if 0
1560    assert( t );
1561    assert( peer );
1562    assert( peer->msgs );
1563
1564    /* is this a valid piece? */
1565    if(  pieceIndex >= t->tor->info.pieceCount )
1566        return;
1567
1568    /* don't ask for it if we've already got it */
1569    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
1570        return;
1571
1572    /* don't ask for it if they don't have it */
1573    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
1574        return;
1575
1576    /* don't ask for it if we're choked and it's not fast */
1577    if( !isFastAllowed && peer->clientIsChoked )
1578        return;
1579
1580    /* request the blocks that we don't have in this piece */
1581    {
1582        tr_block_index_t b;
1583        tr_block_index_t first;
1584        tr_block_index_t last;
1585        const tr_torrent * tor = t->tor;
1586
1587        tr_torGetPieceBlockRange( t->tor, pieceIndex, &first, &last );
1588
1589        for( b=first; b<=last; ++b )
1590        {
1591            if( !tr_cpBlockIsComplete( tor->completion, b ) )
1592            {
1593                const uint32_t offset = getBlockOffsetInPiece( tor, b );
1594                const uint32_t length = tr_torBlockCountBytes( tor, b );
1595                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length );
1596                incrementPieceRequests( t, pieceIndex );
1597            }
1598        }
1599    }
1600#endif
1601}
1602
1603static void
1604removeRequestFromTables( Torrent * t, tr_block_index_t block, const tr_peer * peer )
1605{
1606    requestListRemove( t, block, peer );
1607    pieceListRemoveRequest( t, block );
1608}
1609
1610/* peer choked us, or maybe it disconnected.
1611   either way we need to remove all its requests */
1612static void
1613peerDeclinedAllRequests( Torrent * t, const tr_peer * peer )
1614{
1615    int i, n;
1616    tr_block_index_t * blocks = tr_new( tr_block_index_t, t->requestCount );
1617
1618    for( i=n=0; i<t->requestCount; ++i )
1619        if( peer == t->requests[i].peer )
1620            blocks[n++] = t->requests[i].block;
1621
1622    for( i=0; i<n; ++i )
1623        removeRequestFromTables( t, blocks[i], peer );
1624
1625    tr_free( blocks );
1626}
1627
1628static void tr_peerMgrSetBlame( tr_torrent *, tr_piece_index_t, int );
1629
1630static void
1631peerCallbackFunc( tr_peer * peer, const tr_peer_event * e, void * vt )
1632{
1633    Torrent * t = vt;
1634
1635    torrentLock( t );
1636
1637    assert( peer != NULL );
1638
1639    switch( e->eventType )
1640    {
1641        case TR_PEER_PEER_GOT_DATA:
1642        {
1643            const time_t now = tr_time( );
1644            tr_torrent * tor = t->tor;
1645
1646            if( e->wasPieceData )
1647            {
1648                tor->uploadedCur += e->length;
1649                tr_announcerAddBytes( tor, TR_ANN_UP, e->length );
1650                tr_torrentSetActivityDate( tor, now );
1651                tr_torrentSetDirty( tor );
1652            }
1653
1654            /* update the stats */
1655            if( e->wasPieceData )
1656                tr_statsAddUploaded( tor->session, e->length );
1657
1658            /* update our atom */
1659            if( peer->atom && e->wasPieceData )
1660                peer->atom->piece_data_time = now;
1661
1662            break;
1663        }
1664
1665        case TR_PEER_CLIENT_GOT_HAVE:
1666            if( replicationExists( t ) ) {
1667                tr_incrReplicationOfPiece( t, e->pieceIndex );
1668                assertReplicationCountIsExact( t );
1669            }
1670            break;
1671
1672        case TR_PEER_CLIENT_GOT_HAVE_ALL:
1673            if( replicationExists( t ) ) {
1674                tr_incrReplication( t );
1675                assertReplicationCountIsExact( t );
1676            }
1677            break;
1678
1679        case TR_PEER_CLIENT_GOT_HAVE_NONE:
1680            /* noop */
1681            break;
1682
1683        case TR_PEER_CLIENT_GOT_BITFIELD:
1684            assert( e->bitfield != NULL );
1685            if( replicationExists( t ) ) {
1686                tr_incrReplicationFromBitfield( t, e->bitfield );
1687                assertReplicationCountIsExact( t );
1688            }
1689            break;
1690
1691        case TR_PEER_CLIENT_GOT_REJ: {
1692            tr_block_index_t b = _tr_block( t->tor, e->pieceIndex, e->offset );
1693            if( b < t->tor->blockCount )
1694                removeRequestFromTables( t, b, peer );
1695            else
1696                tordbg( t, "Peer %s sent an out-of-range reject message",
1697                           tr_atomAddrStr( peer->atom ) );
1698            break;
1699        }
1700
1701        case TR_PEER_CLIENT_GOT_CHOKE:
1702            peerDeclinedAllRequests( t, peer );
1703            break;
1704
1705        case TR_PEER_CLIENT_GOT_PORT:
1706            if( peer->atom )
1707                peer->atom->port = e->port;
1708            break;
1709
1710        case TR_PEER_CLIENT_GOT_SUGGEST:
1711            peerSuggestedPiece( t, peer, e->pieceIndex, false );
1712            break;
1713
1714        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1715            peerSuggestedPiece( t, peer, e->pieceIndex, true );
1716            break;
1717
1718        case TR_PEER_CLIENT_GOT_DATA:
1719        {
1720            const time_t now = tr_time( );
1721            tr_torrent * tor = t->tor;
1722
1723            if( e->wasPieceData )
1724            {
1725                tor->downloadedCur += e->length;
1726                tr_torrentSetActivityDate( tor, now );
1727                tr_torrentSetDirty( tor );
1728            }
1729
1730            /* update the stats */
1731            if( e->wasPieceData )
1732                tr_statsAddDownloaded( tor->session, e->length );
1733
1734            /* update our atom */
1735            if( peer->atom && e->wasPieceData )
1736                peer->atom->piece_data_time = now;
1737
1738            break;
1739        }
1740
1741        case TR_PEER_CLIENT_GOT_BLOCK:
1742        {
1743            tr_torrent * tor = t->tor;
1744            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1745            int i, peerCount;
1746            tr_peer ** peers;
1747            tr_ptrArray peerArr = TR_PTR_ARRAY_INIT;
1748
1749            removeRequestFromTables( t, block, peer );
1750            getBlockRequestPeers( t, block, &peerArr );
1751            peers = (tr_peer **) tr_ptrArrayPeek( &peerArr, &peerCount );
1752
1753            /* remove additional block requests and send cancel to peers */
1754            for( i=0; i<peerCount; i++ ) {
1755                tr_peer * p = peers[i];
1756                assert( p != peer );
1757                if( p->msgs ) {
1758                    tr_historyAdd( &p->cancelsSentToPeer, tr_time( ), 1 );
1759                    tr_peerMsgsCancel( p->msgs, block );
1760                }
1761                removeRequestFromTables( t, block, p );
1762            }
1763
1764            tr_ptrArrayDestruct( &peerArr, false );
1765
1766            tr_historyAdd( &peer->blocksSentToClient, tr_time( ), 1 );
1767
1768            if( tr_cpBlockIsComplete( &tor->completion, block ) )
1769            {
1770                /* we already have this block... */
1771                const uint32_t n = tr_torBlockCountBytes( tor, block );
1772                tor->downloadedCur -= MIN( tor->downloadedCur, n );
1773                tordbg( t, "we have this block already..." );
1774            }
1775            else
1776            {
1777                tr_cpBlockAdd( &tor->completion, block );
1778                pieceListResortPiece( t, pieceListLookup( t, e->pieceIndex ) );
1779                tr_torrentSetDirty( tor );
1780
1781                if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
1782                {
1783                    const tr_piece_index_t p = e->pieceIndex;
1784                    const bool ok = tr_torrentCheckPiece( tor, p );
1785
1786                    tordbg( t, "[LAZY] checked just-completed piece %zu", (size_t)p );
1787
1788                    if( !ok )
1789                    {
1790                        tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1791                                   (unsigned long)p );
1792                    }
1793
1794                    tr_peerMgrSetBlame( tor, p, ok );
1795
1796                    if( !ok )
1797                    {
1798                        gotBadPiece( t, p );
1799                    }
1800                    else
1801                    {
1802                        int i;
1803                        int peerCount;
1804                        tr_peer ** peers;
1805                        tr_file_index_t fileIndex;
1806
1807                        /* only add this to downloadedCur if we got it from a peer --
1808                         * webseeds shouldn't count against our ratio. As one tracker
1809                         * admin put it, "Those pieces are downloaded directly from the
1810                         * content distributor, not the peers, it is the tracker's job
1811                         * to manage the swarms, not the web server and does not fit
1812                         * into the jurisdiction of the tracker." */
1813                        if( peer->msgs != NULL ) {
1814                            const uint32_t n = tr_torPieceCountBytes( tor, p );
1815                            tr_announcerAddBytes( tor, TR_ANN_DOWN, n );
1816                        }
1817
1818                        peerCount = tr_ptrArraySize( &t->peers );
1819                        peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1820                        for( i=0; i<peerCount; ++i )
1821                            tr_peerMsgsHave( peers[i]->msgs, p );
1822
1823                        for( fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex ) {
1824                            const tr_file * file = &tor->info.files[fileIndex];
1825                            if( ( file->firstPiece <= p ) && ( p <= file->lastPiece ) ) {
1826                                if( tr_cpFileIsComplete( &tor->completion, fileIndex ) ) {
1827                                    tr_cacheFlushFile( tor->session->cache, tor, fileIndex );
1828                                    tr_torrentFileCompleted( tor, fileIndex );
1829                                }
1830                            }
1831                        }
1832
1833                        pieceListRemovePiece( t, p );
1834                    }
1835                }
1836
1837                t->needsCompletenessCheck = true;
1838            }
1839            break;
1840        }
1841
1842        case TR_PEER_ERROR:
1843            if( ( e->err == ERANGE ) || ( e->err == EMSGSIZE ) || ( e->err == ENOTCONN ) )
1844            {
1845                /* some protocol error from the peer */
1846                peer->doPurge = 1;
1847                tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error",
1848                        tr_atomAddrStr( peer->atom ) );
1849            }
1850            else
1851            {
1852                tordbg( t, "unhandled error: %s", tr_strerror( e->err ) );
1853            }
1854            break;
1855
1856        default:
1857            assert( 0 );
1858    }
1859
1860    torrentUnlock( t );
1861}
1862
1863static int
1864getDefaultShelfLife( uint8_t from )
1865{
1866    /* in general, peers obtained from firsthand contact
1867     * are better than those from secondhand, etc etc */
1868    switch( from )
1869    {
1870        case TR_PEER_FROM_INCOMING : return 60 * 60 * 6;
1871        case TR_PEER_FROM_LTEP     : return 60 * 60 * 6;
1872        case TR_PEER_FROM_TRACKER  : return 60 * 60 * 3;
1873        case TR_PEER_FROM_DHT      : return 60 * 60 * 3;
1874        case TR_PEER_FROM_PEX      : return 60 * 60 * 2;
1875        case TR_PEER_FROM_RESUME   : return 60 * 60;
1876        case TR_PEER_FROM_LPD      : return 10 * 60;
1877        default                    : return 60 * 60;
1878    }
1879}
1880
1881static void
1882ensureAtomExists( Torrent           * t,
1883                  const tr_address  * addr,
1884                  const tr_port       port,
1885                  const uint8_t       flags,
1886                  const int8_t        seedProbability,
1887                  const uint8_t       from )
1888{
1889    struct peer_atom * a;
1890
1891    assert( tr_address_is_valid( addr ) );
1892    assert( from < TR_PEER_FROM__MAX );
1893
1894    a = getExistingAtom( t, addr );
1895
1896    if( a == NULL )
1897    {
1898        const int jitter = tr_cryptoWeakRandInt( 60*10 );
1899        a = tr_new0( struct peer_atom, 1 );
1900        a->addr = *addr;
1901        a->port = port;
1902        a->flags = flags;
1903        a->fromFirst = from;
1904        a->fromBest = from;
1905        a->shelf_date = tr_time( ) + getDefaultShelfLife( from ) + jitter;
1906        a->blocklisted = -1;
1907        atomSetSeedProbability( a, seedProbability );
1908        tr_ptrArrayInsertSorted( &t->pool, a, compareAtomsByAddress );
1909
1910        tordbg( t, "got a new atom: %s", tr_atomAddrStr( a ) );
1911    }
1912    else
1913    {
1914        if( from < a->fromBest )
1915            a->fromBest = from;
1916
1917        if( a->seedProbability == -1 )
1918            atomSetSeedProbability( a, seedProbability );
1919
1920        a->flags |= flags;
1921    }
1922}
1923
1924static int
1925getMaxPeerCount( const tr_torrent * tor )
1926{
1927    return tor->maxConnectedPeers;
1928}
1929
1930static int
1931getPeerCount( const Torrent * t )
1932{
1933    return tr_ptrArraySize( &t->peers );/* + tr_ptrArraySize( &t->outgoingHandshakes ); */
1934}
1935
1936/* FIXME: this is kind of a mess. */
1937static bool
1938myHandshakeDoneCB( tr_handshake  * handshake,
1939                   tr_peerIo     * io,
1940                   bool            readAnythingFromPeer,
1941                   bool            isConnected,
1942                   const uint8_t * peer_id,
1943                   void          * vmanager )
1944{
1945    bool               ok = isConnected;
1946    bool               success = false;
1947    tr_port            port;
1948    const tr_address * addr;
1949    tr_peerMgr       * manager = vmanager;
1950    Torrent          * t;
1951    tr_handshake     * ours;
1952
1953    assert( io );
1954    assert( tr_isBool( ok ) );
1955
1956    t = tr_peerIoHasTorrentHash( io )
1957        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1958        : NULL;
1959
1960    if( tr_peerIoIsIncoming ( io ) )
1961        ours = tr_ptrArrayRemoveSorted( &manager->incomingHandshakes,
1962                                        handshake, handshakeCompare );
1963    else if( t )
1964        ours = tr_ptrArrayRemoveSorted( &t->outgoingHandshakes,
1965                                        handshake, handshakeCompare );
1966    else
1967        ours = handshake;
1968
1969    assert( ours );
1970    assert( ours == handshake );
1971
1972    if( t )
1973        torrentLock( t );
1974
1975    addr = tr_peerIoGetAddress( io, &port );
1976
1977    if( !ok || !t || !t->isRunning )
1978    {
1979        if( t )
1980        {
1981            struct peer_atom * atom = getExistingAtom( t, addr );
1982            if( atom )
1983            {
1984                ++atom->numFails;
1985
1986                if( !readAnythingFromPeer )
1987                {
1988                    tordbg( t, "marking peer %s as unreachable... numFails is %d", tr_atomAddrStr( atom ), (int)atom->numFails );
1989                    atom->flags2 |= MYFLAG_UNREACHABLE;
1990                }
1991            }
1992        }
1993    }
1994    else /* looking good */
1995    {
1996        struct peer_atom * atom;
1997
1998        ensureAtomExists( t, addr, port, 0, -1, TR_PEER_FROM_INCOMING );
1999        atom = getExistingAtom( t, addr );
2000        atom->time = tr_time( );
2001        atom->piece_data_time = 0;
2002        atom->lastConnectionAt = tr_time( );
2003
2004        if( !tr_peerIoIsIncoming( io ) )
2005        {
2006            atom->flags |= ADDED_F_CONNECTABLE;
2007            atom->flags2 &= ~MYFLAG_UNREACHABLE;
2008        }
2009
2010        /* In principle, this flag specifies whether the peer groks uTP,
2011           not whether it's currently connected over uTP. */
2012        if( io->utp_socket )
2013            atom->flags |= ADDED_F_UTP_FLAGS;
2014
2015        if( atom->flags2 & MYFLAG_BANNED )
2016        {
2017            tordbg( t, "banned peer %s tried to reconnect",
2018                    tr_atomAddrStr( atom ) );
2019        }
2020        else if( tr_peerIoIsIncoming( io )
2021               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
2022
2023        {
2024        }
2025        else
2026        {
2027            tr_peer * peer = atom->peer;
2028
2029            if( peer ) /* we already have this peer */
2030            {
2031            }
2032            else
2033            {
2034                peer = getPeer( t, atom );
2035                tr_free( peer->client );
2036
2037                if( !peer_id )
2038                    peer->client = NULL;
2039                else {
2040                    char client[128];
2041                    tr_clientForId( client, sizeof( client ), peer_id );
2042                    peer->client = tr_strdup( client );
2043                }
2044
2045                peer->io = tr_handshakeStealIO( handshake ); /* this steals its refcount too, which is
2046                                                                balanced by our unref in peerDelete()  */
2047                tr_peerIoSetParent( peer->io, &t->tor->bandwidth );
2048                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t );
2049
2050                success = true;
2051            }
2052        }
2053    }
2054
2055    if( t )
2056        torrentUnlock( t );
2057
2058    return success;
2059}
2060
2061void
2062tr_peerMgrAddIncoming( tr_peerMgr * manager,
2063                       tr_address * addr,
2064                       tr_port      port,
2065                       int          socket,
2066                       struct UTPSocket * utp_socket )
2067{
2068    tr_session * session;
2069
2070    managerLock( manager );
2071
2072    assert( tr_isSession( manager->session ) );
2073    session = manager->session;
2074
2075    if( tr_sessionIsAddressBlocked( session, addr ) )
2076    {
2077        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_address_to_string( addr ) );
2078        if(socket >= 0)
2079            tr_netClose( session, socket );
2080        else
2081            UTP_Close( utp_socket );
2082    }
2083    else if( getExistingHandshake( &manager->incomingHandshakes, addr ) )
2084    {
2085        if(socket >= 0)
2086            tr_netClose( session, socket );
2087        else
2088            UTP_Close( utp_socket );
2089    }
2090    else /* we don't have a connection to them yet... */
2091    {
2092        tr_peerIo *    io;
2093        tr_handshake * handshake;
2094
2095        io = tr_peerIoNewIncoming( session, &session->bandwidth, addr, port, socket, utp_socket );
2096
2097        handshake = tr_handshakeNew( io,
2098                                     session->encryptionMode,
2099                                     myHandshakeDoneCB,
2100                                     manager );
2101
2102        tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewIncoming() */
2103
2104        tr_ptrArrayInsertSorted( &manager->incomingHandshakes, handshake,
2105                                 handshakeCompare );
2106    }
2107
2108    managerUnlock( manager );
2109}
2110
2111void
2112tr_peerMgrAddPex( tr_torrent * tor, uint8_t from,
2113                  const tr_pex * pex, int8_t seedProbability )
2114{
2115    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
2116    {
2117        Torrent * t = tor->torrentPeers;
2118        managerLock( t->manager );
2119
2120        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
2121            if( tr_address_is_valid_for_peers( &pex->addr, pex->port ) )
2122                ensureAtomExists( t, &pex->addr, pex->port, pex->flags, seedProbability, from );
2123
2124        managerUnlock( t->manager );
2125    }
2126}
2127
2128void
2129tr_peerMgrMarkAllAsSeeds( tr_torrent * tor )
2130{
2131    Torrent * t = tor->torrentPeers;
2132    const int n = tr_ptrArraySize( &t->pool );
2133    struct peer_atom ** it = (struct peer_atom**) tr_ptrArrayBase( &t->pool );
2134    struct peer_atom ** end = it + n;
2135
2136    while( it != end )
2137        atomSetSeed( t, *it++ );
2138}
2139
2140tr_pex *
2141tr_peerMgrCompactToPex( const void *    compact,
2142                        size_t          compactLen,
2143                        const uint8_t * added_f,
2144                        size_t          added_f_len,
2145                        size_t *        pexCount )
2146{
2147    size_t          i;
2148    size_t          n = compactLen / 6;
2149    const uint8_t * walk = compact;
2150    tr_pex *        pex = tr_new0( tr_pex, n );
2151
2152    for( i = 0; i < n; ++i )
2153    {
2154        pex[i].addr.type = TR_AF_INET;
2155        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
2156        memcpy( &pex[i].port, walk, 2 ); walk += 2;
2157        if( added_f && ( n == added_f_len ) )
2158            pex[i].flags = added_f[i];
2159    }
2160
2161    *pexCount = n;
2162    return pex;
2163}
2164
2165tr_pex *
2166tr_peerMgrCompact6ToPex( const void    * compact,
2167                         size_t          compactLen,
2168                         const uint8_t * added_f,
2169                         size_t          added_f_len,
2170                         size_t        * pexCount )
2171{
2172    size_t          i;
2173    size_t          n = compactLen / 18;
2174    const uint8_t * walk = compact;
2175    tr_pex *        pex = tr_new0( tr_pex, n );
2176
2177    for( i = 0; i < n; ++i )
2178    {
2179        pex[i].addr.type = TR_AF_INET6;
2180        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
2181        memcpy( &pex[i].port, walk, 2 ); walk += 2;
2182        if( added_f && ( n == added_f_len ) )
2183            pex[i].flags = added_f[i];
2184    }
2185
2186    *pexCount = n;
2187    return pex;
2188}
2189
2190tr_pex *
2191tr_peerMgrArrayToPex( const void * array,
2192                      size_t       arrayLen,
2193                      size_t      * pexCount )
2194{
2195    size_t          i;
2196    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
2197    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
2198    const uint8_t * walk = array;
2199    tr_pex        * pex = tr_new0( tr_pex, n );
2200
2201    for( i = 0 ; i < n ; i++ ) {
2202        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
2203        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
2204        pex[i].flags = 0x00;
2205        walk += sizeof( tr_address ) + 2;
2206    }
2207
2208    *pexCount = n;
2209    return pex;
2210}
2211
2212/**
2213***
2214**/
2215
2216static void
2217tr_peerMgrSetBlame( tr_torrent     * tor,
2218                    tr_piece_index_t pieceIndex,
2219                    int              success )
2220{
2221    if( !success )
2222    {
2223        int        peerCount, i;
2224        Torrent *  t = tor->torrentPeers;
2225        tr_peer ** peers;
2226
2227        assert( torrentIsLocked( t ) );
2228
2229        peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
2230        for( i = 0; i < peerCount; ++i )
2231        {
2232            tr_peer * peer = peers[i];
2233            if( tr_bitfieldHas( &peer->blame, pieceIndex ) )
2234            {
2235                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
2236                        tr_atomAddrStr( peer->atom ),
2237                        pieceIndex, (int)peer->strikes + 1 );
2238                addStrike( t, peer );
2239            }
2240        }
2241    }
2242}
2243
2244int
2245tr_pexCompare( const void * va, const void * vb )
2246{
2247    const tr_pex * a = va;
2248    const tr_pex * b = vb;
2249    int i;
2250
2251    assert( tr_isPex( a ) );
2252    assert( tr_isPex( b ) );
2253
2254    if(( i = tr_address_compare( &a->addr, &b->addr )))
2255        return i;
2256
2257    if( a->port != b->port )
2258        return a->port < b->port ? -1 : 1;
2259
2260    return 0;
2261}
2262
2263/* better goes first */
2264static int
2265compareAtomsByUsefulness( const void * va, const void *vb )
2266{
2267    const struct peer_atom * a = * (const struct peer_atom**) va;
2268    const struct peer_atom * b = * (const struct peer_atom**) vb;
2269
2270    assert( tr_isAtom( a ) );
2271    assert( tr_isAtom( b ) );
2272
2273    if( a->piece_data_time != b->piece_data_time )
2274        return a->piece_data_time > b->piece_data_time ? -1 : 1;
2275    if( a->fromBest != b->fromBest )
2276        return a->fromBest < b->fromBest ? -1 : 1;
2277    if( a->numFails != b->numFails )
2278        return a->numFails < b->numFails ? -1 : 1;
2279
2280    return 0;
2281}
2282
2283static bool
2284isAtomInteresting( const tr_torrent * tor, struct peer_atom * atom )
2285{
2286    if( tr_torrentIsSeed( tor ) && atomIsSeed( atom ) )
2287        return false;
2288
2289    if( peerIsInUse( tor->torrentPeers, atom ) )
2290        return true;
2291
2292    if( isAtomBlocklisted( tor->session, atom ) )
2293        return false;
2294
2295    if( atom->flags2 & MYFLAG_BANNED )
2296        return false;
2297
2298    return true;
2299}
2300
2301int
2302tr_peerMgrGetPeers( tr_torrent   * tor,
2303                    tr_pex      ** setme_pex,
2304                    uint8_t        af,
2305                    uint8_t        list_mode,
2306                    int            maxCount )
2307{
2308    int i;
2309    int n;
2310    int count = 0;
2311    int atomCount = 0;
2312    const Torrent * t = tor->torrentPeers;
2313    struct peer_atom ** atoms = NULL;
2314    tr_pex * pex;
2315    tr_pex * walk;
2316
2317    assert( tr_isTorrent( tor ) );
2318    assert( setme_pex != NULL );
2319    assert( af==TR_AF_INET || af==TR_AF_INET6 );
2320    assert( list_mode==TR_PEERS_CONNECTED || list_mode==TR_PEERS_INTERESTING );
2321
2322    managerLock( t->manager );
2323
2324    /**
2325    ***  build a list of atoms
2326    **/
2327
2328    if( list_mode == TR_PEERS_CONNECTED ) /* connected peers only */
2329    {
2330        int i;
2331        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
2332        atomCount = tr_ptrArraySize( &t->peers );
2333        atoms = tr_new( struct peer_atom *, atomCount );
2334        for( i=0; i<atomCount; ++i )
2335            atoms[i] = peers[i]->atom;
2336    }
2337    else /* TR_PEERS_INTERESTING */
2338    {
2339        int i;
2340        struct peer_atom ** atomBase = (struct peer_atom**) tr_ptrArrayBase( &t->pool );
2341        n = tr_ptrArraySize( &t->pool );
2342        atoms = tr_new( struct peer_atom *, n );
2343        for( i=0; i<n; ++i )
2344            if( isAtomInteresting( tor, atomBase[i] ) )
2345                atoms[atomCount++] = atomBase[i];
2346    }
2347
2348    qsort( atoms, atomCount, sizeof( struct peer_atom * ), compareAtomsByUsefulness );
2349
2350    /**
2351    ***  add the first N of them into our return list
2352    **/
2353
2354    n = MIN( atomCount, maxCount );
2355    pex = walk = tr_new0( tr_pex, n );
2356
2357    for( i=0; i<atomCount && count<n; ++i )
2358    {
2359        const struct peer_atom * atom = atoms[i];
2360        if( atom->addr.type == af )
2361        {
2362            assert( tr_address_is_valid( &atom->addr ) );
2363            walk->addr = atom->addr;
2364            walk->port = atom->port;
2365            walk->flags = atom->flags;
2366            ++count;
2367            ++walk;
2368        }
2369    }
2370
2371    qsort( pex, count, sizeof( tr_pex ), tr_pexCompare );
2372
2373    assert( ( walk - pex ) == count );
2374    *setme_pex = pex;
2375
2376    /* cleanup */
2377    tr_free( atoms );
2378    managerUnlock( t->manager );
2379    return count;
2380}
2381
2382static void atomPulse      ( int, short, void * );
2383static void bandwidthPulse ( int, short, void * );
2384static void rechokePulse   ( int, short, void * );
2385static void reconnectPulse ( int, short, void * );
2386
2387static struct event *
2388createTimer( tr_session * session, int msec, void (*callback)(int, short, void *), void * cbdata )
2389{
2390    struct event * timer = evtimer_new( session->event_base, callback, cbdata );
2391    tr_timerAddMsec( timer, msec );
2392    return timer;
2393}
2394
2395static void
2396ensureMgrTimersExist( struct tr_peerMgr * m )
2397{
2398    if( m->atomTimer == NULL )
2399        m->atomTimer = createTimer( m->session, ATOM_PERIOD_MSEC, atomPulse, m );
2400
2401    if( m->bandwidthTimer == NULL )
2402        m->bandwidthTimer = createTimer( m->session, BANDWIDTH_PERIOD_MSEC, bandwidthPulse, m );
2403
2404    if( m->rechokeTimer == NULL )
2405        m->rechokeTimer = createTimer( m->session, RECHOKE_PERIOD_MSEC, rechokePulse, m );
2406
2407    if( m->refillUpkeepTimer == NULL )
2408        m->refillUpkeepTimer = createTimer( m->session, REFILL_UPKEEP_PERIOD_MSEC, refillUpkeep, m );
2409}
2410
2411void
2412tr_peerMgrStartTorrent( tr_torrent * tor )
2413{
2414    Torrent * t = tor->torrentPeers;
2415
2416    assert( tr_isTorrent( tor ) );
2417    assert( tr_torrentIsLocked( tor ) );
2418
2419    ensureMgrTimersExist( t->manager );
2420
2421    t->isRunning = true;
2422    t->maxPeers = t->tor->maxConnectedPeers;
2423    t->pieceSortState = PIECES_UNSORTED;
2424
2425    rechokePulse( 0, 0, t->manager );
2426}
2427
2428static void
2429stopTorrent( Torrent * t )
2430{
2431    tr_peer * peer;
2432
2433    t->isRunning = false;
2434
2435    replicationFree( t );
2436    invalidatePieceSorting( t );
2437
2438    /* disconnect the peers. */
2439    while(( peer = tr_ptrArrayPop( &t->peers )))
2440        peerDelete( t, peer );
2441
2442    /* disconnect the handshakes. handshakeAbort calls handshakeDoneCB(),
2443     * which removes the handshake from t->outgoingHandshakes... */
2444    while( !tr_ptrArrayEmpty( &t->outgoingHandshakes ) )
2445        tr_handshakeAbort( tr_ptrArrayNth( &t->outgoingHandshakes, 0 ) );
2446}
2447
2448void
2449tr_peerMgrStopTorrent( tr_torrent * tor )
2450{
2451    assert( tr_isTorrent( tor ) );
2452    assert( tr_torrentIsLocked( tor ) );
2453
2454    stopTorrent( tor->torrentPeers );
2455}
2456
2457void
2458tr_peerMgrAddTorrent( tr_peerMgr * manager, tr_torrent * tor )
2459{
2460    assert( tr_isTorrent( tor ) );
2461    assert( tr_torrentIsLocked( tor ) );
2462    assert( tor->torrentPeers == NULL );
2463
2464    tor->torrentPeers = torrentNew( manager, tor );
2465}
2466
2467void
2468tr_peerMgrRemoveTorrent( tr_torrent * tor )
2469{
2470    assert( tr_isTorrent( tor ) );
2471    assert( tr_torrentIsLocked( tor ) );
2472
2473    stopTorrent( tor->torrentPeers );
2474    torrentFree( tor->torrentPeers );
2475}
2476
2477void
2478tr_peerUpdateProgress( tr_torrent * tor, tr_peer * peer )
2479{
2480    const tr_bitfield * have = &peer->have;
2481
2482    if( tr_bitfieldHasAll( have ) )
2483    {
2484        peer->progress = 1.0;
2485    }
2486    else if( tr_bitfieldHasNone( have ) )
2487    {
2488        peer->progress = 0.0;
2489    }
2490    else
2491    {
2492        const float true_count = tr_bitfieldCountTrueBits( have );
2493
2494        if( tr_torrentHasMetadata( tor ) )
2495            peer->progress = true_count / tor->info.pieceCount;
2496        else /* without pieceCount, this result is only a best guess... */
2497            peer->progress = true_count / ( have->bit_count + 1 );
2498    }
2499
2500    if( peer->atom && ( peer->progress >= 1.0 ) )
2501        atomSetSeed( tor->torrentPeers, peer->atom );
2502}
2503
2504void
2505tr_peerMgrOnTorrentGotMetainfo( tr_torrent * tor )
2506{
2507    int i;
2508    const int peerCount = tr_ptrArraySize( &tor->torrentPeers->peers );
2509    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &tor->torrentPeers->peers );
2510
2511    /* some peer_msgs' progress fields may not be accurate if we
2512       didn't have the metadata before now... so refresh them all... */
2513    for( i=0; i<peerCount; ++i )
2514        tr_peerUpdateProgress( tor, peers[i] );
2515}
2516
2517void
2518tr_peerMgrTorrentAvailability( const tr_torrent * tor, int8_t * tab, unsigned int tabCount )
2519{
2520    assert( tr_isTorrent( tor ) );
2521    assert( torrentIsLocked( tor->torrentPeers ) );
2522    assert( tab != NULL );
2523    assert( tabCount > 0 );
2524
2525    memset( tab, 0, tabCount );
2526
2527    if( tr_torrentHasMetadata( tor ) )
2528    {
2529        tr_piece_index_t i;
2530        const int peerCount = tr_ptrArraySize( &tor->torrentPeers->peers );
2531        const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &tor->torrentPeers->peers );
2532        const float interval = tor->info.pieceCount / (float)tabCount;
2533        const bool isSeed = tr_cpGetStatus( &tor->completion ) == TR_SEED;
2534
2535        for( i=0; i<tabCount; ++i )
2536        {
2537            const int piece = i * interval;
2538
2539            if( isSeed || tr_cpPieceIsComplete( &tor->completion, piece ) )
2540                tab[i] = -1;
2541            else if( peerCount ) {
2542                int j;
2543                for( j=0; j<peerCount; ++j )
2544                    if( tr_bitfieldHas( &peers[j]->have, piece ) )
2545                        ++tab[i];
2546            }
2547        }
2548    }
2549}
2550
2551static bool
2552peerIsSeed( const tr_peer * peer )
2553{
2554    if( peer->progress >= 1.0 )
2555        return true;
2556
2557    if( peer->atom && atomIsSeed( peer->atom ) )
2558        return true;
2559
2560    return false;
2561}
2562
2563/* count how many bytes we want that connected peers have */
2564uint64_t
2565tr_peerMgrGetDesiredAvailable( const tr_torrent * tor )
2566{
2567    size_t i;
2568    size_t n;
2569    uint64_t desiredAvailable;
2570    const Torrent * t = tor->torrentPeers;
2571
2572    /* common shortcuts... */
2573
2574    if( tr_torrentIsSeed( t->tor ) )
2575        return 0;
2576
2577    if( !tr_torrentHasMetadata( tor ) )
2578        return 0;
2579
2580    n = tr_ptrArraySize( &t->peers );
2581    if( n == 0 )
2582        return 0;
2583    else {
2584        const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
2585        for( i=0; i<n; ++i )
2586            if( peers[i]->atom && atomIsSeed( peers[i]->atom ) )
2587                return tr_cpLeftUntilDone( &tor->completion );
2588    }
2589
2590    if( !t->pieceReplication || !t->pieceReplicationSize )
2591        return 0;
2592
2593    /* do it the hard way */
2594
2595    desiredAvailable = 0;
2596    for( i=0, n=MIN(tor->info.pieceCount, t->pieceReplicationSize); i<n; ++i )
2597        if( !tor->info.pieces[i].dnd && ( t->pieceReplication[i] > 0 ) )
2598            desiredAvailable += tr_cpMissingBytesInPiece( &t->tor->completion, i );
2599
2600    assert( desiredAvailable <= tor->info.totalSize );
2601    return desiredAvailable;
2602}
2603
2604void
2605tr_peerMgrTorrentStats( tr_torrent  * tor,
2606                        int         * setmePeersConnected,
2607                        int         * setmeWebseedsSendingToUs,
2608                        int         * setmePeersSendingToUs,
2609                        int         * setmePeersGettingFromUs,
2610                        int         * setmePeersFrom )
2611{
2612    int i, size;
2613    const Torrent * t = tor->torrentPeers;
2614    const tr_peer ** peers;
2615
2616    assert( tr_torrentIsLocked( tor ) );
2617
2618    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
2619    size = tr_ptrArraySize( &t->peers );
2620
2621    *setmePeersConnected       = 0;
2622    *setmePeersGettingFromUs   = 0;
2623    *setmePeersSendingToUs     = 0;
2624    *setmeWebseedsSendingToUs  = 0;
2625
2626    for( i=0; i<TR_PEER_FROM__MAX; ++i )
2627        setmePeersFrom[i] = 0;
2628
2629    for( i=0; i<size; ++i )
2630    {
2631        const tr_peer * peer = peers[i];
2632        const struct peer_atom * atom = peer->atom;
2633
2634        if( peer->io == NULL ) /* not connected */
2635            continue;
2636
2637        ++*setmePeersConnected;
2638
2639        ++setmePeersFrom[atom->fromFirst];
2640
2641        if( clientIsDownloadingFrom( tor, peer ) )
2642            ++*setmePeersSendingToUs;
2643
2644        if( clientIsUploadingTo( peer ) )
2645            ++*setmePeersGettingFromUs;
2646    }
2647
2648    *setmeWebseedsSendingToUs = countActiveWebseeds( t );
2649}
2650
2651double*
2652tr_peerMgrWebSpeeds_KBps( const tr_torrent * tor )
2653{
2654    int i;
2655    const Torrent * t = tor->torrentPeers;
2656    const int webseedCount = tr_ptrArraySize( &t->webseeds );
2657    const tr_webseed ** webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2658    const uint64_t now = tr_time_msec( );
2659    double * ret = tr_new0( double, webseedCount );
2660
2661    assert( tr_isTorrent( tor ) );
2662    assert( tr_torrentIsLocked( tor ) );
2663    assert( t->manager != NULL );
2664    assert( webseedCount == tor->info.webseedCount );
2665
2666    for( i=0; i<webseedCount; ++i ) {
2667        int Bps;
2668        if( tr_webseedGetSpeed_Bps( webseeds[i], now, &Bps ) )
2669            ret[i] = Bps / (double)tr_speed_K;
2670        else
2671            ret[i] = -1.0;
2672    }
2673
2674    return ret;
2675}
2676
2677int
2678tr_peerGetPieceSpeed_Bps( const tr_peer * peer, uint64_t now, tr_direction direction )
2679{
2680    return peer->io ? tr_peerIoGetPieceSpeed_Bps( peer->io, now, direction ) : 0.0;
2681}
2682
2683struct tr_peer_stat *
2684tr_peerMgrPeerStats( const tr_torrent * tor, int * setmeCount )
2685{
2686    int i;
2687    const Torrent * t = tor->torrentPeers;
2688    const int size = tr_ptrArraySize( &t->peers );
2689    const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
2690    const uint64_t now_msec = tr_time_msec( );
2691    const time_t now = tr_time();
2692    tr_peer_stat * ret = tr_new0( tr_peer_stat, size );
2693
2694    assert( tr_isTorrent( tor ) );
2695    assert( tr_torrentIsLocked( tor ) );
2696    assert( t->manager );
2697
2698    for( i=0; i<size; ++i )
2699    {
2700        char *                   pch;
2701        const tr_peer *          peer = peers[i];
2702        const struct peer_atom * atom = peer->atom;
2703        tr_peer_stat *           stat = ret + i;
2704
2705        tr_address_to_string_with_buf( &atom->addr, stat->addr, sizeof( stat->addr ) );
2706        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
2707                   sizeof( stat->client ) );
2708        stat->port                = ntohs( peer->atom->port );
2709        stat->from                = atom->fromFirst;
2710        stat->progress            = peer->progress;
2711        stat->isUTP               = peer->io->utp_socket != NULL;
2712        stat->isEncrypted         = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
2713        stat->rateToPeer_KBps     = toSpeedKBps( tr_peerGetPieceSpeed_Bps( peer, now_msec, TR_CLIENT_TO_PEER ) );
2714        stat->rateToClient_KBps   = toSpeedKBps( tr_peerGetPieceSpeed_Bps( peer, now_msec, TR_PEER_TO_CLIENT ) );
2715        stat->peerIsChoked        = peer->peerIsChoked;
2716        stat->peerIsInterested    = peer->peerIsInterested;
2717        stat->clientIsChoked      = peer->clientIsChoked;
2718        stat->clientIsInterested  = peer->clientIsInterested;
2719        stat->isIncoming          = tr_peerIoIsIncoming( peer->io );
2720        stat->isDownloadingFrom   = clientIsDownloadingFrom( tor, peer );
2721        stat->isUploadingTo       = clientIsUploadingTo( peer );
2722        stat->isSeed              = peerIsSeed( peer );
2723
2724        stat->blocksToPeer        = tr_historyGet( &peer->blocksSentToPeer,    now, CANCEL_HISTORY_SEC );
2725        stat->blocksToClient      = tr_historyGet( &peer->blocksSentToClient,  now, CANCEL_HISTORY_SEC );
2726        stat->cancelsToPeer       = tr_historyGet( &peer->cancelsSentToPeer,   now, CANCEL_HISTORY_SEC );
2727        stat->cancelsToClient     = tr_historyGet( &peer->cancelsSentToClient, now, CANCEL_HISTORY_SEC );
2728
2729        stat->pendingReqsToPeer   = peer->pendingReqsToPeer;
2730        stat->pendingReqsToClient = peer->pendingReqsToClient;
2731
2732        pch = stat->flagStr;
2733        if( stat->isUTP ) *pch++ = 'T';
2734        if( t->optimistic == peer ) *pch++ = 'O';
2735        if( stat->isDownloadingFrom ) *pch++ = 'D';
2736        else if( stat->clientIsInterested ) *pch++ = 'd';
2737        if( stat->isUploadingTo ) *pch++ = 'U';
2738        else if( stat->peerIsInterested ) *pch++ = 'u';
2739        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
2740        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
2741        if( stat->isEncrypted ) *pch++ = 'E';
2742        if( stat->from == TR_PEER_FROM_DHT ) *pch++ = 'H';
2743        else if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
2744        if( stat->isIncoming ) *pch++ = 'I';
2745        *pch = '\0';
2746    }
2747
2748    *setmeCount = size;
2749
2750    return ret;
2751}
2752
2753/***
2754****
2755****
2756***/
2757
2758void
2759tr_peerMgrClearInterest( tr_torrent * tor )
2760{
2761    int i;
2762    Torrent * t = tor->torrentPeers;
2763    const int peerCount = tr_ptrArraySize( &t->peers );
2764
2765    assert( tr_isTorrent( tor ) );
2766    assert( tr_torrentIsLocked( tor ) );
2767
2768    for( i=0; i<peerCount; ++i )
2769    {
2770        const tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2771        tr_peerMsgsSetInterested( peer->msgs, false );
2772    }
2773}
2774
2775/* does this peer have any pieces that we want? */
2776static bool
2777isPeerInteresting( const tr_torrent  * const tor,
2778                   const bool        * const piece_is_interesting,
2779                   const tr_peer     * const peer )
2780{
2781    tr_piece_index_t i, n;
2782
2783    /* these cases should have already been handled by the calling code... */
2784    assert( !tr_torrentIsSeed( tor ) );
2785    assert( tr_torrentIsPieceTransferAllowed( tor, TR_PEER_TO_CLIENT ) );
2786
2787    if( peerIsSeed( peer ) )
2788        return true;
2789
2790    for( i=0, n=tor->info.pieceCount; i<n; ++i )
2791        if( piece_is_interesting[i] && tr_bitfieldHas( &peer->have, i ) )
2792            return true;
2793
2794    return false;
2795}
2796
2797typedef enum
2798{
2799    RECHOKE_STATE_GOOD,
2800    RECHOKE_STATE_UNTESTED,
2801    RECHOKE_STATE_BAD
2802}
2803tr_rechoke_state;
2804
2805struct tr_rechoke_info
2806{
2807    tr_peer * peer;
2808    int salt;
2809    int rechoke_state;
2810};
2811
2812static int
2813compare_rechoke_info( const void * va, const void * vb )
2814{
2815    const struct tr_rechoke_info * a = va;
2816    const struct tr_rechoke_info * b = vb;
2817
2818    if( a->rechoke_state != b->rechoke_state )
2819        return a->rechoke_state - b->rechoke_state;
2820
2821    return a->salt - b->salt;
2822}
2823
2824/* determines who we send "interested" messages to */
2825static void
2826rechokeDownloads( Torrent * t )
2827{
2828    int i;
2829    int maxPeers = 0;
2830    int rechoke_count = 0;
2831    struct tr_rechoke_info * rechoke = NULL;
2832    const int MIN_INTERESTING_PEERS = 5;
2833    const int peerCount = tr_ptrArraySize( &t->peers );
2834    const time_t now = tr_time( );
2835
2836    /* some cases where this function isn't necessary */
2837    if( tr_torrentIsSeed( t->tor ) )
2838        return;
2839    if ( !tr_torrentIsPieceTransferAllowed( t->tor, TR_PEER_TO_CLIENT ) )
2840        return;
2841
2842    /* decide HOW MANY peers to be interested in */
2843    {
2844        int blocks = 0;
2845        int cancels = 0;
2846        time_t timeSinceCancel;
2847
2848        /* Count up how many blocks & cancels each peer has.
2849         *
2850         * There are two situations where we send out cancels --
2851         *
2852         * 1. We've got unresponsive peers, which is handled by deciding
2853         *    -which- peers to be interested in.
2854         *
2855         * 2. We've hit our bandwidth cap, which is handled by deciding
2856         *    -how many- peers to be interested in.
2857         *
2858         * We're working on 2. here, so we need to ignore unresponsive
2859         * peers in our calculations lest they confuse Transmission into
2860         * thinking it's hit its bandwidth cap.
2861         */
2862        for( i=0; i<peerCount; ++i )
2863        {
2864            const tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2865            const int b = tr_historyGet( &peer->blocksSentToClient, now, CANCEL_HISTORY_SEC );
2866            const int c = tr_historyGet( &peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC );
2867
2868            if( b == 0 ) /* ignore unresponsive peers, as described above */
2869                continue;
2870
2871            blocks += b;
2872            cancels += c;
2873        }
2874
2875        if( cancels > 0 )
2876        {
2877            /* cancelRate: of the block requests we've recently made, the percentage we cancelled.
2878             * higher values indicate more congestion. */
2879            const double cancelRate = cancels / (double)(cancels + blocks);
2880            const double mult = 1 - MIN( cancelRate, 0.5 );
2881            maxPeers = t->interestedCount * mult;
2882            tordbg( t, "cancel rate is %.3f -- reducing the "
2883                       "number of peers we're interested in by %.0f percent",
2884                       cancelRate, mult * 100 );
2885            t->lastCancel = now;
2886        }
2887
2888        timeSinceCancel = now - t->lastCancel;
2889        if( timeSinceCancel )
2890        {
2891            const int maxIncrease = 15;
2892            const time_t maxHistory = 2 * CANCEL_HISTORY_SEC;
2893            const double mult = MIN( timeSinceCancel, maxHistory ) / (double) maxHistory;
2894            const int inc = maxIncrease * mult;
2895            maxPeers = t->maxPeers + inc;
2896            tordbg( t, "time since last cancel is %li -- increasing the "
2897                       "number of peers we're interested in by %d",
2898                       timeSinceCancel, inc );
2899        }
2900    }
2901
2902    /* don't let the previous section's number tweaking go too far... */
2903    if( maxPeers < MIN_INTERESTING_PEERS )
2904        maxPeers = MIN_INTERESTING_PEERS;
2905    if( maxPeers > t->tor->maxConnectedPeers )
2906        maxPeers = t->tor->maxConnectedPeers;
2907
2908    t->maxPeers = maxPeers;
2909
2910    if( peerCount > 0 )
2911    {
2912        bool * piece_is_interesting;
2913        const tr_torrent * const tor = t->tor;
2914        const int n = tor->info.pieceCount;
2915
2916        /* build a bitfield of interesting pieces... */
2917        piece_is_interesting = tr_new( bool, n );
2918        for( i=0; i<n; i++ )
2919            piece_is_interesting[i] = !tor->info.pieces[i].dnd && !tr_cpPieceIsComplete( &tor->completion, i );
2920
2921        /* decide WHICH peers to be interested in (based on their cancel-to-block ratio) */
2922        for( i=0; i<peerCount; ++i )
2923        {
2924            tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2925
2926            if( !isPeerInteresting( t->tor, piece_is_interesting, peer ) )
2927            {
2928                tr_peerMsgsSetInterested( peer->msgs, false );
2929            }
2930            else
2931            {
2932                tr_rechoke_state rechoke_state;
2933                const int blocks = tr_historyGet( &peer->blocksSentToClient, now, CANCEL_HISTORY_SEC );
2934                const int cancels = tr_historyGet( &peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC );
2935
2936                if( !blocks && !cancels )
2937                    rechoke_state = RECHOKE_STATE_UNTESTED;
2938                else if( !cancels )
2939                    rechoke_state = RECHOKE_STATE_GOOD;
2940                else if( !blocks )
2941                    rechoke_state = RECHOKE_STATE_BAD;
2942                else if( ( cancels * 10 ) < blocks )
2943                    rechoke_state = RECHOKE_STATE_GOOD;
2944                else
2945                    rechoke_state = RECHOKE_STATE_BAD;
2946
2947                if( rechoke == NULL )
2948                    rechoke = tr_new( struct tr_rechoke_info, peerCount );
2949
2950                 rechoke[rechoke_count].peer = peer;
2951                 rechoke[rechoke_count].rechoke_state = rechoke_state;
2952                 rechoke[rechoke_count].salt = tr_cryptoWeakRandInt( INT_MAX );
2953                 rechoke_count++;
2954            }
2955
2956        }
2957
2958        tr_free( piece_is_interesting );
2959    }
2960
2961    /* now that we know which & how many peers to be interested in... update the peer interest */
2962    qsort( rechoke, rechoke_count, sizeof( struct tr_rechoke_info ), compare_rechoke_info );
2963    t->interestedCount = MIN( maxPeers, rechoke_count );
2964    for( i=0; i<rechoke_count; ++i )
2965        tr_peerMsgsSetInterested( rechoke[i].peer->msgs, i<t->interestedCount );
2966
2967    /* cleanup */
2968    tr_free( rechoke );
2969}
2970
2971/**
2972***
2973**/
2974
2975struct ChokeData
2976{
2977    bool            isInterested;
2978    bool            wasChoked;
2979    bool            isChoked;
2980    int             rate;
2981    int             salt;
2982    tr_peer *       peer;
2983};
2984
2985static int
2986compareChoke( const void * va, const void * vb )
2987{
2988    const struct ChokeData * a = va;
2989    const struct ChokeData * b = vb;
2990
2991    if( a->rate != b->rate ) /* prefer higher overall speeds */
2992        return a->rate > b->rate ? -1 : 1;
2993
2994    if( a->wasChoked != b->wasChoked ) /* prefer unchoked */
2995        return a->wasChoked ? 1 : -1;
2996
2997    if( a->salt != b->salt ) /* random order */
2998        return a->salt - b->salt;
2999
3000    return 0;
3001}
3002
3003/* is this a new connection? */
3004static int
3005isNew( const tr_peer * peer )
3006{
3007    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
3008}
3009
3010/* get a rate for deciding which peers to choke and unchoke. */
3011static int
3012getRate( const tr_torrent * tor, struct peer_atom * atom, uint64_t now )
3013{
3014    int Bps;
3015
3016    if( tr_torrentIsSeed( tor ) )
3017        Bps = tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_CLIENT_TO_PEER );
3018
3019    /* downloading a private torrent... take upload speed into account
3020     * because there may only be a small window of opportunity to share */
3021    else if( tr_torrentIsPrivate( tor ) )
3022        Bps = tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_PEER_TO_CLIENT )
3023            + tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_CLIENT_TO_PEER );
3024
3025    /* downloading a public torrent */
3026    else
3027        Bps = tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_PEER_TO_CLIENT );
3028
3029    /* convert it to bytes per second */
3030    return Bps;
3031}
3032
3033static inline bool
3034isBandwidthMaxedOut( const tr_bandwidth * b,
3035                     const uint64_t now_msec, tr_direction dir )
3036{
3037    if( !tr_bandwidthIsLimited( b, dir ) )
3038        return false;
3039    else {
3040        const int got = tr_bandwidthGetPieceSpeed_Bps( b, now_msec, dir );
3041        const int want = tr_bandwidthGetDesiredSpeed_Bps( b, dir );
3042        return got >= want;
3043    }
3044}
3045
3046static void
3047rechokeUploads( Torrent * t, const uint64_t now )
3048{
3049    int i, size, unchokedInterested;
3050    const int peerCount = tr_ptrArraySize( &t->peers );
3051    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
3052    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
3053    const tr_session * session = t->manager->session;
3054    const int chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
3055    const bool isMaxedOut = isBandwidthMaxedOut( &t->tor->bandwidth, now, TR_UP );
3056
3057    assert( torrentIsLocked( t ) );
3058
3059    /* an optimistic unchoke peer's "optimistic"
3060     * state lasts for N calls to rechokeUploads(). */
3061    if( t->optimisticUnchokeTimeScaler > 0 )
3062        t->optimisticUnchokeTimeScaler--;
3063    else
3064        t->optimistic = NULL;
3065
3066    /* sort the peers by preference and rate */
3067    for( i = 0, size = 0; i < peerCount; ++i )
3068    {
3069        tr_peer * peer = peers[i];
3070        struct peer_atom * atom = peer->atom;
3071
3072        if( peerIsSeed( peer ) ) /* choke seeds and partial seeds */
3073        {
3074            tr_peerMsgsSetChoke( peer->msgs, true );
3075        }
3076        else if( chokeAll ) /* choke everyone if we're not uploading */
3077        {
3078            tr_peerMsgsSetChoke( peer->msgs, true );
3079        }
3080        else if( peer != t->optimistic )
3081        {
3082            struct ChokeData * n = &choke[size++];
3083            n->peer         = peer;
3084            n->isInterested = peer->peerIsInterested;
3085            n->wasChoked    = peer->peerIsChoked;
3086            n->rate         = getRate( t->tor, atom, now );
3087            n->salt         = tr_cryptoWeakRandInt( INT_MAX );
3088            n->isChoked     = true;
3089        }
3090    }
3091
3092    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
3093
3094    /**
3095     * Reciprocation and number of uploads capping is managed by unchoking
3096     * the N peers which have the best upload rate and are interested.
3097     * This maximizes the client's download rate. These N peers are
3098     * referred to as downloaders, because they are interested in downloading
3099     * from the client.
3100     *
3101     * Peers which have a better upload rate (as compared to the downloaders)
3102     * but aren't interested get unchoked. If they become interested, the
3103     * downloader with the worst upload rate gets choked. If a client has
3104     * a complete file, it uses its upload rate rather than its download
3105     * rate to decide which peers to unchoke.
3106     *
3107     * If our bandwidth is maxed out, don't unchoke any more peers.
3108     */
3109    unchokedInterested = 0;
3110    for( i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i ) {
3111        choke[i].isChoked = isMaxedOut ? choke[i].wasChoked : false;
3112        if( choke[i].isInterested )
3113            ++unchokedInterested;
3114    }
3115
3116    /* optimistic unchoke */
3117    if( !t->optimistic && !isMaxedOut && (i<size) )
3118    {
3119        int n;
3120        struct ChokeData * c;
3121        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
3122
3123        for( ; i<size; ++i )
3124        {
3125            if( choke[i].isInterested )
3126            {
3127                const tr_peer * peer = choke[i].peer;
3128                int x = 1, y;
3129                if( isNew( peer ) ) x *= 3;
3130                for( y=0; y<x; ++y )
3131                    tr_ptrArrayAppend( &randPool, &choke[i] );
3132            }
3133        }
3134
3135        if(( n = tr_ptrArraySize( &randPool )))
3136        {
3137            c = tr_ptrArrayNth( &randPool, tr_cryptoWeakRandInt( n ));
3138            c->isChoked = false;
3139            t->optimistic = c->peer;
3140            t->optimisticUnchokeTimeScaler = OPTIMISTIC_UNCHOKE_MULTIPLIER;
3141        }
3142
3143        tr_ptrArrayDestruct( &randPool, NULL );
3144    }
3145
3146    for( i=0; i<size; ++i )
3147        tr_peerMsgsSetChoke( choke[i].peer->msgs, choke[i].isChoked );
3148
3149    /* cleanup */
3150    tr_free( choke );
3151}
3152
3153static void
3154rechokePulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3155{
3156    tr_torrent * tor = NULL;
3157    tr_peerMgr * mgr = vmgr;
3158    const uint64_t now = tr_time_msec( );
3159
3160    managerLock( mgr );
3161
3162    while(( tor = tr_torrentNext( mgr->session, tor ))) {
3163        if( tor->isRunning ) {
3164            Torrent * t = tor->torrentPeers;
3165            if( !tr_ptrArrayEmpty( &t->peers ) ) {
3166                rechokeUploads( t, now );
3167                rechokeDownloads( t );
3168            }
3169        }
3170    }
3171
3172    tr_timerAddMsec( mgr->rechokeTimer, RECHOKE_PERIOD_MSEC );
3173    managerUnlock( mgr );
3174}
3175
3176/***
3177****
3178****  Life and Death
3179****
3180***/
3181
3182static bool
3183shouldPeerBeClosed( const Torrent    * t,
3184                    const tr_peer    * peer,
3185                    int                peerCount,
3186                    const time_t       now )
3187{
3188    const tr_torrent *       tor = t->tor;
3189    const struct peer_atom * atom = peer->atom;
3190
3191    /* if it's marked for purging, close it */
3192    if( peer->doPurge )
3193    {
3194        tordbg( t, "purging peer %s because its doPurge flag is set",
3195                tr_atomAddrStr( atom ) );
3196        return true;
3197    }
3198
3199    /* disconnect if we're both seeds and enough time has passed for PEX */
3200    if( tr_torrentIsSeed( tor ) && peerIsSeed( peer ) )
3201        return !tr_torrentAllowsPex(tor) || (now-atom->time>=30);
3202
3203    /* disconnect if it's been too long since piece data has been transferred.
3204     * this is on a sliding scale based on number of available peers... */
3205    {
3206        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
3207        /* if we have >= relaxIfFewerThan, strictness is 100%.
3208         * if we have zero connections, strictness is 0% */
3209        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
3210                               ? 1.0
3211                               : peerCount / (float)relaxStrictnessIfFewerThanN;
3212        const int lo = MIN_UPLOAD_IDLE_SECS;
3213        const int hi = MAX_UPLOAD_IDLE_SECS;
3214        const int limit = hi - ( ( hi - lo ) * strictness );
3215        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
3216/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
3217        if( idleTime > limit ) {
3218            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
3219                       tr_atomAddrStr( atom ), idleTime );
3220            return true;
3221        }
3222    }
3223
3224    return false;
3225}
3226
3227static tr_peer **
3228getPeersToClose( Torrent * t, const time_t now_sec, int * setmeSize )
3229{
3230    int i, peerCount, outsize;
3231    struct tr_peer ** ret = NULL;
3232    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( &t->peers, &peerCount );
3233
3234    assert( torrentIsLocked( t ) );
3235
3236    for( i = outsize = 0; i < peerCount; ++i ) {
3237        if( shouldPeerBeClosed( t, peers[i], peerCount, now_sec ) ) {
3238            if( ret == NULL )
3239                ret = tr_new( tr_peer *, peerCount );
3240            ret[outsize++] = peers[i];
3241        }
3242    }
3243
3244    *setmeSize = outsize;
3245    return ret;
3246}
3247
3248static int
3249getReconnectIntervalSecs( const struct peer_atom * atom, const time_t now )
3250{
3251    int sec;
3252
3253    /* if we were recently connected to this peer and transferring piece
3254     * data, try to reconnect to them sooner rather that later -- we don't
3255     * want network troubles to get in the way of a good peer. */
3256    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
3257        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
3258
3259    /* don't allow reconnects more often than our minimum */
3260    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
3261        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
3262
3263    /* otherwise, the interval depends on how many times we've tried
3264     * and failed to connect to the peer */
3265    else switch( atom->numFails ) {
3266        case 0: sec = 0; break;
3267        case 1: sec = 5; break;
3268        case 2: sec = 2 * 60; break;
3269        case 3: sec = 15 * 60; break;
3270        case 4: sec = 30 * 60; break;
3271        case 5: sec = 60 * 60; break;
3272        default: sec = 120 * 60; break;
3273    }
3274
3275    /* penalize peers that were unreachable the last time we tried */
3276    if( atom->flags2 & MYFLAG_UNREACHABLE )
3277        sec += sec;
3278
3279    dbgmsg( "reconnect interval for %s is %d seconds", tr_atomAddrStr( atom ), sec );
3280    return sec;
3281}
3282
3283static void
3284removePeer( Torrent * t, tr_peer * peer )
3285{
3286    tr_peer * removed;
3287    struct peer_atom * atom = peer->atom;
3288
3289    assert( torrentIsLocked( t ) );
3290    assert( atom );
3291
3292    atom->time = tr_time( );
3293
3294    removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare );
3295
3296    if( replicationExists( t ) )
3297        tr_decrReplicationFromBitfield( t, &peer->have );
3298
3299    assert( removed == peer );
3300    peerDelete( t, removed );
3301}
3302
3303static void
3304closePeer( Torrent * t, tr_peer * peer )
3305{
3306    struct peer_atom * atom;
3307
3308    assert( t != NULL );
3309    assert( peer != NULL );
3310
3311    atom = peer->atom;
3312
3313    /* if we transferred piece data, then they might be good peers,
3314       so reset their `numFails' weight to zero. otherwise we connected
3315       to them fruitlessly, so mark it as another fail */
3316    if( atom->piece_data_time ) {
3317        tordbg( t, "resetting atom %s numFails to 0", tr_atomAddrStr(atom) );
3318        atom->numFails = 0;
3319    } else {
3320        ++atom->numFails;
3321        tordbg( t, "incremented atom %s numFails to %d", tr_atomAddrStr(atom), (int)atom->numFails );
3322    }
3323
3324    tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
3325    removePeer( t, peer );
3326}
3327
3328static void
3329removeAllPeers( Torrent * t )
3330{
3331    while( !tr_ptrArrayEmpty( &t->peers ) )
3332        removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) );
3333}
3334
3335static void
3336closeBadPeers( Torrent * t, const time_t now_sec )
3337{
3338    if( !tr_ptrArrayEmpty( &t->peers ) )
3339    {
3340        int i;
3341        int peerCount;
3342        struct tr_peer ** peers = getPeersToClose( t, now_sec, &peerCount );
3343        for( i=0; i<peerCount; ++i )
3344            closePeer( t, peers[i] );
3345        tr_free( peers );
3346    }
3347}
3348
3349struct peer_liveliness
3350{
3351    tr_peer * peer;
3352    void * clientData;
3353    time_t pieceDataTime;
3354    time_t time;
3355    int speed;
3356    bool doPurge;
3357};
3358
3359static int
3360comparePeerLiveliness( const void * va, const void * vb )
3361{
3362    const struct peer_liveliness * a = va;
3363    const struct peer_liveliness * b = vb;
3364
3365    if( a->doPurge != b->doPurge )
3366        return a->doPurge ? 1 : -1;
3367
3368    if( a->speed != b->speed ) /* faster goes first */
3369        return a->speed > b->speed ? -1 : 1;
3370
3371    /* the one to give us data more recently goes first */
3372    if( a->pieceDataTime != b->pieceDataTime )
3373        return a->pieceDataTime > b->pieceDataTime ? -1 : 1;
3374
3375    /* the one we connected to most recently goes first */
3376    if( a->time != b->time )
3377        return a->time > b->time ? -1 : 1;
3378
3379    return 0;
3380}
3381
3382static void
3383sortPeersByLivelinessImpl( tr_peer  ** peers,
3384                           void     ** clientData,
3385                           int         n,
3386                           uint64_t    now,
3387                           int (*compare) ( const void *va, const void *vb ) )
3388{
3389    int i;
3390    struct peer_liveliness *lives, *l;
3391
3392    /* build a sortable array of peer + extra info */
3393    lives = l = tr_new0( struct peer_liveliness, n );
3394    for( i=0; i<n; ++i, ++l )
3395    {
3396        tr_peer * p = peers[i];
3397        l->peer = p;
3398        l->doPurge = p->doPurge;
3399        l->pieceDataTime = p->atom->piece_data_time;
3400        l->time = p->atom->time;
3401        l->speed = tr_peerGetPieceSpeed_Bps( p, now, TR_UP )
3402                 + tr_peerGetPieceSpeed_Bps( p, now, TR_DOWN );
3403        if( clientData )
3404            l->clientData = clientData[i];
3405    }
3406
3407    /* sort 'em */
3408    assert( n == ( l - lives ) );
3409    qsort( lives, n, sizeof( struct peer_liveliness ), compare );
3410
3411    /* build the peer array */
3412    for( i=0, l=lives; i<n; ++i, ++l ) {
3413        peers[i] = l->peer;
3414        if( clientData )
3415            clientData[i] = l->clientData;
3416    }
3417    assert( n == ( l - lives ) );
3418
3419    /* cleanup */
3420    tr_free( lives );
3421}
3422
3423static void
3424sortPeersByLiveliness( tr_peer ** peers, void ** clientData, int n, uint64_t now )
3425{
3426    sortPeersByLivelinessImpl( peers, clientData, n, now, comparePeerLiveliness );
3427}
3428
3429
3430static void
3431enforceTorrentPeerLimit( Torrent * t, uint64_t now )
3432{
3433    int n = tr_ptrArraySize( &t->peers );
3434    const int max = tr_torrentGetPeerLimit( t->tor );
3435    if( n > max )
3436    {
3437        void * base = tr_ptrArrayBase( &t->peers );
3438        tr_peer ** peers = tr_memdup( base, n*sizeof( tr_peer* ) );
3439        sortPeersByLiveliness( peers, NULL, n, now );
3440        while( n > max )
3441            closePeer( t, peers[--n] );
3442        tr_free( peers );
3443    }
3444}
3445
3446static void
3447enforceSessionPeerLimit( tr_session * session, uint64_t now )
3448{
3449    int n = 0;
3450    tr_torrent * tor = NULL;
3451    const int max = tr_sessionGetPeerLimit( session );
3452
3453    /* count the total number of peers */
3454    while(( tor = tr_torrentNext( session, tor )))
3455        n += tr_ptrArraySize( &tor->torrentPeers->peers );
3456
3457    /* if there are too many, prune out the worst */
3458    if( n > max )
3459    {
3460        tr_peer ** peers = tr_new( tr_peer*, n );
3461        Torrent ** torrents = tr_new( Torrent*, n );
3462
3463        /* populate the peer array */
3464        n = 0;
3465        tor = NULL;
3466        while(( tor = tr_torrentNext( session, tor ))) {
3467            int i;
3468            Torrent * t = tor->torrentPeers;
3469            const int tn = tr_ptrArraySize( &t->peers );
3470            for( i=0; i<tn; ++i, ++n ) {
3471                peers[n] = tr_ptrArrayNth( &t->peers, i );
3472                torrents[n] = t;
3473            }
3474        }
3475
3476        /* sort 'em */
3477        sortPeersByLiveliness( peers, (void**)torrents, n, now );
3478
3479        /* cull out the crappiest */
3480        while( n-- > max )
3481            closePeer( torrents[n], peers[n] );
3482
3483        /* cleanup */
3484        tr_free( torrents );
3485        tr_free( peers );
3486    }
3487}
3488
3489static void makeNewPeerConnections( tr_peerMgr * mgr, const int max );
3490
3491static void
3492reconnectPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3493{
3494    tr_torrent * tor;
3495    tr_peerMgr * mgr = vmgr;
3496    const time_t now_sec = tr_time( );
3497    const uint64_t now_msec = tr_time_msec( );
3498
3499    /**
3500    ***  enforce the per-session and per-torrent peer limits
3501    **/
3502
3503    /* if we're over the per-torrent peer limits, cull some peers */
3504    tor = NULL;
3505    while(( tor = tr_torrentNext( mgr->session, tor )))
3506        if( tor->isRunning )
3507            enforceTorrentPeerLimit( tor->torrentPeers, now_msec );
3508
3509    /* if we're over the per-session peer limits, cull some peers */
3510    enforceSessionPeerLimit( mgr->session, now_msec );
3511
3512    /* remove crappy peers */
3513    tor = NULL;
3514    while(( tor = tr_torrentNext( mgr->session, tor )))
3515        if( !tor->torrentPeers->isRunning )
3516            removeAllPeers( tor->torrentPeers );
3517        else
3518            closeBadPeers( tor->torrentPeers, now_sec );
3519
3520    /* try to make new peer connections */
3521    makeNewPeerConnections( mgr, MAX_CONNECTIONS_PER_PULSE );
3522}
3523
3524/****
3525*****
3526*****  BANDWIDTH ALLOCATION
3527*****
3528****/
3529
3530static void
3531pumpAllPeers( tr_peerMgr * mgr )
3532{
3533    tr_torrent * tor = NULL;
3534
3535    while(( tor = tr_torrentNext( mgr->session, tor )))
3536    {
3537        int j;
3538        Torrent * t = tor->torrentPeers;
3539
3540        for( j=0; j<tr_ptrArraySize( &t->peers ); ++j )
3541        {
3542            tr_peer * peer = tr_ptrArrayNth( &t->peers, j );
3543            tr_peerMsgsPulse( peer->msgs );
3544        }
3545    }
3546}
3547
3548static void
3549queuePulse( tr_session * session, tr_direction dir )
3550{
3551    assert( tr_isSession( session ) );
3552    assert( tr_isDirection( dir ) );
3553
3554    if( tr_sessionGetQueueEnabled( session, dir ) )
3555    {
3556        int i;
3557        const int n = tr_sessionCountQueueFreeSlots( session, dir );
3558        for( i=0; i<n; i++ ) {
3559            tr_torrent * tor = tr_sessionGetNextQueuedTorrent( session, dir );
3560            if( tor != NULL ) {
3561                tr_torrentStartNow( tor );
3562                if( tor->queue_started_callback != NULL )
3563                    (*tor->queue_started_callback)( tor, tor->queue_started_user_data );
3564            }
3565        }
3566    }
3567}
3568
3569static void
3570bandwidthPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3571{
3572    tr_torrent * tor;
3573    tr_peerMgr * mgr = vmgr;
3574    tr_session * session = mgr->session;
3575    managerLock( mgr );
3576
3577    /* FIXME: this next line probably isn't necessary... */
3578    pumpAllPeers( mgr );
3579
3580    /* allocate bandwidth to the peers */
3581    tr_bandwidthAllocate( &session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
3582    tr_bandwidthAllocate( &session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
3583
3584    /* torrent upkeep */
3585    tor = NULL;
3586    while(( tor = tr_torrentNext( session, tor )))
3587    {
3588        /* possibly stop torrents that have seeded enough */
3589        tr_torrentCheckSeedLimit( tor );
3590
3591        /* run the completeness check for any torrents that need it */
3592        if( tor->torrentPeers->needsCompletenessCheck ) {
3593            tor->torrentPeers->needsCompletenessCheck  = false;
3594            tr_torrentRecheckCompleteness( tor );
3595        }
3596
3597        /* stop torrents that are ready to stop, but couldn't be stopped
3598           earlier during the peer-io callback call chain */
3599        if( tor->isStopping )
3600            tr_torrentStop( tor );
3601    }
3602
3603    /* pump the queues */
3604    queuePulse( session, TR_UP );
3605    queuePulse( session, TR_DOWN );
3606
3607    reconnectPulse( 0, 0, mgr );
3608
3609    tr_timerAddMsec( mgr->bandwidthTimer, BANDWIDTH_PERIOD_MSEC );
3610    managerUnlock( mgr );
3611}
3612
3613/***
3614****
3615***/
3616
3617static int
3618compareAtomPtrsByAddress( const void * va, const void *vb )
3619{
3620    const struct peer_atom * a = * (const struct peer_atom**) va;
3621    const struct peer_atom * b = * (const struct peer_atom**) vb;
3622
3623    assert( tr_isAtom( a ) );
3624    assert( tr_isAtom( b ) );
3625
3626    return tr_address_compare( &a->addr, &b->addr );
3627}
3628
3629/* best come first, worst go last */
3630static int
3631compareAtomPtrsByShelfDate( const void * va, const void *vb )
3632{
3633    time_t atime;
3634    time_t btime;
3635    const struct peer_atom * a = * (const struct peer_atom**) va;
3636    const struct peer_atom * b = * (const struct peer_atom**) vb;
3637    const int data_time_cutoff_secs = 60 * 60;
3638    const time_t tr_now = tr_time( );
3639
3640    assert( tr_isAtom( a ) );
3641    assert( tr_isAtom( b ) );
3642
3643    /* primary key: the last piece data time *if* it was within the last hour */
3644    atime = a->piece_data_time; if( atime + data_time_cutoff_secs < tr_now ) atime = 0;
3645    btime = b->piece_data_time; if( btime + data_time_cutoff_secs < tr_now ) btime = 0;
3646    if( atime != btime )
3647        return atime > btime ? -1 : 1;
3648
3649    /* secondary key: shelf date. */
3650    if( a->shelf_date != b->shelf_date )
3651        return a->shelf_date > b->shelf_date ? -1 : 1;
3652
3653    return 0;
3654}
3655
3656static int
3657getMaxAtomCount( const tr_torrent * tor )
3658{
3659    const int n = tor->maxConnectedPeers;
3660    /* approximate fit of the old jump discontinuous function */
3661    if( n >= 55 ) return     n + 150;
3662    if( n >= 20 ) return 2 * n + 95;
3663    return               4 * n + 55;
3664}
3665
3666static void
3667atomPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3668{
3669    tr_torrent * tor = NULL;
3670    tr_peerMgr * mgr = vmgr;
3671    managerLock( mgr );
3672
3673    while(( tor = tr_torrentNext( mgr->session, tor )))
3674    {
3675        int atomCount;
3676        Torrent * t = tor->torrentPeers;
3677        const int maxAtomCount = getMaxAtomCount( tor );
3678        struct peer_atom ** atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
3679
3680        if( atomCount > maxAtomCount ) /* we've got too many atoms... time to prune */
3681        {
3682            int i;
3683            int keepCount = 0;
3684            int testCount = 0;
3685            struct peer_atom ** keep = tr_new( struct peer_atom*, atomCount );
3686            struct peer_atom ** test = tr_new( struct peer_atom*, atomCount );
3687
3688            /* keep the ones that are in use */
3689            for( i=0; i<atomCount; ++i ) {
3690                struct peer_atom * atom = atoms[i];
3691                if( peerIsInUse( t, atom ) )
3692                    keep[keepCount++] = atom;
3693                else
3694                    test[testCount++] = atom;
3695            }
3696
3697            /* if there's room, keep the best of what's left */
3698            i = 0;
3699            if( keepCount < maxAtomCount ) {
3700                qsort( test, testCount, sizeof( struct peer_atom * ), compareAtomPtrsByShelfDate );
3701                while( i<testCount && keepCount<maxAtomCount )
3702                    keep[keepCount++] = test[i++];
3703            }
3704
3705            /* free the culled atoms */
3706            while( i<testCount )
3707                tr_free( test[i++] );
3708
3709            /* rebuild Torrent.pool with what's left */
3710            tr_ptrArrayDestruct( &t->pool, NULL );
3711            t->pool = TR_PTR_ARRAY_INIT;
3712            qsort( keep, keepCount, sizeof( struct peer_atom * ), compareAtomPtrsByAddress );
3713            for( i=0; i<keepCount; ++i )
3714                tr_ptrArrayAppend( &t->pool, keep[i] );
3715
3716            tordbg( t, "max atom count is %d... pruned from %d to %d\n", maxAtomCount, atomCount, keepCount );
3717
3718            /* cleanup */
3719            tr_free( test );
3720            tr_free( keep );
3721        }
3722    }
3723
3724    tr_timerAddMsec( mgr->atomTimer, ATOM_PERIOD_MSEC );
3725    managerUnlock( mgr );
3726}
3727
3728/***
3729****
3730****
3731****
3732***/
3733
3734/* is this atom someone that we'd want to initiate a connection to? */
3735static bool
3736isPeerCandidate( const tr_torrent * tor, struct peer_atom * atom, const time_t now )
3737{
3738    /* not if we're both seeds */
3739    if( tr_torrentIsSeed( tor ) && atomIsSeed( atom ) )
3740        return false;
3741
3742    /* not if we've already got a connection to them... */
3743    if( peerIsInUse( tor->torrentPeers, atom ) )
3744        return false;
3745
3746    /* not if we just tried them already */
3747    if( ( now - atom->time ) < getReconnectIntervalSecs( atom, now ) )
3748        return false;
3749
3750    /* not if they're blocklisted */
3751    if( isAtomBlocklisted( tor->session, atom ) )
3752        return false;
3753
3754    /* not if they're banned... */
3755    if( atom->flags2 & MYFLAG_BANNED )
3756        return false;
3757
3758    return true;
3759}
3760
3761struct peer_candidate
3762{
3763    uint64_t score;
3764    tr_torrent * tor;
3765    struct peer_atom * atom;
3766};
3767
3768static bool
3769torrentWasRecentlyStarted( const tr_torrent * tor )
3770{
3771    return difftime( tr_time( ), tor->startDate ) < 120;
3772}
3773
3774static inline uint64_t
3775addValToKey( uint64_t value, int width, uint64_t addme )
3776{
3777    value = (value << (uint64_t)width);
3778    value |= addme;
3779    return value;
3780}
3781
3782/* smaller value is better */
3783static uint64_t
3784getPeerCandidateScore( const tr_torrent * tor, const struct peer_atom * atom, uint8_t salt  )
3785{
3786    uint64_t i;
3787    uint64_t score = 0;
3788    const bool failed = atom->lastConnectionAt < atom->lastConnectionAttemptAt;
3789
3790    /* prefer peers we've connected to, or never tried, over peers we failed to connect to. */
3791    i = failed ? 1 : 0;
3792    score = addValToKey( score, 1, i );
3793
3794    /* prefer the one we attempted least recently (to cycle through all peers) */
3795    i = atom->lastConnectionAttemptAt;
3796    score = addValToKey( score, 32, i );
3797
3798    /* prefer peers belonging to a torrent of a higher priority */
3799    switch( tr_torrentGetPriority( tor ) ) {
3800        case TR_PRI_HIGH:    i = 0; break;
3801        case TR_PRI_NORMAL:  i = 1; break;
3802        case TR_PRI_LOW:     i = 2; break;
3803    }
3804    score = addValToKey( score, 4, i );
3805
3806    /* prefer recently-started torrents */
3807    i = torrentWasRecentlyStarted( tor ) ? 0 : 1;
3808    score = addValToKey( score, 1, i );
3809
3810    /* prefer torrents we're downloading with */
3811    i = tr_torrentIsSeed( tor ) ? 1 : 0;
3812    score = addValToKey( score, 1, i );
3813
3814    /* prefer peers that are known to be connectible */
3815    i = ( atom->flags & ADDED_F_CONNECTABLE ) ? 0 : 1;
3816    score = addValToKey( score, 1, i );
3817
3818    /* prefer peers that we might have a chance of uploading to...
3819       so lower seed probability is better */
3820    if( atom->seedProbability == 100 ) i = 101;
3821    else if( atom->seedProbability == -1 ) i = 100;
3822    else i = atom->seedProbability;
3823    score = addValToKey( score, 8, i );
3824
3825    /* Prefer peers that we got from more trusted sources.
3826     * lower `fromBest' values indicate more trusted sources */
3827    score = addValToKey( score, 4, atom->fromBest );
3828
3829    /* salt */
3830    score = addValToKey( score, 8, salt );
3831
3832    return score;
3833}
3834
3835/* sort an array of peer candidates */
3836static int
3837comparePeerCandidates( const void * va, const void * vb )
3838{
3839    const struct peer_candidate * a = va;
3840    const struct peer_candidate * b = vb;
3841
3842    if( a->score < b->score ) return -1;
3843    if( a->score > b->score ) return 1;
3844
3845    return 0;
3846}
3847
3848/** @return an array of all the atoms we might want to connect to */
3849static struct peer_candidate*
3850getPeerCandidates( tr_session * session, int * candidateCount )
3851{
3852    int atomCount;
3853    int peerCount;
3854    tr_torrent * tor;
3855    struct peer_candidate * candidates;
3856    struct peer_candidate * walk;
3857    const time_t now = tr_time( );
3858    const uint64_t now_msec = tr_time_msec( );
3859    /* leave 5% of connection slots for incoming connections -- ticket #2609 */
3860    const int maxCandidates = tr_sessionGetPeerLimit( session ) * 0.95;
3861
3862    /* count how many peers and atoms we've got */
3863    tor= NULL;
3864    atomCount = 0;
3865    peerCount = 0;
3866    while(( tor = tr_torrentNext( session, tor ))) {
3867        atomCount += tr_ptrArraySize( &tor->torrentPeers->pool );
3868        peerCount += tr_ptrArraySize( &tor->torrentPeers->peers );
3869    }
3870
3871    /* don't start any new handshakes if we're full up */
3872    if( maxCandidates <= peerCount ) {
3873        *candidateCount = 0;
3874        return NULL;
3875    }
3876
3877    /* allocate an array of candidates */
3878    walk = candidates = tr_new( struct peer_candidate, atomCount );
3879
3880    /* populate the candidate array */
3881    tor = NULL;
3882    while(( tor = tr_torrentNext( session, tor )))
3883    {
3884        int i, nAtoms;
3885        struct peer_atom ** atoms;
3886
3887        if( !tor->torrentPeers->isRunning )
3888            continue;
3889
3890        /* if we've already got enough peers in this torrent... */
3891        if( tr_torrentGetPeerLimit( tor ) <= tr_ptrArraySize( &tor->torrentPeers->peers ) )
3892            continue;
3893
3894        /* if we've already got enough speed in this torrent... */
3895        if( tr_torrentIsSeed( tor ) && isBandwidthMaxedOut( &tor->bandwidth, now_msec, TR_UP ) )
3896            continue;
3897
3898        atoms = (struct peer_atom**) tr_ptrArrayPeek( &tor->torrentPeers->pool, &nAtoms );
3899        for( i=0; i<nAtoms; ++i )
3900        {
3901            struct peer_atom * atom = atoms[i];
3902
3903            if( isPeerCandidate( tor, atom, now ) )
3904            {
3905                const uint8_t salt = tr_cryptoWeakRandInt( 1024 );
3906                walk->tor = tor;
3907                walk->atom = atom;
3908                walk->score = getPeerCandidateScore( tor, atom, salt );
3909                ++walk;
3910            }
3911        }
3912    }
3913
3914    *candidateCount = walk - candidates;
3915    if( *candidateCount > 1 )
3916        qsort( candidates, *candidateCount, sizeof( struct peer_candidate ), comparePeerCandidates );
3917    return candidates;
3918}
3919
3920static void
3921initiateConnection( tr_peerMgr * mgr, Torrent * t, struct peer_atom * atom )
3922{
3923    tr_peerIo * io;
3924    const time_t now = tr_time( );
3925    bool utp = tr_sessionIsUTPEnabled(mgr->session) && !atom->utp_failed;
3926
3927    if( atom->fromFirst == TR_PEER_FROM_PEX )
3928        /* PEX has explicit signalling for uTP support.  If an atom
3929           originally came from PEX and doesn't have the uTP flag, skip the
3930           uTP connection attempt.  Are we being optimistic here? */
3931        utp = utp && (atom->flags & ADDED_F_UTP_FLAGS);
3932
3933    tordbg( t, "Starting an OUTGOING%s connection with %s",
3934            utp ? " µTP" : "",
3935            tr_atomAddrStr( atom ) );
3936
3937    io = tr_peerIoNewOutgoing( mgr->session,
3938                               &mgr->session->bandwidth,
3939                               &atom->addr,
3940                               atom->port,
3941                               t->tor->info.hash,
3942                               t->tor->completeness == TR_SEED,
3943                               utp );
3944
3945    if( io == NULL )
3946    {
3947        tordbg( t, "peerIo not created; marking peer %s as unreachable",
3948                tr_atomAddrStr( atom ) );
3949        atom->flags2 |= MYFLAG_UNREACHABLE;
3950        atom->numFails++;
3951    }
3952    else
3953    {
3954        tr_handshake * handshake = tr_handshakeNew( io,
3955                                                    mgr->session->encryptionMode,
3956                                                    myHandshakeDoneCB,
3957                                                    mgr );
3958
3959        assert( tr_peerIoGetTorrentHash( io ) );
3960
3961        tr_peerIoUnref( io ); /* balanced by the initial ref
3962                                 in tr_peerIoNewOutgoing() */
3963
3964        tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
3965                                 handshakeCompare );
3966    }
3967
3968    atom->lastConnectionAttemptAt = now;
3969    atom->time = now;
3970}
3971
3972static void
3973initiateCandidateConnection( tr_peerMgr * mgr, struct peer_candidate * c )
3974{
3975#if 0
3976    fprintf( stderr, "Starting an OUTGOING connection with %s - [%s] seedProbability==%d; %s, %s\n",
3977             tr_atomAddrStr( c->atom ),
3978             tr_torrentName( c->tor ),
3979             (int)c->atom->seedProbability,
3980             tr_torrentIsPrivate( c->tor ) ? "private" : "public",
3981             tr_torrentIsSeed( c->tor ) ? "seed" : "downloader" );
3982#endif
3983
3984    initiateConnection( mgr, c->tor->torrentPeers, c->atom );
3985}
3986
3987static void
3988makeNewPeerConnections( struct tr_peerMgr * mgr, const int max )
3989{
3990    int i, n;
3991    struct peer_candidate * candidates;
3992
3993    candidates = getPeerCandidates( mgr->session, &n );
3994
3995    for( i=0; i<n && i<max; ++i )
3996        initiateCandidateConnection( mgr, &candidates[i] );
3997
3998    tr_free( candidates );
3999}
Note: See TracBrowser for help on using the repository browser.