source: trunk/libtransmission/peer-mgr.c @ 12248

Last change on this file since 12248 was 12248, checked in by jordan, 11 years ago

(trunk libT) break the mac build and introduce new crashes.

This is partially to address #4145 "Downloads stuck at 100%" by refactoring the bitset, bitfield, and tr_completion; however, the ripple effect is larger than usual so things may get worse in the short term before getting better.

livings124: to fix the mac build, remove bitset.[ch] from xcode

  • Property svn:keywords set to Date Rev Author Id
File size: 114.4 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 12248 2011-03-28 16:31:05Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h> /* error codes ERANGE, ... */
15#include <limits.h> /* INT_MAX */
16#include <string.h> /* memcpy, memcmp, strstr */
17#include <stdlib.h> /* qsort */
18
19#include <event2/event.h>
20#include <libutp/utp.h>
21
22#include "transmission.h"
23#include "announcer.h"
24#include "bandwidth.h"
25#include "blocklist.h"
26#include "cache.h"
27#include "clients.h"
28#include "completion.h"
29#include "crypto.h"
30#include "handshake.h"
31#include "net.h"
32#include "peer-io.h"
33#include "peer-mgr.h"
34#include "peer-msgs.h"
35#include "ptrarray.h"
36#include "session.h"
37#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
38#include "torrent.h"
39#include "utils.h"
40#include "webseed.h"
41
/* Tuning constants for the peer manager. */
enum
{
    /* how frequently to cull old atoms */
    ATOM_PERIOD_MSEC = ( 60 * 1000 ),

    /* how frequently to change which peers are choked */
    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),

    /* an optimistically unchoked peer is immune from rechoking
       for this many calls to rechokeUploads(). */
    OPTIMISTIC_UNCHOKE_MULTIPLIER = 4,

    /* how frequently to reallocate bandwidth */
    BANDWIDTH_PERIOD_MSEC = 500,

    /* how frequently to age out old piece request lists */
    REFILL_UPKEEP_PERIOD_MSEC = ( 10 * 1000 ),

    /* how frequently to decide which peers live and die */
    RECONNECT_PERIOD_MSEC = 500,

    /* when many peers are available, keep idle ones this long */
    MIN_UPLOAD_IDLE_SECS = ( 60 ),

    /* when few peers are available, keep idle ones this long */
    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),

    /* max number of peers to ask for per second overall.
     * this throttle is to avoid overloading the router */
    MAX_CONNECTIONS_PER_SECOND = 12,

    /* the per-second limit scaled down to one reconnect pulse.
     * Computed with integer math only: an enumerator must be an integer
     * constant expression, and floating-point arithmetic inside one is
     * accepted only as a compiler extension. */
    MAX_CONNECTIONS_PER_PULSE = ( ( MAX_CONNECTIONS_PER_SECOND * RECONNECT_PERIOD_MSEC ) / 1000 ),

    /* number of bad pieces a peer is allowed to send before we ban them */
    MAX_BAD_PIECES_PER_PEER = 5,

    /* amount of time to keep a list of request pieces lying around
       before it's considered too old and needs to be rebuilt */
    PIECE_LIST_SHELF_LIFE_SECS = 60,

    /* use for bitwise operations w/peer_atom.flags2 */
    MYFLAG_BANNED = 1,

    /* use for bitwise operations w/peer_atom.flags2 */
    /* unreachable for now... but not banned.
     * if they try to connect to us it's okay */
    MYFLAG_UNREACHABLE = 2,

    /* the minimum we'll wait before attempting to reconnect to a peer */
    MINIMUM_RECONNECT_INTERVAL_SECS = 5,

    /* how long we'll let requests we've made linger before we cancel them */
    REQUEST_TTL_SECS = 120,

    NO_BLOCKS_CANCEL_HISTORY = 120,

    /* time span, in seconds, passed to tr_historyConstruct() for the
       per-peer block/cancel histories */
    CANCEL_HISTORY_SEC = 60
};
100
/* Initializer value for tr_peer_event objects: every field is 0, NULL, or false. */
const tr_peer_event TR_PEER_EVENT_INIT = { 0, 0, NULL, 0, 0, 0, false, 0 };
102
103/**
104***
105**/
106
107/**
108 * Peer information that should be kept even before we've connected and
109 * after we've disconnected. These are kept in a pool of peer_atoms to decide
110 * which ones would make good candidates for connecting to, and to watch out
111 * for banned peers.
112 *
113 * @see tr_peer
114 * @see tr_peermsgs
115 */
struct peer_atom
{
    uint8_t     fromFirst;          /* where the peer was first found */
    uint8_t     fromBest;           /* the "best" value of where the peer has been found */
    uint8_t     flags;              /* these match the added_f flags */
    uint8_t     flags2;             /* flags that aren't defined in added_f */
    int8_t      seedProbability;    /* how likely is this to be a seed... [0..100] or -1 for unknown */
    int8_t      blocklisted;        /* -1 for unknown, true for blocklisted, false for not blocklisted */

    tr_port     port;               /* passed with addr to tr_peerIoAddrStr() when printing the atom */
    bool        utp_failed;         /* We recently failed to connect over uTP */
    uint16_t    numFails;           /* NOTE(review): presumably a count of failed connection attempts -- confirm */
    time_t      time;               /* when the peer's connection status last changed */
    time_t      piece_data_time;    /* NOTE(review): presumably when piece data was last exchanged -- confirm */

    time_t      lastConnectionAttemptAt;
    time_t      lastConnectionAt;

    /* similar to a TTL field, but less rigid --
     * if the swarm is small, the atom will be kept past this date. */
    time_t      shelf_date;
    tr_peer   * peer;               /* will be NULL if not connected */
    tr_address  addr;               /* lookup key: atoms are compared by this address */
};
140
141#ifdef NDEBUG
142#define tr_isAtom(a) (TRUE)
143#else
144static bool
145tr_isAtom( const struct peer_atom * atom )
146{
147    return ( atom != NULL )
148        && ( atom->fromFirst < TR_PEER_FROM__MAX )
149        && ( atom->fromBest < TR_PEER_FROM__MAX )
150        && ( tr_address_is_valid( &atom->addr ) );
151}
152#endif
153
154static const char*
155tr_atomAddrStr( const struct peer_atom * atom )
156{
157    return atom ? tr_peerIoAddrStr( &atom->addr, atom->port ) : "[no atom]";
158}
159
/* One outstanding request for a block; Torrent::requests is an array of these
 * ordered by compareReqByBlock() (block first, then peer). */
struct block_request
{
    tr_block_index_t block;  /* index of the requested block */
    tr_peer * peer;          /* who the request was sent to */
    time_t sentAt;           /* set from tr_time() when the request is recorded */
};
166
/* One entry of Torrent::pieces, the list of pieces we still want to request. */
struct weighted_piece
{
    tr_piece_index_t index;  /* which piece this entry describes */
    int16_t salt;            /* random value; final tie-breaker in comparePieceByWeight() */
    int16_t requestCount;    /* read as the "pending" count by comparePieceByWeight() */
};
173
/* Records which ordering, if any, Torrent::pieces currently has. */
enum piece_sort_state
{
    PIECES_UNSORTED         = 0,
    PIECES_SORTED_BY_INDEX  = 1,
    PIECES_SORTED_BY_WEIGHT = 2
};
180
181/** @brief Opaque, per-torrent data structure for peer connection information */
typedef struct tr_torrent_peers
{
    tr_ptrArray                outgoingHandshakes; /* tr_handshake */
    tr_ptrArray                pool; /* struct peer_atom */
    tr_ptrArray                peers; /* tr_peer */
    tr_ptrArray                webseeds; /* tr_webseed */

    tr_torrent               * tor;     /* the torrent these peers belong to */
    struct tr_peerMgr        * manager; /* owning manager; its session provides the lock */

    tr_peer                  * optimistic; /* the optimistic peer, or NULL if none */
    int                        optimisticUnchokeTimeScaler;

    bool                       isRunning;
    bool                       needsCompletenessCheck;

    /* outstanding block requests, kept ordered by compareReqByBlock() */
    struct block_request     * requests;
    int                        requestCount; /* number of entries in use */
    int                        requestAlloc; /* number of entries allocated */

    /* pieces we still want; pieceSortState says how the array is ordered */
    struct weighted_piece    * pieces;
    int                        pieceCount;
    enum piece_sort_state      pieceSortState;

    /* An array of pieceCount items stating how many peers have each piece.
       This is used to help us for downloading pieces "rarest first."
       This may be NULL if we don't have metainfo yet, or if we're not
       downloading and don't care about rarity */
    uint16_t                 * pieceReplication;
    size_t                     pieceReplicationSize;

    int                        interestedCount;
    int                        maxPeers;
    time_t                     lastCancel;

    /* Before the endgame this should be 0. In endgame, it contains the average
     * number of pending requests per peer. Only peers which have more pending
     * requests are considered 'fast' and are allowed to request a block that's
     * already been requested from another (slower?) peer. */
    int                        endgame;
}
Torrent;
224
/* Session-wide peer manager state. */
struct tr_peerMgr
{
    tr_session    * session;            /* its lock is what managerLock()/managerUnlock() take */
    tr_ptrArray     incomingHandshakes; /* tr_handshake */

    /* periodic timers; each is released and reset to NULL by deleteTimers() */
    struct event  * bandwidthTimer;
    struct event  * rechokeTimer;
    struct event  * refillUpkeepTimer;
    struct event  * atomTimer;
};
234
/* Deep-log a printf-style message tagged with the name of the torrent that
 * owns Torrent `t`. Does nothing unless tr_deepLoggingIsActive().
 * Note that `t` is used unparenthesized, so pass a plain pointer expression. */
#define tordbg( t, ... ) \
    do { \
        if( tr_deepLoggingIsActive( ) ) \
            tr_deepLog( __FILE__, __LINE__, tr_torrentName( t->tor ), __VA_ARGS__ ); \
    } while( 0 )
240
/* Deep-log a message that is not tied to a torrent (the name argument is NULL).
 * Does nothing unless tr_deepLoggingIsActive(). */
#define dbgmsg( ... ) \
    do { \
        if( tr_deepLoggingIsActive( ) ) \
            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
    } while( 0 )
