source: trunk/libtransmission/peer-mgr.c @ 12006

Last change on this file since 12006 was 12006, checked in by jordan, 11 years ago

(trunk libT) remove unused functions: tr_bitsetDifference() tr_bitfieldDifference()

  • Property svn:keywords set to Date Rev Author Id
File size: 113.0 KB
Line 
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 12006 2011-02-21 15:36:07Z jordan $
11 */
12
#include <assert.h>
#include <errno.h> /* error codes ERANGE, ... */
#include <limits.h> /* INT_MAX */
#include <stdint.h> /* uintptr_t */
#include <stdlib.h> /* qsort */
#include <string.h> /* memcpy, memcmp, strstr */

#include <event2/event.h>
#include <libutp/utp.h>

#include "transmission.h"
#include "announcer.h"
#include "bandwidth.h"
#include "bencode.h"
#include "blocklist.h"
#include "cache.h"
#include "clients.h"
#include "completion.h"
#include "crypto.h"
#include "handshake.h"
#include "net.h"
#include "peer-io.h"
#include "peer-mgr.h"
#include "peer-msgs.h"
#include "ptrarray.h"
#include "session.h"
#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
#include "torrent.h"
#include "utils.h"
#include "webseed.h"
42
/**
 * Tunables for the peer manager: timer periods, idle and retry limits,
 * and the bit values stored in peer_atom.flags2.
 */
enum
{
    /* how frequently to cull old atoms */
    ATOM_PERIOD_MSEC = ( 60 * 1000 ),

    /* how frequently to change which peers are choked */
    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),

    /* an optimistically unchoked peer is immune from rechoking
       for this many calls to rechokeUploads(). */
    OPTIMISTIC_UNCHOKE_MULTIPLIER = 4,

    /* how frequently to reallocate bandwidth */
    BANDWIDTH_PERIOD_MSEC = 500,

    /* how frequently to age out old piece request lists */
    REFILL_UPKEEP_PERIOD_MSEC = ( 10 * 1000 ),

    /* how frequently to decide which peers live and die */
    RECONNECT_PERIOD_MSEC = 500,

    /* when many peers are available, keep idle ones this long */
    MIN_UPLOAD_IDLE_SECS = ( 60 ),

    /* when few peers are available, keep idle ones this long */
    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),

    /* max number of peers to ask for per second overall.
     * this throttle is to avoid overloading the router */
    MAX_CONNECTIONS_PER_SECOND = 12,

    /* the per-reconnect-pulse share of MAX_CONNECTIONS_PER_SECOND.
     * Computed with integer arithmetic only, so that the enumerator is a
     * strictly conforming integer constant expression: 12 * 500 / 1000 == 6 */
    MAX_CONNECTIONS_PER_PULSE = ( MAX_CONNECTIONS_PER_SECOND * RECONNECT_PERIOD_MSEC ) / 1000,

    /* number of bad pieces a peer is allowed to send before we ban them */
    MAX_BAD_PIECES_PER_PEER = 5,

    /* amount of time to keep a list of requested pieces lying around
       before it's considered too old and needs to be rebuilt */
    PIECE_LIST_SHELF_LIFE_SECS = 60,

    /* use for bitwise operations w/peer_atom.flags2 */
    MYFLAG_BANNED = 1,

    /* use for bitwise operations w/peer_atom.flags2 */
    /* unreachable for now... but not banned.
     * if they try to connect to us it's okay */
    MYFLAG_UNREACHABLE = 2,

    /* the minimum we'll wait before attempting to reconnect to a peer */
    MINIMUM_RECONNECT_INTERVAL_SECS = 5,

    /* how long we'll let requests we've made linger before we cancel them */
    REQUEST_TTL_SECS = 120,

    /* NOTE(review): purpose is not documented in this file -- confirm */
    NO_BLOCKS_CANCEL_HISTORY = 120,

    /* first argument given to tr_historyNew() for each of a peer's
       block/cancel histories */
    CANCEL_HISTORY_SEC = 60
};
101
102const tr_peer_event TR_PEER_EVENT_INIT = { 0, 0, NULL, 0, 0, 0.0f, 0, FALSE, 0 };
103
104/**
105***
106**/
107
/* values stored in peer_atom.uploadOnly */
enum
{
    UPLOAD_ONLY_UKNOWN = 0,
    UPLOAD_ONLY_YES    = 1,
    UPLOAD_ONLY_NO     = 2
};
114
115/**
116 * Peer information that should be kept even before we've connected and
117 * after we've disconnected. These are kept in a pool of peer_atoms to decide
118 * which ones would make good candidates for connecting to, and to watch out
119 * for banned peers.
120 *
121 * @see tr_peer
122 * @see tr_peermsgs
123 */
124struct peer_atom
125{
126    uint8_t     fromFirst;          /* where the peer was first found */
127    uint8_t     fromBest;           /* the "best" value of where the peer has been found */
128    uint8_t     flags;              /* these match the added_f flags */
129    uint8_t     flags2;             /* flags that aren't defined in added_f */
130    uint8_t     uploadOnly;         /* UPLOAD_ONLY_ */
131    int8_t      seedProbability;    /* how likely is this to be a seed... [0..100] or -1 for unknown */
132    int8_t      blocklisted;        /* -1 for unknown, TRUE for blocklisted, FALSE for not blocklisted */
133
134    tr_port     port;
135    tr_bool     utp_failed;         /* We recently failed to connect over uTP */
136    uint16_t    numFails;
137    time_t      time;               /* when the peer's connection status last changed */
138    time_t      piece_data_time;
139
140    time_t      lastConnectionAttemptAt;
141    time_t      lastConnectionAt;
142
143    /* similar to a TTL field, but less rigid --
144     * if the swarm is small, the atom will be kept past this date. */
145    time_t      shelf_date;
146    tr_peer   * peer;               /* will be NULL if not connected */
147    tr_address  addr;
148};
149
150#ifdef NDEBUG
151#define tr_isAtom(a) (TRUE)
152#else
153static tr_bool
154tr_isAtom( const struct peer_atom * atom )
155{
156    return ( atom != NULL )
157        && ( atom->fromFirst < TR_PEER_FROM__MAX )
158        && ( atom->fromBest < TR_PEER_FROM__MAX )
159        && ( tr_isAddress( &atom->addr ) );
160}
161#endif
162
163static const char*
164tr_atomAddrStr( const struct peer_atom * atom )
165{
166    return atom ? tr_peerIoAddrStr( &atom->addr, atom->port ) : "[no atom]";
167}
168
169struct block_request
170{
171    tr_block_index_t block;
172    tr_peer * peer;
173    time_t sentAt;
174};
175
176struct weighted_piece
177{
178    tr_piece_index_t index;
179    int16_t salt;
180    int16_t requestCount;
181};
182
/* records how Torrent::pieces is currently ordered */
enum piece_sort_state
{
    PIECES_UNSORTED         = 0,
    PIECES_SORTED_BY_INDEX  = 1,
    PIECES_SORTED_BY_WEIGHT = 2
};
189
190/** @brief Opaque, per-torrent data structure for peer connection information */
191typedef struct tr_torrent_peers
192{
193    tr_ptrArray                outgoingHandshakes; /* tr_handshake */
194    tr_ptrArray                pool; /* struct peer_atom */
195    tr_ptrArray                peers; /* tr_peer */
196    tr_ptrArray                webseeds; /* tr_webseed */
197
198    tr_torrent               * tor;
199    struct tr_peerMgr        * manager;
200
201    tr_peer                  * optimistic; /* the optimistic peer, or NULL if none */
202    int                        optimisticUnchokeTimeScaler;
203
204    tr_bool                    isRunning;
205    tr_bool                    needsCompletenessCheck;
206
207    struct block_request     * requests;
208    int                        requestCount;
209    int                        requestAlloc;
210
211    struct weighted_piece    * pieces;
212    int                        pieceCount;
213    enum piece_sort_state      pieceSortState;
214
215    /* An array of pieceCount items stating how many peers have each piece.
216       This is used to help us for downloading pieces "rarest first."
217       This may be NULL if we don't have metainfo yet, or if we're not
218       downloading and don't care about rarity */
219    uint16_t                 * pieceReplication;
220    size_t                     pieceReplicationSize;
221
222    int                        interestedCount;
223    int                        maxPeers;
224    time_t                     lastCancel;
225
226    /* Before the endgame this should be 0. In endgame, is contains the average
227     * number of pending requests per peer. Only peers which have more pending
228     * requests are considered 'fast' are allowed to request a block that's
229     * already been requested from another (slower?) peer. */
230    int                        endgame;
231}
232Torrent;
233
234struct tr_peerMgr
235{
236    tr_session    * session;
237    tr_ptrArray     incomingHandshakes; /* tr_handshake */
238    struct event  * bandwidthTimer;
239    struct event  * rechokeTimer;
240    struct event  * refillUpkeepTimer;
241    struct event  * atomTimer;
242};
243
/**
 * Deep-log a printf-style message tagged with the name of the torrent
 * that owns the Torrent `t`. The message arguments are only evaluated
 * when tr_deepLoggingIsActive() reports that deep logging is on.
 *
 * `t` is parenthesized so that any pointer-valued expression may be
 * passed, not just a plain identifier.
 */
#define tordbg( t, ... ) \
    do { \
        if( tr_deepLoggingIsActive( ) ) \
            tr_deepLog( __FILE__, __LINE__, tr_torrentName( ( t )->tor ), __VA_ARGS__ ); \
    } while( 0 )

/**
 * Deep-log a printf-style message that is not tied to any torrent
 * (NULL is passed as the name).
 */
#define dbgmsg( ... ) \
    do { \
        if( tr_deepLoggingIsActive( ) ) \
            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
    } while( 0 )
