source: trunk/libtransmission/peer-mgr.c @ 11957

Last change on this file since 11957 was 11957, checked in by jch, 11 years ago

Don't attempt uTP connexions to peers learned from PEX that didn't signal support.

  • Property svn:keywords set to Date Rev Author Id
File size: 114.4 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 11957 2011-02-18 00:43:49Z jch $
11 */
12
13#include <assert.h>
14#include <errno.h> /* error codes ERANGE, ... */
15#include <limits.h> /* INT_MAX */
16#include <string.h> /* memcpy, memcmp, strstr */
17#include <stdlib.h> /* qsort */
18
19#include <event2/event.h>
20#include <libutp/utp.h>
21
22#include "transmission.h"
23#include "announcer.h"
24#include "bandwidth.h"
25#include "bencode.h"
26#include "blocklist.h"
27#include "cache.h"
28#include "clients.h"
29#include "completion.h"
30#include "crypto.h"
31#include "handshake.h"
32#include "net.h"
33#include "peer-io.h"
34#include "peer-mgr.h"
35#include "peer-msgs.h"
36#include "ptrarray.h"
37#include "session.h"
38#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
39#include "torrent.h"
40#include "utils.h"
41#include "webseed.h"
42
/**
 * Tunable constants for the peer manager: timer periods, connection
 * throttles, ban thresholds, and the bit values used in peer_atom.flags2.
 */
enum
{
    /* how frequently to cull old atoms */
    ATOM_PERIOD_MSEC = ( 60 * 1000 ),

    /* how frequently to change which peers are choked */
    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),

    /* an optimistically unchoked peer is immune from rechoking
       for this many calls to rechokeUploads(). */
    OPTIMISTIC_UNCHOKE_MULTIPLIER = 4,

    /* how frequently to reallocate bandwidth */
    BANDWIDTH_PERIOD_MSEC = 500,

    /* how frequently to age out old piece request lists */
    REFILL_UPKEEP_PERIOD_MSEC = ( 10 * 1000 ),

    /* how frequently to decide which peers live and die */
    RECONNECT_PERIOD_MSEC = 500,

    /* when many peers are available, keep idle ones this long */
    MIN_UPLOAD_IDLE_SECS = ( 60 ),

    /* when few peers are available, keep idle ones this long */
    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),

    /* max number of peers to ask for per second overall.
     * this throttle is to avoid overloading the router */
    MAX_CONNECTIONS_PER_SECOND = 12,

    /* the per-second throttle scaled down to one reconnect pulse.
     * This is computed with integer math only: an enumerator must be an
     * integer constant expression, and floating-point arithmetic inside
     * one is accepted only as a compiler extension.
     * ( 12 * 500 ) / 1000 == 6, the same value as before. */
    MAX_CONNECTIONS_PER_PULSE = ( ( MAX_CONNECTIONS_PER_SECOND * RECONNECT_PERIOD_MSEC ) / 1000 ),

    /* number of bad pieces a peer is allowed to send before we ban them */
    MAX_BAD_PIECES_PER_PEER = 5,

    /* amount of time to keep a list of request pieces lying around
       before it's considered too old and needs to be rebuilt */
    PIECE_LIST_SHELF_LIFE_SECS = 60,

    /* use for bitwise operations w/peer_atom.flags2 */
    MYFLAG_BANNED = 1,

    /* use for bitwise operations w/peer_atom.flags2 */
    /* unreachable for now... but not banned.
     * if they try to connect to us it's okay */
    MYFLAG_UNREACHABLE = 2,

    /* the minimum we'll wait before attempting to reconnect to a peer */
    MINIMUM_RECONNECT_INTERVAL_SECS = 5,

    /** how long we'll let requests we've made linger before we cancel them */
    REQUEST_TTL_SECS = 120,

    /* NOTE(review): not referenced in the visible part of this file;
     * presumably a duration in seconds -- confirm against its users */
    NO_BLOCKS_CANCEL_HISTORY = 120,

    /* first argument given to tr_historyNew() for each of a peer's
     * block/cancel histories (see peerConstructor()) */
    CANCEL_HISTORY_SEC = 60
};
101
/* Baseline value for tr_peer_event objects: every member zero, NULL or FALSE.
 * NOTE(review): the initializer is positional, so it has to stay in step with
 * the tr_peer_event declaration, which is not part of this file. */
const tr_peer_event TR_PEER_EVENT_INIT = { 0, 0, NULL, 0, 0, 0.0f, 0, FALSE, 0 };
103
104/**
105***
106**/
107
/* Values stored in peer_atom.uploadOnly.
 * (the "UKNOWN" spelling is part of the identifier and is kept as-is) */
enum
{
    UPLOAD_ONLY_UKNOWN,
    UPLOAD_ONLY_YES,
    UPLOAD_ONLY_NO
};
114
/**
 * Peer information that should be kept even before we've connected and
 * after we've disconnected. These are kept in a pool of peer_atoms to decide
 * which ones would make good candidates for connecting to, and to watch out
 * for banned peers.
 *
 * Atoms are stored in Torrent::pool. While a tr_peer exists for an atom,
 * the two point at each other (see peerConstructor() / peerDestructor()).
 *
 * @see tr_peer
 * @see tr_peermsgs
 */
struct peer_atom
{
    uint8_t     fromFirst;          /* where the peer was first found */
    uint8_t     fromBest;           /* the "best" value of where the peer has been found */
    uint8_t     flags;              /* these match the added_f flags */
    uint8_t     flags2;             /* flags that aren't defined in added_f (MYFLAG_BANNED, MYFLAG_UNREACHABLE) */
    uint8_t     uploadOnly;         /* UPLOAD_ONLY_ */
    int8_t      seedProbability;    /* how likely is this to be a seed... [0..100] or -1 for unknown */
    int8_t      blocklisted;        /* -1 for unknown, TRUE for blocklisted, FALSE for not blocklisted */

    tr_port     port;               /* paired with addr when the atom is formatted by tr_atomAddrStr() */
    tr_bool     utp_failed;         /* We recently failed to connect over uTP */
    uint16_t    numFails;           /* NOTE(review): presumably a count of failed connection attempts -- confirm */
    time_t      time;               /* when the peer's connection status last changed */
    time_t      piece_data_time;    /* NOTE(review): presumably when piece data was last exchanged -- confirm */

    time_t      lastConnectionAttemptAt;
    time_t      lastConnectionAt;

    /* similar to a TTL field, but less rigid --
     * if the swarm is small, the atom will be kept past this date. */
    time_t      shelf_date;
    tr_peer   * peer;               /* will be NULL if not connected */
    tr_address  addr;
};
149
150#ifdef NDEBUG
151#define tr_isAtom(a) (TRUE)
152#else
153static tr_bool
154tr_isAtom( const struct peer_atom * atom )
155{
156    return ( atom != NULL )
157        && ( atom->fromFirst < TR_PEER_FROM__MAX )
158        && ( atom->fromBest < TR_PEER_FROM__MAX )
159        && ( tr_isAddress( &atom->addr ) );
160}
161#endif
162
163static const char*
164tr_atomAddrStr( const struct peer_atom * atom )
165{
166    return atom ? tr_peerIoAddrStr( &atom->addr, atom->port ) : "[no atom]";
167}
168
/* One entry of Torrent::requests: a block we have asked a peer for.
 * The array is kept ordered by compareReqByBlock(): block first, then peer. */
struct block_request
{
    tr_block_index_t block;  /* which block was requested */
    tr_peer * peer;          /* who it was requested from */
    time_t sentAt;           /* tr_time() when the entry was added by requestListAdd() */
};
175
/* One entry of Torrent::pieces: a piece we still want, plus the
 * bookkeeping that comparePieceByWeight() sorts on. */
struct weighted_piece
{
    tr_piece_index_t index;  /* the piece's index in the torrent */
    int16_t salt;            /* random tie-breaker assigned in pieceListRebuild() */
    int16_t requestCount;    /* treated as the piece's pending request count when weighing it */
};
182
/* How Torrent::pieces is currently ordered. pieceListSort() records the
 * order it produced; invalidatePieceSorting() resets it to PIECES_UNSORTED. */
enum piece_sort_state
{
    PIECES_UNSORTED,
    PIECES_SORTED_BY_INDEX,
    PIECES_SORTED_BY_WEIGHT
};
189
/** @brief Opaque, per-torrent data structure for peer connection information */
typedef struct tr_torrent_peers
{
    tr_ptrArray                outgoingHandshakes; /* tr_handshake */
    tr_ptrArray                pool; /* struct peer_atom */
    tr_ptrArray                peers; /* tr_peer */
    tr_ptrArray                webseeds; /* tr_webseed */

    tr_torrent               * tor;
    struct tr_peerMgr        * manager;

    tr_peer                  * optimistic; /* the optimistic peer, or NULL if none */
    int                        optimisticUnchokeTimeScaler;

    tr_bool                    isRunning;
    tr_bool                    needsCompletenessCheck;

    /* outstanding block requests, kept ordered by compareReqByBlock().
       requestCount entries are in use out of requestAlloc allocated */
    struct block_request     * requests;
    int                        requestCount;
    int                        requestAlloc;

    /* the pieces we still want; see pieceListRebuild() and pieceListSort() */
    struct weighted_piece    * pieces;
    int                        pieceCount;
    enum piece_sort_state      pieceSortState;

    /* An array of pieceCount items stating how many peers have each piece.
       This is used to help us download pieces "rarest first."
       This may be NULL if we don't have metainfo yet, or if we're not
       downloading and don't care about rarity */
    uint16_t                 * pieceReplication;
    size_t                     pieceReplicationSize;

    int                        interestedCount;
    int                        maxPeers;
    time_t                     lastCancel;

    /* Before the endgame this should be 0. In endgame, it contains the average
     * number of pending requests per peer. Only peers which have more pending
     * requests are considered 'fast' and are allowed to request a block that's
     * already been requested from another (slower?) peer. */
    int                        endgame;
}
Torrent;
233
/* Session-wide peer manager state: the owning session, the handshakes for
 * incoming connections, and the periodic timers released by deleteTimers(). */
struct tr_peerMgr
{
    tr_session    * session;
    tr_ptrArray     incomingHandshakes; /* tr_handshake */
    struct event  * bandwidthTimer;
    struct event  * rechokeTimer;
    struct event  * refillUpkeepTimer;
    struct event  * atomTimer;
};
243
/* Deep-log a printf-style message tagged with the name of the torrent that
 * owns the Torrent `t'. Nothing is emitted unless deep logging is active.
 * The macro argument is parenthesized so that any pointer-valued expression
 * (e.g. `base + 1') expands correctly. */
#define tordbg( t, ... ) \
    do { \
        if( tr_deepLoggingIsActive( ) ) \
            tr_deepLog( __FILE__, __LINE__, tr_torrentName( ( t )->tor ), __VA_ARGS__ ); \
    } while( 0 )
249
/* Deep-log a printf-style message that is not tied to any torrent
 * (NULL is passed where tordbg() passes the torrent name).
 * Nothing is emitted unless deep logging is active. */
#define dbgmsg( ... ) \
    do { \
        if( tr_deepLoggingIsActive( ) ) \
            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
    } while( 0 )