246
247/**
248***
249**/
250
251static inline void
252managerLock( const struct tr_peerMgr * manager )
253{
254    tr_sessionLock( manager->session );
255}
256
257static inline void
258managerUnlock( const struct tr_peerMgr * manager )
259{
260    tr_sessionUnlock( manager->session );
261}
262
263static inline void
264torrentLock( Torrent * torrent )
265{
266    managerLock( torrent->manager );
267}
268
269static inline void
270torrentUnlock( Torrent * torrent )
271{
272    managerUnlock( torrent->manager );
273}
274
275static inline int
276torrentIsLocked( const Torrent * t )
277{
278    return tr_sessionIsLocked( t->manager->session );
279}
280
281/**
282***
283**/
284
285static int
286handshakeCompareToAddr( const void * va, const void * vb )
287{
288    const tr_handshake * a = va;
289
290    return tr_address_compare( tr_handshakeGetAddr( a, NULL ), vb );
291}
292
/* Comparator for two tr_handshake objects, ordered by their addresses. */
static int
handshakeCompare( const void * a, const void * b )
{
    const void * rhsAddr = tr_handshakeGetAddr( b, NULL );

    return handshakeCompareToAddr( a, rhsAddr );
}
298
299static inline tr_handshake*
300getExistingHandshake( tr_ptrArray * handshakes, const tr_address * addr )
301{
302    if( tr_ptrArrayEmpty( handshakes ) )
303        return NULL;
304
305    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
306}
307
308static int
309comparePeerAtomToAddress( const void * va, const void * vb )
310{
311    const struct peer_atom * a = va;
312
313    return tr_address_compare( &a->addr, vb );
314}
315
316static int
317compareAtomsByAddress( const void * va, const void * vb )
318{
319    const struct peer_atom * b = vb;
320
321    assert( tr_isAtom( b ) );
322
323    return comparePeerAtomToAddress( va, &b->addr );
324}
325
326/**
327***
328**/
329
330const tr_address *
331tr_peerAddress( const tr_peer * peer )
332{
333    return &peer->atom->addr;
334}
335
336static Torrent*
337getExistingTorrent( tr_peerMgr *    manager,
338                    const uint8_t * hash )
339{
340    tr_torrent * tor = tr_torrentFindFromHash( manager->session, hash );
341
342    return tor == NULL ? NULL : tor->torrentPeers;
343}
344
345static int
346peerCompare( const void * a, const void * b )
347{
348    return tr_address_compare( tr_peerAddress( a ), tr_peerAddress( b ) );
349}
350
351static struct peer_atom*
352getExistingAtom( const Torrent    * t,
353                 const tr_address * addr )
354{
355    Torrent * tt = (Torrent*)t;
356    assert( torrentIsLocked( t ) );
357    return tr_ptrArrayFindSorted( &tt->pool, addr, comparePeerAtomToAddress );
358}
359
360static bool
361peerIsInUse( const Torrent * ct, const struct peer_atom * atom )
362{
363    Torrent * t = (Torrent*) ct;
364
365    assert( torrentIsLocked ( t ) );
366
367    return ( atom->peer != NULL )
368        || getExistingHandshake( &t->outgoingHandshakes, &atom->addr )
369        || getExistingHandshake( &t->manager->incomingHandshakes, &atom->addr );
370}
371
372void
373tr_peerConstruct( tr_peer * peer )
374{
375    memset( peer, 0, sizeof( tr_peer ) );
376
377    peer->have = TR_BITFIELD_INIT;
378
379    tr_historyConstruct( &peer->blocksSentToClient,  CANCEL_HISTORY_SEC, ( RECHOKE_PERIOD_MSEC / 1000 ) );
380    tr_historyConstruct( &peer->blocksSentToPeer,    CANCEL_HISTORY_SEC, ( RECHOKE_PERIOD_MSEC / 1000 ) );
381    tr_historyConstruct( &peer->cancelsSentToClient, CANCEL_HISTORY_SEC, ( RECHOKE_PERIOD_MSEC / 1000 ) );
382    tr_historyConstruct( &peer->cancelsSentToPeer,   CANCEL_HISTORY_SEC, ( REFILL_UPKEEP_PERIOD_MSEC / 1000 ) );
383}
384
385static tr_peer*
386peerNew( struct peer_atom * atom )
387{
388    tr_peer * peer = tr_new( tr_peer, 1 );
389    tr_peerConstruct( peer );
390
391    peer->atom = atom;
392    atom->peer = peer;
393
394    return peer;
395}
396
397static tr_peer*
398getPeer( Torrent * torrent, struct peer_atom * atom )
399{
400    tr_peer * peer;
401
402    assert( torrentIsLocked( torrent ) );
403
404    peer = atom->peer;
405
406    if( peer == NULL )
407    {
408        peer = peerNew( atom );
409        tr_bitfieldConstruct( &peer->have, torrent->tor->blockCount );
410        tr_bitfieldConstruct( &peer->blame, torrent->tor->blockCount );
411        tr_ptrArrayInsertSorted( &torrent->peers, peer, peerCompare );
412    }
413
414    return peer;
415}
416
417static void peerDeclinedAllRequests( Torrent *, const tr_peer * );
418
419void
420tr_peerDestruct( tr_torrent * tor, tr_peer * peer )
421{
422    assert( peer != NULL );
423
424    if( tor->torrentPeers->isRunning )
425        peerDeclinedAllRequests( tor->torrentPeers, peer );
426
427    if( peer->msgs != NULL )
428        tr_peerMsgsFree( peer->msgs );
429
430    if( peer->io ) {
431fprintf( stderr, "[%p] peer %p clearing/unreffing its io (%s:%d)\n", peer->io, peer, __FILE__, __LINE__ );
432        tr_peerIoClear( peer->io );
433        tr_peerIoUnref( peer->io ); /* balanced by the ref in handshakeDoneCB() */
434    }
435
436    tr_historyDestruct( &peer->blocksSentToClient  );
437    tr_historyDestruct( &peer->blocksSentToPeer    );
438    tr_historyDestruct( &peer->cancelsSentToClient );
439    tr_historyDestruct( &peer->cancelsSentToPeer   );
440
441    tr_bitfieldDestruct( &peer->have );
442    tr_bitfieldDestruct( &peer->blame );
443    tr_free( peer->client );
444
445    if( peer->atom )
446        peer->atom->peer = NULL;
447}
448
449static void
450peerDelete( Torrent * t, tr_peer * peer )
451{
452    tr_peerDestruct( t->tor, peer );
453    tr_free( peer );
454}
455
456static bool
457replicationExists( const Torrent * t )
458{
459    return t->pieceReplication != NULL;
460}
461
462static void
463replicationFree( Torrent * t )
464{
465    tr_free( t->pieceReplication );
466    t->pieceReplication = NULL;
467    t->pieceReplicationSize = 0;
468}
469
470static void
471replicationNew( Torrent * t )
472{
473    tr_piece_index_t piece_i;
474    const tr_piece_index_t piece_count = t->tor->info.pieceCount;
475    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
476    const int peer_count = tr_ptrArraySize( &t->peers );
477
478    assert( !replicationExists( t ) );
479
480    t->pieceReplicationSize = piece_count;
481    t->pieceReplication = tr_new0( uint16_t, piece_count );
482
483    for( piece_i=0; piece_i<piece_count; ++piece_i )
484    {
485        int peer_i;
486        uint16_t r = 0;
487
488        for( peer_i=0; peer_i<peer_count; ++peer_i )
489            if( tr_bitfieldHas( &peers[peer_i]->have, piece_i ) )
490                ++r;
491
492        t->pieceReplication[piece_i] = r;
493    }
494}
495
496static void
497torrentFree( void * vt )
498{
499    Torrent * t = vt;
500
501    assert( t );
502    assert( !t->isRunning );
503    assert( torrentIsLocked( t ) );
504    assert( tr_ptrArrayEmpty( &t->outgoingHandshakes ) );
505    assert( tr_ptrArrayEmpty( &t->peers ) );
506
507    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
508    tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
509    tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
510    tr_ptrArrayDestruct( &t->peers, NULL );
511
512    replicationFree( t );
513
514    tr_free( t->requests );
515    tr_free( t->pieces );
516    tr_free( t );
517}
518
519static void peerCallbackFunc( tr_peer *, const tr_peer_event *, void * );
520
521static Torrent*
522torrentNew( tr_peerMgr * manager, tr_torrent * tor )
523{
524    int       i;
525    Torrent * t;
526
527    t = tr_new0( Torrent, 1 );
528    t->manager = manager;
529    t->tor = tor;
530    t->pool = TR_PTR_ARRAY_INIT;
531    t->peers = TR_PTR_ARRAY_INIT;
532    t->webseeds = TR_PTR_ARRAY_INIT;
533    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
534
535    for( i = 0; i < tor->info.webseedCount; ++i )
536    {
537        tr_webseed * w =
538            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
539        tr_ptrArrayAppend( &t->webseeds, w );
540    }
541
542    return t;
543}
544
545tr_peerMgr*
546tr_peerMgrNew( tr_session * session )
547{
548    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
549    m->session = session;
550    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
551    return m;
552}
553
/* Free the event *t points at, if any, and reset *t to NULL. */
static void
deleteTimer( struct event ** t )
{
    if( *t == NULL )
        return;

    event_free( *t );
    *t = NULL;
}
563
564static void
565deleteTimers( struct tr_peerMgr * m )
566{
567    deleteTimer( &m->atomTimer );
568    deleteTimer( &m->bandwidthTimer );
569    deleteTimer( &m->rechokeTimer );
570    deleteTimer( &m->refillUpkeepTimer );
571}
572
573void
574tr_peerMgrFree( tr_peerMgr * manager )
575{
576    managerLock( manager );
577
578    deleteTimers( manager );
579
580    /* free the handshakes. Abort invokes handshakeDoneCB(), which removes
581     * the item from manager->handshakes, so this is a little roundabout... */
582    while( !tr_ptrArrayEmpty( &manager->incomingHandshakes ) )
583        tr_handshakeAbort( tr_ptrArrayNth( &manager->incomingHandshakes, 0 ) );
584
585    tr_ptrArrayDestruct( &manager->incomingHandshakes, NULL );
586
587    managerUnlock( manager );
588    tr_free( manager );
589}
590
591static int
592clientIsDownloadingFrom( const tr_torrent * tor, const tr_peer * peer )
593{
594    if( !tr_torrentHasMetadata( tor ) )
595        return true;
596
597    return peer->clientIsInterested && !peer->clientIsChoked;
598}
599
600static int
601clientIsUploadingTo( const tr_peer * peer )
602{
603    return peer->peerIsInterested && !peer->peerIsChoked;
604}
605
606/***
607****
608***/
609
610void
611tr_peerMgrOnBlocklistChanged( tr_peerMgr * mgr )
612{
613    tr_torrent * tor = NULL;
614    tr_session * session = mgr->session;
615
616    /* we cache whether or not a peer is blocklisted...
617       since the blocklist has changed, erase that cached value */
618    while(( tor = tr_torrentNext( session, tor )))
619    {
620        int i;
621        Torrent * t = tor->torrentPeers;
622        const int n = tr_ptrArraySize( &t->pool );
623        for( i=0; i<n; ++i ) {
624            struct peer_atom * atom = tr_ptrArrayNth( &t->pool, i );
625            atom->blocklisted = -1;
626        }
627    }
628}
629
630static bool
631isAtomBlocklisted( tr_session * session, struct peer_atom * atom )
632{
633    if( atom->blocklisted < 0 )
634        atom->blocklisted = tr_sessionIsAddressBlocked( session, &atom->addr );
635
636    assert( tr_isBool( atom->blocklisted ) );
637    return atom->blocklisted;
638}
639
640
641/***
642****
643***/
644
645static void
646atomSetSeedProbability( struct peer_atom * atom, int seedProbability )
647{
648    assert( atom != NULL );
649    assert( -1<=seedProbability && seedProbability<=100 );
650
651    atom->seedProbability = seedProbability;
652
653    if( seedProbability == 100 )
654        atom->flags |= ADDED_F_SEED_FLAG;
655    else if( seedProbability != -1 )
656        atom->flags &= ~ADDED_F_SEED_FLAG;
657}
658
659static inline bool
660atomIsSeed( const struct peer_atom * atom )
661{
662    return atom->seedProbability == 100;
663}
664
665static void
666atomSetSeed( const Torrent * t, struct peer_atom * atom )
667{
668    if( !atomIsSeed( atom ) )
669    {
670        tordbg( t, "marking peer %s as a seed", tr_atomAddrStr( atom ) );
671
672        atomSetSeedProbability( atom, 100 );
673    }
674}
675
676
677bool
678tr_peerMgrPeerIsSeed( const tr_torrent  * tor,
679                      const tr_address  * addr )
680{
681    bool isSeed = false;
682    const Torrent * t = tor->torrentPeers;
683    const struct peer_atom * atom = getExistingAtom( t, addr );
684
685    if( atom )
686        isSeed = atomIsSeed( atom );
687
688    return isSeed;
689}
690
691void
692tr_peerMgrSetUtpSupported( tr_torrent * tor, const tr_address * addr )
693{
694    struct peer_atom * atom = getExistingAtom( tor->torrentPeers, addr );
695
696    if( atom )
697        atom->flags |= ADDED_F_UTP_FLAGS;
698}
699
700void
701tr_peerMgrSetUtpFailed( tr_torrent *tor, const tr_address *addr, bool failed )
702{
703    struct peer_atom * atom = getExistingAtom( tor->torrentPeers, addr );
704
705    if( atom )
706        atom->utp_failed = failed;
707}
708
709
710/**
711***  REQUESTS
712***
713*** There are two data structures associated with managing block requests:
714***
715*** 1. Torrent::requests, an array of "struct block_request" which keeps
716***    track of which blocks have been requested, and when, and by which peers.
717***    This list is used for (a) cancelling requests that have been pending
718***    for too long and (b) avoiding duplicate requests before endgame.
719***
720*** 2. Torrent::pieces, an array of "struct weighted_piece" which lists the
721***    pieces that we want to request. It's used to decide which blocks to
722***    return next when tr_peerMgrGetBlockRequests() is called.
723**/
724
725/**
726*** struct block_request
727**/
728
729static int
730compareReqByBlock( const void * va, const void * vb )
731{
732    const struct block_request * a = va;
733    const struct block_request * b = vb;
734
735    /* primary key: block */
736    if( a->block < b->block ) return -1;
737    if( a->block > b->block ) return 1;
738
739    /* secondary key: peer */
740    if( a->peer < b->peer ) return -1;
741    if( a->peer > b->peer ) return 1;
742
743    return 0;
744}
745
746static void
747requestListAdd( Torrent * t, tr_block_index_t block, tr_peer * peer )
748{
749    struct block_request key;
750
751    /* ensure enough room is available... */
752    if( t->requestCount + 1 >= t->requestAlloc )
753    {
754        const int CHUNK_SIZE = 128;
755        t->requestAlloc += CHUNK_SIZE;
756        t->requests = tr_renew( struct block_request,
757                                t->requests, t->requestAlloc );
758    }
759
760    /* populate the record we're inserting */
761    key.block = block;
762    key.peer = peer;
763    key.sentAt = tr_time( );
764
765    /* insert the request to our array... */
766    {
767        bool exact;
768        const int pos = tr_lowerBound( &key, t->requests, t->requestCount,
769                                       sizeof( struct block_request ),
770                                       compareReqByBlock, &exact );
771        assert( !exact );
772        memmove( t->requests + pos + 1,
773                 t->requests + pos,
774                 sizeof( struct block_request ) * ( t->requestCount++ - pos ) );
775        t->requests[pos] = key;
776    }
777
778    if( peer != NULL )
779    {
780        ++peer->pendingReqsToPeer;
781        assert( peer->pendingReqsToPeer >= 0 );
782    }
783
784    /*fprintf( stderr, "added request of block %lu from peer %s... "
785                       "there are now %d block\n",
786                       (unsigned long)block, tr_atomAddrStr( peer->atom ), t->requestCount );*/
787}
788
789static struct block_request *
790requestListLookup( Torrent * t, tr_block_index_t block, const tr_peer * peer )
791{
792    struct block_request key;
793    key.block = block;
794    key.peer = (tr_peer*) peer;
795
796    return bsearch( &key, t->requests, t->requestCount,
797                    sizeof( struct block_request ),
798                    compareReqByBlock );
799}
800
801/**
802 * Find the peers are we currently requesting the block
803 * with index @a block from and append them to @a peerArr.
804 */
805static void
806getBlockRequestPeers( Torrent * t, tr_block_index_t block,
807                      tr_ptrArray * peerArr )
808{
809    bool exact;
810    int i, pos;
811    struct block_request key;
812
813    key.block = block;
814    key.peer = NULL;
815    pos = tr_lowerBound( &key, t->requests, t->requestCount,
816                         sizeof( struct block_request ),
817                         compareReqByBlock, &exact );
818
819    assert( !exact ); /* shouldn't have a request with .peer == NULL */
820
821    for( i = pos; i < t->requestCount; ++i )
822    {
823        if( t->requests[i].block != block )
824            break;
825        tr_ptrArrayAppend( peerArr, t->requests[i].peer );
826    }
827}
828
829static void
830decrementPendingReqCount( const struct block_request * b )
831{
832    if( b->peer != NULL )
833        if( b->peer->pendingReqsToPeer > 0 )
834            --b->peer->pendingReqsToPeer;
835}
836
837static void
838requestListRemove( Torrent * t, tr_block_index_t block, const tr_peer * peer )
839{
840    const struct block_request * b = requestListLookup( t, block, peer );
841    if( b != NULL )
842    {
843        const int pos = b - t->requests;
844        assert( pos < t->requestCount );
845
846        decrementPendingReqCount( b );
847
848        tr_removeElementFromArray( t->requests,
849                                   pos,
850                                   sizeof( struct block_request ),
851                                   t->requestCount-- );
852
853        /*fprintf( stderr, "removing request of block %lu from peer %s... "
854                           "there are now %d block requests left\n",
855                           (unsigned long)block, tr_atomAddrStr( peer->atom ), t->requestCount );*/
856    }
857}
858
859static int
860countActiveWebseeds( const Torrent * t )
861{
862    int activeCount = 0;
863    const tr_webseed ** w = (const tr_webseed **) tr_ptrArrayBase( &t->webseeds );
864    const tr_webseed ** const wend = w + tr_ptrArraySize( &t->webseeds );
865
866    for( ; w!=wend; ++w )
867        if( tr_webseedIsActive( *w ) )
868            ++activeCount;
869
870    return activeCount;
871}
872
873static bool
874testForEndgame( const Torrent * t )
875{
876    /* we consider ourselves to be in endgame if the number of bytes
877       we've got requested is >= the number of bytes left to download */
878    return ( t->requestCount * t->tor->blockSize )
879               <= tr_cpLeftUntilDone( &t->tor->completion );
880}
881
882static void
883updateEndgame( Torrent * t )
884{
885    assert( t->requestCount >= 0 );
886
887    if( !testForEndgame( t ) )
888    {
889        /* not in endgame */
890        t->endgame = 0;
891    }
892    else if( !t->endgame ) /* only recalculate when endgame first begins */
893    {
894        int numDownloading = 0;
895        const tr_peer ** p = (const tr_peer **) tr_ptrArrayBase( &t->peers );
896        const tr_peer ** const pend = p + tr_ptrArraySize( &t->peers );
897
898        /* add the active bittorrent peers... */
899        for( ; p!=pend; ++p )
900            if( (*p)->pendingReqsToPeer > 0 )
901                ++numDownloading;
902
903        /* add the active webseeds... */
904        numDownloading += countActiveWebseeds( t );
905
906        /* average number of pending requests per downloading peer */
907        t->endgame = t->requestCount / MAX( numDownloading, 1 );
908    }
909}
910
911
912/****
913*****
914*****  Piece List Manipulation / Accessors
915*****
916****/
917
918static inline void
919invalidatePieceSorting( Torrent * t )
920{
921    t->pieceSortState = PIECES_UNSORTED;
922}
923
/* Context for comparePieceByWeight(): a qsort() comparator receives no user
 * argument, so setComparePieceByWeightTorrent() stores the torrent and its
 * replication table here before sorting.
 * NOTE(review): shared file-scope state -- presumably callers are serialized
 * by the session lock; confirm. */
static const tr_torrent * weightTorrent;

static const uint16_t * weightReplication;
927
928static void
929setComparePieceByWeightTorrent( Torrent * t )
930{
931    if( !replicationExists( t ) )
932        replicationNew( t );
933
934    weightTorrent = t->tor;
935    weightReplication = t->pieceReplication;
936}
937
938/* we try to create a "weight" s.t. high-priority pieces come before others,
939 * and that partially-complete pieces come before empty ones. */
940static int
941comparePieceByWeight( const void * va, const void * vb )
942{
943    const struct weighted_piece * a = va;
944    const struct weighted_piece * b = vb;
945    int ia, ib, missing, pending;
946    const tr_torrent * tor = weightTorrent;
947    const uint16_t * rep = weightReplication;
948
949    /* primary key: weight */
950    missing = tr_cpMissingBlocksInPiece( &tor->completion, a->index );
951    pending = a->requestCount;
952    ia = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
953    missing = tr_cpMissingBlocksInPiece( &tor->completion, b->index );
954    pending = b->requestCount;
955    ib = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
956    if( ia < ib ) return -1;
957    if( ia > ib ) return 1;
958
959    /* secondary key: higher priorities go first */
960    ia = tor->info.pieces[a->index].priority;
961    ib = tor->info.pieces[b->index].priority;
962    if( ia > ib ) return -1;
963    if( ia < ib ) return 1;
964
965    /* tertiary key: rarest first. */
966    ia = rep[a->index];
967    ib = rep[b->index];
968    if( ia < ib ) return -1;
969    if( ia > ib ) return 1;
970
971    /* quaternary key: random */
972    if( a->salt < b->salt ) return -1;
973    if( a->salt > b->salt ) return 1;
974
975    /* okay, they're equal */
976    return 0;
977}
978
979static int
980comparePieceByIndex( const void * va, const void * vb )
981{
982    const struct weighted_piece * a = va;
983    const struct weighted_piece * b = vb;
984    if( a->index < b->index ) return -1;
985    if( a->index > b->index ) return 1;
986    return 0;
987}
988
989static void
990pieceListSort( Torrent * t, enum piece_sort_state state )
991{
992    assert( state==PIECES_SORTED_BY_INDEX
993         || state==PIECES_SORTED_BY_WEIGHT );
994
995
996    if( state == PIECES_SORTED_BY_WEIGHT )
997    {
998        setComparePieceByWeightTorrent( t );
999        qsort( t->pieces, t->pieceCount, sizeof( struct weighted_piece ), comparePieceByWeight );
1000    }
1001    else
1002        qsort( t->pieces, t->pieceCount, sizeof( struct weighted_piece ), comparePieceByIndex );
1003
1004    t->pieceSortState = state;
1005}
1006
/**
 * These functions are useful for testing, but too expensive for nightly builds.
 * Let's leave them disabled but keep an easy hook to compile them back in:
 * change the "#if 1" below to "#if 0".
 */
#if 1
#define assertWeightedPiecesAreSorted(t)
#define assertReplicationCountIsExact(t)
#else
/* Outside endgame, verify that every adjacent pair in Torrent::pieces is
 * ordered according to comparePieceByWeight(). */
static void
assertWeightedPiecesAreSorted( Torrent * t )
{
    if( !t->endgame )
    {
        int i;
        setComparePieceByWeightTorrent( t );
        for( i=0; i<t->pieceCount-1; ++i )
            assert( comparePieceByWeight( &t->pieces[i], &t->pieces[i+1] ) <= 0 );
    }
}

/* Recount, for every piece, how many connected peers have it and verify
 * that the cached replication table agrees. */