255
256/**
257***
258**/
259
260static inline void
261managerLock( const struct tr_peerMgr * manager )
262{
263    tr_sessionLock( manager->session );
264}
265
266static inline void
267managerUnlock( const struct tr_peerMgr * manager )
268{
269    tr_sessionUnlock( manager->session );
270}
271
272static inline void
273torrentLock( Torrent * torrent )
274{
275    managerLock( torrent->manager );
276}
277
278static inline void
279torrentUnlock( Torrent * torrent )
280{
281    managerUnlock( torrent->manager );
282}
283
284static inline int
285torrentIsLocked( const Torrent * t )
286{
287    return tr_sessionIsLocked( t->manager->session );
288}
289
290/**
291***
292**/
293
294static int
295handshakeCompareToAddr( const void * va, const void * vb )
296{
297    const tr_handshake * a = va;
298
299    return tr_compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
300}
301
302static int
303handshakeCompare( const void * a, const void * b )
304{
305    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
306}
307
308static inline tr_handshake*
309getExistingHandshake( tr_ptrArray * handshakes, const tr_address * addr )
310{
311    if( tr_ptrArrayEmpty( handshakes ) )
312        return NULL;
313
314    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
315}
316
317static int
318comparePeerAtomToAddress( const void * va, const void * vb )
319{
320    const struct peer_atom * a = va;
321
322    return tr_compareAddresses( &a->addr, vb );
323}
324
325static int
326compareAtomsByAddress( const void * va, const void * vb )
327{
328    const struct peer_atom * b = vb;
329
330    assert( tr_isAtom( b ) );
331
332    return comparePeerAtomToAddress( va, &b->addr );
333}
334
335/**
336***
337**/
338
339const tr_address *
340tr_peerAddress( const tr_peer * peer )
341{
342    return &peer->atom->addr;
343}
344
345static Torrent*
346getExistingTorrent( tr_peerMgr *    manager,
347                    const uint8_t * hash )
348{
349    tr_torrent * tor = tr_torrentFindFromHash( manager->session, hash );
350
351    return tor == NULL ? NULL : tor->torrentPeers;
352}
353
354static int
355peerCompare( const void * a, const void * b )
356{
357    return tr_compareAddresses( tr_peerAddress( a ), tr_peerAddress( b ) );
358}
359
360static struct peer_atom*
361getExistingAtom( const Torrent    * t,
362                 const tr_address * addr )
363{
364    Torrent * tt = (Torrent*)t;
365    assert( torrentIsLocked( t ) );
366    return tr_ptrArrayFindSorted( &tt->pool, addr, comparePeerAtomToAddress );
367}
368
369static tr_bool
370peerIsInUse( const Torrent * ct, const struct peer_atom * atom )
371{
372    Torrent * t = (Torrent*) ct;
373
374    assert( torrentIsLocked ( t ) );
375
376    return ( atom->peer != NULL )
377        || getExistingHandshake( &t->outgoingHandshakes, &atom->addr )
378        || getExistingHandshake( &t->manager->incomingHandshakes, &atom->addr );
379}
380
381static tr_peer*
382peerConstructor( struct peer_atom * atom )
383{
384    tr_peer * peer = tr_new0( tr_peer, 1 );
385
386    tr_bitsetConstructor( &peer->have, 0 );
387
388    peer->atom = atom;
389    atom->peer = peer;
390
391    peer->blocksSentToClient  = tr_historyNew( CANCEL_HISTORY_SEC, ( RECHOKE_PERIOD_MSEC / 1000 ) );
392    peer->blocksSentToPeer    = tr_historyNew( CANCEL_HISTORY_SEC, ( RECHOKE_PERIOD_MSEC / 1000 ) );
393    peer->cancelsSentToClient = tr_historyNew( CANCEL_HISTORY_SEC, ( RECHOKE_PERIOD_MSEC / 1000 ) );
394    peer->cancelsSentToPeer   = tr_historyNew( CANCEL_HISTORY_SEC, ( REFILL_UPKEEP_PERIOD_MSEC / 1000 ) );
395
396    return peer;
397}
398
399static tr_peer*
400getPeer( Torrent * torrent, struct peer_atom * atom )
401{
402    tr_peer * peer;
403
404    assert( torrentIsLocked( torrent ) );
405
406    peer = atom->peer;
407
408    if( peer == NULL )
409    {
410        peer = peerConstructor( atom );
411        tr_ptrArrayInsertSorted( &torrent->peers, peer, peerCompare );
412    }
413
414    return peer;
415}
416
417static void peerDeclinedAllRequests( Torrent *, const tr_peer * );
418
419static void
420peerDestructor( Torrent * t, tr_peer * peer )
421{
422    assert( peer != NULL );
423
424    peerDeclinedAllRequests( t, peer );
425
426    if( peer->msgs != NULL )
427        tr_peerMsgsFree( peer->msgs );
428
429    tr_peerIoClear( peer->io );
430    tr_peerIoUnref( peer->io ); /* balanced by the ref in handshakeDoneCB() */
431
432    tr_historyFree( peer->blocksSentToClient  );
433    tr_historyFree( peer->blocksSentToPeer    );
434    tr_historyFree( peer->cancelsSentToClient );
435    tr_historyFree( peer->cancelsSentToPeer   );
436
437    tr_bitsetDestructor( &peer->have );
438    tr_bitfieldFree( peer->blame );
439    tr_free( peer->client );
440    peer->atom->peer = NULL;
441
442    tr_free( peer );
443}
444
445static tr_bool
446replicationExists( const Torrent * t )
447{
448    return t->pieceReplication != NULL;
449}
450
451static void
452replicationFree( Torrent * t )
453{
454    tr_free( t->pieceReplication );
455    t->pieceReplication = NULL;
456    t->pieceReplicationSize = 0;
457}
458
459static void
460replicationNew( Torrent * t )
461{
462    tr_piece_index_t piece_i;
463    const tr_piece_index_t piece_count = t->tor->info.pieceCount;
464    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
465    const int peer_count = tr_ptrArraySize( &t->peers );
466
467    assert( !replicationExists( t ) );
468
469    t->pieceReplicationSize = piece_count;
470    t->pieceReplication = tr_new0( uint16_t, piece_count );
471
472    for( piece_i=0; piece_i<piece_count; ++piece_i )
473    {
474        int peer_i;
475        uint16_t r = 0;
476
477        for( peer_i=0; peer_i<peer_count; ++peer_i )
478            if( tr_bitsetHas( &peers[peer_i]->have, piece_i ) )
479                ++r;
480
481        t->pieceReplication[piece_i] = r;
482    }
483}
484
485static void
486torrentDestructor( void * vt )
487{
488    Torrent * t = vt;
489
490    assert( t );
491    assert( !t->isRunning );
492    assert( torrentIsLocked( t ) );
493    assert( tr_ptrArrayEmpty( &t->outgoingHandshakes ) );
494    assert( tr_ptrArrayEmpty( &t->peers ) );
495
496    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
497    tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
498    tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
499    tr_ptrArrayDestruct( &t->peers, NULL );
500
501    replicationFree( t );
502
503    tr_free( t->requests );
504    tr_free( t->pieces );
505    tr_free( t );
506}
507
508static void peerCallbackFunc( tr_peer *, const tr_peer_event *, void * );
509
510static Torrent*
511torrentConstructor( tr_peerMgr * manager,
512                    tr_torrent * tor )
513{
514    int       i;
515    Torrent * t;
516
517    t = tr_new0( Torrent, 1 );
518    t->manager = manager;
519    t->tor = tor;
520    t->pool = TR_PTR_ARRAY_INIT;
521    t->peers = TR_PTR_ARRAY_INIT;
522    t->webseeds = TR_PTR_ARRAY_INIT;
523    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
524
525    for( i = 0; i < tor->info.webseedCount; ++i )
526    {
527        tr_webseed * w =
528            tr_webseedNew( tor, tor->info.webseeds[i], peerCallbackFunc, t );
529        tr_ptrArrayAppend( &t->webseeds, w );
530    }
531
532    return t;
533}
534
535tr_peerMgr*
536tr_peerMgrNew( tr_session * session )
537{
538    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
539    m->session = session;
540    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
541    return m;
542}
543
/* Frees the event that *t points to, if any, and NULLs the slot. */
static void
deleteTimer( struct event ** t )
{
    struct event * timer = *t;

    if( timer == NULL )
        return;

    event_free( timer );
    *t = NULL;
}
553
554static void
555deleteTimers( struct tr_peerMgr * m )
556{
557    deleteTimer( &m->atomTimer );
558    deleteTimer( &m->bandwidthTimer );
559    deleteTimer( &m->rechokeTimer );
560    deleteTimer( &m->refillUpkeepTimer );
561}
562
563void
564tr_peerMgrFree( tr_peerMgr * manager )
565{
566    managerLock( manager );
567
568    deleteTimers( manager );
569
570    /* free the handshakes. Abort invokes handshakeDoneCB(), which removes
571     * the item from manager->handshakes, so this is a little roundabout... */
572    while( !tr_ptrArrayEmpty( &manager->incomingHandshakes ) )
573        tr_handshakeAbort( tr_ptrArrayNth( &manager->incomingHandshakes, 0 ) );
574
575    tr_ptrArrayDestruct( &manager->incomingHandshakes, NULL );
576
577    managerUnlock( manager );
578    tr_free( manager );
579}
580
581static int
582clientIsDownloadingFrom( const tr_torrent * tor, const tr_peer * peer )
583{
584    if( !tr_torrentHasMetadata( tor ) )
585        return TRUE;
586
587    return peer->clientIsInterested && !peer->clientIsChoked;
588}
589
590static int
591clientIsUploadingTo( const tr_peer * peer )
592{
593    return peer->peerIsInterested && !peer->peerIsChoked;
594}
595
596/***
597****
598***/
599
600void
601tr_peerMgrOnBlocklistChanged( tr_peerMgr * mgr )
602{
603    tr_torrent * tor = NULL;
604    tr_session * session = mgr->session;
605
606    /* we cache whether or not a peer is blocklisted...
607       since the blocklist has changed, erase that cached value */
608    while(( tor = tr_torrentNext( session, tor )))
609    {
610        int i;
611        Torrent * t = tor->torrentPeers;
612        const int n = tr_ptrArraySize( &t->pool );
613        for( i=0; i<n; ++i ) {
614            struct peer_atom * atom = tr_ptrArrayNth( &t->pool, i );
615            atom->blocklisted = -1;
616        }
617    }
618}
619
620static tr_bool
621isAtomBlocklisted( tr_session * session, struct peer_atom * atom )
622{
623    if( atom->blocklisted < 0 )
624        atom->blocklisted = tr_sessionIsAddressBlocked( session, &atom->addr );
625
626    assert( tr_isBool( atom->blocklisted ) );
627    return atom->blocklisted;
628}
629
630
631/***
632****
633***/
634
635static void
636atomSetSeedProbability( struct peer_atom * atom, int seedProbability )
637{
638    assert( atom != NULL );
639    assert( -1<=seedProbability && seedProbability<=100 );
640
641    atom->seedProbability = seedProbability;
642
643    if( seedProbability == 100 )
644        atom->flags |= ADDED_F_SEED_FLAG;
645    else if( seedProbability != -1 )
646        atom->flags &= ~ADDED_F_SEED_FLAG;
647}
648
/* Marks the atom as a definite seed (probability 100). */
static void
atomSetSeed( struct peer_atom * atom )
{
    const int certain = 100;

    atomSetSeedProbability( atom, certain );
}
654
655static inline tr_bool
656atomIsSeed( const struct peer_atom * atom )
657{
658    return atom->seedProbability == 100;
659}
660
661tr_bool
662tr_peerMgrPeerIsSeed( const tr_torrent  * tor,
663                      const tr_address  * addr )
664{
665    tr_bool isSeed = FALSE;
666    const Torrent * t = tor->torrentPeers;
667    const struct peer_atom * atom = getExistingAtom( t, addr );
668
669    if( atom )
670        isSeed = atomIsSeed( atom );
671
672    return isSeed;
673}
674
675void
676tr_peerMgrSetUtpSupported( tr_torrent * tor, const tr_address * addr )
677{
678    struct peer_atom * atom = getExistingAtom( tor->torrentPeers, addr );
679
680    if( atom )
681        atom->flags |= ADDED_F_UTP_FLAGS;
682}
683
684void
685tr_peerMgrSetUtpFailed( tr_torrent *tor, const tr_address *addr, tr_bool failed )
686{
687    struct peer_atom * atom = getExistingAtom( tor->torrentPeers, addr );
688
689    if( atom )
690        atom->utp_failed = failed;
691}
692
693
694/**
695***  REQUESTS
696***
697*** There are two data structures associated with managing block requests:
698***
699*** 1. Torrent::requests, an array of "struct block_request" which keeps
700***    track of which blocks have been requested, and when, and by which peers.
701***    This list is used for (a) cancelling requests that have been pending
702***    for too long and (b) avoiding duplicate requests before endgame.
703***
704*** 2. Torrent::pieces, an array of "struct weighted_piece" which lists the
705***    pieces that we want to request. It's used to decide which blocks to
706***    return next when tr_peerMgrGetBlockRequests() is called.
707**/
708
709/**
710*** struct block_request
711**/
712
713static int
714compareReqByBlock( const void * va, const void * vb )
715{
716    const struct block_request * a = va;
717    const struct block_request * b = vb;
718
719    /* primary key: block */
720    if( a->block < b->block ) return -1;
721    if( a->block > b->block ) return 1;
722
723    /* secondary key: peer */
724    if( a->peer < b->peer ) return -1;
725    if( a->peer > b->peer ) return 1;
726
727    return 0;
728}
729
730static void
731requestListAdd( Torrent * t, tr_block_index_t block, tr_peer * peer )
732{
733    struct block_request key;
734
735    /* ensure enough room is available... */
736    if( t->requestCount + 1 >= t->requestAlloc )
737    {
738        const int CHUNK_SIZE = 128;
739        t->requestAlloc += CHUNK_SIZE;
740        t->requests = tr_renew( struct block_request,
741                                t->requests, t->requestAlloc );
742    }
743
744    /* populate the record we're inserting */
745    key.block = block;
746    key.peer = peer;
747    key.sentAt = tr_time( );
748
749    /* insert the request to our array... */
750    {
751        tr_bool exact;
752        const int pos = tr_lowerBound( &key, t->requests, t->requestCount,
753                                       sizeof( struct block_request ),
754                                       compareReqByBlock, &exact );
755        assert( !exact );
756        memmove( t->requests + pos + 1,
757                 t->requests + pos,
758                 sizeof( struct block_request ) * ( t->requestCount++ - pos ) );
759        t->requests[pos] = key;
760    }
761
762    if( peer != NULL )
763    {
764        ++peer->pendingReqsToPeer;
765        assert( peer->pendingReqsToPeer >= 0 );
766    }
767
768    /*fprintf( stderr, "added request of block %lu from peer %s... "
769                       "there are now %d block\n",
770                       (unsigned long)block, tr_atomAddrStr( peer->atom ), t->requestCount );*/
771}
772
773static struct block_request *
774requestListLookup( Torrent * t, tr_block_index_t block, const tr_peer * peer )
775{
776    struct block_request key;
777    key.block = block;
778    key.peer = (tr_peer*) peer;
779
780    return bsearch( &key, t->requests, t->requestCount,
781                    sizeof( struct block_request ),
782                    compareReqByBlock );
783}
784
785/**
786 * Find the peers are we currently requesting the block
787 * with index @a block from and append them to @a peerArr.
788 */
789static void
790getBlockRequestPeers( Torrent * t, tr_block_index_t block,
791                      tr_ptrArray * peerArr )
792{
793    tr_bool exact;
794    int i, pos;
795    struct block_request key;
796
797    key.block = block;
798    key.peer = NULL;
799    pos = tr_lowerBound( &key, t->requests, t->requestCount,
800                         sizeof( struct block_request ),
801                         compareReqByBlock, &exact );
802
803    assert( !exact ); /* shouldn't have a request with .peer == NULL */
804
805    for( i = pos; i < t->requestCount; ++i )
806    {
807        if( t->requests[i].block != block )
808            break;
809        tr_ptrArrayAppend( peerArr, t->requests[i].peer );
810    }
811}
812
813static void
814decrementPendingReqCount( const struct block_request * b )
815{
816    if( b->peer != NULL )
817        if( b->peer->pendingReqsToPeer > 0 )
818            --b->peer->pendingReqsToPeer;
819}
820
821static void
822requestListRemove( Torrent * t, tr_block_index_t block, const tr_peer * peer )
823{
824    const struct block_request * b = requestListLookup( t, block, peer );
825    if( b != NULL )
826    {
827        const int pos = b - t->requests;
828        assert( pos < t->requestCount );
829
830        decrementPendingReqCount( b );
831
832        tr_removeElementFromArray( t->requests,
833                                   pos,
834                                   sizeof( struct block_request ),
835                                   t->requestCount-- );
836
837        /*fprintf( stderr, "removing request of block %lu from peer %s... "
838                           "there are now %d block requests left\n",
839                           (unsigned long)block, tr_atomAddrStr( peer->atom ), t->requestCount );*/
840    }
841}
842
843static int
844countActiveWebseeds( const Torrent * t )
845{
846    int activeCount = 0;
847    const tr_webseed ** w = (const tr_webseed **) tr_ptrArrayBase( &t->webseeds );
848    const tr_webseed ** const wend = w + tr_ptrArraySize( &t->webseeds );
849
850    for( ; w!=wend; ++w )
851        if( tr_webseedIsActive( *w ) )
852            ++activeCount;
853
854    return activeCount;
855}
856
857static void
858updateEndgame( Torrent * t )
859{
860    const tr_torrent * tor = t->tor;
861    const tr_block_index_t missing = tr_cpBlocksMissing( &tor->completion );
862
863    assert( t->requestCount >= 0 );
864
865    if( (tr_block_index_t) t->requestCount < missing )
866    {
867        /* not in endgame */
868        t->endgame = 0;
869    }
870    else if( !t->endgame ) /* only recalculate when endgame first begins */
871    {
872        int numDownloading = 0;
873        const tr_peer ** p = (const tr_peer **) tr_ptrArrayBase( &t->peers );
874        const tr_peer ** const pend = p + tr_ptrArraySize( &t->peers );
875
876        /* add the active bittorrent peers... */
877        for( ; p!=pend; ++p )
878            if( (*p)->pendingReqsToPeer > 0 )
879                ++numDownloading;
880
881        /* add the active webseeds... */
882        numDownloading += countActiveWebseeds( t );
883
884        /* average number of pending requests per downloading peer */
885        t->endgame = t->requestCount / MAX( numDownloading, 1 );
886    }
887}
888
889
890/****
891*****
892*****  Piece List Manipulation / Accessors
893*****
894****/
895
896static inline void
897invalidatePieceSorting( Torrent * t )
898{
899    t->pieceSortState = PIECES_UNSORTED;
900}
901
902const tr_torrent * weightTorrent;
903
904const uint16_t * weightReplication;
905
906static void
907setComparePieceByWeightTorrent( Torrent * t )
908{
909    if( !replicationExists( t ) )
910        replicationNew( t );
911
912    weightTorrent = t->tor;
913    weightReplication = t->pieceReplication;
914}
915
916/* we try to create a "weight" s.t. high-priority pieces come before others,
917 * and that partially-complete pieces come before empty ones. */
918static int
919comparePieceByWeight( const void * va, const void * vb )
920{
921    const struct weighted_piece * a = va;
922    const struct weighted_piece * b = vb;
923    int ia, ib, missing, pending;
924    const tr_torrent * tor = weightTorrent;
925    const uint16_t * rep = weightReplication;
926
927    /* primary key: weight */
928    missing = tr_cpMissingBlocksInPiece( &tor->completion, a->index );
929    pending = a->requestCount;
930    ia = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
931    missing = tr_cpMissingBlocksInPiece( &tor->completion, b->index );
932    pending = b->requestCount;
933    ib = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
934    if( ia < ib ) return -1;
935    if( ia > ib ) return 1;
936
937    /* secondary key: higher priorities go first */
938    ia = tor->info.pieces[a->index].priority;
939    ib = tor->info.pieces[b->index].priority;
940    if( ia > ib ) return -1;
941    if( ia < ib ) return 1;
942
943    /* tertiary key: rarest first. */
944    ia = rep[a->index];
945    ib = rep[b->index];
946    if( ia < ib ) return -1;
947    if( ia > ib ) return 1;
948
949    /* quaternary key: random */
950    if( a->salt < b->salt ) return -1;
951    if( a->salt > b->salt ) return 1;
952
953    /* okay, they're equal */
954    return 0;
955}
956
957static int
958comparePieceByIndex( const void * va, const void * vb )
959{
960    const struct weighted_piece * a = va;
961    const struct weighted_piece * b = vb;
962    if( a->index < b->index ) return -1;
963    if( a->index > b->index ) return 1;
964    return 0;
965}
966
967static void
968pieceListSort( Torrent * t, enum piece_sort_state state )
969{
970    assert( state==PIECES_SORTED_BY_INDEX
971         || state==PIECES_SORTED_BY_WEIGHT );
972
973
974    if( state == PIECES_SORTED_BY_WEIGHT )
975    {
976        setComparePieceByWeightTorrent( t );
977        qsort( t->pieces, t->pieceCount, sizeof( struct weighted_piece ), comparePieceByWeight );
978    }
979    else
980        qsort( t->pieces, t->pieceCount, sizeof( struct weighted_piece ), comparePieceByIndex );
981
982    t->pieceSortState = state;
983}
984
/**
 * Consistency checks that are handy while testing but too expensive for
 * nightly builds. They are compiled out by default (the macros below
 * expand to nothing); flip the #if to compile the real checks back in.
 */
#if 1
#define assertWeightedPiecesAreSorted(t)
#define assertReplicationCountIsExact(t)
#else
/* Outside of endgame, asserts that every adjacent pair in the piece
 * list is in comparePieceByWeight() order. */
static void
assertWeightedPiecesAreSorted( Torrent * t )
{
    int k;

    if( t->endgame )
        return;

    setComparePieceByWeightTorrent( t );
    for( k=1; k<t->pieceCount; ++k )
        assert( comparePieceByWeight( &t->pieces[k-1], &t->pieces[k] ) <= 0 );
}

/* Recounts, for every piece, how many peers have it and asserts that
 * the cached replication array agrees. */