255
256/**
257***
258**/
259
260static inline void
261managerLock( const struct tr_peerMgr * manager )
262{
263    tr_sessionLock( manager->session );
264}
265
266static inline void
267managerUnlock( const struct tr_peerMgr * manager )
268{
269    tr_sessionUnlock( manager->session );
270}
271
272static inline void
273torrentLock( Torrent * torrent )
274{
275    managerLock( torrent->manager );
276}
277
278static inline void
279torrentUnlock( Torrent * torrent )
280{
281    managerUnlock( torrent->manager );
282}
283
284static inline int
285torrentIsLocked( const Torrent * t )
286{
287    return tr_sessionIsLocked( t->manager->session );
288}
289
290/**
291***
292**/
293
294static int
295handshakeCompareToAddr( const void * va, const void * vb )
296{
297    const tr_handshake * a = va;
298
299    return tr_compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
300}
301
302static int
303handshakeCompare( const void * a, const void * b )
304{
305    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
306}
307
308static inline tr_handshake*
309getExistingHandshake( tr_ptrArray * handshakes, const tr_address * addr )
310{
311    if( tr_ptrArrayEmpty( handshakes ) )
312        return NULL;
313
314    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
315}
316
317static int
318comparePeerAtomToAddress( const void * va, const void * vb )
319{
320    const struct peer_atom * a = va;
321
322    return tr_compareAddresses( &a->addr, vb );
323}
324
325static int
326compareAtomsByAddress( const void * va, const void * vb )
327{
328    const struct peer_atom * b = vb;
329
330    assert( tr_isAtom( b ) );
331
332    return comparePeerAtomToAddress( va, &b->addr );
333}
334
335/**
336***
337**/
338
339const tr_address *
340tr_peerAddress( const tr_peer * peer )
341{
342    return &peer->atom->addr;
343}
344
345static Torrent*
346getExistingTorrent( tr_peerMgr *    manager,
347                    const uint8_t * hash )
348{
349    tr_torrent * tor = tr_torrentFindFromHash( manager->session, hash );
350
351    return tor == NULL ? NULL : tor->torrentPeers;
352}
353
354static int
355peerCompare( const void * a, const void * b )
356{
357    return tr_compareAddresses( tr_peerAddress( a ), tr_peerAddress( b ) );
358}
359
360static struct peer_atom*
361getExistingAtom( const Torrent    * t,
362                 const tr_address * addr )
363{
364    Torrent * tt = (Torrent*)t;
365    assert( torrentIsLocked( t ) );
366    return tr_ptrArrayFindSorted( &tt->pool, addr, comparePeerAtomToAddress );
367}
368
369static tr_bool
370peerIsInUse( const Torrent * ct, const struct peer_atom * atom )
371{
372    Torrent * t = (Torrent*) ct;
373
374    assert( torrentIsLocked ( t ) );
375
376    return ( atom->peer != NULL )
377        || getExistingHandshake( &t->outgoingHandshakes, &atom->addr )
378        || getExistingHandshake( &t->manager->incomingHandshakes, &atom->addr );
379}
380
381static tr_peer*
382peerConstructor( struct peer_atom * atom )
383{
384    tr_peer * peer = tr_new0( tr_peer, 1 );
385
386    tr_bitsetConstructor( &peer->have, 0 );
387
388    peer->atom = atom;
389    atom->peer = peer;
390
391    peer->blocksSentToClient  = tr_historyNew( CANCEL_HISTORY_SEC, ( RECHOKE_PERIOD_MSEC / 1000 ) );
392    peer->blocksSentToPeer    = tr_historyNew( CANCEL_HISTORY_SEC, ( RECHOKE_PERIOD_MSEC / 1000 ) );
393    peer->cancelsSentToClient = tr_historyNew( CANCEL_HISTORY_SEC, ( RECHOKE_PERIOD_MSEC / 1000 ) );
394    peer->cancelsSentToPeer   = tr_historyNew( CANCEL_HISTORY_SEC, ( REFILL_UPKEEP_PERIOD_MSEC / 1000 ) );
395
396    return peer;
397}
398
399static tr_peer*
400getPeer( Torrent * torrent, struct peer_atom * atom )
401{
402    tr_peer * peer;
403
404    assert( torrentIsLocked( torrent ) );
405
406    peer = atom->peer;
407
408    if( peer == NULL )
409    {
410        peer = peerConstructor( atom );
411        tr_ptrArrayInsertSorted( &torrent->peers, peer, peerCompare );
412    }
413
414    return peer;
415}
416
417static void peerDeclinedAllRequests( Torrent *, const tr_peer * );
418
419static void
420peerDestructor( Torrent * t, tr_peer * peer )
421{
422    assert( peer != NULL );
423
424    peerDeclinedAllRequests( t, peer );
425
426    if( peer->msgs != NULL )
427        tr_peerMsgsFree( peer->msgs );
428
429    tr_peerIoClear( peer->io );
430    tr_peerIoUnref( peer->io ); /* balanced by the ref in handshakeDoneCB() */
431
432    tr_historyFree( peer->blocksSentToClient  );
433    tr_historyFree( peer->blocksSentToPeer    );
434    tr_historyFree( peer->cancelsSentToClient );
435    tr_historyFree( peer->cancelsSentToPeer   );
436
437    tr_bitsetDestructor( &peer->have );
438    tr_bitfieldFree( peer->blame );
439    tr_free( peer->client );
440    peer->atom->peer = NULL;
441
442    tr_free( peer );
443}
444
445static tr_bool
446replicationExists( const Torrent * t )
447{
448    return t->pieceReplication != NULL;
449}
450
451static void
452replicationFree( Torrent * t )
453{
454    tr_free( t->pieceReplication );
455    t->pieceReplication = NULL;
456    t->pieceReplicationSize = 0;
457}
458
459static void
460replicationNew( Torrent * t )
461{
462    tr_piece_index_t piece_i;
463    const tr_piece_index_t piece_count = t->tor->info.pieceCount;
464    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
465    const int peer_count = tr_ptrArraySize( &t->peers );
466
467    assert( !replicationExists( t ) );
468
469    t->pieceReplicationSize = piece_count;
470    t->pieceReplication = tr_new0( uint16_t, piece_count );
471
472    for( piece_i=0; piece_i<piece_count; ++piece_i )
473    {
474        int peer_i;
475        uint16_t r = 0;
476
477        for( peer_i=0; peer_i<peer_count; ++peer_i )
478            if( tr_bitsetHasFast( &peers[peer_i]->have, piece_i ) )
479                ++r;
480
481        t->pieceReplication[piece_i] = r;
482    }
483}
484
485static void
486torrentDestructor( void * vt )
487{
488    Torrent * t = vt;
489
490    assert( t );
491    assert( !t->isRunning );
492    assert( torrentIsLocked( t ) );
493    assert( tr_ptrArrayEmpty( &t->outgoingHandshakes ) );
494    assert( tr_ptrArrayEmpty( &t->peers ) );
495
496    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
497    tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
498    tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
499    tr_ptrArrayDestruct( &t->peers, NULL );
500
501    replicationFree( t );
502
503    tr_free( t->requests );
504    tr_free( t->pieces );
505    tr_free( t );
506}
507
508static void peerCallbackFunc( tr_peer *, const tr_peer_event *, void * );
509
510static Torrent*
511torrentConstructor( tr_peerMgr * manager,
512                    tr_torrent * tor )
513{
514    int       i;
515    Torrent * t;
516
517    t = tr_new0( Torrent, 1 );
518    t->manager = manager;
519    t->tor = tor;
520    t->pool = TR_PTR_ARRAY_INIT;
521    t->peers = TR_PTR_ARRAY_INIT;
522    t->webseeds = TR_PTR_ARRAY_INIT;
523    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
524
525    for( i = 0; i < tor->info.webseedCount; ++i )
526    {
527        tr_webseed * w =
528            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
529        tr_ptrArrayAppend( &t->webseeds, w );
530    }
531
532    return t;
533}
534
535tr_peerMgr*
536tr_peerMgrNew( tr_session * session )
537{
538    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
539    m->session = session;
540    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
541    return m;
542}
543
/* Frees the event that `*t' points to, if any, and resets `*t' to NULL. */
static void
deleteTimer( struct event ** t )
{
    if( *t == NULL )
        return;

    event_free( *t );
    *t = NULL;
}
553
554static void
555deleteTimers( struct tr_peerMgr * m )
556{
557    deleteTimer( &m->atomTimer );
558    deleteTimer( &m->bandwidthTimer );
559    deleteTimer( &m->rechokeTimer );
560    deleteTimer( &m->refillUpkeepTimer );
561}
562
563void
564tr_peerMgrFree( tr_peerMgr * manager )
565{
566    managerLock( manager );
567
568    deleteTimers( manager );
569
570    /* free the handshakes. Abort invokes handshakeDoneCB(), which removes
571     * the item from manager->handshakes, so this is a little roundabout... */
572    while( !tr_ptrArrayEmpty( &manager->incomingHandshakes ) )
573        tr_handshakeAbort( tr_ptrArrayNth( &manager->incomingHandshakes, 0 ) );
574
575    tr_ptrArrayDestruct( &manager->incomingHandshakes, NULL );
576
577    managerUnlock( manager );
578    tr_free( manager );
579}
580
581static int
582clientIsDownloadingFrom( const tr_torrent * tor, const tr_peer * peer )
583{
584    if( !tr_torrentHasMetadata( tor ) )
585        return TRUE;
586
587    return peer->clientIsInterested && !peer->clientIsChoked;
588}
589
590static int
591clientIsUploadingTo( const tr_peer * peer )
592{
593    return peer->peerIsInterested && !peer->peerIsChoked;
594}
595
596/***
597****
598***/
599
600void
601tr_peerMgrOnBlocklistChanged( tr_peerMgr * mgr )
602{
603    tr_torrent * tor = NULL;
604    tr_session * session = mgr->session;
605
606    /* we cache whether or not a peer is blocklisted...
607       since the blocklist has changed, erase that cached value */
608    while(( tor = tr_torrentNext( session, tor )))
609    {
610        int i;
611        Torrent * t = tor->torrentPeers;
612        const int n = tr_ptrArraySize( &t->pool );
613        for( i=0; i<n; ++i ) {
614            struct peer_atom * atom = tr_ptrArrayNth( &t->pool, i );
615            atom->blocklisted = -1;
616        }
617    }
618}
619
620static tr_bool
621isAtomBlocklisted( tr_session * session, struct peer_atom * atom )
622{
623    if( atom->blocklisted < 0 )
624        atom->blocklisted = tr_sessionIsAddressBlocked( session, &atom->addr );
625
626    assert( tr_isBool( atom->blocklisted ) );
627    return atom->blocklisted;
628}
629
630
631/***
632****
633***/
634
635static void
636atomSetSeedProbability( struct peer_atom * atom, int seedProbability )
637{
638    assert( atom != NULL );
639    assert( -1<=seedProbability && seedProbability<=100 );
640
641    atom->seedProbability = seedProbability;
642
643    if( seedProbability == 100 )
644        atom->flags |= ADDED_F_SEED_FLAG;
645    else if( seedProbability != -1 )
646        atom->flags &= ~ADDED_F_SEED_FLAG;
647}
648
/* Marks the atom as a certain seed (seed probability of 100). */
static void
atomSetSeed( struct peer_atom * atom )
{
    const int certainSeed = 100;

    atomSetSeedProbability( atom, certainSeed );
}
654
655static inline tr_bool
656atomIsSeed( const struct peer_atom * atom )
657{
658    return atom->seedProbability == 100;
659}
660
661tr_bool
662tr_peerMgrPeerIsSeed( const tr_torrent  * tor,
663                      const tr_address  * addr )
664{
665    tr_bool isSeed = FALSE;
666    const Torrent * t = tor->torrentPeers;
667    const struct peer_atom * atom = getExistingAtom( t, addr );
668
669    if( atom )
670        isSeed = atomIsSeed( atom );
671
672    return isSeed;
673}
674
675void
676tr_peerMgrSetUtpSupported( tr_torrent * tor, const tr_address * addr )
677{
678    struct peer_atom * atom = getExistingAtom( tor->torrentPeers, addr );
679
680    if( atom )
681        atom->flags |= ADDED_F_UTP_FLAGS;
682}
683
684void
685tr_peerMgrSetUtpFailed( tr_torrent *tor, const tr_address *addr, tr_bool failed )
686{
687    struct peer_atom * atom = getExistingAtom( tor->torrentPeers, addr );
688
689    if( atom )
690        atom->utp_failed = failed;
691}
692
693
694/**
695***  REQUESTS
696***
697*** There are two data structures associated with managing block requests:
698***
699*** 1. Torrent::requests, an array of "struct block_request" which keeps
700***    track of which blocks have been requested, and when, and by which peers.
***    This list is used for (a) cancelling requests that have been pending
702***    for too long and (b) avoiding duplicate requests before endgame.
703***
704*** 2. Torrent::pieces, an array of "struct weighted_piece" which lists the
705***    pieces that we want to request. It's used to decide which blocks to
706***    return next when tr_peerMgrGetBlockRequests() is called.
707**/
708
709/**
710*** struct block_request
711**/
712
713static int
714compareReqByBlock( const void * va, const void * vb )
715{
716    const struct block_request * a = va;
717    const struct block_request * b = vb;
718
719    /* primary key: block */
720    if( a->block < b->block ) return -1;
721    if( a->block > b->block ) return 1;
722
723    /* secondary key: peer */
724    if( a->peer < b->peer ) return -1;
725    if( a->peer > b->peer ) return 1;
726
727    return 0;
728}
729
730static void
731requestListAdd( Torrent * t, tr_block_index_t block, tr_peer * peer )
732{
733    struct block_request key;
734
735    /* ensure enough room is available... */
736    if( t->requestCount + 1 >= t->requestAlloc )
737    {
738        const int CHUNK_SIZE = 128;
739        t->requestAlloc += CHUNK_SIZE;
740        t->requests = tr_renew( struct block_request,
741                                t->requests, t->requestAlloc );
742    }
743
744    /* populate the record we're inserting */
745    key.block = block;
746    key.peer = peer;
747    key.sentAt = tr_time( );
748
749    /* insert the request to our array... */
750    {
751        tr_bool exact;
752        const int pos = tr_lowerBound( &key, t->requests, t->requestCount,
753                                       sizeof( struct block_request ),
754                                       compareReqByBlock, &exact );
755        assert( !exact );
756        memmove( t->requests + pos + 1,
757                 t->requests + pos,
758                 sizeof( struct block_request ) * ( t->requestCount++ - pos ) );
759        t->requests[pos] = key;
760    }
761
762    if( peer != NULL )
763    {
764        ++peer->pendingReqsToPeer;
765        assert( peer->pendingReqsToPeer >= 0 );
766    }
767
768    /*fprintf( stderr, "added request of block %lu from peer %s... "
769                       "there are now %d block\n",
770                       (unsigned long)block, tr_atomAddrStr( peer->atom ), t->requestCount );*/
771}
772
773static struct block_request *
774requestListLookup( Torrent * t, tr_block_index_t block, const tr_peer * peer )
775{
776    struct block_request key;
777    key.block = block;
778    key.peer = (tr_peer*) peer;
779
780    return bsearch( &key, t->requests, t->requestCount,
781                    sizeof( struct block_request ),
782                    compareReqByBlock );
783}
784
785/**
786 * Find the peers are we currently requesting the block
787 * with index @a block from and append them to @a peerArr.
788 */
789static void
790getBlockRequestPeers( Torrent * t, tr_block_index_t block,
791                      tr_ptrArray * peerArr )
792{
793    tr_bool exact;
794    int i, pos;
795    struct block_request key;
796
797    key.block = block;
798    key.peer = NULL;
799    pos = tr_lowerBound( &key, t->requests, t->requestCount,
800                         sizeof( struct block_request ),
801                         compareReqByBlock, &exact );
802
803    assert( !exact ); /* shouldn't have a request with .peer == NULL */
804
805    for( i = pos; i < t->requestCount; ++i )
806    {
807        if( t->requests[i].block != block )
808            break;
809        tr_ptrArrayAppend( peerArr, t->requests[i].peer );
810    }
811}
812
813static void
814decrementPendingReqCount( const struct block_request * b )
815{
816    if( b->peer != NULL )
817        if( b->peer->pendingReqsToPeer > 0 )
818            --b->peer->pendingReqsToPeer;
819}
820
821static void
822requestListRemove( Torrent * t, tr_block_index_t block, const tr_peer * peer )
823{
824    const struct block_request * b = requestListLookup( t, block, peer );
825    if( b != NULL )
826    {
827        const int pos = b - t->requests;
828        assert( pos < t->requestCount );
829
830        decrementPendingReqCount( b );
831
832        tr_removeElementFromArray( t->requests,
833                                   pos,
834                                   sizeof( struct block_request ),
835                                   t->requestCount-- );
836
837        /*fprintf( stderr, "removing request of block %lu from peer %s... "
838                           "there are now %d block requests left\n",
839                           (unsigned long)block, tr_atomAddrStr( peer->atom ), t->requestCount );*/
840    }
841}
842
843static int
844countActiveWebseeds( const Torrent * t )
845{
846    int activeCount = 0;
847    const tr_webseed ** w = (const tr_webseed **) tr_ptrArrayBase( &t->webseeds );
848    const tr_webseed ** const wend = w + tr_ptrArraySize( &t->webseeds );
849
850    for( ; w!=wend; ++w )
851        if( tr_webseedIsActive( *w ) )
852            ++activeCount;
853
854    return activeCount;
855}
856
857static void
858updateEndgame( Torrent * t )
859{
860    const tr_torrent * tor = t->tor;
861    const tr_block_index_t missing = tr_cpBlocksMissing( &tor->completion );
862
863    assert( t->requestCount >= 0 );
864
865    if( (tr_block_index_t) t->requestCount < missing )
866    {
867        /* not in endgame */
868        t->endgame = 0;
869    }
870    else if( !t->endgame ) /* only recalculate when endgame first begins */
871    {
872        int numDownloading = 0;
873        const tr_peer ** p = (const tr_peer **) tr_ptrArrayBase( &t->peers );
874        const tr_peer ** const pend = p + tr_ptrArraySize( &t->peers );
875
876        /* add the active bittorrent peers... */
877        for( ; p!=pend; ++p )
878            if( (*p)->pendingReqsToPeer > 0 )
879                ++numDownloading;
880
881        /* add the active webseeds... */
882        numDownloading += countActiveWebseeds( t );
883
884        /* average number of pending requests per downloading peer */
885        t->endgame = t->requestCount / MAX( numDownloading, 1 );
886    }
887}
888
889
890/****
891*****
892*****  Piece List Manipulation / Accessors
893*****
894****/
895
896static inline void
897invalidatePieceSorting( Torrent * t )
898{
899    t->pieceSortState = PIECES_UNSORTED;
900}
901
902const tr_torrent * weightTorrent;
903
904const uint16_t * weightReplication;
905
906static void
907setComparePieceByWeightTorrent( Torrent * t )
908{
909    if( !replicationExists( t ) )
910        replicationNew( t );
911
912    weightTorrent = t->tor;
913    weightReplication = t->pieceReplication;
914}
915
916/* we try to create a "weight" s.t. high-priority pieces come before others,
917 * and that partially-complete pieces come before empty ones. */
918static int
919comparePieceByWeight( const void * va, const void * vb )
920{
921    const struct weighted_piece * a = va;
922    const struct weighted_piece * b = vb;
923    int ia, ib, missing, pending;
924    const tr_torrent * tor = weightTorrent;
925    const uint16_t * rep = weightReplication;
926
927    /* primary key: weight */
928    missing = tr_cpMissingBlocksInPiece( &tor->completion, a->index );
929    pending = a->requestCount;
930    ia = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
931    missing = tr_cpMissingBlocksInPiece( &tor->completion, b->index );
932    pending = b->requestCount;
933    ib = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
934    if( ia < ib ) return -1;
935    if( ia > ib ) return 1;
936
937    /* secondary key: higher priorities go first */
938    ia = tor->info.pieces[a->index].priority;
939    ib = tor->info.pieces[b->index].priority;
940    if( ia > ib ) return -1;
941    if( ia < ib ) return 1;
942
943    /* tertiary key: rarest first. */
944    ia = rep[a->index];
945    ib = rep[b->index];
946    if( ia < ib ) return -1;
947    if( ia > ib ) return 1;
948
949    /* quaternary key: random */
950    if( a->salt < b->salt ) return -1;
951    if( a->salt > b->salt ) return 1;
952
953    /* okay, they're equal */
954    return 0;
955}
956
957static int
958comparePieceByIndex( const void * va, const void * vb )
959{
960    const struct weighted_piece * a = va;
961    const struct weighted_piece * b = vb;
962    if( a->index < b->index ) return -1;
963    if( a->index > b->index ) return 1;
964    return 0;
965}
966
967static void
968pieceListSort( Torrent * t, enum piece_sort_state state )
969{
970    assert( state==PIECES_SORTED_BY_INDEX
971         || state==PIECES_SORTED_BY_WEIGHT );
972
973
974    if( state == PIECES_SORTED_BY_WEIGHT )
975    {
976        setComparePieceByWeightTorrent( t );
977        qsort( t->pieces, t->pieceCount, sizeof( struct weighted_piece ), comparePieceByWeight );
978    }
979    else
980        qsort( t->pieces, t->pieceCount, sizeof( struct weighted_piece ), comparePieceByIndex );
981
982    t->pieceSortState = state;
983}
984
/**
 * These functions are useful for testing, but too expensive for nightly builds.
 * Let's leave them disabled, but keep an easy hook to compile them back in:
 * change the "#if 1" below to "#if 0" to swap the no-op macros for the
 * real checks.
 */