static void
assertReplicationCountIsExact( Torrent * t )
{
    /* This assert might fail due to errors of implementations in other
     * clients. It happens when receiving duplicate bitfields/HaveAll/HaveNone
     * from a client. If such a behavior is noticed,
     * a bug report should be filed to the faulty client. */

    size_t piece_i;
    const uint16_t * rep = t->pieceReplication;
    const size_t piece_count = t->pieceReplicationSize;
    const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
    const int peer_count = tr_ptrArraySize( &t->peers );

    assert( piece_count == t->tor->info.pieceCount );

    for( piece_i=0; piece_i<piece_count; ++piece_i )
    {
        int peer_i;
        uint16_t r = 0;

        /* `have` is a bitfield, queried the same way replicationNew() does;
         * the former tr_bitsetHas() call belonged to the removed bitset API */
        for( peer_i=0; peer_i<peer_count; ++peer_i )
            if( tr_bitfieldHas( &peers[peer_i]->have, piece_i ) )
                ++r;

        assert( rep[piece_i] == r );
    }
}
#endif
1055
1056static struct weighted_piece *
1057pieceListLookup( Torrent * t, tr_piece_index_t index )
1058{
1059    int i;
1060
1061    for( i=0; i<t->pieceCount; ++i )
1062        if( t->pieces[i].index == index )
1063            return &t->pieces[i];
1064
1065    return NULL;
1066}
1067
1068static void
1069pieceListRebuild( Torrent * t )
1070{
1071
1072    if( !tr_torrentIsSeed( t->tor ) )
1073    {
1074        tr_piece_index_t i;
1075        tr_piece_index_t * pool;
1076        tr_piece_index_t poolCount = 0;
1077        const tr_torrent * tor = t->tor;
1078        const tr_info * inf = tr_torrentInfo( tor );
1079        struct weighted_piece * pieces;
1080        int pieceCount;
1081
1082        /* build the new list */
1083        pool = tr_new( tr_piece_index_t, inf->pieceCount );
1084        for( i=0; i<inf->pieceCount; ++i )
1085            if( !inf->pieces[i].dnd )
1086                if( !tr_cpPieceIsComplete( &tor->completion, i ) )
1087                    pool[poolCount++] = i;
1088        pieceCount = poolCount;
1089        pieces = tr_new0( struct weighted_piece, pieceCount );
1090        for( i=0; i<poolCount; ++i ) {
1091            struct weighted_piece * piece = pieces + i;
1092            piece->index = pool[i];
1093            piece->requestCount = 0;
1094            piece->salt = tr_cryptoWeakRandInt( 4096 );
1095        }
1096
1097        /* if we already had a list of pieces, merge it into
1098         * the new list so we don't lose its requestCounts */
1099        if( t->pieces != NULL )
1100        {
1101            struct weighted_piece * o = t->pieces;
1102            struct weighted_piece * oend = o + t->pieceCount;
1103            struct weighted_piece * n = pieces;
1104            struct weighted_piece * nend = n + pieceCount;
1105
1106            pieceListSort( t, PIECES_SORTED_BY_INDEX );
1107
1108            while( o!=oend && n!=nend ) {
1109                if( o->index < n->index )
1110                    ++o;
1111                else if( o->index > n->index )
1112                    ++n;
1113                else
1114                    *n++ = *o++;
1115            }
1116
1117            tr_free( t->pieces );
1118        }
1119
1120        t->pieces = pieces;
1121        t->pieceCount = pieceCount;
1122
1123        pieceListSort( t, PIECES_SORTED_BY_WEIGHT );
1124
1125        /* cleanup */
1126        tr_free( pool );
1127    }
1128}
1129
1130static void
1131pieceListRemovePiece( Torrent * t, tr_piece_index_t piece )
1132{
1133    struct weighted_piece * p;
1134
1135    if(( p = pieceListLookup( t, piece )))
1136    {
1137        const int pos = p - t->pieces;
1138
1139        tr_removeElementFromArray( t->pieces,
1140                                   pos,
1141                                   sizeof( struct weighted_piece ),
1142                                   t->pieceCount-- );
1143
1144        if( t->pieceCount == 0 )
1145        {
1146            tr_free( t->pieces );
1147            t->pieces = NULL;
1148        }
1149    }
1150}
1151
1152static void
1153pieceListResortPiece( Torrent * t, struct weighted_piece * p )
1154{
1155    int pos;
1156    bool isSorted = true;
1157
1158    if( p == NULL )
1159        return;
1160
1161    /* is the torrent already sorted? */
1162    pos = p - t->pieces;
1163    setComparePieceByWeightTorrent( t );
1164    if( isSorted && ( pos > 0 ) && ( comparePieceByWeight( p-1, p ) > 0 ) )
1165        isSorted = false;
1166    if( isSorted && ( pos < t->pieceCount - 1 ) && ( comparePieceByWeight( p, p+1 ) > 0 ) )
1167        isSorted = false;
1168
1169    if( t->pieceSortState != PIECES_SORTED_BY_WEIGHT )
1170    {
1171       pieceListSort( t, PIECES_SORTED_BY_WEIGHT);
1172       isSorted = true;
1173    }
1174
1175    /* if it's not sorted, move it around */
1176    if( !isSorted )
1177    {
1178        bool exact;
1179        const struct weighted_piece tmp = *p;
1180
1181        tr_removeElementFromArray( t->pieces,
1182                                   pos,
1183                                   sizeof( struct weighted_piece ),
1184                                   t->pieceCount-- );
1185
1186        pos = tr_lowerBound( &tmp, t->pieces, t->pieceCount,
1187                             sizeof( struct weighted_piece ),
1188                             comparePieceByWeight, &exact );
1189
1190        memmove( &t->pieces[pos + 1],
1191                 &t->pieces[pos],
1192                 sizeof( struct weighted_piece ) * ( t->pieceCount++ - pos ) );
1193
1194        t->pieces[pos] = tmp;
1195    }
1196
1197    assertWeightedPiecesAreSorted( t );
1198}
1199
1200static void
1201pieceListRemoveRequest( Torrent * t, tr_block_index_t block )
1202{
1203    struct weighted_piece * p;
1204    const tr_piece_index_t index = tr_torBlockPiece( t->tor, block );
1205
1206    if( ((p = pieceListLookup( t, index ))) && ( p->requestCount > 0 ) )
1207    {
1208        --p->requestCount;
1209        pieceListResortPiece( t, p );
1210    }
1211}
1212
1213
1214/****
1215*****
1216*****  Replication count ( for rarest first policy )
1217*****
1218****/
1219
1220/**
1221 * Increase the replication count of this piece and sort it if the
1222 * piece list is already sorted
1223 */
1224static void
1225tr_incrReplicationOfPiece( Torrent * t, const size_t index )
1226{
1227    assert( replicationExists( t ) );
1228    assert( t->pieceReplicationSize == t->tor->info.pieceCount );
1229
1230    /* One more replication of this piece is present in the swarm */
1231    ++t->pieceReplication[index];
1232
1233    /* we only resort the piece if the list is already sorted */
1234    if( t->pieceSortState == PIECES_SORTED_BY_WEIGHT )
1235        pieceListResortPiece( t, pieceListLookup( t, index ) );
1236}
1237
1238/**
1239 * Increases the replication count of pieces present in the bitfield
1240 */
1241static void
1242tr_incrReplicationFromBitfield( Torrent * t, const tr_bitfield * b )
1243{
1244    size_t i;
1245    uint16_t * rep = t->pieceReplication;
1246    const size_t n = t->tor->info.pieceCount;
1247
1248    assert( replicationExists( t ) );
1249    assert( n == t->pieceReplicationSize );
1250    assert( tr_bitfieldTestFast( b, n-1 ) );
1251
1252    for( i=0; i<n; ++i )
1253        if( tr_bitfieldHas( b, i ) )
1254            ++rep[i];
1255
1256    if( t->pieceSortState == PIECES_SORTED_BY_WEIGHT )
1257        invalidatePieceSorting( t );
1258}
1259
1260/**
1261 * Increase the replication count of every piece
1262 */
1263static void
1264tr_incrReplication( Torrent * t )
1265{
1266    int i;
1267    const int n = t->pieceReplicationSize;
1268
1269    assert( replicationExists( t ) );
1270    assert( t->pieceReplicationSize == t->tor->info.pieceCount );
1271
1272    for( i=0; i<n; ++i )
1273        ++t->pieceReplication[i];
1274}
1275
1276/**
1277 * Decrease the replication count of pieces present in the bitset.
1278 */
1279static void
1280tr_decrReplicationFromBitfield( Torrent * t, const tr_bitfield * b )
1281{
1282    int i;
1283    const int n = t->pieceReplicationSize;
1284
1285    assert( replicationExists( t ) );
1286    assert( t->pieceReplicationSize == t->tor->info.pieceCount );
1287
1288    if( tr_bitfieldHasAll( b ) )
1289    {
1290        for( i=0; i<n; ++i )
1291            --t->pieceReplication[i];
1292    }
1293    else if ( !tr_bitfieldHasNone( b ) )
1294    {
1295        for( i=0; i<n; ++i )
1296            if( tr_bitfieldHas( b, i ) )
1297                --t->pieceReplication[i];
1298
1299        if( t->pieceSortState == PIECES_SORTED_BY_WEIGHT )
1300            invalidatePieceSorting( t );
1301    }
1302}
1303
1304/**
1305***
1306**/
1307
1308void
1309tr_peerMgrRebuildRequests( tr_torrent * tor )
1310{
1311    assert( tr_isTorrent( tor ) );
1312
1313    pieceListRebuild( tor->torrentPeers );
1314}
1315
1316void
1317tr_peerMgrGetNextRequests( tr_torrent           * tor,
1318                           tr_peer              * peer,
1319                           int                    numwant,
1320                           tr_block_index_t     * setme,
1321                           int                  * numgot )
1322{
1323    int i;
1324    int got;
1325    Torrent * t;
1326    struct weighted_piece * pieces;
1327    const tr_bitfield * const have = &peer->have;
1328
1329    /* sanity clause */
1330    assert( tr_isTorrent( tor ) );
1331    assert( peer->clientIsInterested );
1332    assert( !peer->clientIsChoked );
1333    assert( numwant > 0 );
1334
1335    /* walk through the pieces and find blocks that should be requested */
1336    got = 0;
1337    t = tor->torrentPeers;
1338
1339    /* prep the pieces list */
1340    if( t->pieces == NULL )
1341        pieceListRebuild( t );
1342
1343    if( t->pieceSortState != PIECES_SORTED_BY_WEIGHT )
1344        pieceListSort( t, PIECES_SORTED_BY_WEIGHT );
1345
1346    assertReplicationCountIsExact( t );
1347    assertWeightedPiecesAreSorted( t );
1348
1349    updateEndgame( t );
1350    pieces = t->pieces;
1351    for( i=0; i<t->pieceCount && got<numwant; ++i )
1352    {
1353        struct weighted_piece * p = pieces + i;
1354
1355        /* if the peer has this piece that we want... */
1356        if( tr_bitfieldHas( have, p->index ) )
1357        {
1358            tr_block_index_t b;
1359            tr_block_index_t first;
1360            tr_block_index_t last;
1361            tr_ptrArray peerArr = TR_PTR_ARRAY_INIT;
1362
1363            tr_torGetPieceBlockRange( tor, p->index, &first, &last );
1364
1365            for( b=first; b<=last && got<numwant; ++b )
1366            {
1367                int peerCount;
1368                tr_peer ** peers;
1369
1370                /* don't request blocks we've already got */
1371                if( tr_cpBlockIsComplete( &tor->completion, b ) )
1372                    continue;
1373
1374                /* always add peer if this block has no peers yet */
1375                tr_ptrArrayClear( &peerArr );
1376                getBlockRequestPeers( t, b, &peerArr );
1377                peers = (tr_peer **) tr_ptrArrayPeek( &peerArr, &peerCount );
1378                if( peerCount != 0 )
1379                {
1380                    /* don't make a second block request until the endgame */
1381                    if( !t->endgame )
1382                        continue;
1383
1384                    /* don't have more than two peers requesting this block */
1385                    if( peerCount > 1 )
1386                        continue;
1387
1388                    /* don't send the same request to the same peer twice */
1389                    if( peer == peers[0] )
1390                        continue;
1391
1392                    /* in the endgame allow an additional peer to download a
1393                       block but only if the peer seems to be handling requests
1394                       relatively fast */
1395                    if( peer->pendingReqsToPeer + numwant - got < t->endgame )
1396                        continue;
1397                }
1398
1399                /* update the caller's table */
1400                setme[got++] = b;
1401
1402                /* update our own tables */
1403                requestListAdd( t, b, peer );
1404                ++p->requestCount;
1405            }
1406
1407            tr_ptrArrayDestruct( &peerArr, NULL );
1408        }
1409    }
1410
1411    /* In most cases we've just changed the weights of a small number of pieces.
1412     * So rather than qsort()ing the entire array, it's faster to apply an
1413     * adaptive insertion sort algorithm. */
1414    if( got > 0 )
1415    {
1416        /* not enough requests || last piece modified */
1417        if ( i == t->pieceCount ) --i;
1418
1419        setComparePieceByWeightTorrent( t );
1420        while( --i >= 0 )
1421        {
1422            bool exact;
1423
1424            /* relative position! */
1425            const int newpos = tr_lowerBound( &t->pieces[i], &t->pieces[i + 1],
1426                                              t->pieceCount - (i + 1),
1427                                              sizeof( struct weighted_piece ),
1428                                              comparePieceByWeight, &exact );
1429            if( newpos > 0 )
1430            {
1431                const struct weighted_piece piece = t->pieces[i];
1432                memmove( &t->pieces[i],
1433                         &t->pieces[i + 1],
1434                         sizeof( struct weighted_piece ) * ( newpos ) );
1435                t->pieces[i + newpos] = piece;
1436            }
1437        }
1438    }
1439
1440    assertWeightedPiecesAreSorted( t );
1441    *numgot = got;
1442}
1443
1444bool
1445tr_peerMgrDidPeerRequest( const tr_torrent  * tor,
1446                          const tr_peer     * peer,
1447                          tr_block_index_t    block )
1448{
1449    const Torrent * t = tor->torrentPeers;
1450    return requestListLookup( (Torrent*)t, block, peer ) != NULL;
1451}
1452
1453/* cancel requests that are too old */
1454static void
1455refillUpkeep( int foo UNUSED, short bar UNUSED, void * vmgr )
1456{
1457    time_t now;
1458    time_t too_old;
1459    tr_torrent * tor;
1460    tr_peerMgr * mgr = vmgr;
1461    managerLock( mgr );
1462
1463    now = tr_time( );
1464    too_old = now - REQUEST_TTL_SECS;
1465
1466    tor = NULL;
1467    while(( tor = tr_torrentNext( mgr->session, tor )))
1468    {
1469        Torrent * t = tor->torrentPeers;
1470        const int n = t->requestCount;
1471        if( n > 0 )
1472        {
1473            int keepCount = 0;
1474            int cancelCount = 0;
1475            struct block_request * cancel = tr_new( struct block_request, n );
1476            const struct block_request * it;
1477            const struct block_request * end;
1478
1479            for( it=t->requests, end=it+n; it!=end; ++it )
1480            {
1481                if( ( it->sentAt <= too_old ) && it->peer->msgs && !tr_peerMsgsIsReadingBlock( it->peer->msgs, it->block ) )
1482                    cancel[cancelCount++] = *it;
1483                else
1484                {
1485                    if( it != &t->requests[keepCount] )
1486                        t->requests[keepCount] = *it;
1487                    keepCount++;
1488                }
1489            }
1490
1491            /* prune out the ones we aren't keeping */
1492            t->requestCount = keepCount;
1493
1494            /* send cancel messages for all the "cancel" ones */
1495            for( it=cancel, end=it+cancelCount; it!=end; ++it ) {
1496                if( ( it->peer != NULL ) && ( it->peer->msgs != NULL ) ) {
1497                    tr_historyAdd( &it->peer->cancelsSentToPeer, now, 1 );
1498                    tr_peerMsgsCancel( it->peer->msgs, it->block );
1499                    decrementPendingReqCount( it );
1500                }
1501            }
1502
1503            /* decrement the pending request counts for the timed-out blocks */
1504            for( it=cancel, end=it+cancelCount; it!=end; ++it )
1505                pieceListRemoveRequest( t, it->block );
1506
1507            /* cleanup loop */
1508            tr_free( cancel );
1509        }
1510    }
1511
1512    tr_timerAddMsec( mgr->refillUpkeepTimer, REFILL_UPKEEP_PERIOD_MSEC );
1513    managerUnlock( mgr );
1514}
1515
1516static void
1517addStrike( Torrent * t, tr_peer * peer )
1518{
1519    tordbg( t, "increasing peer %s strike count to %d",
1520            tr_atomAddrStr( peer->atom ), peer->strikes + 1 );
1521
1522    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
1523    {
1524        struct peer_atom * atom = peer->atom;
1525        atom->flags2 |= MYFLAG_BANNED;
1526        peer->doPurge = 1;
1527        tordbg( t, "banning peer %s", tr_atomAddrStr( atom ) );
1528    }
1529}
1530
1531static void
1532gotBadPiece( Torrent * t, tr_piece_index_t pieceIndex )
1533{
1534    tr_torrent *   tor = t->tor;
1535    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
1536
1537    tor->corruptCur += byteCount;
1538    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1539
1540    tr_announcerAddBytes( tor, TR_ANN_CORRUPT, byteCount );
1541}
1542
1543static void
1544peerSuggestedPiece( Torrent            * t UNUSED,
1545                    tr_peer            * peer UNUSED,
1546                    tr_piece_index_t     pieceIndex UNUSED,
1547                    int                  isFastAllowed UNUSED )
1548{
1549#if 0
1550    assert( t );
1551    assert( peer );
1552    assert( peer->msgs );
1553
1554    /* is this a valid piece? */
1555    if(  pieceIndex >= t->tor->info.pieceCount )
1556        return;
1557
1558    /* don't ask for it if we've already got it */
1559    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
1560        return;
1561
1562    /* don't ask for it if they don't have it */
1563    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
1564        return;
1565
1566    /* don't ask for it if we're choked and it's not fast */
1567    if( !isFastAllowed && peer->clientIsChoked )
1568        return;
1569
1570    /* request the blocks that we don't have in this piece */
1571    {
1572        tr_block_index_t b;
1573        tr_block_index_t first;
1574        tr_block_index_t last;
1575        const tr_torrent * tor = t->tor;
1576
1577        tr_torGetPieceBlockRange( t->tor, pieceIndex, &first, &last );
1578
1579        for( b=first; b<=last; ++b )
1580        {
1581            if( !tr_cpBlockIsComplete( tor->completion, b ) )
1582            {
1583                const uint32_t offset = getBlockOffsetInPiece( tor, b );
1584                const uint32_t length = tr_torBlockCountBytes( tor, b );
1585                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length );
1586                incrementPieceRequests( t, pieceIndex );
1587            }
1588        }
1589    }
1590#endif
1591}
1592
1593static void
1594removeRequestFromTables( Torrent * t, tr_block_index_t block, const tr_peer * peer )
1595{
1596    requestListRemove( t, block, peer );
1597    pieceListRemoveRequest( t, block );
1598}
1599
1600/* peer choked us, or maybe it disconnected.
1601   either way we need to remove all its requests */
1602static void
1603peerDeclinedAllRequests( Torrent * t, const tr_peer * peer )
1604{
1605    int i, n;
1606    tr_block_index_t * blocks = tr_new( tr_block_index_t, t->requestCount );
1607
1608    for( i=n=0; i<t->requestCount; ++i )
1609        if( peer == t->requests[i].peer )
1610            blocks[n++] = t->requests[i].block;
1611
1612    for( i=0; i<n; ++i )
1613        removeRequestFromTables( t, blocks[i], peer );
1614
1615    tr_free( blocks );
1616}
1617
1618static void tr_peerMgrSetBlame( tr_torrent *, tr_piece_index_t, int );
1619
1620static void
1621peerCallbackFunc( tr_peer * peer, const tr_peer_event * e, void * vt )
1622{
1623    Torrent * t = vt;
1624
1625    torrentLock( t );
1626
1627    assert( peer != NULL );
1628
1629    switch( e->eventType )
1630    {
1631        case TR_PEER_PEER_GOT_DATA:
1632        {
1633            const time_t now = tr_time( );
1634            tr_torrent * tor = t->tor;
1635
1636            if( e->wasPieceData )
1637            {
1638                tor->uploadedCur += e->length;
1639                tr_announcerAddBytes( tor, TR_ANN_UP, e->length );
1640                tr_torrentSetActivityDate( tor, now );
1641                tr_torrentSetDirty( tor );
1642            }
1643
1644            /* update the stats */
1645            if( e->wasPieceData )
1646                tr_statsAddUploaded( tor->session, e->length );
1647
1648            /* update our atom */
1649            if( peer->atom && e->wasPieceData )
1650                peer->atom->piece_data_time = now;
1651
1652            break;
1653        }
1654
1655        case TR_PEER_CLIENT_GOT_HAVE:
1656            if( replicationExists( t ) ) {
1657                tr_incrReplicationOfPiece( t, e->pieceIndex );
1658                assertReplicationCountIsExact( t );
1659            }
1660            break;
1661
1662        case TR_PEER_CLIENT_GOT_HAVE_ALL:
1663            if( replicationExists( t ) ) {
1664                tr_incrReplication( t );
1665                assertReplicationCountIsExact( t );
1666            }
1667            break;
1668
1669        case TR_PEER_CLIENT_GOT_HAVE_NONE:
1670            /* noop */
1671            break;
1672
1673        case TR_PEER_CLIENT_GOT_BITFIELD:
1674            assert( e->bitfield != NULL );
1675            if( replicationExists( t ) ) {
1676                tr_incrReplicationFromBitfield( t, e->bitfield );
1677                assertReplicationCountIsExact( t );
1678            }
1679            break;
1680
1681        case TR_PEER_CLIENT_GOT_REJ: {
1682            tr_block_index_t b = _tr_block( t->tor, e->pieceIndex, e->offset );
1683            if( b < t->tor->blockCount )
1684                removeRequestFromTables( t, b, peer );
1685            else
1686                tordbg( t, "Peer %s sent an out-of-range reject message",
1687                           tr_atomAddrStr( peer->atom ) );
1688            break;
1689        }
1690
1691        case TR_PEER_CLIENT_GOT_CHOKE:
1692            peerDeclinedAllRequests( t, peer );
1693            break;
1694
1695        case TR_PEER_CLIENT_GOT_PORT:
1696            if( peer->atom )
1697                peer->atom->port = e->port;
1698            break;
1699
1700        case TR_PEER_CLIENT_GOT_SUGGEST:
1701            peerSuggestedPiece( t, peer, e->pieceIndex, false );
1702            break;
1703
1704        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1705            peerSuggestedPiece( t, peer, e->pieceIndex, true );
1706            break;
1707
1708        case TR_PEER_CLIENT_GOT_DATA:
1709        {
1710            const time_t now = tr_time( );
1711            tr_torrent * tor = t->tor;
1712
1713            if( e->wasPieceData )
1714            {
1715                tor->downloadedCur += e->length;
1716                tr_torrentSetActivityDate( tor, now );
1717                tr_torrentSetDirty( tor );
1718            }
1719
1720            /* update the stats */
1721            if( e->wasPieceData )
1722                tr_statsAddDownloaded( tor->session, e->length );
1723
1724            /* update our atom */
1725            if( peer->atom && e->wasPieceData )
1726                peer->atom->piece_data_time = now;
1727
1728            break;
1729        }
1730
1731        case TR_PEER_CLIENT_GOT_BLOCK:
1732        {
1733            tr_torrent * tor = t->tor;
1734            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1735            int i, peerCount;
1736            tr_peer ** peers;
1737            tr_ptrArray peerArr = TR_PTR_ARRAY_INIT;
1738
1739            removeRequestFromTables( t, block, peer );
1740            getBlockRequestPeers( t, block, &peerArr );
1741            peers = (tr_peer **) tr_ptrArrayPeek( &peerArr, &peerCount );
1742
1743            /* remove additional block requests and send cancel to peers */
1744            for( i=0; i<peerCount; i++ ) {
1745                tr_peer * p = peers[i];
1746                assert( p != peer );
1747                if( p->msgs ) {
1748                    tr_historyAdd( &p->cancelsSentToPeer, tr_time( ), 1 );
1749                    tr_peerMsgsCancel( p->msgs, block );
1750                }
1751                removeRequestFromTables( t, block, p );
1752            }
1753
1754            tr_ptrArrayDestruct( &peerArr, false );
1755
1756            tr_historyAdd( &peer->blocksSentToClient, tr_time( ), 1 );
1757
1758            if( tr_cpBlockIsComplete( &tor->completion, block ) )
1759            {
1760                /* we already have this block... */
1761                const uint32_t n = tr_torBlockCountBytes( tor, block );
1762                tor->downloadedCur -= MIN( tor->downloadedCur, n );
1763                tordbg( t, "we have this block already..." );
1764            }
1765            else
1766            {
1767                tr_cpBlockAdd( &tor->completion, block );
1768                pieceListResortPiece( t, pieceListLookup( t, e->pieceIndex ) );
1769                tr_torrentSetDirty( tor );
1770
1771                if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
1772                {
1773                    const tr_piece_index_t p = e->pieceIndex;
1774                    const bool ok = tr_torrentCheckPiece( tor, p );
1775
1776                    tordbg( t, "[LAZY] checked just-completed piece %zu", (size_t)p );
1777
1778                    if( !ok )
1779                    {
1780                        tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1781                                   (unsigned long)p );
1782                    }
1783
1784                    tr_peerMgrSetBlame( tor, p, ok );
1785
1786                    if( !ok )
1787                    {
1788                        gotBadPiece( t, p );
1789                    }
1790                    else
1791                    {
1792                        int i;
1793                        int peerCount;
1794                        tr_peer ** peers;
1795                        tr_file_index_t fileIndex;
1796
1797                        /* only add this to downloadedCur if we got it from a peer --
1798                         * webseeds shouldn't count against our ratio. As one tracker
1799                         * admin put it, "Those pieces are downloaded directly from the
1800                         * content distributor, not the peers, it is the tracker's job
1801                         * to manage the swarms, not the web server and does not fit
1802                         * into the jurisdiction of the tracker." */
1803                        if( peer->msgs != NULL ) {
1804                            const uint32_t n = tr_torPieceCountBytes( tor, p );
1805                            tr_announcerAddBytes( tor, TR_ANN_DOWN, n );
1806                        }
1807
1808                        peerCount = tr_ptrArraySize( &t->peers );
1809                        peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1810                        for( i=0; i<peerCount; ++i )
1811                            tr_peerMsgsHave( peers[i]->msgs, p );
1812
1813                        for( fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex ) {
1814                            const tr_file * file = &tor->info.files[fileIndex];
1815                            if( ( file->firstPiece <= p ) && ( p <= file->lastPiece ) ) {
1816                                if( tr_cpFileIsComplete( &tor->completion, fileIndex ) ) {
1817                                    tr_cacheFlushFile( tor->session->cache, tor, fileIndex );
1818                                    tr_torrentFileCompleted( tor, fileIndex );
1819                                }
1820                            }
1821                        }
1822
1823                        pieceListRemovePiece( t, p );
1824                    }
1825                }
1826
1827                t->needsCompletenessCheck = true;
1828            }
1829            break;
1830        }
1831
1832        case TR_PEER_ERROR:
1833            if( ( e->err == ERANGE ) || ( e->err == EMSGSIZE ) || ( e->err == ENOTCONN ) )
1834            {
1835                /* some protocol error from the peer */
1836                peer->doPurge = 1;
1837                tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error",
1838                        tr_atomAddrStr( peer->atom ) );
1839            }
1840            else
1841            {
1842                tordbg( t, "unhandled error: %s", tr_strerror( e->err ) );
1843            }
1844            break;
1845
1846        default:
1847            assert( 0 );
1848    }
1849
1850    torrentUnlock( t );
1851}
1852
1853static int
1854getDefaultShelfLife( uint8_t from )
1855{
1856    /* in general, peers obtained from firsthand contact
1857     * are better than those from secondhand, etc etc */
1858    switch( from )
1859    {
1860        case TR_PEER_FROM_INCOMING : return 60 * 60 * 6;
1861        case TR_PEER_FROM_LTEP     : return 60 * 60 * 6;
1862        case TR_PEER_FROM_TRACKER  : return 60 * 60 * 3;
1863        case TR_PEER_FROM_DHT      : return 60 * 60 * 3;
1864        case TR_PEER_FROM_PEX      : return 60 * 60 * 2;
1865        case TR_PEER_FROM_RESUME   : return 60 * 60;
1866        case TR_PEER_FROM_LPD      : return 10 * 60;
1867        default                    : return 60 * 60;
1868    }
1869}
1870
1871static void
1872ensureAtomExists( Torrent           * t,
1873                  const tr_address  * addr,
1874                  const tr_port       port,
1875                  const uint8_t       flags,
1876                  const int8_t        seedProbability,
1877                  const uint8_t       from )
1878{
1879    struct peer_atom * a;
1880
1881    assert( tr_address_is_valid( addr ) );
1882    assert( from < TR_PEER_FROM__MAX );
1883
1884    a = getExistingAtom( t, addr );
1885
1886    if( a == NULL )
1887    {
1888        const int jitter = tr_cryptoWeakRandInt( 60*10 );
1889        a = tr_new0( struct peer_atom, 1 );
1890        a->addr = *addr;
1891        a->port = port;
1892        a->flags = flags;
1893        a->fromFirst = from;
1894        a->fromBest = from;
1895        a->shelf_date = tr_time( ) + getDefaultShelfLife( from ) + jitter;
1896        a->blocklisted = -1;
1897        atomSetSeedProbability( a, seedProbability );
1898        tr_ptrArrayInsertSorted( &t->pool, a, compareAtomsByAddress );
1899
1900        tordbg( t, "got a new atom: %s", tr_atomAddrStr( a ) );
1901    }
1902    else
1903    {
1904        if( from < a->fromBest )
1905            a->fromBest = from;
1906
1907        if( a->seedProbability == -1 )
1908            atomSetSeedProbability( a, seedProbability );
1909
1910        a->flags |= flags;
1911    }
1912}
1913
1914static int
1915getMaxPeerCount( const tr_torrent * tor )
1916{
1917    return tor->maxConnectedPeers;
1918}
1919
1920static int
1921getPeerCount( const Torrent * t )
1922{
1923    return tr_ptrArraySize( &t->peers );/* + tr_ptrArraySize( &t->outgoingHandshakes ); */
1924}
1925
1926/* FIXME: this is kind of a mess. */
1927static bool
1928myHandshakeDoneCB( tr_handshake  * handshake,
1929                   tr_peerIo     * io,
1930                   bool            readAnythingFromPeer,
1931                   bool            isConnected,
1932                   const uint8_t * peer_id,
1933                   void          * vmanager )
1934{
1935    bool               ok = isConnected;
1936    bool               success = false;
1937    tr_port            port;
1938    const tr_address * addr;
1939    tr_peerMgr       * manager = vmanager;
1940    Torrent          * t;
1941    tr_handshake     * ours;
1942
1943    assert( io );
1944    assert( tr_isBool( ok ) );
1945
1946    t = tr_peerIoHasTorrentHash( io )
1947        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1948        : NULL;
1949
1950    if( tr_peerIoIsIncoming ( io ) )
1951        ours = tr_ptrArrayRemoveSorted( &manager->incomingHandshakes,
1952                                        handshake, handshakeCompare );
1953    else if( t )
1954        ours = tr_ptrArrayRemoveSorted( &t->outgoingHandshakes,
1955                                        handshake, handshakeCompare );
1956    else
1957        ours = handshake;
1958
1959    assert( ours );
1960    assert( ours == handshake );
1961
1962    if( t )
1963        torrentLock( t );
1964
1965    addr = tr_peerIoGetAddress( io, &port );
1966
1967    if( !ok || !t || !t->isRunning )
1968    {
1969        if( t )
1970        {
1971            struct peer_atom * atom = getExistingAtom( t, addr );
1972            if( atom )
1973            {
1974                ++atom->numFails;
1975
1976                if( !readAnythingFromPeer )
1977                {
1978                    tordbg( t, "marking peer %s as unreachable... numFails is %d", tr_atomAddrStr( atom ), (int)atom->numFails );
1979                    atom->flags2 |= MYFLAG_UNREACHABLE;
1980                }
1981            }
1982        }
1983    }
1984    else /* looking good */
1985    {
1986        struct peer_atom * atom;
1987
1988        ensureAtomExists( t, addr, port, 0, -1, TR_PEER_FROM_INCOMING );
1989        atom = getExistingAtom( t, addr );
1990        atom->time = tr_time( );
1991        atom->piece_data_time = 0;
1992        atom->lastConnectionAt = tr_time( );
1993
1994        if( !tr_peerIoIsIncoming( io ) )
1995        {
1996            atom->flags |= ADDED_F_CONNECTABLE;
1997            atom->flags2 &= ~MYFLAG_UNREACHABLE;
1998        }
1999
2000        /* In principle, this flag specifies whether the peer groks uTP,
2001           not whether it's currently connected over uTP. */
2002        if( io->utp_socket )
2003            atom->flags |= ADDED_F_UTP_FLAGS;
2004
2005        if( atom->flags2 & MYFLAG_BANNED )
2006        {
2007            tordbg( t, "banned peer %s tried to reconnect",
2008                    tr_atomAddrStr( atom ) );
2009        }
2010        else if( tr_peerIoIsIncoming( io )
2011               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
2012
2013        {
2014        }
2015        else
2016        {
2017            tr_peer * peer = atom->peer;
2018
2019            if( peer ) /* we already have this peer */
2020            {
2021            }
2022            else
2023            {
2024                peer = getPeer( t, atom );
2025                tr_free( peer->client );
2026
2027                if( !peer_id )
2028                    peer->client = NULL;
2029                else {
2030                    char client[128];
2031                    tr_clientForId( client, sizeof( client ), peer_id );
2032                    peer->client = tr_strdup( client );
2033                }
2034
2035                peer->io = tr_handshakeStealIO( handshake ); /* this steals its refcount too, which is
2036                                                                balanced by our unref in peerDelete()  */
2037                tr_peerIoSetParent( peer->io, t->tor->bandwidth );
2038                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t );
2039
2040                success = true;
2041            }
2042        }
2043    }
2044
2045    if( t )
2046        torrentUnlock( t );
2047
2048    return success;
2049}
2050
2051void
2052tr_peerMgrAddIncoming( tr_peerMgr * manager,
2053                       tr_address * addr,
2054                       tr_port      port,
2055                       int          socket,
2056                       struct UTPSocket * utp_socket )
2057{
2058    tr_session * session;
2059
2060    managerLock( manager );
2061
2062    assert( tr_isSession( manager->session ) );
2063    session = manager->session;
2064
2065    if( tr_sessionIsAddressBlocked( session, addr ) )
2066    {
2067        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_address_to_string( addr ) );
2068        if(socket >= 0)
2069            tr_netClose( session, socket );
2070        else
2071            UTP_Close( utp_socket );
2072    }
2073    else if( getExistingHandshake( &manager->incomingHandshakes, addr ) )
2074    {
2075        if(socket >= 0)
2076            tr_netClose( session, socket );
2077        else
2078            UTP_Close( utp_socket );
2079    }
2080    else /* we don't have a connection to them yet... */
2081    {
2082        tr_peerIo *    io;
2083        tr_handshake * handshake;
2084
2085        io = tr_peerIoNewIncoming( session, session->bandwidth, addr, port, socket, utp_socket );
2086
2087        handshake = tr_handshakeNew( io,
2088                                     session->encryptionMode,
2089                                     myHandshakeDoneCB,
2090                                     manager );
2091
2092        tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewIncoming() */
2093
2094        tr_ptrArrayInsertSorted( &manager->incomingHandshakes, handshake,
2095                                 handshakeCompare );
2096    }
2097
2098    managerUnlock( manager );
2099}
2100
2101void
2102tr_peerMgrAddPex( tr_torrent * tor, uint8_t from,
2103                  const tr_pex * pex, int8_t seedProbability )
2104{
2105    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
2106    {
2107        Torrent * t = tor->torrentPeers;
2108        managerLock( t->manager );
2109
2110        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
2111            if( tr_address_is_valid_for_peers( &pex->addr, pex->port ) )
2112                ensureAtomExists( t, &pex->addr, pex->port, pex->flags, seedProbability, from );
2113
2114        managerUnlock( t->manager );
2115    }
2116}
2117
2118void
2119tr_peerMgrMarkAllAsSeeds( tr_torrent * tor )
2120{
2121    Torrent * t = tor->torrentPeers;
2122    const int n = tr_ptrArraySize( &t->pool );
2123    struct peer_atom ** it = (struct peer_atom**) tr_ptrArrayBase( &t->pool );
2124    struct peer_atom ** end = it + n;
2125
2126    while( it != end )
2127        atomSetSeed( t, *it++ );
2128}
2129
2130tr_pex *
2131tr_peerMgrCompactToPex( const void *    compact,
2132                        size_t          compactLen,
2133                        const uint8_t * added_f,
2134                        size_t          added_f_len,
2135                        size_t *        pexCount )
2136{
2137    size_t          i;
2138    size_t          n = compactLen / 6;
2139    const uint8_t * walk = compact;
2140    tr_pex *        pex = tr_new0( tr_pex, n );
2141
2142    for( i = 0; i < n; ++i )
2143    {
2144        pex[i].addr.type = TR_AF_INET;
2145        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
2146        memcpy( &pex[i].port, walk, 2 ); walk += 2;
2147        if( added_f && ( n == added_f_len ) )
2148            pex[i].flags = added_f[i];
2149    }
2150
2151    *pexCount = n;
2152    return pex;
2153}
2154
2155tr_pex *
2156tr_peerMgrCompact6ToPex( const void    * compact,
2157                         size_t          compactLen,
2158                         const uint8_t * added_f,
2159                         size_t          added_f_len,
2160                         size_t        * pexCount )
2161{
2162    size_t          i;
2163    size_t          n = compactLen / 18;
2164    const uint8_t * walk = compact;
2165    tr_pex *        pex = tr_new0( tr_pex, n );
2166
2167    for( i = 0; i < n; ++i )
2168    {
2169        pex[i].addr.type = TR_AF_INET6;
2170        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
2171        memcpy( &pex[i].port, walk, 2 ); walk += 2;
2172        if( added_f && ( n == added_f_len ) )
2173            pex[i].flags = added_f[i];
2174    }
2175
2176    *pexCount = n;
2177    return pex;
2178}
2179
2180tr_pex *
2181tr_peerMgrArrayToPex( const void * array,
2182                      size_t       arrayLen,
2183                      size_t      * pexCount )
2184{
2185    size_t          i;
2186    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
2187    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
2188    const uint8_t * walk = array;
2189    tr_pex        * pex = tr_new0( tr_pex, n );
2190
2191    for( i = 0 ; i < n ; i++ ) {
2192        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
2193        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
2194        pex[i].flags = 0x00;
2195        walk += sizeof( tr_address ) + 2;
2196    }
2197
2198    *pexCount = n;
2199    return pex;
2200}
2201
2202/**
2203***
2204**/
2205
2206static void
2207tr_peerMgrSetBlame( tr_torrent     * tor,
2208                    tr_piece_index_t pieceIndex,
2209                    int              success )
2210{
2211    if( !success )
2212    {
2213        int        peerCount, i;
2214        Torrent *  t = tor->torrentPeers;
2215        tr_peer ** peers;
2216
2217        assert( torrentIsLocked( t ) );
2218
2219        peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
2220        for( i = 0; i < peerCount; ++i )
2221        {
2222            tr_peer * peer = peers[i];
2223            if( tr_bitfieldHas( &peer->blame, pieceIndex ) )
2224            {
2225                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
2226                        tr_atomAddrStr( peer->atom ),
2227                        pieceIndex, (int)peer->strikes + 1 );
2228                addStrike( t, peer );
2229            }
2230        }
2231    }
2232}
2233
2234int
2235tr_pexCompare( const void * va, const void * vb )
2236{
2237    const tr_pex * a = va;
2238    const tr_pex * b = vb;
2239    int i;
2240
2241    assert( tr_isPex( a ) );
2242    assert( tr_isPex( b ) );
2243
2244    if(( i = tr_address_compare( &a->addr, &b->addr )))
2245        return i;
2246
2247    if( a->port != b->port )
2248        return a->port < b->port ? -1 : 1;
2249
2250    return 0;
2251}
2252
#if 0
/* Disabled code: an explicit encryption preference wins;
 * otherwise fall back to whether the connection is encrypted right now. */
