source: trunk/libtransmission/peer-mgr.c @ 13329

Last change on this file since 13329 was 13329, checked in by jordan, 9 years ago

(trunk libT) fix the Linux build wrt compiling with the new snapshot of libutp checked into r13317

Previously we made sure to include stdbool.h (via transmission.h) before utp.h, since the latter used 'bool' without defining it. The new snapshot defines it unconditionally in non-C++ code, so now we need to include it first.

  • Property svn:keywords set to Date Rev Author Id
File size: 118.3 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 13329 2012-05-30 17:47:29Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h> /* error codes ERANGE, ... */
15#include <limits.h> /* INT_MAX */
16#include <string.h> /* memcpy, memcmp, strstr */
17#include <stdlib.h> /* qsort */
18
19#include <event2/event.h>
20
21#include <libutp/utp.h>
22
23#include "transmission.h"
24#include "announcer.h"
25#include "bandwidth.h"
26#include "blocklist.h"
27#include "cache.h"
28#include "clients.h"
29#include "completion.h"
30#include "crypto.h"
31#include "handshake.h"
32#include "net.h"
33#include "peer-io.h"
34#include "peer-mgr.h"
35#include "peer-msgs.h"
36#include "ptrarray.h"
37#include "session.h"
38#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
39#include "torrent.h"
40#include "tr-utp.h"
41#include "utils.h"
42#include "webseed.h"
43
/**
 * Tunable constants for the peer manager: timer periods, connection
 * throttles, idle/ban limits, and the private bits stored in
 * peer_atom.flags2.
 */
enum
{
    /* how frequently to cull old atoms */
    ATOM_PERIOD_MSEC = ( 60 * 1000 ),

    /* how frequently to change which peers are choked */
    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),

    /* an optimistically unchoked peer is immune from rechoking
       for this many calls to rechokeUploads(). */
    OPTIMISTIC_UNCHOKE_MULTIPLIER = 4,

    /* how frequently to reallocate bandwidth */
    BANDWIDTH_PERIOD_MSEC = 500,

    /* how frequently to age out old piece request lists */
    REFILL_UPKEEP_PERIOD_MSEC = ( 10 * 1000 ),

    /* how frequently to decide which peers live and die */
    RECONNECT_PERIOD_MSEC = 500,

    /* when many peers are available, keep idle ones this long */
    MIN_UPLOAD_IDLE_SECS = ( 60 ),

    /* when few peers are available, keep idle ones this long */
    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),

    /* max number of peers to ask for per second overall.
     * this throttle is to avoid overloading the router */
    MAX_CONNECTIONS_PER_SECOND = 12,

    /* MAX_CONNECTIONS_PER_SECOND scaled down to one RECONNECT_PERIOD_MSEC
     * interval. Integer arithmetic only: a floating operand (e.g. dividing
     * by 1000.0) is not allowed in a strictly conforming integer constant
     * expression, which is what an enumerator value must be. */
    MAX_CONNECTIONS_PER_PULSE = ( MAX_CONNECTIONS_PER_SECOND * RECONNECT_PERIOD_MSEC ) / 1000,

    /* number of bad pieces a peer is allowed to send before we ban them */
    MAX_BAD_PIECES_PER_PEER = 5,

    /* amount of time to keep a list of request pieces lying around
       before it's considered too old and needs to be rebuilt */
    PIECE_LIST_SHELF_LIFE_SECS = 60,

    /* use for bitwise operations w/peer_atom.flags2 */
    MYFLAG_BANNED = 1,

    /* use for bitwise operations w/peer_atom.flags2 */
    /* unreachable for now... but not banned.
     * if they try to connect to us it's okay */
    MYFLAG_UNREACHABLE = 2,

    /* the minimum we'll wait before attempting to reconnect to a peer */
    MINIMUM_RECONNECT_INTERVAL_SECS = 5,

    /** how long we'll let requests we've made linger before we cancel them */
    REQUEST_TTL_SECS = 120,

    NO_BLOCKS_CANCEL_HISTORY = 120,

    CANCEL_HISTORY_SEC = 60
};
102
/* Initializer value for a tr_peer_event: every field is 0, NULL, or false. */
const tr_peer_event TR_PEER_EVENT_INIT = { 0, 0, NULL, 0, 0, 0, false, 0 };
104
105/**
106***
107**/
108
/**
 * Peer information that should be kept even before we've connected and
 * after we've disconnected. These are kept in a pool of peer_atoms to decide
 * which ones would make good candidates for connecting to, and to watch out
 * for banned peers.
 *
 * Atoms live in Torrent::pool and are looked up by address
 * (see getExistingAtom()).
 *
 * @see tr_peer
 * @see tr_peermsgs
 */
struct peer_atom
{
    uint8_t     fromFirst;          /* where the peer was first found */
    uint8_t     fromBest;           /* the "best" value of where the peer has been found */
    uint8_t     flags;              /* these match the added_f flags */
    uint8_t     flags2;             /* flags that aren't defined in added_f (MYFLAG_* bits) */
    int8_t      seedProbability;    /* how likely is this to be a seed... [0..100] or -1 for unknown */
    int8_t      blocklisted;        /* -1 for unknown, true for blocklisted, false for not blocklisted */

    tr_port     port;
    bool        utp_failed;         /* We recently failed to connect over uTP */
    uint16_t    numFails;
    time_t      time;               /* when the peer's connection status last changed */
    time_t      piece_data_time;

    time_t      lastConnectionAttemptAt;
    time_t      lastConnectionAt;

    /* similar to a TTL field, but less rigid --
     * if the swarm is small, the atom will be kept past this date. */
    time_t      shelf_date;
    tr_peer   * peer;               /* will be NULL if not connected */
    tr_address  addr;               /* lookup key for the atom pool */
};
142
143#ifdef NDEBUG
144#define tr_isAtom(a) (TRUE)
145#else
146static bool
147tr_isAtom( const struct peer_atom * atom )
148{
149    return ( atom != NULL )
150        && ( atom->fromFirst < TR_PEER_FROM__MAX )
151        && ( atom->fromBest < TR_PEER_FROM__MAX )
152        && ( tr_address_is_valid( &atom->addr ) );
153}
154#endif
155
156static const char*
157tr_atomAddrStr( const struct peer_atom * atom )
158{
159    return atom ? tr_peerIoAddrStr( &atom->addr, atom->port ) : "[no atom]";
160}
161
/**
 * One outstanding request for a block.
 * Torrent::requests holds these sorted by block, then by peer
 * (see compareReqByBlock()).
 */
struct block_request
{
    tr_block_index_t block; /* primary sort key */
    tr_peer * peer;         /* secondary sort key: whom we asked */
    time_t sentAt;          /* set from tr_time() when the request is recorded */
};
168
/**
 * One entry of Torrent::pieces: a piece we still want, plus the
 * bookkeeping that comparePieceByWeight() uses to order the list.
 */
struct weighted_piece
{
    tr_piece_index_t index; /* which piece this entry describes */
    int16_t salt;           /* random value used as the last tie-breaker when sorting by weight */
    int16_t requestCount;   /* treated as the number of pending requests by comparePieceByWeight() */
};
175
/**
 * Records how Torrent::pieces is currently ordered.
 * pieceListSort() stores the state it sorted to; invalidatePieceSorting()
 * resets it to PIECES_UNSORTED.
 */
enum piece_sort_state
{
    PIECES_UNSORTED,
    PIECES_SORTED_BY_INDEX,
    PIECES_SORTED_BY_WEIGHT
};
182
/** @brief Opaque, per-torrent data structure for peer connection information */
typedef struct tr_torrent_peers
{
    tr_ptrArray                outgoingHandshakes; /* tr_handshake */
    tr_ptrArray                pool; /* struct peer_atom */
    tr_ptrArray                peers; /* tr_peer */
    tr_ptrArray                webseeds; /* tr_webseed */

    tr_torrent               * tor;     /* the torrent these peers belong to */
    struct tr_peerMgr        * manager; /* the owning manager; its session supplies the lock */

    tr_peer                  * optimistic; /* the optimistic peer, or NULL if none */
    int                        optimisticUnchokeTimeScaler;

    bool                       isRunning;
    bool                       needsCompletenessCheck;

    /* outstanding block requests, kept sorted by compareReqByBlock() */
    struct block_request     * requests;
    int                        requestCount; /* number of entries in use */
    int                        requestAlloc; /* number of entries allocated */

    /* the pieces we still want; ordering is described by pieceSortState */
    struct weighted_piece    * pieces;
    int                        pieceCount;
    enum piece_sort_state      pieceSortState;

    /* An array of pieceCount items stating how many peers have each piece.
       This is used to help us for downloading pieces "rarest first."
       This may be NULL if we don't have metainfo yet, or if we're not
       downloading and don't care about rarity */
    uint16_t                 * pieceReplication;
    size_t                     pieceReplicationSize;

    int                        interestedCount;
    int                        maxPeers;
    time_t                     lastCancel;

    /* Before the endgame this should be 0. In endgame, it contains the average
     * number of pending requests per peer. Only peers which have more pending
     * requests are considered 'fast' and are allowed to request a block that's
     * already been requested from another (slower?) peer. */
    int                        endgame;
}
Torrent;
226
/**
 * Session-wide peer manager state: the owning session, the incoming
 * handshakes, and the periodic timers (released by deleteTimers()).
 */
struct tr_peerMgr
{
    tr_session    * session;            /* also provides the lock used by managerLock() */
    tr_ptrArray     incomingHandshakes; /* tr_handshake */
    struct event  * bandwidthTimer;
    struct event  * rechokeTimer;
    struct event  * refillUpkeepTimer;
    struct event  * atomTimer;
};
236
/**
 * Deep-log a printf-style message tagged with the name of the torrent
 * that owns the Torrent `t`. Does nothing unless deep logging is active.
 * The parameter is parenthesized so that any pointer expression may be
 * passed safely.
 */
#define tordbg( t, ... ) \
    do { \
        if( tr_deepLoggingIsActive( ) ) \
            tr_deepLog( __FILE__, __LINE__, tr_torrentName( ( t )->tor ), __VA_ARGS__ ); \
    } while( 0 )
242
/**
 * Deep-log a printf-style message that is not tied to a torrent
 * (NULL is passed as the name). Does nothing unless deep logging is active.
 */
#define dbgmsg( ... ) \
    do { \
        if( tr_deepLoggingIsActive( ) ) \
            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
    } while( 0 )