#if 1
#define assertWeightedPiecesAreSorted(t)
#define assertReplicationCountIsExact(t)
#else
/* Asserts that, outside of endgame, t->pieces is in comparePieceByWeight() order. */
static void
assertWeightedPiecesAreSorted( Torrent * t )
{
    if( !t->endgame )
    {
        int i;
        setComparePieceByWeightTorrent( t );
        for( i=0; i<t->pieceCount-1; ++i )
            assert( comparePieceByWeight( &t->pieces[i], &t->pieces[i+1] ) <= 0 );
    }
}
/* Asserts that t->pieceReplication matches a fresh count of which peers have each piece. */
static void
assertReplicationCountIsExact( Torrent * t )
{
    /* This assert might fail due to implementation errors in other
     * clients. It happens when receiving duplicate bitfields/HaveAll/HaveNone
     * from a client. If such behavior is noticed,
     * a bug report should be filed against the faulty client. */

    size_t piece_i;
    const uint16_t * rep = t->pieceReplication;
    const size_t piece_count = t->pieceReplicationSize;
    const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
    const int peer_count = tr_ptrArraySize( &t->peers );

    assert( piece_count == t->tor->info.pieceCount );

    for( piece_i=0; piece_i<piece_count; ++piece_i )
    {
        int peer_i;
        uint16_t r = 0;

        for( peer_i=0; peer_i<peer_count; ++peer_i )
            if( tr_bitsetHasFast( &peers[peer_i]->have, piece_i ) )
                ++r;

        assert( rep[piece_i] == r );
    }
}
#endif
1033
1034static struct weighted_piece *
1035pieceListLookup( Torrent * t, tr_piece_index_t index )
1036{
1037    int i;
1038
1039    for( i=0; i<t->pieceCount; ++i )
1040        if( t->pieces[i].index == index )
1041            return &t->pieces[i];
1042
1043    return NULL;
1044}
1045
1046static void
1047pieceListRebuild( Torrent * t )
1048{
1049
1050    if( !tr_torrentIsSeed( t->tor ) )
1051    {
1052        tr_piece_index_t i;
1053        tr_piece_index_t * pool;
1054        tr_piece_index_t poolCount = 0;
1055        const tr_torrent * tor = t->tor;
1056        const tr_info * inf = tr_torrentInfo( tor );
1057        struct weighted_piece * pieces;
1058        int pieceCount;
1059
1060        /* build the new list */
1061        pool = tr_new( tr_piece_index_t, inf->pieceCount );
1062        for( i=0; i<inf->pieceCount; ++i )
1063            if( !inf->pieces[i].dnd )
1064                if( !tr_cpPieceIsComplete( &tor->completion, i ) )
1065                    pool[poolCount++] = i;
1066        pieceCount = poolCount;
1067        pieces = tr_new0( struct weighted_piece, pieceCount );
1068        for( i=0; i<poolCount; ++i ) {
1069            struct weighted_piece * piece = pieces + i;
1070            piece->index = pool[i];
1071            piece->requestCount = 0;
1072            piece->salt = tr_cryptoWeakRandInt( 4096 );
1073        }
1074
1075        /* if we already had a list of pieces, merge it into
1076         * the new list so we don't lose its requestCounts */
1077        if( t->pieces != NULL )
1078        {
1079            struct weighted_piece * o = t->pieces;
1080            struct weighted_piece * oend = o + t->pieceCount;
1081            struct weighted_piece * n = pieces;
1082            struct weighted_piece * nend = n + pieceCount;
1083
1084            pieceListSort( t, PIECES_SORTED_BY_INDEX );
1085
1086            while( o!=oend && n!=nend ) {
1087                if( o->index < n->index )
1088                    ++o;
1089                else if( o->index > n->index )
1090                    ++n;
1091                else
1092                    *n++ = *o++;
1093            }
1094
1095            tr_free( t->pieces );
1096        }
1097
1098        t->pieces = pieces;
1099        t->pieceCount = pieceCount;
1100
1101        pieceListSort( t, PIECES_SORTED_BY_WEIGHT );
1102
1103        /* cleanup */
1104        tr_free( pool );
1105    }
1106}
1107
1108static void
1109pieceListRemovePiece( Torrent * t, tr_piece_index_t piece )
1110{
1111    struct weighted_piece * p;
1112
1113    if(( p = pieceListLookup( t, piece )))
1114    {
1115        const int pos = p - t->pieces;
1116
1117        tr_removeElementFromArray( t->pieces,
1118                                   pos,
1119                                   sizeof( struct weighted_piece ),
1120                                   t->pieceCount-- );
1121
1122        if( t->pieceCount == 0 )
1123        {
1124            tr_free( t->pieces );
1125            t->pieces = NULL;
1126        }
1127    }
1128}
1129
1130static void
1131pieceListResortPiece( Torrent * t, struct weighted_piece * p )
1132{
1133    int pos;
1134    tr_bool isSorted = TRUE;
1135
1136    if( p == NULL )
1137        return;
1138
1139    /* is the torrent already sorted? */
1140    pos = p - t->pieces;
1141    setComparePieceByWeightTorrent( t );
1142    if( isSorted && ( pos > 0 ) && ( comparePieceByWeight( p-1, p ) > 0 ) )
1143        isSorted = FALSE;
1144    if( isSorted && ( pos < t->pieceCount - 1 ) && ( comparePieceByWeight( p, p+1 ) > 0 ) )
1145        isSorted = FALSE;
1146
1147    if( t->pieceSortState != PIECES_SORTED_BY_WEIGHT )
1148    {
1149       pieceListSort( t, PIECES_SORTED_BY_WEIGHT);
1150       isSorted = TRUE;
1151    }
1152
1153    /* if it's not sorted, move it around */
1154    if( !isSorted )
1155    {
1156        tr_bool exact;
1157        const struct weighted_piece tmp = *p;
1158
1159        tr_removeElementFromArray( t->pieces,
1160                                   pos,
1161                                   sizeof( struct weighted_piece ),
1162                                   t->pieceCount-- );
1163
1164        pos = tr_lowerBound( &tmp, t->pieces, t->pieceCount,
1165                             sizeof( struct weighted_piece ),
1166                             comparePieceByWeight, &exact );
1167
1168        memmove( &t->pieces[pos + 1],
1169                 &t->pieces[pos],
1170                 sizeof( struct weighted_piece ) * ( t->pieceCount++ - pos ) );
1171
1172        t->pieces[pos] = tmp;
1173    }
1174
1175    assertWeightedPiecesAreSorted( t );
1176}
1177
1178static void
1179pieceListRemoveRequest( Torrent * t, tr_block_index_t block )
1180{
1181    struct weighted_piece * p;
1182    const tr_piece_index_t index = tr_torBlockPiece( t->tor, block );
1183
1184    if( ((p = pieceListLookup( t, index ))) && ( p->requestCount > 0 ) )
1185    {
1186        --p->requestCount;
1187        pieceListResortPiece( t, p );
1188    }
1189}
1190
1191
1192/****
1193*****
1194*****  Replication count ( for rarest first policy )
1195*****
1196****/
1197
1198/**
1199 * Increase the replication count of this piece and sort it if the
1200 * piece list is already sorted
1201 */
1202static void
1203tr_incrReplicationOfPiece( Torrent * t, const size_t index )
1204{
1205    assert( replicationExists( t ) );
1206    assert( t->pieceReplicationSize == t->tor->info.pieceCount );
1207
1208    /* One more replication of this piece is present in the swarm */
1209    ++t->pieceReplication[index];
1210
1211    /* we only resort the piece if the list is already sorted */
1212    if( t->pieceSortState == PIECES_SORTED_BY_WEIGHT )
1213        pieceListResortPiece( t, pieceListLookup( t, index ) );
1214}
1215
1216/**
1217 * Increases the replication count of pieces present in the bitfield
1218 */
1219static void
1220tr_incrReplicationFromBitfield( Torrent * t, const tr_bitfield * b )
1221{
1222    size_t i;
1223    uint16_t * rep = t->pieceReplication;
1224    const size_t n = t->tor->info.pieceCount;
1225
1226    assert( replicationExists( t ) );
1227    assert( n == t->pieceReplicationSize );
1228    assert( tr_bitfieldTestFast( b, n-1 ) );
1229
1230    for( i=0; i<n; ++i )
1231        if( tr_bitfieldHas( b, i ) )
1232            ++rep[i];
1233
1234    if( t->pieceSortState == PIECES_SORTED_BY_WEIGHT )
1235        invalidatePieceSorting( t );
1236}
1237
1238/**
1239 * Increase the replication count of every piece
1240 */
1241static void
1242tr_incrReplication( Torrent * t )
1243{
1244    int i;
1245    const int n = t->pieceReplicationSize;
1246
1247    assert( replicationExists( t ) );
1248    assert( t->pieceReplicationSize == t->tor->info.pieceCount );
1249
1250    for( i=0; i<n; ++i )
1251        ++t->pieceReplication[i];
1252}
1253
1254/**
1255 * Decrease the replication count of pieces present in the bitset.
1256 */
1257static void
1258tr_decrReplicationFromBitset( Torrent * t, const tr_bitset * bitset )
1259{
1260    int i;
1261    const int n = t->pieceReplicationSize;
1262
1263    assert( replicationExists( t ) );
1264    assert( t->pieceReplicationSize == t->tor->info.pieceCount );
1265
1266    if( bitset->haveAll )
1267    {
1268        for( i=0; i<n; ++i )
1269            --t->pieceReplication[i];
1270    }
1271    else if ( !bitset->haveNone )
1272    {
1273        const tr_bitfield * const b = &bitset->bitfield;
1274
1275        for( i=0; i<n; ++i )
1276            if( tr_bitfieldHas( b, i ) )
1277                --t->pieceReplication[i];
1278
1279        if( t->pieceSortState == PIECES_SORTED_BY_WEIGHT )
1280            invalidatePieceSorting( t );
1281    }
1282}
1283
1284/**
1285***
1286**/
1287
/**
 * Public entry point that rebuilds the weighted piece list of the given
 * torrent by delegating to pieceListRebuild() on its torrentPeers object.
 */
void
tr_peerMgrRebuildRequests( tr_torrent * tor )
{
    assert( tr_isTorrent( tor ) );

    pieceListRebuild( tor->torrentPeers );
}
1295
/**
 * Choose up to `numwant' blocks to request from `peer'.
 *
 * The torrent's piece list is brought into sorted-by-weight order and then
 * walked front to back. For each listed piece that the peer has, blocks
 * that are not yet complete are handed out. Every chosen block is written
 * to `setme', registered with requestListAdd(), and counted in the piece's
 * requestCount. Afterwards the pieces that were visited are moved back
 * into weight order.
 *
 * @param tor     torrent to pick blocks from
 * @param peer    peer the requests are for; asserted to have
 *                clientIsInterested set and clientIsChoked clear
 * @param numwant upper bound on the number of blocks returned (asserted > 0)
 * @param setme   output array; at most `numwant' entries are written
 * @param numgot  output: number of entries written to `setme'
 */
void
tr_peerMgrGetNextRequests( tr_torrent           * tor,
                           tr_peer              * peer,
                           int                    numwant,
                           tr_block_index_t     * setme,
                           int                  * numgot )
{
    int i;
    int got;
    Torrent * t;
    struct weighted_piece * pieces;
    const tr_bitset * have = &peer->have;

    /* sanity clause */
    assert( tr_isTorrent( tor ) );
    assert( peer->clientIsInterested );
    assert( !peer->clientIsChoked );
    assert( numwant > 0 );

    /* walk through the pieces and find blocks that should be requested */
    got = 0;
    t = tor->torrentPeers;

    /* prep the pieces list */
    if( t->pieces == NULL )
        pieceListRebuild( t );

    if( t->pieceSortState != PIECES_SORTED_BY_WEIGHT )
        pieceListSort( t, PIECES_SORTED_BY_WEIGHT );

    assertReplicationCountIsExact( t );
    assertWeightedPiecesAreSorted( t );

    updateEndgame( t );
    pieces = t->pieces;
    for( i=0; i<t->pieceCount && got<numwant; ++i )
    {
        struct weighted_piece * p = pieces + i;

        /* if the peer has this piece that we want... */
        if( tr_bitsetHasFast( have, p->index ) )
        {
            /* [b, e) is the range of block indices belonging to this piece */
            tr_block_index_t b = tr_torPieceFirstBlock( tor, p->index );
            const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, p->index );
            /* scratch array, refilled for every block with the peers
               that already have a request pending for that block */
            tr_ptrArray peerArr = TR_PTR_ARRAY_INIT;

            for( ; b!=e && got<numwant; ++b )
            {
                int peerCount;
                tr_peer ** peers;

                /* don't request blocks we've already got */
                if( tr_cpBlockIsCompleteFast( &tor->completion, b ) )
                    continue;

                /* always add peer if this block has no peers yet */
                tr_ptrArrayClear( &peerArr );
                getBlockRequestPeers( t, b, &peerArr );
                peers = (tr_peer **) tr_ptrArrayPeek( &peerArr, &peerCount );
                if( peerCount != 0 )
                {
                    /* don't make a second block request until the endgame */
                    if( !t->endgame )
                        continue;

                    /* don't have more than two peers requesting this block */
                    if( peerCount > 1 )
                        continue;

                    /* don't send the same request to the same peer twice */
                    if( peer == peers[0] )
                        continue;

                    /* in the endgame allow an additional peer to download a
                       block but only if the peer seems to be handling requests
                       relatively fast */
                    /* NOTE(review): t->endgame is used as a flag above and as a
                       numeric threshold here; its exact meaning is set by
                       updateEndgame(), which is not visible in this chunk */
                    if( peer->pendingReqsToPeer + numwant - got < t->endgame )
                        continue;
                }

                /* update the caller's table */
                setme[got++] = b;

                /* update our own tables */
                requestListAdd( t, b, peer );
                ++p->requestCount;
            }

            tr_ptrArrayDestruct( &peerArr, NULL );
        }
    }

    /* In most cases we've just changed the weights of a small number of pieces.
     * So rather than qsort()ing the entire array, it's faster to apply an
     * adaptive insertion sort algorithm. */
    if( got > 0 )
    {
        /* not enough requests || last piece modified */
        /* when the walk ran off the end of the list, the last entry has
           nothing after it to be compared against, so start one earlier */
        if ( i == t->pieceCount ) --i;

        setComparePieceByWeightTorrent( t );
        /* go backwards over the visited entries, pushing each one toward
           the tail until it is in order relative to the entries after it */
        while( --i >= 0 )
        {
            tr_bool exact;

            /* relative position! */
            /* newpos is an offset into the sub-array starting at i+1 */
            const int newpos = tr_lowerBound( &t->pieces[i], &t->pieces[i + 1],
                                              t->pieceCount - (i + 1),
                                              sizeof( struct weighted_piece ),
                                              comparePieceByWeight, &exact );
            if( newpos > 0 )
            {
                /* shift the `newpos' following entries down by one slot
                   and drop the saved entry in behind them */
                const struct weighted_piece piece = t->pieces[i];
                memmove( &t->pieces[i],
                         &t->pieces[i + 1],
                         sizeof( struct weighted_piece ) * ( newpos ) );
                t->pieces[i + newpos] = piece;
            }
        }
    }

    assertWeightedPiecesAreSorted( t );
    *numgot = got;
}
1420
/**
 * Report whether the torrent's request list contains an entry for `block'
 * that was issued to `peer'.
 *
 * @return nonzero if requestListLookup() finds such an entry, zero otherwise
 */
tr_bool
tr_peerMgrDidPeerRequest( const tr_torrent  * tor,
                          const tr_peer     * peer,
                          tr_block_index_t    block )
{
    const Torrent * t = tor->torrentPeers;
    /* the cast drops const only because requestListLookup() takes a
       non-const Torrent* */
    return requestListLookup( (Torrent*)t, block, peer ) != NULL;
}
1429
1430/* cancel requests that are too old */
1431static void
1432refillUpkeep( int foo UNUSED, short bar UNUSED, void * vmgr )
1433{
1434    time_t now;
1435    time_t too_old;
1436    tr_torrent * tor;
1437    tr_peerMgr * mgr = vmgr;
1438    managerLock( mgr );
1439
1440    now = tr_time( );
1441    too_old = now - REQUEST_TTL_SECS;
1442
1443    tor = NULL;
1444    while(( tor = tr_torrentNext( mgr->session, tor )))
1445    {
1446        Torrent * t = tor->torrentPeers;
1447        const int n = t->requestCount;
1448        if( n > 0 )
1449        {
1450            int keepCount = 0;
1451            int cancelCount = 0;
1452            struct block_request * cancel = tr_new( struct block_request, n );
1453            const struct block_request * it;
1454            const struct block_request * end;
1455
1456            for( it=t->requests, end=it+n; it!=end; ++it )
1457            {
1458                if( ( it->sentAt <= too_old ) && it->peer->msgs && !tr_peerMsgsIsReadingBlock( it->peer->msgs, it->block ) )
1459                    cancel[cancelCount++] = *it;
1460                else
1461                {
1462                    if( it != &t->requests[keepCount] )
1463                        t->requests[keepCount] = *it;
1464                    keepCount++;
1465                }
1466            }
1467
1468            /* prune out the ones we aren't keeping */
1469            t->requestCount = keepCount;
1470
1471            /* send cancel messages for all the "cancel" ones */
1472            for( it=cancel, end=it+cancelCount; it!=end; ++it ) {
1473                if( ( it->peer != NULL ) && ( it->peer->msgs != NULL ) ) {
1474                    tr_historyAdd( it->peer->cancelsSentToPeer, now, 1 );
1475                    tr_peerMsgsCancel( it->peer->msgs, it->block );
1476                    decrementPendingReqCount( it );
1477                }
1478            }
1479
1480            /* decrement the pending request counts for the timed-out blocks */
1481            for( it=cancel, end=it+cancelCount; it!=end; ++it )
1482                pieceListRemoveRequest( t, it->block );
1483
1484            /* cleanup loop */
1485            tr_free( cancel );
1486        }
1487    }
1488
1489    tr_timerAddMsec( mgr->refillUpkeepTimer, REFILL_UPKEEP_PERIOD_MSEC );
1490    managerUnlock( mgr );
1491}
1492
1493static void
1494addStrike( Torrent * t, tr_peer * peer )
1495{
1496    tordbg( t, "increasing peer %s strike count to %d",
1497            tr_atomAddrStr( peer->atom ), peer->strikes + 1 );
1498
1499    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
1500    {
1501        struct peer_atom * atom = peer->atom;
1502        atom->flags2 |= MYFLAG_BANNED;
1503        peer->doPurge = 1;
1504        tordbg( t, "banning peer %s", tr_atomAddrStr( atom ) );
1505    }
1506}
1507
1508static void
1509gotBadPiece( Torrent * t, tr_piece_index_t pieceIndex )
1510{
1511    tr_torrent *   tor = t->tor;
1512    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
1513
1514    tor->corruptCur += byteCount;
1515    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1516
1517    tr_announcerAddBytes( tor, TR_ANN_CORRUPT, byteCount );
1518}
1519
1520static void
1521peerSuggestedPiece( Torrent            * t UNUSED,
1522                    tr_peer            * peer UNUSED,
1523                    tr_piece_index_t     pieceIndex UNUSED,
1524                    int                  isFastAllowed UNUSED )
1525{
1526#if 0
1527    assert( t );
1528    assert( peer );
1529    assert( peer->msgs );
1530
1531    /* is this a valid piece? */
1532    if(  pieceIndex >= t->tor->info.pieceCount )
1533        return;
1534
1535    /* don't ask for it if we've already got it */
1536    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
1537        return;
1538
1539    /* don't ask for it if they don't have it */
1540    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
1541        return;
1542
1543    /* don't ask for it if we're choked and it's not fast */
1544    if( !isFastAllowed && peer->clientIsChoked )
1545        return;
1546
1547    /* request the blocks that we don't have in this piece */
1548    {
1549        tr_block_index_t block;
1550        const tr_torrent * tor = t->tor;
1551        const tr_block_index_t start = tr_torPieceFirstBlock( tor, pieceIndex );
1552        const tr_block_index_t end = start + tr_torPieceCountBlocks( tor, pieceIndex );
1553
1554        for( block=start; block<end; ++block )
1555        {
1556            if( !tr_cpBlockIsComplete( tor->completion, block ) )
1557            {
1558                const uint32_t offset = getBlockOffsetInPiece( tor, block );
1559                const uint32_t length = tr_torBlockCountBytes( tor, block );
1560                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length );
1561                incrementPieceRequests( t, pieceIndex );
1562            }
1563        }
1564    }
1565#endif
1566}
1567
/**
 * Forget the request for `block' that was sent to `peer': remove it from
 * the torrent's request list and update the piece list's request count
 * for the piece that contains the block.
 */