static int
peerPrefersCrypto( const tr_peer * peer )
{
    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
        return true;
    else if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
        return false;
    else
        return tr_peerIoIsEncrypted( peer->io );
}
#endif
2266
2267/* better goes first */
2268static int
2269compareAtomsByUsefulness( const void * va, const void *vb )
2270{
2271    const struct peer_atom * a = * (const struct peer_atom**) va;
2272    const struct peer_atom * b = * (const struct peer_atom**) vb;
2273
2274    assert( tr_isAtom( a ) );
2275    assert( tr_isAtom( b ) );
2276
2277    if( a->piece_data_time != b->piece_data_time )
2278        return a->piece_data_time > b->piece_data_time ? -1 : 1;
2279    if( a->fromBest != b->fromBest )
2280        return a->fromBest < b->fromBest ? -1 : 1;
2281    if( a->numFails != b->numFails )
2282        return a->numFails < b->numFails ? -1 : 1;
2283
2284    return 0;
2285}
2286
2287int
2288tr_peerMgrGetPeers( tr_torrent   * tor,
2289                    tr_pex      ** setme_pex,
2290                    uint8_t        af,
2291                    uint8_t        list_mode,
2292                    int            maxCount )
2293{
2294    int i;
2295    int n;
2296    int count = 0;
2297    int atomCount = 0;
2298    const Torrent * t = tor->torrentPeers;
2299    struct peer_atom ** atoms = NULL;
2300    tr_pex * pex;
2301    tr_pex * walk;
2302
2303    assert( tr_isTorrent( tor ) );
2304    assert( setme_pex != NULL );
2305    assert( af==TR_AF_INET || af==TR_AF_INET6 );
2306    assert( list_mode==TR_PEERS_CONNECTED || list_mode==TR_PEERS_ALL );
2307
2308    managerLock( t->manager );
2309
2310    /**
2311    ***  build a list of atoms
2312    **/
2313
2314    if( list_mode == TR_PEERS_CONNECTED ) /* connected peers only */
2315    {
2316        int i;
2317        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
2318        atomCount = tr_ptrArraySize( &t->peers );
2319        atoms = tr_new( struct peer_atom *, atomCount );
2320        for( i=0; i<atomCount; ++i )
2321            atoms[i] = peers[i]->atom;
2322    }
2323    else /* TR_PEERS_ALL */
2324    {
2325        const struct peer_atom ** atomsBase = (const struct peer_atom**) tr_ptrArrayBase( &t->pool );
2326        atomCount = tr_ptrArraySize( &t->pool );
2327        atoms = tr_memdup( atomsBase, atomCount * sizeof( struct peer_atom * ) );
2328    }
2329
2330    qsort( atoms, atomCount, sizeof( struct peer_atom * ), compareAtomsByUsefulness );
2331
2332    /**
2333    ***  add the first N of them into our return list
2334    **/
2335
2336    n = MIN( atomCount, maxCount );
2337    pex = walk = tr_new0( tr_pex, n );
2338
2339    for( i=0; i<atomCount && count<n; ++i )
2340    {
2341        const struct peer_atom * atom = atoms[i];
2342        if( atom->addr.type == af )
2343        {
2344            assert( tr_address_is_valid( &atom->addr ) );
2345            walk->addr = atom->addr;
2346            walk->port = atom->port;
2347            walk->flags = atom->flags;
2348            ++count;
2349            ++walk;
2350        }
2351    }
2352
2353    qsort( pex, count, sizeof( tr_pex ), tr_pexCompare );
2354
2355    assert( ( walk - pex ) == count );
2356    *setme_pex = pex;
2357
2358    /* cleanup */
2359    tr_free( atoms );
2360    managerUnlock( t->manager );
2361    return count;
2362}
2363
/* forward declarations of the periodic callbacks; their signature matches
 * the `callback' parameter of createTimer() */
