source: trunk/libtransmission/peer-mgr.c @ 11982

Last change on this file since 11982 was 11982, checked in by jordan, 11 years ago

(trunk libT) #3372 "What happened with closing idle peers?" -- fixed.

  • Property svn:keywords set to Date Rev Author Id
File size: 113.6 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 11982 2011-02-19 12:32:41Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h> /* error codes ERANGE, ... */
15#include <limits.h> /* INT_MAX */
16#include <string.h> /* memcpy, memcmp, strstr */
17#include <stdlib.h> /* qsort */
18
19#include <event2/event.h>
20#include <libutp/utp.h>
21
22#include "transmission.h"
23#include "announcer.h"
24#include "bandwidth.h"
25#include "bencode.h"
26#include "blocklist.h"
27#include "cache.h"
28#include "clients.h"
29#include "completion.h"
30#include "crypto.h"
31#include "handshake.h"
32#include "net.h"
33#include "peer-io.h"
34#include "peer-mgr.h"
35#include "peer-msgs.h"
36#include "ptrarray.h"
37#include "session.h"
38#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
39#include "torrent.h"
40#include "utils.h"
41#include "webseed.h"
42
43enum
44{
45    /* how frequently to cull old atoms */
46    ATOM_PERIOD_MSEC = ( 60 * 1000 ),
47
48    /* how frequently to change which peers are choked */
49    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
50
51    /* an optimistically unchoked peer is immune from rechoking
52       for this many calls to rechokeUploads(). */
53    OPTIMISTIC_UNCHOKE_MULTIPLIER = 4,
54
55    /* how frequently to reallocate bandwidth */
56    BANDWIDTH_PERIOD_MSEC = 500,
57
58    /* how frequently to age out old piece request lists */
59    REFILL_UPKEEP_PERIOD_MSEC = ( 10 * 1000 ),
60
61    /* how frequently to decide which peers live and die */
62    RECONNECT_PERIOD_MSEC = 500,
63
64    /* when many peers are available, keep idle ones this long */
65    MIN_UPLOAD_IDLE_SECS = ( 60 ),
66
67    /* when few peers are available, keep idle ones this long */
68    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
69
70    /* max number of peers to ask for per second overall.
71     * this throttle is to avoid overloading the router */
72    MAX_CONNECTIONS_PER_SECOND = 12,
73
74    MAX_CONNECTIONS_PER_PULSE = (int)(MAX_CONNECTIONS_PER_SECOND * (RECONNECT_PERIOD_MSEC/1000.0)),
75
76    /* number of bad pieces a peer is allowed to send before we ban them */
77    MAX_BAD_PIECES_PER_PEER = 5,
78
79    /* amount of time to keep a list of request pieces lying around
80       before it's considered too old and needs to be rebuilt */
81    PIECE_LIST_SHELF_LIFE_SECS = 60,
82
83    /* use for bitwise operations w/peer_atom.flags2 */
84    MYFLAG_BANNED = 1,
85
86    /* use for bitwise operations w/peer_atom.flags2 */
87    /* unreachable for now... but not banned.
88     * if they try to connect to us it's okay */
89    MYFLAG_UNREACHABLE = 2,
90
91    /* the minimum we'll wait before attempting to reconnect to a peer */
92    MINIMUM_RECONNECT_INTERVAL_SECS = 5,
93
94    /** how long we'll let requests we've made linger before we cancel them */
95    REQUEST_TTL_SECS = 120,
96
97    NO_BLOCKS_CANCEL_HISTORY = 120,
98
99    CANCEL_HISTORY_SEC = 60
100};
101
102const tr_peer_event TR_PEER_EVENT_INIT = { 0, 0, NULL, 0, 0, 0.0f, 0, FALSE, 0 };
103
104/**
105***
106**/
107
108enum
109{
110    UPLOAD_ONLY_UKNOWN,
111    UPLOAD_ONLY_YES,
112    UPLOAD_ONLY_NO
113};
114
115/**
116 * Peer information that should be kept even before we've connected and
117 * after we've disconnected. These are kept in a pool of peer_atoms to decide
118 * which ones would make good candidates for connecting to, and to watch out
119 * for banned peers.
120 *
121 * @see tr_peer
122 * @see tr_peermsgs
123 */
124struct peer_atom
125{
126    uint8_t     fromFirst;          /* where the peer was first found */
127    uint8_t     fromBest;           /* the "best" value of where the peer has been found */
128    uint8_t     flags;              /* these match the added_f flags */
129    uint8_t     flags2;             /* flags that aren't defined in added_f */
130    uint8_t     uploadOnly;         /* UPLOAD_ONLY_ */
131    int8_t      seedProbability;    /* how likely is this to be a seed... [0..100] or -1 for unknown */
132    int8_t      blocklisted;        /* -1 for unknown, TRUE for blocklisted, FALSE for not blocklisted */
133
134    tr_port     port;
135    tr_bool     utp_failed;         /* We recently failed to connect over uTP */
136    uint16_t    numFails;
137    time_t      time;               /* when the peer's connection status last changed */
138    time_t      piece_data_time;
139
140    time_t      lastConnectionAttemptAt;
141    time_t      lastConnectionAt;
142
143    /* similar to a TTL field, but less rigid --
144     * if the swarm is small, the atom will be kept past this date. */
145    time_t      shelf_date;
146    tr_peer   * peer;               /* will be NULL if not connected */
147    tr_address  addr;
148};
149
150#ifdef NDEBUG
151#define tr_isAtom(a) (TRUE)
152#else
153static tr_bool
154tr_isAtom( const struct peer_atom * atom )
155{
156    return ( atom != NULL )
157        && ( atom->fromFirst < TR_PEER_FROM__MAX )
158        && ( atom->fromBest < TR_PEER_FROM__MAX )
159        && ( tr_isAddress( &atom->addr ) );
160}
161#endif
162
163static const char*
164tr_atomAddrStr( const struct peer_atom * atom )
165{
166    return atom ? tr_peerIoAddrStr( &atom->addr, atom->port ) : "[no atom]";
167}
168
169struct block_request
170{
171    tr_block_index_t block;
172    tr_peer * peer;
173    time_t sentAt;
174};
175
176struct weighted_piece
177{
178    tr_piece_index_t index;
179    int16_t salt;
180    int16_t requestCount;
181};
182
183enum piece_sort_state
184{
185    PIECES_UNSORTED,
186    PIECES_SORTED_BY_INDEX,
187    PIECES_SORTED_BY_WEIGHT
188};
189
190/** @brief Opaque, per-torrent data structure for peer connection information */
191typedef struct tr_torrent_peers
192{
193    tr_ptrArray                outgoingHandshakes; /* tr_handshake */
194    tr_ptrArray                pool; /* struct peer_atom */
195    tr_ptrArray                peers; /* tr_peer */
196    tr_ptrArray                webseeds; /* tr_webseed */
197
198    tr_torrent               * tor;
199    struct tr_peerMgr        * manager;
200
201    tr_peer                  * optimistic; /* the optimistic peer, or NULL if none */
202    int                        optimisticUnchokeTimeScaler;
203
204    tr_bool                    isRunning;
205    tr_bool                    needsCompletenessCheck;
206
207    struct block_request     * requests;
208    int                        requestCount;
209    int                        requestAlloc;
210
211    struct weighted_piece    * pieces;
212    int                        pieceCount;
213    enum piece_sort_state      pieceSortState;
214
215    /* An array of pieceCount items stating how many peers have each piece.
216       This is used to help us for downloading pieces "rarest first."
217       This may be NULL if we don't have metainfo yet, or if we're not
218       downloading and don't care about rarity */
219    uint16_t                 * pieceReplication;
220    size_t                     pieceReplicationSize;
221
222    int                        interestedCount;
223    int                        maxPeers;
224    time_t                     lastCancel;
225
226    /* Before the endgame this should be 0. In endgame, is contains the average
227     * number of pending requests per peer. Only peers which have more pending
228     * requests are considered 'fast' are allowed to request a block that's
229     * already been requested from another (slower?) peer. */
230    int                        endgame;
231}
232Torrent;
233
234struct tr_peerMgr
235{
236    tr_session    * session;
237    tr_ptrArray     incomingHandshakes; /* tr_handshake */
238    struct event  * bandwidthTimer;
239    struct event  * rechokeTimer;
240    struct event  * refillUpkeepTimer;
241    struct event  * atomTimer;
242};
243
244#define tordbg( t, ... ) \
245    do { \
246        if( tr_deepLoggingIsActive( ) ) \
247            tr_deepLog( __FILE__, __LINE__, tr_torrentName( t->tor ), __VA_ARGS__ ); \
248    } while( 0 )
249
250#define dbgmsg( ... ) \
251    do { \
252        if( tr_deepLoggingIsActive( ) ) \
253            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
254    } while( 0 )
255
256/**
257***
258**/
259
260static inline void
261managerLock( const struct tr_peerMgr * manager )
262{
263    tr_sessionLock( manager->session );
264}
265
266static inline void
267managerUnlock( const struct tr_peerMgr * manager )
268{
269    tr_sessionUnlock( manager->session );
270}
271
272static inline void
273torrentLock( Torrent * torrent )
274{
275    managerLock( torrent->manager );
276}
277
278static inline void
279torrentUnlock( Torrent * torrent )
280{
281    managerUnlock( torrent->manager );
282}
283
284static inline int
285torrentIsLocked( const Torrent * t )
286{
287    return tr_sessionIsLocked( t->manager->session );
288}
289
290/**
291***
292**/
293
294static int
295handshakeCompareToAddr( const void * va, const void * vb )
296{
297    const tr_handshake * a = va;
298
299    return tr_compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
300}
301
302static int
303handshakeCompare( const void * a, const void * b )
304{
305    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
306}
307
308static inline tr_handshake*
309getExistingHandshake( tr_ptrArray * handshakes, const tr_address * addr )
310{
311    if( tr_ptrArrayEmpty( handshakes ) )
312        return NULL;
313
314    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
315}
316
317static int
318comparePeerAtomToAddress( const void * va, const void * vb )
319{
320    const struct peer_atom * a = va;
321
322    return tr_compareAddresses( &a->addr, vb );
323}
324
325static int
326compareAtomsByAddress( const void * va, const void * vb )
327{
328    const struct peer_atom * b = vb;
329
330    assert( tr_isAtom( b ) );
331
332    return comparePeerAtomToAddress( va, &b->addr );
333}
334
335/**
336***
337**/
338
339const tr_address *
340tr_peerAddress( const tr_peer * peer )
341{
342    return &peer->atom->addr;
343}
344
345static Torrent*
346getExistingTorrent( tr_peerMgr *    manager,
347                    const uint8_t * hash )
348{
349    tr_torrent * tor = tr_torrentFindFromHash( manager->session, hash );
350
351    return tor == NULL ? NULL : tor->torrentPeers;
352}
353
354static int
355peerCompare( const void * a, const void * b )
356{
357    return tr_compareAddresses( tr_peerAddress( a ), tr_peerAddress( b ) );
358}
359
360static struct peer_atom*
361getExistingAtom( const Torrent    * t,
362                 const tr_address * addr )
363{
364    Torrent * tt = (Torrent*)t;
365    assert( torrentIsLocked( t ) );
366    return tr_ptrArrayFindSorted( &tt->pool, addr, comparePeerAtomToAddress );
367}
368
369static tr_bool
370peerIsInUse( const Torrent * ct, const struct peer_atom * atom )
371{
372    Torrent * t = (Torrent*) ct;
373
374    assert( torrentIsLocked ( t ) );
375
376    return ( atom->peer != NULL )
377        || getExistingHandshake( &t->outgoingHandshakes, &atom->addr )
378        || getExistingHandshake( &t->manager->incomingHandshakes, &atom->addr );
379}
380
381static tr_peer*
382peerConstructor( struct peer_atom * atom )
383{
384    tr_peer * peer = tr_new0( tr_peer, 1 );
385
386    tr_bitsetConstructor( &peer->have, 0 );
387
388    peer->atom = atom;
389    atom->peer = peer;
390
391    peer->blocksSentToClient  = tr_historyNew( CANCEL_HISTORY_SEC, ( RECHOKE_PERIOD_MSEC / 1000 ) );
392    peer->blocksSentToPeer    = tr_historyNew( CANCEL_HISTORY_SEC, ( RECHOKE_PERIOD_MSEC / 1000 ) );
393    peer->cancelsSentToClient = tr_historyNew( CANCEL_HISTORY_SEC, ( RECHOKE_PERIOD_MSEC / 1000 ) );
394    peer->cancelsSentToPeer   = tr_historyNew( CANCEL_HISTORY_SEC, ( REFILL_UPKEEP_PERIOD_MSEC / 1000 ) );
395
396    return peer;
397}
398
399static tr_peer*
400getPeer( Torrent * torrent, struct peer_atom * atom )
401{
402    tr_peer * peer;
403
404    assert( torrentIsLocked( torrent ) );
405
406    peer = atom->peer;
407
408    if( peer == NULL )
409    {
410        peer = peerConstructor( atom );
411        tr_ptrArrayInsertSorted( &torrent->peers, peer, peerCompare );
412    }
413
414    return peer;
415}
416
417static void peerDeclinedAllRequests( Torrent *, const tr_peer * );
418
419static void
420peerDestructor( Torrent * t, tr_peer * peer )
421{
422    assert( peer != NULL );
423
424    peerDeclinedAllRequests( t, peer );
425
426    if( peer->msgs != NULL )
427        tr_peerMsgsFree( peer->msgs );
428
429    tr_peerIoClear( peer->io );
430    tr_peerIoUnref( peer->io ); /* balanced by the ref in handshakeDoneCB() */
431
432    tr_historyFree( peer->blocksSentToClient  );
433    tr_historyFree( peer->blocksSentToPeer    );
434    tr_historyFree( peer->cancelsSentToClient );
435    tr_historyFree( peer->cancelsSentToPeer   );
436
437    tr_bitsetDestructor( &peer->have );
438    tr_bitfieldFree( peer->blame );
439    tr_free( peer->client );
440    peer->atom->peer = NULL;
441
442    tr_free( peer );
443}
444
445static tr_bool
446replicationExists( const Torrent * t )
447{
448    return t->pieceReplication != NULL;
449}
450
451static void
452replicationFree( Torrent * t )
453{
454    tr_free( t->pieceReplication );
455    t->pieceReplication = NULL;
456    t->pieceReplicationSize = 0;
457}
458
459static void
460replicationNew( Torrent * t )
461{
462    tr_piece_index_t piece_i;
463    const tr_piece_index_t piece_count = t->tor->info.pieceCount;
464    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
465    const int peer_count = tr_ptrArraySize( &t->peers );
466
467    assert( !replicationExists( t ) );
468
469    t->pieceReplicationSize = piece_count;
470    t->pieceReplication = tr_new0( uint16_t, piece_count );
471
472    for( piece_i=0; piece_i<piece_count; ++piece_i )
473    {
474        int peer_i;
475        uint16_t r = 0;
476
477        for( peer_i=0; peer_i<peer_count; ++peer_i )
478            if( tr_bitsetHasFast( &peers[peer_i]->have, piece_i ) )
479                ++r;
480
481        t->pieceReplication[piece_i] = r;
482    }
483}
484
485static void
486torrentDestructor( void * vt )
487{
488    Torrent * t = vt;
489
490    assert( t );
491    assert( !t->isRunning );
492    assert( torrentIsLocked( t ) );
493    assert( tr_ptrArrayEmpty( &t->outgoingHandshakes ) );
494    assert( tr_ptrArrayEmpty( &t->peers ) );
495
496    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
497    tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
498    tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
499    tr_ptrArrayDestruct( &t->peers, NULL );
500
501    replicationFree( t );
502
503    tr_free( t->requests );
504    tr_free( t->pieces );
505    tr_free( t );
506}
507
508static void peerCallbackFunc( tr_peer *, const tr_peer_event *, void * );
509
510static Torrent*
511torrentConstructor( tr_peerMgr * manager,
512                    tr_torrent * tor )
513{
514    int       i;
515    Torrent * t;
516
517    t = tr_new0( Torrent, 1 );
518    t->manager = manager;
519    t->tor = tor;
520    t->pool = TR_PTR_ARRAY_INIT;
521    t->peers = TR_PTR_ARRAY_INIT;
522    t->webseeds = TR_PTR_ARRAY_INIT;
523    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
524
525    for( i = 0; i < tor->info.webseedCount; ++i )
526    {
527        tr_webseed * w =
528            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
529        tr_ptrArrayAppend( &t->webseeds, w );
530    }
531
532    return t;
533}
534
535tr_peerMgr*
536tr_peerMgrNew( tr_session * session )
537{
538    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
539    m->session = session;
540    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
541    return m;
542}
543
544static void
545deleteTimer( struct event ** t )
546{
547    if( *t != NULL )
548    {
549        event_free( *t );
550        *t = NULL;
551    }
552}
553
554static void
555deleteTimers( struct tr_peerMgr * m )
556{
557    deleteTimer( &m->atomTimer );
558    deleteTimer( &m->bandwidthTimer );
559    deleteTimer( &m->rechokeTimer );
560    deleteTimer( &m->refillUpkeepTimer );
561}
562
563void
564tr_peerMgrFree( tr_peerMgr * manager )
565{
566    managerLock( manager );
567
568    deleteTimers( manager );
569
570    /* free the handshakes. Abort invokes handshakeDoneCB(), which removes
571     * the item from manager->handshakes, so this is a little roundabout... */
572    while( !tr_ptrArrayEmpty( &manager->incomingHandshakes ) )
573        tr_handshakeAbort( tr_ptrArrayNth( &manager->incomingHandshakes, 0 ) );
574
575    tr_ptrArrayDestruct( &manager->incomingHandshakes, NULL );
576
577    managerUnlock( manager );
578    tr_free( manager );
579}
580
581static int
582clientIsDownloadingFrom( const tr_torrent * tor, const tr_peer * peer )
583{
584    if( !tr_torrentHasMetadata( tor ) )
585        return TRUE;
586
587    return peer->clientIsInterested && !peer->clientIsChoked;
588}
589
590static int
591clientIsUploadingTo( const tr_peer * peer )
592{
593    return peer->peerIsInterested && !peer->peerIsChoked;
594}
595
596/***
597****
598***/
599
600void
601tr_peerMgrOnBlocklistChanged( tr_peerMgr * mgr )
602{
603    tr_torrent * tor = NULL;
604    tr_session * session = mgr->session;
605
606    /* we cache whether or not a peer is blocklisted...
607       since the blocklist has changed, erase that cached value */
608    while(( tor = tr_torrentNext( session, tor )))
609    {
610        int i;
611        Torrent * t = tor->torrentPeers;
612        const int n = tr_ptrArraySize( &t->pool );
613        for( i=0; i<n; ++i ) {
614            struct peer_atom * atom = tr_ptrArrayNth( &t->pool, i );
615            atom->blocklisted = -1;
616        }
617    }
618}
619
620static tr_bool
621isAtomBlocklisted( tr_session * session, struct peer_atom * atom )
622{
623    if( atom->blocklisted < 0 )
624        atom->blocklisted = tr_sessionIsAddressBlocked( session, &atom->addr );
625
626    assert( tr_isBool( atom->blocklisted ) );
627    return atom->blocklisted;
628}
629
630
631/***
632****
633***/
634
635static void
636atomSetSeedProbability( struct peer_atom * atom, int seedProbability )
637{
638    assert( atom != NULL );
639    assert( -1<=seedProbability && seedProbability<=100 );
640
641    atom->seedProbability = seedProbability;
642
643    if( seedProbability == 100 )
644        atom->flags |= ADDED_F_SEED_FLAG;
645    else if( seedProbability != -1 )
646        atom->flags &= ~ADDED_F_SEED_FLAG;
647}
648
649static void
650atomSetSeed( struct peer_atom * atom )
651{
652    atomSetSeedProbability( atom, 100 );
653}
654
655static inline tr_bool
656atomIsSeed( const struct peer_atom * atom )
657{
658    return atom->seedProbability == 100;
659}
660
661tr_bool
662tr_peerMgrPeerIsSeed( const tr_torrent  * tor,
663                      const tr_address  * addr )
664{
665    tr_bool isSeed = FALSE;
666    const Torrent * t = tor->torrentPeers;
667    const struct peer_atom * atom = getExistingAtom( t, addr );
668
669    if( atom )
670        isSeed = atomIsSeed( atom );
671
672    return isSeed;
673}
674
675void
676tr_peerMgrSetUtpSupported( tr_torrent * tor, const tr_address * addr )
677{
678    struct peer_atom * atom = getExistingAtom( tor->torrentPeers, addr );
679
680    if( atom )
681        atom->flags |= ADDED_F_UTP_FLAGS;
682}
683
684void
685tr_peerMgrSetUtpFailed( tr_torrent *tor, const tr_address *addr, tr_bool failed )
686{
687    struct peer_atom * atom = getExistingAtom( tor->torrentPeers, addr );
688
689    if( atom )
690        atom->utp_failed = failed;
691}
692
693
694/**
695***  REQUESTS
696***
697*** There are two data structures associated with managing block requests:
698***
699*** 1. Torrent::requests, an array of "struct block_request" which keeps
700***    track of which blocks have been requested, and when, and by which peers.
701***    This is list is used for (a) cancelling requests that have been pending
702***    for too long and (b) avoiding duplicate requests before endgame.
703***
704*** 2. Torrent::pieces, an array of "struct weighted_piece" which lists the
705***    pieces that we want to request. It's used to decide which blocks to
706***    return next when tr_peerMgrGetBlockRequests() is called.
707**/
708
709/**
710*** struct block_request
711**/
712
713static int
714compareReqByBlock( const void * va, const void * vb )
715{
716    const struct block_request * a = va;
717    const struct block_request * b = vb;
718
719    /* primary key: block */
720    if( a->block < b->block ) return -1;
721    if( a->block > b->block ) return 1;
722
723    /* secondary key: peer */
724    if( a->peer < b->peer ) return -1;
725    if( a->peer > b->peer ) return 1;
726
727    return 0;
728}
729
730static void
731requestListAdd( Torrent * t, tr_block_index_t block, tr_peer * peer )
732{
733    struct block_request key;
734
735    /* ensure enough room is available... */
736    if( t->requestCount + 1 >= t->requestAlloc )
737    {
738        const int CHUNK_SIZE = 128;
739        t->requestAlloc += CHUNK_SIZE;
740        t->requests = tr_renew( struct block_request,
741                                t->requests, t->requestAlloc );
742    }
743
744    /* populate the record we're inserting */
745    key.block = block;
746    key.peer = peer;
747    key.sentAt = tr_time( );
748
749    /* insert the request to our array... */
750    {
751        tr_bool exact;
752        const int pos = tr_lowerBound( &key, t->requests, t->requestCount,
753                                       sizeof( struct block_request ),
754                                       compareReqByBlock, &exact );
755        assert( !exact );
756        memmove( t->requests + pos + 1,
757                 t->requests + pos,
758                 sizeof( struct block_request ) * ( t->requestCount++ - pos ) );
759        t->requests[pos] = key;
760    }
761
762    if( peer != NULL )
763    {
764        ++peer->pendingReqsToPeer;
765        assert( peer->pendingReqsToPeer >= 0 );
766    }
767
768    /*fprintf( stderr, "added request of block %lu from peer %s... "
769                       "there are now %d block\n",
770                       (unsigned long)block, tr_atomAddrStr( peer->atom ), t->requestCount );*/
771}
772
773static struct block_request *
774requestListLookup( Torrent * t, tr_block_index_t block, const tr_peer * peer )
775{
776    struct block_request key;
777    key.block = block;
778    key.peer = (tr_peer*) peer;
779
780    return bsearch( &key, t->requests, t->requestCount,
781                    sizeof( struct block_request ),
782                    compareReqByBlock );
783}
784
785/**
786 * Find the peers are we currently requesting the block
787 * with index @a block from and append them to @a peerArr.
788 */
789static void
790getBlockRequestPeers( Torrent * t, tr_block_index_t block,
791                      tr_ptrArray * peerArr )
792{
793    tr_bool exact;
794    int i, pos;
795    struct block_request key;
796
797    key.block = block;
798    key.peer = NULL;
799    pos = tr_lowerBound( &key, t->requests, t->requestCount,
800                         sizeof( struct block_request ),
801                         compareReqByBlock, &exact );
802
803    assert( !exact ); /* shouldn't have a request with .peer == NULL */
804
805    for( i = pos; i < t->requestCount; ++i )
806    {
807        if( t->requests[i].block != block )
808            break;
809        tr_ptrArrayAppend( peerArr, t->requests[i].peer );
810    }
811}
812
813static void
814decrementPendingReqCount( const struct block_request * b )
815{
816    if( b->peer != NULL )
817        if( b->peer->pendingReqsToPeer > 0 )
818            --b->peer->pendingReqsToPeer;
819}
820
821static void
822requestListRemove( Torrent * t, tr_block_index_t block, const tr_peer * peer )
823{
824    const struct block_request * b = requestListLookup( t, block, peer );
825    if( b != NULL )
826    {
827        const int pos = b - t->requests;
828        assert( pos < t->requestCount );
829
830        decrementPendingReqCount( b );
831
832        tr_removeElementFromArray( t->requests,
833                                   pos,
834                                   sizeof( struct block_request ),
835                                   t->requestCount-- );
836
837        /*fprintf( stderr, "removing request of block %lu from peer %s... "
838                           "there are now %d block requests left\n",
839                           (unsigned long)block, tr_atomAddrStr( peer->atom ), t->requestCount );*/
840    }
841}
842
843static int
844countActiveWebseeds( const Torrent * t )
845{
846    int activeCount = 0;
847    const tr_webseed ** w = (const tr_webseed **) tr_ptrArrayBase( &t->webseeds );
848    const tr_webseed ** const wend = w + tr_ptrArraySize( &t->webseeds );
849
850    for( ; w!=wend; ++w )
851        if( tr_webseedIsActive( *w ) )
852            ++activeCount;
853
854    return activeCount;
855}
856
857static void
858updateEndgame( Torrent * t )
859{
860    const tr_torrent * tor = t->tor;
861    const tr_block_index_t missing = tr_cpBlocksMissing( &tor->completion );
862
863    assert( t->requestCount >= 0 );
864
865    if( (tr_block_index_t) t->requestCount < missing )
866    {
867        /* not in endgame */
868        t->endgame = 0;
869    }
870    else if( !t->endgame ) /* only recalculate when endgame first begins */
871    {
872        int numDownloading = 0;
873        const tr_peer ** p = (const tr_peer **) tr_ptrArrayBase( &t->peers );
874        const tr_peer ** const pend = p + tr_ptrArraySize( &t->peers );
875
876        /* add the active bittorrent peers... */
877        for( ; p!=pend; ++p )
878            if( (*p)->pendingReqsToPeer > 0 )
879                ++numDownloading;
880
881        /* add the active webseeds... */
882        numDownloading += countActiveWebseeds( t );
883
884        /* average number of pending requests per downloading peer */
885        t->endgame = t->requestCount / MAX( numDownloading, 1 );
886    }
887}
888
889
890/****
891*****
892*****  Piece List Manipulation / Accessors
893*****
894****/
895
896static inline void
897invalidatePieceSorting( Torrent * t )
898{
899    t->pieceSortState = PIECES_UNSORTED;
900}
901
902const tr_torrent * weightTorrent;
903
904const uint16_t * weightReplication;
905
906static void
907setComparePieceByWeightTorrent( Torrent * t )
908{
909    if( !replicationExists( t ) )
910        replicationNew( t );
911
912    weightTorrent = t->tor;
913    weightReplication = t->pieceReplication;
914}
915
916/* we try to create a "weight" s.t. high-priority pieces come before others,
917 * and that partially-complete pieces come before empty ones. */
918static int
919comparePieceByWeight( const void * va, const void * vb )
920{
921    const struct weighted_piece * a = va;
922    const struct weighted_piece * b = vb;
923    int ia, ib, missing, pending;
924    const tr_torrent * tor = weightTorrent;
925    const uint16_t * rep = weightReplication;
926
927    /* primary key: weight */
928    missing = tr_cpMissingBlocksInPiece( &tor->completion, a->index );
929    pending = a->requestCount;
930    ia = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
931    missing = tr_cpMissingBlocksInPiece( &tor->completion, b->index );
932    pending = b->requestCount;
933    ib = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
934    if( ia < ib ) return -1;
935    if( ia > ib ) return 1;
936
937    /* secondary key: higher priorities go first */
938    ia = tor->info.pieces[a->index].priority;
939    ib = tor->info.pieces[b->index].priority;
940    if( ia > ib ) return -1;
941    if( ia < ib ) return 1;
942
943    /* tertiary key: rarest first. */
944    ia = rep[a->index];
945    ib = rep[b->index];
946    if( ia < ib ) return -1;
947    if( ia > ib ) return 1;
948
949    /* quaternary key: random */
950    if( a->salt < b->salt ) return -1;
951    if( a->salt > b->salt ) return 1;
952
953    /* okay, they're equal */
954    return 0;
955}
956
957static int
958comparePieceByIndex( const void * va, const void * vb )
959{
960    const struct weighted_piece * a = va;
961    const struct weighted_piece * b = vb;
962    if( a->index < b->index ) return -1;
963    if( a->index > b->index ) return 1;
964    return 0;
965}
966
967static void
968pieceListSort( Torrent * t, enum piece_sort_state state )
969{
970    assert( state==PIECES_SORTED_BY_INDEX
971         || state==PIECES_SORTED_BY_WEIGHT );
972
973
974    if( state == PIECES_SORTED_BY_WEIGHT )
975    {
976        setComparePieceByWeightTorrent( t );
977        qsort( t->pieces, t->pieceCount, sizeof( struct weighted_piece ), comparePieceByWeight );
978    }
979    else
980        qsort( t->pieces, t->pieceCount, sizeof( struct weighted_piece ), comparePieceByIndex );
981
982    t->pieceSortState = state;
983}
984
985/**
986 * These functions are useful for testing, but too expensive for nightly builds.
987 * let's leave it disabled but add an easy hook to compile it back in
988 */
989#if 1
990#define assertWeightedPiecesAreSorted(t)
991#define assertReplicationCountIsExact(t)
992#else
993static void
994assertWeightedPiecesAreSorted( Torrent * t )
995{
996    if( !t->endgame )
997    {
998        int i;
999        setComparePieceByWeightTorrent( t );
1000        for( i=0; i<t->pieceCount-1; ++i )
1001            assert( comparePieceByWeight( &t->pieces[i], &t->pieces[i+1] ) <= 0 );
1002    }
1003}
1004static void
1005assertReplicationCountIsExact( Torrent * t )
1006{
1007    /* This assert might fail due to errors of implementations in other
1008     * clients. It happens when receiving duplicate bitfields/HaveAll/HaveNone
1009     * from a client. If a such a behavior is noticed,
1010     * a bug report should be filled to the faulty client. */
1011
1012    size_t piece_i;
1013    const uint16_t * rep = t->pieceReplication;
1014    const size_t piece_count = t->pieceReplicationSize;
1015    const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
1016    const int peer_count = tr_ptrArraySize( &t->peers );
1017
1018    assert( piece_count == t->tor->info.pieceCount );
1019
1020    for( piece_i=0; piece_i<piece_count; ++piece_i )
1021    {
1022        int peer_i;
1023        uint16_t r = 0;
1024
1025        for( peer_i=0; peer_i<peer_count; ++peer_i )
1026            if( tr_bitsetHasFast( &peers[peer_i]->have, piece_i ) )
1027                ++r;
1028
1029        assert( rep[piece_i] == r );
1030    }
1031}
1032#endif
1033
1034static struct weighted_piece *
1035pieceListLookup( Torrent * t, tr_piece_index_t index )
1036{
1037    int i;
1038
1039    for( i=0; i<t->pieceCount; ++i )
1040        if( t->pieces[i].index == index )
1041            return &t->pieces[i];
1042
1043    return NULL;
1044}
1045
1046static void
1047pieceListRebuild( Torrent * t )
1048{
1049
1050    if( !tr_torrentIsSeed( t->tor ) )
1051    {
1052        tr_piece_index_t i;
1053        tr_piece_index_t * pool;
1054        tr_piece_index_t poolCount = 0;
1055        const tr_torrent * tor = t->tor;
1056        const tr_info * inf = tr_torrentInfo( tor );
1057        struct weighted_piece * pieces;
1058        int pieceCount;
1059
1060        /* build the new list */
1061        pool = tr_new( tr_piece_index_t, inf->pieceCount );
1062        for( i=0; i<inf->pieceCount; ++i )
1063            if( !inf->pieces[i].dnd )
1064                if( !tr_cpPieceIsComplete( &tor->completion, i ) )
1065                    pool[poolCount++] = i;
1066        pieceCount = poolCount;
1067        pieces = tr_new0( struct weighted_piece, pieceCount );
1068        for( i=0; i<poolCount; ++i ) {
1069            struct weighted_piece * piece = pieces + i;
1070            piece->index = pool[i];
1071            piece->requestCount = 0;
1072            piece->salt = tr_cryptoWeakRandInt( 4096 );
1073        }
1074
1075        /* if we already had a list of pieces, merge it into
1076         * the new list so we don't lose its requestCounts */
1077        if( t->pieces != NULL )
1078        {
1079            struct weighted_piece * o = t->pieces;
1080            struct weighted_piece * oend = o + t->pieceCount;
1081            struct weighted_piece * n = pieces;
1082            struct weighted_piece * nend = n + pieceCount;
1083
1084            pieceListSort( t, PIECES_SORTED_BY_INDEX );
1085
1086            while( o!=oend && n!=nend ) {
1087                if( o->index < n->index )
1088                    ++o;
1089                else if( o->index > n->index )
1090                    ++n;
1091                else
1092                    *n++ = *o++;
1093            }
1094
1095            tr_free( t->pieces );
1096        }
1097
1098        t->pieces = pieces;
1099        t->pieceCount = pieceCount;
1100
1101        pieceListSort( t, PIECES_SORTED_BY_WEIGHT );
1102
1103        /* cleanup */
1104        tr_free( pool );
1105    }
1106}
1107
1108static void
1109pieceListRemovePiece( Torrent * t, tr_piece_index_t piece )
1110{
1111    struct weighted_piece * p;
1112
1113    if(( p = pieceListLookup( t, piece )))
1114    {
1115        const int pos = p - t->pieces;
1116
1117        tr_removeElementFromArray( t->pieces,
1118                                   pos,
1119                                   sizeof( struct weighted_piece ),
1120                                   t->pieceCount-- );
1121
1122        if( t->pieceCount == 0 )
1123        {
1124            tr_free( t->pieces );
1125            t->pieces = NULL;
1126        }
1127    }
1128}
1129
1130static void
1131pieceListResortPiece( Torrent * t, struct weighted_piece * p )
1132{
1133    int pos;
1134    tr_bool isSorted = TRUE;
1135
1136    if( p == NULL )
1137        return;
1138
1139    /* is the torrent already sorted? */
1140    pos = p - t->pieces;
1141    setComparePieceByWeightTorrent( t );
1142    if( isSorted && ( pos > 0 ) && ( comparePieceByWeight( p-1, p ) > 0 ) )
1143        isSorted = FALSE;
1144    if( isSorted && ( pos < t->pieceCount - 1 ) && ( comparePieceByWeight( p, p+1 ) > 0 ) )
1145        isSorted = FALSE;
1146
1147    if( t->pieceSortState != PIECES_SORTED_BY_WEIGHT )
1148    {
1149       pieceListSort( t, PIECES_SORTED_BY_WEIGHT);
1150       isSorted = TRUE;
1151    }
1152
1153    /* if it's not sorted, move it around */
1154    if( !isSorted )
1155    {
1156        tr_bool exact;
1157        const struct weighted_piece tmp = *p;
1158
1159        tr_removeElementFromArray( t->pieces,
1160                                   pos,
1161                                   sizeof( struct weighted_piece ),
1162                                   t->pieceCount-- );
1163
1164        pos = tr_lowerBound( &tmp, t->pieces, t->pieceCount,
1165                             sizeof( struct weighted_piece ),
1166                             comparePieceByWeight, &exact );
1167
1168        memmove( &t->pieces[pos + 1],
1169                 &t->pieces[pos],
1170                 sizeof( struct weighted_piece ) * ( t->pieceCount++ - pos ) );
1171
1172        t->pieces[pos] = tmp;
1173    }
1174
1175    assertWeightedPiecesAreSorted( t );
1176}
1177
1178static void
1179pieceListRemoveRequest( Torrent * t, tr_block_index_t block )
1180{
1181    struct weighted_piece * p;
1182    const tr_piece_index_t index = tr_torBlockPiece( t->tor, block );
1183
1184    if( ((p = pieceListLookup( t, index ))) && ( p->requestCount > 0 ) )
1185    {
1186        --p->requestCount;
1187        pieceListResortPiece( t, p );
1188    }
1189}
1190
1191
1192/****
1193*****
1194*****  Replication count ( for rarest first policy )
1195*****
1196****/
1197
1198/**
1199 * Increase the replication count of this piece and sort it if the
1200 * piece list is already sorted
1201 */
1202static void
1203tr_incrReplicationOfPiece( Torrent * t, const size_t index )
1204{
1205    assert( replicationExists( t ) );
1206    assert( t->pieceReplicationSize == t->tor->info.pieceCount );
1207
1208    /* One more replication of this piece is present in the swarm */
1209    ++t->pieceReplication[index];
1210
1211    /* we only resort the piece if the list is already sorted */
1212    if( t->pieceSortState == PIECES_SORTED_BY_WEIGHT )
1213        pieceListResortPiece( t, pieceListLookup( t, index ) );
1214}
1215
1216/**
1217 * Increases the replication count of pieces present in the bitfield
1218 */
1219static void
1220tr_incrReplicationFromBitfield( Torrent * t, const tr_bitfield * b )
1221{
1222    size_t i;
1223    uint16_t * rep = t->pieceReplication;
1224    const size_t n = t->tor->info.pieceCount;
1225
1226    assert( replicationExists( t ) );
1227    assert( n == t->pieceReplicationSize );
1228    assert( tr_bitfieldTestFast( b, n-1 ) );
1229
1230    for( i=0; i<n; ++i )
1231        if( tr_bitfieldHas( b, i ) )
1232            ++rep[i];
1233
1234    if( t->pieceSortState == PIECES_SORTED_BY_WEIGHT )
1235        invalidatePieceSorting( t );
1236}
1237
1238/**
1239 * Increase the replication count of every piece
1240 */
1241static void
1242tr_incrReplication( Torrent * t )
1243{
1244    int i;
1245    const int n = t->pieceReplicationSize;
1246
1247    assert( replicationExists( t ) );
1248    assert( t->pieceReplicationSize == t->tor->info.pieceCount );
1249
1250    for( i=0; i<n; ++i )
1251        ++t->pieceReplication[i];
1252}
1253
1254/**
1255 * Decrease the replication count of pieces present in the bitset.
1256 */
1257static void
1258tr_decrReplicationFromBitset( Torrent * t, const tr_bitset * bitset )
1259{
1260    int i;
1261    const int n = t->pieceReplicationSize;
1262
1263    assert( replicationExists( t ) );
1264    assert( t->pieceReplicationSize == t->tor->info.pieceCount );
1265
1266    if( bitset->haveAll )
1267    {
1268        for( i=0; i<n; ++i )
1269            --t->pieceReplication[i];
1270    }
1271    else if ( !bitset->haveNone )
1272    {
1273        const tr_bitfield * const b = &bitset->bitfield;
1274
1275        for( i=0; i<n; ++i )
1276            if( tr_bitfieldHas( b, i ) )
1277                --t->pieceReplication[i];
1278
1279        if( t->pieceSortState == PIECES_SORTED_BY_WEIGHT )
1280            invalidatePieceSorting( t );
1281    }
1282}
1283
1284/**
1285***
1286**/
1287
1288void
1289tr_peerMgrRebuildRequests( tr_torrent * tor )
1290{
1291    assert( tr_isTorrent( tor ) );
1292
1293    pieceListRebuild( tor->torrentPeers );
1294}
1295
1296void
1297tr_peerMgrGetNextRequests( tr_torrent           * tor,
1298                           tr_peer              * peer,
1299                           int                    numwant,
1300                           tr_block_index_t     * setme,
1301                           int                  * numgot )
1302{
1303    int i;
1304    int got;
1305    Torrent * t;
1306    struct weighted_piece * pieces;
1307    const tr_bitset * have = &peer->have;
1308
1309    /* sanity clause */
1310    assert( tr_isTorrent( tor ) );
1311    assert( peer->clientIsInterested );
1312    assert( !peer->clientIsChoked );
1313    assert( numwant > 0 );
1314
1315    /* walk through the pieces and find blocks that should be requested */
1316    got = 0;
1317    t = tor->torrentPeers;
1318
1319    /* prep the pieces list */
1320    if( t->pieces == NULL )
1321        pieceListRebuild( t );
1322
1323    if( t->pieceSortState != PIECES_SORTED_BY_WEIGHT )
1324        pieceListSort( t, PIECES_SORTED_BY_WEIGHT );
1325
1326    assertReplicationCountIsExact( t );
1327    assertWeightedPiecesAreSorted( t );
1328
1329    updateEndgame( t );
1330    pieces = t->pieces;
1331    for( i=0; i<t->pieceCount && got<numwant; ++i )
1332    {
1333        struct weighted_piece * p = pieces + i;
1334
1335        /* if the peer has this piece that we want... */
1336        if( tr_bitsetHasFast( have, p->index ) )
1337        {
1338            tr_block_index_t b = tr_torPieceFirstBlock( tor, p->index );
1339            const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, p->index );
1340            tr_ptrArray peerArr = TR_PTR_ARRAY_INIT;
1341
1342            for( ; b!=e && got<numwant; ++b )
1343            {
1344                int peerCount;
1345                tr_peer ** peers;
1346
1347                /* don't request blocks we've already got */
1348                if( tr_cpBlockIsCompleteFast( &tor->completion, b ) )
1349                    continue;
1350
1351                /* always add peer if this block has no peers yet */
1352                tr_ptrArrayClear( &peerArr );
1353                getBlockRequestPeers( t, b, &peerArr );
1354                peers = (tr_peer **) tr_ptrArrayPeek( &peerArr, &peerCount );
1355                if( peerCount != 0 )
1356                {
1357                    /* don't make a second block request until the endgame */
1358                    if( !t->endgame )
1359                        continue;
1360
1361                    /* don't have more than two peers requesting this block */
1362                    if( peerCount > 1 )
1363                        continue;
1364
1365                    /* don't send the same request to the same peer twice */
1366                    if( peer == peers[0] )
1367                        continue;
1368
1369                    /* in the endgame allow an additional peer to download a
1370                       block but only if the peer seems to be handling requests
1371                       relatively fast */
1372                    if( peer->pendingReqsToPeer + numwant - got < t->endgame )
1373                        continue;
1374                }
1375
1376                /* update the caller's table */
1377                setme[got++] = b;
1378
1379                /* update our own tables */
1380                requestListAdd( t, b, peer );
1381                ++p->requestCount;
1382            }
1383
1384            tr_ptrArrayDestruct( &peerArr, NULL );
1385        }
1386    }
1387
1388    /* In most cases we've just changed the weights of a small number of pieces.
1389     * So rather than qsort()ing the entire array, it's faster to apply an
1390     * adaptive insertion sort algorithm. */
1391    if( got > 0 )
1392    {
1393        /* not enough requests || last piece modified */
1394        if ( i == t->pieceCount ) --i;
1395
1396        setComparePieceByWeightTorrent( t );
1397        while( --i >= 0 )
1398        {
1399            tr_bool exact;
1400
1401            /* relative position! */
1402            const int newpos = tr_lowerBound( &t->pieces[i], &t->pieces[i + 1],
1403                                              t->pieceCount - (i + 1),
1404                                              sizeof( struct weighted_piece ),
1405                                              comparePieceByWeight, &exact );
1406            if( newpos > 0 )
1407            {
1408                const struct weighted_piece piece = t->pieces[i];
1409                memmove( &t->pieces[i],
1410                         &t->pieces[i + 1],
1411                         sizeof( struct weighted_piece ) * ( newpos ) );
1412                t->pieces[i + newpos] = piece;
1413            }
1414        }
1415    }
1416
1417    assertWeightedPiecesAreSorted( t );
1418    *numgot = got;
1419}
1420
1421tr_bool
1422tr_peerMgrDidPeerRequest( const tr_torrent  * tor,
1423                          const tr_peer     * peer,
1424                          tr_block_index_t    block )
1425{
1426    const Torrent * t = tor->torrentPeers;
1427    return requestListLookup( (Torrent*)t, block, peer ) != NULL;
1428}
1429
1430/* cancel requests that are too old */
1431static void
1432refillUpkeep( int foo UNUSED, short bar UNUSED, void * vmgr )
1433{
1434    time_t now;
1435    time_t too_old;
1436    tr_torrent * tor;
1437    tr_peerMgr * mgr = vmgr;
1438    managerLock( mgr );
1439
1440    now = tr_time( );
1441    too_old = now - REQUEST_TTL_SECS;
1442
1443    tor = NULL;
1444    while(( tor = tr_torrentNext( mgr->session, tor )))
1445    {
1446        Torrent * t = tor->torrentPeers;
1447        const int n = t->requestCount;
1448        if( n > 0 )
1449        {
1450            int keepCount = 0;
1451            int cancelCount = 0;
1452            struct block_request * cancel = tr_new( struct block_request, n );
1453            const struct block_request * it;
1454            const struct block_request * end;
1455
1456            for( it=t->requests, end=it+n; it!=end; ++it )
1457            {
1458                if( ( it->sentAt <= too_old ) && it->peer->msgs && !tr_peerMsgsIsReadingBlock( it->peer->msgs, it->block ) )
1459                    cancel[cancelCount++] = *it;
1460                else
1461                {
1462                    if( it != &t->requests[keepCount] )
1463                        t->requests[keepCount] = *it;
1464                    keepCount++;
1465                }
1466            }
1467
1468            /* prune out the ones we aren't keeping */
1469            t->requestCount = keepCount;
1470
1471            /* send cancel messages for all the "cancel" ones */
1472            for( it=cancel, end=it+cancelCount; it!=end; ++it ) {
1473                if( ( it->peer != NULL ) && ( it->peer->msgs != NULL ) ) {
1474                    tr_historyAdd( it->peer->cancelsSentToPeer, now, 1 );
1475                    tr_peerMsgsCancel( it->peer->msgs, it->block );
1476                    decrementPendingReqCount( it );
1477                }
1478            }
1479
1480            /* decrement the pending request counts for the timed-out blocks */
1481            for( it=cancel, end=it+cancelCount; it!=end; ++it )
1482                pieceListRemoveRequest( t, it->block );
1483
1484            /* cleanup loop */
1485            tr_free( cancel );
1486        }
1487    }
1488
1489    tr_timerAddMsec( mgr->refillUpkeepTimer, REFILL_UPKEEP_PERIOD_MSEC );
1490    managerUnlock( mgr );
1491}
1492
1493static void
1494addStrike( Torrent * t, tr_peer * peer )
1495{
1496    tordbg( t, "increasing peer %s strike count to %d",
1497            tr_atomAddrStr( peer->atom ), peer->strikes + 1 );
1498
1499    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
1500    {
1501        struct peer_atom * atom = peer->atom;
1502        atom->flags2 |= MYFLAG_BANNED;
1503        peer->doPurge = 1;
1504        tordbg( t, "banning peer %s", tr_atomAddrStr( atom ) );
1505    }
1506}
1507
1508static void
1509gotBadPiece( Torrent * t, tr_piece_index_t pieceIndex )
1510{
1511    tr_torrent *   tor = t->tor;
1512    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
1513
1514    tor->corruptCur += byteCount;
1515    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1516
1517    tr_announcerAddBytes( tor, TR_ANN_CORRUPT, byteCount );
1518}
1519
1520static void
1521peerSuggestedPiece( Torrent            * t UNUSED,
1522                    tr_peer            * peer UNUSED,
1523                    tr_piece_index_t     pieceIndex UNUSED,
1524                    int                  isFastAllowed UNUSED )
1525{
1526#if 0
1527    assert( t );
1528    assert( peer );
1529    assert( peer->msgs );
1530
1531    /* is this a valid piece? */
1532    if(  pieceIndex >= t->tor->info.pieceCount )
1533        return;
1534
1535    /* don't ask for it if we've already got it */
1536    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
1537        return;
1538
1539    /* don't ask for it if they don't have it */
1540    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
1541        return;
1542
1543    /* don't ask for it if we're choked and it's not fast */
1544    if( !isFastAllowed && peer->clientIsChoked )
1545        return;
1546
1547    /* request the blocks that we don't have in this piece */
1548    {
1549        tr_block_index_t block;
1550        const tr_torrent * tor = t->tor;
1551        const tr_block_index_t start = tr_torPieceFirstBlock( tor, pieceIndex );
1552        const tr_block_index_t end = start + tr_torPieceCountBlocks( tor, pieceIndex );
1553
1554        for( block=start; block<end; ++block )
1555        {
1556            if( !tr_cpBlockIsComplete( tor->completion, block ) )
1557            {
1558                const uint32_t offset = getBlockOffsetInPiece( tor, block );
1559                const uint32_t length = tr_torBlockCountBytes( tor, block );
1560                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length );
1561                incrementPieceRequests( t, pieceIndex );
1562            }
1563        }
1564    }
1565#endif
1566}
1567
1568static void
1569removeRequestFromTables( Torrent * t, tr_block_index_t block, const tr_peer * peer )
1570{
1571    requestListRemove( t, block, peer );
1572    pieceListRemoveRequest( t, block );
1573}
1574
1575/* peer choked us, or maybe it disconnected.
1576   either way we need to remove all its requests */
1577static void
1578peerDeclinedAllRequests( Torrent * t, const tr_peer * peer )
1579{
1580    int i, n;
1581    tr_block_index_t * blocks = tr_new( tr_block_index_t, t->requestCount );
1582
1583    for( i=n=0; i<t->requestCount; ++i )
1584        if( peer == t->requests[i].peer )
1585            blocks[n++] = t->requests[i].block;
1586
1587    for( i=0; i<n; ++i )
1588        removeRequestFromTables( t, blocks[i], peer );
1589
1590    tr_free( blocks );
1591}
1592
1593static void
1594peerCallbackFunc( tr_peer * peer, const tr_peer_event * e, void * vt )
1595{
1596    Torrent * t = vt;
1597
1598    torrentLock( t );
1599
1600    switch( e->eventType )
1601    {
1602        case TR_PEER_PEER_GOT_DATA:
1603        {
1604            const time_t now = tr_time( );
1605            tr_torrent * tor = t->tor;
1606
1607            if( e->wasPieceData )
1608            {
1609                tor->uploadedCur += e->length;
1610                tr_announcerAddBytes( tor, TR_ANN_UP, e->length );
1611                tr_torrentSetActivityDate( tor, now );
1612                tr_torrentSetDirty( tor );
1613            }
1614
1615            /* update the stats */
1616            if( e->wasPieceData )
1617                tr_statsAddUploaded( tor->session, e->length );
1618
1619            /* update our atom */
1620            if( peer && e->wasPieceData )
1621                peer->atom->piece_data_time = now;
1622
1623            break;
1624        }
1625
1626        case TR_PEER_CLIENT_GOT_HAVE:
1627            if( replicationExists( t ) ) {
1628                tr_incrReplicationOfPiece( t, e->pieceIndex );
1629                assertReplicationCountIsExact( t );
1630            }
1631            break;
1632
1633        case TR_PEER_CLIENT_GOT_HAVE_ALL:
1634            if( replicationExists( t ) ) {
1635                tr_incrReplication( t );
1636                assertReplicationCountIsExact( t );
1637            }
1638            break;
1639
1640        case TR_PEER_CLIENT_GOT_HAVE_NONE:
1641            /* noop */
1642            break;
1643
1644        case TR_PEER_CLIENT_GOT_BITFIELD:
1645            assert( e->bitfield != NULL );
1646            if( replicationExists( t ) ) {
1647                tr_incrReplicationFromBitfield( t, e->bitfield );
1648                assertReplicationCountIsExact( t );
1649            }
1650            break;
1651
1652        case TR_PEER_CLIENT_GOT_REJ:
1653            removeRequestFromTables( t, _tr_block( t->tor, e->pieceIndex, e->offset ), peer );
1654            break;
1655
1656        case TR_PEER_CLIENT_GOT_CHOKE:
1657            peerDeclinedAllRequests( t, peer );
1658            break;
1659
1660        case TR_PEER_CLIENT_GOT_PORT:
1661            if( peer )
1662                peer->atom->port = e->port;
1663            break;
1664
1665        case TR_PEER_CLIENT_GOT_SUGGEST:
1666            if( peer )
1667                peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
1668            break;
1669
1670        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1671            if( peer )
1672                peerSuggestedPiece( t, peer, e->pieceIndex, TRUE );
1673            break;
1674
1675        case TR_PEER_CLIENT_GOT_DATA:
1676        {
1677            const time_t now = tr_time( );
1678            tr_torrent * tor = t->tor;
1679
1680            if( e->wasPieceData )
1681            {
1682                tor->downloadedCur += e->length;
1683                tr_torrentSetActivityDate( tor, now );
1684                tr_torrentSetDirty( tor );
1685            }
1686
1687            /* update the stats */
1688            if( e->wasPieceData )
1689                tr_statsAddDownloaded( tor->session, e->length );
1690
1691            /* update our atom */
1692            if( peer && peer->atom && e->wasPieceData )
1693                peer->atom->piece_data_time = now;
1694
1695            break;
1696        }
1697
1698        case TR_PEER_PEER_PROGRESS:
1699        {
1700            if( peer )
1701            {
1702                struct peer_atom * atom = peer->atom;
1703                if( e->progress >= 1.0 ) {
1704                    tordbg( t, "marking peer %s as a seed", tr_atomAddrStr( atom ) );
1705                    atomSetSeed( atom );
1706                }
1707            }
1708            break;
1709        }
1710
1711        case TR_PEER_CLIENT_GOT_BLOCK:
1712        {
1713            tr_torrent * tor = t->tor;
1714            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1715            int i, peerCount;
1716            tr_peer ** peers;
1717            tr_ptrArray peerArr = TR_PTR_ARRAY_INIT;
1718
1719            removeRequestFromTables( t, block, peer );
1720            getBlockRequestPeers( t, block, &peerArr );
1721            peers = (tr_peer **) tr_ptrArrayPeek( &peerArr, &peerCount );
1722
1723            /* remove additional block requests and send cancel to peers */
1724            for( i=0; i<peerCount; i++ ) {
1725                tr_peer * p = peers[i];
1726                assert( p != peer );
1727                if( p->msgs ) {
1728                    tr_historyAdd( p->cancelsSentToPeer, tr_time( ), 1 );
1729                    tr_peerMsgsCancel( p->msgs, block );
1730                }
1731                removeRequestFromTables( t, block, p );
1732            }
1733
1734            tr_ptrArrayDestruct( &peerArr, FALSE );
1735
1736            if( peer && peer->blocksSentToClient )
1737                tr_historyAdd( peer->blocksSentToClient, tr_time( ), 1 );
1738
1739            if( tr_cpBlockIsComplete( &tor->completion, block ) )
1740            {
1741                /* we already have this block... */
1742                const uint32_t n = tr_torBlockCountBytes( tor, block );
1743                tor->downloadedCur -= MIN( tor->downloadedCur, n );
1744                tordbg( t, "we have this block already..." );
1745            }
1746            else
1747            {
1748                tr_cpBlockAdd( &tor->completion, block );
1749                pieceListResortPiece( t, pieceListLookup( t, e->pieceIndex ) );
1750                tr_torrentSetDirty( tor );
1751
1752                if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
1753                {
1754                    const tr_piece_index_t p = e->pieceIndex;
1755                    const tr_bool ok = tr_torrentCheckPiece( tor, p );
1756
1757                    tordbg( t, "[LAZY] checked just-completed piece %zu", (size_t)p );
1758
1759                    if( !ok )
1760                    {
1761                        tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1762                                   (unsigned long)p );
1763                    }
1764
1765                    tr_peerMgrSetBlame( tor, p, ok );
1766
1767                    if( !ok )
1768                    {
1769                        gotBadPiece( t, p );
1770                    }
1771                    else
1772                    {
1773                        int i;
1774                        int peerCount;
1775                        tr_peer ** peers;
1776                        tr_file_index_t fileIndex;
1777
1778                        /* only add this to downloadedCur if we got it from a peer --
1779                         * webseeds shouldn't count against our ratio. As one tracker
1780                         * admin put it, "Those pieces are downloaded directly from the
1781                         * content distributor, not the peers, it is the tracker's job
1782                         * to manage the swarms, not the web server and does not fit
1783                         * into the jurisdiction of the tracker." */
1784                        if( peer->msgs != NULL ) {
1785                            const uint32_t n = tr_torPieceCountBytes( tor, p );
1786                            tr_announcerAddBytes( tor, TR_ANN_DOWN, n );
1787                        }
1788
1789                        peerCount = tr_ptrArraySize( &t->peers );
1790                        peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1791                        for( i=0; i<peerCount; ++i )
1792                            tr_peerMsgsHave( peers[i]->msgs, p );
1793
1794                        for( fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex ) {
1795                            const tr_file * file = &tor->info.files[fileIndex];
1796                            if( ( file->firstPiece <= p ) && ( p <= file->lastPiece ) ) {
1797                                if( tr_cpFileIsComplete( &tor->completion, fileIndex ) ) {
1798                                    tr_cacheFlushFile( tor->session->cache, tor, fileIndex );
1799                                    tr_torrentFileCompleted( tor, fileIndex );
1800                                }
1801                            }
1802                        }
1803
1804                        pieceListRemovePiece( t, p );
1805                    }
1806                }
1807
1808                t->needsCompletenessCheck = TRUE;
1809            }
1810            break;
1811        }
1812
1813        case TR_PEER_ERROR:
1814            if( ( e->err == ERANGE ) || ( e->err == EMSGSIZE ) || ( e->err == ENOTCONN ) )
1815            {
1816                /* some protocol error from the peer */
1817                peer->doPurge = 1;
1818                tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error",
1819                        tr_atomAddrStr( peer->atom ) );
1820            }
1821            else
1822            {
1823                tordbg( t, "unhandled error: %s", tr_strerror( e->err ) );
1824            }
1825            break;
1826
1827        default:
1828            assert( 0 );
1829    }
1830
1831    torrentUnlock( t );
1832}
1833
1834static int
1835getDefaultShelfLife( uint8_t from )
1836{
1837    /* in general, peers obtained from firsthand contact
1838     * are better than those from secondhand, etc etc */
1839    switch( from )
1840    {
1841        case TR_PEER_FROM_INCOMING : return 60 * 60 * 6;
1842        case TR_PEER_FROM_LTEP     : return 60 * 60 * 6;
1843        case TR_PEER_FROM_TRACKER  : return 60 * 60 * 3;
1844        case TR_PEER_FROM_DHT      : return 60 * 60 * 3;
1845        case TR_PEER_FROM_PEX      : return 60 * 60 * 2;
1846        case TR_PEER_FROM_RESUME   : return 60 * 60;
1847        case TR_PEER_FROM_LPD      : return 10 * 60;
1848        default                    : return 60 * 60;
1849    }
1850}
1851
1852static void
1853ensureAtomExists( Torrent           * t,
1854                  const tr_address  * addr,
1855                  const tr_port       port,
1856                  const uint8_t       flags,
1857                  const int8_t        seedProbability,
1858                  const uint8_t       from )
1859{
1860    struct peer_atom * a;
1861
1862    assert( tr_isAddress( addr ) );
1863    assert( from < TR_PEER_FROM__MAX );
1864
1865    a = getExistingAtom( t, addr );
1866
1867    if( a == NULL )
1868    {
1869        const int jitter = tr_cryptoWeakRandInt( 60*10 );
1870        a = tr_new0( struct peer_atom, 1 );
1871        a->addr = *addr;
1872        a->port = port;
1873        a->flags = flags;
1874        a->fromFirst = from;
1875        a->fromBest = from;
1876        a->shelf_date = tr_time( ) + getDefaultShelfLife( from ) + jitter;
1877        a->blocklisted = -1;
1878        atomSetSeedProbability( a, seedProbability );
1879        tr_ptrArrayInsertSorted( &t->pool, a, compareAtomsByAddress );
1880
1881        tordbg( t, "got a new atom: %s", tr_atomAddrStr( a ) );
1882    }
1883    else
1884    {
1885        if( from < a->fromBest )
1886            a->fromBest = from;
1887       
1888        if( a->seedProbability == -1 )
1889            atomSetSeedProbability( a, seedProbability );
1890
1891        a->flags |= flags;
1892    }
1893}
1894
1895static int
1896getMaxPeerCount( const tr_torrent * tor )
1897{
1898    return tor->maxConnectedPeers;
1899}
1900
1901static int
1902getPeerCount( const Torrent * t )
1903{
1904    return tr_ptrArraySize( &t->peers );/* + tr_ptrArraySize( &t->outgoingHandshakes ); */
1905}
1906
1907/* FIXME: this is kind of a mess. */
1908static tr_bool
1909myHandshakeDoneCB( tr_handshake  * handshake,
1910                   tr_peerIo     * io,
1911                   tr_bool         readAnythingFromPeer,
1912                   tr_bool         isConnected,
1913                   const uint8_t * peer_id,
1914                   void          * vmanager )
1915{
1916    tr_bool            ok = isConnected;
1917    tr_bool            success = FALSE;
1918    tr_port            port;
1919    const tr_address * addr;
1920    tr_peerMgr       * manager = vmanager;
1921    Torrent          * t;
1922    tr_handshake     * ours;
1923
1924    assert( io );
1925    assert( tr_isBool( ok ) );
1926
1927    t = tr_peerIoHasTorrentHash( io )
1928        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1929        : NULL;
1930
1931    if( tr_peerIoIsIncoming ( io ) )
1932        ours = tr_ptrArrayRemoveSorted( &manager->incomingHandshakes,
1933                                        handshake, handshakeCompare );
1934    else if( t )
1935        ours = tr_ptrArrayRemoveSorted( &t->outgoingHandshakes,
1936                                        handshake, handshakeCompare );
1937    else
1938        ours = handshake;
1939
1940    assert( ours );
1941    assert( ours == handshake );
1942
1943    if( t )
1944        torrentLock( t );
1945
1946    addr = tr_peerIoGetAddress( io, &port );
1947
1948    if( !ok || !t || !t->isRunning )
1949    {
1950        if( t )
1951        {
1952            struct peer_atom * atom = getExistingAtom( t, addr );
1953            if( atom )
1954            {
1955                ++atom->numFails;
1956
1957                if( !readAnythingFromPeer )
1958                {
1959                    tordbg( t, "marking peer %s as unreachable... numFails is %d", tr_atomAddrStr( atom ), (int)atom->numFails );
1960                    atom->flags2 |= MYFLAG_UNREACHABLE;
1961                }
1962            }
1963        }
1964    }
1965    else /* looking good */
1966    {
1967        struct peer_atom * atom;
1968
1969        ensureAtomExists( t, addr, port, 0, -1, TR_PEER_FROM_INCOMING );
1970        atom = getExistingAtom( t, addr );
1971        atom->time = tr_time( );
1972        atom->piece_data_time = 0;
1973        atom->lastConnectionAt = tr_time( );
1974
1975        if( !tr_peerIoIsIncoming( io ) )
1976        {
1977            atom->flags |= ADDED_F_CONNECTABLE;
1978            atom->flags2 &= ~MYFLAG_UNREACHABLE;
1979        }
1980
1981        /* In principle, this flag specifies whether the peer groks uTP,
1982           not whether it's currently connected over uTP. */
1983        if( io->utp_socket )
1984            atom->flags |= ADDED_F_UTP_FLAGS;
1985
1986        if( atom->flags2 & MYFLAG_BANNED )
1987        {
1988            tordbg( t, "banned peer %s tried to reconnect",
1989                    tr_atomAddrStr( atom ) );
1990        }
1991        else if( tr_peerIoIsIncoming( io )
1992               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1993
1994        {
1995        }
1996        else
1997        {
1998            tr_peer * peer = atom->peer;
1999
2000            if( peer ) /* we already have this peer */
2001            {
2002            }
2003            else
2004            {
2005                peer = getPeer( t, atom );
2006                tr_free( peer->client );
2007
2008                if( !peer_id )
2009                    peer->client = NULL;
2010                else {
2011                    char client[128];
2012                    tr_clientForId( client, sizeof( client ), peer_id );
2013                    peer->client = tr_strdup( client );
2014                }
2015
2016                peer->io = tr_handshakeStealIO( handshake ); /* this steals its refcount too, which is
2017                                                                balanced by our unref in peerDestructor()  */
2018                tr_peerIoSetParent( peer->io, t->tor->bandwidth );
2019                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t );
2020
2021                success = TRUE;
2022            }
2023        }
2024    }
2025
2026    if( t )
2027        torrentUnlock( t );
2028
2029    return success;
2030}
2031
2032void
2033tr_peerMgrAddIncoming( tr_peerMgr * manager,
2034                       tr_address * addr,
2035                       tr_port      port,
2036                       int          socket,
2037                       struct UTPSocket * utp_socket )
2038{
2039    tr_session * session;
2040
2041    managerLock( manager );
2042
2043    assert( tr_isSession( manager->session ) );
2044    session = manager->session;
2045
2046    if( tr_sessionIsAddressBlocked( session, addr ) )
2047    {
2048        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_ntop_non_ts( addr ) );
2049        if(socket >= 0)
2050            tr_netClose( session, socket );
2051        else
2052            UTP_Close( utp_socket );
2053    }
2054    else if( getExistingHandshake( &manager->incomingHandshakes, addr ) )
2055    {
2056        if(socket >= 0)
2057            tr_netClose( session, socket );
2058        else
2059            UTP_Close( utp_socket );
2060    }
2061    else /* we don't have a connection to them yet... */
2062    {
2063        tr_peerIo *    io;
2064        tr_handshake * handshake;
2065
2066        io = tr_peerIoNewIncoming( session, session->bandwidth, addr, port, socket, utp_socket );
2067
2068        handshake = tr_handshakeNew( io,
2069                                     session->encryptionMode,
2070                                     myHandshakeDoneCB,
2071                                     manager );
2072
2073        tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewIncoming() */
2074
2075        tr_ptrArrayInsertSorted( &manager->incomingHandshakes, handshake,
2076                                 handshakeCompare );
2077    }
2078
2079    managerUnlock( manager );
2080}
2081
2082void
2083tr_peerMgrAddPex( tr_torrent * tor, uint8_t from,
2084                  const tr_pex * pex, int8_t seedProbability )
2085{
2086    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
2087    {
2088        Torrent * t = tor->torrentPeers;
2089        managerLock( t->manager );
2090
2091        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
2092            if( tr_isValidPeerAddress( &pex->addr, pex->port ) )
2093                ensureAtomExists( t, &pex->addr, pex->port, pex->flags, seedProbability, from );
2094
2095        managerUnlock( t->manager );
2096    }
2097}
2098
2099void
2100tr_peerMgrMarkAllAsSeeds( tr_torrent * tor )
2101{
2102    Torrent * t = tor->torrentPeers;
2103    const int n = tr_ptrArraySize( &t->pool );
2104    struct peer_atom ** it = (struct peer_atom**) tr_ptrArrayBase( &t->pool );
2105    struct peer_atom ** end = it + n;
2106
2107    while( it != end )
2108        atomSetSeed( *it++ );
2109}
2110
2111tr_pex *
2112tr_peerMgrCompactToPex( const void *    compact,
2113                        size_t          compactLen,
2114                        const uint8_t * added_f,
2115                        size_t          added_f_len,
2116                        size_t *        pexCount )
2117{
2118    size_t          i;
2119    size_t          n = compactLen / 6;
2120    const uint8_t * walk = compact;
2121    tr_pex *        pex = tr_new0( tr_pex, n );
2122
2123    for( i = 0; i < n; ++i )
2124    {
2125        pex[i].addr.type = TR_AF_INET;
2126        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
2127        memcpy( &pex[i].port, walk, 2 ); walk += 2;
2128        if( added_f && ( n == added_f_len ) )
2129            pex[i].flags = added_f[i];
2130    }
2131
2132    *pexCount = n;
2133    return pex;
2134}
2135
2136tr_pex *
2137tr_peerMgrCompact6ToPex( const void    * compact,
2138                         size_t          compactLen,
2139                         const uint8_t * added_f,
2140                         size_t          added_f_len,
2141                         size_t        * pexCount )
2142{
2143    size_t          i;
2144    size_t          n = compactLen / 18;
2145    const uint8_t * walk = compact;
2146    tr_pex *        pex = tr_new0( tr_pex, n );
2147
2148    for( i = 0; i < n; ++i )
2149    {
2150        pex[i].addr.type = TR_AF_INET6;
2151        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
2152        memcpy( &pex[i].port, walk, 2 ); walk += 2;
2153        if( added_f && ( n == added_f_len ) )
2154            pex[i].flags = added_f[i];
2155    }
2156
2157    *pexCount = n;
2158    return pex;
2159}
2160
2161tr_pex *
2162tr_peerMgrArrayToPex( const void * array,
2163                      size_t       arrayLen,
2164                      size_t      * pexCount )
2165{
2166    size_t          i;
2167    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
2168    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
2169    const uint8_t * walk = array;
2170    tr_pex        * pex = tr_new0( tr_pex, n );
2171
2172    for( i = 0 ; i < n ; i++ ) {
2173        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
2174        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
2175        pex[i].flags = 0x00;
2176        walk += sizeof( tr_address ) + 2;
2177    }
2178
2179    *pexCount = n;
2180    return pex;
2181}
2182
2183/**
2184***
2185**/
2186
2187void
2188tr_peerMgrSetBlame( tr_torrent     * tor,
2189                    tr_piece_index_t pieceIndex,
2190                    int              success )
2191{
2192    if( !success )
2193    {
2194        int        peerCount, i;
2195        Torrent *  t = tor->torrentPeers;
2196        tr_peer ** peers;
2197
2198        assert( torrentIsLocked( t ) );
2199
2200        peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
2201        for( i = 0; i < peerCount; ++i )
2202        {
2203            tr_peer * peer = peers[i];
2204            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
2205            {
2206                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
2207                        tr_atomAddrStr( peer->atom ),
2208                        pieceIndex, (int)peer->strikes + 1 );
2209                addStrike( t, peer );
2210            }
2211        }
2212    }
2213}
2214
2215int
2216tr_pexCompare( const void * va, const void * vb )
2217{
2218    const tr_pex * a = va;
2219    const tr_pex * b = vb;
2220    int i;
2221
2222    assert( tr_isPex( a ) );
2223    assert( tr_isPex( b ) );
2224
2225    if(( i = tr_compareAddresses( &a->addr, &b->addr )))
2226        return i;
2227
2228    if( a->port != b->port )
2229        return a->port < b->port ? -1 : 1;
2230
2231    return 0;
2232}
2233
2234#if 0
2235static int
2236peerPrefersCrypto( const tr_peer * peer )
2237{
2238    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES )
2239        return TRUE;
2240
2241    if( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )
2242        return FALSE;
2243
2244    return tr_peerIoIsEncrypted( peer->io );
2245}
2246#endif
2247
2248/* better goes first */
2249static int
2250compareAtomsByUsefulness( const void * va, const void *vb )
2251{
2252    const struct peer_atom * a = * (const struct peer_atom**) va;
2253    const struct peer_atom * b = * (const struct peer_atom**) vb;
2254
2255    assert( tr_isAtom( a ) );
2256    assert( tr_isAtom( b ) );
2257
2258    if( a->piece_data_time != b->piece_data_time )
2259        return a->piece_data_time > b->piece_data_time ? -1 : 1;
2260    if( a->fromBest != b->fromBest )
2261        return a->fromBest < b->fromBest ? -1 : 1;
2262    if( a->numFails != b->numFails )
2263        return a->numFails < b->numFails ? -1 : 1;
2264
2265    return 0;
2266}
2267
2268int
2269tr_peerMgrGetPeers( tr_torrent   * tor,
2270                    tr_pex      ** setme_pex,
2271                    uint8_t        af,
2272                    uint8_t        list_mode,
2273                    int            maxCount )
2274{
2275    int i;
2276    int n;
2277    int count = 0;
2278    int atomCount = 0;
2279    const Torrent * t = tor->torrentPeers;
2280    struct peer_atom ** atoms = NULL;
2281    tr_pex * pex;
2282    tr_pex * walk;
2283
2284    assert( tr_isTorrent( tor ) );
2285    assert( setme_pex != NULL );
2286    assert( af==TR_AF_INET || af==TR_AF_INET6 );
2287    assert( list_mode==TR_PEERS_CONNECTED || list_mode==TR_PEERS_ALL );
2288
2289    managerLock( t->manager );
2290
2291    /**
2292    ***  build a list of atoms
2293    **/
2294
2295    if( list_mode == TR_PEERS_CONNECTED ) /* connected peers only */
2296    {
2297        int i;
2298        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
2299        atomCount = tr_ptrArraySize( &t->peers );
2300        atoms = tr_new( struct peer_atom *, atomCount );
2301        for( i=0; i<atomCount; ++i )
2302            atoms[i] = peers[i]->atom;
2303    }
2304    else /* TR_PEERS_ALL */
2305    {
2306        const struct peer_atom ** atomsBase = (const struct peer_atom**) tr_ptrArrayBase( &t->pool );
2307        atomCount = tr_ptrArraySize( &t->pool );
2308        atoms = tr_memdup( atomsBase, atomCount * sizeof( struct peer_atom * ) );
2309    }
2310
2311    qsort( atoms, atomCount, sizeof( struct peer_atom * ), compareAtomsByUsefulness );
2312
2313    /**
2314    ***  add the first N of them into our return list
2315    **/
2316
2317    n = MIN( atomCount, maxCount );
2318    pex = walk = tr_new0( tr_pex, n );
2319
2320    for( i=0; i<atomCount && count<n; ++i )
2321    {
2322        const struct peer_atom * atom = atoms[i];
2323        if( atom->addr.type == af )
2324        {
2325            assert( tr_isAddress( &atom->addr ) );
2326            walk->addr = atom->addr;
2327            walk->port = atom->port;
2328            walk->flags = atom->flags;
2329            ++count;
2330            ++walk;
2331        }
2332    }
2333
2334    qsort( pex, count, sizeof( tr_pex ), tr_pexCompare );
2335
2336    assert( ( walk - pex ) == count );
2337    *setme_pex = pex;
2338
2339    /* cleanup */
2340    tr_free( atoms );
2341    managerUnlock( t->manager );
2342    return count;
2343}
2344
2345static void atomPulse      ( int, short, void * );
2346static void bandwidthPulse ( int, short, void * );
2347static void rechokePulse   ( int, short, void * );
2348static void reconnectPulse ( int, short, void * );
2349
2350static struct event *
2351createTimer( tr_session * session, int msec, void (*callback)(int, short, void *), void * cbdata )
2352{
2353    struct event * timer = evtimer_new( session->event_base, callback, cbdata );
2354    tr_timerAddMsec( timer, msec );
2355    return timer;
2356}
2357
2358static void
2359ensureMgrTimersExist( struct tr_peerMgr * m )
2360{
2361    if( m->atomTimer == NULL )
2362        m->atomTimer = createTimer( m->session, ATOM_PERIOD_MSEC, atomPulse, m );
2363
2364    if( m->bandwidthTimer == NULL )
2365        m->bandwidthTimer = createTimer( m->session, BANDWIDTH_PERIOD_MSEC, bandwidthPulse, m );
2366
2367    if( m->rechokeTimer == NULL )
2368        m->rechokeTimer = createTimer( m->session, RECHOKE_PERIOD_MSEC, rechokePulse, m );
2369
2370    if( m->refillUpkeepTimer == NULL )
2371        m->refillUpkeepTimer = createTimer( m->session, REFILL_UPKEEP_PERIOD_MSEC, refillUpkeep, m );
2372}
2373
2374void
2375tr_peerMgrStartTorrent( tr_torrent * tor )
2376{
2377    Torrent * t = tor->torrentPeers;
2378
2379    assert( tr_isTorrent( tor ) );
2380    assert( tr_torrentIsLocked( tor ) );
2381
2382    ensureMgrTimersExist( t->manager );
2383
2384    t->isRunning = TRUE;
2385    t->maxPeers = t->tor->maxConnectedPeers;
2386    t->pieceSortState = PIECES_UNSORTED;
2387
2388    rechokePulse( 0, 0, t->manager );
2389}
2390
2391static void
2392stopTorrent( Torrent * t )
2393{
2394    int i, n;
2395
2396    t->isRunning = FALSE;
2397
2398    replicationFree( t );
2399    invalidatePieceSorting( t );
2400
2401    /* disconnect the peers. */
2402    for( i=0, n=tr_ptrArraySize( &t->peers ); i<n; ++i )
2403        peerDestructor( t, tr_ptrArrayNth( &t->peers, i ) );
2404    tr_ptrArrayClear( &t->peers );
2405
2406    /* disconnect the handshakes. handshakeAbort calls handshakeDoneCB(),
2407     * which removes the handshake from t->outgoingHandshakes... */
2408    while( !tr_ptrArrayEmpty( &t->outgoingHandshakes ) )
2409        tr_handshakeAbort( tr_ptrArrayNth( &t->outgoingHandshakes, 0 ) );
2410}
2411
2412void
2413tr_peerMgrStopTorrent( tr_torrent * tor )
2414{
2415    assert( tr_isTorrent( tor ) );
2416    assert( tr_torrentIsLocked( tor ) );
2417
2418    stopTorrent( tor->torrentPeers );
2419}
2420
2421void
2422tr_peerMgrAddTorrent( tr_peerMgr * manager, tr_torrent * tor )
2423{
2424    assert( tr_isTorrent( tor ) );
2425    assert( tr_torrentIsLocked( tor ) );
2426    assert( tor->torrentPeers == NULL );
2427
2428    tor->torrentPeers = torrentConstructor( manager, tor );
2429}
2430
2431void
2432tr_peerMgrRemoveTorrent( tr_torrent * tor )
2433{
2434    assert( tr_isTorrent( tor ) );
2435    assert( tr_torrentIsLocked( tor ) );
2436
2437    stopTorrent( tor->torrentPeers );
2438    torrentDestructor( tor->torrentPeers );
2439}
2440
2441void
2442tr_peerMgrTorrentAvailability( const tr_torrent * tor, int8_t * tab, unsigned int tabCount )
2443{
2444    assert( tr_isTorrent( tor ) );
2445    assert( torrentIsLocked( tor->torrentPeers ) );
2446    assert( tab != NULL );
2447    assert( tabCount > 0 );
2448
2449    memset( tab, 0, tabCount );
2450
2451    if( tr_torrentHasMetadata( tor ) )
2452    {
2453        tr_piece_index_t i;
2454        const int peerCount = tr_ptrArraySize( &tor->torrentPeers->peers );
2455        const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &tor->torrentPeers->peers );
2456        const float interval = tor->info.pieceCount / (float)tabCount;
2457        const tr_bool isSeed = tr_cpGetStatus( &tor->completion ) == TR_SEED;
2458
2459        for( i=0; i<tabCount; ++i )
2460        {
2461            const int piece = i * interval;
2462
2463            if( isSeed || tr_cpPieceIsComplete( &tor->completion, piece ) )
2464                tab[i] = -1;
2465            else if( peerCount ) {
2466                int j;
2467                for( j=0; j<peerCount; ++j )
2468                    if( tr_bitsetHas( &peers[j]->have, piece ) )
2469                        ++tab[i];
2470            }
2471        }
2472    }
2473}
2474
2475/* Returns the pieces that are available from peers */
2476tr_bitfield*
2477tr_peerMgrGetAvailable( const tr_torrent * tor )
2478{
2479    int i;
2480    Torrent * t = tor->torrentPeers;
2481    const int peerCount = tr_ptrArraySize( &t->peers );
2482    const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
2483    tr_bitfield * pieces = tr_bitfieldNew( t->tor->info.pieceCount );
2484
2485    assert( tr_torrentIsLocked( tor ) );
2486
2487    for( i=0; i<peerCount; ++i )
2488        tr_bitsetOr( pieces, &peers[i]->have );
2489
2490    return pieces;
2491}
2492
2493void
2494tr_peerMgrTorrentStats( tr_torrent  * tor,
2495                        int         * setmePeersKnown,
2496                        int         * setmePeersConnected,
2497                        int         * setmeSeedsConnected,
2498                        int         * setmeWebseedsSendingToUs,
2499                        int         * setmePeersSendingToUs,
2500                        int         * setmePeersGettingFromUs,
2501                        int         * setmePeersFrom )
2502{
2503    int i, size;
2504    const Torrent * t = tor->torrentPeers;
2505    const tr_peer ** peers;
2506
2507    assert( tr_torrentIsLocked( tor ) );
2508
2509    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
2510    size = tr_ptrArraySize( &t->peers );
2511
2512    *setmePeersKnown           = tr_ptrArraySize( &t->pool );
2513    *setmePeersConnected       = 0;
2514    *setmeSeedsConnected       = 0;
2515    *setmePeersGettingFromUs   = 0;
2516    *setmePeersSendingToUs     = 0;
2517    *setmeWebseedsSendingToUs  = 0;
2518
2519    for( i=0; i<TR_PEER_FROM__MAX; ++i )
2520        setmePeersFrom[i] = 0;
2521
2522    for( i=0; i<size; ++i )
2523    {
2524        const tr_peer * peer = peers[i];
2525        const struct peer_atom * atom = peer->atom;
2526
2527        if( peer->io == NULL ) /* not connected */
2528            continue;
2529
2530        ++*setmePeersConnected;
2531
2532        ++setmePeersFrom[atom->fromFirst];
2533
2534        if( clientIsDownloadingFrom( tor, peer ) )
2535            ++*setmePeersSendingToUs;
2536
2537        if( clientIsUploadingTo( peer ) )
2538            ++*setmePeersGettingFromUs;
2539
2540        if( atomIsSeed( atom ) )
2541            ++*setmeSeedsConnected;
2542    }
2543
2544    *setmeWebseedsSendingToUs = countActiveWebseeds( t );
2545}
2546
2547int
2548tr_peerMgrGetWebseedSpeed_Bps( const tr_torrent * tor, uint64_t now )
2549{
2550    int i;
2551    int tmp;
2552    int ret = 0;
2553
2554    const Torrent * t = tor->torrentPeers;
2555    const int n = tr_ptrArraySize( &t->webseeds );
2556    const tr_webseed ** webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2557
2558    for( i=0; i<n; ++i )
2559        if( tr_webseedGetSpeed_Bps( webseeds[i], now, &tmp ) )
2560            ret += tmp;
2561
2562    return ret;
2563}
2564
2565
2566double*
2567tr_peerMgrWebSpeeds_KBps( const tr_torrent * tor )
2568{
2569    int i;
2570    const Torrent * t = tor->torrentPeers;
2571    const int webseedCount = tr_ptrArraySize( &t->webseeds );
2572    const tr_webseed ** webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2573    const uint64_t now = tr_time_msec( );
2574    double * ret = tr_new0( double, webseedCount );
2575
2576    assert( tr_isTorrent( tor ) );
2577    assert( tr_torrentIsLocked( tor ) );
2578    assert( t->manager != NULL );
2579    assert( webseedCount == tor->info.webseedCount );
2580
2581    for( i=0; i<webseedCount; ++i ) {
2582        int Bps;
2583        if( tr_webseedGetSpeed_Bps( webseeds[i], now, &Bps ) )
2584            ret[i] = Bps / (double)tr_speed_K;
2585        else
2586            ret[i] = -1.0;
2587    }
2588
2589    return ret;
2590}
2591
2592int
2593tr_peerGetPieceSpeed_Bps( const tr_peer * peer, uint64_t now, tr_direction direction )
2594{
2595    return peer->io ? tr_peerIoGetPieceSpeed_Bps( peer->io, now, direction ) : 0.0;
2596}
2597
2598
2599struct tr_peer_stat *
2600tr_peerMgrPeerStats( const tr_torrent * tor, int * setmeCount )
2601{
2602    int i;
2603    const Torrent * t = tor->torrentPeers;
2604    const int size = tr_ptrArraySize( &t->peers );
2605    const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
2606    const uint64_t now_msec = tr_time_msec( );
2607    const time_t now = tr_time();
2608    tr_peer_stat * ret = tr_new0( tr_peer_stat, size );
2609
2610    assert( tr_isTorrent( tor ) );
2611    assert( tr_torrentIsLocked( tor ) );
2612    assert( t->manager );
2613
2614    for( i=0; i<size; ++i )
2615    {
2616        char *                   pch;
2617        const tr_peer *          peer = peers[i];
2618        const struct peer_atom * atom = peer->atom;
2619        tr_peer_stat *           stat = ret + i;
2620
2621        tr_ntop( &atom->addr, stat->addr, sizeof( stat->addr ) );
2622        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
2623                   sizeof( stat->client ) );
2624        stat->port                = ntohs( peer->atom->port );
2625        stat->from                = atom->fromFirst;
2626        stat->progress            = peer->progress;
2627        stat->isUTP               = peer->io->utp_socket != NULL;
2628        stat->isEncrypted         = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
2629        stat->rateToPeer_KBps     = toSpeedKBps( tr_peerGetPieceSpeed_Bps( peer, now_msec, TR_CLIENT_TO_PEER ) );
2630        stat->rateToClient_KBps   = toSpeedKBps( tr_peerGetPieceSpeed_Bps( peer, now_msec, TR_PEER_TO_CLIENT ) );
2631        stat->peerIsChoked        = peer->peerIsChoked;
2632        stat->peerIsInterested    = peer->peerIsInterested;
2633        stat->clientIsChoked      = peer->clientIsChoked;
2634        stat->clientIsInterested  = peer->clientIsInterested;
2635        stat->isIncoming          = tr_peerIoIsIncoming( peer->io );
2636        stat->isDownloadingFrom   = clientIsDownloadingFrom( tor, peer );
2637        stat->isUploadingTo       = clientIsUploadingTo( peer );
2638        stat->isSeed              = ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 );
2639
2640        stat->blocksToPeer        = tr_historyGet( peer->blocksSentToPeer,    now, CANCEL_HISTORY_SEC );
2641        stat->blocksToClient      = tr_historyGet( peer->blocksSentToClient,  now, CANCEL_HISTORY_SEC );
2642        stat->cancelsToPeer       = tr_historyGet( peer->cancelsSentToPeer,   now, CANCEL_HISTORY_SEC );
2643        stat->cancelsToClient     = tr_historyGet( peer->cancelsSentToClient, now, CANCEL_HISTORY_SEC );
2644
2645        stat->pendingReqsToPeer   = peer->pendingReqsToPeer;
2646        stat->pendingReqsToClient = peer->pendingReqsToClient;
2647
2648        pch = stat->flagStr;
2649        if( stat->isUTP ) *pch++ = 'T';
2650        if( t->optimistic == peer ) *pch++ = 'O';
2651        if( stat->isDownloadingFrom ) *pch++ = 'D';
2652        else if( stat->clientIsInterested ) *pch++ = 'd';
2653        if( stat->isUploadingTo ) *pch++ = 'U';
2654        else if( stat->peerIsInterested ) *pch++ = 'u';
2655        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
2656        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
2657        if( stat->isEncrypted ) *pch++ = 'E';
2658        if( stat->from == TR_PEER_FROM_DHT ) *pch++ = 'H';
2659        else if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
2660        if( stat->isIncoming ) *pch++ = 'I';
2661        *pch = '\0';
2662    }
2663
2664    *setmeCount = size;
2665
2666    return ret;
2667}
2668
2669/**
2670***
2671**/
2672
2673void
2674tr_peerMgrClearInterest( tr_torrent * tor )
2675{
2676    int i;
2677    Torrent * t = tor->torrentPeers;
2678    const int peerCount = tr_ptrArraySize( &t->peers );
2679
2680    assert( tr_isTorrent( tor ) );
2681    assert( tr_torrentIsLocked( tor ) );
2682
2683    for( i=0; i<peerCount; ++i )
2684    {
2685        const tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2686        tr_peerMsgsSetInterested( peer->msgs, FALSE );
2687    }
2688}
2689
2690/* do we still want this piece and does the peer have it? */
2691static tr_bool
2692isPieceInteresting( const tr_torrent * tor, const tr_peer * peer, tr_piece_index_t index )
2693{
2694    return ( !tor->info.pieces[index].dnd ) /* we want it */
2695        && ( !tr_cpPieceIsComplete( &tor->completion, index ) )  /* we don't have it */
2696        && ( tr_bitsetHas( &peer->have, index ) ); /* peer has it */
2697}
2698
2699/* does this peer have any pieces that we want? */
2700static tr_bool
2701isPeerInteresting( const tr_torrent * tor, const tr_peer * peer )
2702{
2703    tr_piece_index_t i, n;
2704
2705    if ( tr_torrentIsSeed( tor ) )
2706        return FALSE;
2707
2708    if( !tr_torrentIsPieceTransferAllowed( tor, TR_PEER_TO_CLIENT ) )
2709        return FALSE;
2710
2711    for( i=0, n=tor->info.pieceCount; i<n; ++i )
2712        if( isPieceInteresting( tor, peer, i ) )
2713            return TRUE;
2714
2715    return FALSE;
2716}
2717
2718/* determines who we send "interested" messages to */
2719static void
2720rechokeDownloads( Torrent * t )
2721{
2722    int i;
2723    const time_t now = tr_time( );
2724    const int MIN_INTERESTING_PEERS = 5;
2725    const int peerCount = tr_ptrArraySize( &t->peers );
2726    int maxPeers;
2727
2728    int badCount         = 0;
2729    int goodCount        = 0;
2730    int untestedCount    = 0;
2731    tr_peer ** bad       = tr_new( tr_peer*, peerCount );
2732    tr_peer ** good      = tr_new( tr_peer*, peerCount );
2733    tr_peer ** untested  = tr_new( tr_peer*, peerCount );
2734
2735    /* decide how many peers to be interested in */
2736    {
2737        int blocks = 0;
2738        int cancels = 0;
2739        time_t timeSinceCancel;
2740
2741        /* Count up how many blocks & cancels each peer has.
2742         *
2743         * There are two situations where we send out cancels --
2744         *
2745         * 1. We've got unresponsive peers, which is handled by deciding
2746         *    -which- peers to be interested in.
2747         *
2748         * 2. We've hit our bandwidth cap, which is handled by deciding
2749         *    -how many- peers to be interested in.
2750         *
2751         * We're working on 2. here, so we need to ignore unresponsive
2752         * peers in our calculations lest they confuse Transmission into
2753         * thinking it's hit its bandwidth cap.
2754         */
2755        for( i=0; i<peerCount; ++i )
2756        {
2757            const tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2758            const int b = tr_historyGet( peer->blocksSentToClient, now, CANCEL_HISTORY_SEC );
2759            const int c = tr_historyGet( peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC );
2760
2761            if( b == 0 ) /* ignore unresponsive peers, as described above */
2762                continue;
2763
2764            blocks += b;
2765            cancels += c;
2766        }
2767
2768        if( cancels > 0 )
2769        {
2770            /* cancelRate: of the block requests we've recently made, the percentage we cancelled.
2771             * higher values indicate more congestion. */
2772            const double cancelRate = cancels / (double)(cancels + blocks);
2773            const double mult = 1 - MIN( cancelRate, 0.5 );
2774            maxPeers = t->interestedCount * mult;
2775            tordbg( t, "cancel rate is %.3f -- reducing the "
2776                       "number of peers we're interested in by %.0f percent",
2777                       cancelRate, mult * 100 );
2778            t->lastCancel = now;
2779        }
2780
2781        timeSinceCancel = now - t->lastCancel;
2782        if( timeSinceCancel )
2783        {
2784            const int maxIncrease = 15;
2785            const time_t maxHistory = 2 * CANCEL_HISTORY_SEC;
2786            const double mult = MIN( timeSinceCancel, maxHistory ) / (double) maxHistory;
2787            const int inc = maxIncrease * mult;
2788            maxPeers = t->maxPeers + inc;
2789            tordbg( t, "time since last cancel is %li -- increasing the "
2790                       "number of peers we're interested in by %d",
2791                       timeSinceCancel, inc );
2792        }
2793    }
2794
2795    /* don't let the previous section's number tweaking go too far... */
2796    if( maxPeers < MIN_INTERESTING_PEERS )
2797        maxPeers = MIN_INTERESTING_PEERS;
2798    if( maxPeers > t->tor->maxConnectedPeers )
2799        maxPeers = t->tor->maxConnectedPeers;
2800
2801    t->maxPeers = maxPeers;
2802
2803    /* separate the peers into "good" (ones with a low cancel-to-block ratio),
2804     * untested peers, and "bad" (ones with a high cancel-to-block ratio).
2805     * That's the order in which we'll choose who to show interest in */
2806    {
2807        /* Randomize the peer array so the peers in the three groups will be unsorted... */
2808        int n = peerCount;
2809        tr_peer ** peers = tr_memdup( tr_ptrArrayBase( &t->peers ), n * sizeof( tr_peer * ) );
2810
2811        while( n > 0 )
2812        {
2813            const int i = tr_cryptoWeakRandInt( n );
2814            tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2815
2816            if( !isPeerInteresting( t->tor, peer ) )
2817            {
2818                tr_peerMsgsSetInterested( peer->msgs, FALSE );
2819            }
2820            else
2821            {
2822                const int blocks = tr_historyGet( peer->blocksSentToClient, now, CANCEL_HISTORY_SEC );
2823                const int cancels = tr_historyGet( peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC );
2824
2825                if( !blocks && !cancels )
2826                    untested[untestedCount++] = peer;
2827                else if( !cancels )
2828                    good[goodCount++] = peer;
2829                else if( !blocks )
2830                    bad[badCount++] = peer;
2831                else if( ( cancels * 10 ) < blocks )
2832                    good[goodCount++] = peer;
2833                else
2834                    bad[badCount++] = peer;
2835            }
2836
2837            tr_removeElementFromArray( peers, i, sizeof(tr_peer*), n-- );
2838        }
2839
2840        tr_free( peers );
2841    }
2842
2843    t->interestedCount = 0;
2844
2845    /* We've decided (1) how many peers to be interested in,
2846     * and (2) which peers are the best candidates,
2847     * Now it's time to update our `interest' flags. */
2848    for( i=0; i<goodCount; ++i ) {
2849        const tr_bool b = t->interestedCount < maxPeers;
2850        tr_peerMsgsSetInterested( good[i]->msgs, b );
2851        if( b )
2852            ++t->interestedCount;
2853    }
2854    for( i=0; i<untestedCount; ++i ) {
2855        const tr_bool b = t->interestedCount < maxPeers;
2856        tr_peerMsgsSetInterested( untested[i]->msgs, b );
2857        if( b )
2858            ++t->interestedCount;
2859    }
2860    for( i=0; i<badCount; ++i ) {
2861        const tr_bool b = t->interestedCount < maxPeers;
2862        tr_peerMsgsSetInterested( bad[i]->msgs, b );
2863        if( b )
2864            ++t->interestedCount;
2865    }
2866
2867/*fprintf( stderr, "num interested: %d\n", t->interestedCount );*/
2868
2869    /* cleanup */
2870    tr_free( untested );
2871    tr_free( good );
2872    tr_free( bad );
2873}
2874
2875/**
2876***
2877**/
2878
2879struct ChokeData
2880{
2881    tr_bool         isInterested;
2882    tr_bool         wasChoked;
2883    tr_bool         isChoked;
2884    int             rate;
2885    int             salt;
2886    tr_peer *       peer;
2887};
2888
2889static int
2890compareChoke( const void * va,
2891              const void * vb )
2892{
2893    const struct ChokeData * a = va;
2894    const struct ChokeData * b = vb;
2895
2896    if( a->rate != b->rate ) /* prefer higher overall speeds */
2897        return a->rate > b->rate ? -1 : 1;
2898
2899    if( a->wasChoked != b->wasChoked ) /* prefer unchoked */
2900        return a->wasChoked ? 1 : -1;
2901
2902    if( a->salt != b->salt ) /* random order */
2903        return a->salt - b->salt;
2904
2905    return 0;
2906}
2907
2908/* is this a new connection? */
2909static int
2910isNew( const tr_peer * peer )
2911{
2912    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
2913}
2914
2915/* get a rate for deciding which peers to choke and unchoke. */
2916static int
2917getRate( const tr_torrent * tor, struct peer_atom * atom, uint64_t now )
2918{
2919    int Bps;
2920
2921    if( tr_torrentIsSeed( tor ) )
2922        Bps = tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_CLIENT_TO_PEER );
2923
2924    /* downloading a private torrent... take upload speed into account
2925     * because there may only be a small window of opportunity to share */
2926    else if( tr_torrentIsPrivate( tor ) )
2927        Bps = tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_PEER_TO_CLIENT )
2928            + tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_CLIENT_TO_PEER );
2929
2930    /* downloading a public torrent */
2931    else
2932        Bps = tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_PEER_TO_CLIENT );
2933
2934    /* convert it to bytes per second */
2935    return Bps;
2936}
2937
2938static inline tr_bool
2939isBandwidthMaxedOut( const tr_bandwidth * b,
2940                     const uint64_t now_msec, tr_direction dir )
2941{
2942    if( !tr_bandwidthIsLimited( b, dir ) )
2943        return FALSE;
2944    else {
2945        const int got = tr_bandwidthGetPieceSpeed_Bps( b, now_msec, dir );
2946        const int want = tr_bandwidthGetDesiredSpeed_Bps( b, dir );
2947        return got >= want;
2948    }
2949}
2950
2951static void
2952rechokeUploads( Torrent * t, const uint64_t now )
2953{
2954    int i, size, unchokedInterested;
2955    const int peerCount = tr_ptrArraySize( &t->peers );
2956    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
2957    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
2958    const tr_session * session = t->manager->session;
2959    const int chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
2960    const tr_bool isMaxedOut = isBandwidthMaxedOut( t->tor->bandwidth, now, TR_UP );
2961
2962    assert( torrentIsLocked( t ) );
2963
2964    /* an optimistic unchoke peer's "optimistic"
2965     * state lasts for N calls to rechokeUploads(). */
2966    if( t->optimisticUnchokeTimeScaler > 0 )
2967        t->optimisticUnchokeTimeScaler--;
2968    else
2969        t->optimistic = NULL;
2970
2971    /* sort the peers by preference and rate */
2972    for( i = 0, size = 0; i < peerCount; ++i )
2973    {
2974        tr_peer * peer = peers[i];
2975        struct peer_atom * atom = peer->atom;
2976
2977        if( peer->progress >= 1.0 ) /* choke all seeds */
2978        {
2979            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2980        }
2981        else if( atom->uploadOnly == UPLOAD_ONLY_YES ) /* choke partial seeds */
2982        {
2983            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2984        }
2985        else if( chokeAll ) /* choke everyone if we're not uploading */
2986        {
2987            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2988        }
2989        else if( peer != t->optimistic )
2990        {
2991            struct ChokeData * n = &choke[size++];
2992            n->peer         = peer;
2993            n->isInterested = peer->peerIsInterested;
2994            n->wasChoked    = peer->peerIsChoked;
2995            n->rate         = getRate( t->tor, atom, now );
2996            n->salt         = tr_cryptoWeakRandInt( INT_MAX );
2997            n->isChoked     = TRUE;
2998        }
2999    }
3000
3001    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
3002
3003    /**
3004     * Reciprocation and number of uploads capping is managed by unchoking
3005     * the N peers which have the best upload rate and are interested.
3006     * This maximizes the client's download rate. These N peers are
3007     * referred to as downloaders, because they are interested in downloading
3008     * from the client.
3009     *
3010     * Peers which have a better upload rate (as compared to the downloaders)
3011     * but aren't interested get unchoked. If they become interested, the
3012     * downloader with the worst upload rate gets choked. If a client has
3013     * a complete file, it uses its upload rate rather than its download
3014     * rate to decide which peers to unchoke.
3015     *
3016     * If our bandwidth is maxed out, don't unchoke any more peers.
3017     */
3018    unchokedInterested = 0;
3019    for( i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i ) {
3020        choke[i].isChoked = isMaxedOut ? choke[i].wasChoked : FALSE;
3021        if( choke[i].isInterested )
3022            ++unchokedInterested;
3023    }
3024
3025    /* optimistic unchoke */
3026    if( !t->optimistic && !isMaxedOut && (i<size) )
3027    {
3028        int n;
3029        struct ChokeData * c;
3030        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
3031
3032        for( ; i<size; ++i )
3033        {
3034            if( choke[i].isInterested )
3035            {
3036                const tr_peer * peer = choke[i].peer;
3037                int x = 1, y;
3038                if( isNew( peer ) ) x *= 3;
3039                for( y=0; y<x; ++y )
3040                    tr_ptrArrayAppend( &randPool, &choke[i] );
3041            }
3042        }
3043
3044        if(( n = tr_ptrArraySize( &randPool )))
3045        {
3046            c = tr_ptrArrayNth( &randPool, tr_cryptoWeakRandInt( n ));
3047            c->isChoked = FALSE;
3048            t->optimistic = c->peer;
3049            t->optimisticUnchokeTimeScaler = OPTIMISTIC_UNCHOKE_MULTIPLIER;
3050        }
3051
3052        tr_ptrArrayDestruct( &randPool, NULL );
3053    }
3054
3055    for( i=0; i<size; ++i )
3056        tr_peerMsgsSetChoke( choke[i].peer->msgs, choke[i].isChoked );
3057
3058    /* cleanup */
3059    tr_free( choke );
3060}
3061
3062static void
3063rechokePulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3064{
3065    tr_torrent * tor = NULL;
3066    tr_peerMgr * mgr = vmgr;
3067    const uint64_t now = tr_sessionGetTimeMsec( mgr->session );
3068
3069    managerLock( mgr );
3070
3071    while(( tor = tr_torrentNext( mgr->session, tor ))) {
3072        if( tor->isRunning ) {
3073            rechokeUploads( tor->torrentPeers, now );
3074            if( !tr_torrentIsSeed( tor ) )
3075                rechokeDownloads( tor->torrentPeers );
3076        }
3077    }
3078
3079    tr_timerAddMsec( mgr->rechokeTimer, RECHOKE_PERIOD_MSEC );
3080    managerUnlock( mgr );
3081}
3082
3083/***
3084****
3085****  Life and Death
3086****
3087***/
3088
3089static tr_bool
3090shouldPeerBeClosed( const Torrent    * t,
3091                    const tr_peer    * peer,
3092                    int                peerCount,
3093                    const time_t       now )
3094{
3095    const tr_torrent *       tor = t->tor;
3096    const struct peer_atom * atom = peer->atom;
3097
3098    /* if it's marked for purging, close it */
3099    if( peer->doPurge )
3100    {
3101        tordbg( t, "purging peer %s because its doPurge flag is set",
3102                tr_atomAddrStr( atom ) );
3103        return TRUE;
3104    }
3105
3106    /* if we're seeding and the peer has everything we have,
3107     * and enough time has passed for a pex exchange, then disconnect */
3108    if( tr_torrentIsSeed( tor ) )
3109    {
3110        tr_bool peerHasEverything;
3111
3112        if( atom->seedProbability != -1 )
3113        {
3114            peerHasEverything = atomIsSeed( atom );
3115        }
3116        else
3117        {
3118            tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( &tor->completion ) );
3119            tr_bitsetDifference( tmp, &peer->have );
3120            peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
3121            tr_bitfieldFree( tmp );
3122        }
3123
3124        if( peerHasEverything && ( !tr_torrentAllowsPex(tor) || (now-atom->time>=30 )))
3125        {
3126            tordbg( t, "purging peer %s because we're both seeds",
3127                    tr_atomAddrStr( atom ) );
3128            return TRUE;
3129        }
3130    }
3131
3132    /* disconnect if it's been too long since piece data has been transferred.
3133     * this is on a sliding scale based on number of available peers... */
3134    {
3135        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
3136        /* if we have >= relaxIfFewerThan, strictness is 100%.
3137         * if we have zero connections, strictness is 0% */
3138        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
3139                               ? 1.0
3140                               : peerCount / (float)relaxStrictnessIfFewerThanN;
3141        const int lo = MIN_UPLOAD_IDLE_SECS;
3142        const int hi = MAX_UPLOAD_IDLE_SECS;
3143        const int limit = hi - ( ( hi - lo ) * strictness );
3144        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
3145/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
3146        if( idleTime > limit ) {
3147            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
3148                       tr_atomAddrStr( atom ), idleTime );
3149            return TRUE;
3150        }
3151    }
3152
3153    return FALSE;
3154}
3155
3156static tr_peer **
3157getPeersToClose( Torrent * t, const time_t now_sec, int * setmeSize )
3158{
3159    int i, peerCount, outsize;
3160    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( &t->peers, &peerCount );
3161    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
3162
3163    assert( torrentIsLocked( t ) );
3164
3165    for( i = outsize = 0; i < peerCount; ++i )
3166        if( shouldPeerBeClosed( t, peers[i], peerCount, now_sec ) )
3167            ret[outsize++] = peers[i];
3168
3169    *setmeSize = outsize;
3170    return ret;
3171}
3172
3173static int
3174getReconnectIntervalSecs( const struct peer_atom * atom, const time_t now )
3175{
3176    int sec;
3177
3178    /* if we were recently connected to this peer and transferring piece
3179     * data, try to reconnect to them sooner rather that later -- we don't
3180     * want network troubles to get in the way of a good peer. */
3181    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
3182        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
3183
3184    /* don't allow reconnects more often than our minimum */
3185    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
3186        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
3187
3188    /* otherwise, the interval depends on how many times we've tried
3189     * and failed to connect to the peer */
3190    else switch( atom->numFails ) {
3191        case 0: sec = 0; break;
3192        case 1: sec = 5; break;
3193        case 2: sec = 2 * 60; break;
3194        case 3: sec = 15 * 60; break;
3195        case 4: sec = 30 * 60; break;
3196        case 5: sec = 60 * 60; break;
3197        default: sec = 120 * 60; break;
3198    }
3199
3200    /* penalize peers that were unreachable the last time we tried */
3201    if( atom->flags2 & MYFLAG_UNREACHABLE )
3202        sec += sec;
3203
3204    dbgmsg( "reconnect interval for %s is %d seconds", tr_atomAddrStr( atom ), sec );
3205    return sec;
3206}
3207
3208static void
3209removePeer( Torrent * t, tr_peer * peer )
3210{
3211    tr_peer * removed;
3212    struct peer_atom * atom = peer->atom;
3213
3214    assert( torrentIsLocked( t ) );
3215    assert( atom );
3216
3217    atom->time = tr_time( );
3218
3219    removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare );
3220
3221    if( replicationExists( t ) )
3222        tr_decrReplicationFromBitset( t, &peer->have );
3223
3224    assert( removed == peer );
3225    peerDestructor( t, removed );
3226}
3227
3228static void
3229closePeer( Torrent * t, tr_peer * peer )
3230{
3231    struct peer_atom * atom;
3232
3233    assert( t != NULL );
3234    assert( peer != NULL );
3235
3236    atom = peer->atom;
3237
3238    /* if we transferred piece data, then they might be good peers,
3239       so reset their `numFails' weight to zero. otherwise we connected
3240       to them fruitlessly, so mark it as another fail */
3241    if( atom->piece_data_time ) {
3242        tordbg( t, "resetting atom %s numFails to 0", tr_atomAddrStr(atom) );
3243        atom->numFails = 0;
3244    } else {
3245        ++atom->numFails;
3246        tordbg( t, "incremented atom %s numFails to %d", tr_atomAddrStr(atom), (int)atom->numFails );
3247    }
3248
3249    tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
3250    removePeer( t, peer );
3251}
3252
3253static void
3254removeAllPeers( Torrent * t )
3255{
3256    while( !tr_ptrArrayEmpty( &t->peers ) )
3257        removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) );
3258}
3259
3260static void
3261closeBadPeers( Torrent * t, const time_t now_sec )
3262{
3263    int i;
3264    int peerCount;
3265    struct tr_peer ** peers = getPeersToClose( t, now_sec, &peerCount );
3266    for( i=0; i<peerCount; ++i )
3267        closePeer( t, peers[i] );
3268    tr_free( peers );
3269}
3270
3271struct peer_liveliness
3272{
3273    tr_peer * peer;
3274    void * clientData;
3275    time_t pieceDataTime;
3276    time_t time;
3277    int speed;
3278    tr_bool doPurge;
3279};
3280
3281static int
3282comparePeerLiveliness( const void * va, const void * vb )
3283{
3284    const struct peer_liveliness * a = va;
3285    const struct peer_liveliness * b = vb;
3286
3287    if( a->doPurge != b->doPurge )
3288        return a->doPurge ? 1 : -1;
3289
3290    if( a->speed != b->speed ) /* faster goes first */
3291        return a->speed > b->speed ? -1 : 1;
3292
3293    /* the one to give us data more recently goes first */
3294    if( a->pieceDataTime != b->pieceDataTime )
3295        return a->pieceDataTime > b->pieceDataTime ? -1 : 1;
3296
3297    /* the one we connected to most recently goes first */
3298    if( a->time != b->time )
3299        return a->time > b->time ? -1 : 1;
3300
3301    return 0;
3302}
3303
3304static void
3305sortPeersByLivelinessImpl( tr_peer  ** peers,
3306                           void     ** clientData,
3307                           int         n,
3308                           uint64_t    now,
3309                           int (*compare) ( const void *va, const void *vb ) )
3310{
3311    int i;
3312    struct peer_liveliness *lives, *l;
3313
3314    /* build a sortable array of peer + extra info */
3315    lives = l = tr_new0( struct peer_liveliness, n );
3316    for( i=0; i<n; ++i, ++l )
3317    {
3318        tr_peer * p = peers[i];
3319        l->peer = p;
3320        l->doPurge = p->doPurge;
3321        l->pieceDataTime = p->atom->piece_data_time;
3322        l->time = p->atom->time;
3323        l->speed = tr_peerGetPieceSpeed_Bps( p, now, TR_UP )
3324                 + tr_peerGetPieceSpeed_Bps( p, now, TR_DOWN );
3325        if( clientData )
3326            l->clientData = clientData[i];
3327    }
3328
3329    /* sort 'em */
3330    assert( n == ( l - lives ) );
3331    qsort( lives, n, sizeof( struct peer_liveliness ), compare );
3332
3333    /* build the peer array */
3334    for( i=0, l=lives; i<n; ++i, ++l ) {
3335        peers[i] = l->peer;
3336        if( clientData )
3337            clientData[i] = l->clientData;
3338    }
3339    assert( n == ( l - lives ) );
3340
3341    /* cleanup */
3342    tr_free( lives );
3343}
3344
3345static void
3346sortPeersByLiveliness( tr_peer ** peers, void ** clientData, int n, uint64_t now )
3347{
3348    sortPeersByLivelinessImpl( peers, clientData, n, now, comparePeerLiveliness );
3349}
3350
3351
3352static void
3353enforceTorrentPeerLimit( Torrent * t, uint64_t now )
3354{
3355    int n = tr_ptrArraySize( &t->peers );
3356    const int max = tr_torrentGetPeerLimit( t->tor );
3357    if( n > max )
3358    {
3359        void * base = tr_ptrArrayBase( &t->peers );
3360        tr_peer ** peers = tr_memdup( base, n*sizeof( tr_peer* ) );
3361        sortPeersByLiveliness( peers, NULL, n, now );
3362        while( n > max )
3363            closePeer( t, peers[--n] );
3364        tr_free( peers );
3365    }
3366}
3367
3368static void
3369enforceSessionPeerLimit( tr_session * session, uint64_t now )
3370{
3371    int n = 0;
3372    tr_torrent * tor = NULL;
3373    const int max = tr_sessionGetPeerLimit( session );
3374
3375    /* count the total number of peers */
3376    while(( tor = tr_torrentNext( session, tor )))
3377        n += tr_ptrArraySize( &tor->torrentPeers->peers );
3378
3379    /* if there are too many, prune out the worst */
3380    if( n > max )
3381    {
3382        tr_peer ** peers = tr_new( tr_peer*, n );
3383        Torrent ** torrents = tr_new( Torrent*, n );
3384
3385        /* populate the peer array */
3386        n = 0;
3387        tor = NULL;
3388        while(( tor = tr_torrentNext( session, tor ))) {
3389            int i;
3390            Torrent * t = tor->torrentPeers;
3391            const int tn = tr_ptrArraySize( &t->peers );
3392            for( i=0; i<tn; ++i, ++n ) {
3393                peers[n] = tr_ptrArrayNth( &t->peers, i );
3394                torrents[n] = t;
3395            }
3396        }
3397
3398        /* sort 'em */
3399        sortPeersByLiveliness( peers, (void**)torrents, n, now );
3400
3401        /* cull out the crappiest */
3402        while( n-- > max )
3403            closePeer( torrents[n], peers[n] );
3404
3405        /* cleanup */
3406        tr_free( torrents );
3407        tr_free( peers );
3408    }
3409}
3410
3411static void makeNewPeerConnections( tr_peerMgr * mgr, const int max );
3412
3413static void
3414reconnectPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3415{
3416    tr_torrent * tor;
3417    tr_peerMgr * mgr = vmgr;
3418    const time_t now_sec = tr_time( );
3419    const uint64_t now_msec = tr_sessionGetTimeMsec( mgr->session );
3420
3421    /**
3422    ***  enforce the per-session and per-torrent peer limits
3423    **/
3424
3425    /* if we're over the per-torrent peer limits, cull some peers */
3426    tor = NULL;
3427    while(( tor = tr_torrentNext( mgr->session, tor )))
3428        if( tor->isRunning )
3429            enforceTorrentPeerLimit( tor->torrentPeers, now_msec );
3430
3431    /* if we're over the per-session peer limits, cull some peers */
3432    enforceSessionPeerLimit( mgr->session, now_msec );
3433
3434    /* remove crappy peers */
3435    tor = NULL;
3436    while(( tor = tr_torrentNext( mgr->session, tor )))
3437        if( !tor->torrentPeers->isRunning )
3438            removeAllPeers( tor->torrentPeers );
3439        else
3440            closeBadPeers( tor->torrentPeers, now_sec );
3441
3442    /* try to make new peer connections */
3443    makeNewPeerConnections( mgr, MAX_CONNECTIONS_PER_PULSE );
3444}
3445
3446/****
3447*****
3448*****  BANDWIDTH ALLOCATION
3449*****
3450****/
3451
3452static void
3453pumpAllPeers( tr_peerMgr * mgr )
3454{
3455    tr_torrent * tor = NULL;
3456
3457    while(( tor = tr_torrentNext( mgr->session, tor )))
3458    {
3459        int j;
3460        Torrent * t = tor->torrentPeers;
3461
3462        for( j=0; j<tr_ptrArraySize( &t->peers ); ++j )
3463        {
3464            tr_peer * peer = tr_ptrArrayNth( &t->peers, j );
3465            tr_peerMsgsPulse( peer->msgs );
3466        }
3467    }
3468}
3469
3470static void
3471bandwidthPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3472{
3473    tr_torrent * tor;
3474    tr_peerMgr * mgr = vmgr;
3475    managerLock( mgr );
3476
3477    /* FIXME: this next line probably isn't necessary... */
3478    pumpAllPeers( mgr );
3479
3480    /* allocate bandwidth to the peers */
3481    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
3482    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
3483
3484    /* possibly stop torrents that have seeded enough */
3485    tor = NULL;
3486    while(( tor = tr_torrentNext( mgr->session, tor )))
3487        tr_torrentCheckSeedLimit( tor );
3488
3489    /* run the completeness check for any torrents that need it */
3490    tor = NULL;
3491    while(( tor = tr_torrentNext( mgr->session, tor ))) {
3492        if( tor->torrentPeers->needsCompletenessCheck ) {
3493            tor->torrentPeers->needsCompletenessCheck  = FALSE;
3494            tr_torrentRecheckCompleteness( tor );
3495        }
3496    }
3497
3498    /* stop torrents that are ready to stop, but couldn't be stopped earlier
3499     * during the peer-io callback call chain */
3500    tor = NULL;
3501    while(( tor = tr_torrentNext( mgr->session, tor )))
3502        if( tor->isStopping )
3503            tr_torrentStop( tor );
3504
3505    reconnectPulse( 0, 0, mgr );
3506
3507    tr_timerAddMsec( mgr->bandwidthTimer, BANDWIDTH_PERIOD_MSEC );
3508    managerUnlock( mgr );
3509}
3510
3511/***
3512****
3513***/
3514
3515static int
3516compareAtomPtrsByAddress( const void * va, const void *vb )
3517{
3518    const struct peer_atom * a = * (const struct peer_atom**) va;
3519    const struct peer_atom * b = * (const struct peer_atom**) vb;
3520
3521    assert( tr_isAtom( a ) );
3522    assert( tr_isAtom( b ) );
3523
3524    return tr_compareAddresses( &a->addr, &b->addr );
3525}
3526
3527/* best come first, worst go last */
3528static int
3529compareAtomPtrsByShelfDate( const void * va, const void *vb )
3530{
3531    time_t atime;
3532    time_t btime;
3533    const struct peer_atom * a = * (const struct peer_atom**) va;
3534    const struct peer_atom * b = * (const struct peer_atom**) vb;
3535    const int data_time_cutoff_secs = 60 * 60;
3536    const time_t tr_now = tr_time( );
3537
3538    assert( tr_isAtom( a ) );
3539    assert( tr_isAtom( b ) );
3540
3541    /* primary key: the last piece data time *if* it was within the last hour */
3542    atime = a->piece_data_time; if( atime + data_time_cutoff_secs < tr_now ) atime = 0;
3543    btime = b->piece_data_time; if( btime + data_time_cutoff_secs < tr_now ) btime = 0;
3544    if( atime != btime )
3545        return atime > btime ? -1 : 1;
3546
3547    /* secondary key: shelf date. */
3548    if( a->shelf_date != b->shelf_date )
3549        return a->shelf_date > b->shelf_date ? -1 : 1;
3550
3551    return 0;
3552}
3553
3554static int
3555getMaxAtomCount( const tr_torrent * tor )
3556{
3557    const int n = tor->maxConnectedPeers;
3558    /* approximate fit of the old jump discontinuous function */
3559    if( n >= 55 ) return     n + 150;
3560    if( n >= 20 ) return 2 * n + 95;
3561    return               4 * n + 55;
3562}
3563
3564static void
3565atomPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3566{
3567    tr_torrent * tor = NULL;
3568    tr_peerMgr * mgr = vmgr;
3569    managerLock( mgr );
3570
3571    while(( tor = tr_torrentNext( mgr->session, tor )))
3572    {
3573        int atomCount;
3574        Torrent * t = tor->torrentPeers;
3575        const int maxAtomCount = getMaxAtomCount( tor );
3576        struct peer_atom ** atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
3577
3578        if( atomCount > maxAtomCount ) /* we've got too many atoms... time to prune */
3579        {
3580            int i;
3581            int keepCount = 0;
3582            int testCount = 0;
3583            struct peer_atom ** keep = tr_new( struct peer_atom*, atomCount );
3584            struct peer_atom ** test = tr_new( struct peer_atom*, atomCount );
3585
3586            /* keep the ones that are in use */
3587            for( i=0; i<atomCount; ++i ) {
3588                struct peer_atom * atom = atoms[i];
3589                if( peerIsInUse( t, atom ) )
3590                    keep[keepCount++] = atom;
3591                else
3592                    test[testCount++] = atom;
3593            }
3594
3595            /* if there's room, keep the best of what's left */
3596            i = 0;
3597            if( keepCount < maxAtomCount ) {
3598                qsort( test, testCount, sizeof( struct peer_atom * ), compareAtomPtrsByShelfDate );
3599                while( i<testCount && keepCount<maxAtomCount )
3600                    keep[keepCount++] = test[i++];
3601            }
3602
3603            /* free the culled atoms */
3604            while( i<testCount )
3605                tr_free( test[i++] );
3606
3607            /* rebuild Torrent.pool with what's left */
3608            tr_ptrArrayDestruct( &t->pool, NULL );
3609            t->pool = TR_PTR_ARRAY_INIT;
3610            qsort( keep, keepCount, sizeof( struct peer_atom * ), compareAtomPtrsByAddress );
3611            for( i=0; i<keepCount; ++i )
3612                tr_ptrArrayAppend( &t->pool, keep[i] );
3613
3614            tordbg( t, "max atom count is %d... pruned from %d to %d\n", maxAtomCount, atomCount, keepCount );
3615
3616            /* cleanup */
3617            tr_free( test );
3618            tr_free( keep );
3619        }
3620    }
3621
3622    tr_timerAddMsec( mgr->atomTimer, ATOM_PERIOD_MSEC );
3623    managerUnlock( mgr );
3624}
3625
3626/***
3627****
3628****
3629****
3630***/
3631
3632/* is this atom someone that we'd want to initiate a connection to? */
3633static tr_bool
3634isPeerCandidate( const tr_torrent * tor, struct peer_atom * atom, const time_t now )
3635{
3636    /* not if we're both seeds */
3637    if( tr_torrentIsSeed( tor ) )
3638        if( atomIsSeed( atom ) || ( atom->uploadOnly == UPLOAD_ONLY_YES ) )
3639            return FALSE;
3640
3641    /* not if we've already got a connection to them... */
3642    if( peerIsInUse( tor->torrentPeers, atom ) )
3643        return FALSE;
3644
3645    /* not if we just tried them already */
3646    if( ( now - atom->time ) < getReconnectIntervalSecs( atom, now ) )
3647        return FALSE;
3648
3649    /* not if they're blocklisted */
3650    if( isAtomBlocklisted( tor->session, atom ) )
3651        return FALSE;
3652
3653    /* not if they're banned... */
3654    if( atom->flags2 & MYFLAG_BANNED )
3655        return FALSE;
3656
3657    return TRUE;
3658}
3659
3660struct peer_candidate
3661{
3662    uint64_t score;
3663    tr_torrent * tor;
3664    struct peer_atom * atom;
3665};
3666
3667static tr_bool
3668torrentWasRecentlyStarted( const tr_torrent * tor )
3669{
3670    return difftime( tr_time( ), tor->startDate ) < 120;
3671}
3672
3673static inline uint64_t
3674addValToKey( uint64_t value, int width, uint64_t addme )
3675{
3676    value = (value << (uint64_t)width);
3677    value |= addme;
3678    return value;
3679}
3680
3681/* smaller value is better */
3682static uint64_t
3683getPeerCandidateScore( const tr_torrent * tor, const struct peer_atom * atom, uint8_t salt  )
3684{
3685    uint64_t i;
3686    uint64_t score = 0;
3687    const tr_bool failed = atom->lastConnectionAt < atom->lastConnectionAttemptAt;
3688
3689    /* prefer peers we've connected to, or never tried, over peers we failed to connect to. */
3690    i = failed ? 1 : 0;
3691    score = addValToKey( score, 1, i );
3692
3693    /* prefer the one we attempted least recently (to cycle through all peers) */
3694    i = atom->lastConnectionAttemptAt;
3695    score = addValToKey( score, 32, i );
3696
3697    /* prefer peers belonging to a torrent of a higher priority */
3698    switch( tr_torrentGetPriority( tor ) ) {
3699        case TR_PRI_HIGH:    i = 0; break;
3700        case TR_PRI_NORMAL:  i = 1; break;
3701        case TR_PRI_LOW:     i = 2; break;
3702    }
3703    score = addValToKey( score, 4, i );
3704
3705    /* prefer recently-started torrents */
3706    i = torrentWasRecentlyStarted( tor ) ? 0 : 1;
3707    score = addValToKey( score, 1, i );
3708
3709    /* prefer torrents we're downloading with */
3710    i = tr_torrentIsSeed( tor ) ? 1 : 0;
3711    score = addValToKey( score, 1, i );
3712
3713    /* prefer peers that are known to be connectible */
3714    i = ( atom->flags & ADDED_F_CONNECTABLE ) ? 0 : 1;
3715    score = addValToKey( score, 1, i );
3716
3717    /* prefer peers that we might have a chance of uploading to...
3718       so lower seed probability is better */
3719    if( atom->seedProbability == 100 ) i = 101;
3720    else if( atom->seedProbability == -1 ) i = 100;
3721    else i = atom->seedProbability;
3722    score = addValToKey( score, 8, i );
3723
3724    /* Prefer peers that we got from more trusted sources.
3725     * lower `fromBest' values indicate more trusted sources */
3726    score = addValToKey( score, 4, atom->fromBest );
3727
3728    /* salt */
3729    score = addValToKey( score, 8, salt );
3730
3731    return score;
3732}
3733
3734/* sort an array of peer candidates */
3735static int
3736comparePeerCandidates( const void * va, const void * vb )
3737{
3738    const struct peer_candidate * a = va;
3739    const struct peer_candidate * b = vb;
3740
3741    if( a->score < b->score ) return -1;
3742    if( a->score > b->score ) return 1;
3743
3744    return 0;
3745}
3746
3747/** @return an array of all the atoms we might want to connect to */
3748static struct peer_candidate*
3749getPeerCandidates( tr_session * session, int * candidateCount )
3750{
3751    int n;
3752    tr_torrent * tor;
3753    struct peer_candidate * candidates;
3754    struct peer_candidate * walk;
3755    const time_t now = tr_time( );
3756    const uint64_t now_msec = tr_time_msec( );
3757    /* leave 5% of connection slots for incoming connections -- ticket #2609 */
3758    const int maxCandidates = tr_sessionGetPeerLimit( session ) * 0.95;
3759
3760    /* don't start any new handshakes if we're full up */
3761    n = 0;
3762    tor= NULL;
3763    while(( tor = tr_torrentNext( session, tor )))
3764        n += tr_ptrArraySize( &tor->torrentPeers->peers );
3765    if( maxCandidates <= n ) {
3766        *candidateCount = 0;
3767        return NULL;
3768    }
3769
3770    /* allocate an array of candidates */
3771    n = 0;
3772    tor= NULL;
3773    while(( tor = tr_torrentNext( session, tor )))
3774        n += tr_ptrArraySize( &tor->torrentPeers->pool );
3775    walk = candidates = tr_new( struct peer_candidate, n );
3776
3777    /* populate the candidate array */
3778    tor = NULL;
3779    while(( tor = tr_torrentNext( session, tor )))
3780    {
3781        int i, nAtoms;
3782        struct peer_atom ** atoms;
3783
3784        if( !tor->torrentPeers->isRunning )
3785            continue;
3786
3787        /* if we've already got enough peers in this torrent... */
3788        if( tr_torrentGetPeerLimit( tor ) <= tr_ptrArraySize( &tor->torrentPeers->peers ) )
3789            continue;
3790
3791        /* if we've already got enough speed in this torrent... */
3792        if( tr_torrentIsSeed( tor ) && isBandwidthMaxedOut( tor->bandwidth, now_msec, TR_UP ) )
3793            continue;
3794
3795        atoms = (struct peer_atom**) tr_ptrArrayPeek( &tor->torrentPeers->pool, &nAtoms );
3796        for( i=0; i<nAtoms; ++i )
3797        {
3798            struct peer_atom * atom = atoms[i];
3799
3800            if( isPeerCandidate( tor, atom, now ) )
3801            {
3802                const uint8_t salt = tr_cryptoWeakRandInt( 1024 );
3803                walk->tor = tor;
3804                walk->atom = atom;
3805                walk->score = getPeerCandidateScore( tor, atom, salt );
3806                ++walk;
3807            }
3808        }
3809    }
3810
3811    *candidateCount = walk - candidates;
3812    if( *candidateCount > 1 )
3813        qsort( candidates, *candidateCount, sizeof( struct peer_candidate ), comparePeerCandidates );
3814    return candidates;
3815}
3816
3817static void
3818initiateConnection( tr_peerMgr * mgr, Torrent * t, struct peer_atom * atom )
3819{
3820    tr_peerIo * io;
3821    const time_t now = tr_time( );
3822    tr_bool utp = tr_sessionIsUTPEnabled(mgr->session) && !atom->utp_failed;
3823
3824    if( atom->fromFirst == TR_PEER_FROM_PEX )
3825        /* PEX has explicit signalling for uTP support.  If an atom
3826           originally came from PEX and doesn't have the uTP flag, skip the
3827           uTP connection attempt.  Are we being optimistic here? */
3828        utp = utp && (atom->flags & ADDED_F_UTP_FLAGS);
3829
3830    tordbg( t, "Starting an OUTGOING%s connection with %s",
3831            utp ? " µTP" : "",
3832            tr_atomAddrStr( atom ) );
3833
3834    io = tr_peerIoNewOutgoing( mgr->session,
3835                               mgr->session->bandwidth,
3836                               &atom->addr,
3837                               atom->port,
3838                               t->tor->info.hash,
3839                               t->tor->completeness == TR_SEED,
3840                               utp );
3841
3842    if( io == NULL )
3843    {
3844        tordbg( t, "peerIo not created; marking peer %s as unreachable",
3845                tr_atomAddrStr( atom ) );
3846        atom->flags2 |= MYFLAG_UNREACHABLE;
3847        atom->numFails++;
3848    }
3849    else
3850    {
3851        tr_handshake * handshake = tr_handshakeNew( io,
3852                                                    mgr->session->encryptionMode,
3853                                                    myHandshakeDoneCB,
3854                                                    mgr );
3855
3856        assert( tr_peerIoGetTorrentHash( io ) );
3857
3858        tr_peerIoUnref( io ); /* balanced by the initial ref
3859                                 in tr_peerIoNewOutgoing() */
3860
3861        tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
3862                                 handshakeCompare );
3863    }
3864
3865    atom->lastConnectionAttemptAt = now;
3866    atom->time = now;
3867}
3868
3869static void
3870initiateCandidateConnection( tr_peerMgr * mgr, struct peer_candidate * c )
3871{
3872#if 0
3873    fprintf( stderr, "Starting an OUTGOING connection with %s - [%s] seedProbability==%d; %s, %s\n",
3874             tr_atomAddrStr( c->atom ),
3875             tr_torrentName( c->tor ),
3876             (int)c->atom->seedProbability,
3877             tr_torrentIsPrivate( c->tor ) ? "private" : "public",
3878             tr_torrentIsSeed( c->tor ) ? "seed" : "downloader" );
3879#endif
3880
3881    initiateConnection( mgr, c->tor->torrentPeers, c->atom );
3882}
3883
3884static void
3885makeNewPeerConnections( struct tr_peerMgr * mgr, const int max )
3886{
3887    int i, n;
3888    struct peer_candidate * candidates;
3889
3890    candidates = getPeerCandidates( mgr->session, &n );
3891
3892    for( i=0; i<n && i<max; ++i )
3893        initiateCandidateConnection( mgr, &candidates[i] );
3894
3895    tr_free( candidates );
3896}
Note: See TracBrowser for help on using the repository browser.