248
249/**
250***
251**/
252
253static inline void
254managerLock( const struct tr_peerMgr * manager )
255{
256    tr_sessionLock( manager->session );
257}
258
259static inline void
260managerUnlock( const struct tr_peerMgr * manager )
261{
262    tr_sessionUnlock( manager->session );
263}
264
265static inline void
266torrentLock( Torrent * torrent )
267{
268    managerLock( torrent->manager );
269}
270
271static inline void
272torrentUnlock( Torrent * torrent )
273{
274    managerUnlock( torrent->manager );
275}
276
277static inline int
278torrentIsLocked( const Torrent * t )
279{
280    return tr_sessionIsLocked( t->manager->session );
281}
282
283/**
284***
285**/
286
287static int
288handshakeCompareToAddr( const void * va, const void * vb )
289{
290    const tr_handshake * a = va;
291
292    return tr_address_compare( tr_handshakeGetAddr( a, NULL ), vb );
293}
294
295static int
296handshakeCompare( const void * a, const void * b )
297{
298    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
299}
300
301static inline tr_handshake*
302getExistingHandshake( tr_ptrArray * handshakes, const tr_address * addr )
303{
304    if( tr_ptrArrayEmpty( handshakes ) )
305        return NULL;
306
307    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
308}
309
310static int
311comparePeerAtomToAddress( const void * va, const void * vb )
312{
313    const struct peer_atom * a = va;
314
315    return tr_address_compare( &a->addr, vb );
316}
317
318static int
319compareAtomsByAddress( const void * va, const void * vb )
320{
321    const struct peer_atom * b = vb;
322
323    assert( tr_isAtom( b ) );
324
325    return comparePeerAtomToAddress( va, &b->addr );
326}
327
328/**
329***
330**/
331
332const tr_address *
333tr_peerAddress( const tr_peer * peer )
334{
335    return &peer->atom->addr;
336}
337
338static Torrent*
339getExistingTorrent( tr_peerMgr *    manager,
340                    const uint8_t * hash )
341{
342    tr_torrent * tor = tr_torrentFindFromHash( manager->session, hash );
343
344    return tor == NULL ? NULL : tor->torrentPeers;
345}
346
347static int
348peerCompare( const void * a, const void * b )
349{
350    return tr_address_compare( tr_peerAddress( a ), tr_peerAddress( b ) );
351}
352
353static struct peer_atom*
354getExistingAtom( const Torrent    * t,
355                 const tr_address * addr )
356{
357    Torrent * tt = (Torrent*)t;
358    assert( torrentIsLocked( t ) );
359    return tr_ptrArrayFindSorted( &tt->pool, addr, comparePeerAtomToAddress );
360}
361
362static bool
363peerIsInUse( const Torrent * ct, const struct peer_atom * atom )
364{
365    Torrent * t = (Torrent*) ct;
366
367    assert( torrentIsLocked ( t ) );
368
369    return ( atom->peer != NULL )
370        || getExistingHandshake( &t->outgoingHandshakes, &atom->addr )
371        || getExistingHandshake( &t->manager->incomingHandshakes, &atom->addr );
372}
373
374void
375tr_peerConstruct( tr_peer * peer )
376{
377    memset( peer, 0, sizeof( tr_peer ) );
378
379    peer->have = TR_BITFIELD_INIT;
380}
381
382static tr_peer*
383peerNew( struct peer_atom * atom )
384{
385    tr_peer * peer = tr_new( tr_peer, 1 );
386    tr_peerConstruct( peer );
387
388    peer->atom = atom;
389    atom->peer = peer;
390
391    return peer;
392}
393
394static tr_peer*
395getPeer( Torrent * torrent, struct peer_atom * atom )
396{
397    tr_peer * peer;
398
399    assert( torrentIsLocked( torrent ) );
400
401    peer = atom->peer;
402
403    if( peer == NULL )
404    {
405        peer = peerNew( atom );
406        tr_bitfieldConstruct( &peer->have, torrent->tor->info.pieceCount );
407        tr_bitfieldConstruct( &peer->blame, torrent->tor->blockCount );
408        tr_ptrArrayInsertSorted( &torrent->peers, peer, peerCompare );
409    }
410
411    return peer;
412}
413
414static void peerDeclinedAllRequests( Torrent *, const tr_peer * );
415
416void
417tr_peerDestruct( tr_torrent * tor, tr_peer * peer )
418{
419    assert( peer != NULL );
420
421    peerDeclinedAllRequests( tor->torrentPeers, peer );
422
423    if( peer->msgs != NULL )
424        tr_peerMsgsFree( peer->msgs );
425
426    if( peer->io ) {
427        tr_peerIoClear( peer->io );
428        tr_peerIoUnref( peer->io ); /* balanced by the ref in handshakeDoneCB() */
429    }
430
431    tr_bitfieldDestruct( &peer->have );
432    tr_bitfieldDestruct( &peer->blame );
433    tr_free( peer->client );
434
435    if( peer->atom )
436        peer->atom->peer = NULL;
437}
438
439static void
440peerDelete( Torrent * t, tr_peer * peer )
441{
442    tr_peerDestruct( t->tor, peer );
443    tr_free( peer );
444}
445
446static bool
447replicationExists( const Torrent * t )
448{
449    return t->pieceReplication != NULL;
450}
451
452static void
453replicationFree( Torrent * t )
454{
455    tr_free( t->pieceReplication );
456    t->pieceReplication = NULL;
457    t->pieceReplicationSize = 0;
458}
459
460static void
461replicationNew( Torrent * t )
462{
463    tr_piece_index_t piece_i;
464    const tr_piece_index_t piece_count = t->tor->info.pieceCount;
465    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
466    const int peer_count = tr_ptrArraySize( &t->peers );
467
468    assert( !replicationExists( t ) );
469
470    t->pieceReplicationSize = piece_count;
471    t->pieceReplication = tr_new0( uint16_t, piece_count );
472
473    for( piece_i=0; piece_i<piece_count; ++piece_i )
474    {
475        int peer_i;
476        uint16_t r = 0;
477
478        for( peer_i=0; peer_i<peer_count; ++peer_i )
479            if( tr_bitfieldHas( &peers[peer_i]->have, piece_i ) )
480                ++r;
481
482        t->pieceReplication[piece_i] = r;
483    }
484}
485
486static void
487torrentFree( void * vt )
488{
489    Torrent * t = vt;
490
491    assert( t );
492    assert( !t->isRunning );
493    assert( torrentIsLocked( t ) );
494    assert( tr_ptrArrayEmpty( &t->outgoingHandshakes ) );
495    assert( tr_ptrArrayEmpty( &t->peers ) );
496
497    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
498    tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
499    tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
500    tr_ptrArrayDestruct( &t->peers, NULL );
501
502    replicationFree( t );
503
504    tr_free( t->requests );
505    tr_free( t->pieces );
506    tr_free( t );
507}
508
509static void peerCallbackFunc( tr_peer *, const tr_peer_event *, void * );
510
511static Torrent*
512torrentNew( tr_peerMgr * manager, tr_torrent * tor )
513{
514    int       i;
515    Torrent * t;
516
517    t = tr_new0( Torrent, 1 );
518    t->manager = manager;
519    t->tor = tor;
520    t->pool = TR_PTR_ARRAY_INIT;
521    t->peers = TR_PTR_ARRAY_INIT;
522    t->webseeds = TR_PTR_ARRAY_INIT;
523    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
524
525    for( i = 0; i < tor->info.webseedCount; ++i )
526    {
527        tr_webseed * w =
528            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
529        tr_ptrArrayAppend( &t->webseeds, w );
530    }
531
532    return t;
533}
534
535static void ensureMgrTimersExist( struct tr_peerMgr * m );
536
537tr_peerMgr*
538tr_peerMgrNew( tr_session * session )
539{
540    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
541    m->session = session;
542    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
543    ensureMgrTimersExist( m );
544    return m;
545}
546
/** Free the event *t points at (if any) and reset *t to NULL. */
static void
deleteTimer( struct event ** t )
{
    struct event * timer = *t;

    if( timer == NULL )
        return;

    event_free( timer );
    *t = NULL;
}
556
557static void
558deleteTimers( struct tr_peerMgr * m )
559{
560    deleteTimer( &m->atomTimer );
561    deleteTimer( &m->bandwidthTimer );
562    deleteTimer( &m->rechokeTimer );
563    deleteTimer( &m->refillUpkeepTimer );
564}
565
566void
567tr_peerMgrFree( tr_peerMgr * manager )
568{
569    managerLock( manager );
570
571    deleteTimers( manager );
572
573    /* free the handshakes. Abort invokes handshakeDoneCB(), which removes
574     * the item from manager->handshakes, so this is a little roundabout... */
575    while( !tr_ptrArrayEmpty( &manager->incomingHandshakes ) )
576        tr_handshakeAbort( tr_ptrArrayNth( &manager->incomingHandshakes, 0 ) );
577
578    tr_ptrArrayDestruct( &manager->incomingHandshakes, NULL );
579
580    managerUnlock( manager );
581    tr_free( manager );
582}
583
584static int
585clientIsDownloadingFrom( const tr_torrent * tor, const tr_peer * peer )
586{
587    if( !tr_torrentHasMetadata( tor ) )
588        return true;
589
590    return peer->clientIsInterested && !peer->clientIsChoked;
591}
592
593static int
594clientIsUploadingTo( const tr_peer * peer )
595{
596    return peer->peerIsInterested && !peer->peerIsChoked;
597}
598
599/***
600****
601***/
602
603void
604tr_peerMgrOnBlocklistChanged( tr_peerMgr * mgr )
605{
606    tr_torrent * tor = NULL;
607    tr_session * session = mgr->session;
608
609    /* we cache whether or not a peer is blocklisted...
610       since the blocklist has changed, erase that cached value */
611    while(( tor = tr_torrentNext( session, tor )))
612    {
613        int i;
614        Torrent * t = tor->torrentPeers;
615        const int n = tr_ptrArraySize( &t->pool );
616        for( i=0; i<n; ++i ) {
617            struct peer_atom * atom = tr_ptrArrayNth( &t->pool, i );
618            atom->blocklisted = -1;
619        }
620    }
621}
622
623static bool
624isAtomBlocklisted( tr_session * session, struct peer_atom * atom )
625{
626    if( atom->blocklisted < 0 )
627        atom->blocklisted = tr_sessionIsAddressBlocked( session, &atom->addr );
628
629    assert( tr_isBool( atom->blocklisted ) );
630    return atom->blocklisted;
631}
632
633
634/***
635****
636***/
637
638static void
639atomSetSeedProbability( struct peer_atom * atom, int seedProbability )
640{
641    assert( atom != NULL );
642    assert( -1<=seedProbability && seedProbability<=100 );
643
644    atom->seedProbability = seedProbability;
645
646    if( seedProbability == 100 )
647        atom->flags |= ADDED_F_SEED_FLAG;
648    else if( seedProbability != -1 )
649        atom->flags &= ~ADDED_F_SEED_FLAG;
650}
651
652static inline bool
653atomIsSeed( const struct peer_atom * atom )
654{
655    return atom->seedProbability == 100;
656}
657
658static void
659atomSetSeed( const Torrent * t, struct peer_atom * atom )
660{
661    if( !atomIsSeed( atom ) )
662    {
663        tordbg( t, "marking peer %s as a seed", tr_atomAddrStr( atom ) );
664
665        atomSetSeedProbability( atom, 100 );
666    }
667}
668
669
670bool
671tr_peerMgrPeerIsSeed( const tr_torrent  * tor,
672                      const tr_address  * addr )
673{
674    bool isSeed = false;
675    const Torrent * t = tor->torrentPeers;
676    const struct peer_atom * atom = getExistingAtom( t, addr );
677
678    if( atom )
679        isSeed = atomIsSeed( atom );
680
681    return isSeed;
682}
683
684void
685tr_peerMgrSetUtpSupported( tr_torrent * tor, const tr_address * addr )
686{
687    struct peer_atom * atom = getExistingAtom( tor->torrentPeers, addr );
688
689    if( atom )
690        atom->flags |= ADDED_F_UTP_FLAGS;
691}
692
693void
694tr_peerMgrSetUtpFailed( tr_torrent *tor, const tr_address *addr, bool failed )
695{
696    struct peer_atom * atom = getExistingAtom( tor->torrentPeers, addr );
697
698    if( atom )
699        atom->utp_failed = failed;
700}
701
702
/**
***  REQUESTS
***
*** There are two data structures associated with managing block requests:
***
*** 1. Torrent::requests, an array of "struct block_request" which keeps
***    track of which blocks have been requested, and when, and by which peers.
***    This list is used for (a) cancelling requests that have been pending
***    for too long and (b) avoiding duplicate requests before endgame.
***
*** 2. Torrent::pieces, an array of "struct weighted_piece" which lists the
***    pieces that we want to request. It's used to decide which blocks to
***    return next when tr_peerMgrGetBlockRequests() is called.
**/
717
718/**
719*** struct block_request
720**/
721
722static int
723compareReqByBlock( const void * va, const void * vb )
724{
725    const struct block_request * a = va;
726    const struct block_request * b = vb;
727
728    /* primary key: block */
729    if( a->block < b->block ) return -1;
730    if( a->block > b->block ) return 1;
731
732    /* secondary key: peer */
733    if( a->peer < b->peer ) return -1;
734    if( a->peer > b->peer ) return 1;
735
736    return 0;
737}
738
739static void
740requestListAdd( Torrent * t, tr_block_index_t block, tr_peer * peer )
741{
742    struct block_request key;
743
744    /* ensure enough room is available... */
745    if( t->requestCount + 1 >= t->requestAlloc )
746    {
747        const int CHUNK_SIZE = 128;
748        t->requestAlloc += CHUNK_SIZE;
749        t->requests = tr_renew( struct block_request,
750                                t->requests, t->requestAlloc );
751    }
752
753    /* populate the record we're inserting */
754    key.block = block;
755    key.peer = peer;
756    key.sentAt = tr_time( );
757
758    /* insert the request to our array... */
759    {
760        bool exact;
761        const int pos = tr_lowerBound( &key, t->requests, t->requestCount,
762                                       sizeof( struct block_request ),
763                                       compareReqByBlock, &exact );
764        assert( !exact );
765        memmove( t->requests + pos + 1,
766                 t->requests + pos,
767                 sizeof( struct block_request ) * ( t->requestCount++ - pos ) );
768        t->requests[pos] = key;
769    }
770
771    if( peer != NULL )
772    {
773        ++peer->pendingReqsToPeer;
774        assert( peer->pendingReqsToPeer >= 0 );
775    }
776
777    /*fprintf( stderr, "added request of block %lu from peer %s... "
778                       "there are now %d block\n",
779                       (unsigned long)block, tr_atomAddrStr( peer->atom ), t->requestCount );*/
780}
781
782static struct block_request *
783requestListLookup( Torrent * t, tr_block_index_t block, const tr_peer * peer )
784{
785    struct block_request key;
786    key.block = block;
787    key.peer = (tr_peer*) peer;
788
789    return bsearch( &key, t->requests, t->requestCount,
790                    sizeof( struct block_request ),
791                    compareReqByBlock );
792}
793
794/**
795 * Find the peers are we currently requesting the block
796 * with index @a block from and append them to @a peerArr.
797 */
798static void
799getBlockRequestPeers( Torrent * t, tr_block_index_t block,
800                      tr_ptrArray * peerArr )
801{
802    bool exact;
803    int i, pos;
804    struct block_request key;
805
806    key.block = block;
807    key.peer = NULL;
808    pos = tr_lowerBound( &key, t->requests, t->requestCount,
809                         sizeof( struct block_request ),
810                         compareReqByBlock, &exact );
811
812    assert( !exact ); /* shouldn't have a request with .peer == NULL */
813
814    for( i = pos; i < t->requestCount; ++i )
815    {
816        if( t->requests[i].block != block )
817            break;
818        tr_ptrArrayAppend( peerArr, t->requests[i].peer );
819    }
820}
821
822static void
823decrementPendingReqCount( const struct block_request * b )
824{
825    if( b->peer != NULL )
826        if( b->peer->pendingReqsToPeer > 0 )
827            --b->peer->pendingReqsToPeer;
828}
829
830static void
831requestListRemove( Torrent * t, tr_block_index_t block, const tr_peer * peer )
832{
833    const struct block_request * b = requestListLookup( t, block, peer );
834    if( b != NULL )
835    {
836        const int pos = b - t->requests;
837        assert( pos < t->requestCount );
838
839        decrementPendingReqCount( b );
840
841        tr_removeElementFromArray( t->requests,
842                                   pos,
843                                   sizeof( struct block_request ),
844                                   t->requestCount-- );
845
846        /*fprintf( stderr, "removing request of block %lu from peer %s... "
847                           "there are now %d block requests left\n",
848                           (unsigned long)block, tr_atomAddrStr( peer->atom ), t->requestCount );*/
849    }
850}
851
852static int
853countActiveWebseeds( const Torrent * t )
854{
855    int activeCount = 0;
856    const tr_webseed ** w = (const tr_webseed **) tr_ptrArrayBase( &t->webseeds );
857    const tr_webseed ** const wend = w + tr_ptrArraySize( &t->webseeds );
858
859    for( ; w!=wend; ++w )
860        if( tr_webseedIsActive( *w ) )
861            ++activeCount;
862
863    return activeCount;
864}
865
866static bool
867testForEndgame( const Torrent * t )
868{
869    /* we consider ourselves to be in endgame if the number of bytes
870       we've got requested is >= the number of bytes left to download */
871    return ( t->requestCount * t->tor->blockSize )
872               >= tr_cpLeftUntilDone( &t->tor->completion );
873}
874
875static void
876updateEndgame( Torrent * t )
877{
878    assert( t->requestCount >= 0 );
879
880    if( !testForEndgame( t ) )
881    {
882        /* not in endgame */
883        t->endgame = 0;
884    }
885    else if( !t->endgame ) /* only recalculate when endgame first begins */
886    {
887        int numDownloading = 0;
888        const tr_peer ** p = (const tr_peer **) tr_ptrArrayBase( &t->peers );
889        const tr_peer ** const pend = p + tr_ptrArraySize( &t->peers );
890
891        /* add the active bittorrent peers... */
892        for( ; p!=pend; ++p )
893            if( (*p)->pendingReqsToPeer > 0 )
894                ++numDownloading;
895
896        /* add the active webseeds... */
897        numDownloading += countActiveWebseeds( t );
898
899        /* average number of pending requests per downloading peer */
900        t->endgame = t->requestCount / MAX( numDownloading, 1 );
901    }
902}
903
904
905/****
906*****
907*****  Piece List Manipulation / Accessors
908*****
909****/
910
911static inline void
912invalidatePieceSorting( Torrent * t )
913{
914    t->pieceSortState = PIECES_UNSORTED;
915}
916
/* Context for comparePieceByWeight(): qsort() comparators take no user
 * argument, so setComparePieceByWeightTorrent() stores the torrent here
 * before sorting. */
static const tr_torrent * weightTorrent;

/* Likewise: the replication table read by comparePieceByWeight(). */
static const uint16_t * weightReplication;
920
921static void
922setComparePieceByWeightTorrent( Torrent * t )
923{
924    if( !replicationExists( t ) )
925        replicationNew( t );
926
927    weightTorrent = t->tor;
928    weightReplication = t->pieceReplication;
929}
930
931/* we try to create a "weight" s.t. high-priority pieces come before others,
932 * and that partially-complete pieces come before empty ones. */
933static int
934comparePieceByWeight( const void * va, const void * vb )
935{
936    const struct weighted_piece * a = va;
937    const struct weighted_piece * b = vb;
938    int ia, ib, missing, pending;
939    const tr_torrent * tor = weightTorrent;
940    const uint16_t * rep = weightReplication;
941
942    /* primary key: weight */
943    missing = tr_cpMissingBlocksInPiece( &tor->completion, a->index );
944    pending = a->requestCount;
945    ia = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
946    missing = tr_cpMissingBlocksInPiece( &tor->completion, b->index );
947    pending = b->requestCount;
948    ib = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
949    if( ia < ib ) return -1;
950    if( ia > ib ) return 1;
951
952    /* secondary key: higher priorities go first */
953    ia = tor->info.pieces[a->index].priority;
954    ib = tor->info.pieces[b->index].priority;
955    if( ia > ib ) return -1;
956    if( ia < ib ) return 1;
957
958    /* tertiary key: rarest first. */
959    ia = rep[a->index];
960    ib = rep[b->index];
961    if( ia < ib ) return -1;
962    if( ia > ib ) return 1;
963
964    /* quaternary key: random */
965    if( a->salt < b->salt ) return -1;
966    if( a->salt > b->salt ) return 1;
967
968    /* okay, they're equal */
969    return 0;
970}
971
972static int
973comparePieceByIndex( const void * va, const void * vb )
974{
975    const struct weighted_piece * a = va;
976    const struct weighted_piece * b = vb;
977    if( a->index < b->index ) return -1;
978    if( a->index > b->index ) return 1;
979    return 0;
980}
981
982static void
983pieceListSort( Torrent * t, enum piece_sort_state state )
984{
985    assert( state==PIECES_SORTED_BY_INDEX
986         || state==PIECES_SORTED_BY_WEIGHT );
987
988
989    if( state == PIECES_SORTED_BY_WEIGHT )
990    {
991        setComparePieceByWeightTorrent( t );
992        qsort( t->pieces, t->pieceCount, sizeof( struct weighted_piece ), comparePieceByWeight );
993    }
994    else
995        qsort( t->pieces, t->pieceCount, sizeof( struct weighted_piece ), comparePieceByIndex );
996
997    t->pieceSortState = state;
998}
999
/**
 * These functions are useful for testing, but too expensive for nightly builds.
 * Let's leave them disabled but keep an easy hook to compile them back in:
 * change the `#if 1` below to `#if 0`.
 *
 * While disabled, both names expand to nothing.
 */
#if 1
#define assertWeightedPiecesAreSorted(t)
#define assertReplicationCountIsExact(t)
#else
/* Assert that the piece list is in comparePieceByWeight() order.
 * Skipped once endgame has begun. */
static void
assertWeightedPiecesAreSorted( Torrent * t )
{
    if( !t->endgame )
    {
        int i;
        setComparePieceByWeightTorrent( t );
        for( i=0; i<t->pieceCount-1; ++i )
            assert( comparePieceByWeight( &t->pieces[i], &t->pieces[i+1] ) <= 0 );
    }
}

/* Assert that every replication counter equals the number of connected
 * peers whose `have` bitfield contains that piece. */