static void
assertReplicationCountIsExact( Torrent * t )
{
    /* This assert might fail because of implementation errors in other
     * clients. It happens when a client sends duplicate
     * bitfields/HaveAll/HaveNone messages. If such behavior is noticed,
     * a bug report should be filed against the faulty client. */

    const uint16_t * cached = t->pieceReplication;
    const size_t nPieces = t->pieceReplicationSize;
    const tr_peer ** peerList = (const tr_peer**) tr_ptrArrayBase( &t->peers );
    const int nPeers = tr_ptrArraySize( &t->peers );
    size_t piece;

    assert( nPieces == t->tor->info.pieceCount );

    for( piece=0; piece<nPieces; ++piece )
    {
        uint16_t owners = 0;
        int k = 0;

        while( k < nPeers )
        {
            if( tr_bitsetHas( &peerList[k]->have, piece ) )
                ++owners;
            ++k;
        }

        assert( cached[piece] == owners );
    }
}
#endif
1033
1034static struct weighted_piece *
1035pieceListLookup( Torrent * t, tr_piece_index_t index )
1036{
1037    int i;
1038
1039    for( i=0; i<t->pieceCount; ++i )
1040        if( t->pieces[i].index == index )
1041            return &t->pieces[i];
1042
1043    return NULL;
1044}
1045
1046static void
1047pieceListRebuild( Torrent * t )
1048{
1049
1050    if( !tr_torrentIsSeed( t->tor ) )
1051    {
1052        tr_piece_index_t i;
1053        tr_piece_index_t * pool;
1054        tr_piece_index_t poolCount = 0;
1055        const tr_torrent * tor = t->tor;
1056        const tr_info * inf = tr_torrentInfo( tor );
1057        struct weighted_piece * pieces;
1058        int pieceCount;
1059
1060        /* build the new list */
1061        pool = tr_new( tr_piece_index_t, inf->pieceCount );
1062        for( i=0; i<inf->pieceCount; ++i )
1063            if( !inf->pieces[i].dnd )
1064                if( !tr_cpPieceIsComplete( &tor->completion, i ) )
1065                    pool[poolCount++] = i;
1066        pieceCount = poolCount;
1067        pieces = tr_new0( struct weighted_piece, pieceCount );
1068        for( i=0; i<poolCount; ++i ) {
1069            struct weighted_piece * piece = pieces + i;
1070            piece->index = pool[i];
1071            piece->requestCount = 0;
1072            piece->salt = tr_cryptoWeakRandInt( 4096 );
1073        }
1074
1075        /* if we already had a list of pieces, merge it into
1076         * the new list so we don't lose its requestCounts */
1077        if( t->pieces != NULL )
1078        {
1079            struct weighted_piece * o = t->pieces;
1080            struct weighted_piece * oend = o + t->pieceCount;
1081            struct weighted_piece * n = pieces;
1082            struct weighted_piece * nend = n + pieceCount;
1083
1084            pieceListSort( t, PIECES_SORTED_BY_INDEX );
1085
1086            while( o!=oend && n!=nend ) {
1087                if( o->index < n->index )
1088                    ++o;
1089                else if( o->index > n->index )
1090                    ++n;
1091                else
1092                    *n++ = *o++;
1093            }
1094
1095            tr_free( t->pieces );
1096        }
1097
1098        t->pieces = pieces;
1099        t->pieceCount = pieceCount;
1100
1101        pieceListSort( t, PIECES_SORTED_BY_WEIGHT );
1102
1103        /* cleanup */
1104        tr_free( pool );
1105    }
1106}
1107
1108static void
1109pieceListRemovePiece( Torrent * t, tr_piece_index_t piece )
1110{
1111    struct weighted_piece * p;
1112
1113    if(( p = pieceListLookup( t, piece )))
1114    {
1115        const int pos = p - t->pieces;
1116
1117        tr_removeElementFromArray( t->pieces,
1118                                   pos,
1119                                   sizeof( struct weighted_piece ),
1120                                   t->pieceCount-- );
1121
1122        if( t->pieceCount == 0 )
1123        {
1124            tr_free( t->pieces );
1125            t->pieces = NULL;
1126        }
1127    }
1128}
1129
1130static void
1131pieceListResortPiece( Torrent * t, struct weighted_piece * p )
1132{
1133    int pos;
1134    tr_bool isSorted = TRUE;
1135
1136    if( p == NULL )
1137        return;
1138
1139    /* is the torrent already sorted? */
1140    pos = p - t->pieces;
1141    setComparePieceByWeightTorrent( t );
1142    if( isSorted && ( pos > 0 ) && ( comparePieceByWeight( p-1, p ) > 0 ) )
1143        isSorted = FALSE;
1144    if( isSorted && ( pos < t->pieceCount - 1 ) && ( comparePieceByWeight( p, p+1 ) > 0 ) )
1145        isSorted = FALSE;
1146
1147    if( t->pieceSortState != PIECES_SORTED_BY_WEIGHT )
1148    {
1149       pieceListSort( t, PIECES_SORTED_BY_WEIGHT);
1150       isSorted = TRUE;
1151    }
1152
1153    /* if it's not sorted, move it around */
1154    if( !isSorted )
1155    {
1156        tr_bool exact;
1157        const struct weighted_piece tmp = *p;
1158
1159        tr_removeElementFromArray( t->pieces,
1160                                   pos,
1161                                   sizeof( struct weighted_piece ),
1162                                   t->pieceCount-- );
1163
1164        pos = tr_lowerBound( &tmp, t->pieces, t->pieceCount,
1165                             sizeof( struct weighted_piece ),
1166                             comparePieceByWeight, &exact );
1167
1168        memmove( &t->pieces[pos + 1],
1169                 &t->pieces[pos],
1170                 sizeof( struct weighted_piece ) * ( t->pieceCount++ - pos ) );
1171
1172        t->pieces[pos] = tmp;
1173    }
1174
1175    assertWeightedPiecesAreSorted( t );
1176}
1177
1178static void
1179pieceListRemoveRequest( Torrent * t, tr_block_index_t block )
1180{
1181    struct weighted_piece * p;
1182    const tr_piece_index_t index = tr_torBlockPiece( t->tor, block );
1183
1184    if( ((p = pieceListLookup( t, index ))) && ( p->requestCount > 0 ) )
1185    {
1186        --p->requestCount;
1187        pieceListResortPiece( t, p );
1188    }
1189}
1190
1191
1192/****
1193*****
1194*****  Replication count ( for rarest first policy )
1195*****
1196****/
1197
1198/**
1199 * Increase the replication count of this piece and sort it if the
1200 * piece list is already sorted
1201 */
1202static void
1203tr_incrReplicationOfPiece( Torrent * t, const size_t index )
1204{
1205    assert( replicationExists( t ) );
1206    assert( t->pieceReplicationSize == t->tor->info.pieceCount );
1207
1208    /* One more replication of this piece is present in the swarm */
1209    ++t->pieceReplication[index];
1210
1211    /* we only resort the piece if the list is already sorted */
1212    if( t->pieceSortState == PIECES_SORTED_BY_WEIGHT )
1213        pieceListResortPiece( t, pieceListLookup( t, index ) );
1214}
1215
1216/**
1217 * Increases the replication count of pieces present in the bitfield
1218 */
1219static void
1220tr_incrReplicationFromBitfield( Torrent * t, const tr_bitfield * b )
1221{
1222    size_t i;
1223    uint16_t * rep = t->pieceReplication;
1224    const size_t n = t->tor->info.pieceCount;
1225
1226    assert( replicationExists( t ) );
1227    assert( n == t->pieceReplicationSize );
1228    assert( tr_bitfieldTestFast( b, n-1 ) );
1229
1230    for( i=0; i<n; ++i )
1231        if( tr_bitfieldHas( b, i ) )
1232            ++rep[i];
1233
1234    if( t->pieceSortState == PIECES_SORTED_BY_WEIGHT )
1235        invalidatePieceSorting( t );
1236}
1237
1238/**
1239 * Increase the replication count of every piece
1240 */
1241static void
1242tr_incrReplication( Torrent * t )
1243{
1244    int i;
1245    const int n = t->pieceReplicationSize;
1246
1247    assert( replicationExists( t ) );
1248    assert( t->pieceReplicationSize == t->tor->info.pieceCount );
1249
1250    for( i=0; i<n; ++i )
1251        ++t->pieceReplication[i];
1252}
1253
1254/**
1255 * Decrease the replication count of pieces present in the bitset.
1256 */
1257static void
1258tr_decrReplicationFromBitset( Torrent * t, const tr_bitset * bitset )
1259{
1260    int i;
1261    const int n = t->pieceReplicationSize;
1262
1263    assert( replicationExists( t ) );
1264    assert( t->pieceReplicationSize == t->tor->info.pieceCount );
1265
1266    if( bitset->haveAll )
1267    {
1268        for( i=0; i<n; ++i )
1269            --t->pieceReplication[i];
1270    }
1271    else if ( !bitset->haveNone )
1272    {
1273        const tr_bitfield * const b = &bitset->bitfield;
1274
1275        for( i=0; i<n; ++i )
1276            if( tr_bitfieldHas( b, i ) )
1277                --t->pieceReplication[i];
1278
1279        if( t->pieceSortState == PIECES_SORTED_BY_WEIGHT )
1280            invalidatePieceSorting( t );
1281    }
1282}
1283
1284/**
1285***
1286**/
1287
1288void
1289tr_peerMgrRebuildRequests( tr_torrent * tor )
1290{
1291    assert( tr_isTorrent( tor ) );
1292
1293    pieceListRebuild( tor->torrentPeers );
1294}
1295
1296void
1297tr_peerMgrGetNextRequests( tr_torrent           * tor,
1298                           tr_peer              * peer,
1299                           int                    numwant,
1300                           tr_block_index_t     * setme,
1301                           int                  * numgot )
1302{
1303    int i;
1304    int got;
1305    Torrent * t;
1306    struct weighted_piece * pieces;
1307    const tr_bitset * have = &peer->have;
1308
1309    /* sanity clause */
1310    assert( tr_isTorrent( tor ) );
1311    assert( peer->clientIsInterested );
1312    assert( !peer->clientIsChoked );
1313    assert( numwant > 0 );
1314
1315    /* walk through the pieces and find blocks that should be requested */
1316    got = 0;
1317    t = tor->torrentPeers;
1318
1319    /* prep the pieces list */
1320    if( t->pieces == NULL )
1321        pieceListRebuild( t );
1322
1323    if( t->pieceSortState != PIECES_SORTED_BY_WEIGHT )
1324        pieceListSort( t, PIECES_SORTED_BY_WEIGHT );
1325
1326    assertReplicationCountIsExact( t );
1327    assertWeightedPiecesAreSorted( t );
1328
1329    updateEndgame( t );
1330    pieces = t->pieces;
1331    for( i=0; i<t->pieceCount && got<numwant; ++i )
1332    {
1333        struct weighted_piece * p = pieces + i;
1334
1335        /* if the peer has this piece that we want... */
1336        if( tr_bitsetHas( have, p->index ) )
1337        {
1338            tr_block_index_t b = tr_torPieceFirstBlock( tor, p->index );
1339            const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, p->index );
1340            tr_ptrArray peerArr = TR_PTR_ARRAY_INIT;
1341
1342            for( ; b!=e && got<numwant; ++b )
1343            {
1344                int peerCount;
1345                tr_peer ** peers;
1346
1347                /* don't request blocks we've already got */
1348                if( tr_cpBlockIsCompleteFast( &tor->completion, b ) )
1349                    continue;
1350
1351                /* always add peer if this block has no peers yet */
1352                tr_ptrArrayClear( &peerArr );
1353                getBlockRequestPeers( t, b, &peerArr );
1354                peers = (tr_peer **) tr_ptrArrayPeek( &peerArr, &peerCount );
1355                if( peerCount != 0 )
1356                {
1357                    /* don't make a second block request until the endgame */
1358                    if( !t->endgame )
1359                        continue;
1360
1361                    /* don't have more than two peers requesting this block */
1362                    if( peerCount > 1 )
1363                        continue;
1364
1365                    /* don't send the same request to the same peer twice */
1366                    if( peer == peers[0] )
1367                        continue;
1368
1369                    /* in the endgame allow an additional peer to download a
1370                       block but only if the peer seems to be handling requests
1371                       relatively fast */
1372                    if( peer->pendingReqsToPeer + numwant - got < t->endgame )
1373                        continue;
1374                }
1375
1376                /* update the caller's table */
1377                setme[got++] = b;
1378
1379                /* update our own tables */
1380                requestListAdd( t, b, peer );
1381                ++p->requestCount;
1382            }
1383
1384            tr_ptrArrayDestruct( &peerArr, NULL );
1385        }
1386    }
1387
1388    /* In most cases we've just changed the weights of a small number of pieces.
1389     * So rather than qsort()ing the entire array, it's faster to apply an
1390     * adaptive insertion sort algorithm. */
1391    if( got > 0 )
1392    {
1393        /* not enough requests || last piece modified */
1394        if ( i == t->pieceCount ) --i;
1395
1396        setComparePieceByWeightTorrent( t );
1397        while( --i >= 0 )
1398        {
1399            tr_bool exact;
1400
1401            /* relative position! */
1402            const int newpos = tr_lowerBound( &t->pieces[i], &t->pieces[i + 1],
1403                                              t->pieceCount - (i + 1),
1404                                              sizeof( struct weighted_piece ),
1405                                              comparePieceByWeight, &exact );
1406            if( newpos > 0 )
1407            {
1408                const struct weighted_piece piece = t->pieces[i];
1409                memmove( &t->pieces[i],
1410                         &t->pieces[i + 1],
1411                         sizeof( struct weighted_piece ) * ( newpos ) );
1412                t->pieces[i + newpos] = piece;
1413            }
1414        }
1415    }
1416
1417    assertWeightedPiecesAreSorted( t );
1418    *numgot = got;
1419}
1420
1421tr_bool
1422tr_peerMgrDidPeerRequest( const tr_torrent  * tor,
1423                          const tr_peer     * peer,
1424                          tr_block_index_t    block )
1425{
1426    const Torrent * t = tor->torrentPeers;
1427    return requestListLookup( (Torrent*)t, block, peer ) != NULL;
1428}
1429
1430/* cancel requests that are too old */
1431static void
1432refillUpkeep( int foo UNUSED, short bar UNUSED, void * vmgr )
1433{
1434    time_t now;
1435    time_t too_old;
1436    tr_torrent * tor;
1437    tr_peerMgr * mgr = vmgr;
1438    managerLock( mgr );
1439
1440    now = tr_time( );
1441    too_old = now - REQUEST_TTL_SECS;
1442
1443    tor = NULL;
1444    while(( tor = tr_torrentNext( mgr->session, tor )))
1445    {
1446        Torrent * t = tor->torrentPeers;
1447        const int n = t->requestCount;
1448        if( n > 0 )
1449        {
1450            int keepCount = 0;
1451            int cancelCount = 0;
1452            struct block_request * cancel = tr_new( struct block_request, n );
1453            const struct block_request * it;
1454            const struct block_request * end;
1455
1456            for( it=t->requests, end=it+n; it!=end; ++it )
1457            {
1458                if( ( it->sentAt <= too_old ) && it->peer->msgs && !tr_peerMsgsIsReadingBlock( it->peer->msgs, it->block ) )
1459                    cancel[cancelCount++] = *it;
1460                else
1461                {
1462                    if( it != &t->requests[keepCount] )
1463                        t->requests[keepCount] = *it;
1464                    keepCount++;
1465                }
1466            }
1467
1468            /* prune out the ones we aren't keeping */
1469            t->requestCount = keepCount;
1470
1471            /* send cancel messages for all the "cancel" ones */
1472            for( it=cancel, end=it+cancelCount; it!=end; ++it ) {
1473                if( ( it->peer != NULL ) && ( it->peer->msgs != NULL ) ) {
1474                    tr_historyAdd( it->peer->cancelsSentToPeer, now, 1 );
1475                    tr_peerMsgsCancel( it->peer->msgs, it->block );
1476                    decrementPendingReqCount( it );
1477                }
1478            }
1479
1480            /* decrement the pending request counts for the timed-out blocks */
1481            for( it=cancel, end=it+cancelCount; it!=end; ++it )
1482                pieceListRemoveRequest( t, it->block );
1483
1484            /* cleanup loop */
1485            tr_free( cancel );
1486        }
1487    }
1488
1489    tr_timerAddMsec( mgr->refillUpkeepTimer, REFILL_UPKEEP_PERIOD_MSEC );
1490    managerUnlock( mgr );
1491}
1492
1493static void
1494addStrike( Torrent * t, tr_peer * peer )
1495{
1496    tordbg( t, "increasing peer %s strike count to %d",
1497            tr_atomAddrStr( peer->atom ), peer->strikes + 1 );
1498
1499    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
1500    {
1501        struct peer_atom * atom = peer->atom;
1502        atom->flags2 |= MYFLAG_BANNED;
1503        peer->doPurge = 1;
1504        tordbg( t, "banning peer %s", tr_atomAddrStr( atom ) );
1505    }
1506}
1507
1508static void
1509gotBadPiece( Torrent * t, tr_piece_index_t pieceIndex )
1510{
1511    tr_torrent *   tor = t->tor;
1512    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
1513
1514    tor->corruptCur += byteCount;
1515    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1516
1517    tr_announcerAddBytes( tor, TR_ANN_CORRUPT, byteCount );
1518}
1519
/**
 * Handler for a peer's "suggest piece" / "allowed fast" hints; it is
 * called from peerCallbackFunc() with isFastAllowed FALSE and TRUE
 * respectively.
 *
 * Currently a no-op: the entire body is compiled out with `#if 0`, which
 * is why every parameter is tagged UNUSED.
 *
 * NOTE(review): the disabled code does not match how the live code in
 * this file calls the same helpers (e.g. it passes `tor->completion`
 * rather than `&tor->completion`, and uses tr_bitfieldHas() on
 * `peer->have` where live code uses tr_bitsetHas()), so it presumably
 * needs updating before it can be re-enabled -- verify.
 */