static void
removeRequestFromTables( Torrent * t, tr_block_index_t block, const tr_peer * peer )
{
    requestListRemove( t, block, peer );
    pieceListRemoveRequest( t, block );
}
1574
1575/* peer choked us, or maybe it disconnected.
1576   either way we need to remove all its requests */
1577static void
1578peerDeclinedAllRequests( Torrent * t, const tr_peer * peer )
1579{
1580    int i, n;
1581    tr_block_index_t * blocks = tr_new( tr_block_index_t, t->requestCount );
1582
1583    for( i=n=0; i<t->requestCount; ++i )
1584        if( peer == t->requests[i].peer )
1585            blocks[n++] = t->requests[i].block;
1586
1587    for( i=0; i<n; ++i )
1588        removeRequestFromTables( t, blocks[i], peer );
1589
1590    tr_free( blocks );
1591}
1592
1593static void
1594peerCallbackFunc( tr_peer * peer, const tr_peer_event * e, void * vt )
1595{
1596    Torrent * t = vt;
1597
1598    torrentLock( t );
1599
1600    switch( e->eventType )
1601    {
1602        case TR_PEER_PEER_GOT_DATA:
1603        {
1604            const time_t now = tr_time( );
1605            tr_torrent * tor = t->tor;
1606
1607            if( e->wasPieceData )
1608            {
1609                tor->uploadedCur += e->length;
1610                tr_announcerAddBytes( tor, TR_ANN_UP, e->length );
1611                tr_torrentSetActivityDate( tor, now );
1612                tr_torrentSetDirty( tor );
1613            }
1614
1615            /* update the stats */
1616            if( e->wasPieceData )
1617                tr_statsAddUploaded( tor->session, e->length );
1618
1619            /* update our atom */
1620            if( peer && e->wasPieceData )
1621                peer->atom->piece_data_time = now;
1622
1623            break;
1624        }
1625
1626        case TR_PEER_CLIENT_GOT_HAVE:
1627            if( replicationExists( t ) ) {
1628                tr_incrReplicationOfPiece( t, e->pieceIndex );
1629                assertReplicationCountIsExact( t );
1630            }
1631            break;
1632
1633        case TR_PEER_CLIENT_GOT_HAVE_ALL:
1634            if( replicationExists( t ) ) {
1635                tr_incrReplication( t );
1636                assertReplicationCountIsExact( t );
1637            }
1638            break;
1639
1640        case TR_PEER_CLIENT_GOT_HAVE_NONE:
1641            /* noop */
1642            break;
1643
1644        case TR_PEER_CLIENT_GOT_BITFIELD:
1645            assert( e->bitfield != NULL );
1646            if( replicationExists( t ) ) {
1647                tr_incrReplicationFromBitfield( t, e->bitfield );
1648                assertReplicationCountIsExact( t );
1649            }
1650            break;
1651
1652        case TR_PEER_CLIENT_GOT_REJ:
1653            removeRequestFromTables( t, _tr_block( t->tor, e->pieceIndex, e->offset ), peer );
1654            break;
1655
1656        case TR_PEER_CLIENT_GOT_CHOKE:
1657            peerDeclinedAllRequests( t, peer );
1658            break;
1659
1660        case TR_PEER_CLIENT_GOT_PORT:
1661            if( peer )
1662                peer->atom->port = e->port;
1663            break;
1664
1665        case TR_PEER_CLIENT_GOT_SUGGEST:
1666            if( peer )
1667                peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
1668            break;
1669
1670        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1671            if( peer )
1672                peerSuggestedPiece( t, peer, e->pieceIndex, TRUE );
1673            break;
1674
1675        case TR_PEER_CLIENT_GOT_DATA:
1676        {
1677            const time_t now = tr_time( );
1678            tr_torrent * tor = t->tor;
1679
1680            if( e->wasPieceData )
1681            {
1682                tor->downloadedCur += e->length;
1683                tr_torrentSetActivityDate( tor, now );
1684                tr_torrentSetDirty( tor );
1685            }
1686
1687            /* update the stats */
1688            if( e->wasPieceData )
1689                tr_statsAddDownloaded( tor->session, e->length );
1690
1691            /* update our atom */
1692            if( peer && peer->atom && e->wasPieceData )
1693                peer->atom->piece_data_time = now;
1694
1695            break;
1696        }
1697
1698        case TR_PEER_PEER_PROGRESS:
1699        {
1700            if( peer )
1701            {
1702                struct peer_atom * atom = peer->atom;
1703                if( e->progress >= 1.0 ) {
1704                    tordbg( t, "marking peer %s as a seed", tr_atomAddrStr( atom ) );
1705                    atomSetSeed( atom );
1706                }
1707            }
1708            break;
1709        }
1710
1711        case TR_PEER_CLIENT_GOT_BLOCK:
1712        {
1713            tr_torrent * tor = t->tor;
1714            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1715            int i, peerCount;
1716            tr_peer ** peers;
1717            tr_ptrArray peerArr = TR_PTR_ARRAY_INIT;
1718
1719            removeRequestFromTables( t, block, peer );
1720            getBlockRequestPeers( t, block, &peerArr );
1721            peers = (tr_peer **) tr_ptrArrayPeek( &peerArr, &peerCount );
1722
1723            /* remove additional block requests and send cancel to peers */
1724            for( i=0; i<peerCount; i++ ) {
1725                tr_peer * p = peers[i];
1726                assert( p != peer );
1727                if( p->msgs ) {
1728                    tr_historyAdd( p->cancelsSentToPeer, tr_time( ), 1 );
1729                    tr_peerMsgsCancel( p->msgs, block );
1730                }
1731                removeRequestFromTables( t, block, p );
1732            }
1733
1734            tr_ptrArrayDestruct( &peerArr, FALSE );
1735
1736            if( peer && peer->blocksSentToClient )
1737                tr_historyAdd( peer->blocksSentToClient, tr_time( ), 1 );
1738
1739            if( tr_cpBlockIsComplete( &tor->completion, block ) )
1740            {
1741                /* we already have this block... */
1742                const uint32_t n = tr_torBlockCountBytes( tor, block );
1743                tor->downloadedCur -= MIN( tor->downloadedCur, n );
1744                tordbg( t, "we have this block already..." );
1745            }
1746            else
1747            {
1748                tr_cpBlockAdd( &tor->completion, block );
1749                pieceListResortPiece( t, pieceListLookup( t, e->pieceIndex ) );
1750                tr_torrentSetDirty( tor );
1751
1752                if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
1753                {
1754                    const tr_piece_index_t p = e->pieceIndex;
1755                    const tr_bool ok = tr_torrentCheckPiece( tor, p );
1756
1757                    tordbg( t, "[LAZY] checked just-completed piece %zu", (size_t)p );
1758
1759                    if( !ok )
1760                    {
1761                        tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1762                                   (unsigned long)p );
1763                    }
1764
1765                    tr_peerMgrSetBlame( tor, p, ok );
1766
1767                    if( !ok )
1768                    {
1769                        gotBadPiece( t, p );
1770                    }
1771                    else
1772                    {
1773                        int i;
1774                        int peerCount;
1775                        tr_peer ** peers;
1776                        tr_file_index_t fileIndex;
1777
1778                        /* only add this to downloadedCur if we got it from a peer --
1779                         * webseeds shouldn't count against our ratio. As one tracker
1780                         * admin put it, "Those pieces are downloaded directly from the
1781                         * content distributor, not the peers, it is the tracker's job
1782                         * to manage the swarms, not the web server and does not fit
1783                         * into the jurisdiction of the tracker." */
1784                        if( peer->msgs != NULL ) {
1785                            const uint32_t n = tr_torPieceCountBytes( tor, p );
1786                            tr_announcerAddBytes( tor, TR_ANN_DOWN, n );
1787                        }
1788
1789                        peerCount = tr_ptrArraySize( &t->peers );
1790                        peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1791                        for( i=0; i<peerCount; ++i )
1792                            tr_peerMsgsHave( peers[i]->msgs, p );
1793
1794                        for( fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex ) {
1795                            const tr_file * file = &tor->info.files[fileIndex];
1796                            if( ( file->firstPiece <= p ) && ( p <= file->lastPiece ) ) {
1797                                if( tr_cpFileIsComplete( &tor->completion, fileIndex ) ) {
1798                                    tr_cacheFlushFile( tor->session->cache, tor, fileIndex );
1799                                    tr_torrentFileCompleted( tor, fileIndex );
1800                                }
1801                            }
1802                        }
1803
1804                        pieceListRemovePiece( t, p );
1805                    }
1806                }
1807
1808                t->needsCompletenessCheck = TRUE;
1809            }
1810            break;
1811        }
1812
1813        case TR_PEER_ERROR:
1814            if( ( e->err == ERANGE ) || ( e->err == EMSGSIZE ) || ( e->err == ENOTCONN ) )
1815            {
1816                /* some protocol error from the peer */
1817                peer->doPurge = 1;
1818                tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error",
1819                        tr_atomAddrStr( peer->atom ) );
1820            }
1821            else
1822            {
1823                tordbg( t, "unhandled error: %s", tr_strerror( e->err ) );
1824            }
1825            break;
1826
1827        default:
1828            assert( 0 );
1829    }
1830
1831    torrentUnlock( t );
1832}
1833
1834static int
1835getDefaultShelfLife( uint8_t from )
1836{
1837    /* in general, peers obtained from firsthand contact
1838     * are better than those from secondhand, etc etc */
1839    switch( from )
1840    {
1841        case TR_PEER_FROM_INCOMING : return 60 * 60 * 6;
1842        case TR_PEER_FROM_LTEP     : return 60 * 60 * 6;
1843        case TR_PEER_FROM_TRACKER  : return 60 * 60 * 3;
1844        case TR_PEER_FROM_DHT      : return 60 * 60 * 3;
1845        case TR_PEER_FROM_PEX      : return 60 * 60 * 2;
1846        case TR_PEER_FROM_RESUME   : return 60 * 60;
1847        case TR_PEER_FROM_LPD      : return 10 * 60;
1848        default                    : return 60 * 60;
1849    }
1850}
1851
1852static void
1853ensureAtomExists( Torrent           * t,
1854                  const tr_address  * addr,
1855                  const tr_port       port,
1856                  const uint8_t       flags,
1857                  const int8_t        seedProbability,
1858                  const uint8_t       from )
1859{
1860    struct peer_atom * a;
1861
1862    assert( tr_isAddress( addr ) );
1863    assert( from < TR_PEER_FROM__MAX );
1864
1865    a = getExistingAtom( t, addr );
1866
1867    if( a == NULL )
1868    {
1869        const int jitter = tr_cryptoWeakRandInt( 60*10 );
1870        a = tr_new0( struct peer_atom, 1 );
1871        a->addr = *addr;
1872        a->port = port;
1873        a->flags = flags;
1874        a->fromFirst = from;
1875        a->fromBest = from;
1876        a->shelf_date = tr_time( ) + getDefaultShelfLife( from ) + jitter;
1877        a->blocklisted = -1;
1878        atomSetSeedProbability( a, seedProbability );
1879        tr_ptrArrayInsertSorted( &t->pool, a, compareAtomsByAddress );
1880
1881        tordbg( t, "got a new atom: %s", tr_atomAddrStr( a ) );
1882    }
1883    else
1884    {
1885        if( from < a->fromBest )
1886            a->fromBest = from;
1887       
1888        if( a->seedProbability == -1 )
1889            atomSetSeedProbability( a, seedProbability );
1890
1891        a->flags |= flags;
1892    }
1893}
1894
/**
 * @return the torrent's configured limit on connected peers
 *         (tor->maxConnectedPeers)
 */
static int
getMaxPeerCount( const tr_torrent * tor )
{
    return tor->maxConnectedPeers;
}
1900
/**
 * @return the number of entries in the torrent's peers array.
 *         Outgoing handshakes still in progress are not included; the
 *         term that would add them is commented out below.
 */