static void atomPulse( int, short, void * );
static void bandwidthPulse( int, short, void * );
static void rechokePulse( int, short, void * );
static void reconnectPulse( int, short, void * );
2368
2369static struct event *
2370createTimer( tr_session * session, int msec, void (*callback)(int, short, void *), void * cbdata )
2371{
2372    struct event * timer = evtimer_new( session->event_base, callback, cbdata );
2373    tr_timerAddMsec( timer, msec );
2374    return timer;
2375}
2376
2377static void
2378ensureMgrTimersExist( struct tr_peerMgr * m )
2379{
2380    if( m->atomTimer == NULL )
2381        m->atomTimer = createTimer( m->session, ATOM_PERIOD_MSEC, atomPulse, m );
2382
2383    if( m->bandwidthTimer == NULL )
2384        m->bandwidthTimer = createTimer( m->session, BANDWIDTH_PERIOD_MSEC, bandwidthPulse, m );
2385
2386    if( m->rechokeTimer == NULL )
2387        m->rechokeTimer = createTimer( m->session, RECHOKE_PERIOD_MSEC, rechokePulse, m );
2388
2389    if( m->refillUpkeepTimer == NULL )
2390        m->refillUpkeepTimer = createTimer( m->session, REFILL_UPKEEP_PERIOD_MSEC, refillUpkeep, m );
2391}
2392
2393void
2394tr_peerMgrStartTorrent( tr_torrent * tor )
2395{
2396    Torrent * t = tor->torrentPeers;
2397
2398    assert( tr_isTorrent( tor ) );
2399    assert( tr_torrentIsLocked( tor ) );
2400
2401    ensureMgrTimersExist( t->manager );
2402
2403    t->isRunning = true;
2404    t->maxPeers = t->tor->maxConnectedPeers;
2405    t->pieceSortState = PIECES_UNSORTED;
2406
2407    rechokePulse( 0, 0, t->manager );
2408}
2409
2410static void
2411stopTorrent( Torrent * t )
2412{
2413    tr_peer * peer;
2414
2415    t->isRunning = false;
2416
2417    replicationFree( t );
2418    invalidatePieceSorting( t );
2419
2420    /* disconnect the peers. */
2421    while(( peer = tr_ptrArrayPop( &t->peers )))
2422        peerDelete( t, peer );
2423
2424    /* disconnect the handshakes. handshakeAbort calls handshakeDoneCB(),
2425     * which removes the handshake from t->outgoingHandshakes... */
2426    while( !tr_ptrArrayEmpty( &t->outgoingHandshakes ) )
2427        tr_handshakeAbort( tr_ptrArrayNth( &t->outgoingHandshakes, 0 ) );
2428}
2429
2430void
2431tr_peerMgrStopTorrent( tr_torrent * tor )
2432{
2433    assert( tr_isTorrent( tor ) );
2434    assert( tr_torrentIsLocked( tor ) );
2435
2436    stopTorrent( tor->torrentPeers );
2437}
2438
2439void
2440tr_peerMgrAddTorrent( tr_peerMgr * manager, tr_torrent * tor )
2441{
2442    assert( tr_isTorrent( tor ) );
2443    assert( tr_torrentIsLocked( tor ) );
2444    assert( tor->torrentPeers == NULL );
2445
2446    tor->torrentPeers = torrentNew( manager, tor );
2447}
2448
2449void
2450tr_peerMgrRemoveTorrent( tr_torrent * tor )
2451{
2452    assert( tr_isTorrent( tor ) );
2453    assert( tr_torrentIsLocked( tor ) );
2454
2455    stopTorrent( tor->torrentPeers );
2456    torrentFree( tor->torrentPeers );
2457}
2458
2459void
2460tr_peerUpdateProgress( tr_torrent * tor, tr_peer * peer )
2461{
2462    const tr_bitfield * have = &peer->have;
2463
2464    if( tr_bitfieldHasAll( have ) )
2465    {
2466        peer->progress = 1.0;
2467    }
2468    else if( tr_bitfieldHasNone( have ) )
2469    {
2470        peer->progress = 0.0;
2471    }
2472    else
2473    {
2474        const float true_count = tr_bitfieldCountTrueBits( have );
2475
2476        if( tr_torrentHasMetadata( tor ) )
2477            peer->progress = true_count / tor->info.pieceCount;
2478        else /* without pieceCount, this result is only a best guess... */
2479            peer->progress = true_count / ( have->bit_count + 1 );
2480    }
2481
2482    if( peer->atom && ( peer->progress >= 1.0 ) )
2483        atomSetSeed( tor->torrentPeers, peer->atom );
2484}
2485
2486void
2487tr_peerMgrOnTorrentGotMetainfo( tr_torrent * tor )
2488{
2489    int i;
2490    const int peerCount = tr_ptrArraySize( &tor->torrentPeers->peers );
2491    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &tor->torrentPeers->peers );
2492
2493    /* some peer_msgs' progress fields may not be accurate if we
2494       didn't have the metadata before now... so refresh them all... */
2495    for( i=0; i<peerCount; ++i )
2496        tr_peerUpdateProgress( tor, peers[i] );
2497}
2498
2499void
2500tr_peerMgrTorrentAvailability( const tr_torrent * tor, int8_t * tab, unsigned int tabCount )
2501{
2502    assert( tr_isTorrent( tor ) );
2503    assert( torrentIsLocked( tor->torrentPeers ) );
2504    assert( tab != NULL );
2505    assert( tabCount > 0 );
2506
2507    memset( tab, 0, tabCount );
2508
2509    if( tr_torrentHasMetadata( tor ) )
2510    {
2511        tr_piece_index_t i;
2512        const int peerCount = tr_ptrArraySize( &tor->torrentPeers->peers );
2513        const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &tor->torrentPeers->peers );
2514        const float interval = tor->info.pieceCount / (float)tabCount;
2515        const bool isSeed = tr_cpGetStatus( &tor->completion ) == TR_SEED;
2516
2517        for( i=0; i<tabCount; ++i )
2518        {
2519            const int piece = i * interval;
2520
2521            if( isSeed || tr_cpPieceIsComplete( &tor->completion, piece ) )
2522                tab[i] = -1;
2523            else if( peerCount ) {
2524                int j;
2525                for( j=0; j<peerCount; ++j )
2526                    if( tr_bitfieldHas( &peers[j]->have, piece ) )
2527                        ++tab[i];
2528            }
2529        }
2530    }
2531}
2532
2533static bool
2534peerIsSeed( const tr_peer * peer )
2535{
2536    if( peer->progress >= 1.0 )
2537        return true;
2538
2539    if( peer->atom && atomIsSeed( peer->atom ) )
2540        return true;
2541
2542    return false;
2543}
2544
2545/* count how many pieces we want that connected peers have */
2546uint64_t
2547tr_peerMgrGetDesiredAvailable( const tr_torrent * tor )
2548{
2549    size_t i;
2550    size_t n;
2551    uint64_t desiredAvailable;
2552    const Torrent * t = tor->torrentPeers;
2553
2554    /* common shortcuts... */
2555
2556    if( tr_torrentIsSeed( t->tor ) )
2557        return 0;
2558
2559    if( !tr_torrentHasMetadata( tor ) )
2560        return 0;
2561
2562    n = tr_ptrArraySize( &t->peers );
2563    if( n == 0 )
2564        return 0;
2565    else {
2566        const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
2567        for( i=0; i<n; ++i )
2568            if( peers[i]->atom && atomIsSeed( peers[i]->atom ) )
2569                return tr_cpLeftUntilDone( &tor->completion );
2570    }
2571
2572    if( !t->pieceReplication || !t->pieceReplicationSize )
2573        return 0;
2574
2575    /* do it the hard way */
2576
2577    desiredAvailable = 0;
2578    for( i=0, n=tor->info.pieceCount; i<n; ++i )
2579    {
2580        if( tor->info.pieces[i].dnd )
2581            continue;
2582        if( t->pieceReplicationSize >= i )
2583            continue;
2584        if( t->pieceReplication[i] == 0 )
2585            continue;
2586
2587        desiredAvailable += tr_cpMissingBytesInPiece( &t->tor->completion, i );
2588    }
2589
2590    return desiredAvailable;
2591}
2592
2593void
2594tr_peerMgrTorrentStats( tr_torrent  * tor,
2595                        int         * setmePeersConnected,
2596                        int         * setmeWebseedsSendingToUs,
2597                        int         * setmePeersSendingToUs,
2598                        int         * setmePeersGettingFromUs,
2599                        int         * setmePeersFrom )
2600{
2601    int i, size;
2602    const Torrent * t = tor->torrentPeers;
2603    const tr_peer ** peers;
2604
2605    assert( tr_torrentIsLocked( tor ) );
2606
2607    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
2608    size = tr_ptrArraySize( &t->peers );
2609
2610    *setmePeersConnected       = 0;
2611    *setmePeersGettingFromUs   = 0;
2612    *setmePeersSendingToUs     = 0;
2613    *setmeWebseedsSendingToUs  = 0;
2614
2615    for( i=0; i<TR_PEER_FROM__MAX; ++i )
2616        setmePeersFrom[i] = 0;
2617
2618    for( i=0; i<size; ++i )
2619    {
2620        const tr_peer * peer = peers[i];
2621        const struct peer_atom * atom = peer->atom;
2622
2623        if( peer->io == NULL ) /* not connected */
2624            continue;
2625
2626        ++*setmePeersConnected;
2627
2628        ++setmePeersFrom[atom->fromFirst];
2629
2630        if( clientIsDownloadingFrom( tor, peer ) )
2631            ++*setmePeersSendingToUs;
2632
2633        if( clientIsUploadingTo( peer ) )
2634            ++*setmePeersGettingFromUs;
2635    }
2636
2637    *setmeWebseedsSendingToUs = countActiveWebseeds( t );
2638}
2639
2640double*
2641tr_peerMgrWebSpeeds_KBps( const tr_torrent * tor )
2642{
2643    int i;
2644    const Torrent * t = tor->torrentPeers;
2645    const int webseedCount = tr_ptrArraySize( &t->webseeds );
2646    const tr_webseed ** webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2647    const uint64_t now = tr_time_msec( );
2648    double * ret = tr_new0( double, webseedCount );
2649
2650    assert( tr_isTorrent( tor ) );
2651    assert( tr_torrentIsLocked( tor ) );
2652    assert( t->manager != NULL );
2653    assert( webseedCount == tor->info.webseedCount );
2654
2655    for( i=0; i<webseedCount; ++i ) {
2656        int Bps;
2657        if( tr_webseedGetSpeed_Bps( webseeds[i], now, &Bps ) )
2658            ret[i] = Bps / (double)tr_speed_K;
2659        else
2660            ret[i] = -1.0;
2661    }
2662
2663    return ret;
2664}
2665
2666int
2667tr_peerGetPieceSpeed_Bps( const tr_peer * peer, uint64_t now, tr_direction direction )
2668{
2669    return peer->io ? tr_peerIoGetPieceSpeed_Bps( peer->io, now, direction ) : 0.0;
2670}
2671
2672struct tr_peer_stat *
2673tr_peerMgrPeerStats( const tr_torrent * tor, int * setmeCount )
2674{
2675    int i;
2676    const Torrent * t = tor->torrentPeers;
2677    const int size = tr_ptrArraySize( &t->peers );
2678    const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
2679    const uint64_t now_msec = tr_time_msec( );
2680    const time_t now = tr_time();
2681    tr_peer_stat * ret = tr_new0( tr_peer_stat, size );
2682
2683    assert( tr_isTorrent( tor ) );
2684    assert( tr_torrentIsLocked( tor ) );
2685    assert( t->manager );
2686
2687    for( i=0; i<size; ++i )
2688    {
2689        char *                   pch;
2690        const tr_peer *          peer = peers[i];
2691        const struct peer_atom * atom = peer->atom;
2692        tr_peer_stat *           stat = ret + i;
2693
2694        tr_address_to_string_with_buf( &atom->addr, stat->addr, sizeof( stat->addr ) );
2695        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
2696                   sizeof( stat->client ) );
2697        stat->port                = ntohs( peer->atom->port );
2698        stat->from                = atom->fromFirst;
2699        stat->progress            = peer->progress;
2700        stat->isUTP               = peer->io->utp_socket != NULL;
2701        stat->isEncrypted         = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
2702        stat->rateToPeer_KBps     = toSpeedKBps( tr_peerGetPieceSpeed_Bps( peer, now_msec, TR_CLIENT_TO_PEER ) );
2703        stat->rateToClient_KBps   = toSpeedKBps( tr_peerGetPieceSpeed_Bps( peer, now_msec, TR_PEER_TO_CLIENT ) );
2704        stat->peerIsChoked        = peer->peerIsChoked;
2705        stat->peerIsInterested    = peer->peerIsInterested;
2706        stat->clientIsChoked      = peer->clientIsChoked;
2707        stat->clientIsInterested  = peer->clientIsInterested;
2708        stat->isIncoming          = tr_peerIoIsIncoming( peer->io );
2709        stat->isDownloadingFrom   = clientIsDownloadingFrom( tor, peer );
2710        stat->isUploadingTo       = clientIsUploadingTo( peer );
2711        stat->isSeed              = peerIsSeed( peer );
2712
2713        stat->blocksToPeer        = tr_historyGet( &peer->blocksSentToPeer,    now, CANCEL_HISTORY_SEC );
2714        stat->blocksToClient      = tr_historyGet( &peer->blocksSentToClient,  now, CANCEL_HISTORY_SEC );
2715        stat->cancelsToPeer       = tr_historyGet( &peer->cancelsSentToPeer,   now, CANCEL_HISTORY_SEC );
2716        stat->cancelsToClient     = tr_historyGet( &peer->cancelsSentToClient, now, CANCEL_HISTORY_SEC );
2717
2718        stat->pendingReqsToPeer   = peer->pendingReqsToPeer;
2719        stat->pendingReqsToClient = peer->pendingReqsToClient;
2720
2721        pch = stat->flagStr;
2722        if( stat->isUTP ) *pch++ = 'T';
2723        if( t->optimistic == peer ) *pch++ = 'O';
2724        if( stat->isDownloadingFrom ) *pch++ = 'D';
2725        else if( stat->clientIsInterested ) *pch++ = 'd';
2726        if( stat->isUploadingTo ) *pch++ = 'U';
2727        else if( stat->peerIsInterested ) *pch++ = 'u';
2728        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
2729        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
2730        if( stat->isEncrypted ) *pch++ = 'E';
2731        if( stat->from == TR_PEER_FROM_DHT ) *pch++ = 'H';
2732        else if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
2733        if( stat->isIncoming ) *pch++ = 'I';
2734        *pch = '\0';
2735    }
2736
2737    *setmeCount = size;
2738
2739    return ret;
2740}
2741
2742/**
2743***
2744**/
2745
2746void
2747tr_peerMgrClearInterest( tr_torrent * tor )
2748{
2749    int i;
2750    Torrent * t = tor->torrentPeers;
2751    const int peerCount = tr_ptrArraySize( &t->peers );
2752
2753    assert( tr_isTorrent( tor ) );
2754    assert( tr_torrentIsLocked( tor ) );
2755
2756    for( i=0; i<peerCount; ++i )
2757    {
2758        const tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2759        tr_peerMsgsSetInterested( peer->msgs, false );
2760    }
2761}
2762
2763/* do we still want this piece and does the peer have it? */
2764static bool
2765isPieceInteresting( const tr_torrent * tor, const tr_peer * peer, tr_piece_index_t index )
2766{
2767    return ( !tor->info.pieces[index].dnd ) /* we want it */
2768        && ( !tr_cpPieceIsComplete( &tor->completion, index ) )  /* we don't have it */
2769        && ( tr_bitfieldHas( &peer->have, index ) ); /* peer has it */
2770}
2771
2772/* does this peer have any pieces that we want? */
2773static bool
2774isPeerInteresting( const tr_torrent * tor, const tr_peer * peer )
2775{
2776    tr_piece_index_t i, n;
2777
2778    if ( tr_torrentIsSeed( tor ) )
2779        return false;
2780
2781    if( !tr_torrentIsPieceTransferAllowed( tor, TR_PEER_TO_CLIENT ) )
2782        return false;
2783
2784    for( i=0, n=tor->info.pieceCount; i<n; ++i )
2785        if( isPieceInteresting( tor, peer, i ) )
2786            return true;
2787
2788    return false;
2789}
2790
2791/* determines who we send "interested" messages to */
2792static void
2793rechokeDownloads( Torrent * t )
2794{
2795    int i;
2796    const time_t now = tr_time( );
2797    const int MIN_INTERESTING_PEERS = 5;
2798    const int peerCount = tr_ptrArraySize( &t->peers );
2799    int maxPeers;
2800
2801    int badCount         = 0;
2802    int goodCount        = 0;
2803    int untestedCount    = 0;
2804    tr_peer ** bad       = tr_new( tr_peer*, peerCount );
2805    tr_peer ** good      = tr_new( tr_peer*, peerCount );
2806    tr_peer ** untested  = tr_new( tr_peer*, peerCount );
2807
2808    /* decide how many peers to be interested in */
2809    {
2810        int blocks = 0;
2811        int cancels = 0;
2812        time_t timeSinceCancel;
2813
2814        /* Count up how many blocks & cancels each peer has.
2815         *
2816         * There are two situations where we send out cancels --
2817         *
2818         * 1. We've got unresponsive peers, which is handled by deciding
2819         *    -which- peers to be interested in.
2820         *
2821         * 2. We've hit our bandwidth cap, which is handled by deciding
2822         *    -how many- peers to be interested in.
2823         *
2824         * We're working on 2. here, so we need to ignore unresponsive
2825         * peers in our calculations lest they confuse Transmission into
2826         * thinking it's hit its bandwidth cap.
2827         */
2828        for( i=0; i<peerCount; ++i )
2829        {
2830            const tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2831            const int b = tr_historyGet( &peer->blocksSentToClient, now, CANCEL_HISTORY_SEC );
2832            const int c = tr_historyGet( &peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC );
2833
2834            if( b == 0 ) /* ignore unresponsive peers, as described above */
2835                continue;
2836
2837            blocks += b;
2838            cancels += c;
2839        }
2840
2841        if( cancels > 0 )
2842        {
2843            /* cancelRate: of the block requests we've recently made, the percentage we cancelled.
2844             * higher values indicate more congestion. */
2845            const double cancelRate = cancels / (double)(cancels + blocks);
2846            const double mult = 1 - MIN( cancelRate, 0.5 );
2847            maxPeers = t->interestedCount * mult;
2848            tordbg( t, "cancel rate is %.3f -- reducing the "
2849                       "number of peers we're interested in by %.0f percent",
2850                       cancelRate, mult * 100 );
2851            t->lastCancel = now;
2852        }
2853
2854        timeSinceCancel = now - t->lastCancel;
2855        if( timeSinceCancel )
2856        {
2857            const int maxIncrease = 15;
2858            const time_t maxHistory = 2 * CANCEL_HISTORY_SEC;
2859            const double mult = MIN( timeSinceCancel, maxHistory ) / (double) maxHistory;
2860            const int inc = maxIncrease * mult;
2861            maxPeers = t->maxPeers + inc;
2862            tordbg( t, "time since last cancel is %li -- increasing the "
2863                       "number of peers we're interested in by %d",
2864                       timeSinceCancel, inc );
2865        }
2866    }
2867
2868    /* don't let the previous section's number tweaking go too far... */
2869    if( maxPeers < MIN_INTERESTING_PEERS )
2870        maxPeers = MIN_INTERESTING_PEERS;
2871    if( maxPeers > t->tor->maxConnectedPeers )
2872        maxPeers = t->tor->maxConnectedPeers;
2873
2874    t->maxPeers = maxPeers;
2875
2876    /* separate the peers into "good" (ones with a low cancel-to-block ratio),
2877     * untested peers, and "bad" (ones with a high cancel-to-block ratio).
2878     * That's the order in which we'll choose who to show interest in */
2879    {
2880        /* Randomize the peer array so the peers in the three groups will be unsorted... */
2881        int n = peerCount;
2882        tr_peer ** peers = tr_memdup( tr_ptrArrayBase( &t->peers ), n * sizeof( tr_peer * ) );
2883
2884        while( n > 0 )
2885        {
2886            const int i = tr_cryptoWeakRandInt( n );
2887            tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2888
2889            if( !isPeerInteresting( t->tor, peer ) )
2890            {
2891                tr_peerMsgsSetInterested( peer->msgs, false );
2892            }
2893            else
2894            {
2895                const int blocks = tr_historyGet( &peer->blocksSentToClient, now, CANCEL_HISTORY_SEC );
2896                const int cancels = tr_historyGet( &peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC );
2897
2898                if( !blocks && !cancels )
2899                    untested[untestedCount++] = peer;
2900                else if( !cancels )
2901                    good[goodCount++] = peer;
2902                else if( !blocks )
2903                    bad[badCount++] = peer;
2904                else if( ( cancels * 10 ) < blocks )
2905                    good[goodCount++] = peer;
2906                else
2907                    bad[badCount++] = peer;
2908            }
2909
2910            tr_removeElementFromArray( peers, i, sizeof(tr_peer*), n-- );
2911        }
2912
2913        tr_free( peers );
2914    }
2915
2916    t->interestedCount = 0;
2917
2918    /* We've decided (1) how many peers to be interested in,
2919     * and (2) which peers are the best candidates,
2920     * Now it's time to update our `interest' flags. */
2921    for( i=0; i<goodCount; ++i ) {
2922        const bool b = t->interestedCount < maxPeers;
2923        tr_peerMsgsSetInterested( good[i]->msgs, b );
2924        if( b )
2925            ++t->interestedCount;
2926    }
2927    for( i=0; i<untestedCount; ++i ) {
2928        const bool b = t->interestedCount < maxPeers;
2929        tr_peerMsgsSetInterested( untested[i]->msgs, b );
2930        if( b )
2931            ++t->interestedCount;
2932    }
2933    for( i=0; i<badCount; ++i ) {
2934        const bool b = t->interestedCount < maxPeers;
2935        tr_peerMsgsSetInterested( bad[i]->msgs, b );
2936        if( b )
2937            ++t->interestedCount;
2938    }
2939
2940/*fprintf( stderr, "num interested: %d\n", t->interestedCount );*/
2941
2942    /* cleanup */
2943    tr_free( untested );
2944    tr_free( good );
2945    tr_free( bad );
2946}
2947
2948/**
2949***
2950**/
2951
2952struct ChokeData
2953{
2954    bool            isInterested;
2955    bool            wasChoked;
2956    bool            isChoked;
2957    int             rate;
2958    int             salt;
2959    tr_peer *       peer;
2960};
2961
2962static int
2963compareChoke( const void * va, const void * vb )
2964{
2965    const struct ChokeData * a = va;
2966    const struct ChokeData * b = vb;
2967
2968    if( a->rate != b->rate ) /* prefer higher overall speeds */
2969        return a->rate > b->rate ? -1 : 1;
2970
2971    if( a->wasChoked != b->wasChoked ) /* prefer unchoked */
2972        return a->wasChoked ? 1 : -1;
2973
2974    if( a->salt != b->salt ) /* random order */
2975        return a->salt - b->salt;
2976
2977    return 0;
2978}
2979
2980/* is this a new connection? */
2981static int
2982isNew( const tr_peer * peer )
2983{
2984    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
2985}
2986
2987/* get a rate for deciding which peers to choke and unchoke. */
2988static int
2989getRate( const tr_torrent * tor, struct peer_atom * atom, uint64_t now )
2990{
2991    int Bps;
2992
2993    if( tr_torrentIsSeed( tor ) )
2994        Bps = tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_CLIENT_TO_PEER );
2995
2996    /* downloading a private torrent... take upload speed into account
2997     * because there may only be a small window of opportunity to share */
2998    else if( tr_torrentIsPrivate( tor ) )
2999        Bps = tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_PEER_TO_CLIENT )
3000            + tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_CLIENT_TO_PEER );
3001
3002    /* downloading a public torrent */
3003    else
3004        Bps = tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_PEER_TO_CLIENT );
3005
3006    /* convert it to bytes per second */
3007    return Bps;
3008}
3009
3010static inline bool
3011isBandwidthMaxedOut( const tr_bandwidth * b,
3012                     const uint64_t now_msec, tr_direction dir )
3013{
3014    if( !tr_bandwidthIsLimited( b, dir ) )
3015        return false;
3016    else {
3017        const int got = tr_bandwidthGetPieceSpeed_Bps( b, now_msec, dir );
3018        const int want = tr_bandwidthGetDesiredSpeed_Bps( b, dir );
3019        return got >= want;
3020    }
3021}
3022
3023static void
3024rechokeUploads( Torrent * t, const uint64_t now )
3025{
3026    int i, size, unchokedInterested;
3027    const int peerCount = tr_ptrArraySize( &t->peers );
3028    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
3029    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
3030    const tr_session * session = t->manager->session;
3031    const int chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
3032    const bool isMaxedOut = isBandwidthMaxedOut( t->tor->bandwidth, now, TR_UP );
3033
3034    assert( torrentIsLocked( t ) );
3035
3036    /* an optimistic unchoke peer's "optimistic"
3037     * state lasts for N calls to rechokeUploads(). */
3038    if( t->optimisticUnchokeTimeScaler > 0 )
3039        t->optimisticUnchokeTimeScaler--;
3040    else
3041        t->optimistic = NULL;
3042
3043    /* sort the peers by preference and rate */
3044    for( i = 0, size = 0; i < peerCount; ++i )
3045    {
3046        tr_peer * peer = peers[i];
3047        struct peer_atom * atom = peer->atom;
3048
3049        if( peerIsSeed( peer ) ) /* choke seeds and partial seeds */
3050        {
3051            tr_peerMsgsSetChoke( peer->msgs, true );
3052        }
3053        else if( chokeAll ) /* choke everyone if we're not uploading */
3054        {
3055            tr_peerMsgsSetChoke( peer->msgs, true );
3056        }
3057        else if( peer != t->optimistic )
3058        {
3059            struct ChokeData * n = &choke[size++];
3060            n->peer         = peer;
3061            n->isInterested = peer->peerIsInterested;
3062            n->wasChoked    = peer->peerIsChoked;
3063            n->rate         = getRate( t->tor, atom, now );
3064            n->salt         = tr_cryptoWeakRandInt( INT_MAX );
3065            n->isChoked     = true;
3066        }
3067    }
3068
3069    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
3070
3071    /**
3072     * Reciprocation and number of uploads capping is managed by unchoking
3073     * the N peers which have the best upload rate and are interested.
3074     * This maximizes the client's download rate. These N peers are
3075     * referred to as downloaders, because they are interested in downloading
3076     * from the client.
3077     *
3078     * Peers which have a better upload rate (as compared to the downloaders)
3079     * but aren't interested get unchoked. If they become interested, the
3080     * downloader with the worst upload rate gets choked. If a client has
3081     * a complete file, it uses its upload rate rather than its download
3082     * rate to decide which peers to unchoke.
3083     *
3084     * If our bandwidth is maxed out, don't unchoke any more peers.
3085     */
3086    unchokedInterested = 0;
3087    for( i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i ) {
3088        choke[i].isChoked = isMaxedOut ? choke[i].wasChoked : false;
3089        if( choke[i].isInterested )
3090            ++unchokedInterested;
3091    }
3092
3093    /* optimistic unchoke */
3094    if( !t->optimistic && !isMaxedOut && (i<size) )
3095    {
3096        int n;
3097        struct ChokeData * c;
3098        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
3099
3100        for( ; i<size; ++i )
3101        {
3102            if( choke[i].isInterested )
3103            {
3104                const tr_peer * peer = choke[i].peer;
3105                int x = 1, y;
3106                if( isNew( peer ) ) x *= 3;
3107                for( y=0; y<x; ++y )
3108                    tr_ptrArrayAppend( &randPool, &choke[i] );
3109            }
3110        }
3111
3112        if(( n = tr_ptrArraySize( &randPool )))
3113        {
3114            c = tr_ptrArrayNth( &randPool, tr_cryptoWeakRandInt( n ));
3115            c->isChoked = false;
3116            t->optimistic = c->peer;
3117            t->optimisticUnchokeTimeScaler = OPTIMISTIC_UNCHOKE_MULTIPLIER;
3118        }
3119
3120        tr_ptrArrayDestruct( &randPool, NULL );
3121    }
3122
3123    for( i=0; i<size; ++i )
3124        tr_peerMsgsSetChoke( choke[i].peer->msgs, choke[i].isChoked );
3125
3126    /* cleanup */
3127    tr_free( choke );
3128}
3129
3130static void
3131rechokePulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3132{
3133    tr_torrent * tor = NULL;
3134    tr_peerMgr * mgr = vmgr;
3135    const uint64_t now = tr_time_msec( );
3136
3137    managerLock( mgr );
3138
3139    while(( tor = tr_torrentNext( mgr->session, tor ))) {
3140        if( tor->isRunning ) {
3141            rechokeUploads( tor->torrentPeers, now );
3142            if( !tr_torrentIsSeed( tor ) )
3143                rechokeDownloads( tor->torrentPeers );
3144        }
3145    }
3146
3147    tr_timerAddMsec( mgr->rechokeTimer, RECHOKE_PERIOD_MSEC );
3148    managerUnlock( mgr );
3149}
3150
3151/***
3152****
3153****  Life and Death
3154****
3155***/
3156
3157static bool
3158shouldPeerBeClosed( const Torrent    * t,
3159                    const tr_peer    * peer,
3160                    int                peerCount,
3161                    const time_t       now )
3162{
3163    const tr_torrent *       tor = t->tor;
3164    const struct peer_atom * atom = peer->atom;
3165
3166    /* if it's marked for purging, close it */
3167    if( peer->doPurge )
3168    {
3169        tordbg( t, "purging peer %s because its doPurge flag is set",
3170                tr_atomAddrStr( atom ) );
3171        return true;
3172    }
3173
3174    /* disconnect if we're both seeds and enough time has passed for PEX */
3175    if( tr_torrentIsSeed( tor ) && peerIsSeed( peer ) )
3176        return !tr_torrentAllowsPex(tor) || (now-atom->time>=30);
3177
3178    /* disconnect if it's been too long since piece data has been transferred.
3179     * this is on a sliding scale based on number of available peers... */
3180    {
3181        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
3182        /* if we have >= relaxIfFewerThan, strictness is 100%.
3183         * if we have zero connections, strictness is 0% */
3184        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
3185                               ? 1.0
3186                               : peerCount / (float)relaxStrictnessIfFewerThanN;
3187        const int lo = MIN_UPLOAD_IDLE_SECS;
3188        const int hi = MAX_UPLOAD_IDLE_SECS;
3189        const int limit = hi - ( ( hi - lo ) * strictness );
3190        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
3191/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
3192        if( idleTime > limit ) {
3193            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
3194                       tr_atomAddrStr( atom ), idleTime );
3195            return true;
3196        }
3197    }
3198
3199    return false;
3200}
3201
3202static tr_peer **
3203getPeersToClose( Torrent * t, const time_t now_sec, int * setmeSize )
3204{
3205    int i, peerCount, outsize;
3206    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( &t->peers, &peerCount );
3207    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
3208
3209    assert( torrentIsLocked( t ) );
3210
3211    for( i = outsize = 0; i < peerCount; ++i )
3212        if( shouldPeerBeClosed( t, peers[i], peerCount, now_sec ) )
3213            ret[outsize++] = peers[i];
3214
3215    *setmeSize = outsize;
3216    return ret;
3217}
3218
3219static int
3220getReconnectIntervalSecs( const struct peer_atom * atom, const time_t now )
3221{
3222    int sec;
3223
3224    /* if we were recently connected to this peer and transferring piece
3225     * data, try to reconnect to them sooner rather that later -- we don't
3226     * want network troubles to get in the way of a good peer. */
3227    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
3228        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
3229
3230    /* don't allow reconnects more often than our minimum */
3231    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
3232        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
3233
3234    /* otherwise, the interval depends on how many times we've tried
3235     * and failed to connect to the peer */
3236    else switch( atom->numFails ) {
3237        case 0: sec = 0; break;
3238        case 1: sec = 5; break;
3239        case 2: sec = 2 * 60; break;
3240        case 3: sec = 15 * 60; break;
3241        case 4: sec = 30 * 60; break;
3242        case 5: sec = 60 * 60; break;
3243        default: sec = 120 * 60; break;
3244    }
3245
3246    /* penalize peers that were unreachable the last time we tried */
3247    if( atom->flags2 & MYFLAG_UNREACHABLE )
3248        sec += sec;
3249
3250    dbgmsg( "reconnect interval for %s is %d seconds", tr_atomAddrStr( atom ), sec );
3251    return sec;
3252}
3253
3254static void
3255removePeer( Torrent * t, tr_peer * peer )
3256{
3257    tr_peer * removed;
3258    struct peer_atom * atom = peer->atom;
3259
3260    assert( torrentIsLocked( t ) );
3261    assert( atom );
3262
3263    atom->time = tr_time( );
3264
3265    removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare );
3266
3267    if( replicationExists( t ) )
3268        tr_decrReplicationFromBitfield( t, &peer->have );
3269
3270    assert( removed == peer );
3271    peerDelete( t, removed );
3272}
3273
3274static void
3275closePeer( Torrent * t, tr_peer * peer )
3276{
3277    struct peer_atom * atom;
3278
3279    assert( t != NULL );
3280    assert( peer != NULL );
3281
3282    atom = peer->atom;
3283
3284    /* if we transferred piece data, then they might be good peers,
3285       so reset their `numFails' weight to zero. otherwise we connected
3286       to them fruitlessly, so mark it as another fail */
3287    if( atom->piece_data_time ) {
3288        tordbg( t, "resetting atom %s numFails to 0", tr_atomAddrStr(atom) );
3289        atom->numFails = 0;
3290    } else {
3291        ++atom->numFails;
3292        tordbg( t, "incremented atom %s numFails to %d", tr_atomAddrStr(atom), (int)atom->numFails );
3293    }
3294
3295    tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
3296    removePeer( t, peer );
3297}
3298
3299static void
3300removeAllPeers( Torrent * t )
3301{
3302    while( !tr_ptrArrayEmpty( &t->peers ) )
3303        removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) );
3304}
3305
3306static void
3307closeBadPeers( Torrent * t, const time_t now_sec )
3308{
3309    int i;
3310    int peerCount;
3311    struct tr_peer ** peers = getPeersToClose( t, now_sec, &peerCount );
3312    for( i=0; i<peerCount; ++i )
3313        closePeer( t, peers[i] );
3314    tr_free( peers );
3315}
3316
3317struct peer_liveliness
3318{
3319    tr_peer * peer;
3320    void * clientData;
3321    time_t pieceDataTime;
3322    time_t time;
3323    int speed;
3324    bool doPurge;
3325};
3326
3327static int
3328comparePeerLiveliness( const void * va, const void * vb )
3329{
3330    const struct peer_liveliness * a = va;
3331    const struct peer_liveliness * b = vb;
3332
3333    if( a->doPurge != b->doPurge )
3334        return a->doPurge ? 1 : -1;
3335
3336    if( a->speed != b->speed ) /* faster goes first */
3337        return a->speed > b->speed ? -1 : 1;
3338
3339    /* the one to give us data more recently goes first */
3340    if( a->pieceDataTime != b->pieceDataTime )
3341        return a->pieceDataTime > b->pieceDataTime ? -1 : 1;
3342
3343    /* the one we connected to most recently goes first */
3344    if( a->time != b->time )
3345        return a->time > b->time ? -1 : 1;
3346
3347    return 0;
3348}
3349
3350static void
3351sortPeersByLivelinessImpl( tr_peer  ** peers,
3352                           void     ** clientData,
3353                           int         n,
3354                           uint64_t    now,
3355                           int (*compare) ( const void *va, const void *vb ) )
3356{
3357    int i;
3358    struct peer_liveliness *lives, *l;
3359
3360    /* build a sortable array of peer + extra info */
3361    lives = l = tr_new0( struct peer_liveliness, n );
3362    for( i=0; i<n; ++i, ++l )
3363    {
3364        tr_peer * p = peers[i];
3365        l->peer = p;
3366        l->doPurge = p->doPurge;
3367        l->pieceDataTime = p->atom->piece_data_time;
3368        l->time = p->atom->time;
3369        l->speed = tr_peerGetPieceSpeed_Bps( p, now, TR_UP )
3370                 + tr_peerGetPieceSpeed_Bps( p, now, TR_DOWN );
3371        if( clientData )
3372            l->clientData = clientData[i];
3373    }
3374
3375    /* sort 'em */
3376    assert( n == ( l - lives ) );
3377    qsort( lives, n, sizeof( struct peer_liveliness ), compare );
3378
3379    /* build the peer array */
3380    for( i=0, l=lives; i<n; ++i, ++l ) {
3381        peers[i] = l->peer;
3382        if( clientData )
3383            clientData[i] = l->clientData;
3384    }
3385    assert( n == ( l - lives ) );
3386
3387    /* cleanup */
3388    tr_free( lives );
3389}
3390
3391static void
3392sortPeersByLiveliness( tr_peer ** peers, void ** clientData, int n, uint64_t now )
3393{
3394    sortPeersByLivelinessImpl( peers, clientData, n, now, comparePeerLiveliness );
3395}
3396
3397
3398static void
3399enforceTorrentPeerLimit( Torrent * t, uint64_t now )
3400{
3401    int n = tr_ptrArraySize( &t->peers );
3402    const int max = tr_torrentGetPeerLimit( t->tor );
3403    if( n > max )
3404    {
3405        void * base = tr_ptrArrayBase( &t->peers );
3406        tr_peer ** peers = tr_memdup( base, n*sizeof( tr_peer* ) );
3407        sortPeersByLiveliness( peers, NULL, n, now );
3408        while( n > max )
3409            closePeer( t, peers[--n] );
3410        tr_free( peers );
3411    }
3412}
3413
3414static void
3415enforceSessionPeerLimit( tr_session * session, uint64_t now )
3416{
3417    int n = 0;
3418    tr_torrent * tor = NULL;
3419    const int max = tr_sessionGetPeerLimit( session );
3420
3421    /* count the total number of peers */
3422    while(( tor = tr_torrentNext( session, tor )))
3423        n += tr_ptrArraySize( &tor->torrentPeers->peers );
3424
3425    /* if there are too many, prune out the worst */
3426    if( n > max )
3427    {
3428        tr_peer ** peers = tr_new( tr_peer*, n );
3429        Torrent ** torrents = tr_new( Torrent*, n );
3430
3431        /* populate the peer array */
3432        n = 0;
3433        tor = NULL;
3434        while(( tor = tr_torrentNext( session, tor ))) {
3435            int i;
3436            Torrent * t = tor->torrentPeers;
3437            const int tn = tr_ptrArraySize( &t->peers );
3438            for( i=0; i<tn; ++i, ++n ) {
3439                peers[n] = tr_ptrArrayNth( &t->peers, i );
3440                torrents[n] = t;
3441            }
3442        }
3443
3444        /* sort 'em */
3445        sortPeersByLiveliness( peers, (void**)torrents, n, now );
3446
3447        /* cull out the crappiest */
3448        while( n-- > max )
3449            closePeer( torrents[n], peers[n] );
3450
3451        /* cleanup */
3452        tr_free( torrents );
3453        tr_free( peers );
3454    }
3455}
3456
3457static void makeNewPeerConnections( tr_peerMgr * mgr, const int max );
3458
3459static void
3460reconnectPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3461{
3462    tr_torrent * tor;
3463    tr_peerMgr * mgr = vmgr;
3464    const time_t now_sec = tr_time( );
3465    const uint64_t now_msec = tr_time_msec( );
3466
3467    /**
3468    ***  enforce the per-session and per-torrent peer limits
3469    **/
3470
3471    /* if we're over the per-torrent peer limits, cull some peers */
3472    tor = NULL;
3473    while(( tor = tr_torrentNext( mgr->session, tor )))
3474        if( tor->isRunning )
3475            enforceTorrentPeerLimit( tor->torrentPeers, now_msec );
3476
3477    /* if we're over the per-session peer limits, cull some peers */
3478    enforceSessionPeerLimit( mgr->session, now_msec );
3479
3480    /* remove crappy peers */
3481    tor = NULL;
3482    while(( tor = tr_torrentNext( mgr->session, tor )))
3483        if( !tor->torrentPeers->isRunning )
3484            removeAllPeers( tor->torrentPeers );
3485        else
3486            closeBadPeers( tor->torrentPeers, now_sec );
3487
3488    /* try to make new peer connections */
3489    makeNewPeerConnections( mgr, MAX_CONNECTIONS_PER_PULSE );
3490}
3491
3492/****
3493*****
3494*****  BANDWIDTH ALLOCATION
3495*****
3496****/
3497
3498static void
3499pumpAllPeers( tr_peerMgr * mgr )
3500{
3501    tr_torrent * tor = NULL;
3502
3503    while(( tor = tr_torrentNext( mgr->session, tor )))
3504    {
3505        int j;
3506        Torrent * t = tor->torrentPeers;
3507
3508        for( j=0; j<tr_ptrArraySize( &t->peers ); ++j )
3509        {
3510            tr_peer * peer = tr_ptrArrayNth( &t->peers, j );
3511            tr_peerMsgsPulse( peer->msgs );
3512        }
3513    }
3514}
3515
3516static void
3517bandwidthPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3518{
3519    tr_torrent * tor;
3520    tr_peerMgr * mgr = vmgr;
3521    managerLock( mgr );
3522
3523    /* FIXME: this next line probably isn't necessary... */
3524    pumpAllPeers( mgr );
3525
3526    /* allocate bandwidth to the peers */
3527    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
3528    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
3529
3530    /* possibly stop torrents that have seeded enough */
3531    tor = NULL;
3532    while(( tor = tr_torrentNext( mgr->session, tor )))
3533        tr_torrentCheckSeedLimit( tor );
3534
3535    /* run the completeness check for any torrents that need it */
3536    tor = NULL;
3537    while(( tor = tr_torrentNext( mgr->session, tor ))) {
3538        if( tor->torrentPeers->needsCompletenessCheck ) {
3539            tor->torrentPeers->needsCompletenessCheck  = false;
3540            tr_torrentRecheckCompleteness( tor );
3541        }
3542    }
3543
3544    /* stop torrents that are ready to stop, but couldn't be stopped earlier
3545     * during the peer-io callback call chain */
3546    tor = NULL;
3547    while(( tor = tr_torrentNext( mgr->session, tor )))
3548        if( tor->isStopping )
3549            tr_torrentStop( tor );
3550
3551    reconnectPulse( 0, 0, mgr );
3552
3553    tr_timerAddMsec( mgr->bandwidthTimer, BANDWIDTH_PERIOD_MSEC );
3554    managerUnlock( mgr );
3555}
3556
3557/***
3558****
3559***/
3560
3561static int
3562compareAtomPtrsByAddress( const void * va, const void *vb )
3563{
3564    const struct peer_atom * a = * (const struct peer_atom**) va;
3565    const struct peer_atom * b = * (const struct peer_atom**) vb;
3566
3567    assert( tr_isAtom( a ) );
3568    assert( tr_isAtom( b ) );
3569
3570    return tr_address_compare( &a->addr, &b->addr );
3571}
3572
3573/* best come first, worst go last */
3574static int
3575compareAtomPtrsByShelfDate( const void * va, const void *vb )
3576{
3577    time_t atime;
3578    time_t btime;
3579    const struct peer_atom * a = * (const struct peer_atom**) va;
3580    const struct peer_atom * b = * (const struct peer_atom**) vb;
3581    const int data_time_cutoff_secs = 60 * 60;
3582    const time_t tr_now = tr_time( );
3583
3584    assert( tr_isAtom( a ) );
3585    assert( tr_isAtom( b ) );
3586
3587    /* primary key: the last piece data time *if* it was within the last hour */
3588    atime = a->piece_data_time; if( atime + data_time_cutoff_secs < tr_now ) atime = 0;
3589    btime = b->piece_data_time; if( btime + data_time_cutoff_secs < tr_now ) btime = 0;
3590    if( atime != btime )
3591        return atime > btime ? -1 : 1;
3592
3593    /* secondary key: shelf date. */
3594    if( a->shelf_date != b->shelf_date )
3595        return a->shelf_date > b->shelf_date ? -1 : 1;
3596
3597    return 0;
3598}
3599
3600static int
3601getMaxAtomCount( const tr_torrent * tor )
3602{
3603    const int n = tor->maxConnectedPeers;
3604    /* approximate fit of the old jump discontinuous function */
3605    if( n >= 55 ) return     n + 150;
3606    if( n >= 20 ) return 2 * n + 95;
3607    return               4 * n + 55;
3608}
3609
3610static void
3611atomPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3612{
3613    tr_torrent * tor = NULL;
3614    tr_peerMgr * mgr = vmgr;
3615    managerLock( mgr );
3616
3617    while(( tor = tr_torrentNext( mgr->session, tor )))
3618    {
3619        int atomCount;
3620        Torrent * t = tor->torrentPeers;
3621        const int maxAtomCount = getMaxAtomCount( tor );
3622        struct peer_atom ** atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
3623
3624        if( atomCount > maxAtomCount ) /* we've got too many atoms... time to prune */
3625        {
3626            int i;
3627            int keepCount = 0;
3628            int testCount = 0;
3629            struct peer_atom ** keep = tr_new( struct peer_atom*, atomCount );
3630            struct peer_atom ** test = tr_new( struct peer_atom*, atomCount );
3631
3632            /* keep the ones that are in use */
3633            for( i=0; i<atomCount; ++i ) {
3634                struct peer_atom * atom = atoms[i];
3635                if( peerIsInUse( t, atom ) )
3636                    keep[keepCount++] = atom;
3637                else
3638                    test[testCount++] = atom;
3639            }
3640
3641            /* if there's room, keep the best of what's left */
3642            i = 0;
3643            if( keepCount < maxAtomCount ) {
3644                qsort( test, testCount, sizeof( struct peer_atom * ), compareAtomPtrsByShelfDate );
3645                while( i<testCount && keepCount<maxAtomCount )
3646                    keep[keepCount++] = test[i++];
3647            }
3648
3649            /* free the culled atoms */
3650            while( i<testCount )
3651                tr_free( test[i++] );
3652
3653            /* rebuild Torrent.pool with what's left */
3654            tr_ptrArrayDestruct( &t->pool, NULL );
3655            t->pool = TR_PTR_ARRAY_INIT;
3656            qsort( keep, keepCount, sizeof( struct peer_atom * ), compareAtomPtrsByAddress );
3657            for( i=0; i<keepCount; ++i )
3658                tr_ptrArrayAppend( &t->pool, keep[i] );
3659
3660            tordbg( t, "max atom count is %d... pruned from %d to %d\n", maxAtomCount, atomCount, keepCount );
3661
3662            /* cleanup */
3663            tr_free( test );
3664            tr_free( keep );
3665        }
3666    }
3667
3668    tr_timerAddMsec( mgr->atomTimer, ATOM_PERIOD_MSEC );
3669    managerUnlock( mgr );
3670}
3671
3672/***
3673****
3674****
3675****
3676***/
3677
3678/* is this atom someone that we'd want to initiate a connection to? */
3679static bool
3680isPeerCandidate( const tr_torrent * tor, struct peer_atom * atom, const time_t now )
3681{
3682    /* not if we're both seeds */
3683    if( tr_torrentIsSeed( tor ) && atomIsSeed( atom ) )
3684        return false;
3685
3686    /* not if we've already got a connection to them... */
3687    if( peerIsInUse( tor->torrentPeers, atom ) )
3688        return false;
3689
3690    /* not if we just tried them already */
3691    if( ( now - atom->time ) < getReconnectIntervalSecs( atom, now ) )
3692        return false;
3693
3694    /* not if they're blocklisted */
3695    if( isAtomBlocklisted( tor->session, atom ) )
3696        return false;
3697
3698    /* not if they're banned... */
3699    if( atom->flags2 & MYFLAG_BANNED )
3700        return false;
3701
3702    return true;
3703}
3704
3705struct peer_candidate
3706{
3707    uint64_t score;
3708    tr_torrent * tor;
3709    struct peer_atom * atom;
3710};
3711
3712static bool
3713torrentWasRecentlyStarted( const tr_torrent * tor )
3714{
3715    return difftime( tr_time( ), tor->startDate ) < 120;
3716}
3717
/**
 * Append a field to the low end of a packed sort key.
 *
 * The existing key is shifted left by `width' bits and `addme' is
 * OR-ed into the result. `addme' is not masked, so it must fit in
 * `width' bits if the earlier fields are to be left intact.
 *
 * @param value the key built so far
 * @param width number of bits to make room for
 * @param addme the value to place in the new low bits
 * @return the extended key
 */