static void
peerSuggestedPiece( Torrent            * t UNUSED,
                    tr_peer            * peer UNUSED,
                    tr_piece_index_t     pieceIndex UNUSED,
                    int                  isFastAllowed UNUSED )
{
#if 0
    assert( t );
    assert( peer );
    assert( peer->msgs );

    /* is this a valid piece? */
    if(  pieceIndex >= t->tor->info.pieceCount )
        return;

    /* don't ask for it if we've already got it */
    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
        return;

    /* don't ask for it if they don't have it */
    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
        return;

    /* don't ask for it if we're choked and it's not fast */
    if( !isFastAllowed && peer->clientIsChoked )
        return;

    /* request the blocks that we don't have in this piece */
    {
        tr_block_index_t block;
        const tr_torrent * tor = t->tor;
        const tr_block_index_t start = tr_torPieceFirstBlock( tor, pieceIndex );
        const tr_block_index_t end = start + tr_torPieceCountBlocks( tor, pieceIndex );

        for( block=start; block<end; ++block )
        {
            if( !tr_cpBlockIsComplete( tor->completion, block ) )
            {
                const uint32_t offset = getBlockOffsetInPiece( tor, block );
                const uint32_t length = tr_torBlockCountBytes( tor, block );
                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length );
                incrementPieceRequests( t, pieceIndex );
            }
        }
    }
#endif
}
1567
1568static void
1569removeRequestFromTables( Torrent * t, tr_block_index_t block, const tr_peer * peer )
1570{
1571    requestListRemove( t, block, peer );
1572    pieceListRemoveRequest( t, block );
1573}
1574
1575/* peer choked us, or maybe it disconnected.
1576   either way we need to remove all its requests */
1577static void
1578peerDeclinedAllRequests( Torrent * t, const tr_peer * peer )
1579{
1580    int i, n;
1581    tr_block_index_t * blocks = tr_new( tr_block_index_t, t->requestCount );
1582
1583    for( i=n=0; i<t->requestCount; ++i )
1584        if( peer == t->requests[i].peer )
1585            blocks[n++] = t->requests[i].block;
1586
1587    for( i=0; i<n; ++i )
1588        removeRequestFromTables( t, blocks[i], peer );
1589
1590    tr_free( blocks );
1591}
1592
1593static void
1594peerCallbackFunc( tr_peer * peer, const tr_peer_event * e, void * vt )
1595{
1596    Torrent * t = vt;
1597
1598    torrentLock( t );
1599
1600    switch( e->eventType )
1601    {
1602        case TR_PEER_PEER_GOT_DATA:
1603        {
1604            const time_t now = tr_time( );
1605            tr_torrent * tor = t->tor;
1606
1607            if( e->wasPieceData )
1608            {
1609                tor->uploadedCur += e->length;
1610                tr_announcerAddBytes( tor, TR_ANN_UP, e->length );
1611                tr_torrentSetActivityDate( tor, now );
1612                tr_torrentSetDirty( tor );
1613            }
1614
1615            /* update the stats */
1616            if( e->wasPieceData )
1617                tr_statsAddUploaded( tor->session, e->length );
1618
1619            /* update our atom */
1620            if( peer && e->wasPieceData )
1621                peer->atom->piece_data_time = now;
1622
1623            break;
1624        }
1625
1626        case TR_PEER_CLIENT_GOT_HAVE:
1627            if( replicationExists( t ) ) {
1628                tr_incrReplicationOfPiece( t, e->pieceIndex );
1629                assertReplicationCountIsExact( t );
1630            }
1631            break;
1632
1633        case TR_PEER_CLIENT_GOT_HAVE_ALL:
1634            if( replicationExists( t ) ) {
1635                tr_incrReplication( t );
1636                assertReplicationCountIsExact( t );
1637            }
1638            break;
1639
1640        case TR_PEER_CLIENT_GOT_HAVE_NONE:
1641            /* noop */
1642            break;
1643
1644        case TR_PEER_CLIENT_GOT_BITFIELD:
1645            assert( e->bitfield != NULL );
1646            if( replicationExists( t ) ) {
1647                tr_incrReplicationFromBitfield( t, e->bitfield );
1648                assertReplicationCountIsExact( t );
1649            }
1650            break;
1651
1652        case TR_PEER_CLIENT_GOT_REJ:
1653            removeRequestFromTables( t, _tr_block( t->tor, e->pieceIndex, e->offset ), peer );
1654            break;
1655
1656        case TR_PEER_CLIENT_GOT_CHOKE:
1657            peerDeclinedAllRequests( t, peer );
1658            break;
1659
1660        case TR_PEER_CLIENT_GOT_PORT:
1661            if( peer )
1662                peer->atom->port = e->port;
1663            break;
1664
1665        case TR_PEER_CLIENT_GOT_SUGGEST:
1666            if( peer )
1667                peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
1668            break;
1669
1670        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1671            if( peer )
1672                peerSuggestedPiece( t, peer, e->pieceIndex, TRUE );
1673            break;
1674
1675        case TR_PEER_CLIENT_GOT_DATA:
1676        {
1677            const time_t now = tr_time( );
1678            tr_torrent * tor = t->tor;
1679
1680            if( e->wasPieceData )
1681            {
1682                tor->downloadedCur += e->length;
1683                tr_torrentSetActivityDate( tor, now );
1684                tr_torrentSetDirty( tor );
1685            }
1686
1687            /* update the stats */
1688            if( e->wasPieceData )
1689                tr_statsAddDownloaded( tor->session, e->length );
1690
1691            /* update our atom */
1692            if( peer && peer->atom && e->wasPieceData )
1693                peer->atom->piece_data_time = now;
1694
1695            break;
1696        }
1697
1698        case TR_PEER_PEER_PROGRESS:
1699        {
1700            if( peer )
1701            {
1702                struct peer_atom * atom = peer->atom;
1703                if( e->progress >= 1.0 ) {
1704                    tordbg( t, "marking peer %s as a seed", tr_atomAddrStr( atom ) );
1705                    atomSetSeed( atom );
1706                }
1707            }
1708            break;
1709        }
1710
1711        case TR_PEER_CLIENT_GOT_BLOCK:
1712        {
1713            tr_torrent * tor = t->tor;
1714            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
1715            int i, peerCount;
1716            tr_peer ** peers;
1717            tr_ptrArray peerArr = TR_PTR_ARRAY_INIT;
1718
1719            removeRequestFromTables( t, block, peer );
1720            getBlockRequestPeers( t, block, &peerArr );
1721            peers = (tr_peer **) tr_ptrArrayPeek( &peerArr, &peerCount );
1722
1723            /* remove additional block requests and send cancel to peers */
1724            for( i=0; i<peerCount; i++ ) {
1725                tr_peer * p = peers[i];
1726                assert( p != peer );
1727                if( p->msgs ) {
1728                    tr_historyAdd( p->cancelsSentToPeer, tr_time( ), 1 );
1729                    tr_peerMsgsCancel( p->msgs, block );
1730                }
1731                removeRequestFromTables( t, block, p );
1732            }
1733
1734            tr_ptrArrayDestruct( &peerArr, FALSE );
1735
1736            if( peer && peer->blocksSentToClient )
1737                tr_historyAdd( peer->blocksSentToClient, tr_time( ), 1 );
1738
1739            if( tr_cpBlockIsComplete( &tor->completion, block ) )
1740            {
1741                /* we already have this block... */
1742                const uint32_t n = tr_torBlockCountBytes( tor, block );
1743                tor->downloadedCur -= MIN( tor->downloadedCur, n );
1744                tordbg( t, "we have this block already..." );
1745            }
1746            else
1747            {
1748                tr_cpBlockAdd( &tor->completion, block );
1749                pieceListResortPiece( t, pieceListLookup( t, e->pieceIndex ) );
1750                tr_torrentSetDirty( tor );
1751
1752                if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
1753                {
1754                    const tr_piece_index_t p = e->pieceIndex;
1755                    const tr_bool ok = tr_torrentCheckPiece( tor, p );
1756
1757                    tordbg( t, "[LAZY] checked just-completed piece %zu", (size_t)p );
1758
1759                    if( !ok )
1760                    {
1761                        tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
1762                                   (unsigned long)p );
1763                    }
1764
1765                    tr_peerMgrSetBlame( tor, p, ok );
1766
1767                    if( !ok )
1768                    {
1769                        gotBadPiece( t, p );
1770                    }
1771                    else
1772                    {
1773                        int i;
1774                        int peerCount;
1775                        tr_peer ** peers;
1776                        tr_file_index_t fileIndex;
1777
1778                        /* only add this to downloadedCur if we got it from a peer --
1779                         * webseeds shouldn't count against our ratio. As one tracker
1780                         * admin put it, "Those pieces are downloaded directly from the
1781                         * content distributor, not the peers, it is the tracker's job
1782                         * to manage the swarms, not the web server and does not fit
1783                         * into the jurisdiction of the tracker." */
1784                        if( peer->msgs != NULL ) {
1785                            const uint32_t n = tr_torPieceCountBytes( tor, p );
1786                            tr_announcerAddBytes( tor, TR_ANN_DOWN, n );
1787                        }
1788
1789                        peerCount = tr_ptrArraySize( &t->peers );
1790                        peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
1791                        for( i=0; i<peerCount; ++i )
1792                            tr_peerMsgsHave( peers[i]->msgs, p );
1793
1794                        for( fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex ) {
1795                            const tr_file * file = &tor->info.files[fileIndex];
1796                            if( ( file->firstPiece <= p ) && ( p <= file->lastPiece ) ) {
1797                                if( tr_cpFileIsComplete( &tor->completion, fileIndex ) ) {
1798                                    tr_cacheFlushFile( tor->session->cache, tor, fileIndex );
1799                                    tr_torrentFileCompleted( tor, fileIndex );
1800                                }
1801                            }
1802                        }
1803
1804                        pieceListRemovePiece( t, p );
1805                    }
1806                }
1807
1808                t->needsCompletenessCheck = TRUE;
1809            }
1810            break;
1811        }
1812
1813        case TR_PEER_ERROR:
1814            if( ( e->err == ERANGE ) || ( e->err == EMSGSIZE ) || ( e->err == ENOTCONN ) )
1815            {
1816                /* some protocol error from the peer */
1817                peer->doPurge = 1;
1818                tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error",
1819                        tr_atomAddrStr( peer->atom ) );
1820            }
1821            else
1822            {
1823                tordbg( t, "unhandled error: %s", tr_strerror( e->err ) );
1824            }
1825            break;
1826
1827        default:
1828            assert( 0 );
1829    }
1830
1831    torrentUnlock( t );
1832}
1833
1834static int
1835getDefaultShelfLife( uint8_t from )
1836{
1837    /* in general, peers obtained from firsthand contact
1838     * are better than those from secondhand, etc etc */
1839    switch( from )
1840    {
1841        case TR_PEER_FROM_INCOMING : return 60 * 60 * 6;
1842        case TR_PEER_FROM_LTEP     : return 60 * 60 * 6;
1843        case TR_PEER_FROM_TRACKER  : return 60 * 60 * 3;
1844        case TR_PEER_FROM_DHT      : return 60 * 60 * 3;
1845        case TR_PEER_FROM_PEX      : return 60 * 60 * 2;
1846        case TR_PEER_FROM_RESUME   : return 60 * 60;
1847        case TR_PEER_FROM_LPD      : return 10 * 60;
1848        default                    : return 60 * 60;
1849    }
1850}
1851
1852static void
1853ensureAtomExists( Torrent           * t,
1854                  const tr_address  * addr,
1855                  const tr_port       port,
1856                  const uint8_t       flags,
1857                  const int8_t        seedProbability,
1858                  const uint8_t       from )
1859{
1860    struct peer_atom * a;
1861
1862    assert( tr_isAddress( addr ) );
1863    assert( from < TR_PEER_FROM__MAX );
1864
1865    a = getExistingAtom( t, addr );
1866
1867    if( a == NULL )
1868    {
1869        const int jitter = tr_cryptoWeakRandInt( 60*10 );
1870        a = tr_new0( struct peer_atom, 1 );
1871        a->addr = *addr;
1872        a->port = port;
1873        a->flags = flags;
1874        a->fromFirst = from;
1875        a->fromBest = from;
1876        a->shelf_date = tr_time( ) + getDefaultShelfLife( from ) + jitter;
1877        a->blocklisted = -1;
1878        atomSetSeedProbability( a, seedProbability );
1879        tr_ptrArrayInsertSorted( &t->pool, a, compareAtomsByAddress );
1880
1881        tordbg( t, "got a new atom: %s", tr_atomAddrStr( a ) );
1882    }
1883    else
1884    {
1885        if( from < a->fromBest )
1886            a->fromBest = from;
1887       
1888        if( a->seedProbability == -1 )
1889            atomSetSeedProbability( a, seedProbability );
1890
1891        a->flags |= flags;
1892    }
1893}
1894
1895static int
1896getMaxPeerCount( const tr_torrent * tor )
1897{
1898    return tor->maxConnectedPeers;
1899}
1900
1901static int
1902getPeerCount( const Torrent * t )
1903{
1904    return tr_ptrArraySize( &t->peers );/* + tr_ptrArraySize( &t->outgoingHandshakes ); */
1905}
1906
1907/* FIXME: this is kind of a mess. */
1908static tr_bool
1909myHandshakeDoneCB( tr_handshake  * handshake,
1910                   tr_peerIo     * io,
1911                   tr_bool         readAnythingFromPeer,
1912                   tr_bool         isConnected,
1913                   const uint8_t * peer_id,
1914                   void          * vmanager )
1915{
1916    tr_bool            ok = isConnected;
1917    tr_bool            success = FALSE;
1918    tr_port            port;
1919    const tr_address * addr;
1920    tr_peerMgr       * manager = vmanager;
1921    Torrent          * t;
1922    tr_handshake     * ours;
1923
1924    assert( io );
1925    assert( tr_isBool( ok ) );
1926
1927    t = tr_peerIoHasTorrentHash( io )
1928        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1929        : NULL;
1930
1931    if( tr_peerIoIsIncoming ( io ) )
1932        ours = tr_ptrArrayRemoveSorted( &manager->incomingHandshakes,
1933                                        handshake, handshakeCompare );
1934    else if( t )
1935        ours = tr_ptrArrayRemoveSorted( &t->outgoingHandshakes,
1936                                        handshake, handshakeCompare );
1937    else
1938        ours = handshake;
1939
1940    assert( ours );
1941    assert( ours == handshake );
1942
1943    if( t )
1944        torrentLock( t );
1945
1946    addr = tr_peerIoGetAddress( io, &port );
1947
1948    if( !ok || !t || !t->isRunning )
1949    {
1950        if( t )
1951        {
1952            struct peer_atom * atom = getExistingAtom( t, addr );
1953            if( atom )
1954            {
1955                ++atom->numFails;
1956
1957                if( !readAnythingFromPeer )
1958                {
1959                    tordbg( t, "marking peer %s as unreachable... numFails is %d", tr_atomAddrStr( atom ), (int)atom->numFails );
1960                    atom->flags2 |= MYFLAG_UNREACHABLE;
1961                }
1962            }
1963        }
1964    }
1965    else /* looking good */
1966    {
1967        struct peer_atom * atom;
1968
1969        ensureAtomExists( t, addr, port, 0, -1, TR_PEER_FROM_INCOMING );
1970        atom = getExistingAtom( t, addr );
1971        atom->time = tr_time( );
1972        atom->piece_data_time = 0;
1973        atom->lastConnectionAt = tr_time( );
1974
1975        if( !tr_peerIoIsIncoming( io ) )
1976        {
1977            atom->flags |= ADDED_F_CONNECTABLE;
1978            atom->flags2 &= ~MYFLAG_UNREACHABLE;
1979        }
1980
1981        /* In principle, this flag specifies whether the peer groks uTP,
1982           not whether it's currently connected over uTP. */
1983        if( io->utp_socket )
1984            atom->flags |= ADDED_F_UTP_FLAGS;
1985
1986        if( atom->flags2 & MYFLAG_BANNED )
1987        {
1988            tordbg( t, "banned peer %s tried to reconnect",
1989                    tr_atomAddrStr( atom ) );
1990        }
1991        else if( tr_peerIoIsIncoming( io )
1992               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
1993
1994        {
1995        }
1996        else
1997        {
1998            tr_peer * peer = atom->peer;
1999
2000            if( peer ) /* we already have this peer */
2001            {
2002            }
2003            else
2004            {
2005                peer = getPeer( t, atom );
2006                tr_free( peer->client );
2007
2008                if( !peer_id )
2009                    peer->client = NULL;
2010                else {
2011                    char client[128];
2012                    tr_clientForId( client, sizeof( client ), peer_id );
2013                    peer->client = tr_strdup( client );
2014                }
2015
2016                peer->io = tr_handshakeStealIO( handshake ); /* this steals its refcount too, which is
2017                                                                balanced by our unref in peerDestructor()  */
2018                tr_peerIoSetParent( peer->io, t->tor->bandwidth );
2019                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t );
2020
2021                success = TRUE;
2022            }
2023        }
2024    }
2025
2026    if( t )
2027        torrentUnlock( t );
2028
2029    return success;
2030}
2031
2032void
2033tr_peerMgrAddIncoming( tr_peerMgr * manager,
2034                       tr_address * addr,
2035                       tr_port      port,
2036                       int          socket,
2037                       struct UTPSocket * utp_socket )
2038{
2039    tr_session * session;
2040
2041    managerLock( manager );
2042
2043    assert( tr_isSession( manager->session ) );
2044    session = manager->session;
2045
2046    if( tr_sessionIsAddressBlocked( session, addr ) )
2047    {
2048        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_ntop_non_ts( addr ) );
2049        if(socket >= 0)
2050            tr_netClose( session, socket );
2051        else
2052            UTP_Close( utp_socket );
2053    }
2054    else if( getExistingHandshake( &manager->incomingHandshakes, addr ) )
2055    {
2056        if(socket >= 0)
2057            tr_netClose( session, socket );
2058        else
2059            UTP_Close( utp_socket );
2060    }
2061    else /* we don't have a connection to them yet... */
2062    {
2063        tr_peerIo *    io;
2064        tr_handshake * handshake;
2065
2066        io = tr_peerIoNewIncoming( session, session->bandwidth, addr, port, socket, utp_socket );
2067
2068        handshake = tr_handshakeNew( io,
2069                                     session->encryptionMode,
2070                                     myHandshakeDoneCB,
2071                                     manager );
2072
2073        tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewIncoming() */
2074
2075        tr_ptrArrayInsertSorted( &manager->incomingHandshakes, handshake,
2076                                 handshakeCompare );
2077    }
2078
2079    managerUnlock( manager );
2080}
2081
2082void
2083tr_peerMgrAddPex( tr_torrent * tor, uint8_t from,
2084                  const tr_pex * pex, int8_t seedProbability )
2085{
2086    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
2087    {
2088        Torrent * t = tor->torrentPeers;
2089        managerLock( t->manager );
2090
2091        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
2092            if( tr_isValidPeerAddress( &pex->addr, pex->port ) )
2093                ensureAtomExists( t, &pex->addr, pex->port, pex->flags, seedProbability, from );
2094
2095        managerUnlock( t->manager );
2096    }
2097}
2098
2099void
2100tr_peerMgrMarkAllAsSeeds( tr_torrent * tor )
2101{
2102    Torrent * t = tor->torrentPeers;
2103    const int n = tr_ptrArraySize( &t->pool );
2104    struct peer_atom ** it = (struct peer_atom**) tr_ptrArrayBase( &t->pool );
2105    struct peer_atom ** end = it + n;
2106
2107    while( it != end )
2108        atomSetSeed( *it++ );
2109}
2110
2111tr_pex *
2112tr_peerMgrCompactToPex( const void *    compact,
2113                        size_t          compactLen,
2114                        const uint8_t * added_f,
2115                        size_t          added_f_len,
2116                        size_t *        pexCount )
2117{
2118    size_t          i;
2119    size_t          n = compactLen / 6;
2120    const uint8_t * walk = compact;
2121    tr_pex *        pex = tr_new0( tr_pex, n );
2122
2123    for( i = 0; i < n; ++i )
2124    {
2125        pex[i].addr.type = TR_AF_INET;
2126        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
2127        memcpy( &pex[i].port, walk, 2 ); walk += 2;
2128        if( added_f && ( n == added_f_len ) )
2129            pex[i].flags = added_f[i];
2130    }
2131
2132    *pexCount = n;
2133    return pex;
2134}
2135
2136tr_pex *
2137tr_peerMgrCompact6ToPex( const void    * compact,
2138                         size_t          compactLen,
2139                         const uint8_t * added_f,
2140                         size_t          added_f_len,
2141                         size_t        * pexCount )
2142{
2143    size_t          i;
2144    size_t          n = compactLen / 18;
2145    const uint8_t * walk = compact;
2146    tr_pex *        pex = tr_new0( tr_pex, n );
2147
2148    for( i = 0; i < n; ++i )
2149    {
2150        pex[i].addr.type = TR_AF_INET6;
2151        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
2152        memcpy( &pex[i].port, walk, 2 ); walk += 2;
2153        if( added_f && ( n == added_f_len ) )
2154            pex[i].flags = added_f[i];
2155    }
2156
2157    *pexCount = n;
2158    return pex;
2159}
2160
2161tr_pex *
2162tr_peerMgrArrayToPex( const void * array,
2163                      size_t       arrayLen,
2164                      size_t      * pexCount )
2165{
2166    size_t          i;
2167    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
2168    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
2169    const uint8_t * walk = array;
2170    tr_pex        * pex = tr_new0( tr_pex, n );
2171
2172    for( i = 0 ; i < n ; i++ ) {
2173        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
2174        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
2175        pex[i].flags = 0x00;
2176        walk += sizeof( tr_address ) + 2;
2177    }
2178
2179    *pexCount = n;
2180    return pex;
2181}
2182
2183/**
2184***
2185**/
2186
2187void
2188tr_peerMgrSetBlame( tr_torrent     * tor,
2189                    tr_piece_index_t pieceIndex,
2190                    int              success )
2191{
2192    if( !success )
2193    {
2194        int        peerCount, i;
2195        Torrent *  t = tor->torrentPeers;
2196        tr_peer ** peers;
2197
2198        assert( torrentIsLocked( t ) );
2199
2200        peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
2201        for( i = 0; i < peerCount; ++i )
2202        {
2203            tr_peer * peer = peers[i];
2204            if( tr_bitfieldHas( peer->blame, pieceIndex ) )
2205            {
2206                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
2207                        tr_atomAddrStr( peer->atom ),
2208                        pieceIndex, (int)peer->strikes + 1 );
2209                addStrike( t, peer );
2210            }
2211        }
2212    }
2213}
2214
2215int
2216tr_pexCompare( const void * va, const void * vb )
2217{
2218    const tr_pex * a = va;
2219    const tr_pex * b = vb;
2220    int i;
2221
2222    assert( tr_isPex( a ) );
2223    assert( tr_isPex( b ) );
2224
2225    if(( i = tr_compareAddresses( &a->addr, &b->addr )))
2226        return i;
2227
2228    if( a->port != b->port )
2229        return a->port < b->port ? -1 : 1;
2230
2231    return 0;
2232}
2233
#if 0
/* Disabled code: an explicit YES/NO encryption preference wins;
 * otherwise report whether the peer's io is currently encrypted. */