static int
getPeerCount( const Torrent * t )
{
    return tr_ptrArraySize( &t->peers );/* + tr_ptrArraySize( &t->outgoingHandshakes ); */
}
1906
1907/* FIXME: this is kind of a mess. */
1908static tr_bool
1909myHandshakeDoneCB( tr_handshake  * handshake,
1910                   tr_peerIo     * io,
1911                   tr_bool         readAnythingFromPeer,
1912                   tr_bool         isConnected,
1913                   const uint8_t * peer_id,
1914                   void          * vmanager )
1915{
1916    tr_bool            ok = isConnected;
1917    tr_bool            success = FALSE;
1918    tr_port            port;
1919    const tr_address * addr;
1920    tr_peerMgr       * manager = vmanager;
1921    Torrent          * t;
1922    tr_handshake     * ours;
1923
1924    assert( io );
1925    assert( tr_isBool( ok ) );
1926
1927    t = tr_peerIoHasTorrentHash( io )
1928        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1929        : NULL;
1930
1931    if( tr_peerIoIsIncoming ( io ) )
1932        ours = tr_ptrArrayRemoveSorted( &manager->incomingHandshakes,
1933                                        handshake, handshakeCompare );
1934    else if( t )
1935        ours = tr_ptrArrayRemoveSorted( &t->outgoingHandshakes,
1936                                        handshake, handshakeCompare );
1937    else
1938        ours = handshake;
1939
1940    assert( ours );
1941    assert( ours == handshake );
1942
1943    if( t )
1944        torrentLock( t );
1945
1946    addr = tr_peerIoGetAddress( io, &port );
1947
1948    if( !ok || !t || !t->isRunning )
1949    {
1950        if( t )
1951        {
1952            struct peer_atom * atom = getExistingAtom( t, addr );
1953            if( atom )
1954            {
1955                ++atom->numFails;
1956
1957                if( !readAnythingFromPeer )
1958                {
1959                    tordbg( t, "marking peer %s as unreachable... numFails is %d", tr_atomAddrStr( atom ), (int)atom->numFails );
1960                    atom->flags2 |= MYFLAG_UNREACHABLE;
1961                }
1962            }
1963        }
1964    }
1965    else /* looking good */
1966    {
1967        struct peer_atom * atom;
1968
1969        ensureAtomExists( t, addr, port, 0, -1, TR_PEER_FROM_INCOMING );
1970        atom = getExistingAtom( t, addr );
1971        atom->time = tr_time( );
1972        atom->piece_data_time = 0;
1973        atom->lastConnectionAt = tr_time( );
1974
1975        if( !tr_peerIoIsIncoming( io ) )
1976        {
1977            atom->flags |= ADDED_F_CONNECTABLE;
1978            atom->flags2 &= ~MYFLAG_UNREACHABLE;
1979        }
1980
1981        /* In principle, this flag specifies whether the peer groks uTP,
1982           not whether it's currently connected over uTP. */
1983        if( io->utp_socket )
1984            atom->flags |= ADDED_F_UTP_FLAGS;
1985
1986        if( atom->flags2 & MYFLAG_BANNED )
1987        {
1988            tordbg( t, "banned peer %s tried to reconnect",
1989                    tr_atomAddrStr( atom ) );
1990        }
1991        else if( tr_peerIoIsIncoming( io )
1992               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1993
1994        {
1995        }
1996        else
1997        {
1998            tr_peer * peer = atom->peer;
1999
2000            if( peer ) /* we already have this peer */
2001            {
2002            }
2003            else
2004            {
2005                peer = getPeer( t, atom );
2006                tr_free( peer->client );
2007
2008                if( !peer_id )
2009                    peer->client = NULL;
2010                else {
2011                    char client[128];
2012                    tr_clientForId( client, sizeof( client ), peer_id );
2013                    peer->client = tr_strdup( client );
2014                }
2015
2016                peer->io = tr_handshakeStealIO( handshake ); /* this steals its refcount too, which is
2017                                                                balanced by our unref in peerDestructor()  */
2018                tr_peerIoSetParent( peer->io, t->tor->bandwidth );
2019                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t );
2020
2021                success = TRUE;
2022            }
2023        }
2024    }
2025
2026    if( t )
2027        torrentUnlock( t );
2028
2029    return success;
2030}
2031
2032void
2033tr_peerMgrAddIncoming( tr_peerMgr * manager,
2034                       tr_address * addr,
2035                       tr_port      port,
2036                       int          socket,
2037                       struct UTPSocket * utp_socket )
2038{
2039    tr_session * session;
2040
2041    managerLock( manager );
2042
2043    assert( tr_isSession( manager->session ) );
2044    session = manager->session;
2045
2046    if( tr_sessionIsAddressBlocked( session, addr ) )
2047    {
2048        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_ntop_non_ts( addr ) );
2049        if(socket >= 0)
2050            tr_netClose( session, socket );
2051        else
2052            UTP_Close( utp_socket );
2053    }
2054    else if( getExistingHandshake( &manager->incomingHandshakes, addr ) )
2055    {
2056        if(socket >= 0)
2057            tr_netClose( session, socket );
2058        else
2059            UTP_Close( utp_socket );
2060    }
2061    else /* we don't have a connection to them yet... */
2062    {
2063        tr_peerIo *    io;
2064        tr_handshake * handshake;
2065
2066        io = tr_peerIoNewIncoming( session, session->bandwidth, addr, port, socket, utp_socket );
2067
2068        handshake = tr_handshakeNew( io,
2069                                     session->encryptionMode,
2070                                     myHandshakeDoneCB,
2071                                     manager );
2072
2073        tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewIncoming() */
2074
2075        tr_ptrArrayInsertSorted( &manager->incomingHandshakes, handshake,
2076                                 handshakeCompare );
2077    }
2078
2079    managerUnlock( manager );
2080}
2081
2082void
2083tr_peerMgrAddPex( tr_torrent * tor, uint8_t from,
2084                  const tr_pex * pex, int8_t seedProbability )
2085{
2086    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
2087    {
2088        Torrent * t = tor->torrentPeers;
2089        managerLock( t->manager );
2090
2091        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
2092            if( tr_isValidPeerAddress( &pex->addr, pex->port ) )
2093                ensureAtomExists( t, &pex->addr, pex->port, pex->flags, seedProbability, from );
2094
2095        managerUnlock( t->manager );
2096    }
2097}
2098
2099void
2100tr_peerMgrMarkAllAsSeeds( tr_torrent * tor )
2101{
2102    Torrent * t = tor->torrentPeers;
2103    const int n = tr_ptrArraySize( &t->pool );
2104    struct peer_atom ** it = (struct peer_atom**) tr_ptrArrayBase( &t->pool );
2105    struct peer_atom ** end = it + n;
2106
2107    while( it != end )
2108        atomSetSeed( *it++ );
2109}
2110
2111tr_pex *
2112tr_peerMgrCompactToPex( const void *    compact,
2113                        size_t          compactLen,
2114                        const uint8_t * added_f,
2115                        size_t          added_f_len,
2116                        size_t *        pexCount )
2117{
2118    size_t          i;
2119    size_t          n = compactLen / 6;
2120    const uint8_t * walk = compact;
2121    tr_pex *        pex = tr_new0( tr_pex, n );
2122
2123    for( i = 0; i < n; ++i )
2124    {
2125        pex[i].addr.type = TR_AF_INET;
2126        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
2127        memcpy( &pex[i].port, walk, 2 ); walk += 2;
2128        if( added_f && ( n == added_f_len ) )
2129            pex[i].flags = added_f[i];
2130    }
2131
2132    *pexCount = n;
2133    return pex;
2134}
2135
2136tr_pex *
2137tr_peerMgrCompact6ToPex( const void    * compact,
2138                         size_t          compactLen,
2139                         const uint8_t * added_f,
2140                         size_t          added_f_len,
2141                         size_t        * pexCount )
2142{
2143    size_t          i;
2144    size_t          n = compactLen / 18;
2145    const uint8_t * walk = compact;
2146    tr_pex *        pex = tr_new0( tr_pex, n );
2147
2148    for( i = 0; i < n; ++i )
2149    {
2150        pex[i].addr.type = TR_AF_INET6;
2151        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
2152        memcpy( &pex[i].port, walk, 2 ); walk += 2;
2153        if( added_f && ( n == added_f_len ) )
2154            pex[i].flags = added_f[i];
2155    }
2156
2157    *pexCount = n;
2158    return pex;
2159}
2160
2161tr_pex *
2162tr_peerMgrArrayToPex( const void * array,
2163                      size_t       arrayLen,
2164                      size_t      * pexCount )
2165{
2166    size_t          i;
2167    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
2168    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
2169    const uint8_t * walk = array;
2170    tr_pex        * pex = tr_new0( tr_pex, n );
2171
2172    for( i = 0 ; i < n ; i++ ) {
2173        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
2174        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
2175        pex[i].flags = 0x00;
2176        walk += sizeof( tr_address ) + 2;
2177    }
2178
2179    *pexCount = n;
2180    return pex;
2181}
2182
2183/**
2184***
2185**/
2186
2187void
2188tr_peerMgrSetBlame( tr_torrent     * tor,
2189                    tr_piece_index_t pieceIndex,
2190                    int              success )
2191{
2192    if( !success )
2193    {
2194        int        peerCount, i;
2195        Torrent *  t = tor->torrentPeers;
2196        tr_peer ** peers;
2197
2198        assert( torrentIsLocked( t ) );
2199
2200        peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
2201        for( i = 0; i < peerCount; ++i )
2202        {
2203            tr_peer * peer = peers[i];
2204            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
2205            {
2206                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
2207                        tr_atomAddrStr( peer->atom ),
2208                        pieceIndex, (int)peer->strikes + 1 );
2209                addStrike( t, peer );
2210            }
2211        }
2212    }
2213}
2214
2215int
2216tr_pexCompare( const void * va, const void * vb )
2217{
2218    const tr_pex * a = va;
2219    const tr_pex * b = vb;
2220    int i;
2221
2222    assert( tr_isPex( a ) );
2223    assert( tr_isPex( b ) );
2224
2225    if(( i = tr_compareAddresses( &a->addr, &b->addr )))
2226        return i;
2227
2228    if( a->port != b->port )
2229        return a->port < b->port ? -1 : 1;
2230
2231    return 0;
2232}
2233
#if 0
/* Disabled code: an explicit yes/no encryption preference wins;
   otherwise fall back to whether the peer's io is encrypted. */
