source: trunk/libtransmission/peer-mgr.c @ 12615

Last change on this file since 12615 was 12615, checked in by jordan, 10 years ago

Add a callback to be invoked when the queue starts a torrent.

  • Property svn:keywords set to Date Rev Author Id
File size: 116.2 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 12615 2011-08-03 03:14:57Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h> /* error codes ERANGE, ... */
15#include <limits.h> /* INT_MAX */
16#include <string.h> /* memcpy, memcmp, strstr */
17#include <stdlib.h> /* qsort */
18
19#include <event2/event.h>
20#include <libutp/utp.h>
21
22#include "transmission.h"
23#include "announcer.h"
24#include "bandwidth.h"
25#include "blocklist.h"
26#include "cache.h"
27#include "clients.h"
28#include "completion.h"
29#include "crypto.h"
30#include "handshake.h"
31#include "net.h"
32#include "peer-io.h"
33#include "peer-mgr.h"
34#include "peer-msgs.h"
35#include "ptrarray.h"
36#include "session.h"
37#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
38#include "torrent.h"
39#include "utils.h"
40#include "webseed.h"
41
42enum
43{
44    /* how frequently to cull old atoms */
45    ATOM_PERIOD_MSEC = ( 60 * 1000 ),
46
47    /* how frequently to change which peers are choked */
48    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
49
50    /* an optimistically unchoked peer is immune from rechoking
51       for this many calls to rechokeUploads(). */
52    OPTIMISTIC_UNCHOKE_MULTIPLIER = 4,
53
54    /* how frequently to reallocate bandwidth */
55    BANDWIDTH_PERIOD_MSEC = 500,
56
57    /* how frequently to age out old piece request lists */
58    REFILL_UPKEEP_PERIOD_MSEC = ( 10 * 1000 ),
59
60    /* how frequently to decide which peers live and die */
61    RECONNECT_PERIOD_MSEC = 500,
62
63    /* when many peers are available, keep idle ones this long */
64    MIN_UPLOAD_IDLE_SECS = ( 60 ),
65
66    /* when few peers are available, keep idle ones this long */
67    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
68
69    /* max number of peers to ask for per second overall.
70     * this throttle is to avoid overloading the router */
71    MAX_CONNECTIONS_PER_SECOND = 12,
72
73    MAX_CONNECTIONS_PER_PULSE = (int)(MAX_CONNECTIONS_PER_SECOND * (RECONNECT_PERIOD_MSEC/1000.0)),
74
75    /* number of bad pieces a peer is allowed to send before we ban them */
76    MAX_BAD_PIECES_PER_PEER = 5,
77
78    /* amount of time to keep a list of request pieces lying around
79       before it's considered too old and needs to be rebuilt */
80    PIECE_LIST_SHELF_LIFE_SECS = 60,
81
82    /* use for bitwise operations w/peer_atom.flags2 */
83    MYFLAG_BANNED = 1,
84
85    /* use for bitwise operations w/peer_atom.flags2 */
86    /* unreachable for now... but not banned.
87     * if they try to connect to us it's okay */
88    MYFLAG_UNREACHABLE = 2,
89
90    /* the minimum we'll wait before attempting to reconnect to a peer */
91    MINIMUM_RECONNECT_INTERVAL_SECS = 5,
92
93    /** how long we'll let requests we've made linger before we cancel them */
94    REQUEST_TTL_SECS = 120,
95
96    NO_BLOCKS_CANCEL_HISTORY = 120,
97
98    CANCEL_HISTORY_SEC = 60
99};
100
101const tr_peer_event TR_PEER_EVENT_INIT = { 0, 0, NULL, 0, 0, 0, false, 0 };
102
103/**
104***
105**/
106
107/**
108 * Peer information that should be kept even before we've connected and
109 * after we've disconnected. These are kept in a pool of peer_atoms to decide
110 * which ones would make good candidates for connecting to, and to watch out
111 * for banned peers.
112 *
113 * @see tr_peer
114 * @see tr_peermsgs
115 */
116struct peer_atom
117{
118    uint8_t     fromFirst;          /* where the peer was first found */
119    uint8_t     fromBest;           /* the "best" value of where the peer has been found */
120    uint8_t     flags;              /* these match the added_f flags */
121    uint8_t     flags2;             /* flags that aren't defined in added_f */
122    int8_t      seedProbability;    /* how likely is this to be a seed... [0..100] or -1 for unknown */
123    int8_t      blocklisted;        /* -1 for unknown, true for blocklisted, false for not blocklisted */
124
125    tr_port     port;
126    bool        utp_failed;         /* We recently failed to connect over uTP */
127    uint16_t    numFails;
128    time_t      time;               /* when the peer's connection status last changed */
129    time_t      piece_data_time;
130
131    time_t      lastConnectionAttemptAt;
132    time_t      lastConnectionAt;
133
134    /* similar to a TTL field, but less rigid --
135     * if the swarm is small, the atom will be kept past this date. */
136    time_t      shelf_date;
137    tr_peer   * peer;               /* will be NULL if not connected */
138    tr_address  addr;
139};
140
141#ifdef NDEBUG
142#define tr_isAtom(a) (TRUE)
143#else
144static bool
145tr_isAtom( const struct peer_atom * atom )
146{
147    return ( atom != NULL )
148        && ( atom->fromFirst < TR_PEER_FROM__MAX )
149        && ( atom->fromBest < TR_PEER_FROM__MAX )
150        && ( tr_address_is_valid( &atom->addr ) );
151}
152#endif
153
154static const char*
155tr_atomAddrStr( const struct peer_atom * atom )
156{
157    return atom ? tr_peerIoAddrStr( &atom->addr, atom->port ) : "[no atom]";
158}
159
160struct block_request
161{
162    tr_block_index_t block;
163    tr_peer * peer;
164    time_t sentAt;
165};
166
167struct weighted_piece
168{
169    tr_piece_index_t index;
170    int16_t salt;
171    int16_t requestCount;
172};
173
174enum piece_sort_state
175{
176    PIECES_UNSORTED,
177    PIECES_SORTED_BY_INDEX,
178    PIECES_SORTED_BY_WEIGHT
179};
180
181/** @brief Opaque, per-torrent data structure for peer connection information */
182typedef struct tr_torrent_peers
183{
184    tr_ptrArray                outgoingHandshakes; /* tr_handshake */
185    tr_ptrArray                pool; /* struct peer_atom */
186    tr_ptrArray                peers; /* tr_peer */
187    tr_ptrArray                webseeds; /* tr_webseed */
188
189    tr_torrent               * tor;
190    struct tr_peerMgr        * manager;
191
192    tr_peer                  * optimistic; /* the optimistic peer, or NULL if none */
193    int                        optimisticUnchokeTimeScaler;
194
195    bool                       isRunning;
196    bool                       needsCompletenessCheck;
197
198    struct block_request     * requests;
199    int                        requestCount;
200    int                        requestAlloc;
201
202    struct weighted_piece    * pieces;
203    int                        pieceCount;
204    enum piece_sort_state      pieceSortState;
205
206    /* An array of pieceCount items stating how many peers have each piece.
207       This is used to help us for downloading pieces "rarest first."
208       This may be NULL if we don't have metainfo yet, or if we're not
209       downloading and don't care about rarity */
210    uint16_t                 * pieceReplication;
211    size_t                     pieceReplicationSize;
212
213    int                        interestedCount;
214    int                        maxPeers;
215    time_t                     lastCancel;
216
217    /* Before the endgame this should be 0. In endgame, is contains the average
218     * number of pending requests per peer. Only peers which have more pending
219     * requests are considered 'fast' are allowed to request a block that's
220     * already been requested from another (slower?) peer. */
221    int                        endgame;
222}
223Torrent;
224
225struct tr_peerMgr
226{
227    tr_session    * session;
228    tr_ptrArray     incomingHandshakes; /* tr_handshake */
229    struct event  * bandwidthTimer;
230    struct event  * rechokeTimer;
231    struct event  * refillUpkeepTimer;
232    struct event  * atomTimer;
233};
234
235#define tordbg( t, ... ) \
236    do { \
237        if( tr_deepLoggingIsActive( ) ) \
238            tr_deepLog( __FILE__, __LINE__, tr_torrentName( t->tor ), __VA_ARGS__ ); \
239    } while( 0 )
240
241#define dbgmsg( ... ) \
242    do { \
243        if( tr_deepLoggingIsActive( ) ) \
244            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
245    } while( 0 )
246
247/**
248***
249**/
250
251static inline void
252managerLock( const struct tr_peerMgr * manager )
253{
254    tr_sessionLock( manager->session );
255}
256
257static inline void
258managerUnlock( const struct tr_peerMgr * manager )
259{
260    tr_sessionUnlock( manager->session );
261}
262
263static inline void
264torrentLock( Torrent * torrent )
265{
266    managerLock( torrent->manager );
267}
268
269static inline void
270torrentUnlock( Torrent * torrent )
271{
272    managerUnlock( torrent->manager );
273}
274
275static inline int
276torrentIsLocked( const Torrent * t )
277{
278    return tr_sessionIsLocked( t->manager->session );
279}
280
281/**
282***
283**/
284
285static int
286handshakeCompareToAddr( const void * va, const void * vb )
287{
288    const tr_handshake * a = va;
289
290    return tr_address_compare( tr_handshakeGetAddr( a, NULL ), vb );
291}
292
293static int
294handshakeCompare( const void * a, const void * b )
295{
296    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
297}
298
299static inline tr_handshake*
300getExistingHandshake( tr_ptrArray * handshakes, const tr_address * addr )
301{
302    if( tr_ptrArrayEmpty( handshakes ) )
303        return NULL;
304
305    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
306}
307
308static int
309comparePeerAtomToAddress( const void * va, const void * vb )
310{
311    const struct peer_atom * a = va;
312
313    return tr_address_compare( &a->addr, vb );
314}
315
316static int
317compareAtomsByAddress( const void * va, const void * vb )
318{
319    const struct peer_atom * b = vb;
320
321    assert( tr_isAtom( b ) );
322
323    return comparePeerAtomToAddress( va, &b->addr );
324}
325
326/**
327***
328**/
329
330const tr_address *
331tr_peerAddress( const tr_peer * peer )
332{
333    return &peer->atom->addr;
334}
335
336static Torrent*
337getExistingTorrent( tr_peerMgr *    manager,
338                    const uint8_t * hash )
339{
340    tr_torrent * tor = tr_torrentFindFromHash( manager->session, hash );
341
342    return tor == NULL ? NULL : tor->torrentPeers;
343}
344
345static int
346peerCompare( const void * a, const void * b )
347{
348    return tr_address_compare( tr_peerAddress( a ), tr_peerAddress( b ) );
349}
350
351static struct peer_atom*
352getExistingAtom( const Torrent    * t,
353                 const tr_address * addr )
354{
355    Torrent * tt = (Torrent*)t;
356    assert( torrentIsLocked( t ) );
357    return tr_ptrArrayFindSorted( &tt->pool, addr, comparePeerAtomToAddress );
358}
359
360static bool
361peerIsInUse( const Torrent * ct, const struct peer_atom * atom )
362{
363    Torrent * t = (Torrent*) ct;
364
365    assert( torrentIsLocked ( t ) );
366
367    return ( atom->peer != NULL )
368        || getExistingHandshake( &t->outgoingHandshakes, &atom->addr )
369        || getExistingHandshake( &t->manager->incomingHandshakes, &atom->addr );
370}
371
372void
373tr_peerConstruct( tr_peer * peer )
374{
375    memset( peer, 0, sizeof( tr_peer ) );
376
377    peer->have = TR_BITFIELD_INIT;
378}
379
380static tr_peer*
381peerNew( struct peer_atom * atom )
382{
383    tr_peer * peer = tr_new( tr_peer, 1 );
384    tr_peerConstruct( peer );
385
386    peer->atom = atom;
387    atom->peer = peer;
388
389    return peer;
390}
391
392static tr_peer*
393getPeer( Torrent * torrent, struct peer_atom * atom )
394{
395    tr_peer * peer;
396
397    assert( torrentIsLocked( torrent ) );
398
399    peer = atom->peer;
400
401    if( peer == NULL )
402    {
403        peer = peerNew( atom );
404        tr_bitfieldConstruct( &peer->have, torrent->tor->blockCount );
405        tr_bitfieldConstruct( &peer->blame, torrent->tor->blockCount );
406        tr_ptrArrayInsertSorted( &torrent->peers, peer, peerCompare );
407    }
408
409    return peer;
410}
411
412static void peerDeclinedAllRequests( Torrent *, const tr_peer * );
413
414void
415tr_peerDestruct( tr_torrent * tor, tr_peer * peer )
416{
417    assert( peer != NULL );
418
419    peerDeclinedAllRequests( tor->torrentPeers, peer );
420
421    if( peer->msgs != NULL )
422        tr_peerMsgsFree( peer->msgs );
423
424    if( peer->io ) {
425        tr_peerIoClear( peer->io );
426        tr_peerIoUnref( peer->io ); /* balanced by the ref in handshakeDoneCB() */
427    }
428
429    tr_bitfieldDestruct( &peer->have );
430    tr_bitfieldDestruct( &peer->blame );
431    tr_free( peer->client );
432
433    if( peer->atom )
434        peer->atom->peer = NULL;
435}
436
437static void
438peerDelete( Torrent * t, tr_peer * peer )
439{
440    tr_peerDestruct( t->tor, peer );
441    tr_free( peer );
442}
443
444static bool
445replicationExists( const Torrent * t )
446{
447    return t->pieceReplication != NULL;
448}
449
450static void
451replicationFree( Torrent * t )
452{
453    tr_free( t->pieceReplication );
454    t->pieceReplication = NULL;
455    t->pieceReplicationSize = 0;
456}
457
458static void
459replicationNew( Torrent * t )
460{
461    tr_piece_index_t piece_i;
462    const tr_piece_index_t piece_count = t->tor->info.pieceCount;
463    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
464    const int peer_count = tr_ptrArraySize( &t->peers );
465
466    assert( !replicationExists( t ) );
467
468    t->pieceReplicationSize = piece_count;
469    t->pieceReplication = tr_new0( uint16_t, piece_count );
470
471    for( piece_i=0; piece_i<piece_count; ++piece_i )
472    {
473        int peer_i;
474        uint16_t r = 0;
475
476        for( peer_i=0; peer_i<peer_count; ++peer_i )
477            if( tr_bitfieldHas( &peers[peer_i]->have, piece_i ) )
478                ++r;
479
480        t->pieceReplication[piece_i] = r;
481    }
482}
483
484static void
485torrentFree( void * vt )
486{
487    Torrent * t = vt;
488
489    assert( t );
490    assert( !t->isRunning );
491    assert( torrentIsLocked( t ) );
492    assert( tr_ptrArrayEmpty( &t->outgoingHandshakes ) );
493    assert( tr_ptrArrayEmpty( &t->peers ) );
494
495    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
496    tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
497    tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
498    tr_ptrArrayDestruct( &t->peers, NULL );
499
500    replicationFree( t );
501
502    tr_free( t->requests );
503    tr_free( t->pieces );
504    tr_free( t );
505}
506
507static void peerCallbackFunc( tr_peer *, const tr_peer_event *, void * );
508
509static Torrent*
510torrentNew( tr_peerMgr * manager, tr_torrent * tor )
511{
512    int       i;
513    Torrent * t;
514
515    t = tr_new0( Torrent, 1 );
516    t->manager = manager;
517    t->tor = tor;
518    t->pool = TR_PTR_ARRAY_INIT;
519    t->peers = TR_PTR_ARRAY_INIT;
520    t->webseeds = TR_PTR_ARRAY_INIT;
521    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
522
523    for( i = 0; i < tor->info.webseedCount; ++i )
524    {
525        tr_webseed * w =
526            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
527        tr_ptrArrayAppend( &t->webseeds, w );
528    }
529
530    return t;
531}
532
533static void ensureMgrTimersExist( struct tr_peerMgr * m );
534
535tr_peerMgr*
536tr_peerMgrNew( tr_session * session )
537{
538    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
539    m->session = session;
540    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
541    ensureMgrTimersExist( m );
542    return m;
543}
544
545static void
546deleteTimer( struct event ** t )
547{
548    if( *t != NULL )
549    {
550        event_free( *t );
551        *t = NULL;
552    }
553}
554
555static void
556deleteTimers( struct tr_peerMgr * m )
557{
558    deleteTimer( &m->atomTimer );
559    deleteTimer( &m->bandwidthTimer );
560    deleteTimer( &m->rechokeTimer );
561    deleteTimer( &m->refillUpkeepTimer );
562}
563
564void
565tr_peerMgrFree( tr_peerMgr * manager )
566{
567    managerLock( manager );
568
569    deleteTimers( manager );
570
571    /* free the handshakes. Abort invokes handshakeDoneCB(), which removes
572     * the item from manager->handshakes, so this is a little roundabout... */
573    while( !tr_ptrArrayEmpty( &manager->incomingHandshakes ) )
574        tr_handshakeAbort( tr_ptrArrayNth( &manager->incomingHandshakes, 0 ) );
575
576    tr_ptrArrayDestruct( &manager->incomingHandshakes, NULL );
577
578    managerUnlock( manager );
579    tr_free( manager );
580}
581
582static int
583clientIsDownloadingFrom( const tr_torrent * tor, const tr_peer * peer )
584{
585    if( !tr_torrentHasMetadata( tor ) )
586        return true;
587
588    return peer->clientIsInterested && !peer->clientIsChoked;
589}
590
591static int
592clientIsUploadingTo( const tr_peer * peer )
593{
594    return peer->peerIsInterested && !peer->peerIsChoked;
595}
596
597/***
598****
599***/
600
601void
602tr_peerMgrOnBlocklistChanged( tr_peerMgr * mgr )
603{
604    tr_torrent * tor = NULL;
605    tr_session * session = mgr->session;
606
607    /* we cache whether or not a peer is blocklisted...
608       since the blocklist has changed, erase that cached value */
609    while(( tor = tr_torrentNext( session, tor )))
610    {
611        int i;
612        Torrent * t = tor->torrentPeers;
613        const int n = tr_ptrArraySize( &t->pool );
614        for( i=0; i<n; ++i ) {
615            struct peer_atom * atom = tr_ptrArrayNth( &t->pool, i );
616            atom->blocklisted = -1;
617        }
618    }
619}
620
621static bool
622isAtomBlocklisted( tr_session * session, struct peer_atom * atom )
623{
624    if( atom->blocklisted < 0 )
625        atom->blocklisted = tr_sessionIsAddressBlocked( session, &atom->addr );
626
627    assert( tr_isBool( atom->blocklisted ) );
628    return atom->blocklisted;
629}
630
631
632/***
633****
634***/
635
636static void
637atomSetSeedProbability( struct peer_atom * atom, int seedProbability )
638{
639    assert( atom != NULL );
640    assert( -1<=seedProbability && seedProbability<=100 );
641
642    atom->seedProbability = seedProbability;
643
644    if( seedProbability == 100 )
645        atom->flags |= ADDED_F_SEED_FLAG;
646    else if( seedProbability != -1 )
647        atom->flags &= ~ADDED_F_SEED_FLAG;
648}
649
650static inline bool
651atomIsSeed( const struct peer_atom * atom )
652{
653    return atom->seedProbability == 100;
654}
655
656static void
657atomSetSeed( const Torrent * t, struct peer_atom * atom )
658{
659    if( !atomIsSeed( atom ) )
660    {
661        tordbg( t, "marking peer %s as a seed", tr_atomAddrStr( atom ) );
662
663        atomSetSeedProbability( atom, 100 );
664    }
665}
666
667
668bool
669tr_peerMgrPeerIsSeed( const tr_torrent  * tor,
670                      const tr_address  * addr )
671{
672    bool isSeed = false;
673    const Torrent * t = tor->torrentPeers;
674    const struct peer_atom * atom = getExistingAtom( t, addr );
675
676    if( atom )
677        isSeed = atomIsSeed( atom );
678
679    return isSeed;
680}
681
682void
683tr_peerMgrSetUtpSupported( tr_torrent * tor, const tr_address * addr )
684{
685    struct peer_atom * atom = getExistingAtom( tor->torrentPeers, addr );
686
687    if( atom )
688        atom->flags |= ADDED_F_UTP_FLAGS;
689}
690
691void
692tr_peerMgrSetUtpFailed( tr_torrent *tor, const tr_address *addr, bool failed )
693{
694    struct peer_atom * atom = getExistingAtom( tor->torrentPeers, addr );
695
696    if( atom )
697        atom->utp_failed = failed;
698}
699
700
701/**
702***  REQUESTS
703***
704*** There are two data structures associated with managing block requests:
705***
706*** 1. Torrent::requests, an array of "struct block_request" which keeps
707***    track of which blocks have been requested, and when, and by which peers.
708***    This is list is used for (a) cancelling requests that have been pending
709***    for too long and (b) avoiding duplicate requests before endgame.
710***
711*** 2. Torrent::pieces, an array of "struct weighted_piece" which lists the
712***    pieces that we want to request. It's used to decide which blocks to
713***    return next when tr_peerMgrGetBlockRequests() is called.
714**/
715
716/**
717*** struct block_request
718**/
719
720static int
721compareReqByBlock( const void * va, const void * vb )
722{
723    const struct block_request * a = va;
724    const struct block_request * b = vb;
725
726    /* primary key: block */
727    if( a->block < b->block ) return -1;
728    if( a->block > b->block ) return 1;
729
730    /* secondary key: peer */
731    if( a->peer < b->peer ) return -1;
732    if( a->peer > b->peer ) return 1;
733
734    return 0;
735}
736
737static void
738requestListAdd( Torrent * t, tr_block_index_t block, tr_peer * peer )
739{
740    struct block_request key;
741
742    /* ensure enough room is available... */
743    if( t->requestCount + 1 >= t->requestAlloc )
744    {
745        const int CHUNK_SIZE = 128;
746        t->requestAlloc += CHUNK_SIZE;
747        t->requests = tr_renew( struct block_request,
748                                t->requests, t->requestAlloc );
749    }
750
751    /* populate the record we're inserting */
752    key.block = block;
753    key.peer = peer;
754    key.sentAt = tr_time( );
755
756    /* insert the request to our array... */
757    {
758        bool exact;
759        const int pos = tr_lowerBound( &key, t->requests, t->requestCount,
760                                       sizeof( struct block_request ),
761                                       compareReqByBlock, &exact );
762        assert( !exact );
763        memmove( t->requests + pos + 1,
764                 t->requests + pos,
765                 sizeof( struct block_request ) * ( t->requestCount++ - pos ) );
766        t->requests[pos] = key;
767    }
768
769    if( peer != NULL )
770    {
771        ++peer->pendingReqsToPeer;
772        assert( peer->pendingReqsToPeer >= 0 );
773    }
774
775    /*fprintf( stderr, "added request of block %lu from peer %s... "
776                       "there are now %d block\n",
777                       (unsigned long)block, tr_atomAddrStr( peer->atom ), t->requestCount );*/
778}
779
780static struct block_request *
781requestListLookup( Torrent * t, tr_block_index_t block, const tr_peer * peer )
782{
783    struct block_request key;
784    key.block = block;
785    key.peer = (tr_peer*) peer;
786
787    return bsearch( &key, t->requests, t->requestCount,
788                    sizeof( struct block_request ),
789                    compareReqByBlock );
790}
791
792/**
793 * Find the peers are we currently requesting the block
794 * with index @a block from and append them to @a peerArr.
795 */
796static void
797getBlockRequestPeers( Torrent * t, tr_block_index_t block,
798                      tr_ptrArray * peerArr )
799{
800    bool exact;
801    int i, pos;
802    struct block_request key;
803
804    key.block = block;
805    key.peer = NULL;
806    pos = tr_lowerBound( &key, t->requests, t->requestCount,
807                         sizeof( struct block_request ),
808                         compareReqByBlock, &exact );
809
810    assert( !exact ); /* shouldn't have a request with .peer == NULL */
811
812    for( i = pos; i < t->requestCount; ++i )
813    {
814        if( t->requests[i].block != block )
815            break;
816        tr_ptrArrayAppend( peerArr, t->requests[i].peer );
817    }
818}
819
820static void
821decrementPendingReqCount( const struct block_request * b )
822{
823    if( b->peer != NULL )
824        if( b->peer->pendingReqsToPeer > 0 )
825            --b->peer->pendingReqsToPeer;
826}
827
828static void
829requestListRemove( Torrent * t, tr_block_index_t block, const tr_peer * peer )
830{
831    const struct block_request * b = requestListLookup( t, block, peer );
832    if( b != NULL )
833    {
834        const int pos = b - t->requests;
835        assert( pos < t->requestCount );
836
837        decrementPendingReqCount( b );
838
839        tr_removeElementFromArray( t->requests,
840                                   pos,
841                                   sizeof( struct block_request ),
842                                   t->requestCount-- );
843
844        /*fprintf( stderr, "removing request of block %lu from peer %s... "
845                           "there are now %d block requests left\n",
846                           (unsigned long)block, tr_atomAddrStr( peer->atom ), t->requestCount );*/
847    }
848}
849
850static int
851countActiveWebseeds( const Torrent * t )
852{
853    int activeCount = 0;
854    const tr_webseed ** w = (const tr_webseed **) tr_ptrArrayBase( &t->webseeds );
855    const tr_webseed ** const wend = w + tr_ptrArraySize( &t->webseeds );
856
857    for( ; w!=wend; ++w )
858        if( tr_webseedIsActive( *w ) )
859            ++activeCount;
860
861    return activeCount;
862}
863
864static bool
865testForEndgame( const Torrent * t )
866{
867    /* we consider ourselves to be in endgame if the number of bytes
868       we've got requested is >= the number of bytes left to download */
869    return ( t->requestCount * t->tor->blockSize )
870               >= tr_cpLeftUntilDone( &t->tor->completion );
871}
872
873static void
874updateEndgame( Torrent * t )
875{
876    assert( t->requestCount >= 0 );
877
878    if( !testForEndgame( t ) )
879    {
880        /* not in endgame */
881        t->endgame = 0;
882    }
883    else if( !t->endgame ) /* only recalculate when endgame first begins */
884    {
885        int numDownloading = 0;
886        const tr_peer ** p = (const tr_peer **) tr_ptrArrayBase( &t->peers );
887        const tr_peer ** const pend = p + tr_ptrArraySize( &t->peers );
888
889        /* add the active bittorrent peers... */
890        for( ; p!=pend; ++p )
891            if( (*p)->pendingReqsToPeer > 0 )
892                ++numDownloading;
893
894        /* add the active webseeds... */
895        numDownloading += countActiveWebseeds( t );
896
897        /* average number of pending requests per downloading peer */
898        t->endgame = t->requestCount / MAX( numDownloading, 1 );
899    }
900}
901
902
903/****
904*****
905*****  Piece List Manipulation / Accessors
906*****
907****/
908
909static inline void
910invalidatePieceSorting( Torrent * t )
911{
912    t->pieceSortState = PIECES_UNSORTED;
913}
914
915static const tr_torrent * weightTorrent;
916
917static const uint16_t * weightReplication;
918
919static void
920setComparePieceByWeightTorrent( Torrent * t )
921{
922    if( !replicationExists( t ) )
923        replicationNew( t );
924
925    weightTorrent = t->tor;
926    weightReplication = t->pieceReplication;
927}
928
929/* we try to create a "weight" s.t. high-priority pieces come before others,
930 * and that partially-complete pieces come before empty ones. */
931static int
932comparePieceByWeight( const void * va, const void * vb )
933{
934    const struct weighted_piece * a = va;
935    const struct weighted_piece * b = vb;
936    int ia, ib, missing, pending;
937    const tr_torrent * tor = weightTorrent;
938    const uint16_t * rep = weightReplication;
939
940    /* primary key: weight */
941    missing = tr_cpMissingBlocksInPiece( &tor->completion, a->index );
942    pending = a->requestCount;
943    ia = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
944    missing = tr_cpMissingBlocksInPiece( &tor->completion, b->index );
945    pending = b->requestCount;
946    ib = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
947    if( ia < ib ) return -1;
948    if( ia > ib ) return 1;
949
950    /* secondary key: higher priorities go first */
951    ia = tor->info.pieces[a->index].priority;
952    ib = tor->info.pieces[b->index].priority;
953    if( ia > ib ) return -1;
954    if( ia < ib ) return 1;
955
956    /* tertiary key: rarest first. */
957    ia = rep[a->index];
958    ib = rep[b->index];
959    if( ia < ib ) return -1;
960    if( ia > ib ) return 1;
961
962    /* quaternary key: random */
963    if( a->salt < b->salt ) return -1;
964    if( a->salt > b->salt ) return 1;
965
966    /* okay, they're equal */
967    return 0;
968}
969
970static int
971comparePieceByIndex( const void * va, const void * vb )
972{
973    const struct weighted_piece * a = va;
974    const struct weighted_piece * b = vb;
975    if( a->index < b->index ) return -1;
976    if( a->index > b->index ) return 1;
977    return 0;
978}
979
980static void
981pieceListSort( Torrent * t, enum piece_sort_state state )
982{
983    assert( state==PIECES_SORTED_BY_INDEX
984         || state==PIECES_SORTED_BY_WEIGHT );
985
986
987    if( state == PIECES_SORTED_BY_WEIGHT )
988    {
989        setComparePieceByWeightTorrent( t );
990        qsort( t->pieces, t->pieceCount, sizeof( struct weighted_piece ), comparePieceByWeight );
991    }
992    else
993        qsort( t->pieces, t->pieceCount, sizeof( struct weighted_piece ), comparePieceByIndex );
994
995    t->pieceSortState = state;
996}
997
998/**
999 * These functions are useful for testing, but too expensive for nightly builds.
1000 * let's leave it disabled but add an easy hook to compile it back in
1001 */
1002#if 1
1003#define assertWeightedPiecesAreSorted(t)
1004#define assertReplicationCountIsExact(t)
1005#else
1006static void
1007assertWeightedPiecesAreSorted( Torrent * t )
1008{
1009    if( !t->endgame )
1010    {
1011        int i;
1012        setComparePieceByWeightTorrent( t );
1013        for( i=0; i<t->pieceCount-1; ++i )
1014            assert( comparePieceByWeight( &t->pieces[i], &t->pieces[i+1] ) <= 0 );
1015    }
1016}
1017static void
1018assertReplicationCountIsExact( Torrent * t )
1019{
1020    /* This assert might fail due to errors of implementations in other
1021     * clients. It happens when receiving duplicate bitfields/HaveAll/HaveNone
1022     * from a client. If a such a behavior is noticed,
1023     * a bug report should be filled to the faulty client. */
1024
1025    size_t piece_i;
1026    const uint16_t * rep = t->pieceReplication;
1027    const size_t piece_count = t->pieceReplicationSize;
1028    const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1029    const int peer_count = tr_ptrArraySize( &t->peers );
1030
1031    assert( piece_count == t->tor->info.pieceCount );
1032
1033    for( piece_i=0; piece_i<piece_count; ++piece_i )
1034    {
1035        int peer_i;
1036        uint16_t r = 0;
1037
1038        for( peer_i=0; peer_i<peer_count; ++peer_i )
1039            if( tr_bitsetHas( &peers[peer_i]->have, piece_i ) )
1040                ++r;
1041
1042        assert( rep[piece_i] == r );
1043    }
1044}
1045#endif
1046
1047static struct weighted_piece *
1048pieceListLookup( Torrent * t, tr_piece_index_t index )
1049{
1050    int i;
1051
1052    for( i=0; i<t->pieceCount; ++i )
1053        if( t->pieces[i].index == index )
1054            return &t->pieces[i];
1055
1056    return NULL;
1057}
1058
1059static void
1060pieceListRebuild( Torrent * t )
1061{
1062
1063    if( !tr_torrentIsSeed( t->tor ) )
1064    {
1065        tr_piece_index_t i;
1066        tr_piece_index_t * pool;
1067        tr_piece_index_t poolCount = 0;
1068        const tr_torrent * tor = t->tor;
1069        const tr_info * inf = tr_torrentInfo( tor );
1070        struct weighted_piece * pieces;
1071        int pieceCount;
1072
1073        /* build the new list */
1074        pool = tr_new( tr_piece_index_t, inf->pieceCount );
1075        for( i=0; i<inf->pieceCount; ++i )
1076            if( !inf->pieces[i].dnd )
1077                if( !tr_cpPieceIsComplete( &tor->completion, i ) )
1078                    pool[poolCount++] = i;
1079        pieceCount = poolCount;
1080        pieces = tr_new0( struct weighted_piece, pieceCount );
1081        for( i=0; i<poolCount; ++i ) {
1082            struct weighted_piece * piece = pieces + i;
1083            piece->index = pool[i];
1084            piece->requestCount = 0;
1085            piece->salt = tr_cryptoWeakRandInt( 4096 );
1086        }
1087
1088        /* if we already had a list of pieces, merge it into
1089         * the new list so we don't lose its requestCounts */
1090        if( t->pieces != NULL )
1091        {
1092            struct weighted_piece * o = t->pieces;
1093            struct weighted_piece * oend = o + t->pieceCount;
1094            struct weighted_piece * n = pieces;
1095            struct weighted_piece * nend = n + pieceCount;
1096
1097            pieceListSort( t, PIECES_SORTED_BY_INDEX );
1098
1099            while( o!=oend && n!=nend ) {
1100                if( o->index < n->index )
1101                    ++o;
1102                else if( o->index > n->index )
1103                    ++n;
1104                else
1105                    *n++ = *o++;
1106            }
1107
1108            tr_free( t->pieces );
1109        }
1110
1111        t->pieces = pieces;
1112        t->pieceCount = pieceCount;
1113
1114        pieceListSort( t, PIECES_SORTED_BY_WEIGHT );
1115
1116        /* cleanup */
1117        tr_free( pool );
1118    }
1119}
1120
1121static void
1122pieceListRemovePiece( Torrent * t, tr_piece_index_t piece )
1123{
1124    struct weighted_piece * p;
1125
1126    if(( p = pieceListLookup( t, piece )))
1127    {
1128        const int pos = p - t->pieces;
1129
1130        tr_removeElementFromArray( t->pieces,
1131                                   pos,
1132                                   sizeof( struct weighted_piece ),
1133                                   t->pieceCount-- );
1134
1135        if( t->pieceCount == 0 )
1136        {
1137            tr_free( t->pieces );
1138            t->pieces = NULL;
1139        }
1140    }
1141}
1142
1143static void
1144pieceListResortPiece( Torrent * t, struct weighted_piece * p )
1145{
1146    int pos;
1147    bool isSorted = true;
1148
1149    if( p == NULL )
1150        return;
1151
1152    /* is the torrent already sorted? */
1153    pos = p - t->pieces;
1154    setComparePieceByWeightTorrent( t );
1155    if( isSorted && ( pos > 0 ) && ( comparePieceByWeight( p-1, p ) > 0 ) )
1156        isSorted = false;
1157    if( isSorted && ( pos < t->pieceCount - 1 ) && ( comparePieceByWeight( p, p+1 ) > 0 ) )
1158        isSorted = false;
1159
1160    if( t->pieceSortState != PIECES_SORTED_BY_WEIGHT )
1161    {
1162       pieceListSort( t, PIECES_SORTED_BY_WEIGHT);
1163       isSorted = true;
1164    }
1165
1166    /* if it's not sorted, move it around */
1167    if( !isSorted )
1168    {
1169        bool exact;
1170        const struct weighted_piece tmp = *p;
1171
1172        tr_removeElementFromArray( t->pieces,
1173                                   pos,
1174                                   sizeof( struct weighted_piece ),
1175                                   t->pieceCount-- );
1176
1177        pos = tr_lowerBound( &tmp, t->pieces, t->pieceCount,
1178                             sizeof( struct weighted_piece ),
1179                             comparePieceByWeight, &exact );
1180
1181        memmove( &t->pieces[pos + 1],
1182                 &t->pieces[pos],
1183                 sizeof( struct weighted_piece ) * ( t->pieceCount++ - pos ) );
1184
1185        t->pieces[pos] = tmp;
1186    }
1187
1188    assertWeightedPiecesAreSorted( t );
1189}
1190
1191static void
1192pieceListRemoveRequest( Torrent * t, tr_block_index_t block )
1193{
1194    struct weighted_piece * p;
1195    const tr_piece_index_t index = tr_torBlockPiece( t->tor, block );
1196
1197    if( ((p = pieceListLookup( t, index ))) && ( p->requestCount > 0 ) )
1198    {
1199        --p->requestCount;
1200        pieceListResortPiece( t, p );
1201    }
1202}
1203
1204
1205/****
1206*****
1207*****  Replication count ( for rarest first policy )
1208*****
1209****/
1210
1211/**
1212 * Increase the replication count of this piece and sort it if the
1213 * piece list is already sorted
1214 */
1215static void
1216tr_incrReplicationOfPiece( Torrent * t, const size_t index )
1217{
1218    assert( replicationExists( t ) );
1219    assert( t->pieceReplicationSize == t->tor->info.pieceCount );
1220
1221    /* One more replication of this piece is present in the swarm */
1222    ++t->pieceReplication[index];
1223
1224    /* we only resort the piece if the list is already sorted */
1225    if( t->pieceSortState == PIECES_SORTED_BY_WEIGHT )
1226        pieceListResortPiece( t, pieceListLookup( t, index ) );
1227}
1228
1229/**
1230 * Increases the replication count of pieces present in the bitfield
1231 */
1232static void
1233tr_incrReplicationFromBitfield( Torrent * t, const tr_bitfield * b )
1234{
1235    size_t i;
1236    uint16_t * rep = t->pieceReplication;
1237    const size_t n = t->tor->info.pieceCount;
1238
1239    assert( replicationExists( t ) );
1240
1241    for( i=0; i<n; ++i )
1242        if( tr_bitfieldHas( b, i ) )
1243            ++rep[i];
1244
1245    if( t->pieceSortState == PIECES_SORTED_BY_WEIGHT )
1246        invalidatePieceSorting( t );
1247}
1248
1249/**
1250 * Increase the replication count of every piece
1251 */
1252static void
1253tr_incrReplication( Torrent * t )
1254{
1255    int i;
1256    const int n = t->pieceReplicationSize;
1257
1258    assert( replicationExists( t ) );
1259    assert( t->pieceReplicationSize == t->tor->info.pieceCount );
1260
1261    for( i=0; i<n; ++i )
1262        ++t->pieceReplication[i];
1263}
1264
1265/**
1266 * Decrease the replication count of pieces present in the bitset.
1267 */
1268static void
1269tr_decrReplicationFromBitfield( Torrent * t, const tr_bitfield * b )
1270{
1271    int i;
1272    const int n = t->pieceReplicationSize;
1273
1274    assert( replicationExists( t ) );
1275    assert( t->pieceReplicationSize == t->tor->info.pieceCount );
1276
1277    if( tr_bitfieldHasAll( b ) )
1278    {
1279        for( i=0; i<n; ++i )
1280            --t->pieceReplication[i];
1281    }
1282    else if ( !tr_bitfieldHasNone( b ) )
1283    {
1284        for( i=0; i<n; ++i )
1285            if( tr_bitfieldHas( b, i ) )
1286                --t->pieceReplication[i];
1287
1288        if( t->pieceSortState == PIECES_SORTED_BY_WEIGHT )
1289            invalidatePieceSorting( t );
1290    }
1291}
1292
1293/**
1294***
1295**/
1296
1297void
1298tr_peerMgrRebuildRequests( tr_torrent * tor )
1299{
1300    assert( tr_isTorrent( tor ) );
1301
1302    pieceListRebuild( tor->torrentPeers );
1303}
1304
1305void
1306tr_peerMgrGetNextRequests( tr_torrent           * tor,
1307                           tr_peer              * peer,
1308                           int                    numwant,
1309                           tr_block_index_t     * setme,
1310                           int                  * numgot,
1311                           bool                   get_intervals )
1312{
1313    int i;
1314    int got;
1315    Torrent * t;
1316    struct weighted_piece * pieces;
1317    const tr_bitfield * const have = &peer->have;
1318
1319    /* sanity clause */
1320    assert( tr_isTorrent( tor ) );
1321    assert( peer->clientIsInterested );
1322    assert( !peer->clientIsChoked );
1323    assert( numwant > 0 );
1324
1325    /* walk through the pieces and find blocks that should be requested */
1326    got = 0;
1327    t = tor->torrentPeers;
1328
1329    /* prep the pieces list */
1330    if( t->pieces == NULL )
1331        pieceListRebuild( t );
1332
1333    if( t->pieceSortState != PIECES_SORTED_BY_WEIGHT )
1334        pieceListSort( t, PIECES_SORTED_BY_WEIGHT );
1335
1336    assertReplicationCountIsExact( t );
1337    assertWeightedPiecesAreSorted( t );
1338
1339    updateEndgame( t );
1340    pieces = t->pieces;
1341    for( i=0; i<t->pieceCount && got<numwant; ++i )
1342    {
1343        struct weighted_piece * p = pieces + i;
1344
1345        /* if the peer has this piece that we want... */
1346        if( tr_bitfieldHas( have, p->index ) )
1347        {
1348            tr_block_index_t b;
1349            tr_block_index_t first;
1350            tr_block_index_t last;
1351            tr_ptrArray peerArr = TR_PTR_ARRAY_INIT;
1352
1353            tr_torGetPieceBlockRange( tor, p->index, &first, &last );
1354
1355            for( b=first; b<=last && (got<numwant || (get_intervals && setme[2*got-1] == b-1)); ++b )
1356            {
1357                int peerCount;
1358                tr_peer ** peers;
1359
1360                /* don't request blocks we've already got */
1361                if( tr_cpBlockIsComplete( &tor->completion, b ) )
1362                    continue;
1363
1364                /* always add peer if this block has no peers yet */
1365                tr_ptrArrayClear( &peerArr );
1366                getBlockRequestPeers( t, b, &peerArr );
1367                peers = (tr_peer **) tr_ptrArrayPeek( &peerArr, &peerCount );
1368                if( peerCount != 0 )
1369                {
1370                    /* don't make a second block request until the endgame */
1371                    if( !t->endgame )
1372                        continue;
1373
1374                    /* don't have more than two peers requesting this block */
1375                    if( peerCount > 1 )
1376                        continue;
1377
1378                    /* don't send the same request to the same peer twice */
1379                    if( peer == peers[0] )
1380                        continue;
1381
1382                    /* in the endgame allow an additional peer to download a
1383                       block but only if the peer seems to be handling requests
1384                       relatively fast */
1385                    if( peer->pendingReqsToPeer + numwant - got < t->endgame )
1386                        continue;
1387                }
1388
1389                /* update the caller's table */
1390                if( !get_intervals ) {
1391                    setme[got++] = b;
1392                }
1393                /* if intervals are requested two array entries are necessarry:
1394                   one for the interval's starting block and one for its end block */
1395                else if( got && setme[2 * got - 1] == b - 1 && b != first ) {
1396                    /* expand the last interval */
1397                    ++setme[2 * got - 1];
1398                }
1399                else {
1400                    /* begin a new interval */
1401                    setme[2 * got] = setme[2 * got + 1] = b;
1402                    ++got;
1403                }
1404
1405                /* update our own tables */
1406                requestListAdd( t, b, peer );
1407                ++p->requestCount;
1408            }
1409
1410            tr_ptrArrayDestruct( &peerArr, NULL );
1411        }
1412    }
1413
1414    /* In most cases we've just changed the weights of a small number of pieces.
1415     * So rather than qsort()ing the entire array, it's faster to apply an
1416     * adaptive insertion sort algorithm. */
1417    if( got > 0 )
1418    {
1419        /* not enough requests || last piece modified */
1420        if ( i == t->pieceCount ) --i;
1421
1422        setComparePieceByWeightTorrent( t );
1423        while( --i >= 0 )
1424        {
1425            bool exact;
1426
1427            /* relative position! */
1428            const int newpos = tr_lowerBound( &t->pieces[i], &t->pieces[i + 1],
1429                                              t->pieceCount - (i + 1),
1430                                              sizeof( struct weighted_piece ),
1431                                              comparePieceByWeight, &exact );
1432            if( newpos > 0 )
1433            {
1434                const struct weighted_piece piece = t->pieces[i];
1435                memmove( &t->pieces[i],
1436                         &t->pieces[i + 1],
1437                         sizeof( struct weighted_piece ) * ( newpos ) );
1438                t->pieces[i + newpos] = piece;
1439            }
1440        }
1441    }
1442
1443    assertWeightedPiecesAreSorted( t );
1444    *numgot = got;
1445}
1446
1447bool
1448tr_peerMgrDidPeerRequest( const tr_torrent  * tor,
1449                          const tr_peer     * peer,
1450                          tr_block_index_t    block )
1451{
1452    const Torrent * t = tor->torrentPeers;
1453    return requestListLookup( (Torrent*)t, block, peer ) != NULL;
1454}
1455
1456/* cancel requests that are too old */
1457static void
1458refillUpkeep( int foo UNUSED, short bar UNUSED, void * vmgr )
1459{
1460    time_t now;
1461    time_t too_old;
1462    tr_torrent * tor;
1463    int cancel_buflen = 0;
1464    struct block_request * cancel = NULL;
1465    tr_peerMgr * mgr = vmgr;
1466    managerLock( mgr );
1467
1468    now = tr_time( );
1469    too_old = now - REQUEST_TTL_SECS;
1470
1471    /* alloc the temporary "cancel" buffer */
1472    tor = NULL;
1473    while(( tor = tr_torrentNext( mgr->session, tor )))
1474        cancel_buflen = MAX( cancel_buflen, tor->torrentPeers->requestCount );
1475    if( cancel_buflen > 0 )
1476        cancel = tr_new( struct block_request, cancel_buflen );
1477
1478    /* prune requests that are too old */
1479    tor = NULL;
1480    while(( tor = tr_torrentNext( mgr->session, tor )))
1481    {
1482        Torrent * t = tor->torrentPeers;
1483        const int n = t->requestCount;
1484        if( n > 0 )
1485        {
1486            int keepCount = 0;
1487            int cancelCount = 0;
1488            const struct block_request * it;
1489            const struct block_request * end;
1490
1491            for( it=t->requests, end=it+n; it!=end; ++it )
1492            {
1493                if( ( it->sentAt <= too_old ) && it->peer->msgs && !tr_peerMsgsIsReadingBlock( it->peer->msgs, it->block ) )
1494                    cancel[cancelCount++] = *it;
1495                else
1496                {
1497                    if( it != &t->requests[keepCount] )
1498                        t->requests[keepCount] = *it;
1499                    keepCount++;
1500                }
1501            }
1502
1503            /* prune out the ones we aren't keeping */
1504            t->requestCount = keepCount;
1505
1506            /* send cancel messages for all the "cancel" ones */
1507            for( it=cancel, end=it+cancelCount; it!=end; ++it ) {
1508                if( ( it->peer != NULL ) && ( it->peer->msgs != NULL ) ) {
1509                    tr_historyAdd( &it->peer->cancelsSentToPeer, now, 1 );
1510                    tr_peerMsgsCancel( it->peer->msgs, it->block );
1511                    decrementPendingReqCount( it );
1512                }
1513            }
1514
1515            /* decrement the pending request counts for the timed-out blocks */
1516            for( it=cancel, end=it+cancelCount; it!=end; ++it )
1517                pieceListRemoveRequest( t, it->block );
1518        }
1519    }
1520
1521    tr_free( cancel );
1522    tr_timerAddMsec( mgr->refillUpkeepTimer, REFILL_UPKEEP_PERIOD_MSEC );
1523    managerUnlock( mgr );
1524}
1525
1526static void
1527addStrike( Torrent * t, tr_peer * peer )
1528{
1529    tordbg( t, "increasing peer %s strike count to %d",
1530            tr_atomAddrStr( peer->atom ), peer->strikes + 1 );
1531
1532    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
1533    {
1534        struct peer_atom * atom = peer->atom;
1535        atom->flags2 |= MYFLAG_BANNED;
1536        peer->doPurge = 1;
1537        tordbg( t, "banning peer %s", tr_atomAddrStr( atom ) );
1538    }
1539}
1540
1541static void
1542gotBadPiece( Torrent * t, tr_piece_index_t pieceIndex )
1543{
1544    tr_torrent *   tor = t->tor;
1545    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
1546
1547    tor->corruptCur += byteCount;
1548    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1549
1550    tr_announcerAddBytes( tor, TR_ANN_CORRUPT, byteCount );
1551}
1552
1553static void
1554peerSuggestedPiece( Torrent            * t UNUSED,
1555                    tr_peer            * peer UNUSED,
1556                    tr_piece_index_t     pieceIndex UNUSED,
1557                    int                  isFastAllowed UNUSED )
1558{
1559#if 0
1560    assert( t );
1561    assert( peer );
1562    assert( peer->msgs );
1563
1564    /* is this a valid piece? */
1565    if(  pieceIndex >= t->tor->info.pieceCount )
1566        return;
1567
1568    /* don't ask for it if we've already got it */
1569    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
1570        return;
1571
1572    /* don't ask for it if they don't have it */
1573    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
1574        return;
1575
1576    /* don't ask for it if we're choked and it's not fast */
1577    if( !isFastAllowed && peer->clientIsChoked )
1578        return;
1579
1580    /* request the blocks that we don't have in this piece */
1581    {
1582        tr_block_index_t b;
1583        tr_block_index_t first;
1584        tr_block_index_t last;
1585        const tr_torrent * tor = t->tor;
1586
1587        tr_torGetPieceBlockRange( t->tor, pieceIndex, &first, &last );
1588
1589        for( b=first; b<=last; ++b )
1590        {
1591            if( !tr_cpBlockIsComplete( tor->completion, b ) )
1592            {
1593                const uint32_t offset = getBlockOffsetInPiece( tor, b );
1594                const uint32_t length = tr_torBlockCountBytes( tor, b );
1595                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length );
1596                incrementPieceRequests( t, pieceIndex );
1597            }
1598        }
1599    }
1600#endif
1601}
1602
1603static void
1604removeRequestFromTables( Torrent * t, tr_block_index_t block, const tr_peer * peer )
1605{
1606    requestListRemove( t, block, peer );
1607    pieceListRemoveRequest( t, block );
1608}
1609
1610/* peer choked us, or maybe it disconnected.
1611   either way we need to remove all its requests */
1612static void
1613peerDeclinedAllRequests( Torrent * t, const tr_peer * peer )
1614{
1615    int i, n;
1616    tr_block_index_t * blocks = tr_new( tr_block_index_t, t->requestCount );
1617
1618    for( i=n=0; i<t->requestCount; ++i )
1619        if( peer == t->requests[i].peer )
1620            blocks[n++] = t->requests[i].block;
1621
1622    for( i=0; i<n; ++i )
1623        removeRequestFromTables( t, blocks[i], peer );
1624
1625    tr_free( blocks );
1626}
1627
1628static void tr_peerMgrSetBlame( tr_torrent *, tr_piece_index_t, int );
1629
1630static void
1631peerCallbackFunc( tr_peer * peer, const tr_peer_event * e, void * vt )
1632{
1633    Torrent * t = vt;
1634
1635    torrentLock( t );
1636
1637    assert( peer != NULL );
1638
1639    switch( e->eventType )
1640    {
1641        case TR_PEER_PEER_GOT_DATA:
1642        {
1643            const time_t now = tr_time( );
1644            tr_torrent * tor = t->tor;
1645
1646            if( e->wasPieceData )
1647            {
1648                tor->uploadedCur += e->length;
1649                tr_announcerAddBytes( tor, TR_ANN_UP, e->length );
1650                tr_torrentSetActivityDate( tor, now );
1651                tr_torrentSetDirty( tor );
1652            }
1653
1654            /* update the stats */
1655            if( e->wasPieceData )
1656                tr_statsAddUploaded( tor->session, e->length );
1657
1658            /* update our atom */
1659            if( peer->atom && e->wasPieceData )
1660                peer->atom->piece_data_time = now;
1661
1662            break;
1663        }
1664
1665        case TR_PEER_CLIENT_GOT_HAVE:
1666            if( replicationExists( t ) ) {
1667                tr_incrReplicationOfPiece( t, e->pieceIndex );
1668                assertReplicationCountIsExact( t );
1669            }
1670            break;
1671
1672        case TR_PEER_CLIENT_GOT_HAVE_ALL:
1673            if( replicationExists( t ) ) {
1674                tr_incrReplication( t );
1675                assertReplicationCountIsExact( t );
1676            }
1677            break;
1678
1679        case TR_PEER_CLIENT_GOT_HAVE_NONE:
1680            /* noop */
1681            break;
1682
1683        case TR_PEER_CLIENT_GOT_BITFIELD:
1684            assert( e->bitfield != NULL );
1685            if( replicationExists( t ) ) {
1686                tr_incrReplicationFromBitfield( t, e->bitfield );
1687                assertReplicationCountIsExact( t );
1688            }
1689            break;
1690
1691        case TR_PEER_CLIENT_GOT_REJ: {
1692            tr_block_index_t b = _tr_block( t->tor, e->pieceIndex, e->offset );
1693            if( b < t->tor->blockCount )
1694                removeRequestFromTables( t, b, peer );
1695            else
1696                tordbg( t, "Peer %s sent an out-of-range reject message",
1697                           tr_atomAddrStr( peer->atom ) );
1698            break;
1699        }
1700
1701        case TR_PEER_CLIENT_GOT_CHOKE:
1702            peerDeclinedAllRequests( t, peer );
1703            break;
1704
1705        case TR_PEER_CLIENT_GOT_PORT:
1706            if( peer->atom )
1707                peer->atom->port = e->port;
1708            break;
1709
1710        case TR_PEER_CLIENT_GOT_SUGGEST:
1711            peerSuggestedPiece( t, peer, e->pieceIndex, false );
1712            break;
1713
1714        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1715            peerSuggestedPiece( t, peer, e->pieceIndex, true );
1716            break;
1717
1718        case TR_PEER_CLIENT_GOT_DATA:
1719        {
1720            const time_t now = tr_time( );
1721            tr_torrent * tor = t->tor;
1722
1723            if( e->wasPieceData )
1724            {
1725                tor->downloadedCur += e->length;
1726                tr_torrentSetActivityDate( tor, now );
1727                tr_torrentSetDirty( tor );
1728            }
1729
1730            /* update the stats */
1731            if( e->wasPieceData )
1732                tr_statsAddDownloaded( tor->session, e->length );
1733
1734            /* update our atom */
1735            if( peer->atom && e->wasPieceData )
1736                peer->atom->piece_data_time = now;
1737
1738            break;
1739        }
1740
1741        case TR_PEER_CLIENT_GOT_BLOCK:
1742        {
1743            tr_torrent * tor = t->tor;
1744            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1745            int i, peerCount;
1746            tr_peer ** peers;
1747            tr_ptrArray peerArr = TR_PTR_ARRAY_INIT;
1748
1749            removeRequestFromTables( t, block, peer );
1750            getBlockRequestPeers( t, block, &peerArr );
1751            peers = (tr_peer **) tr_ptrArrayPeek( &peerArr, &peerCount );
1752
1753            /* remove additional block requests and send cancel to peers */
1754            for( i=0; i<peerCount; i++ ) {
1755                tr_peer * p = peers[i];
1756                assert( p != peer );
1757                if( p->msgs ) {
1758                    tr_historyAdd( &p->cancelsSentToPeer, tr_time( ), 1 );
1759                    tr_peerMsgsCancel( p->msgs, block );
1760                }
1761                removeRequestFromTables( t, block, p );
1762            }
1763
1764            tr_ptrArrayDestruct( &peerArr, false );
1765
1766            tr_historyAdd( &peer->blocksSentToClient, tr_time( ), 1 );
1767
1768            if( tr_cpBlockIsComplete( &tor->completion, block ) )
1769            {
1770                /* we already have this block... */
1771                const uint32_t n = tr_torBlockCountBytes( tor, block );
1772                tor->downloadedCur -= MIN( tor->downloadedCur, n );
1773                tordbg( t, "we have this block already..." );
1774            }
1775            else
1776            {
1777                tr_cpBlockAdd( &tor->completion, block );
1778                pieceListResortPiece( t, pieceListLookup( t, e->pieceIndex ) );
1779                tr_torrentSetDirty( tor );
1780
1781                if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
1782                {
1783                    const tr_piece_index_t p = e->pieceIndex;
1784                    const bool ok = tr_torrentCheckPiece( tor, p );
1785
1786                    tordbg( t, "[LAZY] checked just-completed piece %zu", (size_t)p );
1787
1788                    if( !ok )
1789                    {
1790                        tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1791                                   (unsigned long)p );
1792                    }
1793
1794                    tr_peerMgrSetBlame( tor, p, ok );
1795
1796                    if( !ok )
1797                    {
1798                        gotBadPiece( t, p );
1799                    }
1800                    else
1801                    {
1802                        int i;
1803                        int peerCount;
1804                        tr_peer ** peers;
1805                        tr_file_index_t fileIndex;
1806
1807                        /* only add this to downloadedCur if we got it from a peer --
1808                         * webseeds shouldn't count against our ratio. As one tracker
1809                         * admin put it, "Those pieces are downloaded directly from the
1810                         * content distributor, not the peers, it is the tracker's job
1811                         * to manage the swarms, not the web server and does not fit
1812                         * into the jurisdiction of the tracker." */
1813                        if( peer->msgs != NULL ) {
1814                            const uint32_t n = tr_torPieceCountBytes( tor, p );
1815                            tr_announcerAddBytes( tor, TR_ANN_DOWN, n );
1816                        }
1817
1818                        peerCount = tr_ptrArraySize( &t->peers );
1819                        peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1820                        for( i=0; i<peerCount; ++i )
1821                            tr_peerMsgsHave( peers[i]->msgs, p );
1822
1823                        for( fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex ) {
1824                            const tr_file * file = &tor->info.files[fileIndex];
1825                            if( ( file->firstPiece <= p ) && ( p <= file->lastPiece ) ) {
1826                                if( tr_cpFileIsComplete( &tor->completion, fileIndex ) ) {
1827                                    tr_cacheFlushFile( tor->session->cache, tor, fileIndex );
1828                                    tr_torrentFileCompleted( tor, fileIndex );
1829                                }
1830                            }
1831                        }
1832
1833                        pieceListRemovePiece( t, p );
1834                    }
1835                }
1836
1837                t->needsCompletenessCheck = true;
1838            }
1839            break;
1840        }
1841
1842        case TR_PEER_ERROR:
1843            if( ( e->err == ERANGE ) || ( e->err == EMSGSIZE ) || ( e->err == ENOTCONN ) )
1844            {
1845                /* some protocol error from the peer */
1846                peer->doPurge = 1;
1847                tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error",
1848                        tr_atomAddrStr( peer->atom ) );
1849            }
1850            else
1851            {
1852                tordbg( t, "unhandled error: %s", tr_strerror( e->err ) );
1853            }
1854            break;
1855
1856        default:
1857            assert( 0 );
1858    }
1859
1860    torrentUnlock( t );
1861}
1862
1863static int
1864getDefaultShelfLife( uint8_t from )
1865{
1866    /* in general, peers obtained from firsthand contact
1867     * are better than those from secondhand, etc etc */
1868    switch( from )
1869    {
1870        case TR_PEER_FROM_INCOMING : return 60 * 60 * 6;
1871        case TR_PEER_FROM_LTEP     : return 60 * 60 * 6;
1872        case TR_PEER_FROM_TRACKER  : return 60 * 60 * 3;
1873        case TR_PEER_FROM_DHT      : return 60 * 60 * 3;
1874        case TR_PEER_FROM_PEX      : return 60 * 60 * 2;
1875        case TR_PEER_FROM_RESUME   : return 60 * 60;
1876        case TR_PEER_FROM_LPD      : return 10 * 60;
1877        default                    : return 60 * 60;
1878    }
1879}
1880
1881static void
1882ensureAtomExists( Torrent           * t,
1883                  const tr_address  * addr,
1884                  const tr_port       port,
1885                  const uint8_t       flags,
1886                  const int8_t        seedProbability,
1887                  const uint8_t       from )
1888{
1889    struct peer_atom * a;
1890
1891    assert( tr_address_is_valid( addr ) );
1892    assert( from < TR_PEER_FROM__MAX );
1893
1894    a = getExistingAtom( t, addr );
1895
1896    if( a == NULL )
1897    {
1898        const int jitter = tr_cryptoWeakRandInt( 60*10 );
1899        a = tr_new0( struct peer_atom, 1 );
1900        a->addr = *addr;
1901        a->port = port;
1902        a->flags = flags;
1903        a->fromFirst = from;
1904        a->fromBest = from;
1905        a->shelf_date = tr_time( ) + getDefaultShelfLife( from ) + jitter;
1906        a->blocklisted = -1;
1907        atomSetSeedProbability( a, seedProbability );
1908        tr_ptrArrayInsertSorted( &t->pool, a, compareAtomsByAddress );
1909
1910        tordbg( t, "got a new atom: %s", tr_atomAddrStr( a ) );
1911    }
1912    else
1913    {
1914        if( from < a->fromBest )
1915            a->fromBest = from;
1916
1917        if( a->seedProbability == -1 )
1918            atomSetSeedProbability( a, seedProbability );
1919
1920        a->flags |= flags;
1921    }
1922}
1923
1924static int
1925getMaxPeerCount( const tr_torrent * tor )
1926{
1927    return tor->maxConnectedPeers;
1928}
1929
1930static int
1931getPeerCount( const Torrent * t )
1932{
1933    return tr_ptrArraySize( &t->peers );/* + tr_ptrArraySize( &t->outgoingHandshakes ); */
1934}
1935
1936/* FIXME: this is kind of a mess. */
1937static bool
1938myHandshakeDoneCB( tr_handshake  * handshake,
1939                   tr_peerIo     * io,
1940                   bool            readAnythingFromPeer,
1941                   bool            isConnected,
1942                   const uint8_t * peer_id,
1943                   void          * vmanager )
1944{
1945    bool               ok = isConnected;
1946    bool               success = false;
1947    tr_port            port;
1948    const tr_address * addr;
1949    tr_peerMgr       * manager = vmanager;
1950    Torrent          * t;
1951    tr_handshake     * ours;
1952
1953    assert( io );
1954    assert( tr_isBool( ok ) );
1955
1956    t = tr_peerIoHasTorrentHash( io )
1957        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1958        : NULL;
1959
1960    if( tr_peerIoIsIncoming ( io ) )
1961        ours = tr_ptrArrayRemoveSorted( &manager->incomingHandshakes,
1962                                        handshake, handshakeCompare );
1963    else if( t )
1964        ours = tr_ptrArrayRemoveSorted( &t->outgoingHandshakes,
1965                                        handshake, handshakeCompare );
1966    else
1967        ours = handshake;
1968
1969    assert( ours );
1970    assert( ours == handshake );
1971
1972    if( t )
1973        torrentLock( t );
1974
1975    addr = tr_peerIoGetAddress( io, &port );
1976
1977    if( !ok || !t || !t->isRunning )
1978    {
1979        if( t )
1980        {
1981            struct peer_atom * atom = getExistingAtom( t, addr );
1982            if( atom )
1983            {
1984                ++atom->numFails;
1985
1986                if( !readAnythingFromPeer )
1987                {
1988                    tordbg( t, "marking peer %s as unreachable... numFails is %d", tr_atomAddrStr( atom ), (int)atom->numFails );
1989                    atom->flags2 |= MYFLAG_UNREACHABLE;
1990                }
1991            }
1992        }
1993    }
1994    else /* looking good */
1995    {
1996        struct peer_atom * atom;
1997
1998        ensureAtomExists( t, addr, port, 0, -1, TR_PEER_FROM_INCOMING );
1999        atom = getExistingAtom( t, addr );
2000        atom->time = tr_time( );
2001        atom->piece_data_time = 0;
2002        atom->lastConnectionAt = tr_time( );
2003
2004        if( !tr_peerIoIsIncoming( io ) )
2005        {
2006            atom->flags |= ADDED_F_CONNECTABLE;
2007            atom->flags2 &= ~MYFLAG_UNREACHABLE;
2008        }
2009
2010        /* In principle, this flag specifies whether the peer groks uTP,
2011           not whether it's currently connected over uTP. */
2012        if( io->utp_socket )
2013            atom->flags |= ADDED_F_UTP_FLAGS;
2014
2015        if( atom->flags2 & MYFLAG_BANNED )
2016        {
2017            tordbg( t, "banned peer %s tried to reconnect",
2018                    tr_atomAddrStr( atom ) );
2019        }
2020        else if( tr_peerIoIsIncoming( io )
2021               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
2022
2023        {
2024        }
2025        else
2026        {
2027            tr_peer * peer = atom->peer;
2028
2029            if( peer ) /* we already have this peer */
2030            {
2031            }
2032            else
2033            {
2034                peer = getPeer( t, atom );
2035                tr_free( peer->client );
2036
2037                if( !peer_id )
2038                    peer->client = NULL;
2039                else {
2040                    char client[128];
2041                    tr_clientForId( client, sizeof( client ), peer_id );
2042                    peer->client = tr_strdup( client );
2043                }
2044
2045                peer->io = tr_handshakeStealIO( handshake ); /* this steals its refcount too, which is
2046                                                                balanced by our unref in peerDelete()  */
2047                tr_peerIoSetParent( peer->io, &t->tor->bandwidth );
2048                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t );
2049
2050                success = true;
2051            }
2052        }
2053    }
2054
2055    if( t )
2056        torrentUnlock( t );
2057
2058    return success;
2059}
2060
2061void
2062tr_peerMgrAddIncoming( tr_peerMgr * manager,
2063                       tr_address * addr,
2064                       tr_port      port,
2065                       int          socket,
2066                       struct UTPSocket * utp_socket )
2067{
2068    tr_session * session;
2069
2070    managerLock( manager );
2071
2072    assert( tr_isSession( manager->session ) );
2073    session = manager->session;
2074
2075    if( tr_sessionIsAddressBlocked( session, addr ) )
2076    {
2077        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_address_to_string( addr ) );
2078        if(socket >= 0)
2079            tr_netClose( session, socket );
2080        else
2081            UTP_Close( utp_socket );
2082    }
2083    else if( getExistingHandshake( &manager->incomingHandshakes, addr ) )
2084    {
2085        if(socket >= 0)
2086            tr_netClose( session, socket );
2087        else
2088            UTP_Close( utp_socket );
2089    }
2090    else /* we don't have a connection to them yet... */
2091    {
2092        tr_peerIo *    io;
2093        tr_handshake * handshake;
2094
2095        io = tr_peerIoNewIncoming( session, &session->bandwidth, addr, port, socket, utp_socket );
2096
2097        handshake = tr_handshakeNew( io,
2098                                     session->encryptionMode,
2099                                     myHandshakeDoneCB,
2100                                     manager );
2101
2102        tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewIncoming() */
2103
2104        tr_ptrArrayInsertSorted( &manager->incomingHandshakes, handshake,
2105                                 handshakeCompare );
2106    }
2107
2108    managerUnlock( manager );
2109}
2110
2111void
2112tr_peerMgrAddPex( tr_torrent * tor, uint8_t from,
2113                  const tr_pex * pex, int8_t seedProbability )
2114{
2115    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
2116    {
2117        Torrent * t = tor->torrentPeers;
2118        managerLock( t->manager );
2119
2120        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
2121            if( tr_address_is_valid_for_peers( &pex->addr, pex->port ) )
2122                ensureAtomExists( t, &pex->addr, pex->port, pex->flags, seedProbability, from );
2123
2124        managerUnlock( t->manager );
2125    }
2126}
2127
2128void
2129tr_peerMgrMarkAllAsSeeds( tr_torrent * tor )
2130{
2131    Torrent * t = tor->torrentPeers;
2132    const int n = tr_ptrArraySize( &t->pool );
2133    struct peer_atom ** it = (struct peer_atom**) tr_ptrArrayBase( &t->pool );
2134    struct peer_atom ** end = it + n;
2135
2136    while( it != end )
2137        atomSetSeed( t, *it++ );
2138}
2139
2140tr_pex *
2141tr_peerMgrCompactToPex( const void *    compact,
2142                        size_t          compactLen,
2143                        const uint8_t * added_f,
2144                        size_t          added_f_len,
2145                        size_t *        pexCount )
2146{
2147    size_t          i;
2148    size_t          n = compactLen / 6;
2149    const uint8_t * walk = compact;
2150    tr_pex *        pex = tr_new0( tr_pex, n );
2151
2152    for( i = 0; i < n; ++i )
2153    {
2154        pex[i].addr.type = TR_AF_INET;
2155        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
2156        memcpy( &pex[i].port, walk, 2 ); walk += 2;
2157        if( added_f && ( n == added_f_len ) )
2158            pex[i].flags = added_f[i];
2159    }
2160
2161    *pexCount = n;
2162    return pex;
2163}
2164
2165tr_pex *
2166tr_peerMgrCompact6ToPex( const void    * compact,
2167                         size_t          compactLen,
2168                         const uint8_t * added_f,
2169                         size_t          added_f_len,
2170                         size_t        * pexCount )
2171{
2172    size_t          i;
2173    size_t          n = compactLen / 18;
2174    const uint8_t * walk = compact;
2175    tr_pex *        pex = tr_new0( tr_pex, n );
2176
2177    for( i = 0; i < n; ++i )
2178    {
2179        pex[i].addr.type = TR_AF_INET6;
2180        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
2181        memcpy( &pex[i].port, walk, 2 ); walk += 2;
2182        if( added_f && ( n == added_f_len ) )
2183            pex[i].flags = added_f[i];
2184    }
2185
2186    *pexCount = n;
2187    return pex;
2188}
2189
2190tr_pex *
2191tr_peerMgrArrayToPex( const void * array,
2192                      size_t       arrayLen,
2193                      size_t      * pexCount )
2194{
2195    size_t          i;
2196    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
2197    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
2198    const uint8_t * walk = array;
2199    tr_pex        * pex = tr_new0( tr_pex, n );
2200
2201    for( i = 0 ; i < n ; i++ ) {
2202        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
2203        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
2204        pex[i].flags = 0x00;
2205        walk += sizeof( tr_address ) + 2;
2206    }
2207
2208    *pexCount = n;
2209    return pex;
2210}
2211
2212/**
2213***
2214**/
2215
2216static void
2217tr_peerMgrSetBlame( tr_torrent     * tor,
2218                    tr_piece_index_t pieceIndex,
2219                    int              success )
2220{
2221    if( !success )
2222    {
2223        int        peerCount, i;
2224        Torrent *  t = tor->torrentPeers;
2225        tr_peer ** peers;
2226
2227        assert( torrentIsLocked( t ) );
2228
2229        peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
2230        for( i = 0; i < peerCount; ++i )
2231        {
2232            tr_peer * peer = peers[i];
2233            if( tr_bitfieldHas( &peer->blame, pieceIndex ) )
2234            {
2235                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
2236                        tr_atomAddrStr( peer->atom ),
2237                        pieceIndex, (int)peer->strikes + 1 );
2238                addStrike( t, peer );
2239            }
2240        }
2241    }
2242}
2243
2244int
2245tr_pexCompare( const void * va, const void * vb )
2246{
2247    const tr_pex * a = va;
2248    const tr_pex * b = vb;
2249    int i;
2250
2251    assert( tr_isPex( a ) );
2252    assert( tr_isPex( b ) );
2253
2254    if(( i = tr_address_compare( &a->addr, &b->addr )))
2255        return i;
2256
2257    if( a->port != b->port )
2258        return a->port < b->port ? -1 : 1;
2259
2260    return 0;
2261}
2262
2263#if 0
2264static int
2265peerPrefersCrypto( const tr_peer * peer )
2266{
2267    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
2268        return true;
2269
2270    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
2271        return false;
2272
2273    return tr_peerIoIsEncrypted( peer->io );
2274}
2275#endif
2276
2277/* better goes first */
2278static int
2279compareAtomsByUsefulness( const void * va, const void *vb )
2280{
2281    const struct peer_atom * a = * (const struct peer_atom**) va;
2282    const struct peer_atom * b = * (const struct peer_atom**) vb;
2283
2284    assert( tr_isAtom( a ) );
2285    assert( tr_isAtom( b ) );
2286
2287    if( a->piece_data_time != b->piece_data_time )
2288        return a->piece_data_time > b->piece_data_time ? -1 : 1;
2289    if( a->fromBest != b->fromBest )
2290        return a->fromBest < b->fromBest ? -1 : 1;
2291    if( a->numFails != b->numFails )
2292        return a->numFails < b->numFails ? -1 : 1;
2293
2294    return 0;
2295}
2296
2297static bool
2298isAtomInteresting( const tr_torrent * tor, struct peer_atom * atom )
2299{
2300    if( tr_torrentIsSeed( tor ) && atomIsSeed( atom ) )
2301        return false;
2302
2303    if( peerIsInUse( tor->torrentPeers, atom ) )
2304        return true;
2305
2306    if( isAtomBlocklisted( tor->session, atom ) )
2307        return false;
2308
2309    if( atom->flags2 & MYFLAG_BANNED )
2310        return false;
2311
2312    return true;
2313}
2314
2315int
2316tr_peerMgrGetPeers( tr_torrent   * tor,
2317                    tr_pex      ** setme_pex,
2318                    uint8_t        af,
2319                    uint8_t        list_mode,
2320                    int            maxCount )
2321{
2322    int i;
2323    int n;
2324    int count = 0;
2325    int atomCount = 0;
2326    const Torrent * t = tor->torrentPeers;
2327    struct peer_atom ** atoms = NULL;
2328    tr_pex * pex;
2329    tr_pex * walk;
2330
2331    assert( tr_isTorrent( tor ) );
2332    assert( setme_pex != NULL );
2333    assert( af==TR_AF_INET || af==TR_AF_INET6 );
2334    assert( list_mode==TR_PEERS_CONNECTED || list_mode==TR_PEERS_INTERESTING );
2335
2336    managerLock( t->manager );
2337
2338    /**
2339    ***  build a list of atoms
2340    **/
2341
2342    if( list_mode == TR_PEERS_CONNECTED ) /* connected peers only */
2343    {
2344        int i;
2345        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
2346        atomCount = tr_ptrArraySize( &t->peers );
2347        atoms = tr_new( struct peer_atom *, atomCount );
2348        for( i=0; i<atomCount; ++i )
2349            atoms[i] = peers[i]->atom;
2350    }
2351    else /* TR_PEERS_INTERESTING */
2352    {
2353        int i;
2354        struct peer_atom ** atomBase = (struct peer_atom**) tr_ptrArrayBase( &t->pool );
2355        n = tr_ptrArraySize( &t->pool );
2356        atoms = tr_new( struct peer_atom *, n );
2357        for( i=0; i<n; ++i )
2358            if( isAtomInteresting( tor, atomBase[i] ) )
2359                atoms[atomCount++] = atomBase[i];
2360    }
2361
2362    qsort( atoms, atomCount, sizeof( struct peer_atom * ), compareAtomsByUsefulness );
2363
2364    /**
2365    ***  add the first N of them into our return list
2366    **/
2367
2368    n = MIN( atomCount, maxCount );
2369    pex = walk = tr_new0( tr_pex, n );
2370
2371    for( i=0; i<atomCount && count<n; ++i )
2372    {
2373        const struct peer_atom * atom = atoms[i];
2374        if( atom->addr.type == af )
2375        {
2376            assert( tr_address_is_valid( &atom->addr ) );
2377            walk->addr = atom->addr;
2378            walk->port = atom->port;
2379            walk->flags = atom->flags;
2380            ++count;
2381            ++walk;
2382        }
2383    }
2384
2385    qsort( pex, count, sizeof( tr_pex ), tr_pexCompare );
2386
2387    assert( ( walk - pex ) == count );
2388    *setme_pex = pex;
2389
2390    /* cleanup */
2391    tr_free( atoms );
2392    managerUnlock( t->manager );
2393    return count;
2394}
2395
2396static void atomPulse      ( int, short, void * );
2397static void bandwidthPulse ( int, short, void * );
2398static void rechokePulse   ( int, short, void * );
2399static void reconnectPulse ( int, short, void * );
2400
2401static struct event *
2402createTimer( tr_session * session, int msec, void (*callback)(int, short, void *), void * cbdata )
2403{
2404    struct event * timer = evtimer_new( session->event_base, callback, cbdata );
2405    tr_timerAddMsec( timer, msec );
2406    return timer;
2407}
2408
2409static void
2410ensureMgrTimersExist( struct tr_peerMgr * m )
2411{
2412    if( m->atomTimer == NULL )
2413        m->atomTimer = createTimer( m->session, ATOM_PERIOD_MSEC, atomPulse, m );
2414
2415    if( m->bandwidthTimer == NULL )
2416        m->bandwidthTimer = createTimer( m->session, BANDWIDTH_PERIOD_MSEC, bandwidthPulse, m );
2417
2418    if( m->rechokeTimer == NULL )
2419        m->rechokeTimer = createTimer( m->session, RECHOKE_PERIOD_MSEC, rechokePulse, m );
2420
2421    if( m->refillUpkeepTimer == NULL )
2422        m->refillUpkeepTimer = createTimer( m->session, REFILL_UPKEEP_PERIOD_MSEC, refillUpkeep, m );
2423}
2424
2425void
2426tr_peerMgrStartTorrent( tr_torrent * tor )
2427{
2428    Torrent * t = tor->torrentPeers;
2429
2430    assert( tr_isTorrent( tor ) );
2431    assert( tr_torrentIsLocked( tor ) );
2432
2433    ensureMgrTimersExist( t->manager );
2434
2435    t->isRunning = true;
2436    t->maxPeers = t->tor->maxConnectedPeers;
2437    t->pieceSortState = PIECES_UNSORTED;
2438
2439    rechokePulse( 0, 0, t->manager );
2440}
2441
2442static void
2443stopTorrent( Torrent * t )
2444{
2445    tr_peer * peer;
2446
2447    t->isRunning = false;
2448
2449    replicationFree( t );
2450    invalidatePieceSorting( t );
2451
2452    /* disconnect the peers. */
2453    while(( peer = tr_ptrArrayPop( &t->peers )))
2454        peerDelete( t, peer );
2455
2456    /* disconnect the handshakes. handshakeAbort calls handshakeDoneCB(),
2457     * which removes the handshake from t->outgoingHandshakes... */
2458    while( !tr_ptrArrayEmpty( &t->outgoingHandshakes ) )
2459        tr_handshakeAbort( tr_ptrArrayNth( &t->outgoingHandshakes, 0 ) );
2460}
2461
2462void
2463tr_peerMgrStopTorrent( tr_torrent * tor )
2464{
2465    assert( tr_isTorrent( tor ) );
2466    assert( tr_torrentIsLocked( tor ) );
2467
2468    stopTorrent( tor->torrentPeers );
2469}
2470
2471void
2472tr_peerMgrAddTorrent( tr_peerMgr * manager, tr_torrent * tor )
2473{
2474    assert( tr_isTorrent( tor ) );
2475    assert( tr_torrentIsLocked( tor ) );
2476    assert( tor->torrentPeers == NULL );
2477
2478    tor->torrentPeers = torrentNew( manager, tor );
2479}
2480
2481void
2482tr_peerMgrRemoveTorrent( tr_torrent * tor )
2483{
2484    assert( tr_isTorrent( tor ) );
2485    assert( tr_torrentIsLocked( tor ) );
2486
2487    stopTorrent( tor->torrentPeers );
2488    torrentFree( tor->torrentPeers );
2489}
2490
2491void
2492tr_peerUpdateProgress( tr_torrent * tor, tr_peer * peer )
2493{
2494    const tr_bitfield * have = &peer->have;
2495
2496    if( tr_bitfieldHasAll( have ) )
2497    {
2498        peer->progress = 1.0;
2499    }
2500    else if( tr_bitfieldHasNone( have ) )
2501    {
2502        peer->progress = 0.0;
2503    }
2504    else
2505    {
2506        const float true_count = tr_bitfieldCountTrueBits( have );
2507
2508        if( tr_torrentHasMetadata( tor ) )
2509            peer->progress = true_count / tor->info.pieceCount;
2510        else /* without pieceCount, this result is only a best guess... */
2511            peer->progress = true_count / ( have->bit_count + 1 );
2512    }
2513
2514    if( peer->atom && ( peer->progress >= 1.0 ) )
2515        atomSetSeed( tor->torrentPeers, peer->atom );
2516}
2517
2518void
2519tr_peerMgrOnTorrentGotMetainfo( tr_torrent * tor )
2520{
2521    int i;
2522    const int peerCount = tr_ptrArraySize( &tor->torrentPeers->peers );
2523    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &tor->torrentPeers->peers );
2524
2525    /* some peer_msgs' progress fields may not be accurate if we
2526       didn't have the metadata before now... so refresh them all... */
2527    for( i=0; i<peerCount; ++i )
2528        tr_peerUpdateProgress( tor, peers[i] );
2529}
2530
2531void
2532tr_peerMgrTorrentAvailability( const tr_torrent * tor, int8_t * tab, unsigned int tabCount )
2533{
2534    assert( tr_isTorrent( tor ) );
2535    assert( torrentIsLocked( tor->torrentPeers ) );
2536    assert( tab != NULL );
2537    assert( tabCount > 0 );
2538
2539    memset( tab, 0, tabCount );
2540
2541    if( tr_torrentHasMetadata( tor ) )
2542    {
2543        tr_piece_index_t i;
2544        const int peerCount = tr_ptrArraySize( &tor->torrentPeers->peers );
2545        const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &tor->torrentPeers->peers );
2546        const float interval = tor->info.pieceCount / (float)tabCount;
2547        const bool isSeed = tr_cpGetStatus( &tor->completion ) == TR_SEED;
2548
2549        for( i=0; i<tabCount; ++i )
2550        {
2551            const int piece = i * interval;
2552
2553            if( isSeed || tr_cpPieceIsComplete( &tor->completion, piece ) )
2554                tab[i] = -1;
2555            else if( peerCount ) {
2556                int j;
2557                for( j=0; j<peerCount; ++j )
2558                    if( tr_bitfieldHas( &peers[j]->have, piece ) )
2559                        ++tab[i];
2560            }
2561        }
2562    }
2563}
2564
2565static bool
2566peerIsSeed( const tr_peer * peer )
2567{
2568    if( peer->progress >= 1.0 )
2569        return true;
2570
2571    if( peer->atom && atomIsSeed( peer->atom ) )
2572        return true;
2573
2574    return false;
2575}
2576
2577/* count how many bytes we want that connected peers have */
2578uint64_t
2579tr_peerMgrGetDesiredAvailable( const tr_torrent * tor )
2580{
2581    size_t i;
2582    size_t n;
2583    uint64_t desiredAvailable;
2584    const Torrent * t = tor->torrentPeers;
2585
2586    /* common shortcuts... */
2587
2588    if( tr_torrentIsSeed( t->tor ) )
2589        return 0;
2590
2591    if( !tr_torrentHasMetadata( tor ) )
2592        return 0;
2593
2594    n = tr_ptrArraySize( &t->peers );
2595    if( n == 0 )
2596        return 0;
2597    else {
2598        const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
2599        for( i=0; i<n; ++i )
2600            if( peers[i]->atom && atomIsSeed( peers[i]->atom ) )
2601                return tr_cpLeftUntilDone( &tor->completion );
2602    }
2603
2604    if( !t->pieceReplication || !t->pieceReplicationSize )
2605        return 0;
2606
2607    /* do it the hard way */
2608
2609    desiredAvailable = 0;
2610    for( i=0, n=MIN(tor->info.pieceCount, t->pieceReplicationSize); i<n; ++i )
2611        if( !tor->info.pieces[i].dnd && ( t->pieceReplication[i] > 0 ) )
2612            desiredAvailable += tr_cpMissingBytesInPiece( &t->tor->completion, i );
2613
2614    assert( desiredAvailable <= tor->info.totalSize );
2615    return desiredAvailable;
2616}
2617
2618void
2619tr_peerMgrTorrentStats( tr_torrent  * tor,
2620                        int         * setmePeersConnected,
2621                        int         * setmeWebseedsSendingToUs,
2622                        int         * setmePeersSendingToUs,
2623                        int         * setmePeersGettingFromUs,
2624                        int         * setmePeersFrom )
2625{
2626    int i, size;
2627    const Torrent * t = tor->torrentPeers;
2628    const tr_peer ** peers;
2629
2630    assert( tr_torrentIsLocked( tor ) );
2631
2632    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
2633    size = tr_ptrArraySize( &t->peers );
2634
2635    *setmePeersConnected       = 0;
2636    *setmePeersGettingFromUs   = 0;
2637    *setmePeersSendingToUs     = 0;
2638    *setmeWebseedsSendingToUs  = 0;
2639
2640    for( i=0; i<TR_PEER_FROM__MAX; ++i )
2641        setmePeersFrom[i] = 0;
2642
2643    for( i=0; i<size; ++i )
2644    {
2645        const tr_peer * peer = peers[i];
2646        const struct peer_atom * atom = peer->atom;
2647
2648        if( peer->io == NULL ) /* not connected */
2649            continue;
2650
2651        ++*setmePeersConnected;
2652
2653        ++setmePeersFrom[atom->fromFirst];
2654
2655        if( clientIsDownloadingFrom( tor, peer ) )
2656            ++*setmePeersSendingToUs;
2657
2658        if( clientIsUploadingTo( peer ) )
2659            ++*setmePeersGettingFromUs;
2660    }
2661
2662    *setmeWebseedsSendingToUs = countActiveWebseeds( t );
2663}
2664
2665double*
2666tr_peerMgrWebSpeeds_KBps( const tr_torrent * tor )
2667{
2668    int i;
2669    const Torrent * t = tor->torrentPeers;
2670    const int webseedCount = tr_ptrArraySize( &t->webseeds );
2671    const tr_webseed ** webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2672    const uint64_t now = tr_time_msec( );
2673    double * ret = tr_new0( double, webseedCount );
2674
2675    assert( tr_isTorrent( tor ) );
2676    assert( tr_torrentIsLocked( tor ) );
2677    assert( t->manager != NULL );
2678    assert( webseedCount == tor->info.webseedCount );
2679
2680    for( i=0; i<webseedCount; ++i ) {
2681        int Bps;
2682        if( tr_webseedGetSpeed_Bps( webseeds[i], now, &Bps ) )
2683            ret[i] = Bps / (double)tr_speed_K;
2684        else
2685            ret[i] = -1.0;
2686    }
2687
2688    return ret;
2689}
2690
2691int
2692tr_peerGetPieceSpeed_Bps( const tr_peer * peer, uint64_t now, tr_direction direction )
2693{
2694    return peer->io ? tr_peerIoGetPieceSpeed_Bps( peer->io, now, direction ) : 0.0;
2695}
2696
2697struct tr_peer_stat *
2698tr_peerMgrPeerStats( const tr_torrent * tor, int * setmeCount )
2699{
2700    int i;
2701    const Torrent * t = tor->torrentPeers;
2702    const int size = tr_ptrArraySize( &t->peers );
2703    const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
2704    const uint64_t now_msec = tr_time_msec( );
2705    const time_t now = tr_time();
2706    tr_peer_stat * ret = tr_new0( tr_peer_stat, size );
2707
2708    assert( tr_isTorrent( tor ) );
2709    assert( tr_torrentIsLocked( tor ) );
2710    assert( t->manager );
2711
2712    for( i=0; i<size; ++i )
2713    {
2714        char *                   pch;
2715        const tr_peer *          peer = peers[i];
2716        const struct peer_atom * atom = peer->atom;
2717        tr_peer_stat *           stat = ret + i;
2718
2719        tr_address_to_string_with_buf( &atom->addr, stat->addr, sizeof( stat->addr ) );
2720        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
2721                   sizeof( stat->client ) );
2722        stat->port                = ntohs( peer->atom->port );
2723        stat->from                = atom->fromFirst;
2724        stat->progress            = peer->progress;
2725        stat->isUTP               = peer->io->utp_socket != NULL;
2726        stat->isEncrypted         = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
2727        stat->rateToPeer_KBps     = toSpeedKBps( tr_peerGetPieceSpeed_Bps( peer, now_msec, TR_CLIENT_TO_PEER ) );
2728        stat->rateToClient_KBps   = toSpeedKBps( tr_peerGetPieceSpeed_Bps( peer, now_msec, TR_PEER_TO_CLIENT ) );
2729        stat->peerIsChoked        = peer->peerIsChoked;
2730        stat->peerIsInterested    = peer->peerIsInterested;
2731        stat->clientIsChoked      = peer->clientIsChoked;
2732        stat->clientIsInterested  = peer->clientIsInterested;
2733        stat->isIncoming          = tr_peerIoIsIncoming( peer->io );
2734        stat->isDownloadingFrom   = clientIsDownloadingFrom( tor, peer );
2735        stat->isUploadingTo       = clientIsUploadingTo( peer );
2736        stat->isSeed              = peerIsSeed( peer );
2737
2738        stat->blocksToPeer        = tr_historyGet( &peer->blocksSentToPeer,    now, CANCEL_HISTORY_SEC );
2739        stat->blocksToClient      = tr_historyGet( &peer->blocksSentToClient,  now, CANCEL_HISTORY_SEC );
2740        stat->cancelsToPeer       = tr_historyGet( &peer->cancelsSentToPeer,   now, CANCEL_HISTORY_SEC );
2741        stat->cancelsToClient     = tr_historyGet( &peer->cancelsSentToClient, now, CANCEL_HISTORY_SEC );
2742
2743        stat->pendingReqsToPeer   = peer->pendingReqsToPeer;
2744        stat->pendingReqsToClient = peer->pendingReqsToClient;
2745
2746        pch = stat->flagStr;
2747        if( stat->isUTP ) *pch++ = 'T';
2748        if( t->optimistic == peer ) *pch++ = 'O';
2749        if( stat->isDownloadingFrom ) *pch++ = 'D';
2750        else if( stat->clientIsInterested ) *pch++ = 'd';
2751        if( stat->isUploadingTo ) *pch++ = 'U';
2752        else if( stat->peerIsInterested ) *pch++ = 'u';
2753        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
2754        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
2755        if( stat->isEncrypted ) *pch++ = 'E';
2756        if( stat->from == TR_PEER_FROM_DHT ) *pch++ = 'H';
2757        else if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
2758        if( stat->isIncoming ) *pch++ = 'I';
2759        *pch = '\0';
2760    }
2761
2762    *setmeCount = size;
2763
2764    return ret;
2765}
2766
2767/***
2768****
2769****
2770***/
2771
2772void
2773tr_peerMgrClearInterest( tr_torrent * tor )
2774{
2775    int i;
2776    Torrent * t = tor->torrentPeers;
2777    const int peerCount = tr_ptrArraySize( &t->peers );
2778
2779    assert( tr_isTorrent( tor ) );
2780    assert( tr_torrentIsLocked( tor ) );
2781
2782    for( i=0; i<peerCount; ++i )
2783    {
2784        const tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2785        tr_peerMsgsSetInterested( peer->msgs, false );
2786    }
2787}
2788
2789/* does this peer have any pieces that we want? */
2790static bool
2791isPeerInteresting( const tr_torrent  * const tor,
2792                   const tr_bitfield * const interesting_pieces,
2793                   const tr_peer     * const peer )
2794{
2795    tr_piece_index_t i, n;
2796
2797    /* these cases should have already been handled by the calling code... */
2798    assert( !tr_torrentIsSeed( tor ) );
2799    assert( tr_torrentIsPieceTransferAllowed( tor, TR_PEER_TO_CLIENT ) );
2800
2801    if( peerIsSeed( peer ) )
2802        return true;
2803
2804    for( i=0, n=tor->info.pieceCount; i<n; ++i )
2805        if( tr_bitfieldHas( interesting_pieces, i ) && tr_bitfieldHas( &peer->have, i ) )
2806            return true;
2807
2808    return false;
2809}
2810
2811typedef enum
2812{
2813    RECHOKE_STATE_GOOD,
2814    RECHOKE_STATE_UNTESTED,
2815    RECHOKE_STATE_BAD
2816}
2817tr_rechoke_state;
2818
2819struct tr_rechoke_info
2820{
2821    tr_peer * peer;
2822    int salt;
2823    int rechoke_state;
2824};
2825
2826static int
2827compare_rechoke_info( const void * va, const void * vb )
2828{
2829    const struct tr_rechoke_info * a = va;
2830    const struct tr_rechoke_info * b = vb;
2831
2832    if( a->rechoke_state != b->rechoke_state )
2833        return a->rechoke_state - b->rechoke_state;
2834
2835    return a->salt - b->salt;
2836}
2837
2838/* determines who we send "interested" messages to */
2839static void
2840rechokeDownloads( Torrent * t )
2841{
2842    int i;
2843    int maxPeers = 0;
2844    int rechoke_count = 0;
2845    struct tr_rechoke_info * rechoke = NULL;
2846    const int MIN_INTERESTING_PEERS = 5;
2847    const int peerCount = tr_ptrArraySize( &t->peers );
2848    const time_t now = tr_time( );
2849
2850    /* some cases where this function isn't necessary */
2851    if( tr_torrentIsSeed( t->tor ) )
2852        return;
2853    if ( !tr_torrentIsPieceTransferAllowed( t->tor, TR_PEER_TO_CLIENT ) )
2854        return;
2855
2856    /* decide HOW MANY peers to be interested in */
2857    {
2858        int blocks = 0;
2859        int cancels = 0;
2860        time_t timeSinceCancel;
2861
2862        /* Count up how many blocks & cancels each peer has.
2863         *
2864         * There are two situations where we send out cancels --
2865         *
2866         * 1. We've got unresponsive peers, which is handled by deciding
2867         *    -which- peers to be interested in.
2868         *
2869         * 2. We've hit our bandwidth cap, which is handled by deciding
2870         *    -how many- peers to be interested in.
2871         *
2872         * We're working on 2. here, so we need to ignore unresponsive
2873         * peers in our calculations lest they confuse Transmission into
2874         * thinking it's hit its bandwidth cap.
2875         */
2876        for( i=0; i<peerCount; ++i )
2877        {
2878            const tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2879            const int b = tr_historyGet( &peer->blocksSentToClient, now, CANCEL_HISTORY_SEC );
2880            const int c = tr_historyGet( &peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC );
2881
2882            if( b == 0 ) /* ignore unresponsive peers, as described above */
2883                continue;
2884
2885            blocks += b;
2886            cancels += c;
2887        }
2888
2889        if( cancels > 0 )
2890        {
2891            /* cancelRate: of the block requests we've recently made, the percentage we cancelled.
2892             * higher values indicate more congestion. */
2893            const double cancelRate = cancels / (double)(cancels + blocks);
2894            const double mult = 1 - MIN( cancelRate, 0.5 );
2895            maxPeers = t->interestedCount * mult;
2896            tordbg( t, "cancel rate is %.3f -- reducing the "
2897                       "number of peers we're interested in by %.0f percent",
2898                       cancelRate, mult * 100 );
2899            t->lastCancel = now;
2900        }
2901
2902        timeSinceCancel = now - t->lastCancel;
2903        if( timeSinceCancel )
2904        {
2905            const int maxIncrease = 15;
2906            const time_t maxHistory = 2 * CANCEL_HISTORY_SEC;
2907            const double mult = MIN( timeSinceCancel, maxHistory ) / (double) maxHistory;
2908            const int inc = maxIncrease * mult;
2909            maxPeers = t->maxPeers + inc;
2910            tordbg( t, "time since last cancel is %li -- increasing the "
2911                       "number of peers we're interested in by %d",
2912                       timeSinceCancel, inc );
2913        }
2914    }
2915
2916    /* don't let the previous section's number tweaking go too far... */
2917    if( maxPeers < MIN_INTERESTING_PEERS )
2918        maxPeers = MIN_INTERESTING_PEERS;
2919    if( maxPeers > t->tor->maxConnectedPeers )
2920        maxPeers = t->tor->maxConnectedPeers;
2921
2922    t->maxPeers = maxPeers;
2923
2924    if( peerCount > 0 )
2925    {
2926        const tr_torrent * const tor = t->tor;
2927        const int n = tor->info.pieceCount;
2928        tr_bitfield interesting_pieces = TR_BITFIELD_INIT;
2929
2930        /* build a bitfield of interesting pieces... */
2931        tr_bitfieldConstruct( &interesting_pieces, n );
2932        for( i=0; i<n; i++ )
2933            if( !tor->info.pieces[i].dnd && !tr_cpPieceIsComplete( &tor->completion, i ) )
2934                tr_bitfieldAdd( &interesting_pieces, i );
2935
2936        /* decide WHICH peers to be interested in (based on their cancel-to-block ratio) */
2937        for( i=0; i<peerCount; ++i )
2938        {
2939            tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2940
2941            if( !isPeerInteresting( t->tor, &interesting_pieces, peer ) )
2942            {
2943                tr_peerMsgsSetInterested( peer->msgs, false );
2944            }
2945            else
2946            {
2947                tr_rechoke_state rechoke_state;
2948                const int blocks = tr_historyGet( &peer->blocksSentToClient, now, CANCEL_HISTORY_SEC );
2949                const int cancels = tr_historyGet( &peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC );
2950
2951                if( !blocks && !cancels )
2952                    rechoke_state = RECHOKE_STATE_UNTESTED;
2953                else if( !cancels )
2954                    rechoke_state = RECHOKE_STATE_GOOD;
2955                else if( !blocks )
2956                    rechoke_state = RECHOKE_STATE_BAD;
2957                else if( ( cancels * 10 ) < blocks )
2958                    rechoke_state = RECHOKE_STATE_GOOD;
2959                else
2960                    rechoke_state = RECHOKE_STATE_BAD;
2961
2962                if( rechoke == NULL )
2963                    rechoke = tr_new( struct tr_rechoke_info, peerCount );
2964
2965                 rechoke[rechoke_count].peer = peer;
2966                 rechoke[rechoke_count].rechoke_state = rechoke_state;
2967                 rechoke[rechoke_count].salt = tr_cryptoWeakRandInt( INT_MAX );
2968                 rechoke_count++;
2969            }
2970
2971        }
2972
2973        tr_bitfieldDestruct( &interesting_pieces );
2974    }
2975
2976    /* now that we know which & how many peers to be interested in... update the peer interest */
2977    qsort( rechoke, rechoke_count, sizeof( struct tr_rechoke_info ), compare_rechoke_info );
2978    t->interestedCount = MIN( maxPeers, rechoke_count );
2979    for( i=0; i<rechoke_count; ++i )
2980        tr_peerMsgsSetInterested( rechoke[i].peer->msgs, i<t->interestedCount );
2981
2982    /* cleanup */
2983    tr_free( rechoke );
2984}
2985
2986/**
2987***
2988**/
2989
2990struct ChokeData
2991{
2992    bool            isInterested;
2993    bool            wasChoked;
2994    bool            isChoked;
2995    int             rate;
2996    int             salt;
2997    tr_peer *       peer;
2998};
2999
3000static int
3001compareChoke( const void * va, const void * vb )
3002{
3003    const struct ChokeData * a = va;
3004    const struct ChokeData * b = vb;
3005
3006    if( a->rate != b->rate ) /* prefer higher overall speeds */
3007        return a->rate > b->rate ? -1 : 1;
3008
3009    if( a->wasChoked != b->wasChoked ) /* prefer unchoked */
3010        return a->wasChoked ? 1 : -1;
3011
3012    if( a->salt != b->salt ) /* random order */
3013        return a->salt - b->salt;
3014
3015    return 0;
3016}
3017
3018/* is this a new connection? */
3019static int
3020isNew( const tr_peer * peer )
3021{
3022    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
3023}
3024
3025/* get a rate for deciding which peers to choke and unchoke. */
3026static int
3027getRate( const tr_torrent * tor, struct peer_atom * atom, uint64_t now )
3028{
3029    int Bps;
3030
3031    if( tr_torrentIsSeed( tor ) )
3032        Bps = tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_CLIENT_TO_PEER );
3033
3034    /* downloading a private torrent... take upload speed into account
3035     * because there may only be a small window of opportunity to share */
3036    else if( tr_torrentIsPrivate( tor ) )
3037        Bps = tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_PEER_TO_CLIENT )
3038            + tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_CLIENT_TO_PEER );
3039
3040    /* downloading a public torrent */
3041    else
3042        Bps = tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_PEER_TO_CLIENT );
3043
3044    /* convert it to bytes per second */
3045    return Bps;
3046}
3047
3048static inline bool
3049isBandwidthMaxedOut( const tr_bandwidth * b,
3050                     const uint64_t now_msec, tr_direction dir )
3051{
3052    if( !tr_bandwidthIsLimited( b, dir ) )
3053        return false;
3054    else {
3055        const int got = tr_bandwidthGetPieceSpeed_Bps( b, now_msec, dir );
3056        const int want = tr_bandwidthGetDesiredSpeed_Bps( b, dir );
3057        return got >= want;
3058    }
3059}
3060
3061static void
3062rechokeUploads( Torrent * t, const uint64_t now )
3063{
3064    int i, size, unchokedInterested;
3065    const int peerCount = tr_ptrArraySize( &t->peers );
3066    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
3067    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
3068    const tr_session * session = t->manager->session;
3069    const int chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
3070    const bool isMaxedOut = isBandwidthMaxedOut( &t->tor->bandwidth, now, TR_UP );
3071
3072    assert( torrentIsLocked( t ) );
3073
3074    /* an optimistic unchoke peer's "optimistic"
3075     * state lasts for N calls to rechokeUploads(). */
3076    if( t->optimisticUnchokeTimeScaler > 0 )
3077        t->optimisticUnchokeTimeScaler--;
3078    else
3079        t->optimistic = NULL;
3080
3081    /* sort the peers by preference and rate */
3082    for( i = 0, size = 0; i < peerCount; ++i )
3083    {
3084        tr_peer * peer = peers[i];
3085        struct peer_atom * atom = peer->atom;
3086
3087        if( peerIsSeed( peer ) ) /* choke seeds and partial seeds */
3088        {
3089            tr_peerMsgsSetChoke( peer->msgs, true );
3090        }
3091        else if( chokeAll ) /* choke everyone if we're not uploading */
3092        {
3093            tr_peerMsgsSetChoke( peer->msgs, true );
3094        }
3095        else if( peer != t->optimistic )
3096        {
3097            struct ChokeData * n = &choke[size++];
3098            n->peer         = peer;
3099            n->isInterested = peer->peerIsInterested;
3100            n->wasChoked    = peer->peerIsChoked;
3101            n->rate         = getRate( t->tor, atom, now );
3102            n->salt         = tr_cryptoWeakRandInt( INT_MAX );
3103            n->isChoked     = true;
3104        }
3105    }
3106
3107    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
3108
3109    /**
3110     * Reciprocation and number of uploads capping is managed by unchoking
3111     * the N peers which have the best upload rate and are interested.
3112     * This maximizes the client's download rate. These N peers are
3113     * referred to as downloaders, because they are interested in downloading
3114     * from the client.
3115     *
3116     * Peers which have a better upload rate (as compared to the downloaders)
3117     * but aren't interested get unchoked. If they become interested, the
3118     * downloader with the worst upload rate gets choked. If a client has
3119     * a complete file, it uses its upload rate rather than its download
3120     * rate to decide which peers to unchoke.
3121     *
3122     * If our bandwidth is maxed out, don't unchoke any more peers.
3123     */
3124    unchokedInterested = 0;
3125    for( i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i ) {
3126        choke[i].isChoked = isMaxedOut ? choke[i].wasChoked : false;
3127        if( choke[i].isInterested )
3128            ++unchokedInterested;
3129    }
3130
3131    /* optimistic unchoke */
3132    if( !t->optimistic && !isMaxedOut && (i<size) )
3133    {
3134        int n;
3135        struct ChokeData * c;
3136        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
3137
3138        for( ; i<size; ++i )
3139        {
3140            if( choke[i].isInterested )
3141            {
3142                const tr_peer * peer = choke[i].peer;
3143                int x = 1, y;
3144                if( isNew( peer ) ) x *= 3;
3145                for( y=0; y<x; ++y )
3146                    tr_ptrArrayAppend( &randPool, &choke[i] );
3147            }
3148        }
3149
3150        if(( n = tr_ptrArraySize( &randPool )))
3151        {
3152            c = tr_ptrArrayNth( &randPool, tr_cryptoWeakRandInt( n ));
3153            c->isChoked = false;
3154            t->optimistic = c->peer;
3155            t->optimisticUnchokeTimeScaler = OPTIMISTIC_UNCHOKE_MULTIPLIER;
3156        }
3157
3158        tr_ptrArrayDestruct( &randPool, NULL );
3159    }
3160
3161    for( i=0; i<size; ++i )
3162        tr_peerMsgsSetChoke( choke[i].peer->msgs, choke[i].isChoked );
3163
3164    /* cleanup */
3165    tr_free( choke );
3166}
3167
3168static void
3169rechokePulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3170{
3171    tr_torrent * tor = NULL;
3172    tr_peerMgr * mgr = vmgr;
3173    const uint64_t now = tr_time_msec( );
3174
3175    managerLock( mgr );
3176
3177    while(( tor = tr_torrentNext( mgr->session, tor ))) {
3178        if( tor->isRunning ) {
3179            Torrent * t = tor->torrentPeers;
3180            if( !tr_ptrArrayEmpty( &t->peers ) ) {
3181                rechokeUploads( t, now );
3182                rechokeDownloads( t );
3183            }
3184        }
3185    }
3186
3187    tr_timerAddMsec( mgr->rechokeTimer, RECHOKE_PERIOD_MSEC );
3188    managerUnlock( mgr );
3189}
3190
3191/***
3192****
3193****  Life and Death
3194****
3195***/
3196
3197static bool
3198shouldPeerBeClosed( const Torrent    * t,
3199                    const tr_peer    * peer,
3200                    int                peerCount,
3201                    const time_t       now )
3202{
3203    const tr_torrent *       tor = t->tor;
3204    const struct peer_atom * atom = peer->atom;
3205
3206    /* if it's marked for purging, close it */
3207    if( peer->doPurge )
3208    {
3209        tordbg( t, "purging peer %s because its doPurge flag is set",
3210                tr_atomAddrStr( atom ) );
3211        return true;
3212    }
3213
3214    /* disconnect if we're both seeds and enough time has passed for PEX */
3215    if( tr_torrentIsSeed( tor ) && peerIsSeed( peer ) )
3216        return !tr_torrentAllowsPex(tor) || (now-atom->time>=30);
3217
3218    /* disconnect if it's been too long since piece data has been transferred.
3219     * this is on a sliding scale based on number of available peers... */
3220    {
3221        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
3222        /* if we have >= relaxIfFewerThan, strictness is 100%.
3223         * if we have zero connections, strictness is 0% */
3224        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
3225                               ? 1.0
3226                               : peerCount / (float)relaxStrictnessIfFewerThanN;
3227        const int lo = MIN_UPLOAD_IDLE_SECS;
3228        const int hi = MAX_UPLOAD_IDLE_SECS;
3229        const int limit = hi - ( ( hi - lo ) * strictness );
3230        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
3231/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
3232        if( idleTime > limit ) {
3233            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
3234                       tr_atomAddrStr( atom ), idleTime );
3235            return true;
3236        }
3237    }
3238
3239    return false;
3240}
3241
3242static tr_peer **
3243getPeersToClose( Torrent * t, const time_t now_sec, int * setmeSize )
3244{
3245    int i, peerCount, outsize;
3246    struct tr_peer ** ret = NULL;
3247    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( &t->peers, &peerCount );
3248
3249    assert( torrentIsLocked( t ) );
3250
3251    for( i = outsize = 0; i < peerCount; ++i ) {
3252        if( shouldPeerBeClosed( t, peers[i], peerCount, now_sec ) ) {
3253            if( ret == NULL )
3254                ret = tr_new( tr_peer *, peerCount );
3255            ret[outsize++] = peers[i];
3256        }
3257    }
3258
3259    *setmeSize = outsize;
3260    return ret;
3261}
3262
3263static int
3264getReconnectIntervalSecs( const struct peer_atom * atom, const time_t now )
3265{
3266    int sec;
3267
3268    /* if we were recently connected to this peer and transferring piece
3269     * data, try to reconnect to them sooner rather that later -- we don't
3270     * want network troubles to get in the way of a good peer. */
3271    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
3272        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
3273
3274    /* don't allow reconnects more often than our minimum */
3275    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
3276        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
3277
3278    /* otherwise, the interval depends on how many times we've tried
3279     * and failed to connect to the peer */
3280    else switch( atom->numFails ) {
3281        case 0: sec = 0; break;
3282        case 1: sec = 5; break;
3283        case 2: sec = 2 * 60; break;
3284        case 3: sec = 15 * 60; break;
3285        case 4: sec = 30 * 60; break;
3286        case 5: sec = 60 * 60; break;
3287        default: sec = 120 * 60; break;
3288    }
3289
3290    /* penalize peers that were unreachable the last time we tried */
3291    if( atom->flags2 & MYFLAG_UNREACHABLE )
3292        sec += sec;
3293
3294    dbgmsg( "reconnect interval for %s is %d seconds", tr_atomAddrStr( atom ), sec );
3295    return sec;
3296}
3297
3298static void
3299removePeer( Torrent * t, tr_peer * peer )
3300{
3301    tr_peer * removed;
3302    struct peer_atom * atom = peer->atom;
3303
3304    assert( torrentIsLocked( t ) );
3305    assert( atom );
3306
3307    atom->time = tr_time( );
3308
3309    removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare );
3310
3311    if( replicationExists( t ) )
3312        tr_decrReplicationFromBitfield( t, &peer->have );
3313
3314    assert( removed == peer );
3315    peerDelete( t, removed );
3316}
3317
3318static void
3319closePeer( Torrent * t, tr_peer * peer )
3320{
3321    struct peer_atom * atom;
3322
3323    assert( t != NULL );
3324    assert( peer != NULL );
3325
3326    atom = peer->atom;
3327
3328    /* if we transferred piece data, then they might be good peers,
3329       so reset their `numFails' weight to zero. otherwise we connected
3330       to them fruitlessly, so mark it as another fail */
3331    if( atom->piece_data_time ) {
3332        tordbg( t, "resetting atom %s numFails to 0", tr_atomAddrStr(atom) );
3333        atom->numFails = 0;
3334    } else {
3335        ++atom->numFails;
3336        tordbg( t, "incremented atom %s numFails to %d", tr_atomAddrStr(atom), (int)atom->numFails );
3337    }
3338
3339    tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
3340    removePeer( t, peer );
3341}
3342
3343static void
3344removeAllPeers( Torrent * t )
3345{
3346    while( !tr_ptrArrayEmpty( &t->peers ) )
3347        removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) );
3348}
3349
3350static void
3351closeBadPeers( Torrent * t, const time_t now_sec )
3352{
3353    if( !tr_ptrArrayEmpty( &t->peers ) )
3354    {
3355        int i;
3356        int peerCount;
3357        struct tr_peer ** peers = getPeersToClose( t, now_sec, &peerCount );
3358        for( i=0; i<peerCount; ++i )
3359            closePeer( t, peers[i] );
3360        tr_free( peers );
3361    }
3362}
3363
3364struct peer_liveliness
3365{
3366    tr_peer * peer;
3367    void * clientData;
3368    time_t pieceDataTime;
3369    time_t time;
3370    int speed;
3371    bool doPurge;
3372};
3373
3374static int
3375comparePeerLiveliness( const void * va, const void * vb )
3376{
3377    const struct peer_liveliness * a = va;
3378    const struct peer_liveliness * b = vb;
3379
3380    if( a->doPurge != b->doPurge )
3381        return a->doPurge ? 1 : -1;
3382
3383    if( a->speed != b->speed ) /* faster goes first */
3384        return a->speed > b->speed ? -1 : 1;
3385
3386    /* the one to give us data more recently goes first */
3387    if( a->pieceDataTime != b->pieceDataTime )
3388        return a->pieceDataTime > b->pieceDataTime ? -1 : 1;
3389
3390    /* the one we connected to most recently goes first */
3391    if( a->time != b->time )
3392        return a->time > b->time ? -1 : 1;
3393
3394    return 0;
3395}
3396
3397static void
3398sortPeersByLivelinessImpl( tr_peer  ** peers,
3399                           void     ** clientData,
3400                           int         n,
3401                           uint64_t    now,
3402                           int (*compare) ( const void *va, const void *vb ) )
3403{
3404    int i;
3405    struct peer_liveliness *lives, *l;
3406
3407    /* build a sortable array of peer + extra info */
3408    lives = l = tr_new0( struct peer_liveliness, n );
3409    for( i=0; i<n; ++i, ++l )
3410    {
3411        tr_peer * p = peers[i];
3412        l->peer = p;
3413        l->doPurge = p->doPurge;
3414        l->pieceDataTime = p->atom->piece_data_time;
3415        l->time = p->atom->time;
3416        l->speed = tr_peerGetPieceSpeed_Bps( p, now, TR_UP )
3417                 + tr_peerGetPieceSpeed_Bps( p, now, TR_DOWN );
3418        if( clientData )
3419            l->clientData = clientData[i];
3420    }
3421
3422    /* sort 'em */
3423    assert( n == ( l - lives ) );
3424    qsort( lives, n, sizeof( struct peer_liveliness ), compare );
3425
3426    /* build the peer array */
3427    for( i=0, l=lives; i<n; ++i, ++l ) {
3428        peers[i] = l->peer;
3429        if( clientData )
3430            clientData[i] = l->clientData;
3431    }
3432    assert( n == ( l - lives ) );
3433
3434    /* cleanup */
3435    tr_free( lives );
3436}
3437
3438static void
3439sortPeersByLiveliness( tr_peer ** peers, void ** clientData, int n, uint64_t now )
3440{
3441    sortPeersByLivelinessImpl( peers, clientData, n, now, comparePeerLiveliness );
3442}
3443
3444
3445static void
3446enforceTorrentPeerLimit( Torrent * t, uint64_t now )
3447{
3448    int n = tr_ptrArraySize( &t->peers );
3449    const int max = tr_torrentGetPeerLimit( t->tor );
3450    if( n > max )
3451    {
3452        void * base = tr_ptrArrayBase( &t->peers );
3453        tr_peer ** peers = tr_memdup( base, n*sizeof( tr_peer* ) );
3454        sortPeersByLiveliness( peers, NULL, n, now );
3455        while( n > max )
3456            closePeer( t, peers[--n] );
3457        tr_free( peers );
3458    }
3459}
3460
3461static void
3462enforceSessionPeerLimit( tr_session * session, uint64_t now )
3463{
3464    int n = 0;
3465    tr_torrent * tor = NULL;
3466    const int max = tr_sessionGetPeerLimit( session );
3467
3468    /* count the total number of peers */
3469    while(( tor = tr_torrentNext( session, tor )))
3470        n += tr_ptrArraySize( &tor->torrentPeers->peers );
3471
3472    /* if there are too many, prune out the worst */
3473    if( n > max )
3474    {
3475        tr_peer ** peers = tr_new( tr_peer*, n );
3476        Torrent ** torrents = tr_new( Torrent*, n );
3477
3478        /* populate the peer array */
3479        n = 0;
3480        tor = NULL;
3481        while(( tor = tr_torrentNext( session, tor ))) {
3482            int i;
3483            Torrent * t = tor->torrentPeers;
3484            const int tn = tr_ptrArraySize( &t->peers );
3485            for( i=0; i<tn; ++i, ++n ) {
3486                peers[n] = tr_ptrArrayNth( &t->peers, i );
3487                torrents[n] = t;
3488            }
3489        }
3490
3491        /* sort 'em */
3492        sortPeersByLiveliness( peers, (void**)torrents, n, now );
3493
3494        /* cull out the crappiest */
3495        while( n-- > max )
3496            closePeer( torrents[n], peers[n] );
3497
3498        /* cleanup */
3499        tr_free( torrents );
3500        tr_free( peers );
3501    }
3502}
3503
3504static void makeNewPeerConnections( tr_peerMgr * mgr, const int max );
3505
3506static void
3507reconnectPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3508{
3509    tr_torrent * tor;
3510    tr_peerMgr * mgr = vmgr;
3511    const time_t now_sec = tr_time( );
3512    const uint64_t now_msec = tr_time_msec( );
3513
3514    /**
3515    ***  enforce the per-session and per-torrent peer limits
3516    **/
3517
3518    /* if we're over the per-torrent peer limits, cull some peers */
3519    tor = NULL;
3520    while(( tor = tr_torrentNext( mgr->session, tor )))
3521        if( tor->isRunning )
3522            enforceTorrentPeerLimit( tor->torrentPeers, now_msec );
3523
3524    /* if we're over the per-session peer limits, cull some peers */
3525    enforceSessionPeerLimit( mgr->session, now_msec );
3526
3527    /* remove crappy peers */
3528    tor = NULL;
3529    while(( tor = tr_torrentNext( mgr->session, tor )))
3530        if( !tor->torrentPeers->isRunning )
3531            removeAllPeers( tor->torrentPeers );
3532        else
3533            closeBadPeers( tor->torrentPeers, now_sec );
3534
3535    /* try to make new peer connections */
3536    makeNewPeerConnections( mgr, MAX_CONNECTIONS_PER_PULSE );
3537}
3538
3539/****
3540*****
3541*****  BANDWIDTH ALLOCATION
3542*****
3543****/
3544
3545static void
3546pumpAllPeers( tr_peerMgr * mgr )
3547{
3548    tr_torrent * tor = NULL;
3549
3550    while(( tor = tr_torrentNext( mgr->session, tor )))
3551    {
3552        int j;
3553        Torrent * t = tor->torrentPeers;
3554
3555        for( j=0; j<tr_ptrArraySize( &t->peers ); ++j )
3556        {
3557            tr_peer * peer = tr_ptrArrayNth( &t->peers, j );
3558            tr_peerMsgsPulse( peer->msgs );
3559        }
3560    }
3561}
3562
3563static void
3564queuePulse( tr_session * session, tr_direction dir )
3565{
3566    assert( tr_isSession( session ) );
3567    assert( tr_isDirection( dir ) );
3568
3569    if( tr_sessionGetQueueEnabled( session, dir ) )
3570    {
3571        int i;
3572        const int n = tr_sessionCountQueueFreeSlots( session, dir );
3573        for( i=0; i<n; i++ ) {
3574            tr_torrent * tor = tr_sessionGetNextQueuedTorrent( session, dir );
3575            if( tor != NULL ) {
3576                tr_torrentStartNow( tor );
3577                if( tor->queue_started_callback != NULL )
3578                    (*tor->queue_started_callback)( tor );
3579            }
3580        }
3581    }
3582}
3583
3584static void
3585bandwidthPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3586{
3587    tr_torrent * tor;
3588    tr_peerMgr * mgr = vmgr;
3589    tr_session * session = mgr->session;
3590    managerLock( mgr );
3591
3592    /* FIXME: this next line probably isn't necessary... */
3593    pumpAllPeers( mgr );
3594
3595    /* allocate bandwidth to the peers */
3596    tr_bandwidthAllocate( &session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
3597    tr_bandwidthAllocate( &session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
3598
3599    /* torrent upkeep */
3600    tor = NULL;
3601    while(( tor = tr_torrentNext( session, tor )))
3602    {
3603        /* possibly stop torrents that have seeded enough */
3604        tr_torrentCheckSeedLimit( tor );
3605
3606        /* run the completeness check for any torrents that need it */
3607        if( tor->torrentPeers->needsCompletenessCheck ) {
3608            tor->torrentPeers->needsCompletenessCheck  = false;
3609            tr_torrentRecheckCompleteness( tor );
3610        }
3611
3612        /* stop torrents that are ready to stop, but couldn't be stopped
3613           earlier during the peer-io callback call chain */
3614        if( tor->isStopping )
3615            tr_torrentStop( tor );
3616    }
3617
3618    /* pump the queues */
3619    queuePulse( session, TR_UP );
3620    queuePulse( session, TR_DOWN );
3621
3622    reconnectPulse( 0, 0, mgr );
3623
3624    tr_timerAddMsec( mgr->bandwidthTimer, BANDWIDTH_PERIOD_MSEC );
3625    managerUnlock( mgr );
3626}
3627
3628/***
3629****
3630***/
3631
3632static int
3633compareAtomPtrsByAddress( const void * va, const void *vb )
3634{
3635    const struct peer_atom * a = * (const struct peer_atom**) va;
3636    const struct peer_atom * b = * (const struct peer_atom**) vb;
3637
3638    assert( tr_isAtom( a ) );
3639    assert( tr_isAtom( b ) );
3640
3641    return tr_address_compare( &a->addr, &b->addr );
3642}
3643
3644/* best come first, worst go last */
3645static int
3646compareAtomPtrsByShelfDate( const void * va, const void *vb )
3647{
3648    time_t atime;
3649    time_t btime;
3650    const struct peer_atom * a = * (const struct peer_atom**) va;
3651    const struct peer_atom * b = * (const struct peer_atom**) vb;
3652    const int data_time_cutoff_secs = 60 * 60;
3653    const time_t tr_now = tr_time( );
3654
3655    assert( tr_isAtom( a ) );
3656    assert( tr_isAtom( b ) );
3657
3658    /* primary key: the last piece data time *if* it was within the last hour */
3659    atime = a->piece_data_time; if( atime + data_time_cutoff_secs < tr_now ) atime = 0;
3660    btime = b->piece_data_time; if( btime + data_time_cutoff_secs < tr_now ) btime = 0;
3661    if( atime != btime )
3662        return atime > btime ? -1 : 1;
3663
3664    /* secondary key: shelf date. */
3665    if( a->shelf_date != b->shelf_date )
3666        return a->shelf_date > b->shelf_date ? -1 : 1;
3667
3668    return 0;
3669}
3670
3671static int
3672getMaxAtomCount( const tr_torrent * tor )
3673{
3674    const int n = tor->maxConnectedPeers;
3675    /* approximate fit of the old jump discontinuous function */
3676    if( n >= 55 ) return     n + 150;
3677    if( n >= 20 ) return 2 * n + 95;
3678    return               4 * n + 55;
3679}
3680
3681static void
3682atomPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3683{
3684    tr_torrent * tor = NULL;
3685    tr_peerMgr * mgr = vmgr;
3686    managerLock( mgr );
3687
3688    while(( tor = tr_torrentNext( mgr->session, tor )))
3689    {
3690        int atomCount;
3691        Torrent * t = tor->torrentPeers;
3692        const int maxAtomCount = getMaxAtomCount( tor );
3693        struct peer_atom ** atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
3694
3695        if( atomCount > maxAtomCount ) /* we've got too many atoms... time to prune */
3696        {
3697            int i;
3698            int keepCount = 0;
3699            int testCount = 0;
3700            struct peer_atom ** keep = tr_new( struct peer_atom*, atomCount );
3701            struct peer_atom ** test = tr_new( struct peer_atom*, atomCount );
3702
3703            /* keep the ones that are in use */
3704            for( i=0; i<atomCount; ++i ) {
3705                struct peer_atom * atom = atoms[i];
3706                if( peerIsInUse( t, atom ) )
3707                    keep[keepCount++] = atom;
3708                else
3709                    test[testCount++] = atom;
3710            }
3711
3712            /* if there's room, keep the best of what's left */
3713            i = 0;
3714            if( keepCount < maxAtomCount ) {
3715                qsort( test, testCount, sizeof( struct peer_atom * ), compareAtomPtrsByShelfDate );
3716                while( i<testCount && keepCount<maxAtomCount )
3717                    keep[keepCount++] = test[i++];
3718            }
3719
3720            /* free the culled atoms */
3721            while( i<testCount )
3722                tr_free( test[i++] );
3723
3724            /* rebuild Torrent.pool with what's left */
3725            tr_ptrArrayDestruct( &t->pool, NULL );
3726            t->pool = TR_PTR_ARRAY_INIT;
3727            qsort( keep, keepCount, sizeof( struct peer_atom * ), compareAtomPtrsByAddress );
3728            for( i=0; i<keepCount; ++i )
3729                tr_ptrArrayAppend( &t->pool, keep[i] );
3730
3731            tordbg( t, "max atom count is %d... pruned from %d to %d\n", maxAtomCount, atomCount, keepCount );
3732
3733            /* cleanup */
3734            tr_free( test );
3735            tr_free( keep );
3736        }
3737    }
3738
3739    tr_timerAddMsec( mgr->atomTimer, ATOM_PERIOD_MSEC );
3740    managerUnlock( mgr );
3741}
3742
3743/***
3744****
3745****
3746****
3747***/
3748
3749/* is this atom someone that we'd want to initiate a connection to? */
3750static bool
3751isPeerCandidate( const tr_torrent * tor, struct peer_atom * atom, const time_t now )
3752{
3753    /* not if we're both seeds */
3754    if( tr_torrentIsSeed( tor ) && atomIsSeed( atom ) )
3755        return false;
3756
3757    /* not if we've already got a connection to them... */
3758    if( peerIsInUse( tor->torrentPeers, atom ) )
3759        return false;
3760
3761    /* not if we just tried them already */
3762    if( ( now - atom->time ) < getReconnectIntervalSecs( atom, now ) )
3763        return false;
3764
3765    /* not if they're blocklisted */
3766    if( isAtomBlocklisted( tor->session, atom ) )
3767        return false;
3768
3769    /* not if they're banned... */
3770    if( atom->flags2 & MYFLAG_BANNED )
3771        return false;
3772
3773    return true;
3774}
3775
3776struct peer_candidate
3777{
3778    uint64_t score;
3779    tr_torrent * tor;
3780    struct peer_atom * atom;
3781};
3782
3783static bool
3784torrentWasRecentlyStarted( const tr_torrent * tor )
3785{
3786    return difftime( tr_time( ), tor->startDate ) < 120;
3787}
3788
3789static inline uint64_t
3790addValToKey( uint64_t value, int width, uint64_t addme )
3791{
3792    value = (value << (uint64_t)width);
3793    value |= addme;
3794    return value;
3795}
3796
3797/* smaller value is better */
3798static uint64_t
3799getPeerCandidateScore( const tr_torrent * tor, const struct peer_atom * atom, uint8_t salt  )
3800{
3801    uint64_t i;
3802    uint64_t score = 0;
3803    const bool failed = atom->lastConnectionAt < atom->lastConnectionAttemptAt;
3804
3805    /* prefer peers we've connected to, or never tried, over peers we failed to connect to. */
3806    i = failed ? 1 : 0;
3807    score = addValToKey( score, 1, i );
3808
3809    /* prefer the one we attempted least recently (to cycle through all peers) */
3810    i = atom->lastConnectionAttemptAt;
3811    score = addValToKey( score, 32, i );
3812
3813    /* prefer peers belonging to a torrent of a higher priority */
3814    switch( tr_torrentGetPriority( tor ) ) {
3815        case TR_PRI_HIGH:    i = 0; break;
3816        case TR_PRI_NORMAL:  i = 1; break;
3817        case TR_PRI_LOW:     i = 2; break;
3818    }
3819    score = addValToKey( score, 4, i );
3820
3821    /* prefer recently-started torrents */
3822    i = torrentWasRecentlyStarted( tor ) ? 0 : 1;
3823    score = addValToKey( score, 1, i );
3824
3825    /* prefer torrents we're downloading with */
3826    i = tr_torrentIsSeed( tor ) ? 1 : 0;
3827    score = addValToKey( score, 1, i );
3828
3829    /* prefer peers that are known to be connectible */
3830    i = ( atom->flags & ADDED_F_CONNECTABLE ) ? 0 : 1;
3831    score = addValToKey( score, 1, i );
3832
3833    /* prefer peers that we might have a chance of uploading to...
3834       so lower seed probability is better */
3835    if( atom->seedProbability == 100 ) i = 101;
3836    else if( atom->seedProbability == -1 ) i = 100;
3837    else i = atom->seedProbability;
3838    score = addValToKey( score, 8, i );
3839
3840    /* Prefer peers that we got from more trusted sources.
3841     * lower `fromBest' values indicate more trusted sources */
3842    score = addValToKey( score, 4, atom->fromBest );
3843
3844    /* salt */
3845    score = addValToKey( score, 8, salt );
3846
3847    return score;
3848}
3849
3850/* sort an array of peer candidates */
3851static int
3852comparePeerCandidates( const void * va, const void * vb )
3853{
3854    const struct peer_candidate * a = va;
3855    const struct peer_candidate * b = vb;
3856
3857    if( a->score < b->score ) return -1;
3858    if( a->score > b->score ) return 1;
3859
3860    return 0;
3861}
3862
3863/** @return an array of all the atoms we might want to connect to */
3864static struct peer_candidate*
3865getPeerCandidates( tr_session * session, int * candidateCount )
3866{
3867    int atomCount;
3868    int peerCount;
3869    tr_torrent * tor;
3870    struct peer_candidate * candidates;
3871    struct peer_candidate * walk;
3872    const time_t now = tr_time( );
3873    const uint64_t now_msec = tr_time_msec( );
3874    /* leave 5% of connection slots for incoming connections -- ticket #2609 */
3875    const int maxCandidates = tr_sessionGetPeerLimit( session ) * 0.95;
3876
3877    /* count how many peers and atoms we've got */
3878    tor= NULL;
3879    atomCount = 0;
3880    peerCount = 0;
3881    while(( tor = tr_torrentNext( session, tor ))) {
3882        atomCount += tr_ptrArraySize( &tor->torrentPeers->pool );
3883        peerCount += tr_ptrArraySize( &tor->torrentPeers->peers );
3884    }
3885
3886    /* don't start any new handshakes if we're full up */
3887    if( maxCandidates <= peerCount ) {
3888        *candidateCount = 0;
3889        return NULL;
3890    }
3891
3892    /* allocate an array of candidates */
3893    walk = candidates = tr_new( struct peer_candidate, atomCount );
3894
3895    /* populate the candidate array */
3896    tor = NULL;
3897    while(( tor = tr_torrentNext( session, tor )))
3898    {
3899        int i, nAtoms;
3900        struct peer_atom ** atoms;
3901
3902        if( !tor->torrentPeers->isRunning )
3903            continue;
3904
3905        /* if we've already got enough peers in this torrent... */
3906        if( tr_torrentGetPeerLimit( tor ) <= tr_ptrArraySize( &tor->torrentPeers->peers ) )
3907            continue;
3908
3909        /* if we've already got enough speed in this torrent... */
3910        if( tr_torrentIsSeed( tor ) && isBandwidthMaxedOut( &tor->bandwidth, now_msec, TR_UP ) )
3911            continue;
3912
3913        atoms = (struct peer_atom**) tr_ptrArrayPeek( &tor->torrentPeers->pool, &nAtoms );
3914        for( i=0; i<nAtoms; ++i )
3915        {
3916            struct peer_atom * atom = atoms[i];
3917
3918            if( isPeerCandidate( tor, atom, now ) )
3919            {
3920                const uint8_t salt = tr_cryptoWeakRandInt( 1024 );
3921                walk->tor = tor;
3922                walk->atom = atom;
3923                walk->score = getPeerCandidateScore( tor, atom, salt );
3924                ++walk;
3925            }
3926        }
3927    }
3928
3929    *candidateCount = walk - candidates;
3930    if( *candidateCount > 1 )
3931        qsort( candidates, *candidateCount, sizeof( struct peer_candidate ), comparePeerCandidates );
3932    return candidates;
3933}
3934
3935static void
3936initiateConnection( tr_peerMgr * mgr, Torrent * t, struct peer_atom * atom )
3937{
3938    tr_peerIo * io;
3939    const time_t now = tr_time( );
3940    bool utp = tr_sessionIsUTPEnabled(mgr->session) && !atom->utp_failed;
3941
3942    if( atom->fromFirst == TR_PEER_FROM_PEX )
3943        /* PEX has explicit signalling for uTP support.  If an atom
3944           originally came from PEX and doesn't have the uTP flag, skip the
3945           uTP connection attempt.  Are we being optimistic here? */
3946        utp = utp && (atom->flags & ADDED_F_UTP_FLAGS);
3947
3948    tordbg( t, "Starting an OUTGOING%s connection with %s",
3949            utp ? " µTP" : "",
3950            tr_atomAddrStr( atom ) );
3951
3952    io = tr_peerIoNewOutgoing( mgr->session,
3953                               &mgr->session->bandwidth,
3954                               &atom->addr,
3955                               atom->port,
3956                               t->tor->info.hash,
3957                               t->tor->completeness == TR_SEED,
3958                               utp );
3959
3960    if( io == NULL )
3961    {
3962        tordbg( t, "peerIo not created; marking peer %s as unreachable",
3963                tr_atomAddrStr( atom ) );
3964        atom->flags2 |= MYFLAG_UNREACHABLE;
3965        atom->numFails++;
3966    }
3967    else
3968    {
3969        tr_handshake * handshake = tr_handshakeNew( io,
3970                                                    mgr->session->encryptionMode,
3971                                                    myHandshakeDoneCB,
3972                                                    mgr );
3973
3974        assert( tr_peerIoGetTorrentHash( io ) );
3975
3976        tr_peerIoUnref( io ); /* balanced by the initial ref
3977                                 in tr_peerIoNewOutgoing() */
3978
3979        tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
3980                                 handshakeCompare );
3981    }
3982
3983    atom->lastConnectionAttemptAt = now;
3984    atom->time = now;
3985}
3986
3987static void
3988initiateCandidateConnection( tr_peerMgr * mgr, struct peer_candidate * c )
3989{
3990#if 0
3991    fprintf( stderr, "Starting an OUTGOING connection with %s - [%s] seedProbability==%d; %s, %s\n",
3992             tr_atomAddrStr( c->atom ),
3993             tr_torrentName( c->tor ),
3994             (int)c->atom->seedProbability,
3995             tr_torrentIsPrivate( c->tor ) ? "private" : "public",
3996             tr_torrentIsSeed( c->tor ) ? "seed" : "downloader" );
3997#endif
3998
3999    initiateConnection( mgr, c->tor->torrentPeers, c->atom );
4000}
4001
4002static void
4003makeNewPeerConnections( struct tr_peerMgr * mgr, const int max )
4004{
4005    int i, n;
4006    struct peer_candidate * candidates;
4007
4008    candidates = getPeerCandidates( mgr->session, &n );
4009
4010    for( i=0; i<n && i<max; ++i )
4011        initiateCandidateConnection( mgr, &candidates[i] );
4012
4013    tr_free( candidates );
4014}
Note: See TracBrowser for help on using the repository browser.