static int
peerPrefersCrypto( const tr_peer * peer )
{
    return ( peer->encryption_preference == ENCRYPTION_PREFERENCE_YES ) ? TRUE
         : ( peer->encryption_preference == ENCRYPTION_PREFERENCE_NO )  ? FALSE
         : tr_peerIoIsEncrypted( peer->io );
}
#endif
2247
2248/* better goes first */
2249static int
2250compareAtomsByUsefulness( const void * va, const void *vb )
2251{
2252    const struct peer_atom * a = * (const struct peer_atom**) va;
2253    const struct peer_atom * b = * (const struct peer_atom**) vb;
2254
2255    assert( tr_isAtom( a ) );
2256    assert( tr_isAtom( b ) );
2257
2258    if( a->piece_data_time != b->piece_data_time )
2259        return a->piece_data_time > b->piece_data_time ? -1 : 1;
2260    if( a->fromBest != b->fromBest )
2261        return a->fromBest < b->fromBest ? -1 : 1;
2262    if( a->numFails != b->numFails )
2263        return a->numFails < b->numFails ? -1 : 1;
2264
2265    return 0;
2266}
2267
2268int
2269tr_peerMgrGetPeers( tr_torrent   * tor,
2270                    tr_pex      ** setme_pex,
2271                    uint8_t        af,
2272                    uint8_t        list_mode,
2273                    int            maxCount )
2274{
2275    int i;
2276    int n;
2277    int count = 0;
2278    int atomCount = 0;
2279    const Torrent * t = tor->torrentPeers;
2280    struct peer_atom ** atoms = NULL;
2281    tr_pex * pex;
2282    tr_pex * walk;
2283
2284    assert( tr_isTorrent( tor ) );
2285    assert( setme_pex != NULL );
2286    assert( af==TR_AF_INET || af==TR_AF_INET6 );
2287    assert( list_mode==TR_PEERS_CONNECTED || list_mode==TR_PEERS_ALL );
2288
2289    managerLock( t->manager );
2290
2291    /**
2292    ***  build a list of atoms
2293    **/
2294
2295    if( list_mode == TR_PEERS_CONNECTED ) /* connected peers only */
2296    {
2297        int i;
2298        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
2299        atomCount = tr_ptrArraySize( &t->peers );
2300        atoms = tr_new( struct peer_atom *, atomCount );
2301        for( i=0; i<atomCount; ++i )
2302            atoms[i] = peers[i]->atom;
2303    }
2304    else /* TR_PEERS_ALL */
2305    {
2306        const struct peer_atom ** atomsBase = (const struct peer_atom**) tr_ptrArrayBase( &t->pool );
2307        atomCount = tr_ptrArraySize( &t->pool );
2308        atoms = tr_memdup( atomsBase, atomCount * sizeof( struct peer_atom * ) );
2309    }
2310
2311    qsort( atoms, atomCount, sizeof( struct peer_atom * ), compareAtomsByUsefulness );
2312
2313    /**
2314    ***  add the first N of them into our return list
2315    **/
2316
2317    n = MIN( atomCount, maxCount );
2318    pex = walk = tr_new0( tr_pex, n );
2319
2320    for( i=0; i<atomCount && count<n; ++i )
2321    {
2322        const struct peer_atom * atom = atoms[i];
2323        if( atom->addr.type == af )
2324        {
2325            assert( tr_isAddress( &atom->addr ) );
2326            walk->addr = atom->addr;
2327            walk->port = atom->port;
2328            walk->flags = atom->flags;
2329            ++count;
2330            ++walk;
2331        }
2332    }
2333
2334    qsort( pex, count, sizeof( tr_pex ), tr_pexCompare );
2335
2336    assert( ( walk - pex ) == count );
2337    *setme_pex = pex;
2338
2339    /* cleanup */
2340    tr_free( atoms );
2341    managerUnlock( t->manager );
2342    return count;
2343}
2344
/* Forward declarations for the pulse callbacks. The signature is the one
 * createTimer() expects for its callback argument. */