static void
assertReplicationCountIsExact( Torrent * t )
{
    /* This assert might fail due to implementation errors in other
     * clients. It happens when receiving duplicate bitfields/HaveAll/HaveNone
     * from a client. If such behavior is noticed,
     * a bug report should be filed against the faulty client. */

    size_t piece_i;
    const uint16_t * rep = t->pieceReplication;
    const size_t piece_count = t->pieceReplicationSize;
    const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
    const int peer_count = tr_ptrArraySize( &t->peers );

    assert( piece_count == t->tor->info.pieceCount );

    for( piece_i=0; piece_i<piece_count; ++piece_i )
    {
        int peer_i;
        uint16_t r = 0;

        /* same bitfield query that replicationNew() uses to build the table */
        for( peer_i=0; peer_i<peer_count; ++peer_i )
            if( tr_bitfieldHas( &peers[peer_i]->have, piece_i ) )
                ++r;

        assert( rep[piece_i] == r );
    }
}
#endif
1048
1049static struct weighted_piece *
1050pieceListLookup( Torrent * t, tr_piece_index_t index )
1051{
1052    int i;
1053
1054    for( i=0; i<t->pieceCount; ++i )
1055        if( t->pieces[i].index == index )
1056            return &t->pieces[i];
1057
1058    return NULL;
1059}
1060
1061static void
1062pieceListRebuild( Torrent * t )
1063{
1064
1065    if( !tr_torrentIsSeed( t->tor ) )
1066    {
1067        tr_piece_index_t i;
1068        tr_piece_index_t * pool;
1069        tr_piece_index_t poolCount = 0;
1070        const tr_torrent * tor = t->tor;
1071        const tr_info * inf = tr_torrentInfo( tor );
1072        struct weighted_piece * pieces;
1073        int pieceCount;
1074
1075        /* build the new list */
1076        pool = tr_new( tr_piece_index_t, inf->pieceCount );
1077        for( i=0; i<inf->pieceCount; ++i )
1078            if( !inf->pieces[i].dnd )
1079                if( !tr_cpPieceIsComplete( &tor->completion, i ) )
1080                    pool[poolCount++] = i;
1081        pieceCount = poolCount;
1082        pieces = tr_new0( struct weighted_piece, pieceCount );
1083        for( i=0; i<poolCount; ++i ) {
1084            struct weighted_piece * piece = pieces + i;
1085            piece->index = pool[i];
1086            piece->requestCount = 0;
1087            piece->salt = tr_cryptoWeakRandInt( 4096 );
1088        }
1089
1090        /* if we already had a list of pieces, merge it into
1091         * the new list so we don't lose its requestCounts */
1092        if( t->pieces != NULL )
1093        {
1094            struct weighted_piece * o = t->pieces;
1095            struct weighted_piece * oend = o + t->pieceCount;
1096            struct weighted_piece * n = pieces;
1097            struct weighted_piece * nend = n + pieceCount;
1098
1099            pieceListSort( t, PIECES_SORTED_BY_INDEX );
1100
1101            while( o!=oend && n!=nend ) {
1102                if( o->index < n->index )
1103                    ++o;
1104                else if( o->index > n->index )
1105                    ++n;
1106                else
1107                    *n++ = *o++;
1108            }
1109
1110            tr_free( t->pieces );
1111        }
1112
1113        t->pieces = pieces;
1114        t->pieceCount = pieceCount;
1115
1116        pieceListSort( t, PIECES_SORTED_BY_WEIGHT );
1117
1118        /* cleanup */
1119        tr_free( pool );
1120    }
1121}
1122
1123static void
1124pieceListRemovePiece( Torrent * t, tr_piece_index_t piece )
1125{
1126    struct weighted_piece * p;
1127
1128    if(( p = pieceListLookup( t, piece )))
1129    {
1130        const int pos = p - t->pieces;
1131
1132        tr_removeElementFromArray( t->pieces,
1133                                   pos,
1134                                   sizeof( struct weighted_piece ),
1135                                   t->pieceCount-- );
1136
1137        if( t->pieceCount == 0 )
1138        {
1139            tr_free( t->pieces );
1140            t->pieces = NULL;
1141        }
1142    }
1143}
1144
1145static void
1146pieceListResortPiece( Torrent * t, struct weighted_piece * p )
1147{
1148    int pos;
1149    bool isSorted = true;
1150
1151    if( p == NULL )
1152        return;
1153
1154    /* is the torrent already sorted? */
1155    pos = p - t->pieces;
1156    setComparePieceByWeightTorrent( t );
1157    if( isSorted && ( pos > 0 ) && ( comparePieceByWeight( p-1, p ) > 0 ) )
1158        isSorted = false;
1159    if( isSorted && ( pos < t->pieceCount - 1 ) && ( comparePieceByWeight( p, p+1 ) > 0 ) )
1160        isSorted = false;
1161
1162    if( t->pieceSortState != PIECES_SORTED_BY_WEIGHT )
1163    {
1164       pieceListSort( t, PIECES_SORTED_BY_WEIGHT);
1165       isSorted = true;
1166    }
1167
1168    /* if it's not sorted, move it around */
1169    if( !isSorted )
1170    {
1171        bool exact;
1172        const struct weighted_piece tmp = *p;
1173
1174        tr_removeElementFromArray( t->pieces,
1175                                   pos,
1176                                   sizeof( struct weighted_piece ),
1177                                   t->pieceCount-- );
1178
1179        pos = tr_lowerBound( &tmp, t->pieces, t->pieceCount,
1180                             sizeof( struct weighted_piece ),
1181                             comparePieceByWeight, &exact );
1182
1183        memmove( &t->pieces[pos + 1],
1184                 &t->pieces[pos],
1185                 sizeof( struct weighted_piece ) * ( t->pieceCount++ - pos ) );
1186
1187        t->pieces[pos] = tmp;
1188    }
1189
1190    assertWeightedPiecesAreSorted( t );
1191}
1192
1193static void
1194pieceListRemoveRequest( Torrent * t, tr_block_index_t block )
1195{
1196    struct weighted_piece * p;
1197    const tr_piece_index_t index = tr_torBlockPiece( t->tor, block );
1198
1199    if( ((p = pieceListLookup( t, index ))) && ( p->requestCount > 0 ) )
1200    {
1201        --p->requestCount;
1202        pieceListResortPiece( t, p );
1203    }
1204}
1205
1206
1207/****
1208*****
1209*****  Replication count ( for rarest first policy )
1210*****
1211****/
1212
1213/**
1214 * Increase the replication count of this piece and sort it if the
1215 * piece list is already sorted
1216 */
1217static void
1218tr_incrReplicationOfPiece( Torrent * t, const size_t index )
1219{
1220    assert( replicationExists( t ) );
1221    assert( t->pieceReplicationSize == t->tor->info.pieceCount );
1222
1223    /* One more replication of this piece is present in the swarm */
1224    ++t->pieceReplication[index];
1225
1226    /* we only resort the piece if the list is already sorted */
1227    if( t->pieceSortState == PIECES_SORTED_BY_WEIGHT )
1228        pieceListResortPiece( t, pieceListLookup( t, index ) );
1229}
1230
1231/**
1232 * Increases the replication count of pieces present in the bitfield
1233 */
1234static void
1235tr_incrReplicationFromBitfield( Torrent * t, const tr_bitfield * b )
1236{
1237    size_t i;
1238    uint16_t * rep = t->pieceReplication;
1239    const size_t n = t->tor->info.pieceCount;
1240
1241    assert( replicationExists( t ) );
1242
1243    for( i=0; i<n; ++i )
1244        if( tr_bitfieldHas( b, i ) )
1245            ++rep[i];
1246
1247    if( t->pieceSortState == PIECES_SORTED_BY_WEIGHT )
1248        invalidatePieceSorting( t );
1249}
1250
1251/**
1252 * Increase the replication count of every piece
1253 */
1254static void
1255tr_incrReplication( Torrent * t )
1256{
1257    int i;
1258    const int n = t->pieceReplicationSize;
1259
1260    assert( replicationExists( t ) );
1261    assert( t->pieceReplicationSize == t->tor->info.pieceCount );
1262
1263    for( i=0; i<n; ++i )
1264        ++t->pieceReplication[i];
1265}
1266
1267/**
1268 * Decrease the replication count of pieces present in the bitset.
1269 */
1270static void
1271tr_decrReplicationFromBitfield( Torrent * t, const tr_bitfield * b )
1272{
1273    int i;
1274    const int n = t->pieceReplicationSize;
1275
1276    assert( replicationExists( t ) );
1277    assert( t->pieceReplicationSize == t->tor->info.pieceCount );
1278
1279    if( tr_bitfieldHasAll( b ) )
1280    {
1281        for( i=0; i<n; ++i )
1282            --t->pieceReplication[i];
1283    }
1284    else if ( !tr_bitfieldHasNone( b ) )
1285    {
1286        for( i=0; i<n; ++i )
1287            if( tr_bitfieldHas( b, i ) )
1288                --t->pieceReplication[i];
1289
1290        if( t->pieceSortState == PIECES_SORTED_BY_WEIGHT )
1291            invalidatePieceSorting( t );
1292    }
1293}
1294
1295/**
1296***
1297**/
1298
1299void
1300tr_peerMgrRebuildRequests( tr_torrent * tor )
1301{
1302    assert( tr_isTorrent( tor ) );
1303
1304    pieceListRebuild( tor->torrentPeers );
1305}
1306
/**
 * Picks the next blocks to request from `peer` and records them in the
 * request tables.
 *
 * The weight-sorted piece list is walked from the front; for each piece
 * the peer has, its incomplete blocks are considered in order.
 *
 * @param tor           the torrent being downloaded
 * @param peer          the peer to ask; asserted to be one we're interested
 *                      in and that isn't choking us
 * @param numwant       how many entries the caller wants; asserted > 0
 * @param setme         output array. When get_intervals is false, each entry
 *                      is a single block index. When it is true, entries come
 *                      in pairs: setme[2*k] is the first and setme[2*k+1] the
 *                      last block of interval k.
 *                      NOTE(review): the caller presumably must provide room
 *                      for numwant entries (2*numwant with get_intervals) --
 *                      confirm against callers.
 * @param numgot        receives the number of entries written (blocks, or
 *                      intervals when get_intervals is true)
 * @param get_intervals whether to report contiguous runs of blocks as
 *                      [first,last] intervals instead of single blocks
 */
void
tr_peerMgrGetNextRequests( tr_torrent           * tor,
                           tr_peer              * peer,
                           int                    numwant,
                           tr_block_index_t     * setme,
                           int                  * numgot,
                           bool                   get_intervals )
{
    int i;
    int got;
    Torrent * t;
    struct weighted_piece * pieces;
    const tr_bitfield * const have = &peer->have;

    /* sanity clause */
    assert( tr_isTorrent( tor ) );
    assert( peer->clientIsInterested );
    assert( !peer->clientIsChoked );
    assert( numwant > 0 );

    /* walk through the pieces and find blocks that should be requested */
    got = 0;
    t = tor->torrentPeers;

    /* prep the pieces list */
    if( t->pieces == NULL )
        pieceListRebuild( t );

    if( t->pieceSortState != PIECES_SORTED_BY_WEIGHT )
        pieceListSort( t, PIECES_SORTED_BY_WEIGHT );

    assertReplicationCountIsExact( t );
    assertWeightedPiecesAreSorted( t );

    updateEndgame( t );
    pieces = t->pieces;
    for( i=0; i<t->pieceCount && got<numwant; ++i )
    {
        struct weighted_piece * p = pieces + i;

        /* if the peer has this piece that we want... */
        if( tr_bitfieldHas( have, p->index ) )
        {
            tr_block_index_t b;
            tr_block_index_t first;
            tr_block_index_t last;
            tr_ptrArray peerArr = TR_PTR_ARRAY_INIT;

            tr_torGetPieceBlockRange( tor, p->index, &first, &last );

            /* keep going while more entries are wanted; in interval mode,
               also keep going past numwant as long as block b would extend
               the interval that was written last */
            for( b=first; b<=last && (got<numwant || (get_intervals && setme[2*got-1] == b-1)); ++b )
            {
                int peerCount;
                tr_peer ** peers;

                /* don't request blocks we've already got */
                if( tr_cpBlockIsComplete( &tor->completion, b ) )
                    continue;

                /* always add peer if this block has no peers yet */
                tr_ptrArrayClear( &peerArr );
                getBlockRequestPeers( t, b, &peerArr );
                peers = (tr_peer **) tr_ptrArrayPeek( &peerArr, &peerCount );
                if( peerCount != 0 )
                {
                    /* don't make a second block request until the endgame */
                    if( !t->endgame )
                        continue;

                    /* don't have more than two peers requesting this block */
                    if( peerCount > 1 )
                        continue;

                    /* don't send the same request to the same peer twice */
                    if( peer == peers[0] )
                        continue;

                    /* in the endgame allow an additional peer to download a
                       block but only if the peer seems to be handling requests
                       relatively fast */
                    if( peer->pendingReqsToPeer + numwant - got < t->endgame )
                        continue;
                }

                /* update the caller's table */
                if( !get_intervals ) {
                    setme[got++] = b;
                }
                /* if intervals are requested two array entries are necessary:
                   one for the interval's starting block and one for its end block.
                   The "b != first" test keeps an interval from spanning two pieces. */
                else if( got && setme[2 * got - 1] == b - 1 && b != first ) {
                    /* expand the last interval */
                    ++setme[2 * got - 1];
                }
                else {
                    /* begin a new interval */
                    setme[2 * got] = setme[2 * got + 1] = b;
                    ++got;
                }

                /* update our own tables */
                requestListAdd( t, b, peer );
                ++p->requestCount;
            }

            tr_ptrArrayDestruct( &peerArr, NULL );
        }
    }

    /* In most cases we've just changed the weights of a small number of pieces.
     * So rather than qsort()ing the entire array, it's faster to apply an
     * adaptive insertion sort algorithm. */
    if( got > 0 )
    {
        /* not enough requests || last piece modified */
        if ( i == t->pieceCount ) --i;

        /* walk backwards over the pieces visited above, re-inserting each
           one into the part of the array that follows it */
        setComparePieceByWeightTorrent( t );
        while( --i >= 0 )
        {
            bool exact;

            /* relative position! */
            const int newpos = tr_lowerBound( &t->pieces[i], &t->pieces[i + 1],
                                              t->pieceCount - (i + 1),
                                              sizeof( struct weighted_piece ),
                                              comparePieceByWeight, &exact );
            if( newpos > 0 )
            {
                const struct weighted_piece piece = t->pieces[i];
                memmove( &t->pieces[i],
                         &t->pieces[i + 1],
                         sizeof( struct weighted_piece ) * ( newpos ) );
                t->pieces[i + newpos] = piece;
            }
        }
    }

    assertWeightedPiecesAreSorted( t );
    *numgot = got;
}
1448
1449bool
1450tr_peerMgrDidPeerRequest( const tr_torrent  * tor,
1451                          const tr_peer     * peer,
1452                          tr_block_index_t    block )
1453{
1454    const Torrent * t = tor->torrentPeers;
1455    return requestListLookup( (Torrent*)t, block, peer ) != NULL;
1456}
1457
1458/* cancel requests that are too old */
1459static void
1460refillUpkeep( int foo UNUSED, short bar UNUSED, void * vmgr )
1461{
1462    time_t now;
1463    time_t too_old;
1464    tr_torrent * tor;
1465    int cancel_buflen = 0;
1466    struct block_request * cancel = NULL;
1467    tr_peerMgr * mgr = vmgr;
1468    managerLock( mgr );
1469
1470    now = tr_time( );
1471    too_old = now - REQUEST_TTL_SECS;
1472
1473    /* alloc the temporary "cancel" buffer */
1474    tor = NULL;
1475    while(( tor = tr_torrentNext( mgr->session, tor )))
1476        cancel_buflen = MAX( cancel_buflen, tor->torrentPeers->requestCount );
1477    if( cancel_buflen > 0 )
1478        cancel = tr_new( struct block_request, cancel_buflen );
1479
1480    /* prune requests that are too old */
1481    tor = NULL;
1482    while(( tor = tr_torrentNext( mgr->session, tor )))
1483    {
1484        Torrent * t = tor->torrentPeers;
1485        const int n = t->requestCount;
1486        if( n > 0 )
1487        {
1488            int keepCount = 0;
1489            int cancelCount = 0;
1490            const struct block_request * it;
1491            const struct block_request * end;
1492
1493            for( it=t->requests, end=it+n; it!=end; ++it )
1494            {
1495                if( ( it->sentAt <= too_old ) && it->peer->msgs && !tr_peerMsgsIsReadingBlock( it->peer->msgs, it->block ) )
1496                    cancel[cancelCount++] = *it;
1497                else
1498                {
1499                    if( it != &t->requests[keepCount] )
1500                        t->requests[keepCount] = *it;
1501                    keepCount++;
1502                }
1503            }
1504
1505            /* prune out the ones we aren't keeping */
1506            t->requestCount = keepCount;
1507
1508            /* send cancel messages for all the "cancel" ones */
1509            for( it=cancel, end=it+cancelCount; it!=end; ++it ) {
1510                if( ( it->peer != NULL ) && ( it->peer->msgs != NULL ) ) {
1511                    tr_historyAdd( &it->peer->cancelsSentToPeer, now, 1 );
1512                    tr_peerMsgsCancel( it->peer->msgs, it->block );
1513                    decrementPendingReqCount( it );
1514                }
1515            }
1516
1517            /* decrement the pending request counts for the timed-out blocks */
1518            for( it=cancel, end=it+cancelCount; it!=end; ++it )
1519                pieceListRemoveRequest( t, it->block );
1520        }
1521    }
1522
1523    tr_free( cancel );
1524    tr_timerAddMsec( mgr->refillUpkeepTimer, REFILL_UPKEEP_PERIOD_MSEC );
1525    managerUnlock( mgr );
1526}
1527
1528static void
1529addStrike( Torrent * t, tr_peer * peer )
1530{
1531    tordbg( t, "increasing peer %s strike count to %d",
1532            tr_atomAddrStr( peer->atom ), peer->strikes + 1 );
1533
1534    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
1535    {
1536        struct peer_atom * atom = peer->atom;
1537        atom->flags2 |= MYFLAG_BANNED;
1538        peer->doPurge = 1;
1539        tordbg( t, "banning peer %s", tr_atomAddrStr( atom ) );
1540    }
1541}
1542
1543static void
1544gotBadPiece( Torrent * t, tr_piece_index_t pieceIndex )
1545{
1546    tr_torrent *   tor = t->tor;
1547    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
1548
1549    tor->corruptCur += byteCount;
1550    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1551
1552    tr_announcerAddBytes( tor, TR_ANN_CORRUPT, byteCount );
1553}
1554
/**
 * Hook for a peer's "suggest piece" / "allowed fast" hints; called from
 * peerCallbackFunc() with isFastAllowed false / true respectively.
 *
 * Currently a no-op: the whole implementation is compiled out with
 * "#if 0", which is why every parameter is marked UNUSED.
 */
static void
peerSuggestedPiece( Torrent            * t UNUSED,
                    tr_peer            * peer UNUSED,
                    tr_piece_index_t     pieceIndex UNUSED,
                    int                  isFastAllowed UNUSED )
{
/* NOTE(review): the disabled code below looks stale. It passes
 * t->tor->completion and peer->have directly, whereas the live code in
 * this file passes their addresses, so it presumably needs updating
 * before it can be re-enabled. */
#if 0
    assert( t );
    assert( peer );
    assert( peer->msgs );

    /* is this a valid piece? */
    if(  pieceIndex >= t->tor->info.pieceCount )
        return;

    /* don't ask for it if we've already got it */
    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
        return;

    /* don't ask for it if they don't have it */
    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
        return;

    /* don't ask for it if we're choked and it's not fast */
    if( !isFastAllowed && peer->clientIsChoked )
        return;

    /* request the blocks that we don't have in this piece */
    {
        tr_block_index_t b;
        tr_block_index_t first;
        tr_block_index_t last;
        const tr_torrent * tor = t->tor;

        tr_torGetPieceBlockRange( t->tor, pieceIndex, &first, &last );

        for( b=first; b<=last; ++b )
        {
            if( !tr_cpBlockIsComplete( tor->completion, b ) )
            {
                const uint32_t offset = getBlockOffsetInPiece( tor, b );
                const uint32_t length = tr_torBlockCountBytes( tor, b );
                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length );
                incrementPieceRequests( t, pieceIndex );
            }
        }
    }