static inline uint64_t
addValToKey( uint64_t value, int width, uint64_t addme )
{
    return ( value << (uint64_t)width ) | addme;
}
3725
3726/* smaller value is better */
3727static uint64_t
3728getPeerCandidateScore( const tr_torrent * tor, const struct peer_atom * atom, uint8_t salt  )
3729{
3730    uint64_t i;
3731    uint64_t score = 0;
3732    const bool failed = atom->lastConnectionAt < atom->lastConnectionAttemptAt;
3733
3734    /* prefer peers we've connected to, or never tried, over peers we failed to connect to. */
3735    i = failed ? 1 : 0;
3736    score = addValToKey( score, 1, i );
3737
3738    /* prefer the one we attempted least recently (to cycle through all peers) */
3739    i = atom->lastConnectionAttemptAt;
3740    score = addValToKey( score, 32, i );
3741
3742    /* prefer peers belonging to a torrent of a higher priority */
3743    switch( tr_torrentGetPriority( tor ) ) {
3744        case TR_PRI_HIGH:    i = 0; break;
3745        case TR_PRI_NORMAL:  i = 1; break;
3746        case TR_PRI_LOW:     i = 2; break;
3747    }
3748    score = addValToKey( score, 4, i );
3749
3750    /* prefer recently-started torrents */
3751    i = torrentWasRecentlyStarted( tor ) ? 0 : 1;
3752    score = addValToKey( score, 1, i );
3753
3754    /* prefer torrents we're downloading with */
3755    i = tr_torrentIsSeed( tor ) ? 1 : 0;
3756    score = addValToKey( score, 1, i );
3757
3758    /* prefer peers that are known to be connectible */
3759    i = ( atom->flags & ADDED_F_CONNECTABLE ) ? 0 : 1;
3760    score = addValToKey( score, 1, i );
3761
3762    /* prefer peers that we might have a chance of uploading to...
3763       so lower seed probability is better */
3764    if( atom->seedProbability == 100 ) i = 101;
3765    else if( atom->seedProbability == -1 ) i = 100;
3766    else i = atom->seedProbability;
3767    score = addValToKey( score, 8, i );
3768
3769    /* Prefer peers that we got from more trusted sources.
3770     * lower `fromBest' values indicate more trusted sources */
3771    score = addValToKey( score, 4, atom->fromBest );
3772
3773    /* salt */
3774    score = addValToKey( score, 8, salt );
3775
3776    return score;
3777}
3778
3779/* sort an array of peer candidates */
3780static int
3781comparePeerCandidates( const void * va, const void * vb )
3782{
3783    const struct peer_candidate * a = va;
3784    const struct peer_candidate * b = vb;
3785
3786    if( a->score < b->score ) return -1;
3787    if( a->score > b->score ) return 1;
3788
3789    return 0;
3790}
3791
3792/** @return an array of all the atoms we might want to connect to */
3793static struct peer_candidate*
3794getPeerCandidates( tr_session * session, int * candidateCount )
3795{
3796    int n;
3797    tr_torrent * tor;
3798    struct peer_candidate * candidates;
3799    struct peer_candidate * walk;
3800    const time_t now = tr_time( );
3801    const uint64_t now_msec = tr_time_msec( );
3802    /* leave 5% of connection slots for incoming connections -- ticket #2609 */
3803    const int maxCandidates = tr_sessionGetPeerLimit( session ) * 0.95;
3804
3805    /* don't start any new handshakes if we're full up */
3806    n = 0;
3807    tor= NULL;
3808    while(( tor = tr_torrentNext( session, tor )))
3809        n += tr_ptrArraySize( &tor->torrentPeers->peers );
3810    if( maxCandidates <= n ) {
3811        *candidateCount = 0;
3812        return NULL;
3813    }
3814
3815    /* allocate an array of candidates */
3816    n = 0;
3817    tor= NULL;
3818    while(( tor = tr_torrentNext( session, tor )))
3819        n += tr_ptrArraySize( &tor->torrentPeers->pool );
3820    walk = candidates = tr_new( struct peer_candidate, n );
3821
3822    /* populate the candidate array */
3823    tor = NULL;
3824    while(( tor = tr_torrentNext( session, tor )))
3825    {
3826        int i, nAtoms;
3827        struct peer_atom ** atoms;
3828
3829        if( !tor->torrentPeers->isRunning )
3830            continue;
3831
3832        /* if we've already got enough peers in this torrent... */
3833        if( tr_torrentGetPeerLimit( tor ) <= tr_ptrArraySize( &tor->torrentPeers->peers ) )
3834            continue;
3835
3836        /* if we've already got enough speed in this torrent... */
3837        if( tr_torrentIsSeed( tor ) && isBandwidthMaxedOut( tor->bandwidth, now_msec, TR_UP ) )
3838            continue;
3839
3840        atoms = (struct peer_atom**) tr_ptrArrayPeek( &tor->torrentPeers->pool, &nAtoms );
3841        for( i=0; i<nAtoms; ++i )
3842        {
3843            struct peer_atom * atom = atoms[i];
3844
3845            if( isPeerCandidate( tor, atom, now ) )
3846            {
3847                const uint8_t salt = tr_cryptoWeakRandInt( 1024 );
3848                walk->tor = tor;
3849                walk->atom = atom;
3850                walk->score = getPeerCandidateScore( tor, atom, salt );
3851                ++walk;
3852            }
3853        }
3854    }
3855
3856    *candidateCount = walk - candidates;
3857    if( *candidateCount > 1 )
3858        qsort( candidates, *candidateCount, sizeof( struct peer_candidate ), comparePeerCandidates );
3859    return candidates;
3860}
3861
3862static void
3863initiateConnection( tr_peerMgr * mgr, Torrent * t, struct peer_atom * atom )
3864{
3865    tr_peerIo * io;
3866    const time_t now = tr_time( );
3867    bool utp = tr_sessionIsUTPEnabled(mgr->session) && !atom->utp_failed;
3868
3869    if( atom->fromFirst == TR_PEER_FROM_PEX )
3870        /* PEX has explicit signalling for uTP support.  If an atom
3871           originally came from PEX and doesn't have the uTP flag, skip the
3872           uTP connection attempt.  Are we being optimistic here? */
3873        utp = utp && (atom->flags & ADDED_F_UTP_FLAGS);
3874
3875    tordbg( t, "Starting an OUTGOING%s connection with %s",
3876            utp ? " µTP" : "",
3877            tr_atomAddrStr( atom ) );
3878
3879    io = tr_peerIoNewOutgoing( mgr->session,
3880                               mgr->session->bandwidth,
3881                               &atom->addr,
3882                               atom->port,
3883                               t->tor->info.hash,
3884                               t->tor->completeness == TR_SEED,
3885                               utp );
3886
3887    if( io == NULL )
3888    {
3889        tordbg( t, "peerIo not created; marking peer %s as unreachable",
3890                tr_atomAddrStr( atom ) );
3891        atom->flags2 |= MYFLAG_UNREACHABLE;
3892        atom->numFails++;
3893    }
3894    else
3895    {
3896        tr_handshake * handshake = tr_handshakeNew( io,
3897                                                    mgr->session->encryptionMode,
3898                                                    myHandshakeDoneCB,
3899                                                    mgr );
3900
3901        assert( tr_peerIoGetTorrentHash( io ) );
3902
3903        tr_peerIoUnref( io ); /* balanced by the initial ref
3904                                 in tr_peerIoNewOutgoing() */
3905
3906        tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
3907                                 handshakeCompare );
3908    }
3909
3910    atom->lastConnectionAttemptAt = now;
3911    atom->time = now;
3912}
3913
3914static void
3915initiateCandidateConnection( tr_peerMgr * mgr, struct peer_candidate * c )
3916{
3917#if 0
3918    fprintf( stderr, "Starting an OUTGOING connection with %s - [%s] seedProbability==%d; %s, %s\n",
3919             tr_atomAddrStr( c->atom ),
3920             tr_torrentName( c->tor ),
3921             (int)c->atom->seedProbability,
3922             tr_torrentIsPrivate( c->tor ) ? "private" : "public",
3923             tr_torrentIsSeed( c->tor ) ? "seed" : "downloader" );
3924#endif
3925
3926    initiateConnection( mgr, c->tor->torrentPeers, c->atom );
3927}
3928
3929static void
3930makeNewPeerConnections( struct tr_peerMgr * mgr, const int max )
3931{
3932    int i, n;
3933    struct peer_candidate * candidates;
3934
3935    candidates = getPeerCandidates( mgr->session, &n );
3936
3937    for( i=0; i<n && i<max; ++i )
3938        initiateCandidateConnection( mgr, &candidates[i] );
3939
3940    tr_free( candidates );
3941}
Note: See TracBrowser for help on using the repository browser.