static void atomPulse( int, short, void * );
static void bandwidthPulse( int, short, void * );
static void rechokePulse( int, short, void * );
static void reconnectPulse( int, short, void * );
2349
2350static struct event *
2351createTimer( tr_session * session, int msec, void (*callback)(int, short, void *), void * cbdata )
2352{
2353    struct event * timer = evtimer_new( session->event_base, callback, cbdata );
2354    tr_timerAddMsec( timer, msec );
2355    return timer;
2356}
2357
2358static void
2359ensureMgrTimersExist( struct tr_peerMgr * m )
2360{
2361    if( m->atomTimer == NULL )
2362        m->atomTimer = createTimer( m->session, ATOM_PERIOD_MSEC, atomPulse, m );
2363
2364    if( m->bandwidthTimer == NULL )
2365        m->bandwidthTimer = createTimer( m->session, BANDWIDTH_PERIOD_MSEC, bandwidthPulse, m );
2366
2367    if( m->rechokeTimer == NULL )
2368        m->rechokeTimer = createTimer( m->session, RECHOKE_PERIOD_MSEC, rechokePulse, m );
2369
2370    if( m->refillUpkeepTimer == NULL )
2371        m->refillUpkeepTimer = createTimer( m->session, REFILL_UPKEEP_PERIOD_MSEC, refillUpkeep, m );
2372}
2373
2374void
2375tr_peerMgrStartTorrent( tr_torrent * tor )
2376{
2377    Torrent * t = tor->torrentPeers;
2378
2379    assert( tr_isTorrent( tor ) );
2380    assert( tr_torrentIsLocked( tor ) );
2381
2382    ensureMgrTimersExist( t->manager );
2383
2384    t->isRunning = TRUE;
2385    t->maxPeers = t->tor->maxConnectedPeers;
2386    t->pieceSortState = PIECES_UNSORTED;
2387
2388    rechokePulse( 0, 0, t->manager );
2389}
2390
2391static void
2392stopTorrent( Torrent * t )
2393{
2394    int i, n;
2395
2396    t->isRunning = FALSE;
2397
2398    replicationFree( t );
2399    invalidatePieceSorting( t );
2400
2401    /* disconnect the peers. */
2402    for( i=0, n=tr_ptrArraySize( &t->peers ); i<n; ++i )
2403        peerDestructor( t, tr_ptrArrayNth( &t->peers, i ) );
2404    tr_ptrArrayClear( &t->peers );
2405
2406    /* disconnect the handshakes. handshakeAbort calls handshakeDoneCB(),
2407     * which removes the handshake from t->outgoingHandshakes... */
2408    while( !tr_ptrArrayEmpty( &t->outgoingHandshakes ) )
2409        tr_handshakeAbort( tr_ptrArrayNth( &t->outgoingHandshakes, 0 ) );
2410}
2411
2412void
2413tr_peerMgrStopTorrent( tr_torrent * tor )
2414{
2415    assert( tr_isTorrent( tor ) );
2416    assert( tr_torrentIsLocked( tor ) );
2417
2418    stopTorrent( tor->torrentPeers );
2419}
2420
2421void
2422tr_peerMgrAddTorrent( tr_peerMgr * manager, tr_torrent * tor )
2423{
2424    assert( tr_isTorrent( tor ) );
2425    assert( tr_torrentIsLocked( tor ) );
2426    assert( tor->torrentPeers == NULL );
2427
2428    tor->torrentPeers = torrentConstructor( manager, tor );
2429}
2430
2431void
2432tr_peerMgrRemoveTorrent( tr_torrent * tor )
2433{
2434    assert( tr_isTorrent( tor ) );
2435    assert( tr_torrentIsLocked( tor ) );
2436
2437    stopTorrent( tor->torrentPeers );
2438    torrentDestructor( tor->torrentPeers );
2439}
2440
2441void
2442tr_peerMgrTorrentAvailability( const tr_torrent * tor, int8_t * tab, unsigned int tabCount )
2443{
2444    assert( tr_isTorrent( tor ) );
2445    assert( torrentIsLocked( tor->torrentPeers ) );
2446    assert( tab != NULL );
2447    assert( tabCount > 0 );
2448
2449    memset( tab, 0, tabCount );
2450
2451    if( tr_torrentHasMetadata( tor ) )
2452    {
2453        tr_piece_index_t i;
2454        const int peerCount = tr_ptrArraySize( &tor->torrentPeers->peers );
2455        const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &tor->torrentPeers->peers );
2456        const float interval = tor->info.pieceCount / (float)tabCount;
2457        const tr_bool isSeed = tr_cpGetStatus( &tor->completion ) == TR_SEED;
2458
2459        for( i=0; i<tabCount; ++i )
2460        {
2461            const int piece = i * interval;
2462
2463            if( isSeed || tr_cpPieceIsComplete( &tor->completion, piece ) )
2464                tab[i] = -1;
2465            else if( peerCount ) {
2466                int j;
2467                for( j=0; j<peerCount; ++j )
2468                    if( tr_bitsetHas( &peers[j]->have, piece ) )
2469                        ++tab[i];
2470            }
2471        }
2472    }
2473}
2474
2475/* Returns the pieces that are available from peers */
2476tr_bitfield*
2477tr_peerMgrGetAvailable( const tr_torrent * tor )
2478{
2479    int i;
2480    Torrent * t = tor->torrentPeers;
2481    const int peerCount = tr_ptrArraySize( &t->peers );
2482    const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
2483    tr_bitfield * pieces = tr_bitfieldNew( t->tor->info.pieceCount );
2484
2485    assert( tr_torrentIsLocked( tor ) );
2486
2487    for( i=0; i<peerCount; ++i )
2488        tr_bitsetOr( pieces, &peers[i]->have );
2489
2490    return pieces;
2491}
2492
2493void
2494tr_peerMgrTorrentStats( tr_torrent  * tor,
2495                        int         * setmePeersKnown,
2496                        int         * setmePeersConnected,
2497                        int         * setmeSeedsConnected,
2498                        int         * setmeWebseedsSendingToUs,
2499                        int         * setmePeersSendingToUs,
2500                        int         * setmePeersGettingFromUs,
2501                        int         * setmePeersFrom )
2502{
2503    int i, size;
2504    const Torrent * t = tor->torrentPeers;
2505    const tr_peer ** peers;
2506
2507    assert( tr_torrentIsLocked( tor ) );
2508
2509    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
2510    size = tr_ptrArraySize( &t->peers );
2511
2512    *setmePeersKnown           = tr_ptrArraySize( &t->pool );
2513    *setmePeersConnected       = 0;
2514    *setmeSeedsConnected       = 0;
2515    *setmePeersGettingFromUs   = 0;
2516    *setmePeersSendingToUs     = 0;
2517    *setmeWebseedsSendingToUs  = 0;
2518
2519    for( i=0; i<TR_PEER_FROM__MAX; ++i )
2520        setmePeersFrom[i] = 0;
2521
2522    for( i=0; i<size; ++i )
2523    {
2524        const tr_peer * peer = peers[i];
2525        const struct peer_atom * atom = peer->atom;
2526
2527        if( peer->io == NULL ) /* not connected */
2528            continue;
2529
2530        ++*setmePeersConnected;
2531
2532        ++setmePeersFrom[atom->fromFirst];
2533
2534        if( clientIsDownloadingFrom( tor, peer ) )
2535            ++*setmePeersSendingToUs;
2536
2537        if( clientIsUploadingTo( peer ) )
2538            ++*setmePeersGettingFromUs;
2539
2540        if( atomIsSeed( atom ) )
2541            ++*setmeSeedsConnected;
2542    }
2543
2544    *setmeWebseedsSendingToUs = countActiveWebseeds( t );
2545}
2546
2547int
2548tr_peerMgrGetWebseedSpeed_Bps( const tr_torrent * tor, uint64_t now )
2549{
2550    int i;
2551    int tmp;
2552    int ret = 0;
2553
2554    const Torrent * t = tor->torrentPeers;
2555    const int n = tr_ptrArraySize( &t->webseeds );
2556    const tr_webseed ** webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2557
2558    for( i=0; i<n; ++i )
2559        if( tr_webseedGetSpeed_Bps( webseeds[i], now, &tmp ) )
2560            ret += tmp;
2561
2562    return ret;
2563}
2564
2565
2566double*
2567tr_peerMgrWebSpeeds_KBps( const tr_torrent * tor )
2568{
2569    int i;
2570    const Torrent * t = tor->torrentPeers;
2571    const int webseedCount = tr_ptrArraySize( &t->webseeds );
2572    const tr_webseed ** webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2573    const uint64_t now = tr_time_msec( );
2574    double * ret = tr_new0( double, webseedCount );
2575
2576    assert( tr_isTorrent( tor ) );
2577    assert( tr_torrentIsLocked( tor ) );
2578    assert( t->manager != NULL );
2579    assert( webseedCount == tor->info.webseedCount );
2580
2581    for( i=0; i<webseedCount; ++i ) {
2582        int Bps;
2583        if( tr_webseedGetSpeed_Bps( webseeds[i], now, &Bps ) )
2584            ret[i] = Bps / (double)tr_speed_K;
2585        else
2586            ret[i] = -1.0;
2587    }
2588
2589    return ret;
2590}
2591
2592int
2593tr_peerGetPieceSpeed_Bps( const tr_peer * peer, uint64_t now, tr_direction direction )
2594{
2595    return peer->io ? tr_peerIoGetPieceSpeed_Bps( peer->io, now, direction ) : 0.0;
2596}
2597
2598
2599struct tr_peer_stat *
2600tr_peerMgrPeerStats( const tr_torrent * tor, int * setmeCount )
2601{
2602    int i;
2603    const Torrent * t = tor->torrentPeers;
2604    const int size = tr_ptrArraySize( &t->peers );
2605    const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
2606    const uint64_t now_msec = tr_time_msec( );
2607    const time_t now = tr_time();
2608    tr_peer_stat * ret = tr_new0( tr_peer_stat, size );
2609
2610    assert( tr_isTorrent( tor ) );
2611    assert( tr_torrentIsLocked( tor ) );
2612    assert( t->manager );
2613
2614    for( i=0; i<size; ++i )
2615    {
2616        char *                   pch;
2617        const tr_peer *          peer = peers[i];
2618        const struct peer_atom * atom = peer->atom;
2619        tr_peer_stat *           stat = ret + i;
2620
2621        tr_ntop( &atom->addr, stat->addr, sizeof( stat->addr ) );
2622        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
2623                   sizeof( stat->client ) );
2624        stat->port                = ntohs( peer->atom->port );
2625        stat->from                = atom->fromFirst;
2626        stat->progress            = peer->progress;
2627        stat->isUTP               = peer->io->utp_socket != NULL;
2628        stat->isEncrypted         = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
2629        stat->rateToPeer_KBps     = toSpeedKBps( tr_peerGetPieceSpeed_Bps( peer, now_msec, TR_CLIENT_TO_PEER ) );
2630        stat->rateToClient_KBps   = toSpeedKBps( tr_peerGetPieceSpeed_Bps( peer, now_msec, TR_PEER_TO_CLIENT ) );
2631        stat->peerIsChoked        = peer->peerIsChoked;
2632        stat->peerIsInterested    = peer->peerIsInterested;
2633        stat->clientIsChoked      = peer->clientIsChoked;
2634        stat->clientIsInterested  = peer->clientIsInterested;
2635        stat->isIncoming          = tr_peerIoIsIncoming( peer->io );
2636        stat->isDownloadingFrom   = clientIsDownloadingFrom( tor, peer );
2637        stat->isUploadingTo       = clientIsUploadingTo( peer );
2638        stat->isSeed              = ( atom->uploadOnly == UPLOAD_ONLY_YES ) || ( peer->progress >= 1.0 );
2639
2640        stat->blocksToPeer        = tr_historyGet( peer->blocksSentToPeer,    now, CANCEL_HISTORY_SEC );
2641        stat->blocksToClient      = tr_historyGet( peer->blocksSentToClient,  now, CANCEL_HISTORY_SEC );
2642        stat->cancelsToPeer       = tr_historyGet( peer->cancelsSentToPeer,   now, CANCEL_HISTORY_SEC );
2643        stat->cancelsToClient     = tr_historyGet( peer->cancelsSentToClient, now, CANCEL_HISTORY_SEC );
2644
2645        stat->pendingReqsToPeer   = peer->pendingReqsToPeer;
2646        stat->pendingReqsToClient = peer->pendingReqsToClient;
2647
2648        pch = stat->flagStr;
2649        if( stat->isUTP ) *pch++ = 'T';
2650        if( t->optimistic == peer ) *pch++ = 'O';
2651        if( stat->isDownloadingFrom ) *pch++ = 'D';
2652        else if( stat->clientIsInterested ) *pch++ = 'd';
2653        if( stat->isUploadingTo ) *pch++ = 'U';
2654        else if( stat->peerIsInterested ) *pch++ = 'u';
2655        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
2656        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
2657        if( stat->isEncrypted ) *pch++ = 'E';
2658        if( stat->from == TR_PEER_FROM_DHT ) *pch++ = 'H';
2659        else if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
2660        if( stat->isIncoming ) *pch++ = 'I';
2661        *pch = '\0';
2662    }
2663
2664    *setmeCount = size;
2665
2666    return ret;
2667}
2668
2669/**
2670***
2671**/
2672
2673void
2674tr_peerMgrClearInterest( tr_torrent * tor )
2675{
2676    int i;
2677    Torrent * t = tor->torrentPeers;
2678    const int peerCount = tr_ptrArraySize( &t->peers );
2679
2680    assert( tr_isTorrent( tor ) );
2681    assert( tr_torrentIsLocked( tor ) );
2682
2683    for( i=0; i<peerCount; ++i )
2684    {
2685        const tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2686        tr_peerMsgsSetInterested( peer->msgs, FALSE );
2687    }
2688}
2689
2690/* do we still want this piece and does the peer have it? */
2691static tr_bool
2692isPieceInteresting( const tr_torrent * tor, const tr_peer * peer, tr_piece_index_t index )
2693{
2694    return ( !tor->info.pieces[index].dnd ) /* we want it */
2695        && ( !tr_cpPieceIsComplete( &tor->completion, index ) )  /* we don't have it */
2696        && ( tr_bitsetHas( &peer->have, index ) ); /* peer has it */
2697}
2698
2699/* does this peer have any pieces that we want? */
2700static tr_bool
2701isPeerInteresting( const tr_torrent * tor, const tr_peer * peer )
2702{
2703    tr_piece_index_t i, n;
2704
2705    if ( tr_torrentIsSeed( tor ) )
2706        return FALSE;
2707
2708    if( !tr_torrentIsPieceTransferAllowed( tor, TR_PEER_TO_CLIENT ) )
2709        return FALSE;
2710
2711    for( i=0, n=tor->info.pieceCount; i<n; ++i )
2712        if( isPieceInteresting( tor, peer, i ) )
2713            return TRUE;
2714
2715    return FALSE;
2716}
2717
2718/* determines who we send "interested" messages to */
2719static void
2720rechokeDownloads( Torrent * t )
2721{
2722    int i;
2723    const time_t now = tr_time( );
2724    const int MIN_INTERESTING_PEERS = 5;
2725    const int peerCount = tr_ptrArraySize( &t->peers );
2726    int maxPeers;
2727
2728    int badCount         = 0;
2729    int goodCount        = 0;
2730    int untestedCount    = 0;
2731    tr_peer ** bad       = tr_new( tr_peer*, peerCount );
2732    tr_peer ** good      = tr_new( tr_peer*, peerCount );
2733    tr_peer ** untested  = tr_new( tr_peer*, peerCount );
2734
2735    /* decide how many peers to be interested in */
2736    {
2737        int blocks = 0;
2738        int cancels = 0;
2739        time_t timeSinceCancel;
2740
2741        /* Count up how many blocks & cancels each peer has.
2742         *
2743         * There are two situations where we send out cancels --
2744         *
2745         * 1. We've got unresponsive peers, which is handled by deciding
2746         *    -which- peers to be interested in.
2747         *
2748         * 2. We've hit our bandwidth cap, which is handled by deciding
2749         *    -how many- peers to be interested in.
2750         *
2751         * We're working on 2. here, so we need to ignore unresponsive
2752         * peers in our calculations lest they confuse Transmission into
2753         * thinking it's hit its bandwidth cap.
2754         */
2755        for( i=0; i<peerCount; ++i )
2756        {
2757            const tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2758            const int b = tr_historyGet( peer->blocksSentToClient, now, CANCEL_HISTORY_SEC );
2759            const int c = tr_historyGet( peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC );
2760
2761            if( b == 0 ) /* ignore unresponsive peers, as described above */
2762                continue;
2763
2764            blocks += b;
2765            cancels += c;
2766        }
2767
2768        if( cancels > 0 )
2769        {
2770            /* cancelRate: of the block requests we've recently made, the percentage we cancelled.
2771             * higher values indicate more congestion. */
2772            const double cancelRate = cancels / (double)(cancels + blocks);
2773            const double mult = 1 - MIN( cancelRate, 0.5 );
2774            maxPeers = t->interestedCount * mult;
2775            tordbg( t, "cancel rate is %.3f -- reducing the "
2776                       "number of peers we're interested in by %.0f percent",
2777                       cancelRate, mult * 100 );
2778            t->lastCancel = now;
2779        }
2780
2781        timeSinceCancel = now - t->lastCancel;
2782        if( timeSinceCancel )
2783        {
2784            const int maxIncrease = 15;
2785            const time_t maxHistory = 2 * CANCEL_HISTORY_SEC;
2786            const double mult = MIN( timeSinceCancel, maxHistory ) / (double) maxHistory;
2787            const int inc = maxIncrease * mult;
2788            maxPeers = t->maxPeers + inc;
2789            tordbg( t, "time since last cancel is %li -- increasing the "
2790                       "number of peers we're interested in by %d",
2791                       timeSinceCancel, inc );
2792        }
2793    }
2794
2795    /* don't let the previous section's number tweaking go too far... */
2796    if( maxPeers < MIN_INTERESTING_PEERS )
2797        maxPeers = MIN_INTERESTING_PEERS;
2798    if( maxPeers > t->tor->maxConnectedPeers )
2799        maxPeers = t->tor->maxConnectedPeers;
2800
2801    t->maxPeers = maxPeers;
2802
2803    /* separate the peers into "good" (ones with a low cancel-to-block ratio),
2804     * untested peers, and "bad" (ones with a high cancel-to-block ratio).
2805     * That's the order in which we'll choose who to show interest in */
2806    {
2807        /* Randomize the peer array so the peers in the three groups will be unsorted... */
2808        int n = peerCount;
2809        tr_peer ** peers = tr_memdup( tr_ptrArrayBase( &t->peers ), n * sizeof( tr_peer * ) );
2810
2811        while( n > 0 )
2812        {
2813            const int i = tr_cryptoWeakRandInt( n );
2814            tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2815
2816            if( !isPeerInteresting( t->tor, peer ) )
2817            {
2818                tr_peerMsgsSetInterested( peer->msgs, FALSE );
2819            }
2820            else
2821            {
2822                const int blocks = tr_historyGet( peer->blocksSentToClient, now, CANCEL_HISTORY_SEC );
2823                const int cancels = tr_historyGet( peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC );
2824
2825                if( !blocks && !cancels )
2826                    untested[untestedCount++] = peer;
2827                else if( !cancels )
2828                    good[goodCount++] = peer;
2829                else if( !blocks )
2830                    bad[badCount++] = peer;
2831                else if( ( cancels * 10 ) < blocks )
2832                    good[goodCount++] = peer;
2833                else
2834                    bad[badCount++] = peer;
2835            }
2836
2837            tr_removeElementFromArray( peers, i, sizeof(tr_peer*), n-- );
2838        }
2839
2840        tr_free( peers );
2841    }
2842
2843    t->interestedCount = 0;
2844
2845    /* We've decided (1) how many peers to be interested in,
2846     * and (2) which peers are the best candidates,
2847     * Now it's time to update our `interest' flags. */
2848    for( i=0; i<goodCount; ++i ) {
2849        const tr_bool b = t->interestedCount < maxPeers;
2850        tr_peerMsgsSetInterested( good[i]->msgs, b );
2851        if( b )
2852            ++t->interestedCount;
2853    }
2854    for( i=0; i<untestedCount; ++i ) {
2855        const tr_bool b = t->interestedCount < maxPeers;
2856        tr_peerMsgsSetInterested( untested[i]->msgs, b );
2857        if( b )
2858            ++t->interestedCount;
2859    }
2860    for( i=0; i<badCount; ++i ) {
2861        const tr_bool b = t->interestedCount < maxPeers;
2862        tr_peerMsgsSetInterested( bad[i]->msgs, b );
2863        if( b )
2864            ++t->interestedCount;
2865    }
2866
2867/*fprintf( stderr, "num interested: %d\n", t->interestedCount );*/
2868
2869    /* cleanup */
2870    tr_free( untested );
2871    tr_free( good );
2872    tr_free( bad );
2873}
2874
2875/**
2876***
2877**/
2878
2879struct ChokeData
2880{
2881    tr_bool         isInterested;
2882    tr_bool         wasChoked;
2883    tr_bool         isChoked;
2884    int             rate;
2885    int             salt;
2886    tr_peer *       peer;
2887};
2888
2889static int
2890compareChoke( const void * va,
2891              const void * vb )
2892{
2893    const struct ChokeData * a = va;
2894    const struct ChokeData * b = vb;
2895
2896    if( a->rate != b->rate ) /* prefer higher overall speeds */
2897        return a->rate > b->rate ? -1 : 1;
2898
2899    if( a->wasChoked != b->wasChoked ) /* prefer unchoked */
2900        return a->wasChoked ? 1 : -1;
2901
2902    if( a->salt != b->salt ) /* random order */
2903        return a->salt - b->salt;
2904
2905    return 0;
2906}
2907
2908/* is this a new connection? */
2909static int
2910isNew( const tr_peer * peer )
2911{
2912    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
2913}
2914
2915/* get a rate for deciding which peers to choke and unchoke. */
2916static int
2917getRate( const tr_torrent * tor, struct peer_atom * atom, uint64_t now )
2918{
2919    int Bps;
2920
2921    if( tr_torrentIsSeed( tor ) )
2922        Bps = tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_CLIENT_TO_PEER );
2923
2924    /* downloading a private torrent... take upload speed into account
2925     * because there may only be a small window of opportunity to share */
2926    else if( tr_torrentIsPrivate( tor ) )
2927        Bps = tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_PEER_TO_CLIENT )
2928            + tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_CLIENT_TO_PEER );
2929
2930    /* downloading a public torrent */
2931    else
2932        Bps = tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_PEER_TO_CLIENT );
2933
2934    /* convert it to bytes per second */
2935    return Bps;
2936}
2937
2938static inline tr_bool
2939isBandwidthMaxedOut( const tr_bandwidth * b,
2940                     const uint64_t now_msec, tr_direction dir )
2941{
2942    if( !tr_bandwidthIsLimited( b, dir ) )
2943        return FALSE;
2944    else {
2945        const int got = tr_bandwidthGetPieceSpeed_Bps( b, now_msec, dir );
2946        const int want = tr_bandwidthGetDesiredSpeed_Bps( b, dir );
2947        return got >= want;
2948    }
2949}
2950
2951static void
2952rechokeUploads( Torrent * t, const uint64_t now )
2953{
2954    int i, size, unchokedInterested;
2955    const int peerCount = tr_ptrArraySize( &t->peers );
2956    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
2957    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
2958    const tr_session * session = t->manager->session;
2959    const int chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
2960    const tr_bool isMaxedOut = isBandwidthMaxedOut( t->tor->bandwidth, now, TR_UP );
2961
2962    assert( torrentIsLocked( t ) );
2963
2964    /* an optimistic unchoke peer's "optimistic"
2965     * state lasts for N calls to rechokeUploads(). */
2966    if( t->optimisticUnchokeTimeScaler > 0 )
2967        t->optimisticUnchokeTimeScaler--;
2968    else
2969        t->optimistic = NULL;
2970
2971    /* sort the peers by preference and rate */
2972    for( i = 0, size = 0; i < peerCount; ++i )
2973    {
2974        tr_peer * peer = peers[i];
2975        struct peer_atom * atom = peer->atom;
2976
2977        if( peer->progress >= 1.0 ) /* choke all seeds */
2978        {
2979            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2980        }
2981        else if( atom->uploadOnly == UPLOAD_ONLY_YES ) /* choke partial seeds */
2982        {
2983            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2984        }
2985        else if( chokeAll ) /* choke everyone if we're not uploading */
2986        {
2987            tr_peerMsgsSetChoke( peer->msgs, TRUE );
2988        }
2989        else if( peer != t->optimistic )
2990        {
2991            struct ChokeData * n = &choke[size++];
2992            n->peer         = peer;
2993            n->isInterested = peer->peerIsInterested;
2994            n->wasChoked    = peer->peerIsChoked;
2995            n->rate         = getRate( t->tor, atom, now );
2996            n->salt         = tr_cryptoWeakRandInt( INT_MAX );
2997            n->isChoked     = TRUE;
2998        }
2999    }
3000
3001    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
3002
3003    /**
3004     * Reciprocation and number of uploads capping is managed by unchoking
3005     * the N peers which have the best upload rate and are interested.
3006     * This maximizes the client's download rate. These N peers are
3007     * referred to as downloaders, because they are interested in downloading
3008     * from the client.
3009     *
3010     * Peers which have a better upload rate (as compared to the downloaders)
3011     * but aren't interested get unchoked. If they become interested, the
3012     * downloader with the worst upload rate gets choked. If a client has
3013     * a complete file, it uses its upload rate rather than its download
3014     * rate to decide which peers to unchoke.
3015     *
3016     * If our bandwidth is maxed out, don't unchoke any more peers.
3017     */
3018    unchokedInterested = 0;
3019    for( i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i ) {
3020        choke[i].isChoked = isMaxedOut ? choke[i].wasChoked : FALSE;
3021        if( choke[i].isInterested )
3022            ++unchokedInterested;
3023    }
3024
3025    /* optimistic unchoke */
3026    if( !t->optimistic && !isMaxedOut && (i<size) )
3027    {
3028        int n;
3029        struct ChokeData * c;
3030        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
3031
3032        for( ; i<size; ++i )
3033        {
3034            if( choke[i].isInterested )
3035            {
3036                const tr_peer * peer = choke[i].peer;
3037                int x = 1, y;
3038                if( isNew( peer ) ) x *= 3;
3039                for( y=0; y<x; ++y )
3040                    tr_ptrArrayAppend( &randPool, &choke[i] );
3041            }
3042        }
3043
3044        if(( n = tr_ptrArraySize( &randPool )))
3045        {
3046            c = tr_ptrArrayNth( &randPool, tr_cryptoWeakRandInt( n ));
3047            c->isChoked = FALSE;
3048            t->optimistic = c->peer;
3049            t->optimisticUnchokeTimeScaler = OPTIMISTIC_UNCHOKE_MULTIPLIER;
3050        }
3051
3052        tr_ptrArrayDestruct( &randPool, NULL );
3053    }
3054
3055    for( i=0; i<size; ++i )
3056        tr_peerMsgsSetChoke( choke[i].peer->msgs, choke[i].isChoked );
3057
3058    /* cleanup */
3059    tr_free( choke );
3060}
3061
3062static void
3063rechokePulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3064{
3065    tr_torrent * tor = NULL;
3066    tr_peerMgr * mgr = vmgr;
3067    const uint64_t now = tr_sessionGetTimeMsec( mgr->session );
3068
3069    managerLock( mgr );
3070
3071    while(( tor = tr_torrentNext( mgr->session, tor ))) {
3072        if( tor->isRunning ) {
3073            rechokeUploads( tor->torrentPeers, now );
3074            if( !tr_torrentIsSeed( tor ) )
3075                rechokeDownloads( tor->torrentPeers );
3076        }
3077    }
3078
3079    tr_timerAddMsec( mgr->rechokeTimer, RECHOKE_PERIOD_MSEC );
3080    managerUnlock( mgr );
3081}
3082
3083/***
3084****
3085****  Life and Death
3086****
3087***/
3088
3089static tr_bool
3090shouldPeerBeClosed( const Torrent    * t,
3091                    const tr_peer    * peer,
3092                    int                peerCount,
3093                    const time_t       now )
3094{
3095    const tr_torrent *       tor = t->tor;
3096    const struct peer_atom * atom = peer->atom;
3097
3098    /* if it's marked for purging, close it */
3099    if( peer->doPurge )
3100    {
3101        tordbg( t, "purging peer %s because its doPurge flag is set",
3102                tr_atomAddrStr( atom ) );
3103        return TRUE;
3104    }
3105
3106    /* if we're seeding and the peer has everything we have,
3107     * and enough time has passed for a pex exchange, then disconnect */
3108    if( tr_torrentIsSeed( tor ) && ( peer->progress >= 1.0f ) )
3109        return !tr_torrentAllowsPex(tor) || (now-atom->time>=30);
3110
3111    /* disconnect if it's been too long since piece data has been transferred.
3112     * this is on a sliding scale based on number of available peers... */
3113    {
3114        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
3115        /* if we have >= relaxIfFewerThan, strictness is 100%.
3116         * if we have zero connections, strictness is 0% */
3117        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
3118                               ? 1.0
3119                               : peerCount / (float)relaxStrictnessIfFewerThanN;
3120        const int lo = MIN_UPLOAD_IDLE_SECS;
3121        const int hi = MAX_UPLOAD_IDLE_SECS;
3122        const int limit = hi - ( ( hi - lo ) * strictness );
3123        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
3124/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
3125        if( idleTime > limit ) {
3126            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
3127                       tr_atomAddrStr( atom ), idleTime );
3128            return TRUE;
3129        }
3130    }
3131
3132    return FALSE;
3133}
3134
3135static tr_peer **
3136getPeersToClose( Torrent * t, const time_t now_sec, int * setmeSize )
3137{
3138    int i, peerCount, outsize;
3139    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( &t->peers, &peerCount );
3140    struct tr_peer ** ret = tr_new( tr_peer *, peerCount );
3141
3142    assert( torrentIsLocked( t ) );
3143
3144    for( i = outsize = 0; i < peerCount; ++i )
3145        if( shouldPeerBeClosed( t, peers[i], peerCount, now_sec ) )
3146            ret[outsize++] = peers[i];
3147
3148    *setmeSize = outsize;
3149    return ret;
3150}
3151
3152static int
3153getReconnectIntervalSecs( const struct peer_atom * atom, const time_t now )
3154{
3155    int sec;
3156
3157    /* if we were recently connected to this peer and transferring piece
3158     * data, try to reconnect to them sooner rather that later -- we don't
3159     * want network troubles to get in the way of a good peer. */
3160    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
3161        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
3162
3163    /* don't allow reconnects more often than our minimum */
3164    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
3165        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
3166
3167    /* otherwise, the interval depends on how many times we've tried
3168     * and failed to connect to the peer */
3169    else switch( atom->numFails ) {
3170        case 0: sec = 0; break;
3171        case 1: sec = 5; break;
3172        case 2: sec = 2 * 60; break;
3173        case 3: sec = 15 * 60; break;
3174        case 4: sec = 30 * 60; break;
3175        case 5: sec = 60 * 60; break;
3176        default: sec = 120 * 60; break;
3177    }
3178
3179    /* penalize peers that were unreachable the last time we tried */
3180    if( atom->flags2 & MYFLAG_UNREACHABLE )
3181        sec += sec;
3182
3183    dbgmsg( "reconnect interval for %s is %d seconds", tr_atomAddrStr( atom ), sec );
3184    return sec;
3185}
3186
3187static void
3188removePeer( Torrent * t, tr_peer * peer )
3189{
3190    tr_peer * removed;
3191    struct peer_atom * atom = peer->atom;
3192
3193    assert( torrentIsLocked( t ) );
3194    assert( atom );
3195
3196    atom->time = tr_time( );
3197
3198    removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare );
3199
3200    if( replicationExists( t ) )
3201        tr_decrReplicationFromBitset( t, &peer->have );
3202
3203    assert( removed == peer );
3204    peerDestructor( t, removed );
3205}
3206
3207static void
3208closePeer( Torrent * t, tr_peer * peer )
3209{
3210    struct peer_atom * atom;
3211
3212    assert( t != NULL );
3213    assert( peer != NULL );
3214
3215    atom = peer->atom;
3216
3217    /* if we transferred piece data, then they might be good peers,
3218       so reset their `numFails' weight to zero. otherwise we connected
3219       to them fruitlessly, so mark it as another fail */
3220    if( atom->piece_data_time ) {
3221        tordbg( t, "resetting atom %s numFails to 0", tr_atomAddrStr(atom) );
3222        atom->numFails = 0;
3223    } else {
3224        ++atom->numFails;
3225        tordbg( t, "incremented atom %s numFails to %d", tr_atomAddrStr(atom), (int)atom->numFails );
3226    }
3227
3228    tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
3229    removePeer( t, peer );
3230}
3231
3232static void
3233removeAllPeers( Torrent * t )
3234{
3235    while( !tr_ptrArrayEmpty( &t->peers ) )
3236        removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) );
3237}
3238
3239static void
3240closeBadPeers( Torrent * t, const time_t now_sec )
3241{
3242    int i;
3243    int peerCount;
3244    struct tr_peer ** peers = getPeersToClose( t, now_sec, &peerCount );
3245    for( i=0; i<peerCount; ++i )
3246        closePeer( t, peers[i] );
3247    tr_free( peers );
3248}
3249
3250struct peer_liveliness
3251{
3252    tr_peer * peer;
3253    void * clientData;
3254    time_t pieceDataTime;
3255    time_t time;
3256    int speed;
3257    tr_bool doPurge;
3258};
3259
3260static int
3261comparePeerLiveliness( const void * va, const void * vb )
3262{
3263    const struct peer_liveliness * a = va;
3264    const struct peer_liveliness * b = vb;
3265
3266    if( a->doPurge != b->doPurge )
3267        return a->doPurge ? 1 : -1;
3268
3269    if( a->speed != b->speed ) /* faster goes first */
3270        return a->speed > b->speed ? -1 : 1;
3271
3272    /* the one to give us data more recently goes first */
3273    if( a->pieceDataTime != b->pieceDataTime )
3274        return a->pieceDataTime > b->pieceDataTime ? -1 : 1;
3275
3276    /* the one we connected to most recently goes first */
3277    if( a->time != b->time )
3278        return a->time > b->time ? -1 : 1;
3279
3280    return 0;
3281}
3282
3283static void
3284sortPeersByLivelinessImpl( tr_peer  ** peers,
3285                           void     ** clientData,
3286                           int         n,
3287                           uint64_t    now,
3288                           int (*compare) ( const void *va, const void *vb ) )
3289{
3290    int i;
3291    struct peer_liveliness *lives, *l;
3292
3293    /* build a sortable array of peer + extra info */
3294    lives = l = tr_new0( struct peer_liveliness, n );
3295    for( i=0; i<n; ++i, ++l )
3296    {
3297        tr_peer * p = peers[i];
3298        l->peer = p;
3299        l->doPurge = p->doPurge;
3300        l->pieceDataTime = p->atom->piece_data_time;
3301        l->time = p->atom->time;
3302        l->speed = tr_peerGetPieceSpeed_Bps( p, now, TR_UP )
3303                 + tr_peerGetPieceSpeed_Bps( p, now, TR_DOWN );
3304        if( clientData )
3305            l->clientData = clientData[i];
3306    }
3307
3308    /* sort 'em */
3309    assert( n == ( l - lives ) );
3310    qsort( lives, n, sizeof( struct peer_liveliness ), compare );
3311
3312    /* build the peer array */
3313    for( i=0, l=lives; i<n; ++i, ++l ) {
3314        peers[i] = l->peer;
3315        if( clientData )
3316            clientData[i] = l->clientData;
3317    }
3318    assert( n == ( l - lives ) );
3319
3320    /* cleanup */
3321    tr_free( lives );
3322}
3323
3324static void
3325sortPeersByLiveliness( tr_peer ** peers, void ** clientData, int n, uint64_t now )
3326{
3327    sortPeersByLivelinessImpl( peers, clientData, n, now, comparePeerLiveliness );
3328}
3329
3330
3331static void
3332enforceTorrentPeerLimit( Torrent * t, uint64_t now )
3333{
3334    int n = tr_ptrArraySize( &t->peers );
3335    const int max = tr_torrentGetPeerLimit( t->tor );
3336    if( n > max )
3337    {
3338        void * base = tr_ptrArrayBase( &t->peers );
3339        tr_peer ** peers = tr_memdup( base, n*sizeof( tr_peer* ) );
3340        sortPeersByLiveliness( peers, NULL, n, now );
3341        while( n > max )
3342            closePeer( t, peers[--n] );
3343        tr_free( peers );
3344    }
3345}
3346
3347static void
3348enforceSessionPeerLimit( tr_session * session, uint64_t now )
3349{
3350    int n = 0;
3351    tr_torrent * tor = NULL;
3352    const int max = tr_sessionGetPeerLimit( session );
3353
3354    /* count the total number of peers */
3355    while(( tor = tr_torrentNext( session, tor )))
3356        n += tr_ptrArraySize( &tor->torrentPeers->peers );
3357
3358    /* if there are too many, prune out the worst */
3359    if( n > max )
3360    {
3361        tr_peer ** peers = tr_new( tr_peer*, n );
3362        Torrent ** torrents = tr_new( Torrent*, n );
3363
3364        /* populate the peer array */
3365        n = 0;
3366        tor = NULL;
3367        while(( tor = tr_torrentNext( session, tor ))) {
3368            int i;
3369            Torrent * t = tor->torrentPeers;
3370            const int tn = tr_ptrArraySize( &t->peers );
3371            for( i=0; i<tn; ++i, ++n ) {
3372                peers[n] = tr_ptrArrayNth( &t->peers, i );
3373                torrents[n] = t;
3374            }
3375        }
3376
3377        /* sort 'em */
3378        sortPeersByLiveliness( peers, (void**)torrents, n, now );
3379
3380        /* cull out the crappiest */
3381        while( n-- > max )
3382            closePeer( torrents[n], peers[n] );
3383
3384        /* cleanup */
3385        tr_free( torrents );
3386        tr_free( peers );
3387    }
3388}
3389
3390static void makeNewPeerConnections( tr_peerMgr * mgr, const int max );
3391
3392static void
3393reconnectPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3394{
3395    tr_torrent * tor;
3396    tr_peerMgr * mgr = vmgr;
3397    const time_t now_sec = tr_time( );
3398    const uint64_t now_msec = tr_sessionGetTimeMsec( mgr->session );
3399
3400    /**
3401    ***  enforce the per-session and per-torrent peer limits
3402    **/
3403
3404    /* if we're over the per-torrent peer limits, cull some peers */
3405    tor = NULL;
3406    while(( tor = tr_torrentNext( mgr->session, tor )))
3407        if( tor->isRunning )
3408            enforceTorrentPeerLimit( tor->torrentPeers, now_msec );
3409
3410    /* if we're over the per-session peer limits, cull some peers */
3411    enforceSessionPeerLimit( mgr->session, now_msec );
3412
3413    /* remove crappy peers */
3414    tor = NULL;
3415    while(( tor = tr_torrentNext( mgr->session, tor )))
3416        if( !tor->torrentPeers->isRunning )
3417            removeAllPeers( tor->torrentPeers );
3418        else
3419            closeBadPeers( tor->torrentPeers, now_sec );
3420
3421    /* try to make new peer connections */
3422    makeNewPeerConnections( mgr, MAX_CONNECTIONS_PER_PULSE );
3423}
3424
3425/****
3426*****
3427*****  BANDWIDTH ALLOCATION
3428*****
3429****/
3430
3431static void
3432pumpAllPeers( tr_peerMgr * mgr )
3433{
3434    tr_torrent * tor = NULL;
3435
3436    while(( tor = tr_torrentNext( mgr->session, tor )))
3437    {
3438        int j;
3439        Torrent * t = tor->torrentPeers;
3440
3441        for( j=0; j<tr_ptrArraySize( &t->peers ); ++j )
3442        {
3443            tr_peer * peer = tr_ptrArrayNth( &t->peers, j );
3444            tr_peerMsgsPulse( peer->msgs );
3445        }
3446    }
3447}
3448
3449static void
3450bandwidthPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3451{
3452    tr_torrent * tor;
3453    tr_peerMgr * mgr = vmgr;
3454    managerLock( mgr );
3455
3456    /* FIXME: this next line probably isn't necessary... */
3457    pumpAllPeers( mgr );
3458
3459    /* allocate bandwidth to the peers */
3460    tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
3461    tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
3462
3463    /* possibly stop torrents that have seeded enough */
3464    tor = NULL;
3465    while(( tor = tr_torrentNext( mgr->session, tor )))
3466        tr_torrentCheckSeedLimit( tor );
3467
3468    /* run the completeness check for any torrents that need it */
3469    tor = NULL;
3470    while(( tor = tr_torrentNext( mgr->session, tor ))) {
3471        if( tor->torrentPeers->needsCompletenessCheck ) {
3472            tor->torrentPeers->needsCompletenessCheck  = FALSE;
3473            tr_torrentRecheckCompleteness( tor );
3474        }
3475    }
3476
3477    /* stop torrents that are ready to stop, but couldn't be stopped earlier
3478     * during the peer-io callback call chain */
3479    tor = NULL;
3480    while(( tor = tr_torrentNext( mgr->session, tor )))
3481        if( tor->isStopping )
3482            tr_torrentStop( tor );
3483
3484    reconnectPulse( 0, 0, mgr );
3485
3486    tr_timerAddMsec( mgr->bandwidthTimer, BANDWIDTH_PERIOD_MSEC );
3487    managerUnlock( mgr );
3488}
3489
3490/***
3491****
3492***/
3493
3494static int
3495compareAtomPtrsByAddress( const void * va, const void *vb )
3496{
3497    const struct peer_atom * a = * (const struct peer_atom**) va;
3498    const struct peer_atom * b = * (const struct peer_atom**) vb;
3499
3500    assert( tr_isAtom( a ) );
3501    assert( tr_isAtom( b ) );
3502
3503    return tr_compareAddresses( &a->addr, &b->addr );
3504}
3505
3506/* best come first, worst go last */
3507static int
3508compareAtomPtrsByShelfDate( const void * va, const void *vb )
3509{
3510    time_t atime;
3511    time_t btime;
3512    const struct peer_atom * a = * (const struct peer_atom**) va;
3513    const struct peer_atom * b = * (const struct peer_atom**) vb;
3514    const int data_time_cutoff_secs = 60 * 60;
3515    const time_t tr_now = tr_time( );
3516
3517    assert( tr_isAtom( a ) );
3518    assert( tr_isAtom( b ) );
3519
3520    /* primary key: the last piece data time *if* it was within the last hour */
3521    atime = a->piece_data_time; if( atime + data_time_cutoff_secs < tr_now ) atime = 0;
3522    btime = b->piece_data_time; if( btime + data_time_cutoff_secs < tr_now ) btime = 0;
3523    if( atime != btime )
3524        return atime > btime ? -1 : 1;
3525
3526    /* secondary key: shelf date. */
3527    if( a->shelf_date != b->shelf_date )
3528        return a->shelf_date > b->shelf_date ? -1 : 1;
3529
3530    return 0;
3531}
3532
3533static int
3534getMaxAtomCount( const tr_torrent * tor )
3535{
3536    const int n = tor->maxConnectedPeers;
3537    /* approximate fit of the old jump discontinuous function */
3538    if( n >= 55 ) return     n + 150;
3539    if( n >= 20 ) return 2 * n + 95;
3540    return               4 * n + 55;
3541}
3542
3543static void
3544atomPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3545{
3546    tr_torrent * tor = NULL;
3547    tr_peerMgr * mgr = vmgr;
3548    managerLock( mgr );
3549
3550    while(( tor = tr_torrentNext( mgr->session, tor )))
3551    {
3552        int atomCount;
3553        Torrent * t = tor->torrentPeers;
3554        const int maxAtomCount = getMaxAtomCount( tor );
3555        struct peer_atom ** atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
3556
3557        if( atomCount > maxAtomCount ) /* we've got too many atoms... time to prune */
3558        {
3559            int i;
3560            int keepCount = 0;
3561            int testCount = 0;
3562            struct peer_atom ** keep = tr_new( struct peer_atom*, atomCount );
3563            struct peer_atom ** test = tr_new( struct peer_atom*, atomCount );
3564
3565            /* keep the ones that are in use */
3566            for( i=0; i<atomCount; ++i ) {
3567                struct peer_atom * atom = atoms[i];
3568                if( peerIsInUse( t, atom ) )
3569                    keep[keepCount++] = atom;
3570                else
3571                    test[testCount++] = atom;
3572            }
3573
3574            /* if there's room, keep the best of what's left */
3575            i = 0;
3576            if( keepCount < maxAtomCount ) {
3577                qsort( test, testCount, sizeof( struct peer_atom * ), compareAtomPtrsByShelfDate );
3578                while( i<testCount && keepCount<maxAtomCount )
3579                    keep[keepCount++] = test[i++];
3580            }
3581
3582            /* free the culled atoms */
3583            while( i<testCount )
3584                tr_free( test[i++] );
3585
3586            /* rebuild Torrent.pool with what's left */
3587            tr_ptrArrayDestruct( &t->pool, NULL );
3588            t->pool = TR_PTR_ARRAY_INIT;
3589            qsort( keep, keepCount, sizeof( struct peer_atom * ), compareAtomPtrsByAddress );
3590            for( i=0; i<keepCount; ++i )
3591                tr_ptrArrayAppend( &t->pool, keep[i] );
3592
3593            tordbg( t, "max atom count is %d... pruned from %d to %d\n", maxAtomCount, atomCount, keepCount );
3594
3595            /* cleanup */
3596            tr_free( test );
3597            tr_free( keep );
3598        }
3599    }
3600
3601    tr_timerAddMsec( mgr->atomTimer, ATOM_PERIOD_MSEC );
3602    managerUnlock( mgr );
3603}
3604
3605/***
3606****
3607****
3608****
3609***/
3610
3611/* is this atom someone that we'd want to initiate a connection to? */
3612static tr_bool
3613isPeerCandidate( const tr_torrent * tor, struct peer_atom * atom, const time_t now )
3614{
3615    /* not if we're both seeds */
3616    if( tr_torrentIsSeed( tor ) )
3617        if( atomIsSeed( atom ) || ( atom->uploadOnly == UPLOAD_ONLY_YES ) )
3618            return FALSE;
3619
3620    /* not if we've already got a connection to them... */
3621    if( peerIsInUse( tor->torrentPeers, atom ) )
3622        return FALSE;
3623
3624    /* not if we just tried them already */
3625    if( ( now - atom->time ) < getReconnectIntervalSecs( atom, now ) )
3626        return FALSE;
3627
3628    /* not if they're blocklisted */
3629    if( isAtomBlocklisted( tor->session, atom ) )
3630        return FALSE;
3631
3632    /* not if they're banned... */
3633    if( atom->flags2 & MYFLAG_BANNED )
3634        return FALSE;
3635
3636    return TRUE;
3637}
3638
3639struct peer_candidate
3640{
3641    uint64_t score;
3642    tr_torrent * tor;
3643    struct peer_atom * atom;
3644};
3645
3646static tr_bool
3647torrentWasRecentlyStarted( const tr_torrent * tor )
3648{
3649    return difftime( tr_time( ), tor->startDate ) < 120;
3650}
3651
/**
 * Append a field to a packed sort key.
 *
 * @param value the key built so far
 * @param width number of bits to shift the existing key left by
 * @param addme the field to OR into the vacated low bits; it is not
 *              masked, so it is up to the caller to keep it within
 *              `width' bits
 * @return the extended key
 */