static int
peerPrefersCrypto( const tr_peer * peer )
{
    int ret;

    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
        ret = TRUE;
    else if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
        ret = FALSE;
    else
        ret = tr_peerIoIsEncrypted( peer->io );

    return ret;
}
#endif
2247
2248/* better goes first */
2249static int
2250compareAtomsByUsefulness( const void * va, const void *vb )
2251{
2252    const struct peer_atom * a = * (const struct peer_atom**) va;
2253    const struct peer_atom * b = * (const struct peer_atom**) vb;
2254
2255    assert( tr_isAtom( a ) );
2256    assert( tr_isAtom( b ) );
2257
2258    if( a->piece_data_time != b->piece_data_time )
2259        return a->piece_data_time > b->piece_data_time ? -1 : 1;
2260    if( a->fromBest != b->fromBest )
2261        return a->fromBest < b->fromBest ? -1 : 1;
2262    if( a->numFails != b->numFails )
2263        return a->numFails < b->numFails ? -1 : 1;
2264
2265    return 0;
2266}
2267
2268int
2269tr_peerMgrGetPeers( tr_torrent   * tor,
2270                    tr_pex      ** setme_pex,
2271                    uint8_t        af,
2272                    uint8_t        list_mode,
2273                    int            maxCount )
2274{
2275    int i;
2276    int n;
2277    int count = 0;
2278    int atomCount = 0;
2279    const Torrent * t = tor->torrentPeers;
2280    struct peer_atom ** atoms = NULL;
2281    tr_pex * pex;
2282    tr_pex * walk;
2283
2284    assert( tr_isTorrent( tor ) );
2285    assert( setme_pex != NULL );
2286    assert( af==TR_AF_INET || af==TR_AF_INET6 );
2287    assert( list_mode==TR_PEERS_CONNECTED || list_mode==TR_PEERS_ALL );
2288
2289    managerLock( t->manager );
2290
2291    /**
2292    ***  build a list of atoms
2293    **/
2294
2295    if( list_mode == TR_PEERS_CONNECTED ) /* connected peers only */
2296    {
2297        int i;
2298        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
2299        atomCount = tr_ptrArraySize( &t->peers );
2300        atoms = tr_new( struct peer_atom *, atomCount );
2301        for( i=0; i<atomCount; ++i )
2302            atoms[i] = peers[i]->atom;
2303    }
2304    else /* TR_PEERS_ALL */
2305    {
2306        const struct peer_atom ** atomsBase = (const struct peer_atom**) tr_ptrArrayBase( &t->pool );
2307        atomCount = tr_ptrArraySize( &t->pool );
2308        atoms = tr_memdup( atomsBase, atomCount * sizeof( struct peer_atom * ) );
2309    }
2310
2311    qsort( atoms, atomCount, sizeof( struct peer_atom * ), compareAtomsByUsefulness );
2312
2313    /**
2314    ***  add the first N of them into our return list
2315    **/
2316
2317    n = MIN( atomCount, maxCount );
2318    pex = walk = tr_new0( tr_pex, n );
2319
2320    for( i=0; i<atomCount && count<n; ++i )
2321    {
2322        const struct peer_atom * atom = atoms[i];
2323        if( atom->addr.type == af )
2324        {
2325            assert( tr_isAddress( &atom->addr ) );
2326            walk->addr = atom->addr;
2327            walk->port = atom->port;
2328            walk->flags = atom->flags;
2329            ++count;
2330            ++walk;
2331        }
2332    }
2333
2334    qsort( pex, count, sizeof( tr_pex ), tr_pexCompare );
2335
2336    assert( ( walk - pex ) == count );
2337    *setme_pex = pex;
2338
2339    /* cleanup */
2340    tr_free( atoms );
2341    managerUnlock( t->manager );
2342    return count;
2343}
2344
2345static void atomPulse      ( int, short, void * );
2346static void bandwidthPulse ( int, short, void * );
2347static void rechokePulse   ( int, short, void * );
2348static void reconnectPulse ( int, short, void * );
2349
2350static struct event *
2351createTimer( tr_session * session, int msec, void (*callback)(int, short, void *), void * cbdata )
2352{
2353    struct event * timer = evtimer_new( session->event_base, callback, cbdata );
2354    tr_timerAddMsec( timer, msec );
2355    return timer;
2356}
2357
2358static void
2359ensureMgrTimersExist( struct tr_peerMgr * m )
2360{
2361    if( m->atomTimer == NULL )
2362        m->atomTimer = createTimer( m->session, ATOM_PERIOD_MSEC, atomPulse, m );
2363
2364    if( m->bandwidthTimer == NULL )
2365        m->bandwidthTimer = createTimer( m->session, BANDWIDTH_PERIOD_MSEC, bandwidthPulse, m );
2366
2367    if( m->rechokeTimer == NULL )
2368        m->rechokeTimer = createTimer( m->session, RECHOKE_PERIOD_MSEC, rechokePulse, m );
2369
2370    if( m->refillUpkeepTimer == NULL )
2371        m->refillUpkeepTimer = createTimer( m->session, REFILL_UPKEEP_PERIOD_MSEC, refillUpkeep, m );
2372}
2373
2374void
2375tr_peerMgrStartTorrent( tr_torrent * tor )
2376{
2377    Torrent * t = tor->torrentPeers;
2378
2379    assert( tr_isTorrent( tor ) );
2380    assert( tr_torrentIsLocked( tor ) );
2381
2382    ensureMgrTimersExist( t->manager );
2383
2384    t->isRunning = TRUE;
2385    t->maxPeers = t->tor->maxConnectedPeers;
2386    t->pieceSortState = PIECES_UNSORTED;
2387
2388    rechokePulse( 0, 0, t->manager );
2389}
2390
2391static void
2392stopTorrent( Torrent * t )
2393{
2394    int i, n;
2395
2396    t->isRunning = FALSE;
2397
2398    replicationFree( t );
2399    invalidatePieceSorting( t );
2400
2401    /* disconnect the peers. */
2402    for( i=0, n=tr_ptrArraySize( &t->peers ); i<n; ++i )
2403        peerDestructor( t, tr_ptrArrayNth( &t->peers, i ) );
2404    tr_ptrArrayClear( &t->peers );
2405
2406    /* disconnect the handshakes. handshakeAbort calls handshakeDoneCB(),
2407     * which removes the handshake from t->outgoingHandshakes... */
2408    while( !tr_ptrArrayEmpty( &t->outgoingHandshakes ) )
2409        tr_handshakeAbort( tr_ptrArrayNth( &t->outgoingHandshakes, 0 ) );
2410}
2411
2412void
2413tr_peerMgrStopTorrent( tr_torrent * tor )
2414{
2415    assert( tr_isTorrent( tor ) );
2416    assert( tr_torrentIsLocked( tor ) );
2417
2418    stopTorrent( tor->torrentPeers );
2419}
2420
2421void
2422tr_peerMgrAddTorrent( tr_peerMgr * manager, tr_torrent * tor )
2423{
2424    assert( tr_isTorrent( tor ) );
2425    assert( tr_torrentIsLocked( tor ) );
2426    assert( tor->torrentPeers == NULL );
2427
2428    tor->torrentPeers = torrentConstructor( manager, tor );
2429}
2430
2431void
2432tr_peerMgrRemoveTorrent( tr_torrent * tor )
2433{
2434    assert( tr_isTorrent( tor ) );
2435    assert( tr_torrentIsLocked( tor ) );
2436
2437    stopTorrent( tor->torrentPeers );
2438    torrentDestructor( tor->torrentPeers );
2439}
2440
2441void
2442tr_peerMgrTorrentAvailability( const tr_torrent * tor, int8_t * tab, unsigned int tabCount )
2443{
2444    assert( tr_isTorrent( tor ) );
2445    assert( torrentIsLocked( tor->torrentPeers ) );
2446    assert( tab != NULL );
2447    assert( tabCount > 0 );
2448
2449    memset( tab, 0, tabCount );
2450
2451    if( tr_torrentHasMetadata( tor ) )
2452    {
2453        tr_piece_index_t i;
2454        const int peerCount = tr_ptrArraySize( &tor->torrentPeers->peers );
2455        const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &tor->torrentPeers->peers );
2456        const float interval = tor->info.pieceCount / (float)tabCount;
2457        const tr_bool isSeed = tr_cpGetStatus( &tor->completion ) == TR_SEED;
2458
2459        for( i=0; i<tabCount; ++i )
2460        {
2461            const int piece = i * interval;
2462
2463            if( isSeed || tr_cpPieceIsComplete( &tor->completion, piece ) )
2464                tab[i] = -1;
2465            else if( peerCount ) {
2466                int j;
2467                for( j=0; j<peerCount; ++j )
2468                    if( tr_bitsetHas( &peers[j]->have, piece ) )
2469                        ++tab[i];
2470            }
2471        }
2472    }
2473}
2474
2475/* Returns the pieces that are available from peers */
2476tr_bitfield*
2477tr_peerMgrGetAvailable( const tr_torrent * tor )
2478{
2479    int i;
2480    Torrent * t = tor->torrentPeers;
2481    const int peerCount = tr_ptrArraySize( &t->peers );
2482    const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
2483    tr_bitfield * pieces = tr_bitfieldNew( t->tor->info.pieceCount );
2484
2485    assert( tr_torrentIsLocked( tor ) );
2486
2487    for( i=0; i<peerCount; ++i )
2488        tr_bitsetOr( pieces, &peers[i]->have );
2489
2490    return pieces;
2491}
2492
2493void
2494tr_peerMgrTorrentStats( tr_torrent  * tor,
2495                        int         * setmePeersKnown,
2496                        int         * setmePeersConnected,
2497                        int         * setmeSeedsConnected,
2498                        int         * setmeWebseedsSendingToUs,
2499                        int         * setmePeersSendingToUs,
2500                        int         * setmePeersGettingFromUs,
2501                        int         * setmePeersFrom )
2502{
2503    int i, size;
2504    const Torrent * t = tor->torrentPeers;
2505    const tr_peer ** peers;
2506
2507    assert( tr_torrentIsLocked( tor ) );
2508
2509    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
2510    size = tr_ptrArraySize( &t->peers );
2511
2512    *setmePeersKnown           = tr_ptrArraySize( &t->pool );
2513    *setmePeersConnected       = 0;
2514    *setmeSeedsConnected       = 0;
2515    *setmePeersGettingFromUs   = 0;
2516    *setmePeersSendingToUs     = 0;
2517    *setmeWebseedsSendingToUs  = 0;
2518
2519    for( i=0; i<TR_PEER_FROM__MAX; ++i )
2520        setmePeersFrom[i] = 0;
2521
2522    for( i=0; i<size; ++i )
2523    {
2524        const tr_peer * peer = peers[i];
2525        const struct peer_atom * atom = peer->atom;
2526
2527        if( peer->io == NULL ) /* not connected */
2528            continue;
2529
2530        ++*setmePeersConnected;
2531
2532        ++setmePeersFrom[atom->fromFirst];
2533
2534        if( clientIsDownloadingFrom( tor, peer ) )
2535            ++*setmePeersSendingToUs;
2536
2537        if( clientIsUploadingTo( peer ) )
2538            ++*setmePeersGettingFromUs;
2539
2540        if( atomIsSeed( atom ) )
2541            ++*setmeSeedsConnected;
2542    }
2543
2544    *setmeWebseedsSendingToUs = countActiveWebseeds( t );
2545}
2546
2547int
2548tr_peerMgrGetWebseedSpeed_Bps( const tr_torrent * tor, uint64_t now )
2549{
2550    int i;
2551    int tmp;
2552    int ret = 0;
2553
2554    const Torrent * t = tor->torrentPeers;
2555    const int n = tr_ptrArraySize( &t->webseeds );
2556    const tr_webseed ** webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2557
2558    for( i=0; i<n; ++i )
2559        if( tr_webseedGetSpeed_Bps( webseeds[i], now, &tmp ) )
2560            ret += tmp;
2561
2562    return ret;
2563}
2564
2565
2566double*
2567tr_peerMgrWebSpeeds_KBps( const tr_torrent * tor )
2568{
2569    int i;
2570    const Torrent * t = tor->torrentPeers;
2571    const int webseedCount = tr_ptrArraySize( &t->webseeds );
2572    const tr_webseed ** webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2573    const uint64_t now = tr_time_msec( );
2574    double * ret = tr_new0( double, webseedCount );
2575
2576    assert( tr_isTorrent( tor ) );
2577    assert( tr_torrentIsLocked( tor ) );
2578    assert( t->manager != NULL );
2579    assert( webseedCount == tor->info.webseedCount );
2580
2581    for( i=0; i<webseedCount; ++i ) {
2582        int Bps;
2583        if( tr_webseedGetSpeed_Bps( webseeds[i], now, &Bps ) )
2584            ret[i] = Bps / (double)tr_speed_K;
2585        else
2586            ret[i] = -1.0;
2587    }
2588
2589    return ret;
2590}
2591
2592int
2593tr_peerGetPieceSpeed_Bps( const tr_peer * peer, uint64_t now, tr_direction direction )
2594{
2595    return peer->io ? tr_peerIoGetPieceSpeed_Bps( peer->io, now, direction ) : 0.0;
2596}
2597
2598
2599struct tr_peer_stat *
2600tr_peerMgrPeerStats( const tr_torrent * tor, int * setmeCount )
2601{
2602    int i;
2603    const Torrent * t = tor->torrentPeers;
2604    const int size = tr_ptrArraySize( &t->peers );
2605    const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
2606    const uint64_t now_msec = tr_time_msec( );
2607    const time_t now = tr_time();
2608    tr_peer_stat * ret = tr_new0( tr_peer_stat, size );
2609
2610    assert( tr_isTorrent( tor ) );
2611    assert( tr_torrentIsLocked( tor ) );
2612    assert( t->manager );
2613
2614    for( i=0; i<size; ++i )
2615    {
2616        char *                   pch;
2617        const tr_peer *          peer = peers[i];
2618        const struct peer_atom * atom = peer->atom;
2619        tr_peer_stat *           stat = ret + i;
2620
2621        tr_ntop( &atom->addr, stat->addr, sizeof( stat->addr ) );
2622        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
2623                   sizeof( stat->client ) );
2624        stat->port                = ntohs( peer->atom->port );
2625        stat->from                = atom->fromFirst;
2626        stat->progress            = peer->progress;
2627        stat->isEncrypted         = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
2628        stat->rateToPeer_KBps     = toSpeedKBps( tr_peerGetPieceSpeed_Bps( peer, now_msec, TR_CLIENT_TO_PEER ) );
2629        stat->rateToClient_KBps   = toSpeedKBps( tr_peerGetPieceSpeed_Bps( peer, now_msec, TR_PEER_TO_CLIENT ) );
2630        stat->peerIsChoked        = peer->peerIsChoked;
2631        stat->peerIsInterested    = peer->peerIsInterested;
2632        stat->clientIsChoked      = peer->clientIsChoked;
2633        stat->clientIsInterested  = peer->clientIsInterested;
2634        stat->isIncoming          = tr_peerIoIsIncoming( peer->io );
2635        stat->isDownloadingFrom   = clientIsDownloadingFrom( tor, peer );
2636        stat->isUploadingTo       = clientIsUploadingTo( peer );
2637        stat->isSeed              = ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 );
2638
2639        stat->blocksToPeer        = tr_historyGet( peer->blocksSentToPeer,    now, CANCEL_HISTORY_SEC );
2640        stat->blocksToClient      = tr_historyGet( peer->blocksSentToClient,  now, CANCEL_HISTORY_SEC );
2641        stat->cancelsToPeer       = tr_historyGet( peer->cancelsSentToPeer,   now, CANCEL_HISTORY_SEC );
2642        stat->cancelsToClient     = tr_historyGet( peer->cancelsSentToClient, now, CANCEL_HISTORY_SEC );
2643
2644        stat->pendingReqsToPeer   = peer->pendingReqsToPeer;
2645        stat->pendingReqsToClient = peer->pendingReqsToClient;
2646
2647        pch = stat->flagStr;
2648        if( peer->io->utp_socket != NULL) *pch++ = 'T';
2649        if( t->optimistic == peer ) *pch++ = 'O';
2650        if( stat->isDownloadingFrom ) *pch++ = 'D';
2651        else if( stat->clientIsInterested ) *pch++ = 'd';
2652        if( stat->isUploadingTo ) *pch++ = 'U';
2653        else if( stat->peerIsInterested ) *pch++ = 'u';
2654        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
2655        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
2656        if( stat->isEncrypted ) *pch++ = 'E';
2657        if( stat->from == TR_PEER_FROM_DHT ) *pch++ = 'H';
2658        else if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
2659        if( stat->isIncoming ) *pch++ = 'I';
2660        *pch = '\0';
2661    }
2662
2663    *setmeCount = size;
2664
2665    return ret;
2666}
2667
2668/**
2669***
2670**/
2671
2672void
2673tr_peerMgrClearInterest( tr_torrent * tor )
2674{
2675    int i;
2676    Torrent * t = tor->torrentPeers;
2677    const int peerCount = tr_ptrArraySize( &t->peers );
2678
2679    assert( tr_isTorrent( tor ) );
2680    assert( tr_torrentIsLocked( tor ) );
2681
2682    for( i=0; i<peerCount; ++i )
2683    {
2684        const tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2685        tr_peerMsgsSetInterested( peer->msgs, FALSE );
2686    }
2687}
2688
2689/* do we still want this piece and does the peer have it? */
2690static tr_bool
2691isPieceInteresting( const tr_torrent * tor, const tr_peer * peer, tr_piece_index_t index )
2692{
2693    return ( !tor->info.pieces[index].dnd ) /* we want it */
2694        && ( !tr_cpPieceIsComplete( &tor->completion, index ) )  /* we don't have it */
2695        && ( tr_bitsetHas( &peer->have, index ) ); /* peer has it */
2696}
2697
2698/* does this peer have any pieces that we want? */
2699static tr_bool
2700isPeerInteresting( const tr_torrent * tor, const tr_peer * peer )
2701{
2702    tr_piece_index_t i, n;
2703
2704    if ( tr_torrentIsSeed( tor ) )
2705        return FALSE;
2706
2707    if( !tr_torrentIsPieceTransferAllowed( tor, TR_PEER_TO_CLIENT ) )
2708        return FALSE;
2709
2710    for( i=0, n=tor->info.pieceCount; i<n; ++i )
2711        if( isPieceInteresting( tor, peer, i ) )
2712            return TRUE;
2713
2714    return FALSE;
2715}
2716
2717/* determines who we send "interested" messages to */
2718static void
2719rechokeDownloads( Torrent * t )
2720{
2721    int i;
2722    const time_t now = tr_time( );
2723    const int MIN_INTERESTING_PEERS = 5;
2724    const int peerCount = tr_ptrArraySize( &t->peers );
2725    int maxPeers;
2726
2727    int badCount         = 0;
2728    int goodCount        = 0;
2729    int untestedCount    = 0;
2730    tr_peer ** bad       = tr_new( tr_peer*, peerCount );
2731    tr_peer ** good      = tr_new( tr_peer*, peerCount );
2732    tr_peer ** untested  = tr_new( tr_peer*, peerCount );
2733
2734    /* decide how many peers to be interested in */
2735    {
2736        int blocks = 0;
2737        int cancels = 0;
2738        time_t timeSinceCancel;
2739
2740        /* Count up how many blocks & cancels each peer has.
2741         *
2742         * There are two situations where we send out cancels --
2743         *
2744         * 1. We've got unresponsive peers, which is handled by deciding
2745         *    -which- peers to be interested in.
2746         *
2747         * 2. We've hit our bandwidth cap, which is handled by deciding
2748         *    -how many- peers to be interested in.
2749         *
2750         * We're working on 2. here, so we need to ignore unresponsive
2751         * peers in our calculations lest they confuse Transmission into
2752         * thinking it's hit its bandwidth cap.
2753         */
2754        for( i=0; i<peerCount; ++i )
2755        {
2756            const tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2757            const int b = tr_historyGet( peer->blocksSentToClient, now, CANCEL_HISTORY_SEC );
2758            const int c = tr_historyGet( peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC );
2759
2760            if( b == 0 ) /* ignore unresponsive peers, as described above */
2761                continue;
2762
2763            blocks += b;
2764            cancels += c;
2765        }
2766
2767        if( cancels > 0 )
2768        {
2769            /* cancelRate: of the block requests we've recently made, the percentage we cancelled.
2770             * higher values indicate more congestion. */
2771            const double cancelRate = cancels / (double)(cancels + blocks);
2772            const double mult = 1 - MIN( cancelRate, 0.5 );
2773            maxPeers = t->interestedCount * mult;
2774            tordbg( t, "cancel rate is %.3f -- reducing the "
2775                       "number of peers we're interested in by %.0f percent",
2776                       cancelRate, mult * 100 );
2777            t->lastCancel = now;
2778        }
2779
2780        timeSinceCancel = now - t->lastCancel;
2781        if( timeSinceCancel )
2782        {
2783            const int maxIncrease = 15;
2784            const time_t maxHistory = 2 * CANCEL_HISTORY_SEC;
2785            const double mult = MIN( timeSinceCancel, maxHistory ) / (double) maxHistory;
2786            const int inc = maxIncrease * mult;
2787            maxPeers = t->maxPeers + inc;
2788            tordbg( t, "time since last cancel is %li -- increasing the "
2789                       "number of peers we're interested in by %d",
2790                       timeSinceCancel, inc );
2791        }
2792    }
2793
2794    /* don't let the previous section's number tweaking go too far... */
2795    if( maxPeers < MIN_INTERESTING_PEERS )
2796        maxPeers = MIN_INTERESTING_PEERS;
2797    if( maxPeers > t->tor->maxConnectedPeers )
2798        maxPeers = t->tor->maxConnectedPeers;
2799
2800    t->maxPeers = maxPeers;
2801
2802    /* separate the peers into "good" (ones with a low cancel-to-block ratio),
2803     * untested peers, and "bad" (ones with a high cancel-to-block ratio).
2804     * That's the order in which we'll choose who to show interest in */
2805    {
2806        /* Randomize the peer array so the peers in the three groups will be unsorted... */
2807        int n = peerCount;
2808        tr_peer ** peers = tr_memdup( tr_ptrArrayBase( &t->peers ), n * sizeof( tr_peer * ) );
2809
2810        while( n > 0 )
2811        {
2812            const int i = tr_cryptoWeakRandInt( n );
2813            tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2814
2815            if( !isPeerInteresting( t->tor, peer ) )
2816            {
2817                tr_peerMsgsSetInterested( peer->msgs, FALSE );
2818            }
2819            else
2820            {
2821                const int blocks = tr_historyGet( peer->blocksSentToClient, now, CANCEL_HISTORY_SEC );
2822                const int cancels = tr_historyGet( peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC );
2823
2824                if( !blocks && !cancels )
2825                    untested[untestedCount++] = peer;
2826                else if( !cancels )
2827                    good[goodCount++] = peer;
2828                else if( !blocks )
2829                    bad[badCount++] = peer;
2830                else if( ( cancels * 10 ) < blocks )
2831                    good[goodCount++] = peer;
2832                else
2833                    bad[badCount++] = peer;
2834            }
2835
2836            tr_removeElementFromArray( peers, i, sizeof(tr_peer*), n-- );
2837        }
2838
2839        tr_free( peers );
2840    }
2841
2842    t->interestedCount = 0;
2843
2844    /* We've decided (1) how many peers to be interested in,
2845     * and (2) which peers are the best candidates,
2846     * Now it's time to update our `interest' flags. */
2847    for( i=0; i<goodCount; ++i ) {
2848        const tr_bool b = t->interestedCount < maxPeers;
2849        tr_peerMsgsSetInterested( good[i]->msgs, b );
2850        if( b )
2851            ++t->interestedCount;
2852    }
2853    for( i=0; i<untestedCount; ++i ) {
2854        const tr_bool b = t->interestedCount < maxPeers;
2855        tr_peerMsgsSetInterested( untested[i]->msgs, b );
2856        if( b )
2857            ++t->interestedCount;
2858    }
2859    for( i=0; i<badCount; ++i ) {
2860        const tr_bool b = t->interestedCount < maxPeers;
2861        tr_peerMsgsSetInterested( bad[i]->msgs, b );
2862        if( b )
2863            ++t->interestedCount;
2864    }
2865
2866/*fprintf( stderr, "num interested: %d\n", t->interestedCount );*/
2867
2868    /* cleanup */
2869    tr_free( untested );
2870    tr_free( good );
2871    tr_free( bad );
2872}
2873
2874/**
2875***
2876**/
2877
2878struct ChokeData
2879{
2880    tr_bool         isInterested;
2881    tr_bool         wasChoked;
2882    tr_bool         isChoked;
2883    int             rate;
2884    int             salt;
2885    tr_peer *       peer;
2886};
2887
2888static int
2889compareChoke( const void * va,
2890              const void * vb )
2891{
2892    const struct ChokeData * a = va;
2893    const struct ChokeData * b = vb;
2894
2895    if( a->rate != b->rate ) /* prefer higher overall speeds */
2896        return a->rate > b->rate ? -1 : 1;
2897
2898    if( a->wasChoked != b->wasChoked ) /* prefer unchoked */
2899        return a->wasChoked ? 1 : -1;
2900
2901    if( a->salt != b->salt ) /* random order */
2902        return a->salt - b->salt;
2903
2904    return 0;
2905}
2906
2907/* is this a new connection? */
2908static int
2909isNew( const tr_peer * peer )
2910{
2911    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
2912}
2913
2914/* get a rate for deciding which peers to choke and unchoke. */
2915static int
2916getRate( const tr_torrent * tor, struct peer_atom * atom, uint64_t now )
2917{
2918    int Bps;
2919
2920    if( tr_torrentIsSeed( tor ) )
2921        Bps = tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_CLIENT_TO_PEER );
2922
2923    /* downloading a private torrent... take upload speed into account
2924     * because there may only be a small window of opportunity to share */
2925    else if( tr_torrentIsPrivate( tor ) )
2926        Bps = tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_PEER_TO_CLIENT )
2927            + tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_CLIENT_TO_PEER );
2928
2929    /* downloading a public torrent */
2930    else
2931        Bps = tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_PEER_TO_CLIENT );
2932
2933    /* convert it to bytes per second */
2934    return Bps;
2935}
2936
2937static inline tr_bool
2938isBandwidthMaxedOut( const tr_bandwidth * b,
2939                     const uint64_t now_msec, tr_direction dir )
2940{
2941    if( !tr_bandwidthIsLimited( b, dir ) )
2942        return FALSE;
2943    else {
2944        const int got = tr_bandwidthGetPieceSpeed_Bps( b, now_msec, dir );
2945        const int want = tr_bandwidthGetDesiredSpeed_Bps( b, dir );
2946        return got >= want;
2947    }
2948}
2949
2950static void
2951rechokeUploads( Torrent * t, const uint64_t now )
2952{
2953    int i, size, unchokedInterested;
2954    const int peerCount = tr_ptrArraySize( &t->peers );
2955    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
2956    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
2957    const tr_session * session = t->manager->session;
2958    const int chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
2959    const tr_bool isMaxedOut = isBandwidthMaxedOut( t->tor->bandwidth, now, TR_UP );
2960
2961    assert( torrentIsLocked( t ) );
2962
2963    /* an optimistic unchoke peer's "optimistic"
2964     * state lasts for N calls to rechokeUploads(). */
2965    if( t->optimisticUnchokeTimeScaler > 0 )
2966        t->optimisticUnchokeTimeScaler--;
2967    else
2968        t->optimistic = NULL;
2969
2970    /* sort the peers by preference and rate */
2971    for( i = 0, size = 0; i < peerCount; ++i )
2972    {
2973        tr_peer * peer = peers[i];
2974        struct peer_atom * atom = peer->atom;
2975
2976        if( peer->progress >= 1.0 ) /* choke all seeds */
2977        {
2978            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2979        }
2980        else if( atom->uploadOnly == UPLOAD_ONLY_YES ) /* choke partial seeds */
2981        {
2982            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2983        }
2984        else if( chokeAll ) /* choke everyone if we're not uploading */
2985        {
2986            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2987        }
2988        else if( peer != t->optimistic )
2989        {
2990            struct ChokeData * n = &choke[size++];
2991            n->peer         = peer;
2992            n->isInterested = peer->peerIsInterested;
2993            n->wasChoked    = peer->peerIsChoked;
2994            n->rate         = getRate( t->tor, atom, now );
2995            n->salt         = tr_cryptoWeakRandInt( INT_MAX );
2996            n->isChoked     = TRUE;
2997        }
2998    }
2999
3000    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
3001
3002    /**
3003     * Reciprocation and number of uploads capping is managed by unchoking
3004     * the N peers which have the best upload rate and are interested.
3005     * This maximizes the client's download rate. These N peers are
3006     * referred to as downloaders, because they are interested in downloading
3007     * from the client.
3008     *
3009     * Peers which have a better upload rate (as compared to the downloaders)
3010     * but aren't interested get unchoked. If they become interested, the
3011     * downloader with the worst upload rate gets choked. If a client has
3012     * a complete file, it uses its upload rate rather than its download
3013     * rate to decide which peers to unchoke.
3014     *
3015     * If our bandwidth is maxed out, don't unchoke any more peers.
3016     */
3017    unchokedInterested = 0;
3018    for( i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i ) {
3019        choke[i].isChoked = isMaxedOut ? choke[i].wasChoked : FALSE;
3020        if( choke[i].isInterested )
3021            ++unchokedInterested;
3022    }
3023
3024    /* optimistic unchoke */
3025    if( !t->optimistic && !isMaxedOut && (i<size) )
3026    {
3027        int n;
3028        struct ChokeData * c;
3029        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
3030
3031        for( ; i<size; ++i )
3032        {
3033            if( choke[i].isInterested )
3034            {
3035                const tr_peer * peer = choke[i].peer;
3036                int x = 1, y;
3037                if( isNew( peer ) ) x *= 3;
3038                for( y=0; y<x; ++y )
3039                    tr_ptrArrayAppend( &randPool, &choke[i] );
3040            }
3041        }
3042
3043        if(( n = tr_ptrArraySize( &randPool )))
3044        {
3045            c = tr_ptrArrayNth( &randPool, tr_cryptoWeakRandInt( n ));
3046            c->isChoked = FALSE;
3047            t->optimistic = c->peer;
3048            t->optimisticUnchokeTimeScaler = OPTIMISTIC_UNCHOKE_MULTIPLIER;
3049        }
3050
3051        tr_ptrArrayDestruct( &randPool, NULL );
3052    }
3053
3054    for( i=0; i<size; ++i )
3055        tr_peerMsgsSetChoke( choke[i].peer->msgs, choke[i].isChoked );
3056
3057    /* cleanup */
3058    tr_free( choke );
3059}
3060
3061static void
3062rechokePulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3063{
3064    tr_torrent * tor = NULL;
3065    tr_peerMgr * mgr = vmgr;
3066    const uint64_t now = tr_sessionGetTimeMsec( mgr->session );
3067
3068    managerLock( mgr );
3069
3070    while(( tor = tr_torrentNext( mgr->session, tor ))) {
3071        if( tor->isRunning ) {
3072            rechokeUploads( tor->torrentPeers, now );
3073            if( !tr_torrentIsSeed( tor ) )
3074                rechokeDownloads( tor->torrentPeers );
3075        }
3076    }
3077
3078    tr_timerAddMsec( mgr->rechokeTimer, RECHOKE_PERIOD_MSEC );
3079    managerUnlock( mgr );
3080}
3081
3082/***
3083****
3084****  Life and Death
3085****
3086***/
3087
/* verdicts returned by shouldPeerBeClosed() */
typedef enum
{
    TR_CAN_KEEP   = 0,  /* no reason to drop this peer */
    TR_CAN_CLOSE  = 1,  /* idle for longer than the sliding limit */
    TR_MUST_CLOSE = 2   /* flagged for purging, or we're both seeds */
}
tr_close_type_t;
3095
3096static tr_close_type_t
3097shouldPeerBeClosed( const Torrent    * t,
3098                    const tr_peer    * peer,
3099                    int                peerCount,
3100                    const time_t       now )
3101{
3102    const tr_torrent *       tor = t->tor;
3103    const struct peer_atom * atom = peer->atom;
3104
3105    /* if it's marked for purging, close it */
3106    if( peer->doPurge )
3107    {
3108        tordbg( t, "purging peer %s because its doPurge flag is set",
3109                tr_atomAddrStr( atom ) );
3110        return TR_MUST_CLOSE;
3111    }
3112
3113    /* if we're seeding and the peer has everything we have,
3114     * and enough time has passed for a pex exchange, then disconnect */
3115    if( tr_torrentIsSeed( tor ) )
3116    {
3117        tr_bool peerHasEverything;
3118
3119        if( atom->seedProbability != -1 )
3120        {
3121            peerHasEverything = atomIsSeed( atom );
3122        }
3123        else
3124        {
3125            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( &tor->completion ) );
3126            tr_bitsetDifference( tmp, &peer->have );
3127            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
3128            tr_bitfieldFree( tmp );
3129        }
3130
3131        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30 )))
3132        {
3133            tordbg( t, "purging peer %s because we're both seeds",
3134                    tr_atomAddrStr( atom ) );
3135            return TR_MUST_CLOSE;
3136        }
3137    }
3138
3139    /* disconnect if it's been too long since piece data has been transferred.
3140     * this is on a sliding scale based on number of available peers... */
3141    {
3142        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
3143        /* if we have >= relaxIfFewerThan, strictness is 100%.
3144         * if we have zero connections, strictness is 0% */
3145        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
3146                               ? 1.0
3147                               : peerCount / (float)relaxStrictnessIfFewerThanN;
3148        const int lo = MIN_UPLOAD_IDLE_SECS;
3149        const int hi = MAX_UPLOAD_IDLE_SECS;
3150        const int limit = hi - ( ( hi - lo ) * strictness );
3151        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
3152/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
3153        if( idleTime > limit ) {
3154            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
3155                       tr_atomAddrStr( atom ), idleTime );
3156            return TR_CAN_CLOSE;
3157        }
3158    }
3159
3160    return TR_CAN_KEEP;
3161}
3162
3163static void sortPeersByLivelinessReverse( tr_peer ** peers, void ** clientData, int n, uint64_t now );
3164
3165static tr_peer **
3166getPeersToClose( Torrent * t, tr_close_type_t closeType,
3167                 const uint64_t now_msec, const time_t now_sec,
3168                 int * setmeSize )
3169{
3170    int i, peerCount, outsize;
3171    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( &t->peers, &peerCount );
3172    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
3173
3174    assert( torrentIsLocked( t ) );
3175
3176    for( i = outsize = 0; i < peerCount; ++i )
3177        if( shouldPeerBeClosed( t, peers[i], peerCount, now_sec ) == closeType )
3178            ret[outsize++] = peers[i];
3179
3180    sortPeersByLivelinessReverse ( ret, NULL, outsize, now_msec );
3181
3182    *setmeSize = outsize;
3183    return ret;
3184}
3185
3186static int
3187getReconnectIntervalSecs( const struct peer_atom * atom, const time_t now )
3188{
3189    int sec;
3190
3191    /* if we were recently connected to this peer and transferring piece
3192     * data, try to reconnect to them sooner rather that later -- we don't
3193     * want network troubles to get in the way of a good peer. */
3194    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
3195        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
3196
3197    /* don't allow reconnects more often than our minimum */
3198    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
3199        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
3200
3201    /* otherwise, the interval depends on how many times we've tried
3202     * and failed to connect to the peer */
3203    else switch( atom->numFails ) {
3204        case 0: sec = 0; break;
3205        case 1: sec = 5; break;
3206        case 2: sec = 2 * 60; break;
3207        case 3: sec = 15 * 60; break;
3208        case 4: sec = 30 * 60; break;
3209        case 5: sec = 60 * 60; break;
3210        default: sec = 120 * 60; break;
3211    }
3212
3213    /* penalize peers that were unreachable the last time we tried */
3214    if( atom->flags2 & MYFLAG_UNREACHABLE )
3215        sec += sec;
3216
3217    dbgmsg( "reconnect interval for %s is %d seconds", tr_atomAddrStr( atom ), sec );
3218    return sec;
3219}
3220
3221static void
3222removePeer( Torrent * t, tr_peer * peer )
3223{
3224    tr_peer * removed;
3225    struct peer_atom * atom = peer->atom;
3226
3227    assert( torrentIsLocked( t ) );
3228    assert( atom );
3229
3230    atom->time = tr_time( );
3231
3232    removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare );
3233
3234    if( replicationExists( t ) )
3235        tr_decrReplicationFromBitset( t, &peer->have );
3236
3237    assert( removed == peer );
3238    peerDestructor( t, removed );
3239}
3240
3241static void
3242closePeer( Torrent * t, tr_peer * peer )
3243{
3244    struct peer_atom * atom;
3245
3246    assert( t != NULL );
3247    assert( peer != NULL );
3248
3249    atom = peer->atom;
3250
3251    /* if we transferred piece data, then they might be good peers,
3252       so reset their `numFails' weight to zero. otherwise we connected
3253       to them fruitlessly, so mark it as another fail */
3254    if( atom->piece_data_time ) {
3255        tordbg( t, "resetting atom %s numFails to 0", tr_atomAddrStr(atom) );
3256        atom->numFails = 0;
3257    } else {
3258        ++atom->numFails;
3259        tordbg( t, "incremented atom %s numFails to %d", tr_atomAddrStr(atom), (int)atom->numFails );
3260    }
3261
3262    tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
3263    removePeer( t, peer );
3264}
3265
3266static void
3267removeAllPeers( Torrent * t )
3268{
3269    while( !tr_ptrArrayEmpty( &t->peers ) )
3270        removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) );
3271}
3272
3273static void
3274closeBadPeers( Torrent * t, const uint64_t now_msec, const time_t now_sec )
3275{
3276    if( !t->isRunning )
3277    {
3278        removeAllPeers( t );
3279    }
3280    else
3281    {
3282        int i;
3283        int mustCloseCount;
3284        struct tr_peer ** mustClose;
3285
3286        /* disconnect the really bad peers */
3287        mustClose = getPeersToClose( t, TR_MUST_CLOSE, now_msec, now_sec, &mustCloseCount );
3288        for( i=0; i<mustCloseCount; ++i )
3289            closePeer( t, mustClose[i] );
3290        tr_free( mustClose );
3291    }
3292}
3293
3294struct peer_liveliness
3295{
3296    tr_peer * peer;
3297    void * clientData;
3298    time_t pieceDataTime;
3299    time_t time;
3300    int speed;
3301    tr_bool doPurge;
3302};
3303
3304static int
3305comparePeerLiveliness( const void * va, const void * vb )
3306{
3307    const struct peer_liveliness * a = va;
3308    const struct peer_liveliness * b = vb;
3309
3310    if( a->doPurge != b->doPurge )
3311        return a->doPurge ? 1 : -1;
3312
3313    if( a->speed != b->speed ) /* faster goes first */
3314        return a->speed > b->speed ? -1 : 1;
3315
3316    /* the one to give us data more recently goes first */
3317    if( a->pieceDataTime != b->pieceDataTime )
3318        return a->pieceDataTime > b->pieceDataTime ? -1 : 1;
3319
3320    /* the one we connected to most recently goes first */
3321    if( a->time != b->time )
3322        return a->time > b->time ? -1 : 1;
3323
3324    return 0;
3325}
3326
/* comparePeerLiveliness() with the sign flipped, so the least
 * lively peer sorts first. */