#endif
}
1604
1605static void
1606removeRequestFromTables( Torrent * t, tr_block_index_t block, const tr_peer * peer )
1607{
1608    requestListRemove( t, block, peer );
1609    pieceListRemoveRequest( t, block );
1610}
1611
1612/* peer choked us, or maybe it disconnected.
1613   either way we need to remove all its requests */
1614static void
1615peerDeclinedAllRequests( Torrent * t, const tr_peer * peer )
1616{
1617    int i, n;
1618    tr_block_index_t * blocks = tr_new( tr_block_index_t, t->requestCount );
1619
1620    for( i=n=0; i<t->requestCount; ++i )
1621        if( peer == t->requests[i].peer )
1622            blocks[n++] = t->requests[i].block;
1623
1624    for( i=0; i<n; ++i )
1625        removeRequestFromTables( t, blocks[i], peer );
1626
1627    tr_free( blocks );
1628}
1629
1630static void tr_peerMgrSetBlame( tr_torrent *, tr_piece_index_t, int );
1631
1632static void
1633peerCallbackFunc( tr_peer * peer, const tr_peer_event * e, void * vt )
1634{
1635    Torrent * t = vt;
1636
1637    torrentLock( t );
1638
1639    assert( peer != NULL );
1640
1641    switch( e->eventType )
1642    {
1643        case TR_PEER_PEER_GOT_DATA:
1644        {
1645            const time_t now = tr_time( );
1646            tr_torrent * tor = t->tor;
1647
1648            if( e->wasPieceData )
1649            {
1650                tor->uploadedCur += e->length;
1651                tr_announcerAddBytes( tor, TR_ANN_UP, e->length );
1652                tr_torrentSetActivityDate( tor, now );
1653                tr_torrentSetDirty( tor );
1654            }
1655
1656            /* update the stats */
1657            if( e->wasPieceData )
1658                tr_statsAddUploaded( tor->session, e->length );
1659
1660            /* update our atom */
1661            if( peer->atom && e->wasPieceData )
1662                peer->atom->piece_data_time = now;
1663
1664            break;
1665        }
1666
1667        case TR_PEER_CLIENT_GOT_HAVE:
1668            if( replicationExists( t ) ) {
1669                tr_incrReplicationOfPiece( t, e->pieceIndex );
1670                assertReplicationCountIsExact( t );
1671            }
1672            break;
1673
1674        case TR_PEER_CLIENT_GOT_HAVE_ALL:
1675            if( replicationExists( t ) ) {
1676                tr_incrReplication( t );
1677                assertReplicationCountIsExact( t );
1678            }
1679            break;
1680
1681        case TR_PEER_CLIENT_GOT_HAVE_NONE:
1682            /* noop */
1683            break;
1684
1685        case TR_PEER_CLIENT_GOT_BITFIELD:
1686            assert( e->bitfield != NULL );
1687            if( replicationExists( t ) ) {
1688                tr_incrReplicationFromBitfield( t, e->bitfield );
1689                assertReplicationCountIsExact( t );
1690            }
1691            break;
1692
1693        case TR_PEER_CLIENT_GOT_REJ: {
1694            tr_block_index_t b = _tr_block( t->tor, e->pieceIndex, e->offset );
1695            if( b < t->tor->blockCount )
1696                removeRequestFromTables( t, b, peer );
1697            else
1698                tordbg( t, "Peer %s sent an out-of-range reject message",
1699                           tr_atomAddrStr( peer->atom ) );
1700            break;
1701        }
1702
1703        case TR_PEER_CLIENT_GOT_CHOKE:
1704            peerDeclinedAllRequests( t, peer );
1705            break;
1706
1707        case TR_PEER_CLIENT_GOT_PORT:
1708            if( peer->atom )
1709                peer->atom->port = e->port;
1710            break;
1711
1712        case TR_PEER_CLIENT_GOT_SUGGEST:
1713            peerSuggestedPiece( t, peer, e->pieceIndex, false );
1714            break;
1715
1716        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1717            peerSuggestedPiece( t, peer, e->pieceIndex, true );
1718            break;
1719
1720        case TR_PEER_CLIENT_GOT_DATA:
1721        {
1722            const time_t now = tr_time( );
1723            tr_torrent * tor = t->tor;
1724
1725            if( e->wasPieceData )
1726            {
1727                tor->downloadedCur += e->length;
1728                tr_torrentSetActivityDate( tor, now );
1729                tr_torrentSetDirty( tor );
1730            }
1731
1732            /* update the stats */
1733            if( e->wasPieceData )
1734                tr_statsAddDownloaded( tor->session, e->length );
1735
1736            /* update our atom */
1737            if( peer->atom && e->wasPieceData )
1738                peer->atom->piece_data_time = now;
1739
1740            break;
1741        }
1742
1743        case TR_PEER_CLIENT_GOT_BLOCK:
1744        {
1745            tr_torrent * tor = t->tor;
1746            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1747            int i, peerCount;
1748            tr_peer ** peers;
1749            tr_ptrArray peerArr = TR_PTR_ARRAY_INIT;
1750
1751            removeRequestFromTables( t, block, peer );
1752            getBlockRequestPeers( t, block, &peerArr );
1753            peers = (tr_peer **) tr_ptrArrayPeek( &peerArr, &peerCount );
1754
1755            /* remove additional block requests and send cancel to peers */
1756            for( i=0; i<peerCount; i++ ) {
1757                tr_peer * p = peers[i];
1758                assert( p != peer );
1759                if( p->msgs ) {
1760                    tr_historyAdd( &p->cancelsSentToPeer, tr_time( ), 1 );
1761                    tr_peerMsgsCancel( p->msgs, block );
1762                }
1763                removeRequestFromTables( t, block, p );
1764            }
1765
1766            tr_ptrArrayDestruct( &peerArr, false );
1767
1768            tr_historyAdd( &peer->blocksSentToClient, tr_time( ), 1 );
1769
1770            if( tr_cpBlockIsComplete( &tor->completion, block ) )
1771            {
1772                /* we already have this block... */
1773                const uint32_t n = tr_torBlockCountBytes( tor, block );
1774                tor->downloadedCur -= MIN( tor->downloadedCur, n );
1775                tordbg( t, "we have this block already..." );
1776            }
1777            else
1778            {
1779                tr_cpBlockAdd( &tor->completion, block );
1780                pieceListResortPiece( t, pieceListLookup( t, e->pieceIndex ) );
1781                tr_torrentSetDirty( tor );
1782
1783                if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
1784                {
1785                    const tr_piece_index_t p = e->pieceIndex;
1786                    const bool ok = tr_torrentCheckPiece( tor, p );
1787
1788                    tordbg( t, "[LAZY] checked just-completed piece %zu", (size_t)p );
1789
1790                    if( !ok )
1791                    {
1792                        tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1793                                   (unsigned long)p );
1794                    }
1795
1796                    tr_peerMgrSetBlame( tor, p, ok );
1797
1798                    if( !ok )
1799                    {
1800                        gotBadPiece( t, p );
1801                    }
1802                    else
1803                    {
1804                        int i;
1805                        int peerCount;
1806                        tr_peer ** peers;
1807                        tr_file_index_t fileIndex;
1808
1809                        /* only add this to downloadedCur if we got it from a peer --
1810                         * webseeds shouldn't count against our ratio. As one tracker
1811                         * admin put it, "Those pieces are downloaded directly from the
1812                         * content distributor, not the peers, it is the tracker's job
1813                         * to manage the swarms, not the web server and does not fit
1814                         * into the jurisdiction of the tracker." */
1815                        if( peer->msgs != NULL ) {
1816                            const uint32_t n = tr_torPieceCountBytes( tor, p );
1817                            tr_announcerAddBytes( tor, TR_ANN_DOWN, n );
1818                        }
1819
1820                        peerCount = tr_ptrArraySize( &t->peers );
1821                        peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1822                        for( i=0; i<peerCount; ++i )
1823                            tr_peerMsgsHave( peers[i]->msgs, p );
1824
1825                        for( fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex ) {
1826                            const tr_file * file = &tor->info.files[fileIndex];
1827                            if( ( file->firstPiece <= p ) && ( p <= file->lastPiece ) ) {
1828                                if( tr_cpFileIsComplete( &tor->completion, fileIndex ) ) {
1829                                    tr_cacheFlushFile( tor->session->cache, tor, fileIndex );
1830                                    tr_torrentFileCompleted( tor, fileIndex );
1831                                }
1832                            }
1833                        }
1834
1835                        pieceListRemovePiece( t, p );
1836                    }
1837                }
1838
1839                t->needsCompletenessCheck = true;
1840            }
1841            break;
1842        }
1843
1844        case TR_PEER_ERROR:
1845            if( ( e->err == ERANGE ) || ( e->err == EMSGSIZE ) || ( e->err == ENOTCONN ) )
1846            {
1847                /* some protocol error from the peer */
1848                peer->doPurge = 1;
1849                tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error",
1850                        tr_atomAddrStr( peer->atom ) );
1851            }
1852            else
1853            {
1854                tordbg( t, "unhandled error: %s", tr_strerror( e->err ) );
1855            }
1856            break;
1857
1858        default:
1859            assert( 0 );
1860    }
1861
1862    torrentUnlock( t );
1863}
1864
1865static int
1866getDefaultShelfLife( uint8_t from )
1867{
1868    /* in general, peers obtained from firsthand contact
1869     * are better than those from secondhand, etc etc */
1870    switch( from )
1871    {
1872        case TR_PEER_FROM_INCOMING : return 60 * 60 * 6;
1873        case TR_PEER_FROM_LTEP     : return 60 * 60 * 6;
1874        case TR_PEER_FROM_TRACKER  : return 60 * 60 * 3;
1875        case TR_PEER_FROM_DHT      : return 60 * 60 * 3;
1876        case TR_PEER_FROM_PEX      : return 60 * 60 * 2;
1877        case TR_PEER_FROM_RESUME   : return 60 * 60;
1878        case TR_PEER_FROM_LPD      : return 10 * 60;
1879        default                    : return 60 * 60;
1880    }
1881}
1882
1883static void
1884ensureAtomExists( Torrent           * t,
1885                  const tr_address  * addr,
1886                  const tr_port       port,
1887                  const uint8_t       flags,
1888                  const int8_t        seedProbability,
1889                  const uint8_t       from )
1890{
1891    struct peer_atom * a;
1892
1893    assert( tr_address_is_valid( addr ) );
1894    assert( from < TR_PEER_FROM__MAX );
1895
1896    a = getExistingAtom( t, addr );
1897
1898    if( a == NULL )
1899    {
1900        const int jitter = tr_cryptoWeakRandInt( 60*10 );
1901        a = tr_new0( struct peer_atom, 1 );
1902        a->addr = *addr;
1903        a->port = port;
1904        a->flags = flags;
1905        a->fromFirst = from;
1906        a->fromBest = from;
1907        a->shelf_date = tr_time( ) + getDefaultShelfLife( from ) + jitter;
1908        a->blocklisted = -1;
1909        atomSetSeedProbability( a, seedProbability );
1910        tr_ptrArrayInsertSorted( &t->pool, a, compareAtomsByAddress );
1911
1912        tordbg( t, "got a new atom: %s", tr_atomAddrStr( a ) );
1913    }
1914    else
1915    {
1916        if( from < a->fromBest )
1917            a->fromBest = from;
1918
1919        if( a->seedProbability == -1 )
1920            atomSetSeedProbability( a, seedProbability );
1921
1922        a->flags |= flags;
1923    }
1924}
1925
1926static int
1927getMaxPeerCount( const tr_torrent * tor )
1928{
1929    return tor->maxConnectedPeers;
1930}
1931
1932static int
1933getPeerCount( const Torrent * t )
1934{
1935    return tr_ptrArraySize( &t->peers );/* + tr_ptrArraySize( &t->outgoingHandshakes ); */
1936}
1937
1938/* FIXME: this is kind of a mess. */
1939static bool
1940myHandshakeDoneCB( tr_handshake  * handshake,
1941                   tr_peerIo     * io,
1942                   bool            readAnythingFromPeer,
1943                   bool            isConnected,
1944                   const uint8_t * peer_id,
1945                   void          * vmanager )
1946{
1947    bool               ok = isConnected;
1948    bool               success = false;
1949    tr_port            port;
1950    const tr_address * addr;
1951    tr_peerMgr       * manager = vmanager;
1952    Torrent          * t;
1953    tr_handshake     * ours;
1954
1955    assert( io );
1956    assert( tr_isBool( ok ) );
1957
1958    t = tr_peerIoHasTorrentHash( io )
1959        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1960        : NULL;
1961
1962    if( tr_peerIoIsIncoming ( io ) )
1963        ours = tr_ptrArrayRemoveSorted( &manager->incomingHandshakes,
1964                                        handshake, handshakeCompare );
1965    else if( t )
1966        ours = tr_ptrArrayRemoveSorted( &t->outgoingHandshakes,
1967                                        handshake, handshakeCompare );
1968    else
1969        ours = handshake;
1970
1971    assert( ours );
1972    assert( ours == handshake );
1973
1974    if( t )
1975        torrentLock( t );
1976
1977    addr = tr_peerIoGetAddress( io, &port );
1978
1979    if( !ok || !t || !t->isRunning )
1980    {
1981        if( t )
1982        {
1983            struct peer_atom * atom = getExistingAtom( t, addr );
1984            if( atom )
1985            {
1986                ++atom->numFails;
1987
1988                if( !readAnythingFromPeer )
1989                {
1990                    tordbg( t, "marking peer %s as unreachable... numFails is %d", tr_atomAddrStr( atom ), (int)atom->numFails );
1991                    atom->flags2 |= MYFLAG_UNREACHABLE;
1992                }
1993            }
1994        }
1995    }
1996    else /* looking good */
1997    {
1998        struct peer_atom * atom;
1999
2000        ensureAtomExists( t, addr, port, 0, -1, TR_PEER_FROM_INCOMING );
2001        atom = getExistingAtom( t, addr );
2002        atom->time = tr_time( );
2003        atom->piece_data_time = 0;
2004        atom->lastConnectionAt = tr_time( );
2005
2006        if( !tr_peerIoIsIncoming( io ) )
2007        {
2008            atom->flags |= ADDED_F_CONNECTABLE;
2009            atom->flags2 &= ~MYFLAG_UNREACHABLE;
2010        }
2011
2012        /* In principle, this flag specifies whether the peer groks uTP,
2013           not whether it's currently connected over uTP. */
2014        if( io->utp_socket )
2015            atom->flags |= ADDED_F_UTP_FLAGS;
2016
2017        if( atom->flags2 & MYFLAG_BANNED )
2018        {
2019            tordbg( t, "banned peer %s tried to reconnect",
2020                    tr_atomAddrStr( atom ) );
2021        }
2022        else if( tr_peerIoIsIncoming( io )
2023               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
2024
2025        {
2026        }
2027        else
2028        {
2029            tr_peer * peer = atom->peer;
2030
2031            if( peer ) /* we already have this peer */
2032            {
2033            }
2034            else
2035            {
2036                peer = getPeer( t, atom );
2037                tr_free( peer->client );
2038
2039                if( !peer_id )
2040                    peer->client = NULL;
2041                else {
2042                    char client[128];
2043                    tr_clientForId( client, sizeof( client ), peer_id );
2044                    peer->client = tr_strdup( client );
2045                }
2046
2047                peer->io = tr_handshakeStealIO( handshake ); /* this steals its refcount too, which is
2048                                                                balanced by our unref in peerDelete()  */
2049                tr_peerIoSetParent( peer->io, &t->tor->bandwidth );
2050                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t );
2051
2052                success = true;
2053            }
2054        }
2055    }
2056
2057    if( t )
2058        torrentUnlock( t );
2059
2060    return success;
2061}
2062
2063void
2064tr_peerMgrAddIncoming( tr_peerMgr * manager,
2065                       tr_address * addr,
2066                       tr_port      port,
2067                       int          socket,
2068                       struct UTPSocket * utp_socket )
2069{
2070    tr_session * session;
2071
2072    managerLock( manager );
2073
2074    assert( tr_isSession( manager->session ) );
2075    session = manager->session;
2076
2077    if( tr_sessionIsAddressBlocked( session, addr ) )
2078    {
2079        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_address_to_string( addr ) );
2080        if(socket >= 0)
2081            tr_netClose( session, socket );
2082        else
2083            UTP_Close( utp_socket );
2084    }
2085    else if( getExistingHandshake( &manager->incomingHandshakes, addr ) )
2086    {
2087        if(socket >= 0)
2088            tr_netClose( session, socket );
2089        else
2090            UTP_Close( utp_socket );
2091    }
2092    else /* we don't have a connection to them yet... */
2093    {
2094        tr_peerIo *    io;
2095        tr_handshake * handshake;
2096
2097        io = tr_peerIoNewIncoming( session, &session->bandwidth, addr, port, socket, utp_socket );
2098
2099        handshake = tr_handshakeNew( io,
2100                                     session->encryptionMode,
2101                                     myHandshakeDoneCB,
2102                                     manager );
2103
2104        tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewIncoming() */
2105
2106        tr_ptrArrayInsertSorted( &manager->incomingHandshakes, handshake,
2107                                 handshakeCompare );
2108    }
2109
2110    managerUnlock( manager );
2111}
2112
2113void
2114tr_peerMgrAddPex( tr_torrent * tor, uint8_t from,
2115                  const tr_pex * pex, int8_t seedProbability )
2116{
2117    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
2118    {
2119        Torrent * t = tor->torrentPeers;
2120        managerLock( t->manager );
2121
2122        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
2123            if( tr_address_is_valid_for_peers( &pex->addr, pex->port ) )
2124                ensureAtomExists( t, &pex->addr, pex->port, pex->flags, seedProbability, from );
2125
2126        managerUnlock( t->manager );
2127    }
2128}
2129
2130void
2131tr_peerMgrMarkAllAsSeeds( tr_torrent * tor )
2132{
2133    Torrent * t = tor->torrentPeers;
2134    const int n = tr_ptrArraySize( &t->pool );
2135    struct peer_atom ** it = (struct peer_atom**) tr_ptrArrayBase( &t->pool );
2136    struct peer_atom ** end = it + n;
2137
2138    while( it != end )
2139        atomSetSeed( t, *it++ );
2140}
2141
2142tr_pex *
2143tr_peerMgrCompactToPex( const void *    compact,
2144                        size_t          compactLen,
2145                        const uint8_t * added_f,
2146                        size_t          added_f_len,
2147                        size_t *        pexCount )
2148{
2149    size_t          i;
2150    size_t          n = compactLen / 6;
2151    const uint8_t * walk = compact;
2152    tr_pex *        pex = tr_new0( tr_pex, n );
2153
2154    for( i = 0; i < n; ++i )
2155    {
2156        pex[i].addr.type = TR_AF_INET;
2157        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
2158        memcpy( &pex[i].port, walk, 2 ); walk += 2;
2159        if( added_f && ( n == added_f_len ) )
2160            pex[i].flags = added_f[i];
2161    }
2162
2163    *pexCount = n;
2164    return pex;
2165}
2166
2167tr_pex *
2168tr_peerMgrCompact6ToPex( const void    * compact,
2169                         size_t          compactLen,
2170                         const uint8_t * added_f,
2171                         size_t          added_f_len,
2172                         size_t        * pexCount )
2173{
2174    size_t          i;
2175    size_t          n = compactLen / 18;
2176    const uint8_t * walk = compact;
2177    tr_pex *        pex = tr_new0( tr_pex, n );
2178
2179    for( i = 0; i < n; ++i )
2180    {
2181        pex[i].addr.type = TR_AF_INET6;
2182        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
2183        memcpy( &pex[i].port, walk, 2 ); walk += 2;
2184        if( added_f && ( n == added_f_len ) )
2185            pex[i].flags = added_f[i];
2186    }
2187
2188    *pexCount = n;
2189    return pex;
2190}
2191
2192tr_pex *
2193tr_peerMgrArrayToPex( const void * array,
2194                      size_t       arrayLen,
2195                      size_t      * pexCount )
2196{
2197    size_t          i;
2198    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
2199    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
2200    const uint8_t * walk = array;
2201    tr_pex        * pex = tr_new0( tr_pex, n );
2202
2203    for( i = 0 ; i < n ; i++ ) {
2204        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
2205        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
2206        pex[i].flags = 0x00;
2207        walk += sizeof( tr_address ) + 2;
2208    }
2209
2210    *pexCount = n;
2211    return pex;
2212}
2213
2214/**
2215***
2216**/
2217
2218static void
2219tr_peerMgrSetBlame( tr_torrent     * tor,
2220                    tr_piece_index_t pieceIndex,
2221                    int              success )
2222{
2223    if( !success )
2224    {
2225        int        peerCount, i;
2226        Torrent *  t = tor->torrentPeers;
2227        tr_peer ** peers;
2228
2229        assert( torrentIsLocked( t ) );
2230
2231        peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
2232        for( i = 0; i < peerCount; ++i )
2233        {
2234            tr_peer * peer = peers[i];
2235            if( tr_bitfieldHas( &peer->blame, pieceIndex ) )
2236            {
2237                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
2238                        tr_atomAddrStr( peer->atom ),
2239                        pieceIndex, (int)peer->strikes + 1 );
2240                addStrike( t, peer );
2241            }
2242        }
2243    }
2244}
2245
2246int
2247tr_pexCompare( const void * va, const void * vb )
2248{
2249    const tr_pex * a = va;
2250    const tr_pex * b = vb;
2251    int i;
2252
2253    assert( tr_isPex( a ) );
2254    assert( tr_isPex( b ) );
2255
2256    if(( i = tr_address_compare( &a->addr, &b->addr )))
2257        return i;
2258
2259    if( a->port != b->port )
2260        return a->port < b->port ? -1 : 1;
2261
2262    return 0;
2263}
2264
2265/* better goes first */
2266static int
2267compareAtomsByUsefulness( const void * va, const void *vb )
2268{
2269    const struct peer_atom * a = * (const struct peer_atom**) va;
2270    const struct peer_atom * b = * (const struct peer_atom**) vb;
2271
2272    assert( tr_isAtom( a ) );
2273    assert( tr_isAtom( b ) );
2274
2275    if( a->piece_data_time != b->piece_data_time )
2276        return a->piece_data_time > b->piece_data_time ? -1 : 1;
2277    if( a->fromBest != b->fromBest )
2278        return a->fromBest < b->fromBest ? -1 : 1;
2279    if( a->numFails != b->numFails )
2280        return a->numFails < b->numFails ? -1 : 1;
2281
2282    return 0;
2283}
2284
2285static bool
2286isAtomInteresting( const tr_torrent * tor, struct peer_atom * atom )
2287{
2288    if( tr_torrentIsSeed( tor ) && atomIsSeed( atom ) )
2289        return false;
2290
2291    if( peerIsInUse( tor->torrentPeers, atom ) )
2292        return true;
2293
2294    if( isAtomBlocklisted( tor->session, atom ) )
2295        return false;
2296
2297    if( atom->flags2 & MYFLAG_BANNED )
2298        return false;
2299
2300    return true;
2301}
2302
2303int
2304tr_peerMgrGetPeers( tr_torrent   * tor,
2305                    tr_pex      ** setme_pex,
2306                    uint8_t        af,
2307                    uint8_t        list_mode,
2308                    int            maxCount )
2309{
2310    int i;
2311    int n;
2312    int count = 0;
2313    int atomCount = 0;
2314    const Torrent * t = tor->torrentPeers;
2315    struct peer_atom ** atoms = NULL;
2316    tr_pex * pex;
2317    tr_pex * walk;
2318
2319    assert( tr_isTorrent( tor ) );
2320    assert( setme_pex != NULL );
2321    assert( af==TR_AF_INET || af==TR_AF_INET6 );
2322    assert( list_mode==TR_PEERS_CONNECTED || list_mode==TR_PEERS_INTERESTING );
2323
2324    managerLock( t->manager );
2325
2326    /**
2327    ***  build a list of atoms
2328    **/
2329
2330    if( list_mode == TR_PEERS_CONNECTED ) /* connected peers only */
2331    {
2332        int i;
2333        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
2334        atomCount = tr_ptrArraySize( &t->peers );
2335        atoms = tr_new( struct peer_atom *, atomCount );
2336        for( i=0; i<atomCount; ++i )
2337            atoms[i] = peers[i]->atom;
2338    }
2339    else /* TR_PEERS_INTERESTING */
2340    {
2341        int i;
2342        struct peer_atom ** atomBase = (struct peer_atom**) tr_ptrArrayBase( &t->pool );
2343        n = tr_ptrArraySize( &t->pool );
2344        atoms = tr_new( struct peer_atom *, n );
2345        for( i=0; i<n; ++i )
2346            if( isAtomInteresting( tor, atomBase[i] ) )
2347                atoms[atomCount++] = atomBase[i];
2348    }
2349
2350    qsort( atoms, atomCount, sizeof( struct peer_atom * ), compareAtomsByUsefulness );
2351
2352    /**
2353    ***  add the first N of them into our return list
2354    **/
2355
2356    n = MIN( atomCount, maxCount );
2357    pex = walk = tr_new0( tr_pex, n );
2358
2359    for( i=0; i<atomCount && count<n; ++i )
2360    {
2361        const struct peer_atom * atom = atoms[i];
2362        if( atom->addr.type == af )
2363        {
2364            assert( tr_address_is_valid( &atom->addr ) );
2365            walk->addr = atom->addr;
2366            walk->port = atom->port;
2367            walk->flags = atom->flags;
2368            ++count;
2369            ++walk;
2370        }
2371    }
2372
2373    qsort( pex, count, sizeof( tr_pex ), tr_pexCompare );
2374
2375    assert( ( walk - pex ) == count );
2376    *setme_pex = pex;
2377
2378    /* cleanup */
2379    tr_free( atoms );
2380    managerUnlock( t->manager );
2381    return count;
2382}
2383
/* Forward declarations of timer callbacks used in this file.
 * They share the (int, short, void*) callback shape that createTimer()
 * hands to evtimer_new(). */