static inline uint64_t
addValToKey( uint64_t value, int width, uint64_t addme )
{
    return ( value << (uint64_t)width ) | addme;
}
3659
3660/* smaller value is better */
3661static uint64_t
3662getPeerCandidateScore( const tr_torrent * tor, const struct peer_atom * atom, uint8_t salt  )
3663{
3664    uint64_t i;
3665    uint64_t score = 0;
3666    const tr_bool failed = atom->lastConnectionAt < atom->lastConnectionAttemptAt;
3667
3668    /* prefer peers we've connected to, or never tried, over peers we failed to connect to. */
3669    i = failed ? 1 : 0;
3670    score = addValToKey( score, 1, i );
3671
3672    /* prefer the one we attempted least recently (to cycle through all peers) */
3673    i = atom->lastConnectionAttemptAt;
3674    score = addValToKey( score, 32, i );
3675
3676    /* prefer peers belonging to a torrent of a higher priority */
3677    switch( tr_torrentGetPriority( tor ) ) {
3678        case TR_PRI_HIGH:    i = 0; break;
3679        case TR_PRI_NORMAL:  i = 1; break;
3680        case TR_PRI_LOW:     i = 2; break;
3681    }
3682    score = addValToKey( score, 4, i );
3683
3684    /* prefer recently-started torrents */
3685    i = torrentWasRecentlyStarted( tor ) ? 0 : 1;
3686    score = addValToKey( score, 1, i );
3687
3688    /* prefer torrents we're downloading with */
3689    i = tr_torrentIsSeed( tor ) ? 1 : 0;
3690    score = addValToKey( score, 1, i );
3691
3692    /* prefer peers that are known to be connectible */
3693    i = ( atom->flags & ADDED_F_CONNECTABLE ) ? 0 : 1;
3694    score = addValToKey( score, 1, i );
3695
3696    /* prefer peers that we might have a chance of uploading to...
3697       so lower seed probability is better */
3698    if( atom->seedProbability == 100 ) i = 101;
3699    else if( atom->seedProbability == -1 ) i = 100;
3700    else i = atom->seedProbability;
3701    score = addValToKey( score, 8, i );
3702
3703    /* Prefer peers that we got from more trusted sources.
3704     * lower `fromBest' values indicate more trusted sources */
3705    score = addValToKey( score, 4, atom->fromBest );
3706
3707    /* salt */
3708    score = addValToKey( score, 8, salt );
3709
3710    return score;
3711}
3712
3713/* sort an array of peer candidates */
3714static int
3715comparePeerCandidates( const void * va, const void * vb )
3716{
3717    const struct peer_candidate * a = va;
3718    const struct peer_candidate * b = vb;
3719
3720    if( a->score < b->score ) return -1;
3721    if( a->score > b->score ) return 1;
3722
3723    return 0;
3724}
3725
3726/** @return an array of all the atoms we might want to connect to */
3727static struct peer_candidate*
3728getPeerCandidates( tr_session * session, int * candidateCount )
3729{
3730    int n;
3731    tr_torrent * tor;
3732    struct peer_candidate * candidates;
3733    struct peer_candidate * walk;
3734    const time_t now = tr_time( );
3735    const uint64_t now_msec = tr_time_msec( );
3736    /* leave 5% of connection slots for incoming connections -- ticket #2609 */
3737    const int maxCandidates = tr_sessionGetPeerLimit( session ) * 0.95;
3738
3739    /* don't start any new handshakes if we're full up */
3740    n = 0;
3741    tor= NULL;
3742    while(( tor = tr_torrentNext( session, tor )))
3743        n += tr_ptrArraySize( &tor->torrentPeers->peers );
3744    if( maxCandidates <= n ) {
3745        *candidateCount = 0;
3746        return NULL;
3747    }
3748
3749    /* allocate an array of candidates */
3750    n = 0;
3751    tor= NULL;
3752    while(( tor = tr_torrentNext( session, tor )))
3753        n += tr_ptrArraySize( &tor->torrentPeers->pool );
3754    walk = candidates = tr_new( struct peer_candidate, n );
3755
3756    /* populate the candidate array */
3757    tor = NULL;
3758    while(( tor = tr_torrentNext( session, tor )))
3759    {
3760        int i, nAtoms;
3761        struct peer_atom ** atoms;
3762
3763        if( !tor->torrentPeers->isRunning )
3764            continue;
3765
3766        /* if we've already got enough peers in this torrent... */
3767        if( tr_torrentGetPeerLimit( tor ) <= tr_ptrArraySize( &tor->torrentPeers->peers ) )
3768            continue;
3769
3770        /* if we've already got enough speed in this torrent... */
3771        if( tr_torrentIsSeed( tor ) && isBandwidthMaxedOut( tor->bandwidth, now_msec, TR_UP ) )
3772            continue;
3773
3774        atoms = (struct peer_atom**) tr_ptrArrayPeek( &tor->torrentPeers->pool, &nAtoms );
3775        for( i=0; i<nAtoms; ++i )
3776        {
3777            struct peer_atom * atom = atoms[i];
3778
3779            if( isPeerCandidate( tor, atom, now ) )
3780            {
3781                const uint8_t salt = tr_cryptoWeakRandInt( 1024 );
3782                walk->tor = tor;
3783                walk->atom = atom;
3784                walk->score = getPeerCandidateScore( tor, atom, salt );
3785                ++walk;
3786            }
3787        }
3788    }
3789
3790    *candidateCount = walk - candidates;
3791    if( *candidateCount > 1 )
3792        qsort( candidates, *candidateCount, sizeof( struct peer_candidate ), comparePeerCandidates );
3793    return candidates;
3794}
3795
3796static void
3797initiateConnection( tr_peerMgr * mgr, Torrent * t, struct peer_atom * atom )
3798{
3799    tr_peerIo * io;
3800    const time_t now = tr_time( );
3801    tr_bool utp = tr_sessionIsUTPEnabled(mgr->session) && !atom->utp_failed;
3802
3803    if( atom->fromFirst == TR_PEER_FROM_PEX )
3804        /* PEX has explicit signalling for uTP support.  If an atom
3805           originally came from PEX and doesn't have the uTP flag, skip the
3806           uTP connection attempt.  Are we being optimistic here? */
3807        utp = utp && (atom->flags & ADDED_F_UTP_FLAGS);
3808
3809    tordbg( t, "Starting an OUTGOING%s connection with %s",
3810            utp ? " µTP" : "",
3811            tr_atomAddrStr( atom ) );
3812
3813    io = tr_peerIoNewOutgoing( mgr->session,
3814                               mgr->session->bandwidth,
3815                               &atom->addr,
3816                               atom->port,
3817                               t->tor->info.hash,
3818                               t->tor->completeness == TR_SEED,
3819                               utp );
3820
3821    if( io == NULL )
3822    {
3823        tordbg( t, "peerIo not created; marking peer %s as unreachable",
3824                tr_atomAddrStr( atom ) );
3825        atom->flags2 |= MYFLAG_UNREACHABLE;
3826        atom->numFails++;
3827    }
3828    else
3829    {
3830        tr_handshake * handshake = tr_handshakeNew( io,
3831                                                    mgr->session->encryptionMode,
3832                                                    myHandshakeDoneCB,
3833                                                    mgr );
3834
3835        assert( tr_peerIoGetTorrentHash( io ) );
3836
3837        tr_peerIoUnref( io ); /* balanced by the initial ref
3838                                 in tr_peerIoNewOutgoing() */
3839
3840        tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
3841                                 handshakeCompare );
3842    }
3843
3844    atom->lastConnectionAttemptAt = now;
3845    atom->time = now;
3846}
3847
3848static void
3849initiateCandidateConnection( tr_peerMgr * mgr, struct peer_candidate * c )
3850{
3851#if 0
3852    fprintf( stderr, "Starting an OUTGOING connection with %s - [%s] seedProbability==%d; %s, %s\n",
3853             tr_atomAddrStr( c->atom ),
3854             tr_torrentName( c->tor ),
3855             (int)c->atom->seedProbability,
3856             tr_torrentIsPrivate( c->tor ) ? "private" : "public",
3857             tr_torrentIsSeed( c->tor ) ? "seed" : "downloader" );
3858#endif
3859
3860    initiateConnection( mgr, c->tor->torrentPeers, c->atom );
3861}
3862
3863static void
3864makeNewPeerConnections( struct tr_peerMgr * mgr, const int max )
3865{
3866    int i, n;
3867    struct peer_candidate * candidates;
3868
3869    candidates = getPeerCandidates( mgr->session, &n );
3870
3871    for( i=0; i<n && i<max; ++i )
3872        initiateCandidateConnection( mgr, &candidates[i] );
3873
3874    tr_free( candidates );
3875}
Note: See TracBrowser for help on using the repository browser.