static int
comparePeerLivelinessReverse( const void * va, const void * vb )
{
    const int forward = comparePeerLiveliness (va, vb);

    return -forward;
}
3332
3333static void
3334sortPeersByLivelinessImpl( tr_peer  ** peers,
3335                           void     ** clientData,
3336                           int         n,
3337                           uint64_t    now,
3338                           int (*compare) ( const void *va, const void *vb ) )
3339{
3340    int i;
3341    struct peer_liveliness *lives, *l;
3342
3343    /* build a sortable array of peer + extra info */
3344    lives = l = tr_new0( struct peer_liveliness, n );
3345    for( i=0; i<n; ++i, ++l )
3346    {
3347        tr_peer * p = peers[i];
3348        l->peer = p;
3349        l->doPurge = p->doPurge;
3350        l->pieceDataTime = p->atom->piece_data_time;
3351        l->time = p->atom->time;
3352        l->speed = tr_peerGetPieceSpeed_Bps( p, now, TR_UP )
3353                 + tr_peerGetPieceSpeed_Bps( p, now, TR_DOWN );
3354        if( clientData )
3355            l->clientData = clientData[i];
3356    }
3357
3358    /* sort 'em */
3359    assert( n == ( l - lives ) );
3360    qsort( lives, n, sizeof( struct peer_liveliness ), compare );
3361
3362    /* build the peer array */
3363    for( i=0, l=lives; i<n; ++i, ++l ) {
3364        peers[i] = l->peer;
3365        if( clientData )
3366            clientData[i] = l->clientData;
3367    }
3368    assert( n == ( l - lives ) );
3369
3370    /* cleanup */
3371    tr_free( lives );
3372}
3373
3374static void
3375sortPeersByLiveliness( tr_peer ** peers, void ** clientData, int n, uint64_t now )
3376{
3377    sortPeersByLivelinessImpl( peers, clientData, n, now, comparePeerLiveliness );
3378}
3379
3380static void
3381sortPeersByLivelinessReverse( tr_peer ** peers, void ** clientData, int n, uint64_t now )
3382{
3383    sortPeersByLivelinessImpl( peers, clientData, n, now, comparePeerLivelinessReverse );
3384}
3385
3386
3387static void
3388enforceTorrentPeerLimit( Torrent * t, uint64_t now )
3389{
3390    int n = tr_ptrArraySize( &t->peers );
3391    const int max = tr_torrentGetPeerLimit( t->tor );
3392    if( n > max )
3393    {
3394        void * base = tr_ptrArrayBase( &t->peers );
3395        tr_peer ** peers = tr_memdup( base, n*sizeof( tr_peer* ) );
3396        sortPeersByLiveliness( peers, NULL, n, now );
3397        while( n > max )
3398            closePeer( t, peers[--n] );
3399        tr_free( peers );
3400    }
3401}
3402
3403static void
3404enforceSessionPeerLimit( tr_session * session, uint64_t now )
3405{
3406    int n = 0;
3407    tr_torrent * tor = NULL;
3408    const int max = tr_sessionGetPeerLimit( session );
3409
3410    /* count the total number of peers */
3411    while(( tor = tr_torrentNext( session, tor )))
3412        n += tr_ptrArraySize( &tor->torrentPeers->peers );
3413
3414    /* if there are too many, prune out the worst */
3415    if( n > max )
3416    {
3417        tr_peer ** peers = tr_new( tr_peer*, n );
3418        Torrent ** torrents = tr_new( Torrent*, n );
3419
3420        /* populate the peer array */
3421        n = 0;
3422        tor = NULL;
3423        while(( tor = tr_torrentNext( session, tor ))) {
3424            int i;
3425            Torrent * t = tor->torrentPeers;
3426            const int tn = tr_ptrArraySize( &t->peers );
3427            for( i=0; i<tn; ++i, ++n ) {
3428                peers[n] = tr_ptrArrayNth( &t->peers, i );
3429                torrents[n] = t;
3430            }
3431        }
3432
3433        /* sort 'em */
3434        sortPeersByLiveliness( peers, (void**)torrents, n, now );
3435
3436        /* cull out the crappiest */
3437        while( n-- > max )
3438            closePeer( torrents[n], peers[n] );
3439
3440        /* cleanup */
3441        tr_free( torrents );
3442        tr_free( peers );
3443    }
3444}
3445
3446static void makeNewPeerConnections( tr_peerMgr * mgr, const int max );
3447
3448static void
3449reconnectPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3450{
3451    tr_torrent * tor;
3452    tr_peerMgr * mgr = vmgr;
3453    const time_t now_sec = tr_time( );
3454    const uint64_t now_msec = tr_sessionGetTimeMsec( mgr->session );
3455
3456    /**
3457    ***  enforce the per-session and per-torrent peer limits
3458    **/
3459
3460    /* if we're over the per-torrent peer limits, cull some peers */
3461    tor = NULL;
3462    while(( tor = tr_torrentNext( mgr->session, tor )))
3463        if( tor->isRunning )
3464            enforceTorrentPeerLimit( tor->torrentPeers, now_msec );
3465
3466    /* if we're over the per-session peer limits, cull some peers */
3467    enforceSessionPeerLimit( mgr->session, now_msec );
3468
3469    /* remove crappy peers */
3470    tor = NULL;
3471    while(( tor = tr_torrentNext( mgr->session, tor )))
3472        closeBadPeers( tor->torrentPeers, now_msec, now_sec );
3473
3474    /* try to make new peer connections */
3475    makeNewPeerConnections( mgr, MAX_CONNECTIONS_PER_PULSE );
3476}
3477
3478/****
3479*****
3480*****  BANDWIDTH ALLOCATION
3481*****
3482****/
3483
3484static void
3485pumpAllPeers( tr_peerMgr * mgr )
3486{
3487    tr_torrent * tor = NULL;
3488
3489    while(( tor = tr_torrentNext( mgr->session, tor )))
3490    {
3491        int j;
3492        Torrent * t = tor->torrentPeers;
3493
3494        for( j=0; j<tr_ptrArraySize( &t->peers ); ++j )
3495        {
3496            tr_peer * peer = tr_ptrArrayNth( &t->peers, j );
3497            tr_peerMsgsPulse( peer->msgs );
3498        }
3499    }
3500}
3501
3502static void
3503bandwidthPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3504{
3505    tr_torrent * tor;
3506    tr_peerMgr * mgr = vmgr;
3507    managerLock( mgr );
3508
3509    /* FIXME: this next line probably isn't necessary... */
3510    pumpAllPeers( mgr );
3511
3512    /* allocate bandwidth to the peers */
3513    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
3514    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
3515
3516    /* possibly stop torrents that have seeded enough */
3517    tor = NULL;
3518    while(( tor = tr_torrentNext( mgr->session, tor )))
3519        tr_torrentCheckSeedLimit( tor );
3520
3521    /* run the completeness check for any torrents that need it */
3522    tor = NULL;
3523    while(( tor = tr_torrentNext( mgr->session, tor ))) {
3524        if( tor->torrentPeers->needsCompletenessCheck ) {
3525            tor->torrentPeers->needsCompletenessCheck  = FALSE;
3526            tr_torrentRecheckCompleteness( tor );
3527        }
3528    }
3529
3530    /* stop torrents that are ready to stop, but couldn't be stopped earlier
3531     * during the peer-io callback call chain */
3532    tor = NULL;
3533    while(( tor = tr_torrentNext( mgr->session, tor )))
3534        if( tor->isStopping )
3535            tr_torrentStop( tor );
3536
3537    reconnectPulse( 0, 0, mgr );
3538
3539    tr_timerAddMsec( mgr->bandwidthTimer, BANDWIDTH_PERIOD_MSEC );
3540    managerUnlock( mgr );
3541}
3542
3543/***
3544****
3545***/
3546
3547static int
3548compareAtomPtrsByAddress( const void * va, const void *vb )
3549{
3550    const struct peer_atom * a = * (const struct peer_atom**) va;
3551    const struct peer_atom * b = * (const struct peer_atom**) vb;
3552
3553    assert( tr_isAtom( a ) );
3554    assert( tr_isAtom( b ) );
3555
3556    return tr_compareAddresses( &a->addr, &b->addr );
3557}
3558
3559/* best come first, worst go last */
3560static int
3561compareAtomPtrsByShelfDate( const void * va, const void *vb )
3562{
3563    time_t atime;
3564    time_t btime;
3565    const struct peer_atom * a = * (const struct peer_atom**) va;
3566    const struct peer_atom * b = * (const struct peer_atom**) vb;
3567    const int data_time_cutoff_secs = 60 * 60;
3568    const time_t tr_now = tr_time( );
3569
3570    assert( tr_isAtom( a ) );
3571    assert( tr_isAtom( b ) );
3572
3573    /* primary key: the last piece data time *if* it was within the last hour */
3574    atime = a->piece_data_time; if( atime + data_time_cutoff_secs < tr_now ) atime = 0;
3575    btime = b->piece_data_time; if( btime + data_time_cutoff_secs < tr_now ) btime = 0;
3576    if( atime != btime )
3577        return atime > btime ? -1 : 1;
3578
3579    /* secondary key: shelf date. */
3580    if( a->shelf_date != b->shelf_date )
3581        return a->shelf_date > b->shelf_date ? -1 : 1;
3582
3583    return 0;
3584}
3585
3586static int
3587getMaxAtomCount( const tr_torrent * tor )
3588{
3589    const int n = tor->maxConnectedPeers;
3590    /* approximate fit of the old jump discontinuous function */
3591    if( n >= 55 ) return     n + 150;
3592    if( n >= 20 ) return 2 * n + 95;
3593    return               4 * n + 55;
3594}
3595
3596static void
3597atomPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3598{
3599    tr_torrent * tor = NULL;
3600    tr_peerMgr * mgr = vmgr;
3601    managerLock( mgr );
3602
3603    while(( tor = tr_torrentNext( mgr->session, tor )))
3604    {
3605        int atomCount;
3606        Torrent * t = tor->torrentPeers;
3607        const int maxAtomCount = getMaxAtomCount( tor );
3608        struct peer_atom ** atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
3609
3610        if( atomCount > maxAtomCount ) /* we've got too many atoms... time to prune */
3611        {
3612            int i;
3613            int keepCount = 0;
3614            int testCount = 0;
3615            struct peer_atom ** keep = tr_new( struct peer_atom*, atomCount );
3616            struct peer_atom ** test = tr_new( struct peer_atom*, atomCount );
3617
3618            /* keep the ones that are in use */
3619            for( i=0; i<atomCount; ++i ) {
3620                struct peer_atom * atom = atoms[i];
3621                if( peerIsInUse( t, atom ) )
3622                    keep[keepCount++] = atom;
3623                else
3624                    test[testCount++] = atom;
3625            }
3626
3627            /* if there's room, keep the best of what's left */
3628            i = 0;
3629            if( keepCount < maxAtomCount ) {
3630                qsort( test, testCount, sizeof( struct peer_atom * ), compareAtomPtrsByShelfDate );
3631                while( i<testCount && keepCount<maxAtomCount )
3632                    keep[keepCount++] = test[i++];
3633            }
3634
3635            /* free the culled atoms */
3636            while( i<testCount )
3637                tr_free( test[i++] );
3638
3639            /* rebuild Torrent.pool with what's left */
3640            tr_ptrArrayDestruct( &t->pool, NULL );
3641            t->pool = TR_PTR_ARRAY_INIT;
3642            qsort( keep, keepCount, sizeof( struct peer_atom * ), compareAtomPtrsByAddress );
3643            for( i=0; i<keepCount; ++i )
3644                tr_ptrArrayAppend( &t->pool, keep[i] );
3645
3646            tordbg( t, "max atom count is %d... pruned from %d to %d\n", maxAtomCount, atomCount, keepCount );
3647
3648            /* cleanup */
3649            tr_free( test );
3650            tr_free( keep );
3651        }
3652    }
3653
3654    tr_timerAddMsec( mgr->atomTimer, ATOM_PERIOD_MSEC );
3655    managerUnlock( mgr );
3656}
3657
3658/***
3659****
3660****
3661****
3662***/
3663
3664/* is this atom someone that we'd want to initiate a connection to? */
3665static tr_bool
3666isPeerCandidate( const tr_torrent * tor, struct peer_atom * atom, const time_t now )
3667{
3668    /* not if we're both seeds */
3669    if( tr_torrentIsSeed( tor ) )
3670        if( atomIsSeed( atom ) || ( atom->uploadOnly == UPLOAD_ONLY_YES ) )
3671            return FALSE;
3672
3673    /* not if we've already got a connection to them... */
3674    if( peerIsInUse( tor->torrentPeers, atom ) )
3675        return FALSE;
3676
3677    /* not if we just tried them already */
3678    if( ( now - atom->time ) < getReconnectIntervalSecs( atom, now ) )
3679        return FALSE;
3680
3681    /* not if they're blocklisted */
3682    if( isAtomBlocklisted( tor->session, atom ) )
3683        return FALSE;
3684
3685    /* not if they're banned... */
3686    if( atom->flags2 & MYFLAG_BANNED )
3687        return FALSE;
3688
3689    return TRUE;
3690}
3691
3692struct peer_candidate
3693{
3694    uint64_t score;
3695    tr_torrent * tor;
3696    struct peer_atom * atom;
3697};
3698
3699static tr_bool
3700torrentWasRecentlyStarted( const tr_torrent * tor )
3701{
3702    return difftime( tr_time( ), tor->startDate ) < 120;
3703}
3704
/**
 * Append a `width'-bit field to the low end of a packed sort key.
 *
 * The existing key is shifted left by `width' bits and the low `width'
 * bits of `addme' are placed in the space that opens up. Bits of `addme'
 * above `width' are discarded so that an oversized value cannot corrupt
 * the fields that were packed earlier.
 *
 * @param value the key built so far
 * @param width the size of the new field in bits; a non-positive width
 *              adds nothing, and a width of 64 or more replaces the key
 *              (shifting a 64-bit value that far is undefined in C)
 * @param addme the value to store in the new field
 * @return the extended key
 */