static void atomPulse( int fd, short what, void * arg );
static void bandwidthPulse( int fd, short what, void * arg );
static void rechokePulse( int fd, short what, void * arg );
static void reconnectPulse( int fd, short what, void * arg );
2388
2389static struct event *
2390createTimer( tr_session * session, int msec, void (*callback)(int, short, void *), void * cbdata )
2391{
2392    struct event * timer = evtimer_new( session->event_base, callback, cbdata );
2393    tr_timerAddMsec( timer, msec );
2394    return timer;
2395}
2396
2397static void
2398ensureMgrTimersExist( struct tr_peerMgr * m )
2399{
2400    if( m->atomTimer == NULL )
2401        m->atomTimer = createTimer( m->session, ATOM_PERIOD_MSEC, atomPulse, m );
2402
2403    if( m->bandwidthTimer == NULL )
2404        m->bandwidthTimer = createTimer( m->session, BANDWIDTH_PERIOD_MSEC, bandwidthPulse, m );
2405
2406    if( m->rechokeTimer == NULL )
2407        m->rechokeTimer = createTimer( m->session, RECHOKE_PERIOD_MSEC, rechokePulse, m );
2408
2409    if( m->refillUpkeepTimer == NULL )
2410        m->refillUpkeepTimer = createTimer( m->session, REFILL_UPKEEP_PERIOD_MSEC, refillUpkeep, m );
2411}
2412
2413void
2414tr_peerMgrStartTorrent( tr_torrent * tor )
2415{
2416    Torrent * t = tor->torrentPeers;
2417
2418    assert( tr_isTorrent( tor ) );
2419    assert( tr_torrentIsLocked( tor ) );
2420
2421    ensureMgrTimersExist( t->manager );
2422
2423    t->isRunning = true;
2424    t->maxPeers = t->tor->maxConnectedPeers;
2425    t->pieceSortState = PIECES_UNSORTED;
2426
2427    rechokePulse( 0, 0, t->manager );
2428}
2429
2430static void
2431stopTorrent( Torrent * t )
2432{
2433    tr_peer * peer;
2434
2435    t->isRunning = false;
2436
2437    replicationFree( t );
2438    invalidatePieceSorting( t );
2439
2440    /* disconnect the peers. */
2441    while(( peer = tr_ptrArrayPop( &t->peers )))
2442        peerDelete( t, peer );
2443
2444    /* disconnect the handshakes. handshakeAbort calls handshakeDoneCB(),
2445     * which removes the handshake from t->outgoingHandshakes... */
2446    while( !tr_ptrArrayEmpty( &t->outgoingHandshakes ) )
2447        tr_handshakeAbort( tr_ptrArrayNth( &t->outgoingHandshakes, 0 ) );
2448}
2449
2450void
2451tr_peerMgrStopTorrent( tr_torrent * tor )
2452{
2453    assert( tr_isTorrent( tor ) );
2454    assert( tr_torrentIsLocked( tor ) );
2455
2456    stopTorrent( tor->torrentPeers );
2457}
2458
2459void
2460tr_peerMgrAddTorrent( tr_peerMgr * manager, tr_torrent * tor )
2461{
2462    assert( tr_isTorrent( tor ) );
2463    assert( tr_torrentIsLocked( tor ) );
2464    assert( tor->torrentPeers == NULL );
2465
2466    tor->torrentPeers = torrentNew( manager, tor );
2467}
2468
2469void
2470tr_peerMgrRemoveTorrent( tr_torrent * tor )
2471{
2472    assert( tr_isTorrent( tor ) );
2473    assert( tr_torrentIsLocked( tor ) );
2474
2475    stopTorrent( tor->torrentPeers );
2476    torrentFree( tor->torrentPeers );
2477}
2478
2479void
2480tr_peerUpdateProgress( tr_torrent * tor, tr_peer * peer )
2481{
2482    const tr_bitfield * have = &peer->have;
2483
2484    if( tr_bitfieldHasAll( have ) )
2485    {
2486        peer->progress = 1.0;
2487    }
2488    else if( tr_bitfieldHasNone( have ) )
2489    {
2490        peer->progress = 0.0;
2491    }
2492    else
2493    {
2494        const float true_count = tr_bitfieldCountTrueBits( have );
2495
2496        if( tr_torrentHasMetadata( tor ) )
2497            peer->progress = true_count / tor->info.pieceCount;
2498        else /* without pieceCount, this result is only a best guess... */
2499            peer->progress = true_count / ( have->bit_count + 1 );
2500    }
2501
2502    if( peer->atom && ( peer->progress >= 1.0 ) )
2503        atomSetSeed( tor->torrentPeers, peer->atom );
2504}
2505
2506void
2507tr_peerMgrOnTorrentGotMetainfo( tr_torrent * tor )
2508{
2509    int i;
2510    const int peerCount = tr_ptrArraySize( &tor->torrentPeers->peers );
2511    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &tor->torrentPeers->peers );
2512
2513    /* some peer_msgs' progress fields may not be accurate if we
2514       didn't have the metadata before now... so refresh them all... */
2515    for( i=0; i<peerCount; ++i )
2516        tr_peerUpdateProgress( tor, peers[i] );
2517}
2518
2519void
2520tr_peerMgrTorrentAvailability( const tr_torrent * tor, int8_t * tab, unsigned int tabCount )
2521{
2522    assert( tr_isTorrent( tor ) );
2523    assert( torrentIsLocked( tor->torrentPeers ) );
2524    assert( tab != NULL );
2525    assert( tabCount > 0 );
2526
2527    memset( tab, 0, tabCount );
2528
2529    if( tr_torrentHasMetadata( tor ) )
2530    {
2531        tr_piece_index_t i;
2532        const int peerCount = tr_ptrArraySize( &tor->torrentPeers->peers );
2533        const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &tor->torrentPeers->peers );
2534        const float interval = tor->info.pieceCount / (float)tabCount;
2535        const bool isSeed = tr_cpGetStatus( &tor->completion ) == TR_SEED;
2536
2537        for( i=0; i<tabCount; ++i )
2538        {
2539            const int piece = i * interval;
2540
2541            if( isSeed || tr_cpPieceIsComplete( &tor->completion, piece ) )
2542                tab[i] = -1;
2543            else if( peerCount ) {
2544                int j;
2545                for( j=0; j<peerCount; ++j )
2546                    if( tr_bitfieldHas( &peers[j]->have, piece ) )
2547                        ++tab[i];
2548            }
2549        }
2550    }
2551}
2552
2553static bool
2554peerIsSeed( const tr_peer * peer )
2555{
2556    if( peer->progress >= 1.0 )
2557        return true;
2558
2559    if( peer->atom && atomIsSeed( peer->atom ) )
2560        return true;
2561
2562    return false;
2563}
2564
2565/* count how many bytes we want that connected peers have */
2566uint64_t
2567tr_peerMgrGetDesiredAvailable( const tr_torrent * tor )
2568{
2569    size_t i;
2570    size_t n;
2571    uint64_t desiredAvailable;
2572    const Torrent * t = tor->torrentPeers;
2573
2574    /* common shortcuts... */
2575
2576    if( tr_torrentIsSeed( t->tor ) )
2577        return 0;
2578
2579    if( !tr_torrentHasMetadata( tor ) )
2580        return 0;
2581
2582    n = tr_ptrArraySize( &t->peers );
2583    if( n == 0 )
2584        return 0;
2585    else {
2586        const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
2587        for( i=0; i<n; ++i )
2588            if( peers[i]->atom && atomIsSeed( peers[i]->atom ) )
2589                return tr_cpLeftUntilDone( &tor->completion );
2590    }
2591
2592    if( !t->pieceReplication || !t->pieceReplicationSize )
2593        return 0;
2594
2595    /* do it the hard way */
2596
2597    desiredAvailable = 0;
2598    for( i=0, n=MIN(tor->info.pieceCount, t->pieceReplicationSize); i<n; ++i )
2599        if( !tor->info.pieces[i].dnd && ( t->pieceReplication[i] > 0 ) )
2600            desiredAvailable += tr_cpMissingBytesInPiece( &t->tor->completion, i );
2601
2602    assert( desiredAvailable <= tor->info.totalSize );
2603    return desiredAvailable;
2604}
2605
2606void
2607tr_peerMgrTorrentStats( tr_torrent  * tor,
2608                        int         * setmePeersConnected,
2609                        int         * setmeWebseedsSendingToUs,
2610                        int         * setmePeersSendingToUs,
2611                        int         * setmePeersGettingFromUs,
2612                        int         * setmePeersFrom )
2613{
2614    int i, size;
2615    const Torrent * t = tor->torrentPeers;
2616    const tr_peer ** peers;
2617
2618    assert( tr_torrentIsLocked( tor ) );
2619
2620    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
2621    size = tr_ptrArraySize( &t->peers );
2622
2623    *setmePeersConnected       = 0;
2624    *setmePeersGettingFromUs   = 0;
2625    *setmePeersSendingToUs     = 0;
2626    *setmeWebseedsSendingToUs  = 0;
2627
2628    for( i=0; i<TR_PEER_FROM__MAX; ++i )
2629        setmePeersFrom[i] = 0;
2630
2631    for( i=0; i<size; ++i )
2632    {
2633        const tr_peer * peer = peers[i];
2634        const struct peer_atom * atom = peer->atom;
2635
2636        if( peer->io == NULL ) /* not connected */
2637            continue;
2638
2639        ++*setmePeersConnected;
2640
2641        ++setmePeersFrom[atom->fromFirst];
2642
2643        if( clientIsDownloadingFrom( tor, peer ) )
2644            ++*setmePeersSendingToUs;
2645
2646        if( clientIsUploadingTo( peer ) )
2647            ++*setmePeersGettingFromUs;
2648    }
2649
2650    *setmeWebseedsSendingToUs = countActiveWebseeds( t );
2651}
2652
2653double*
2654tr_peerMgrWebSpeeds_KBps( const tr_torrent * tor )
2655{
2656    int i;
2657    const Torrent * t = tor->torrentPeers;
2658    const int webseedCount = tr_ptrArraySize( &t->webseeds );
2659    const tr_webseed ** webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2660    const uint64_t now = tr_time_msec( );
2661    double * ret = tr_new0( double, webseedCount );
2662
2663    assert( tr_isTorrent( tor ) );
2664    assert( tr_torrentIsLocked( tor ) );
2665    assert( t->manager != NULL );
2666    assert( webseedCount == tor->info.webseedCount );
2667
2668    for( i=0; i<webseedCount; ++i ) {
2669        int Bps;
2670        if( tr_webseedGetSpeed_Bps( webseeds[i], now, &Bps ) )
2671            ret[i] = Bps / (double)tr_speed_K;
2672        else
2673            ret[i] = -1.0;
2674    }
2675
2676    return ret;
2677}
2678
2679int
2680tr_peerGetPieceSpeed_Bps( const tr_peer * peer, uint64_t now, tr_direction direction )
2681{
2682    return peer->io ? tr_peerIoGetPieceSpeed_Bps( peer->io, now, direction ) : 0.0;
2683}
2684
2685struct tr_peer_stat *
2686tr_peerMgrPeerStats( const tr_torrent * tor, int * setmeCount )
2687{
2688    int i;
2689    const Torrent * t = tor->torrentPeers;
2690    const int size = tr_ptrArraySize( &t->peers );
2691    const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
2692    const uint64_t now_msec = tr_time_msec( );
2693    const time_t now = tr_time();
2694    tr_peer_stat * ret = tr_new0( tr_peer_stat, size );
2695
2696    assert( tr_isTorrent( tor ) );
2697    assert( tr_torrentIsLocked( tor ) );
2698    assert( t->manager );
2699
2700    for( i=0; i<size; ++i )
2701    {
2702        char *                   pch;
2703        const tr_peer *          peer = peers[i];
2704        const struct peer_atom * atom = peer->atom;
2705        tr_peer_stat *           stat = ret + i;
2706
2707        tr_address_to_string_with_buf( &atom->addr, stat->addr, sizeof( stat->addr ) );
2708        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
2709                   sizeof( stat->client ) );
2710        stat->port                = ntohs( peer->atom->port );
2711        stat->from                = atom->fromFirst;
2712        stat->progress            = peer->progress;
2713        stat->isUTP               = peer->io->utp_socket != NULL;
2714        stat->isEncrypted         = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
2715        stat->rateToPeer_KBps     = toSpeedKBps( tr_peerGetPieceSpeed_Bps( peer, now_msec, TR_CLIENT_TO_PEER ) );
2716        stat->rateToClient_KBps   = toSpeedKBps( tr_peerGetPieceSpeed_Bps( peer, now_msec, TR_PEER_TO_CLIENT ) );
2717        stat->peerIsChoked        = peer->peerIsChoked;
2718        stat->peerIsInterested    = peer->peerIsInterested;
2719        stat->clientIsChoked      = peer->clientIsChoked;
2720        stat->clientIsInterested  = peer->clientIsInterested;
2721        stat->isIncoming          = tr_peerIoIsIncoming( peer->io );
2722        stat->isDownloadingFrom   = clientIsDownloadingFrom( tor, peer );
2723        stat->isUploadingTo       = clientIsUploadingTo( peer );
2724        stat->isSeed              = peerIsSeed( peer );
2725
2726        stat->blocksToPeer        = tr_historyGet( &peer->blocksSentToPeer,    now, CANCEL_HISTORY_SEC );
2727        stat->blocksToClient      = tr_historyGet( &peer->blocksSentToClient,  now, CANCEL_HISTORY_SEC );
2728        stat->cancelsToPeer       = tr_historyGet( &peer->cancelsSentToPeer,   now, CANCEL_HISTORY_SEC );
2729        stat->cancelsToClient     = tr_historyGet( &peer->cancelsSentToClient, now, CANCEL_HISTORY_SEC );
2730
2731        stat->pendingReqsToPeer   = peer->pendingReqsToPeer;
2732        stat->pendingReqsToClient = peer->pendingReqsToClient;
2733
2734        pch = stat->flagStr;
2735        if( stat->isUTP ) *pch++ = 'T';
2736        if( t->optimistic == peer ) *pch++ = 'O';
2737        if( stat->isDownloadingFrom ) *pch++ = 'D';
2738        else if( stat->clientIsInterested ) *pch++ = 'd';
2739        if( stat->isUploadingTo ) *pch++ = 'U';
2740        else if( stat->peerIsInterested ) *pch++ = 'u';
2741        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
2742        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
2743        if( stat->isEncrypted ) *pch++ = 'E';
2744        if( stat->from == TR_PEER_FROM_DHT ) *pch++ = 'H';
2745        else if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
2746        if( stat->isIncoming ) *pch++ = 'I';
2747        *pch = '\0';
2748    }
2749
2750    *setmeCount = size;
2751
2752    return ret;
2753}
2754
2755/***
2756****
2757****
2758***/
2759
2760void
2761tr_peerMgrClearInterest( tr_torrent * tor )
2762{
2763    int i;
2764    Torrent * t = tor->torrentPeers;
2765    const int peerCount = tr_ptrArraySize( &t->peers );
2766
2767    assert( tr_isTorrent( tor ) );
2768    assert( tr_torrentIsLocked( tor ) );
2769
2770    for( i=0; i<peerCount; ++i )
2771    {
2772        const tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2773        tr_peerMsgsSetInterested( peer->msgs, false );
2774    }
2775}
2776
2777/* does this peer have any pieces that we want? */
2778static bool
2779isPeerInteresting( const tr_torrent  * const tor,
2780                   const bool        * const piece_is_interesting,
2781                   const tr_peer     * const peer )
2782{
2783    tr_piece_index_t i, n;
2784
2785    /* these cases should have already been handled by the calling code... */
2786    assert( !tr_torrentIsSeed( tor ) );
2787    assert( tr_torrentIsPieceTransferAllowed( tor, TR_PEER_TO_CLIENT ) );
2788
2789    if( peerIsSeed( peer ) )
2790        return true;
2791
2792    for( i=0, n=tor->info.pieceCount; i<n; ++i )
2793        if( piece_is_interesting[i] && tr_bitfieldHas( &peer->have, i ) )
2794            return true;
2795
2796    return false;
2797}
2798
/* How a peer has been behaving with respect to blocks sent vs. requests
 * we had to cancel.  The numeric order matters: compare_rechoke_info()
 * sorts by this value in ascending order, so GOOD sorts before UNTESTED,
 * which sorts before BAD. */