static inline uint64_t
addValToKey( uint64_t value, int width, uint64_t addme )
{
    if( width <= 0 )
        return value;

    if( width >= 64 )
        return addme;

    value <<= width;
    value |= addme & ( ( (uint64_t)1 << width ) - 1 );
    return value;
}
3712
3713/* smaller value is better */
3714static uint64_t
3715getPeerCandidateScore( const tr_torrent * tor, const struct peer_atom * atom, uint8_t salt  )
3716{
3717    uint64_t i;
3718    uint64_t score = 0;
3719    const tr_bool failed = atom->lastConnectionAt < atom->lastConnectionAttemptAt;
3720
3721    /* prefer peers we've connected to, or never tried, over peers we failed to connect to. */
3722    i = failed ? 1 : 0;
3723    score = addValToKey( score, 1, i );
3724
3725    /* prefer the one we attempted least recently (to cycle through all peers) */
3726    i = atom->lastConnectionAttemptAt;
3727    score = addValToKey( score, 32, i );
3728
3729    /* prefer peers belonging to a torrent of a higher priority */
3730    switch( tr_torrentGetPriority( tor ) ) {
3731        case TR_PRI_HIGH:    i = 0; break;
3732        case TR_PRI_NORMAL:  i = 1; break;
3733        case TR_PRI_LOW:     i = 2; break;
3734    }
3735    score = addValToKey( score, 4, i );
3736
3737    /* prefer recently-started torrents */
3738    i = torrentWasRecentlyStarted( tor ) ? 0 : 1;
3739    score = addValToKey( score, 1, i );
3740
3741    /* prefer torrents we're downloading with */
3742    i = tr_torrentIsSeed( tor ) ? 1 : 0;
3743    score = addValToKey( score, 1, i );
3744
3745    /* prefer peers that are known to be connectible */
3746    i = ( atom->flags & ADDED_F_CONNECTABLE ) ? 0 : 1;
3747    score = addValToKey( score, 1, i );
3748
3749    /* prefer peers that we might have a chance of uploading to...
3750       so lower seed probability is better */
3751    if( atom->seedProbability == 100 ) i = 101;
3752    else if( atom->seedProbability == -1 ) i = 100;
3753    else i = atom->seedProbability;
3754    score = addValToKey( score, 8, i );
3755
3756    /* Prefer peers that we got from more trusted sources.
3757     * lower `fromBest' values indicate more trusted sources */
3758    score = addValToKey( score, 4, atom->fromBest );
3759
3760    /* salt */
3761    score = addValToKey( score, 8, salt );
3762
3763    return score;
3764}
3765
3766/* sort an array of peer candidates */
3767static int
3768comparePeerCandidates( const void * va, const void * vb )
3769{
3770    const struct peer_candidate * a = va;
3771    const struct peer_candidate * b = vb;
3772
3773    if( a->score < b->score ) return -1;
3774    if( a->score > b->score ) return 1;
3775
3776    return 0;
3777}
3778
3779/** @return an array of all the atoms we might want to connect to */
3780static struct peer_candidate*
3781getPeerCandidates( tr_session * session, int * candidateCount )
3782{
3783    int n;
3784    tr_torrent * tor;
3785    struct peer_candidate * candidates;
3786    struct peer_candidate * walk;
3787    const time_t now = tr_time( );
3788    const uint64_t now_msec = tr_time_msec( );
3789    /* leave 5% of connection slots for incoming connections -- ticket #2609 */
3790    const int maxCandidates = tr_sessionGetPeerLimit( session ) * 0.95;
3791
3792    /* don't start any new handshakes if we're full up */
3793    n = 0;
3794    tor= NULL;
3795    while(( tor = tr_torrentNext( session, tor )))
3796        n += tr_ptrArraySize( &tor->torrentPeers->peers );
3797    if( maxCandidates <= n ) {
3798        *candidateCount = 0;
3799        return NULL;
3800    }
3801
3802    /* allocate an array of candidates */
3803    n = 0;
3804    tor= NULL;
3805    while(( tor = tr_torrentNext( session, tor )))
3806        n += tr_ptrArraySize( &tor->torrentPeers->pool );
3807    walk = candidates = tr_new( struct peer_candidate, n );
3808
3809    /* populate the candidate array */
3810    tor = NULL;
3811    while(( tor = tr_torrentNext( session, tor )))
3812    {
3813        int i, nAtoms;
3814        struct peer_atom ** atoms;
3815
3816        if( !tor->torrentPeers->isRunning )
3817            continue;
3818
3819        /* if we've already got enough peers in this torrent... */
3820        if( tr_torrentGetPeerLimit( tor ) <= tr_ptrArraySize( &tor->torrentPeers->peers ) )
3821            continue;
3822
3823        /* if we've already got enough speed in this torrent... */
3824        if( tr_torrentIsSeed( tor ) && isBandwidthMaxedOut( tor->bandwidth, now_msec, TR_UP ) )
3825            continue;
3826
3827        atoms = (struct peer_atom**) tr_ptrArrayPeek( &tor->torrentPeers->pool, &nAtoms );
3828        for( i=0; i<nAtoms; ++i )
3829        {
3830            struct peer_atom * atom = atoms[i];
3831
3832            if( isPeerCandidate( tor, atom, now ) )
3833            {
3834                const uint8_t salt = tr_cryptoWeakRandInt( 1024 );
3835                walk->tor = tor;
3836                walk->atom = atom;
3837                walk->score = getPeerCandidateScore( tor, atom, salt );
3838                ++walk;
3839            }
3840        }
3841    }
3842
3843    *candidateCount = walk - candidates;
3844    if( *candidateCount > 1 )
3845        qsort( candidates, *candidateCount, sizeof( struct peer_candidate ), comparePeerCandidates );
3846    return candidates;
3847}
3848
3849static void
3850initiateConnection( tr_peerMgr * mgr, Torrent * t, struct peer_atom * atom )
3851{
3852    tr_peerIo * io;
3853    const time_t now = tr_time( );
3854    tr_bool utp = tr_sessionIsUTPEnabled(mgr->session) && !atom->utp_failed;
3855
3856    if( atom->fromFirst == TR_PEER_FROM_PEX )
3857        /* PEX has explicit signalling for uTP support.  If an atom
3858           originally came from PEX and doesn't have the uTP flag, skip the
3859           uTP connection attempt.  Are we being optimistic here? */
3860        utp = utp && (atom->flags & ADDED_F_UTP_FLAGS);
3861
3862    tordbg( t, "Starting an OUTGOING%s connection with %s",
3863            utp ? " µTP" : "",
3864            tr_atomAddrStr( atom ) );
3865
3866    io = tr_peerIoNewOutgoing( mgr->session,
3867                               mgr->session->bandwidth,
3868                               &atom->addr,
3869                               atom->port,
3870                               t->tor->info.hash,
3871                               t->tor->completeness == TR_SEED,
3872                               utp );
3873
3874    if( io == NULL )
3875    {
3876        tordbg( t, "peerIo not created; marking peer %s as unreachable",
3877                tr_atomAddrStr( atom ) );
3878        atom->flags2 |= MYFLAG_UNREACHABLE;
3879        atom->numFails++;
3880    }
3881    else
3882    {
3883        tr_handshake * handshake = tr_handshakeNew( io,
3884                                                    mgr->session->encryptionMode,
3885                                                    myHandshakeDoneCB,
3886                                                    mgr );
3887
3888        assert( tr_peerIoGetTorrentHash( io ) );
3889
3890        tr_peerIoUnref( io ); /* balanced by the initial ref
3891                                 in tr_peerIoNewOutgoing() */
3892
3893        tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
3894                                 handshakeCompare );
3895    }
3896
3897    atom->lastConnectionAttemptAt = now;
3898    atom->time = now;
3899}
3900
3901static void
3902initiateCandidateConnection( tr_peerMgr * mgr, struct peer_candidate * c )
3903{
3904#if 0
3905    fprintf( stderr, "Starting an OUTGOING connection with %s - [%s] seedProbability==%d; %s, %s\n",
3906             tr_atomAddrStr( c->atom ),
3907             tr_torrentName( c->tor ),
3908             (int)c->atom->seedProbability,
3909             tr_torrentIsPrivate( c->tor ) ? "private" : "public",
3910             tr_torrentIsSeed( c->tor ) ? "seed" : "downloader" );
3911#endif
3912
3913    initiateConnection( mgr, c->tor->torrentPeers, c->atom );
3914}
3915
3916static void
3917makeNewPeerConnections( struct tr_peerMgr * mgr, const int max )
3918{
3919    int i, n;
3920    struct peer_candidate * candidates;
3921
3922    candidates = getPeerCandidates( mgr->session, &n );
3923
3924    for( i=0; i<n && i<max; ++i )
3925        initiateCandidateConnection( mgr, &candidates[i] );
3926
3927    tr_free( candidates );
3928}
Note: See TracBrowser for help on using the repository browser.