typedef enum
{
    RECHOKE_STATE_GOOD     = 0,
    RECHOKE_STATE_UNTESTED = 1,
    RECHOKE_STATE_BAD      = 2
}
tr_rechoke_state;
2806
2807struct tr_rechoke_info
2808{
2809    tr_peer * peer;
2810    int salt;
2811    int rechoke_state;
2812};
2813
2814static int
2815compare_rechoke_info( const void * va, const void * vb )
2816{
2817    const struct tr_rechoke_info * a = va;
2818    const struct tr_rechoke_info * b = vb;
2819
2820    if( a->rechoke_state != b->rechoke_state )
2821        return a->rechoke_state - b->rechoke_state;
2822
2823    return a->salt - b->salt;
2824}
2825
2826/* determines who we send "interested" messages to */
2827static void
2828rechokeDownloads( Torrent * t )
2829{
2830    int i;
2831    int maxPeers = 0;
2832    int rechoke_count = 0;
2833    struct tr_rechoke_info * rechoke = NULL;
2834    const int MIN_INTERESTING_PEERS = 5;
2835    const int peerCount = tr_ptrArraySize( &t->peers );
2836    const time_t now = tr_time( );
2837
2838    /* some cases where this function isn't necessary */
2839    if( tr_torrentIsSeed( t->tor ) )
2840        return;
2841    if ( !tr_torrentIsPieceTransferAllowed( t->tor, TR_PEER_TO_CLIENT ) )
2842        return;
2843
2844    /* decide HOW MANY peers to be interested in */
2845    {
2846        int blocks = 0;
2847        int cancels = 0;
2848        time_t timeSinceCancel;
2849
2850        /* Count up how many blocks & cancels each peer has.
2851         *
2852         * There are two situations where we send out cancels --
2853         *
2854         * 1. We've got unresponsive peers, which is handled by deciding
2855         *    -which- peers to be interested in.
2856         *
2857         * 2. We've hit our bandwidth cap, which is handled by deciding
2858         *    -how many- peers to be interested in.
2859         *
2860         * We're working on 2. here, so we need to ignore unresponsive
2861         * peers in our calculations lest they confuse Transmission into
2862         * thinking it's hit its bandwidth cap.
2863         */
2864        for( i=0; i<peerCount; ++i )
2865        {
2866            const tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2867            const int b = tr_historyGet( &peer->blocksSentToClient, now, CANCEL_HISTORY_SEC );
2868            const int c = tr_historyGet( &peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC );
2869
2870            if( b == 0 ) /* ignore unresponsive peers, as described above */
2871                continue;
2872
2873            blocks += b;
2874            cancels += c;
2875        }
2876
2877        if( cancels > 0 )
2878        {
2879            /* cancelRate: of the block requests we've recently made, the percentage we cancelled.
2880             * higher values indicate more congestion. */
2881            const double cancelRate = cancels / (double)(cancels + blocks);
2882            const double mult = 1 - MIN( cancelRate, 0.5 );
2883            maxPeers = t->interestedCount * mult;
2884            tordbg( t, "cancel rate is %.3f -- reducing the "
2885                       "number of peers we're interested in by %.0f percent",
2886                       cancelRate, mult * 100 );
2887            t->lastCancel = now;
2888        }
2889
2890        timeSinceCancel = now - t->lastCancel;
2891        if( timeSinceCancel )
2892        {
2893            const int maxIncrease = 15;
2894            const time_t maxHistory = 2 * CANCEL_HISTORY_SEC;
2895            const double mult = MIN( timeSinceCancel, maxHistory ) / (double) maxHistory;
2896            const int inc = maxIncrease * mult;
2897            maxPeers = t->maxPeers + inc;
2898            tordbg( t, "time since last cancel is %li -- increasing the "
2899                       "number of peers we're interested in by %d",
2900                       timeSinceCancel, inc );
2901        }
2902    }
2903
2904    /* don't let the previous section's number tweaking go too far... */
2905    if( maxPeers < MIN_INTERESTING_PEERS )
2906        maxPeers = MIN_INTERESTING_PEERS;
2907    if( maxPeers > t->tor->maxConnectedPeers )
2908        maxPeers = t->tor->maxConnectedPeers;
2909
2910    t->maxPeers = maxPeers;
2911
2912    if( peerCount > 0 )
2913    {
2914        bool * piece_is_interesting;
2915        const tr_torrent * const tor = t->tor;
2916        const int n = tor->info.pieceCount;
2917
2918        /* build a bitfield of interesting pieces... */
2919        piece_is_interesting = tr_new( bool, n );
2920        for( i=0; i<n; i++ )
2921            piece_is_interesting[i] = !tor->info.pieces[i].dnd && !tr_cpPieceIsComplete( &tor->completion, i );
2922
2923        /* decide WHICH peers to be interested in (based on their cancel-to-block ratio) */
2924        for( i=0; i<peerCount; ++i )
2925        {
2926            tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2927
2928            if( !isPeerInteresting( t->tor, piece_is_interesting, peer ) )
2929            {
2930                tr_peerMsgsSetInterested( peer->msgs, false );
2931            }
2932            else
2933            {
2934                tr_rechoke_state rechoke_state;
2935                const int blocks = tr_historyGet( &peer->blocksSentToClient, now, CANCEL_HISTORY_SEC );
2936                const int cancels = tr_historyGet( &peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC );
2937
2938                if( !blocks && !cancels )
2939                    rechoke_state = RECHOKE_STATE_UNTESTED;
2940                else if( !cancels )
2941                    rechoke_state = RECHOKE_STATE_GOOD;
2942                else if( !blocks )
2943                    rechoke_state = RECHOKE_STATE_BAD;
2944                else if( ( cancels * 10 ) < blocks )
2945                    rechoke_state = RECHOKE_STATE_GOOD;
2946                else
2947                    rechoke_state = RECHOKE_STATE_BAD;
2948
2949                if( rechoke == NULL )
2950                    rechoke = tr_new( struct tr_rechoke_info, peerCount );
2951
2952                 rechoke[rechoke_count].peer = peer;
2953                 rechoke[rechoke_count].rechoke_state = rechoke_state;
2954                 rechoke[rechoke_count].salt = tr_cryptoWeakRandInt( INT_MAX );
2955                 rechoke_count++;
2956            }
2957
2958        }
2959
2960        tr_free( piece_is_interesting );
2961    }
2962
2963    /* now that we know which & how many peers to be interested in... update the peer interest */
2964    qsort( rechoke, rechoke_count, sizeof( struct tr_rechoke_info ), compare_rechoke_info );
2965    t->interestedCount = MIN( maxPeers, rechoke_count );
2966    for( i=0; i<rechoke_count; ++i )
2967        tr_peerMsgsSetInterested( rechoke[i].peer->msgs, i<t->interestedCount );
2968
2969    /* cleanup */
2970    tr_free( rechoke );
2971}
2972
2973/**
2974***
2975**/
2976
2977struct ChokeData
2978{
2979    bool            isInterested;
2980    bool            wasChoked;
2981    bool            isChoked;
2982    int             rate;
2983    int             salt;
2984    tr_peer *       peer;
2985};
2986
2987static int
2988compareChoke( const void * va, const void * vb )
2989{
2990    const struct ChokeData * a = va;
2991    const struct ChokeData * b = vb;
2992
2993    if( a->rate != b->rate ) /* prefer higher overall speeds */
2994        return a->rate > b->rate ? -1 : 1;
2995
2996    if( a->wasChoked != b->wasChoked ) /* prefer unchoked */
2997        return a->wasChoked ? 1 : -1;
2998
2999    if( a->salt != b->salt ) /* random order */
3000        return a->salt - b->salt;
3001
3002    return 0;
3003}
3004
3005/* is this a new connection? */
3006static int
3007isNew( const tr_peer * peer )
3008{
3009    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
3010}
3011
3012/* get a rate for deciding which peers to choke and unchoke. */
3013static int
3014getRate( const tr_torrent * tor, struct peer_atom * atom, uint64_t now )
3015{
3016    int Bps;
3017
3018    if( tr_torrentIsSeed( tor ) )
3019        Bps = tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_CLIENT_TO_PEER );
3020
3021    /* downloading a private torrent... take upload speed into account
3022     * because there may only be a small window of opportunity to share */
3023    else if( tr_torrentIsPrivate( tor ) )
3024        Bps = tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_PEER_TO_CLIENT )
3025            + tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_CLIENT_TO_PEER );
3026
3027    /* downloading a public torrent */
3028    else
3029        Bps = tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_PEER_TO_CLIENT );
3030
3031    /* convert it to bytes per second */
3032    return Bps;
3033}
3034
3035static inline bool
3036isBandwidthMaxedOut( const tr_bandwidth * b,
3037                     const uint64_t now_msec, tr_direction dir )
3038{
3039    if( !tr_bandwidthIsLimited( b, dir ) )
3040        return false;
3041    else {
3042        const int got = tr_bandwidthGetPieceSpeed_Bps( b, now_msec, dir );
3043        const int want = tr_bandwidthGetDesiredSpeed_Bps( b, dir );
3044        return got >= want;
3045    }
3046}
3047
3048static void
3049rechokeUploads( Torrent * t, const uint64_t now )
3050{
3051    int i, size, unchokedInterested;
3052    const int peerCount = tr_ptrArraySize( &t->peers );
3053    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
3054    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
3055    const tr_session * session = t->manager->session;
3056    const int chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
3057    const bool isMaxedOut = isBandwidthMaxedOut( &t->tor->bandwidth, now, TR_UP );
3058
3059    assert( torrentIsLocked( t ) );
3060
3061    /* an optimistic unchoke peer's "optimistic"
3062     * state lasts for N calls to rechokeUploads(). */
3063    if( t->optimisticUnchokeTimeScaler > 0 )
3064        t->optimisticUnchokeTimeScaler--;
3065    else
3066        t->optimistic = NULL;
3067
3068    /* sort the peers by preference and rate */
3069    for( i = 0, size = 0; i < peerCount; ++i )
3070    {
3071        tr_peer * peer = peers[i];
3072        struct peer_atom * atom = peer->atom;
3073
3074        if( peerIsSeed( peer ) ) /* choke seeds and partial seeds */
3075        {
3076            tr_peerMsgsSetChoke( peer->msgs, true );
3077        }
3078        else if( chokeAll ) /* choke everyone if we're not uploading */
3079        {
3080            tr_peerMsgsSetChoke( peer->msgs, true );
3081        }
3082        else if( peer != t->optimistic )
3083        {
3084            struct ChokeData * n = &choke[size++];
3085            n->peer         = peer;
3086            n->isInterested = peer->peerIsInterested;
3087            n->wasChoked    = peer->peerIsChoked;
3088            n->rate         = getRate( t->tor, atom, now );
3089            n->salt         = tr_cryptoWeakRandInt( INT_MAX );
3090            n->isChoked     = true;
3091        }
3092    }
3093
3094    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
3095
3096    /**
3097     * Reciprocation and number of uploads capping is managed by unchoking
3098     * the N peers which have the best upload rate and are interested.
3099     * This maximizes the client's download rate. These N peers are
3100     * referred to as downloaders, because they are interested in downloading
3101     * from the client.
3102     *
3103     * Peers which have a better upload rate (as compared to the downloaders)
3104     * but aren't interested get unchoked. If they become interested, the
3105     * downloader with the worst upload rate gets choked. If a client has
3106     * a complete file, it uses its upload rate rather than its download
3107     * rate to decide which peers to unchoke.
3108     *
3109     * If our bandwidth is maxed out, don't unchoke any more peers.
3110     */
3111    unchokedInterested = 0;
3112    for( i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i ) {
3113        choke[i].isChoked = isMaxedOut ? choke[i].wasChoked : false;
3114        if( choke[i].isInterested )
3115            ++unchokedInterested;
3116    }
3117
3118    /* optimistic unchoke */
3119    if( !t->optimistic && !isMaxedOut && (i<size) )
3120    {
3121        int n;
3122        struct ChokeData * c;
3123        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
3124
3125        for( ; i<size; ++i )
3126        {
3127            if( choke[i].isInterested )
3128            {
3129                const tr_peer * peer = choke[i].peer;
3130                int x = 1, y;
3131                if( isNew( peer ) ) x *= 3;
3132                for( y=0; y<x; ++y )
3133                    tr_ptrArrayAppend( &randPool, &choke[i] );
3134            }
3135        }
3136
3137        if(( n = tr_ptrArraySize( &randPool )))
3138        {
3139            c = tr_ptrArrayNth( &randPool, tr_cryptoWeakRandInt( n ));
3140            c->isChoked = false;
3141            t->optimistic = c->peer;
3142            t->optimisticUnchokeTimeScaler = OPTIMISTIC_UNCHOKE_MULTIPLIER;
3143        }
3144
3145        tr_ptrArrayDestruct( &randPool, NULL );
3146    }
3147
3148    for( i=0; i<size; ++i )
3149        tr_peerMsgsSetChoke( choke[i].peer->msgs, choke[i].isChoked );
3150
3151    /* cleanup */
3152    tr_free( choke );
3153}
3154
3155static void
3156rechokePulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3157{
3158    tr_torrent * tor = NULL;
3159    tr_peerMgr * mgr = vmgr;
3160    const uint64_t now = tr_time_msec( );
3161
3162    managerLock( mgr );
3163
3164    while(( tor = tr_torrentNext( mgr->session, tor ))) {
3165        if( tor->isRunning ) {
3166            Torrent * t = tor->torrentPeers;
3167            if( !tr_ptrArrayEmpty( &t->peers ) ) {
3168                rechokeUploads( t, now );
3169                rechokeDownloads( t );
3170            }
3171        }
3172    }
3173
3174    tr_timerAddMsec( mgr->rechokeTimer, RECHOKE_PERIOD_MSEC );
3175    managerUnlock( mgr );
3176}
3177
3178/***
3179****
3180****  Life and Death
3181****
3182***/
3183
3184static bool
3185shouldPeerBeClosed( const Torrent    * t,
3186                    const tr_peer    * peer,
3187                    int                peerCount,
3188                    const time_t       now )
3189{
3190    const tr_torrent *       tor = t->tor;
3191    const struct peer_atom * atom = peer->atom;
3192
3193    /* if it's marked for purging, close it */
3194    if( peer->doPurge )
3195    {
3196        tordbg( t, "purging peer %s because its doPurge flag is set",
3197                tr_atomAddrStr( atom ) );
3198        return true;
3199    }
3200
3201    /* disconnect if we're both seeds and enough time has passed for PEX */
3202    if( tr_torrentIsSeed( tor ) && peerIsSeed( peer ) )
3203        return !tr_torrentAllowsPex(tor) || (now-atom->time>=30);
3204
3205    /* disconnect if it's been too long since piece data has been transferred.
3206     * this is on a sliding scale based on number of available peers... */
3207    {
3208        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
3209        /* if we have >= relaxIfFewerThan, strictness is 100%.
3210         * if we have zero connections, strictness is 0% */
3211        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
3212                               ? 1.0
3213                               : peerCount / (float)relaxStrictnessIfFewerThanN;
3214        const int lo = MIN_UPLOAD_IDLE_SECS;
3215        const int hi = MAX_UPLOAD_IDLE_SECS;
3216        const int limit = hi - ( ( hi - lo ) * strictness );
3217        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
3218/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
3219        if( idleTime > limit ) {
3220            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
3221                       tr_atomAddrStr( atom ), idleTime );
3222            return true;
3223        }
3224    }
3225
3226    return false;
3227}
3228
3229static tr_peer **
3230getPeersToClose( Torrent * t, const time_t now_sec, int * setmeSize )
3231{
3232    int i, peerCount, outsize;
3233    struct tr_peer ** ret = NULL;
3234    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( &t->peers, &peerCount );
3235
3236    assert( torrentIsLocked( t ) );
3237
3238    for( i = outsize = 0; i < peerCount; ++i ) {
3239        if( shouldPeerBeClosed( t, peers[i], peerCount, now_sec ) ) {
3240            if( ret == NULL )
3241                ret = tr_new( tr_peer *, peerCount );
3242            ret[outsize++] = peers[i];
3243        }
3244    }
3245
3246    *setmeSize = outsize;
3247    return ret;
3248}
3249
3250static int
3251getReconnectIntervalSecs( const struct peer_atom * atom, const time_t now )
3252{
3253    int sec;
3254
3255    /* if we were recently connected to this peer and transferring piece
3256     * data, try to reconnect to them sooner rather that later -- we don't
3257     * want network troubles to get in the way of a good peer. */
3258    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
3259        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
3260
3261    /* don't allow reconnects more often than our minimum */
3262    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
3263        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
3264
3265    /* otherwise, the interval depends on how many times we've tried
3266     * and failed to connect to the peer */
3267    else switch( atom->numFails ) {
3268        case 0: sec = 0; break;
3269        case 1: sec = 5; break;
3270        case 2: sec = 2 * 60; break;
3271        case 3: sec = 15 * 60; break;
3272        case 4: sec = 30 * 60; break;
3273        case 5: sec = 60 * 60; break;
3274        default: sec = 120 * 60; break;
3275    }
3276
3277    /* penalize peers that were unreachable the last time we tried */
3278    if( atom->flags2 & MYFLAG_UNREACHABLE )
3279        sec += sec;
3280
3281    dbgmsg( "reconnect interval for %s is %d seconds", tr_atomAddrStr( atom ), sec );
3282    return sec;
3283}
3284
3285static void
3286removePeer( Torrent * t, tr_peer * peer )
3287{
3288    tr_peer * removed;
3289    struct peer_atom * atom = peer->atom;
3290
3291    assert( torrentIsLocked( t ) );
3292    assert( atom );
3293
3294    atom->time = tr_time( );
3295
3296    removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare );
3297
3298    if( replicationExists( t ) )
3299        tr_decrReplicationFromBitfield( t, &peer->have );
3300
3301    assert( removed == peer );
3302    peerDelete( t, removed );
3303}
3304
3305static void
3306closePeer( Torrent * t, tr_peer * peer )
3307{
3308    struct peer_atom * atom;
3309
3310    assert( t != NULL );
3311    assert( peer != NULL );
3312
3313    atom = peer->atom;
3314
3315    /* if we transferred piece data, then they might be good peers,
3316       so reset their `numFails' weight to zero. otherwise we connected
3317       to them fruitlessly, so mark it as another fail */
3318    if( atom->piece_data_time ) {
3319        tordbg( t, "resetting atom %s numFails to 0", tr_atomAddrStr(atom) );
3320        atom->numFails = 0;
3321    } else {
3322        ++atom->numFails;
3323        tordbg( t, "incremented atom %s numFails to %d", tr_atomAddrStr(atom), (int)atom->numFails );
3324    }
3325
3326    tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
3327    removePeer( t, peer );
3328}
3329
3330static void
3331removeAllPeers( Torrent * t )
3332{
3333    while( !tr_ptrArrayEmpty( &t->peers ) )
3334        removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) );
3335}
3336
3337static void
3338closeBadPeers( Torrent * t, const time_t now_sec )
3339{
3340    if( !tr_ptrArrayEmpty( &t->peers ) )
3341    {
3342        int i;
3343        int peerCount;
3344        struct tr_peer ** peers = getPeersToClose( t, now_sec, &peerCount );
3345        for( i=0; i<peerCount; ++i )
3346            closePeer( t, peers[i] );
3347        tr_free( peers );
3348    }
3349}
3350
3351struct peer_liveliness
3352{
3353    tr_peer * peer;
3354    void * clientData;
3355    time_t pieceDataTime;
3356    time_t time;
3357    int speed;
3358    bool doPurge;
3359};
3360
3361static int
3362comparePeerLiveliness( const void * va, const void * vb )
3363{
3364    const struct peer_liveliness * a = va;
3365    const struct peer_liveliness * b = vb;
3366
3367    if( a->doPurge != b->doPurge )
3368        return a->doPurge ? 1 : -1;
3369
3370    if( a->speed != b->speed ) /* faster goes first */
3371        return a->speed > b->speed ? -1 : 1;
3372
3373    /* the one to give us data more recently goes first */
3374    if( a->pieceDataTime != b->pieceDataTime )
3375        return a->pieceDataTime > b->pieceDataTime ? -1 : 1;
3376
3377    /* the one we connected to most recently goes first */
3378    if( a->time != b->time )
3379        return a->time > b->time ? -1 : 1;
3380
3381    return 0;
3382}
3383
3384static void
3385sortPeersByLivelinessImpl( tr_peer  ** peers,
3386                           void     ** clientData,
3387                           int         n,
3388                           uint64_t    now,
3389                           int (*compare) ( const void *va, const void *vb ) )
3390{
3391    int i;
3392    struct peer_liveliness *lives, *l;
3393
3394    /* build a sortable array of peer + extra info */
3395    lives = l = tr_new0( struct peer_liveliness, n );
3396    for( i=0; i<n; ++i, ++l )
3397    {
3398        tr_peer * p = peers[i];
3399        l->peer = p;
3400        l->doPurge = p->doPurge;
3401        l->pieceDataTime = p->atom->piece_data_time;
3402        l->time = p->atom->time;
3403        l->speed = tr_peerGetPieceSpeed_Bps( p, now, TR_UP )
3404                 + tr_peerGetPieceSpeed_Bps( p, now, TR_DOWN );
3405        if( clientData )
3406            l->clientData = clientData[i];
3407    }
3408
3409    /* sort 'em */
3410    assert( n == ( l - lives ) );
3411    qsort( lives, n, sizeof( struct peer_liveliness ), compare );
3412
3413    /* build the peer array */
3414    for( i=0, l=lives; i<n; ++i, ++l ) {
3415        peers[i] = l->peer;
3416        if( clientData )
3417            clientData[i] = l->clientData;
3418    }
3419    assert( n == ( l - lives ) );
3420
3421    /* cleanup */
3422    tr_free( lives );
3423}
3424
3425static void
3426sortPeersByLiveliness( tr_peer ** peers, void ** clientData, int n, uint64_t now )
3427{
3428    sortPeersByLivelinessImpl( peers, clientData, n, now, comparePeerLiveliness );
3429}
3430
3431
3432static void
3433enforceTorrentPeerLimit( Torrent * t, uint64_t now )
3434{
3435    int n = tr_ptrArraySize( &t->peers );
3436    const int max = tr_torrentGetPeerLimit( t->tor );
3437    if( n > max )
3438    {
3439        void * base = tr_ptrArrayBase( &t->peers );
3440        tr_peer ** peers = tr_memdup( base, n*sizeof( tr_peer* ) );
3441        sortPeersByLiveliness( peers, NULL, n, now );
3442        while( n > max )
3443            closePeer( t, peers[--n] );
3444        tr_free( peers );
3445    }
3446}
3447
3448static void
3449enforceSessionPeerLimit( tr_session * session, uint64_t now )
3450{
3451    int n = 0;
3452    tr_torrent * tor = NULL;
3453    const int max = tr_sessionGetPeerLimit( session );
3454
3455    /* count the total number of peers */
3456    while(( tor = tr_torrentNext( session, tor )))
3457        n += tr_ptrArraySize( &tor->torrentPeers->peers );
3458
3459    /* if there are too many, prune out the worst */
3460    if( n > max )
3461    {
3462        tr_peer ** peers = tr_new( tr_peer*, n );
3463        Torrent ** torrents = tr_new( Torrent*, n );
3464
3465        /* populate the peer array */
3466        n = 0;
3467        tor = NULL;
3468        while(( tor = tr_torrentNext( session, tor ))) {
3469            int i;
3470            Torrent * t = tor->torrentPeers;
3471            const int tn = tr_ptrArraySize( &t->peers );
3472            for( i=0; i<tn; ++i, ++n ) {
3473                peers[n] = tr_ptrArrayNth( &t->peers, i );
3474                torrents[n] = t;
3475            }
3476        }
3477
3478        /* sort 'em */
3479        sortPeersByLiveliness( peers, (void**)torrents, n, now );
3480
3481        /* cull out the crappiest */
3482        while( n-- > max )
3483            closePeer( torrents[n], peers[n] );
3484
3485        /* cleanup */
3486        tr_free( torrents );
3487        tr_free( peers );
3488    }
3489}
3490
3491static void makeNewPeerConnections( tr_peerMgr * mgr, const int max );
3492
3493static void
3494reconnectPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3495{
3496    tr_torrent * tor;
3497    tr_peerMgr * mgr = vmgr;
3498    const time_t now_sec = tr_time( );
3499    const uint64_t now_msec = tr_time_msec( );
3500
3501    /**
3502    ***  enforce the per-session and per-torrent peer limits
3503    **/
3504
3505    /* if we're over the per-torrent peer limits, cull some peers */
3506    tor = NULL;
3507    while(( tor = tr_torrentNext( mgr->session, tor )))
3508        if( tor->isRunning )
3509            enforceTorrentPeerLimit( tor->torrentPeers, now_msec );
3510
3511    /* if we're over the per-session peer limits, cull some peers */
3512    enforceSessionPeerLimit( mgr->session, now_msec );
3513
3514    /* remove crappy peers */
3515    tor = NULL;
3516    while(( tor = tr_torrentNext( mgr->session, tor )))
3517        if( !tor->torrentPeers->isRunning )
3518            removeAllPeers( tor->torrentPeers );
3519        else
3520            closeBadPeers( tor->torrentPeers, now_sec );
3521
3522    /* try to make new peer connections */
3523    makeNewPeerConnections( mgr, MAX_CONNECTIONS_PER_PULSE );
3524}
3525
3526/****
3527*****
3528*****  BANDWIDTH ALLOCATION
3529*****
3530****/
3531
3532static void
3533pumpAllPeers( tr_peerMgr * mgr )
3534{
3535    tr_torrent * tor = NULL;
3536
3537    while(( tor = tr_torrentNext( mgr->session, tor )))
3538    {
3539        int j;
3540        Torrent * t = tor->torrentPeers;
3541
3542        for( j=0; j<tr_ptrArraySize( &t->peers ); ++j )
3543        {
3544            tr_peer * peer = tr_ptrArrayNth( &t->peers, j );
3545            tr_peerMsgsPulse( peer->msgs );
3546        }
3547    }
3548}
3549
3550static void
3551queuePulse( tr_session * session, tr_direction dir )
3552{
3553    assert( tr_isSession( session ) );
3554    assert( tr_isDirection( dir ) );
3555
3556    if( tr_sessionGetQueueEnabled( session, dir ) )
3557    {
3558        int i;
3559        const int n = tr_sessionCountQueueFreeSlots( session, dir );
3560        for( i=0; i<n; i++ ) {
3561            tr_torrent * tor = tr_sessionGetNextQueuedTorrent( session, dir );
3562            if( tor != NULL ) {
3563                tr_torrentStartNow( tor );
3564                if( tor->queue_started_callback != NULL )
3565                    (*tor->queue_started_callback)( tor, tor->queue_started_user_data );
3566            }
3567        }
3568    }
3569}
3570
3571static void
3572bandwidthPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3573{
3574    tr_torrent * tor;
3575    tr_peerMgr * mgr = vmgr;
3576    tr_session * session = mgr->session;
3577    managerLock( mgr );
3578
3579    /* FIXME: this next line probably isn't necessary... */
3580    pumpAllPeers( mgr );
3581
3582    /* allocate bandwidth to the peers */
3583    tr_bandwidthAllocate( &session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
3584    tr_bandwidthAllocate( &session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
3585
3586    /* torrent upkeep */
3587    tor = NULL;
3588    while(( tor = tr_torrentNext( session, tor )))
3589    {
3590        /* possibly stop torrents that have seeded enough */
3591        tr_torrentCheckSeedLimit( tor );
3592
3593        /* run the completeness check for any torrents that need it */
3594        if( tor->torrentPeers->needsCompletenessCheck ) {
3595            tor->torrentPeers->needsCompletenessCheck  = false;
3596            tr_torrentRecheckCompleteness( tor );
3597        }
3598
3599        /* stop torrents that are ready to stop, but couldn't be stopped
3600           earlier during the peer-io callback call chain */
3601        if( tor->isStopping )
3602            tr_torrentStop( tor );
3603    }
3604
3605    /* pump the queues */
3606    queuePulse( session, TR_UP );
3607    queuePulse( session, TR_DOWN );
3608
3609    reconnectPulse( 0, 0, mgr );
3610
3611    tr_timerAddMsec( mgr->bandwidthTimer, BANDWIDTH_PERIOD_MSEC );
3612    managerUnlock( mgr );
3613}
3614
3615/***
3616****
3617***/
3618
3619static int
3620compareAtomPtrsByAddress( const void * va, const void *vb )
3621{
3622    const struct peer_atom * a = * (const struct peer_atom**) va;
3623    const struct peer_atom * b = * (const struct peer_atom**) vb;
3624
3625    assert( tr_isAtom( a ) );
3626    assert( tr_isAtom( b ) );
3627
3628    return tr_address_compare( &a->addr, &b->addr );
3629}
3630
3631/* best come first, worst go last */
3632static int
3633compareAtomPtrsByShelfDate( const void * va, const void *vb )
3634{
3635    time_t atime;
3636    time_t btime;
3637    const struct peer_atom * a = * (const struct peer_atom**) va;
3638    const struct peer_atom * b = * (const struct peer_atom**) vb;
3639    const int data_time_cutoff_secs = 60 * 60;
3640    const time_t tr_now = tr_time( );
3641
3642    assert( tr_isAtom( a ) );
3643    assert( tr_isAtom( b ) );
3644
3645    /* primary key: the last piece data time *if* it was within the last hour */
3646    atime = a->piece_data_time; if( atime + data_time_cutoff_secs < tr_now ) atime = 0;
3647    btime = b->piece_data_time; if( btime + data_time_cutoff_secs < tr_now ) btime = 0;
3648    if( atime != btime )
3649        return atime > btime ? -1 : 1;
3650
3651    /* secondary key: shelf date. */
3652    if( a->shelf_date != b->shelf_date )
3653        return a->shelf_date > b->shelf_date ? -1 : 1;
3654
3655    return 0;
3656}
3657
3658static int
3659getMaxAtomCount( const tr_torrent * tor )
3660{
3661    const int n = tor->maxConnectedPeers;
3662    /* approximate fit of the old jump discontinuous function */
3663    if( n >= 55 ) return     n + 150;
3664    if( n >= 20 ) return 2 * n + 95;
3665    return               4 * n + 55;
3666}
3667
3668static void
3669atomPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3670{
3671    tr_torrent * tor = NULL;
3672    tr_peerMgr * mgr = vmgr;
3673    managerLock( mgr );
3674
3675    while(( tor = tr_torrentNext( mgr->session, tor )))
3676    {
3677        int atomCount;
3678        Torrent * t = tor->torrentPeers;
3679        const int maxAtomCount = getMaxAtomCount( tor );
3680        struct peer_atom ** atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
3681
3682        if( atomCount > maxAtomCount ) /* we've got too many atoms... time to prune */
3683        {
3684            int i;
3685            int keepCount = 0;
3686            int testCount = 0;
3687            struct peer_atom ** keep = tr_new( struct peer_atom*, atomCount );
3688            struct peer_atom ** test = tr_new( struct peer_atom*, atomCount );
3689
3690            /* keep the ones that are in use */
3691            for( i=0; i<atomCount; ++i ) {
3692                struct peer_atom * atom = atoms[i];
3693                if( peerIsInUse( t, atom ) )
3694                    keep[keepCount++] = atom;
3695                else
3696                    test[testCount++] = atom;
3697            }
3698
3699            /* if there's room, keep the best of what's left */
3700            i = 0;
3701            if( keepCount < maxAtomCount ) {
3702                qsort( test, testCount, sizeof( struct peer_atom * ), compareAtomPtrsByShelfDate );
3703                while( i<testCount && keepCount<maxAtomCount )
3704                    keep[keepCount++] = test[i++];
3705            }
3706
3707            /* free the culled atoms */
3708            while( i<testCount )
3709                tr_free( test[i++] );
3710
3711            /* rebuild Torrent.pool with what's left */
3712            tr_ptrArrayDestruct( &t->pool, NULL );
3713            t->pool = TR_PTR_ARRAY_INIT;
3714            qsort( keep, keepCount, sizeof( struct peer_atom * ), compareAtomPtrsByAddress );
3715            for( i=0; i<keepCount; ++i )
3716                tr_ptrArrayAppend( &t->pool, keep[i] );
3717
3718            tordbg( t, "max atom count is %d... pruned from %d to %d\n", maxAtomCount, atomCount, keepCount );
3719
3720            /* cleanup */
3721            tr_free( test );
3722            tr_free( keep );
3723        }
3724    }
3725
3726    tr_timerAddMsec( mgr->atomTimer, ATOM_PERIOD_MSEC );
3727    managerUnlock( mgr );
3728}
3729
3730/***
3731****
3732****
3733****
3734***/
3735
3736/* is this atom someone that we'd want to initiate a connection to? */
3737static bool
3738isPeerCandidate( const tr_torrent * tor, struct peer_atom * atom, const time_t now )
3739{
3740    /* not if we're both seeds */
3741    if( tr_torrentIsSeed( tor ) && atomIsSeed( atom ) )
3742        return false;
3743
3744    /* not if we've already got a connection to them... */
3745    if( peerIsInUse( tor->torrentPeers, atom ) )
3746        return false;
3747
3748    /* not if we just tried them already */
3749    if( ( now - atom->time ) < getReconnectIntervalSecs( atom, now ) )
3750        return false;
3751
3752    /* not if they're blocklisted */
3753    if( isAtomBlocklisted( tor->session, atom ) )
3754        return false;
3755
3756    /* not if they're banned... */
3757    if( atom->flags2 & MYFLAG_BANNED )
3758        return false;
3759
3760    return true;
3761}
3762
3763struct peer_candidate
3764{
3765    uint64_t score;
3766    tr_torrent * tor;
3767    struct peer_atom * atom;
3768};
3769
3770static bool
3771torrentWasRecentlyStarted( const tr_torrent * tor )
3772{
3773    return difftime( tr_time( ), tor->startDate ) < 120;
3774}
3775
/* Append a field to a composite sort key: make room by shifting `value'
 * left by `width' bits, then OR `addme' into the result.
 * `addme' is not masked, so it is up to the caller to make it fit
 * into `width' bits. */
static inline uint64_t
addValToKey( uint64_t value, int width, uint64_t addme )
{
    return ( value << (uint64_t)width ) | addme;
}
3783
3784/* smaller value is better */
3785static uint64_t
3786getPeerCandidateScore( const tr_torrent * tor, const struct peer_atom * atom, uint8_t salt  )
3787{
3788    uint64_t i;
3789    uint64_t score = 0;
3790    const bool failed = atom->lastConnectionAt < atom->lastConnectionAttemptAt;
3791
3792    /* prefer peers we've connected to, or never tried, over peers we failed to connect to. */
3793    i = failed ? 1 : 0;
3794    score = addValToKey( score, 1, i );
3795
3796    /* prefer the one we attempted least recently (to cycle through all peers) */
3797    i = atom->lastConnectionAttemptAt;
3798    score = addValToKey( score, 32, i );
3799
3800    /* prefer peers belonging to a torrent of a higher priority */
3801    switch( tr_torrentGetPriority( tor ) ) {
3802        case TR_PRI_HIGH:    i = 0; break;
3803        case TR_PRI_NORMAL:  i = 1; break;
3804        case TR_PRI_LOW:     i = 2; break;
3805    }
3806    score = addValToKey( score, 4, i );
3807
3808    /* prefer recently-started torrents */
3809    i = torrentWasRecentlyStarted( tor ) ? 0 : 1;
3810    score = addValToKey( score, 1, i );
3811
3812    /* prefer torrents we're downloading with */
3813    i = tr_torrentIsSeed( tor ) ? 1 : 0;
3814    score = addValToKey( score, 1, i );
3815
3816    /* prefer peers that are known to be connectible */
3817    i = ( atom->flags & ADDED_F_CONNECTABLE ) ? 0 : 1;
3818    score = addValToKey( score, 1, i );
3819
3820    /* prefer peers that we might have a chance of uploading to...
3821       so lower seed probability is better */
3822    if( atom->seedProbability == 100 ) i = 101;
3823    else if( atom->seedProbability == -1 ) i = 100;
3824    else i = atom->seedProbability;
3825    score = addValToKey( score, 8, i );
3826
3827    /* Prefer peers that we got from more trusted sources.
3828     * lower `fromBest' values indicate more trusted sources */
3829    score = addValToKey( score, 4, atom->fromBest );
3830
3831    /* salt */
3832    score = addValToKey( score, 8, salt );
3833
3834    return score;
3835}
3836
3837#ifndef NDEBUG
3838static int
3839checkPartition( const struct peer_candidate * candidates, int left, int right, uint64_t pivotScore, int storeIndex )
3840{
3841    int i;
3842
3843    assert( storeIndex >= left );
3844    assert( storeIndex <= right );
3845    assert( candidates[storeIndex].score == pivotScore );
3846
3847    for( i=left; i<storeIndex; ++i )
3848        assert( candidates[i].score < pivotScore );
3849    for( i=storeIndex+1; i<=right; ++i )
3850        assert( candidates[i].score >= pivotScore );
3851
3852    return true;
3853}
3854#endif
3855
3856/* Helper to selectBestCandidates().
3857 * Adapted from http://en.wikipedia.org/wiki/Selection_algorithm */
3858static int
3859partitionPeerCandidates( struct peer_candidate * candidates, int left, int right, int pivotIndex )
3860{
3861    int i;
3862    int storeIndex;
3863    struct peer_candidate tmp;
3864    const struct peer_candidate pivotValue = candidates[pivotIndex];
3865
3866    /* move pivot to end */
3867    tmp = candidates[right];
3868    candidates[right] = pivotValue;
3869    candidates[pivotIndex] = tmp;
3870
3871    storeIndex = left;
3872    for( i=left; i<=right; ++i )
3873    {
3874        if( candidates[i].score < pivotValue.score )
3875        {
3876            tmp = candidates[storeIndex];
3877            candidates[storeIndex] = candidates[i];
3878            candidates[i] = tmp;
3879            storeIndex++;
3880        }
3881    }
3882
3883    /* move pivot to its final place */
3884    tmp = candidates[right];
3885    candidates[right] = candidates[storeIndex];
3886    candidates[storeIndex] = tmp;
3887
3888    /* sanity check */
3889    assert( checkPartition( candidates, left, right, pivotValue.score, storeIndex ) );
3890
3891    return storeIndex;
3892}
3893
/* Partial sort: rearrange candidates[left..right] so that the `k'
 * lowest-scoring entries of that range occupy its first `k' slots
 * (in no particular order among themselves).
 *
 * Adapted from http://en.wikipedia.org/wiki/Selection_algorithm
 *
 * Written as a loop: each pass partitions the current range around its
 * middle element and then narrows the range to the side that still
 * contains the boundary between "selected" and "not selected". */
static void
selectPeerCandidates( struct peer_candidate * candidates, int left, int right, int k )
{
    while( right > left )
    {
        const int middle = left + ( right - left ) / 2;
        const int settled = partitionPeerCandidates( candidates, left, right, middle );
        const int target = left + k;

        /* exactly k entries sit in front of the pivot: done */
        if( settled == target )
            break;

        if( settled > target )
        {
            /* too many entries in front of the pivot; keep looking there */
            right = settled - 1;
        }
        else
        {
            /* everything up to and including the pivot is selected;
             * the remainder must come from the right-hand side */
            k = target - settled - 1;
            left = settled + 1;
        }
    }
}
3910
3911#ifndef NDEBUG
3912static bool
3913checkBestScoresComeFirst( const struct peer_candidate * candidates, int n, int k )
3914{
3915    int i;
3916    uint64_t worstFirstScore = 0;
3917    const int x = MIN( n, k ) - 1;
3918
3919    for( i=0; i<x; i++ )
3920        if( worstFirstScore < candidates[i].score )
3921            worstFirstScore = candidates[i].score;
3922
3923    for( i=0; i<x; i++ )
3924        assert( candidates[i].score <= worstFirstScore );
3925
3926    for( i=x+1; i<n; i++ )
3927        assert( candidates[i].score >= worstFirstScore );
3928
3929    return true;
3930}
3931#endif /* NDEBUG */
3932
3933/** @return an array of all the atoms we might want to connect to */
3934static struct peer_candidate*
3935getPeerCandidates( tr_session * session, int * candidateCount, int max )
3936{
3937    int atomCount;
3938    int peerCount;
3939    tr_torrent * tor;
3940    struct peer_candidate * candidates;
3941    struct peer_candidate * walk;
3942    const time_t now = tr_time( );
3943    const uint64_t now_msec = tr_time_msec( );
3944    /* leave 5% of connection slots for incoming connections -- ticket #2609 */
3945    const int maxCandidates = tr_sessionGetPeerLimit( session ) * 0.95;
3946
3947    /* count how many peers and atoms we've got */
3948    tor= NULL;
3949    atomCount = 0;
3950    peerCount = 0;
3951    while(( tor = tr_torrentNext( session, tor ))) {
3952        atomCount += tr_ptrArraySize( &tor->torrentPeers->pool );
3953        peerCount += tr_ptrArraySize( &tor->torrentPeers->peers );
3954    }
3955
3956    /* don't start any new handshakes if we're full up */
3957    if( maxCandidates <= peerCount ) {
3958        *candidateCount = 0;
3959        return NULL;
3960    }
3961
3962    /* allocate an array of candidates */
3963    walk = candidates = tr_new( struct peer_candidate, atomCount );
3964
3965    /* populate the candidate array */
3966    tor = NULL;
3967    while(( tor = tr_torrentNext( session, tor )))
3968    {
3969        int i, nAtoms;
3970        struct peer_atom ** atoms;
3971
3972        if( !tor->torrentPeers->isRunning )
3973            continue;
3974
3975        /* if we've already got enough peers in this torrent... */
3976        if( tr_torrentGetPeerLimit( tor ) <= tr_ptrArraySize( &tor->torrentPeers->peers ) )
3977            continue;
3978
3979        /* if we've already got enough speed in this torrent... */
3980        if( tr_torrentIsSeed( tor ) && isBandwidthMaxedOut( &tor->bandwidth, now_msec, TR_UP ) )
3981            continue;
3982
3983        atoms = (struct peer_atom**) tr_ptrArrayPeek( &tor->torrentPeers->pool, &nAtoms );
3984        for( i=0; i<nAtoms; ++i )
3985        {
3986            struct peer_atom * atom = atoms[i];
3987
3988            if( isPeerCandidate( tor, atom, now ) )
3989            {
3990                const uint8_t salt = tr_cryptoWeakRandInt( 1024 );
3991                walk->tor = tor;
3992                walk->atom = atom;
3993                walk->score = getPeerCandidateScore( tor, atom, salt );
3994                ++walk;
3995            }
3996        }
3997    }
3998
3999    *candidateCount = walk - candidates;
4000    if( walk != candidates )
4001        selectPeerCandidates( candidates, 0, (walk-candidates)-1, max );
4002
4003    assert( checkBestScoresComeFirst( candidates, *candidateCount, max ) );
4004
4005    return candidates;
4006}
4007
4008static void
4009initiateConnection( tr_peerMgr * mgr, Torrent * t, struct peer_atom * atom )
4010{
4011    tr_peerIo * io;
4012    const time_t now = tr_time( );
4013    bool utp = tr_sessionIsUTPEnabled(mgr->session) && !atom->utp_failed;
4014
4015    if( atom->fromFirst == TR_PEER_FROM_PEX )
4016        /* PEX has explicit signalling for uTP support.  If an atom
4017           originally came from PEX and doesn't have the uTP flag, skip the
4018           uTP connection attempt.  Are we being optimistic here? */
4019        utp = utp && (atom->flags & ADDED_F_UTP_FLAGS);
4020
4021    tordbg( t, "Starting an OUTGOING%s connection with %s",
4022            utp ? " µTP" : "",
4023            tr_atomAddrStr( atom ) );
4024
4025    io = tr_peerIoNewOutgoing( mgr->session,
4026                               &mgr->session->bandwidth,
4027                               &atom->addr,
4028                               atom->port,
4029                               t->tor->info.hash,
4030                               t->tor->completeness == TR_SEED,
4031                               utp );
4032
4033    if( io == NULL )
4034    {
4035        tordbg( t, "peerIo not created; marking peer %s as unreachable",
4036                tr_atomAddrStr( atom ) );
4037        atom->flags2 |= MYFLAG_UNREACHABLE;
4038        atom->numFails++;
4039    }
4040    else
4041    {
4042        tr_handshake * handshake = tr_handshakeNew( io,
4043                                                    mgr->session->encryptionMode,
4044                                                    myHandshakeDoneCB,
4045                                                    mgr );
4046
4047        assert( tr_peerIoGetTorrentHash( io ) );
4048
4049        tr_peerIoUnref( io ); /* balanced by the initial ref
4050                                 in tr_peerIoNewOutgoing() */
4051
4052        tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
4053                                 handshakeCompare );
4054    }
4055
4056    atom->lastConnectionAttemptAt = now;
4057    atom->time = now;
4058}
4059
4060static void
4061initiateCandidateConnection( tr_peerMgr * mgr, struct peer_candidate * c )
4062{
4063#if 0
4064    fprintf( stderr, "Starting an OUTGOING connection with %s - [%s] seedProbability==%d; %s, %s\n",
4065             tr_atomAddrStr( c->atom ),
4066             tr_torrentName( c->tor ),
4067             (int)c->atom->seedProbability,
4068             tr_torrentIsPrivate( c->tor ) ? "private" : "public",
4069             tr_torrentIsSeed( c->tor ) ? "seed" : "downloader" );
4070#endif
4071
4072    initiateConnection( mgr, c->tor->torrentPeers, c->atom );
4073}
4074
4075static void
4076makeNewPeerConnections( struct tr_peerMgr * mgr, const int max )
4077{
4078    int i, n;
4079    struct peer_candidate * candidates;
4080
4081    candidates = getPeerCandidates( mgr->session, &n, max );
4082
4083    for( i=0; i<n && i<max; ++i )
4084        initiateCandidateConnection( mgr, &candidates[i] );
4085
4086    tr_free( candidates );
4087}
Note: See TracBrowser for help on using the repository browser.